pubsub-go

module
v2.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2024 License: MIT

README

pubsub-go/v2

This is a Go module that provides a convenience wrapper for publishing and receiving messages via Google Cloud Pub/Sub.

This library allows for messages to be received via channel, and to rapidly create multiple subscriptions using a single function call.

Installation

go get github.com/clearchanneloutdoor/pubsub-go/v2

Usage

Create a new PubSub
import psb "github.com/clearchanneloutdoor/pubsub-go/pkg"

func main() {
  client, err := psb.NewPubSub(
    context.Background(),
    pb.Options("<projectID>"))
  if err != nil {
    panic(err)
  }
}
Provide additional configuration
import (
  "cloud.google.com/go/pubsub"
  "google.golang.org/api/option"
  psb "github.com/clearchanneloutdoor/pubsub-go/pkg"
)

func main() {
  opts := pb.Options("<projectID>").
    SetClientOptions(
      option.WithGRPCConnectionPool(100),
      option.WithCredentialsFile("<path to credentials file>")
  client, err := psb.NewPubSub(context.Background(), opts)
  if err != nil {
    panic(err)
  }
}
Create a Topic
package main

import (
  "cloud.google.com/go/pubsub"
  psb "github.com/clearchanneloutdoor/pubsub-go/pkg"
)

func main() {
  // Initialize new pubsub-go PubSub 
  client, err := psb.NewPubSub(
    context.Background(), 
    pb.Options("<project ID>"))
  if err != nil {
    panic(err)
  }
  defer func() {
    if err := client.Close(); err != nil {
      panic(err)
    }
  }()

  // create the topic 
  if err := client.CreateTopic(
    "<topic ID>", 
    pubsub.TopicConfig{
      RetentionDuration: time.Hour * 24 * time.Duration(3),
    }); err != nil {
    panic(err)
  }

  // send a message to the topic
}
Create Multiple Subscriptions with Filters for a Topic
import (
  "cloud.google.com/go/pubsub"
  psb "github.com/clearchanneloutdoor/pubsub-go/pkg"
)

func main() {
  // Initialize new pubsub-go PubSub

  // Define a map for a list of Subscription Names and their (optional) filter definitions
  subs := map[string]string{
    "topic-sub": "",
    "topic-sub-ca": "attributes.region = \"CA\"",
    "topic-sub-nv": "attributes.region = \"NV\"",
    "topic-sub-tx": "attributes.region = \"TX\"",
  }

  // Create subscriptions
  if err := client.CreateSubscriptions("<topic ID>", subs, pubsub.SubscriptionConfig{
    EnableMessageOrdering: true,
    RetainAckedMessages:   false,
  }); err != nil {
    panic(err)
  }
}
Publish Message

The Publish function can be used to publish a string or an object (which is serialized as JSON).

type Example struct {
  CoolNumber int `json:"coolNumber"`
  Message string `json:"message"`
}

func main() {
  // Initialize new PubSub client

  // Publish a string message
  if err := client.Publish("<topic ID>", "hello world"); err != nil {
    panic(err)
  }

  // Publish an object as JSON
  e := Example{
    CoolNumber: 13,
    Message: "hello world",
  }
  if err := client.Publish("<topic ID>", e); err != nil {
    panic(err)
  }
}
Publish Messages in Other Formats

Publish can also be used to send messages serialized in other formats as well, if you serialize the payload prior to calling the function and pass in the serialized []byte slice:

type Example struct {
  Message string `yaml:"message"`
}

func main() {
  // Initialize new PubSub client

  // Serialize as yaml
  sy, err := yaml.Marshal(Example{Message: "hello world"})
  if err != nil {
    panic(err)
  }

  // Publish serialized yaml
  if err := client.Publish("<topic ID>", sy); err != nil {
    panic(err)
  }
}
Publish Messages with PublishSettings

PublishSettings can be specified in options used when creating the PubSub client. The settings are then used to control the behavior of the publication.

opts := psb.Options("<project ID>").
  SetPublishSettings(pubsub.PublishSettings{
    DelayThreshold:  (10 * time.Millisecond),
    CountThreshold:  1000,
    ByteThreshold:   1000000,
    Timeout:         (10 * time.Second),
  })
client, err := psb.NewPubSub(context.Background(), opts)

if err := client.Publish("<topic ID>", "hello world"); err != nil {
  panic(err)
}
Receive Messages
func main() {
  // Initialize new PubSub client

  // Create a channel for receiving messages
  messages := make(chan *pubsub.Message)
  go func() {
    // loop to continuously receive
    for {
      msg := <-messages

      // Process message
      fmt.Printf("Received a message: %s\n", string(msg.Data))

      // Acknowledge message
      msg.Ack()
    }
  }()

  // create a subscription
  if err := client.CreateSubscription("<topic ID>", "<subscription ID>", ""); err != nil {
    panic(err)
  }

  // receive messages
  if err := client.Receive("<subscription ID>", messages); err != nil {
    panic(err)
  }
}
Receive Messages with ReceiveSettings

ReceiveSettings can be specified in options used when creating the PubSub client. The settings are then used to control the behavior of the subscription.

opts := psb.Options("<project ID>").
  SetReceiveSettings(pubsub.ReceiveSettings{
   MaxExtension:           (15 * time.Second),
   MaxOutstandingMessages: 1000,
   NumGoroutines:          10,
  })
client, err := psb.NewPubSub(context.Background(), opts)

if err := ps.Receive("<subscription ID>", messages); err != nil {
  panic(err)
}

Running GCP PubSub Locally

GCP SDK

If you have installed the GCP SDK, you can run the following command to start the emulator.

gcloud beta emulators pubsub start --project=example-project --host-port=localhost:8085 

Then, export the following environment variables to point to the emulator.

export PUBSUB_EMULATOR_HOST=localhost:8085
export PUBSUB_PROJECT_ID=example-project
Run the Example

After starting the emulator, in the same terminal window where you have exported the environment variables, run the following command to start the example.

cd v2
go run examples/pubsub.go
Docker

Google publishes an emulator for GCP PubSub, so you can run it locally. This repo includes a script that will spin up a docker container with the emulator started so running a local dev environment is easier.

Huge shout out to @anguillanneuf, who wrote a blog post that made building this script much easier.

Dependencies
  • Docker
  • Openssl
Start

Open a terminal and navigate to this project's directory. once there run the following command...

sh ./local-pubsub.sh

This will take you through a wizard to get all information necessary to start a Project, Topic, and Subscription.

There is an export command that is out put once the script has completed that you'll need to copy and paste across all your open terminal windows. The reason being is that there is no way to set the GCP PubSub endpoint directly in your application; the GCP libray looks to an environment to know which endpoint to use.

The -m Option

Sending a message directly to your queue using something like curl isn't clearly documented. Rather than sending a json payload, you send a base64 encoded string of the message data.

Execute the following command to go throw a wizard that will output a properly formatted request for you.

sh ./local-pubsub.sh -m

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL