Documentation ¶
Overview ¶
Package pubsub provides an easy and portable way to interact with publish/subscribe systems.
Subpackages contain distinct implementations of pubsub for various providers, including Cloud and on-prem solutions. For example, "gcppubsub" supports Google Cloud Pub/Sub. Your application should import one of these provider-specific subpackages and use its exported functions to get a *Topic and/or *Subscription; do not use the NewTopic/NewSubscription functions in this package. For example:
topic := mempubsub.NewTopic()
err := topic.Send(context.Background(), &pubsub.Message{Body: []byte("hi")})
...
Then, write your application code using the *Topic/*Subscription types. You can easily reconfigure your initialization code to choose a different provider. You can develop your application locally using mempubsub, or deploy it to multiple Cloud providers. You may find http://github.com/google/wire useful for managing your initialization code.
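As a rough sketch of that separation (the publishGreeting name is illustrative, not part of this package), application code depends only on the portable types, so swapping providers touches only the initialization code:

// publishGreeting depends only on the portable *pubsub.Topic type, so it
// works unchanged whether the topic came from mempubsub, gcppubsub, or any
// other provider subpackage.
func publishGreeting(ctx context.Context, topic *pubsub.Topic) error {
	return topic.Send(ctx, &pubsub.Message{Body: []byte("Hello, world!")})
}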
OpenCensus Integration ¶
OpenCensus supports tracing and metric collection for multiple languages and backend providers. See https://opencensus.io.
This API collects OpenCensus traces and metrics for the following methods:
- Topic.Send
- Topic.Shutdown
- Subscription.Receive
- Subscription.Shutdown
- The internal driver methods SendBatch, SendAcks and ReceiveBatch.
All trace and metric names begin with the package import path. The traces add the method name. For example, "gocloud.dev/pubsub/Topic.Send". The metrics are "completed_calls", a count of completed method calls by provider, method and status (error code); and "latency", a distribution of method latency by provider and method. For example, "gocloud.dev/pubsub/latency".
To enable trace collection in your application, see "Configure Exporter" at https://opencensus.io/quickstart/go/tracing. To enable metric collection in your application, see "Exporting stats" at https://opencensus.io/quickstart/go/metrics.
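A minimal sketch of that wiring, assuming the go.opencensus.io/stats/view and go.opencensus.io/trace packages and omitting the exporter registration described in the quickstarts above:

// Register the package's predefined views so that the pubsub metrics are
// aggregated and made available to whatever stats exporter you configure.
if err := view.Register(pubsub.OpenCensusViews...); err != nil {
	log.Fatal(err)
}

// Sample every trace for demonstration purposes; production code would
// normally use a probability sampler. Register your trace exporter as
// described in the OpenCensus quickstart.
trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})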
Example (ReceiveWithInvertedWorkerPool) ¶
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/mempubsub"
)

func main() {
	// Open a topic and corresponding subscription.
	ctx := context.Background()
	t := mempubsub.NewTopic()
	defer t.Shutdown(ctx)
	s := mempubsub.NewSubscription(t, time.Second)
	defer s.Shutdown(ctx)

	// Send a bunch of messages to the topic.
	const nMessages = 100
	for n := 0; n < nMessages; n++ {
		m := &pubsub.Message{
			Body: []byte(fmt.Sprintf("message %d", n)),
		}
		if err := t.Send(ctx, m); err != nil {
			log.Fatal(err)
		}
	}

	// In order to make our test exit, we keep track of how many messages were
	// processed with wg, and cancel the receiveCtx when we've processed them all.
	// A more realistic application would not need this WaitGroup.
	var wg sync.WaitGroup
	wg.Add(nMessages)
	receiveCtx, cancel := context.WithCancel(ctx)
	go func() {
		wg.Wait()
		cancel()
	}()

	// Process messages using an inverted worker pool, as described here:
	// https://www.youtube.com/watch?v=5zXAHh5tJqQ&t=26m58s
	// It uses a buffered channel, sem, as a semaphore to manage the maximum
	// number of workers.
	const poolSize = 10
	sem := make(chan struct{}, poolSize)
	for {
		// Read a message. Receive will block until a message is available.
		msg, err := s.Receive(receiveCtx)
		if err != nil {
			// An error from Receive is fatal; Receive will never succeed again
			// so the application should exit.
			// In this example, we expect to get an error here when we've read all
			// the messages and receiveCtx is canceled.
			break
		}
		// Write a token to the semaphore; if there are already poolSize workers
		// active, this will block until one of them completes.
		sem <- struct{}{}
		// Process the message. For many applications, this can be expensive, so
		// we do it in a goroutine, allowing this loop to continue and Receive more
		// messages.
		go func() {
			// Record that we've processed this message, and Ack it.
			msg.Ack()
			wg.Done()
			// Read a token from the semaphore before exiting this goroutine, freeing
			// up the slot for another goroutine.
			<-sem
		}()
	}

	// Wait for all workers to finish.
	for n := poolSize; n > 0; n-- {
		sem <- struct{}{}
	}
	fmt.Printf("Read %d messages", nMessages)
}
Output: Read 100 messages
Example (ReceiveWithTraditionalWorkerPool) ¶
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/mempubsub"
)

func main() {
	// Open a topic and corresponding subscription.
	ctx := context.Background()
	t := mempubsub.NewTopic()
	defer t.Shutdown(ctx)
	s := mempubsub.NewSubscription(t, time.Second)
	defer s.Shutdown(ctx)

	// Send a bunch of messages to the topic.
	const nMessages = 100
	for n := 0; n < nMessages; n++ {
		m := &pubsub.Message{
			Body: []byte(fmt.Sprintf("message %d", n)),
		}
		if err := t.Send(ctx, m); err != nil {
			log.Fatal(err)
		}
	}

	// In order to make our test exit, we keep track of how many messages were
	// processed with wg, and cancel the receiveCtx when we've processed them all.
	// A more realistic application would not need this WaitGroup.
	var wg sync.WaitGroup
	wg.Add(nMessages)
	receiveCtx, cancel := context.WithCancel(ctx)
	go func() {
		wg.Wait()
		cancel()
	}()

	// Process messages using a traditional worker pool. Consider using an
	// inverted pool instead (see the other example).
	const poolSize = 10
	var workerWg sync.WaitGroup
	for n := 0; n < poolSize; n++ {
		workerWg.Add(1)
		go func() {
			for {
				// Read a message. Receive will block until a message is available.
				// It's fine to call Receive from many goroutines.
				msg, err := s.Receive(receiveCtx)
				if err != nil {
					// An error from Receive is fatal; Receive will never succeed again
					// so the application should exit.
					// In this example, we expect to get an error here when we've read all
					// the messages and receiveCtx is canceled.
					workerWg.Done()
					return
				}
				// Process the message and Ack it.
				msg.Ack()
				wg.Done()
			}
		}()
	}

	// Wait for all workers to finish.
	workerWg.Wait()
	fmt.Printf("Read %d messages", nMessages)
}
Output: Read 100 messages
Example (SendReceive) ¶
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/mempubsub"
)

func main() {
	// Open a topic and corresponding subscription.
	ctx := context.Background()
	t := mempubsub.NewTopic()
	defer t.Shutdown(ctx)
	s := mempubsub.NewSubscription(t, time.Second)
	defer s.Shutdown(ctx)

	// Send a message to the topic.
	if err := t.Send(ctx, &pubsub.Message{Body: []byte("Hello, world!")}); err != nil {
		log.Fatal(err)
	}

	// Receive a message from the subscription.
	m, err := s.Receive(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Print out the received message.
	fmt.Printf("%s\n", m.Body)

	// Acknowledge the message.
	m.Ack()
}
Output: Hello, world!
Example (SendReceiveMultipleMessages) ¶
package main

import (
	"context"
	"fmt"
	"log"
	"sort"
	"time"

	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/mempubsub"
)

func main() {
	// Open a topic and corresponding subscription.
	ctx := context.Background()
	t := mempubsub.NewTopic()
	defer t.Shutdown(ctx)
	s := mempubsub.NewSubscription(t, time.Second)
	defer s.Shutdown(ctx)

	// Send messages to the topic.
	ms := []*pubsub.Message{
		{Body: []byte("a")},
		{Body: []byte("b")},
		{Body: []byte("c")},
	}
	for _, m := range ms {
		if err := t.Send(ctx, m); err != nil {
			log.Fatal(err)
		}
	}

	// Receive and acknowledge messages from the subscription.
	ms2 := []string{}
	for i := 0; i < len(ms); i++ {
		m2, err := s.Receive(ctx)
		if err != nil {
			log.Fatal(err)
		}
		ms2 = append(ms2, string(m2.Body))
		m2.Ack()
	}

	// The messages may be received in a different order than they were
	// sent.
	sort.Strings(ms2)

	// Print out the received messages.
	for _, m2 := range ms2 {
		fmt.Println(m2)
	}
}
Output:
a
b
c
Constants ¶
This section is empty.
Variables ¶
var NewSubscription = newSubscription
NewSubscription is for use by provider implementations.
var NewTopic = newTopic
NewTopic is for use by provider implementations.
var (
	// OpenCensusViews are predefined views for OpenCensus metrics.
	// The views include counts and latency distributions for API method calls.
	// See the example at https://godoc.org/go.opencensus.io/stats/view for usage.
	OpenCensusViews = oc.Views(pkgName, latencyMeasure)
)
Functions ¶
This section is empty.
Types ¶
type Message ¶
type Message struct {
	// Body contains the content of the message.
	Body []byte

	// Metadata has key/value metadata for the message.
	Metadata map[string]string
	// contains filtered or unexported fields
}
Message contains data to be published.
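For example, a message carrying both a body and metadata might be constructed and published like this (a sketch; ctx and topic are assumed to come from your initialization code):

m := &pubsub.Message{
	Body:     []byte(`{"user_id": 42}`),
	Metadata: map[string]string{"content-type": "application/json"},
}
if err := topic.Send(ctx, m); err != nil {
	// Handle the send failure, e.g. log and retry or exit.
	log.Fatal(err)
}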
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription receives published messages.
func (*Subscription) As ¶
func (s *Subscription) As(i interface{}) bool
As converts i to provider-specific types. See Topic.As for more details.
func (*Subscription) ErrorAs ¶ added in v0.10.0
func (s *Subscription) ErrorAs(err error, target interface{}) bool
ErrorAs converts err to provider-specific types. ErrorAs panics if target is nil or not a pointer. ErrorAs returns false if err == nil. See Topic.As for more details.
func (*Subscription) Receive ¶
func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error)
Receive receives and returns the next message from the Subscription's queue, blocking and polling if none are available. This method can be called concurrently from multiple goroutines. The Ack() method of the returned Message has to be called once the message has been processed, to prevent it from being received again.
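A typical receive loop following this contract might look like the following sketch (sub is an opened *Subscription and processMessage stands in for your own handler):

for {
	msg, err := sub.Receive(ctx)
	if err != nil {
		// Receive will not succeed again after returning an error,
		// so log it and exit the loop.
		log.Printf("receiving message: %v", err)
		break
	}
	// Process the message, then Ack it so it is not redelivered.
	processMessage(msg)
	msg.Ack()
}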
type Topic ¶
type Topic struct {
// contains filtered or unexported fields
}
Topic publishes messages to all its subscribers.
func (*Topic) As ¶
func (t *Topic) As(i interface{}) bool
As converts i to provider-specific types.
This function (like the other As functions in this package) is inherently provider-specific, and using it will make that part of your application non-portable, so use it with care.
See the documentation for the subpackage used to instantiate the Topic or Subscription to see which type(s) are supported.
Usage:
1. Declare a variable of the provider-specific type you want to access.
2. Pass a pointer to it to As.
3. If the type is supported, As will return true and copy the provider-specific type into your variable. Otherwise, it will return false.
Provider-specific types that are intended to be mutable will be exposed as a pointer to the underlying type.
See https://github.com/google/go-cloud/blob/master/internal/docs/design.md#as for more background.
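As an illustration of the pattern only (someprovider and RawTopic below are hypothetical; consult your provider subpackage's documentation for the actual supported types):

// Hypothetical: suppose your provider subpackage documents that Topic.As
// supports *someprovider.RawTopic.
var raw *someprovider.RawTopic
if topic.As(&raw) {
	// raw now points to the provider's underlying topic and can be used for
	// provider-specific configuration. This code is no longer portable.
} else {
	// The topic was not opened by that provider, or the type is unsupported.
}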
func (*Topic) ErrorAs ¶ added in v0.10.0
func (t *Topic) ErrorAs(err error, target interface{}) bool
ErrorAs converts err to provider-specific types. ErrorAs panics if target is nil or not a pointer. ErrorAs returns false if err == nil. See Topic.As for more details.
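The usage pattern mirrors As; for illustration only (someprovider.Error and its Code field are hypothetical):

if err := topic.Send(ctx, msg); err != nil {
	// Hypothetical: suppose the provider documents support for
	// *someprovider.Error in ErrorAs.
	var perr *someprovider.Error
	if topic.ErrorAs(err, &perr) {
		log.Printf("provider error code: %v", perr.Code)
	}
}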
Directories ¶
Path | Synopsis
---|---
awspubsub | Package awspubsub provides an implementation of pubsub that uses AWS SNS (Simple Notification Service) and SQS (Simple Queueing Service).
azurepubsub | Package azurepubsub provides an implementation of pubsub using Azure Service Bus Topic and Subscription.
driver | Package driver defines a set of interfaces that the pubsub package uses to interact with the underlying pubsub services.
drivertest | Package drivertest provides a conformance test for implementations of driver.
gcppubsub | Package gcppubsub provides a pubsub implementation that uses GCP PubSub.
kafkapubsub (module) |
mempubsub | Package mempubsub provides an in-memory pubsub implementation.
natspubsub (module) |
rabbitpubsub | Package rabbitpubsub provides a pubsub implementation for RabbitMQ.