pubsub

package
v0.10.0
Published: Feb 12, 2019 License: Apache-2.0 Imports: 13 Imported by: 256

Documentation

Overview

Package pubsub provides an easy and portable way to interact with publish/subscribe systems.

Subpackages contain distinct implementations of pubsub for various providers, including Cloud and on-prem solutions. For example, "gcppubsub" supports Google Cloud Pub/Sub. Your application should import one of these provider-specific subpackages and use its exported functions to get a *Topic and/or *Subscription; do not use the NewTopic/NewSubscription functions in this package. For example:

topic := mempubsub.NewTopic()
err := topic.Send(context.Background(), &pubsub.Message{Body: []byte("hi")})
...

Then, write your application code using the *Topic/*Subscription types. You can easily reconfigure your initialization code to choose a different provider. You can develop your application locally using mempubsub, or deploy it to multiple Cloud providers. You may find http://github.com/google/wire useful for managing your initialization code.

OpenCensus Integration

OpenCensus supports tracing and metric collection for multiple languages and backend providers. See https://opencensus.io.

This API collects OpenCensus traces and metrics for the following methods:

  • Topic.Send
  • Topic.Shutdown
  • Subscription.Receive
  • Subscription.Shutdown
  • The internal driver methods SendBatch, SendAcks and ReceiveBatch.

All trace and metric names begin with the package import path. The traces add the method name. For example, "gocloud.dev/pubsub/Topic.Send". The metrics are "completed_calls", a count of completed method calls by provider, method and status (error code); and "latency", a distribution of method latency by provider and method. For example, "gocloud.dev/pubsub/latency".
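The naming scheme described above can be sketched with plain string concatenation. The helpers traceName and metricName are hypothetical and exist only to spell out the rule; they are not part of this package, and the pkgName constant here is simply the import path quoted in the text.

```go
package main

import "fmt"

// pkgName is the package import path that, per the text above, prefixes
// every trace and metric name.
const pkgName = "gocloud.dev/pubsub"

// traceName returns the trace name for a method such as "Topic.Send".
// (Hypothetical helper, for illustration.)
func traceName(method string) string {
	return pkgName + "/" + method
}

// metricName returns the full name of a metric such as "latency".
// (Hypothetical helper, for illustration.)
func metricName(metric string) string {
	return pkgName + "/" + metric
}

func main() {
	for _, m := range []string{"Topic.Send", "Topic.Shutdown", "Subscription.Receive", "Subscription.Shutdown"} {
		fmt.Println(traceName(m))
	}
	for _, m := range []string{"completed_calls", "latency"} {
		fmt.Println(metricName(m))
	}
}
```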

To enable trace collection in your application, see "Configure Exporter" at https://opencensus.io/quickstart/go/tracing. To enable metric collection in your application, see "Exporting stats" at https://opencensus.io/quickstart/go/metrics.

Example (ReceiveWithInvertedWorkerPool)
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/mempubsub"
)

func main() {
	// Open a topic and corresponding subscription.
	ctx := context.Background()
	t := mempubsub.NewTopic()
	defer t.Shutdown(ctx)
	s := mempubsub.NewSubscription(t, time.Second)
	defer s.Shutdown(ctx)

	// Send a bunch of messages to the topic.
	const nMessages = 100
	for n := 0; n < nMessages; n++ {
		m := &pubsub.Message{
			Body: []byte(fmt.Sprintf("message %d", n)),
		}
		if err := t.Send(ctx, m); err != nil {
			log.Fatal(err)
		}
	}

	// In order to make our test exit, we keep track of how many messages were
	// processed with wg, and cancel the receiveCtx when we've processed them all.
	// A more realistic application would not need this WaitGroup.
	var wg sync.WaitGroup
	wg.Add(nMessages)
	receiveCtx, cancel := context.WithCancel(ctx)
	go func() {
		wg.Wait()
		cancel()
	}()

	// Process messages using an inverted worker pool, as described here:
	// https://www.youtube.com/watch?v=5zXAHh5tJqQ&t=26m58s
	// It uses a buffered channel, sem, as a semaphore to manage the maximum
	// number of workers.
	const poolSize = 10
	sem := make(chan struct{}, poolSize)
	for {
		// Read a message. Receive will block until a message is available.
		msg, err := s.Receive(receiveCtx)
		if err != nil {
			// An error from Receive is fatal; Receive will never succeed again
			// so the application should exit.
			// In this example, we expect to get an error here when we've read all the
			// messages and receiveCtx is canceled.
			break
		}

		// Write a token to the semaphore; if there are already poolSize workers
		// active, this will block until one of them completes.
		sem <- struct{}{}
		// Process the message. For many applications, this can be expensive, so
		// we do it in a goroutine, allowing this loop to continue and Receive more
		// messages.
		go func() {
			// Record that we've processed this message, and Ack it.
			msg.Ack()
			wg.Done()
			// Read a token from the semaphore before exiting this goroutine, freeing
			// up the slot for another goroutine.
			<-sem
		}()
	}

	// Wait for all workers to finish.
	for n := poolSize; n > 0; n-- {
		sem <- struct{}{}
	}
	fmt.Printf("Read %d messages", nMessages)

}
Output:

Read 100 messages
Example (ReceiveWithTraditionalWorkerPool)
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/mempubsub"
)

func main() {
	// Open a topic and corresponding subscription.
	ctx := context.Background()
	t := mempubsub.NewTopic()
	defer t.Shutdown(ctx)
	s := mempubsub.NewSubscription(t, time.Second)
	defer s.Shutdown(ctx)

	// Send a bunch of messages to the topic.
	const nMessages = 100
	for n := 0; n < nMessages; n++ {
		m := &pubsub.Message{
			Body: []byte(fmt.Sprintf("message %d", n)),
		}
		if err := t.Send(ctx, m); err != nil {
			log.Fatal(err)
		}
	}

	// In order to make our test exit, we keep track of how many messages were
	// processed with wg, and cancel the receiveCtx when we've processed them all.
	// A more realistic application would not need this WaitGroup.
	var wg sync.WaitGroup
	wg.Add(nMessages)
	receiveCtx, cancel := context.WithCancel(ctx)
	go func() {
		wg.Wait()
		cancel()
	}()

	// Process messages using a traditional worker pool. Consider using an
	// inverted pool instead (see the other example).
	const poolSize = 10
	var workerWg sync.WaitGroup
	for n := 0; n < poolSize; n++ {
		workerWg.Add(1)
		go func() {
			for {
				// Read a message. Receive will block until a message is available.
				// It's fine to call Receive from many goroutines.
				msg, err := s.Receive(receiveCtx)
				if err != nil {
					// An error from Receive is fatal; Receive will never succeed again
					// so the application should exit.
					// In this example, we expect to get an error here when we've read all
					// the messages and receiveCtx is canceled.
					workerWg.Done()
					return
				}

				// Process the message and Ack it.
				msg.Ack()
				wg.Done()
			}
		}()
	}

	// Wait for all workers to finish.
	workerWg.Wait()
	fmt.Printf("Read %d messages", nMessages)

}
Output:

Read 100 messages
Example (SendReceive)
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/mempubsub"
)

func main() {
	// Open a topic and corresponding subscription.
	ctx := context.Background()
	t := mempubsub.NewTopic()
	defer t.Shutdown(ctx)
	s := mempubsub.NewSubscription(t, time.Second)
	defer s.Shutdown(ctx)

	// Send a message to the topic.
	if err := t.Send(ctx, &pubsub.Message{Body: []byte("Hello, world!")}); err != nil {
		log.Fatal(err)
	}

	// Receive a message from the subscription.
	m, err := s.Receive(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Print out the received message.
	fmt.Printf("%s\n", m.Body)

	// Acknowledge the message.
	m.Ack()

}
Output:

Hello, world!
Example (SendReceiveMultipleMessages)
package main

import (
	"context"
	"fmt"
	"log"
	"sort"
	"time"

	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/mempubsub"
)

func main() {
	// Open a topic and corresponding subscription.
	ctx := context.Background()
	t := mempubsub.NewTopic()
	defer t.Shutdown(ctx)
	s := mempubsub.NewSubscription(t, time.Second)
	defer s.Shutdown(ctx)

	// Send messages to the topic.
	ms := []*pubsub.Message{
		{Body: []byte("a")},
		{Body: []byte("b")},
		{Body: []byte("c")},
	}
	for _, m := range ms {
		if err := t.Send(ctx, m); err != nil {
			log.Fatal(err)
		}
	}

	// Receive messages from the subscription.
	ms2 := []string{}
	for i := 0; i < len(ms); i++ {
		m2, err := s.Receive(ctx)
		if err != nil {
			log.Fatal(err)
		}
		ms2 = append(ms2, string(m2.Body))
		m2.Ack()
	}

	// The messages may be received in a different order than they were
	// sent.
	sort.Strings(ms2)

	// Print out the received messages (they were acknowledged above).
	for _, m2 := range ms2 {
		fmt.Println(m2)
	}

}
Output:

a
b
c

Constants

This section is empty.

Variables

var NewSubscription = newSubscription

NewSubscription is for use by provider implementations.

var NewTopic = newTopic

NewTopic is for use by provider implementations.

var (

	// OpenCensusViews are predefined views for OpenCensus metrics.
	// The views include counts and latency distributions for API method calls.
	// See the example at https://godoc.org/go.opencensus.io/stats/view for usage.
	OpenCensusViews = oc.Views(pkgName, latencyMeasure)
)

Functions

This section is empty.

Types

type Message

type Message struct {
	// Body contains the content of the message.
	Body []byte

	// Metadata has key/value metadata for the message.
	Metadata map[string]string
	// contains filtered or unexported fields
}

Message contains data to be published.

func (*Message) Ack

func (m *Message) Ack()

Ack acknowledges the message, telling the server that it does not need to be sent again to the associated Subscription. It returns immediately, but the actual ack is sent in the background, and is not guaranteed to succeed.

func (*Message) As added in v0.10.0

func (m *Message) As(i interface{}) bool

As converts m to provider-specific types. See Topic.As for details. As panics unless it is called on a message obtained from Subscription.Receive.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription receives published messages.

func (*Subscription) As

func (s *Subscription) As(i interface{}) bool

As converts i to provider-specific types. See Topic.As for more details.

func (*Subscription) ErrorAs added in v0.10.0

func (s *Subscription) ErrorAs(err error, target interface{}) bool

ErrorAs converts err to provider-specific types. ErrorAs panics if target is nil or not a pointer. ErrorAs returns false if err == nil. See Topic.As for more details.

func (*Subscription) Receive

func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error)

Receive receives and returns the next message from the Subscription's queue, blocking and polling if none are available. This method can be called concurrently from multiple goroutines. The Ack() method of the returned Message has to be called once the message has been processed, to prevent it from being received again.

func (*Subscription) Shutdown

func (s *Subscription) Shutdown(ctx context.Context) (err error)

Shutdown flushes pending ack sends and disconnects the Subscription.

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic publishes messages to all its subscribers.

func (*Topic) As

func (t *Topic) As(i interface{}) bool

As converts i to provider-specific types.

This function (like the other As functions in this package) is inherently provider-specific, and using it will make that part of your application non-portable, so use it with care.

See the documentation for the subpackage used to instantiate the Topic to see which type(s) are supported.

Usage:

1. Declare a variable of the provider-specific type you want to access.

2. Pass a pointer to it to As.

3. If the type is supported, As will return true and copy the provider-specific type into your variable. Otherwise, it will return false.

Provider-specific types that are intended to be mutable will be exposed as a pointer to the underlying type.

See https://github.com/google/go-cloud/blob/master/internal/docs/design.md#as for more background.
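The three usage steps can be sketched without any real provider. Everything in the block below is a stand-in invented for illustration: nativeClient plays the role of a provider SDK type and fakeTopic plays the role of a Topic that supports it. The As method shown is one plausible way such a conversion could be implemented, not the code of any actual provider.

```go
package main

import "fmt"

// nativeClient stands in for a provider SDK type. (Hypothetical.)
type nativeClient struct {
	endpoint string
}

// fakeTopic stands in for a Topic backed by a nativeClient. (Hypothetical.)
type fakeTopic struct {
	client *nativeClient
}

// As copies the underlying client into *i when i is a **nativeClient and
// reports whether it did so. Any other type is unsupported.
func (t *fakeTopic) As(i interface{}) bool {
	p, ok := i.(**nativeClient)
	if !ok {
		return false
	}
	*p = t.client
	return true
}

func main() {
	t := &fakeTopic{client: &nativeClient{endpoint: "example.invalid:1234"}}

	// 1. Declare a variable of the provider-specific type.
	var c *nativeClient
	// 2. Pass a pointer to it to As.
	// 3. If As returns true, the variable now holds the provider-specific value.
	if t.As(&c) {
		fmt.Println("endpoint:", c.endpoint)
	}

	// An unsupported type yields false and leaves the variable untouched.
	var n int
	fmt.Println("int supported:", t.As(&n))
}
```

Because nativeClient is meant to be mutable, it is exposed as a pointer, which is why the caller declares a *nativeClient and passes a **nativeClient.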

func (*Topic) ErrorAs added in v0.10.0

func (t *Topic) ErrorAs(err error, target interface{}) bool

ErrorAs converts err to provider-specific types. ErrorAs panics if target is nil or not a pointer. ErrorAs returns false if err == nil. See Topic.As for more details.

func (*Topic) Send

func (t *Topic) Send(ctx context.Context, m *Message) (err error)

Send publishes a message. It only returns after the message has been sent, or failed to be sent. Send can be called from multiple goroutines at once.

func (*Topic) Shutdown

func (t *Topic) Shutdown(ctx context.Context) (err error)

Shutdown flushes pending message sends and disconnects the Topic. It only returns after all pending messages have been sent.

Directories

Path          Synopsis
awspubsub     Package awspubsub provides an implementation of pubsub that uses AWS SNS (Simple Notification Service) and SQS (Simple Queueing Service).
azurepubsub   Package azurepubsub provides an implementation of pubsub using Azure Service Bus Topic and Subscription.
driver        Package driver defines a set of interfaces that the pubsub package uses to interact with the underlying pubsub services.
drivertest    Package drivertest provides a conformance test for implementations of driver.
gcppubsub     Package gcppubsub provides a pubsub implementation that uses GCP PubSub.
kafkapubsub   module
mempubsub     Package mempubsub provides an in-memory pubsub implementation.
natspubsub    module
rabbitpubsub  Package rabbitpubsub provides a pubsub implementation for RabbitMQ.
