Google Cloud Pub/Sub messages using the Go package: pubsub

Package pubsub provides an easy way to publish and receive Google Cloud Pub/Sub messages, hiding the details of the underlying server RPCs (Remote Procedure Calls). Google Cloud Pub/Sub is a many-to-many, asynchronous messaging system that decouples senders and receivers.

Publishing

Google Cloud Pub/Sub messages are published to topics. Topics may be created using the pubsub package like this:

topic, err := client.CreateTopic(ctx, "topic-name")
if err != nil {
	// Handle error.
}

Next, we create a subscription to the topic so that subscribing clients can receive the messages published to it.

_, err = client.CreateSubscription(ctx, "my-sub", pubsub.SubscriptionConfig{Topic: topic})
if err != nil {
	// TODO: Handle error.
}

Messages may then be published on a topic:

res := topic.Publish(ctx, &pubsub.Message{
	Data: []byte("hello world"),
})

Publish queues the message for publishing and returns immediately. When enough messages have accumulated, or enough time has elapsed, the batch of messages is sent to the Pub/Sub service.
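To make the "enough messages or enough time" rule concrete, here is a minimal sketch of a count-or-delay batcher using only the standard library. It is a toy model, not the pubsub package's implementation: the names batcher, countThreshold, delayThreshold and demo are hypothetical. In the real package, the corresponding knobs should be fields on topic.PublishSettings (such as CountThreshold and DelayThreshold); check the package documentation for the exact names and defaults.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// batcher is a hypothetical toy model of publish-side batching.
// It flushes when countThreshold messages are pending, or when
// delayThreshold has elapsed since the first pending message.
type batcher struct {
	mu             sync.Mutex
	pending        []string
	countThreshold int
	delayThreshold time.Duration
	timer          *time.Timer
	flush          func(batch []string) // called with the lock held; must not call add
}

func newBatcher(count int, delay time.Duration, flush func([]string)) *batcher {
	return &batcher{countThreshold: count, delayThreshold: delay, flush: flush}
}

// add queues a message and returns without waiting for the delay timer.
func (b *batcher) add(msg string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending = append(b.pending, msg)
	if len(b.pending) >= b.countThreshold {
		b.flushLocked()
		return
	}
	if b.timer == nil {
		// First message of a new batch: start the delay timer.
		b.timer = time.AfterFunc(b.delayThreshold, func() {
			b.mu.Lock()
			defer b.mu.Unlock()
			b.flushLocked()
		})
	}
}

// flushLocked hands the pending batch to flush. The caller must hold b.mu.
func (b *batcher) flushLocked() {
	if b.timer != nil {
		b.timer.Stop()
		b.timer = nil
	}
	if len(b.pending) == 0 {
		return
	}
	batch := b.pending
	b.pending = nil
	b.flush(batch)
}

// demo returns one batch flushed by count and one flushed by the timer.
func demo() (first, second []string) {
	got := make(chan []string, 2)
	b := newBatcher(2, 100*time.Millisecond, func(batch []string) { got <- batch })
	b.add("a")
	b.add("b") // reaches the count threshold: flushed at once
	first = <-got
	b.add("c") // below the count threshold: flushed by the timer
	second = <-got
	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first, second)
}
```

The point of the sketch is that add never waits for the delay timer itself; the waiting happens in the background, which is why a call like Publish can return immediately.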

Publish returns a PublishResult, which behaves like a future: its Get method blocks until the message has been sent to the service.
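If the "future" idea is new to you, the following standard-library sketch shows the pattern. It is only a model under my own assumptions, not the package's code: the type result and its methods set and get are hypothetical stand-ins for PublishResult and its Get method, and "msg-1" is a made-up ID.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// result is a hypothetical stand-in for a future-like publish result.
type result struct {
	ready chan struct{} // closed once the outcome is known
	id    string
	err   error
}

func newResult() *result { return &result{ready: make(chan struct{})} }

// set records the outcome and unblocks every waiter. It must be called once.
func (r *result) set(id string, err error) {
	r.id, r.err = id, err
	close(r.ready)
}

// get blocks until the outcome is known or ctx is done, whichever comes first.
func (r *result) get(ctx context.Context) (string, error) {
	select {
	case <-r.ready:
		return r.id, r.err
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// demo waits on one result that completes and one that never does.
func demo() (id string, timedOut bool) {
	ctx := context.Background()

	r := newResult()
	go func() {
		time.Sleep(10 * time.Millisecond) // simulated round trip to the service
		r.set("msg-1", nil)
	}()
	id, _ = r.get(ctx) // blocks until set is called

	never := newResult() // nobody completes this one
	short, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer cancel()
	_, err := never.get(short)
	return id, errors.Is(err, context.DeadlineExceeded)
}

func main() {
	id, timedOut := demo()
	fmt.Println(id, timedOut)
}
```

With the real package the equivalent call would be Get on the value returned by Publish, passing a context so that the wait can be bounded in the same way.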

The first time you call Publish on a topic, goroutines are started in the background. To clean up these goroutines, call Stop:

topic.Stop()

Receiving

To receive messages published to a topic, clients create subscriptions to the topic. There may be more than one subscription per topic; each message that is published to the topic will be delivered to all of its subscriptions.

sub := client.Subscription("my-sub")

Messages are then consumed from a subscription via a callback:

err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
	fmt.Println(string(m.Data))
	m.Ack() // Acknowledge that we've consumed the message.
})
if err != nil {
	// TODO: Handle error.
}

The callback is invoked concurrently by multiple goroutines, maximizing throughput. To terminate a call to Receive, cancel its context.

Once client code has processed the message, it must call Message.Ack or Message.Nack; otherwise the message will eventually be redelivered. Ack/Nack MUST be called within the Receive handler function, and not from a goroutine. Otherwise, flow control (e.g. ReceiveSettings.MaxOutstandingMessages) will not be respected, and messages can get orphaned when canceling Receive.
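To see why finishing the work inside the handler matters for flow control, here is a toy receive loop built with the standard library only. It is a sketch under my own assumptions, not the package's implementation: the function receive, its maxOutstanding parameter (standing in for ReceiveSettings.MaxOutstandingMessages) and demo are hypothetical. In this model a slot is freed when the handler returns, so a handler that hands the message to another goroutine and returns early would let more than maxOutstanding messages be in progress at once.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// receive is a hypothetical toy model of a callback-based receive loop.
// It invokes handler concurrently, never runs more than maxOutstanding
// handlers at once, and stops pulling messages when ctx is canceled.
func receive(ctx context.Context, msgs <-chan string, maxOutstanding int, handler func(string)) {
	slots := make(chan struct{}, maxOutstanding) // counting semaphore
	var wg sync.WaitGroup
	defer wg.Wait() // let in-flight handlers finish before returning
	for {
		select {
		case <-ctx.Done():
			return
		case slots <- struct{}{}: // wait for a free slot first
		}
		select {
		case <-ctx.Done():
			return
		case m := <-msgs:
			wg.Add(1)
			go func() {
				defer wg.Done()
				defer func() { <-slots }() // the slot is freed when the handler returns
				handler(m)
			}()
		}
	}
}

// demo pushes 20 messages through receive with a limit of 3. It reports how
// many were handled and the highest number of handlers running at once.
func demo() (handled, peak int64) {
	const total = 20
	msgs := make(chan string, total)
	for i := 0; i < total; i++ {
		msgs <- fmt.Sprintf("msg-%d", i)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var running int64
	receive(ctx, msgs, 3, func(m string) {
		n := atomic.AddInt64(&running, 1)
		for { // record the highest concurrency seen so far
			p := atomic.LoadInt64(&peak)
			if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
				break
			}
		}
		time.Sleep(time.Millisecond) // simulated work, done inside the handler
		atomic.AddInt64(&running, -1)
		if atomic.AddInt64(&handled, 1) == total {
			cancel() // everything is processed: terminate receive
		}
	})
	return handled, peak
}

func main() {
	handled, peak := demo()
	fmt.Println("handled:", handled, "peak within limit:", peak <= 3)
}
```

Note that receive only returns after the context is canceled and the in-flight handlers have finished, which mirrors the "cancel its context" advice above.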

Emulator

To use an emulator with this library, you can set the PUBSUB_EMULATOR_HOST environment variable to the address at which your emulator is running. This will send requests to that address instead of to Cloud Pub/Sub. You can then create and use a client as usual:

err := os.Setenv("PUBSUB_EMULATOR_HOST", "localhost:8085")
if err != nil {
	// TODO: Handle error.
}
ctx := context.Background()
client, err := pubsub.NewClient(ctx, "test")
if err != nil {
	log.Fatal(err)
}
defer client.Close()

Before running the publisher and then the receiver, first make sure that the emulator is running. To run the emulator locally, we use:

gcloud beta emulators pubsub start --project=test

Complete project link. Hope this is useful. Suggestions are always welcome. Will add new findings as and when they come up.

For further reading, refer to the pubsub package documentation. Thanks for reading.
