pubsub

package
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2019 License: Apache-2.0 Imports: 9 Imported by: 254

Documentation

Overview

Package pubsub provides an easy and portable way to interact with publish/ subscribe systems.

Example (ReceiveWithInvertedWorkerPool)
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/mempubsub"
)

func main() {
	// Open a topic and corresponding subscription.
	ctx := context.Background()
	t := mempubsub.NewTopic()
	defer t.Shutdown(ctx)
	s := mempubsub.NewSubscription(t, time.Second)
	defer s.Shutdown(ctx)

	// Send a bunch of messages to the topic.
	const nMessages = 100
	for n := 0; n < nMessages; n++ {
		m := &pubsub.Message{
			Body: []byte(fmt.Sprintf("message %d", n)),
		}
		if err := t.Send(ctx, m); err != nil {
			log.Fatal(err)
		}
	}

	// In order to make our test exit, we keep track of how many messages were
	// processed with wg, and cancel the receiveCtx when we've processed them all.
	// A more realistic application would not need this WaitGroup.
	var wg sync.WaitGroup
	wg.Add(nMessages)
	receiveCtx, cancel := context.WithCancel(ctx)
	go func() {
		wg.Wait()
		cancel()
	}()

	// Process messages using an inverted worker pool, as described here:
	// https://www.youtube.com/watch?v=5zXAHh5tJqQ&t=26m58s
	// It uses a buffered channel, sem, as a semaphore to manage the maximum
	// number of workers.
	const poolSize = 10
	sem := make(chan struct{}, poolSize)
	for {
		// Read a message. Receive will block until a message is available.
		msg, err := s.Receive(receiveCtx)
		if err != nil {
			// An error from Receive is fatal; Receive will never succeed again
			// so the application should exit.
			// In this example, we expect to get a error here when we've read all the
			// messages and receiveCtx is canceled.
			break
		}

		// Write a token to the semaphore; if there are already poolSize workers
		// active, this will block until one of them completes.
		sem <- struct{}{}
		// Process the message. For many applications, this can be expensive, so
		// we do it in a goroutine, allowing this loop to continue and Receive more
		// messages.
		go func() {
			// Record that we've processed this message, and Ack it.
			msg.Ack()
			wg.Done()
			// Read a token from the semaphore before exiting this goroutine, freeing
			// up the slot for another goroutine.
			<-sem
		}()
	}

	// Wait for all workers to finish.
	for n := poolSize; n > 0; n-- {
		sem <- struct{}{}
	}
	fmt.Printf("Read %d messages", nMessages)

}
Output:

Read 100 messages
Example (ReceiveWithTraditionalWorkerPool)
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/mempubsub"
)

func main() {
	// Open a topic and corresponding subscription.
	ctx := context.Background()
	t := mempubsub.NewTopic()
	defer t.Shutdown(ctx)
	s := mempubsub.NewSubscription(t, time.Second)
	defer s.Shutdown(ctx)

	// Send a bunch of messages to the topic.
	const nMessages = 100
	for n := 0; n < nMessages; n++ {
		m := &pubsub.Message{
			Body: []byte(fmt.Sprintf("message %d", n)),
		}
		if err := t.Send(ctx, m); err != nil {
			log.Fatal(err)
		}
	}

	// In order to make our test exit, we keep track of how many messages were
	// processed with wg, and cancel the receiveCtx when we've processed them all.
	// A more realistic application would not need this WaitGroup.
	var wg sync.WaitGroup
	wg.Add(nMessages)
	receiveCtx, cancel := context.WithCancel(ctx)
	go func() {
		wg.Wait()
		cancel()
	}()

	// Process messages using a traditional worker pool. Consider using an
	// inverted pool instead (see the other example).
	const poolSize = 10
	var workerWg sync.WaitGroup
	for n := 0; n < poolSize; n++ {
		workerWg.Add(1)
		go func() {
			for {
				// Read a message. Receive will block until a message is available.
				// It's fine to call Receive from many goroutines.
				msg, err := s.Receive(receiveCtx)
				if err != nil {
					// An error from Receive is fatal; Receive will never succeed again
					// so the application should exit.
					// In this example, we expect to get a error here when we've read all
					// the messages and receiveCtx is canceled.
					workerWg.Done()
					return
				}

				// Process the message and Ack it.
				msg.Ack()
				wg.Done()
			}
		}()
	}

	// Wait for all workers to finish.
	workerWg.Wait()
	fmt.Printf("Read %d messages", nMessages)

}
Output:

Read 100 messages
Example (SendReceive)
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/mempubsub"
)

func main() {
	// Open a topic and corresponding subscription.
	ctx := context.Background()
	t := mempubsub.NewTopic()
	defer t.Shutdown(ctx)
	s := mempubsub.NewSubscription(t, time.Second)
	defer s.Shutdown(ctx)

	// Send a message to the topic.
	if err := t.Send(ctx, &pubsub.Message{Body: []byte("Hello, world!")}); err != nil {
		log.Fatal(err)
	}

	// Receive a message from the subscription.
	m, err := s.Receive(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Print out the received message.
	fmt.Printf("%s\n", m.Body)

	// Acknowledge the message.
	m.Ack()

}
Output:

Hello, world!
Example (SendReceiveMultipleMessages)
package main

import (
	"context"
	"fmt"
	"log"
	"sort"
	"time"

	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/mempubsub"
)

func main() {
	// Open a topic and corresponding subscription.
	ctx := context.Background()
	t := mempubsub.NewTopic()
	defer t.Shutdown(ctx)
	s := mempubsub.NewSubscription(t, time.Second)
	defer s.Shutdown(ctx)

	// Send messages to the topic.
	ms := []*pubsub.Message{
		{Body: []byte("a")},
		{Body: []byte("b")},
		{Body: []byte("c")},
	}
	for _, m := range ms {
		if err := t.Send(ctx, m); err != nil {
			log.Fatal(err)
		}
	}

	// Receive messages from the subscription.
	ms2 := []string{}
	for i := 0; i < len(ms); i++ {
		m2, err := s.Receive(ctx)
		if err != nil {
			log.Fatal(err)
		}
		ms2 = append(ms2, string(m2.Body))
		m2.Ack()
	}

	// The messages may be received in a different order than they were
	// sent.
	sort.Strings(ms2)

	// Print out and acknowledge the received messages.
	for _, m2 := range ms2 {
		fmt.Println(m2)
	}

}
Output:

a
b
c

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Message

type Message struct {
	// Body contains the content of the message.
	Body []byte

	// Metadata has key/value metadata for the message.
	Metadata map[string]string
	// contains filtered or unexported fields
}

Message contains data to be published.

func (*Message) Ack

func (m *Message) Ack()

Ack acknowledges the message, telling the server that it does not need to be sent again to the associated Subscription. It returns immediately, but the actual ack is sent in the background, and is not guaranteed to succeed.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription receives published messages.

func NewSubscription

func NewSubscription(d driver.Subscription, newAckBatcher func(context.Context, *Subscription) driver.Batcher) *Subscription

NewSubscription creates a Subscription from a driver.Subscription and a function to make a batcher that sends batches of acks to the provider. If newAckBatcher is nil, a default batcher implementation will be used. NewSubscription is for use by provider implementations.

func (*Subscription) As

func (s *Subscription) As(i interface{}) bool

As converts i to provider-specific types. See provider documentation for which type(s) are supported.

See https://github.com/google/go-cloud/blob/master/internal/docs/design.md#as for more background.

func (*Subscription) Receive

func (s *Subscription) Receive(ctx context.Context) (*Message, error)

Receive receives and returns the next message from the Subscription's queue, blocking and polling if none are available. This method can be called concurrently from multiple goroutines. The Ack() method of the returned Message has to be called once the message has been processed, to prevent it from being received again.

func (*Subscription) Shutdown

func (s *Subscription) Shutdown(ctx context.Context) error

Shutdown flushes pending ack sends and disconnects the Subscription.

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic publishes messages to all its subscribers.

func NewTopic

func NewTopic(d driver.Topic) *Topic

NewTopic makes a pubsub.Topic from a driver.Topic. It is for use by provider implementations.

func (*Topic) As

func (t *Topic) As(i interface{}) bool

As converts i to provider-specific types. See provider documentation for which type(s) are supported.

See https://github.com/google/go-cloud/blob/master/internal/docs/design.md#as for more background.

func (*Topic) Send

func (t *Topic) Send(ctx context.Context, m *Message) error

Send publishes a message. It only returns after the message has been sent, or failed to be sent. Send can be called from multiple goroutines at once.

func (*Topic) Shutdown

func (t *Topic) Shutdown(ctx context.Context) error

Shutdown flushes pending message sends and disconnects the Topic. It only returns after all pending messages have been sent.

Directories

Path Synopsis
Package driver defines a set of interfaces that the pubsub package uses to interact with the underlying pubsub services.
Package driver defines a set of interfaces that the pubsub package uses to interact with the underlying pubsub services.
Package drivertest provides a conformance test for implementations of driver.
Package drivertest provides a conformance test for implementations of driver.
Package gcppubsub provides an implementation of pubsub that uses GCP PubSub.
Package gcppubsub provides an implementation of pubsub that uses GCP PubSub.
kafkapubsub module
Package mempubsub provides an in-memory pubsub implementation.
Package mempubsub provides an in-memory pubsub implementation.
natspubsub module
Package rabbitpubsub provides a pubsub driver for RabbitMQ.
Package rabbitpubsub provides a pubsub driver for RabbitMQ.
package samples contains sample programs using the pubsub API.
package samples contains sample programs using the pubsub API.
gcmsg
gcmsg is a sample application that publishes messages from stdin to an existing topic or receives messages from an existing subscription and sends them to stdout.
gcmsg is a sample application that publishes messages from stdin to an existing topic or receives messages from an existing subscription and sends them to stdout.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL