Documentation ¶
Overview ¶
Package pubsub provides an easy way to publish and receive Google Cloud Pub/Sub messages, hiding the the details of the underlying server RPCs. Google Cloud Pub/Sub is a many-to-many, asynchronous messaging system that decouples senders and receivers.
Note: This package is experimental and may make backwards-incompatible changes.
More information about Google Cloud Pub/Sub is available at https://cloud.google.com/pubsub/docs
Publishing ¶
Google Cloud Pub/Sub messages are published to topics. Topics may be created using the pubsub package like so:
topic, err := pubsubClient.CreateTopic(context.Background(), "topic-name")
Messages may then be published to a topic:
msgIDs, err := topic.Publish(ctx, &pubsub.Message{ Data: []byte("payload"), })
Receiving ¶
To receive messages published to a topic, clients create subscriptions to the topic. There may be more than one subscription per topic; each message that is published to the topic will be delivered to all of its subscriptions.
Subsciptions may be created like so:
sub, err := pubsubClient.NewSubscription(context.Background(), "sub-name", topic, 0, nil)
Messages are then consumed from a subscription via an iterator:
// Construct the iterator it, err := sub.Pull(context.Background()) if err != nil { // handle err ... } defer it.Stop() // Consume N messages for i := 0; i < N; i++ { msg, err := it.Next() if err == pubsub.Done { break } if err != nil { // handle err ... break } log.Print("got message: ", string(msg.Data)) msg.Done(true) }
The message iterator returns messages one at a time, fetching batches of messages behind the scenes as needed. Once client code has processed the message, it must call Message.Done, otherwise the message will eventually be redelivered. For more information and configuration options, see "Deadlines" below.
Note: It is possible for Messages to be redelivered, even if Message.Done has been called. Client code must be robust to multiple deliveries of messages.
Deadlines ¶
The default pubsub deadlines are suitable for most use cases, but may be overridden. This section describes the tradeoffs that should be considered when overriding the defaults.
Behind the scenes, each message returned by the Pub/Sub server has an associated lease, known as an "ACK deadline". Unless a message is acknowledged within the ACK deadline, or the client requests that the ACK deadline be extended, the message will become elegible for redelivery. As a convenience, the pubsub package will automatically extend deadlines until either:
- Message.Done is called, or
- the "MaxExtension" period elapses from the time the message is fetched from the server.
The initial ACK deadline given to each messages defaults to 10 seconds, but may be overridden during subscription creation. Selecting an ACK deadline is a tradeoff between message redelivery latency and RPC volume. If the pubsub package fails to acknowledge or extend a message (e.g. due to unexpected termination of the process), a shorter ACK deadline will generally result in faster message redelivery by the Pub/Sub system. However, a short ACK deadline may also increase the number of deadline extension RPCs that the pubsub package sends to the server.
The default max extension period is DefaultMaxExtension, and can be overridden by passing a MaxExtension option to Subscription.Pull. Selecting a max extension period is a tradeoff between the speed at which client code must process messages, and the redelivery delay if messages fail to be acknowledged (e.g. because client code neglects to do so). Using a large MaxExtension increases the available time for client code to process messages. However, if the client code neglects to call Message.Done, a large MaxExtension will increase the delay before the message is redelivered.
Index ¶
- Constants
- Variables
- type Client
- func (c *Client) Close() error
- func (c *Client) CreateSubscription(ctx context.Context, id string, topic *Topic, ackDeadline time.Duration, ...) (*Subscription, error)
- func (c *Client) CreateTopic(ctx context.Context, id string) (*Topic, error)
- func (c *Client) Subscription(id string) *Subscription
- func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator
- func (c *Client) Topic(id string) *Topic
- func (c *Client) Topics(ctx context.Context) *TopicIterator
- type Message
- type MessageIterator
- type PullOption
- type PushConfig
- type Subscription
- func (s *Subscription) Config(ctx context.Context) (*SubscriptionConfig, error)
- func (s *Subscription) Delete(ctx context.Context) error
- func (s *Subscription) Exists(ctx context.Context) (bool, error)
- func (s *Subscription) ID() string
- func (s *Subscription) ModifyPushConfig(ctx context.Context, conf *PushConfig) error
- func (s *Subscription) Pull(ctx context.Context, opts ...PullOption) (*MessageIterator, error)
- func (s *Subscription) String() string
- type SubscriptionConfig
- type SubscriptionIterator
- type Topic
- func (t *Topic) Delete(ctx context.Context) error
- func (t *Topic) Exists(ctx context.Context) (bool, error)
- func (t *Topic) ID() string
- func (t *Topic) Publish(ctx context.Context, msgs ...*Message) ([]string, error)
- func (t *Topic) String() string
- func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator
- type TopicIterator
Examples ¶
- Client.CreateSubscription
- Client.CreateTopic
- Client.Subscriptions
- Client.Topics
- MessageIterator.Next
- MessageIterator.Stop (Defer)
- MessageIterator.Stop (Goroutine)
- NewClient
- Subscription.Config
- Subscription.Delete
- Subscription.Exists
- Subscription.ModifyPushConfig
- Subscription.Pull
- Subscription.Pull (Options)
- SubscriptionIterator.Next
- Topic.Delete
- Topic.Exists
- Topic.Publish
- Topic.Subscriptions
- TopicIterator.Next
Constants ¶
const ( // ScopePubSub grants permissions to view and manage Pub/Sub // topics and subscriptions. ScopePubSub = "https://www.googleapis.com/auth/pubsub" // ScopeCloudPlatform grants permissions to view and manage your data // across Google Cloud Platform services. ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform" )
const DefaultMaxExtension = 10 * time.Minute
The default period for which to automatically extend Message acknowledgement deadlines.
const DefaultMaxPrefetch = 100
The default maximum number of messages that are prefetched from the server.
const MaxPublishBatchSize = 1000
Variables ¶
var Done = iterator.Done
Done is returned when an iteration is complete.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a Google Pub/Sub client scoped to a single project.
Clients should be reused rather than being created as needed. A Client may be shared by multiple goroutines.
func NewClient ¶
NewClient creates a new PubSub client.
Example ¶
package main import ( "cloud.google.com/go/pubsub" "golang.org/x/net/context" ) func main() { ctx := context.Background() _, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } // See the other examples to learn how to use the Client. }
Output:
func (*Client) Close ¶ added in v0.2.0
Close closes any resources held by the client.
Close need not be called at program exit.
func (*Client) CreateSubscription ¶ added in v0.2.0
func (c *Client) CreateSubscription(ctx context.Context, id string, topic *Topic, ackDeadline time.Duration, pushConfig *PushConfig) (*Subscription, error)
CreateSubscription creates a new subscription on a topic.
name is the name of the subscription to create. It must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and must not start with "goog".
topic is the topic from which the subscription should receive messages. It need not belong to the same project as the subscription.
ackDeadline is the maximum time after a subscriber receives a message before the subscriber should acknowledge the message. It must be between 10 and 600 seconds (inclusive), and is rounded down to the nearest second. If the provided ackDeadline is 0, then the default value of 10 seconds is used. Note: messages which are obtained via a MessageIterator need not be acknowledged within this deadline, as the deadline will be automatically extended.
pushConfig may be set to configure this subscription for push delivery.
If the subscription already exists an error will be returned.
Example ¶
package main import ( "time" "cloud.google.com/go/pubsub" "golang.org/x/net/context" ) func main() { ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } // Create a new topic with the given name. topic, err := client.CreateTopic(ctx, "topicName") if err != nil { // TODO: Handle error. } // Create a new subscription to the previously created topic // with the given name. sub, err := client.CreateSubscription(ctx, "subName", topic, 10*time.Second, nil) if err != nil { // TODO: Handle error. } _ = sub // TODO: use the subscription. }
Output:
func (*Client) CreateTopic ¶ added in v0.2.0
CreateTopic creates a new topic. The specified topic ID must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and must not start with "goog". If the topic already exists an error will be returned.
Example ¶
package main import ( "cloud.google.com/go/pubsub" "golang.org/x/net/context" ) func main() { ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } // Create a new topic with the given name. topic, err := client.CreateTopic(ctx, "topicName") if err != nil { // TODO: Handle error. } _ = topic // TODO: use the topic. }
Output:
func (*Client) Subscription ¶
func (c *Client) Subscription(id string) *Subscription
Subscription creates a reference to a subscription.
func (*Client) Subscriptions ¶
func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator
Subscriptions returns an iterator which returns all of the subscriptions for the client's project.
Example ¶
package main import ( "cloud.google.com/go/pubsub" "golang.org/x/net/context" ) func main() { ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } // List all subscriptions of the project. it := client.Subscriptions(ctx) _ = it // TODO: iterate using Next. }
Output:
func (*Client) Topics ¶
func (c *Client) Topics(ctx context.Context) *TopicIterator
Topics returns an iterator which returns all of the topics for the client's project.
Example ¶
package main import ( "cloud.google.com/go/pubsub" "golang.org/x/net/context" ) func main() { ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } it := client.Topics(ctx) _ = it // TODO: iterate using Next. }
Output:
type Message ¶
type Message struct { // ID identifies this message. // This ID is assigned by the server and is populated for Messages obtained from a subscription. // This field is read-only. ID string // Data is the actual data in the message. Data []byte // Attributes represents the key-value pairs the current message // is labelled with. Attributes map[string]string // The time at which the message was published. // This is populated by the server for Messages obtained from a subscription. // This field is read-only. PublishTime time.Time // contains filtered or unexported fields }
Message represents a Pub/Sub message.
func (*Message) Done ¶
Done completes the processing of a Message that was returned from a MessageIterator. ack indicates whether the message should be acknowledged. Client code must call Done when finished for each Message returned by an iterator. Done may only be called on Messages returned by a MessageIterator. If message acknowledgement fails, the Message will be redelivered. Calls to Done have no effect after the first call.
See MessageIterator.Next for an example.
type MessageIterator ¶ added in v0.2.0
type MessageIterator struct {
// contains filtered or unexported fields
}
func (*MessageIterator) Next ¶ added in v0.2.0
func (it *MessageIterator) Next() (*Message, error)
Next returns the next Message to be processed. The caller must call Message.Done when finished with it. Once Stop has been called, calls to Next will return Done.
Example ¶
package main import ( "fmt" "cloud.google.com/go/pubsub" "golang.org/x/net/context" "google.golang.org/api/iterator" ) func main() { ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } it, err := client.Subscription("subName").Pull(ctx) if err != nil { // TODO: Handle error. } // Ensure that the iterator is closed down cleanly. defer it.Stop() // Consume 10 messages. for i := 0; i < 10; i++ { m, err := it.Next() if err == iterator.Done { // There are no more messages. This will happen if it.Stop is called. break } if err != nil { // TODO: Handle error. break } fmt.Printf("message %d: %s\n", i, m.Data) // Acknowledge the message. m.Done(true) } }
Output:
func (*MessageIterator) Stop ¶ added in v0.2.0
func (it *MessageIterator) Stop()
Client code must call Stop on a MessageIterator when finished with it. Stop will block until Done has been called on all Messages that have been returned by Next, or until the context with which the MessageIterator was created is cancelled or exceeds its deadline. Stop need only be called once, but may be called multiple times from multiple goroutines.
Example (Defer) ¶
package main import ( "cloud.google.com/go/pubsub" "golang.org/x/net/context" ) func main() { // If all uses of the iterator occur within the lifetime of a single // function, stop it with defer. ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } it, err := client.Subscription("subName").Pull(ctx) if err != nil { // TODO: Handle error. } // Ensure that the iterator is closed down cleanly. defer it.Stop() // TODO: Use the iterator (see the example for MessageIterator.Next). }
Output:
Example (Goroutine) ¶
package main import ( "time" "cloud.google.com/go/pubsub" "golang.org/x/net/context" ) func main() *pubsub.MessageIterator { // If you use the iterator outside the lifetime of a single function, you // must still stop it. // This (contrived) example returns an iterator that will yield messages // for ten seconds, and then stop. ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } it, err := client.Subscription("subName").Pull(ctx) if err != nil { // TODO: Handle error. } // Stop the iterator after receiving messages for ten seconds. go func() { time.Sleep(10 * time.Second) it.Stop() }() return it }
Output:
type PullOption ¶
type PullOption interface {
// contains filtered or unexported methods
}
A PullOption is an optional argument to Subscription.Pull.
func MaxExtension ¶
func MaxExtension(duration time.Duration) PullOption
MaxExtension returns a PullOption that limits how long acks deadlines are extended for.
A MessageIterator will automatically extend the ack deadline of all fetched Messages for the duration specified. Automatic deadline extension may be disabled by specifying a duration of 0.
func MaxPrefetch ¶
func MaxPrefetch(num int) PullOption
MaxPrefetch returns a PullOption that limits Message prefetching.
For performance reasons, the pubsub library may prefetch a pool of Messages to be returned serially from MessageIterator.Next. MaxPrefetch is used to limit the the size of this pool.
If num is less than 1, it will be treated as if it were 1.
type PushConfig ¶
type PushConfig struct { // A URL locating the endpoint to which messages should be pushed. Endpoint string // Endpoint configuration attributes. See https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions#PushConfig.FIELDS.attributes for more details. Attributes map[string]string }
PushConfig contains configuration for subscriptions that operate in push mode.
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription is a reference to a PubSub subscription.
func (*Subscription) Config ¶
func (s *Subscription) Config(ctx context.Context) (*SubscriptionConfig, error)
Config fetches the current configuration for the subscription.
Example ¶
package main import ( "fmt" "cloud.google.com/go/pubsub" "golang.org/x/net/context" ) func main() { ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } sub := client.Subscription("subName") config, err := sub.Config(ctx) if err != nil { // TODO: Handle error. } fmt.Println(config) }
Output:
func (*Subscription) Delete ¶
func (s *Subscription) Delete(ctx context.Context) error
Delete deletes the subscription.
Example ¶
package main import ( "cloud.google.com/go/pubsub" "golang.org/x/net/context" ) func main() { ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } sub := client.Subscription("subName") if err := sub.Delete(ctx); err != nil { // TODO: Handle error. } }
Output:
func (*Subscription) Exists ¶
func (s *Subscription) Exists(ctx context.Context) (bool, error)
Exists reports whether the subscription exists on the server.
Example ¶
package main import ( "cloud.google.com/go/pubsub" "golang.org/x/net/context" ) func main() { ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } sub := client.Subscription("subName") ok, err := sub.Exists(ctx) if err != nil { // TODO: Handle error. } if !ok { // Subscription doesn't exist. } }
Output:
func (*Subscription) ID ¶ added in v0.2.0
func (s *Subscription) ID() string
ID returns the unique identifier of the subscription within its project.
func (*Subscription) ModifyPushConfig ¶
func (s *Subscription) ModifyPushConfig(ctx context.Context, conf *PushConfig) error
ModifyPushConfig updates the endpoint URL and other attributes of a push subscription.
Example ¶
package main import ( "cloud.google.com/go/pubsub" "golang.org/x/net/context" ) func main() { ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } sub := client.Subscription("subName") if err := sub.ModifyPushConfig(ctx, &pubsub.PushConfig{Endpoint: "https://example.com/push"}); err != nil { // TODO: Handle error. } }
Output:
func (*Subscription) Pull ¶
func (s *Subscription) Pull(ctx context.Context, opts ...PullOption) (*MessageIterator, error)
Pull returns a MessageIterator that can be used to fetch Messages. The MessageIterator will automatically extend the ack deadline of all fetched Messages, for the period specified by DefaultMaxExtension. This may be overridden by supplying a MaxExtension pull option.
If ctx is cancelled or exceeds its deadline, outstanding acks or deadline extensions will fail.
The caller must call Stop on the MessageIterator once finished with it.
Example ¶
package main import ( "cloud.google.com/go/pubsub" "golang.org/x/net/context" ) func main() { ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } it, err := client.Subscription("subName").Pull(ctx) if err != nil { // TODO: Handle error. } // Ensure that the iterator is closed down cleanly. defer it.Stop() }
Output:
Example (Options) ¶
package main import ( "time" "cloud.google.com/go/pubsub" "golang.org/x/net/context" ) func main() { ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } sub := client.Subscription("subName") // This program is expected to process and acknowledge messages // in 5 seconds. If not, Pub/Sub API will assume the message is not // acknowledged. it, err := sub.Pull(ctx, pubsub.MaxExtension(5*time.Second)) if err != nil { // TODO: Handle error. } // Ensure that the iterator is closed down cleanly. defer it.Stop() }
Output:
func (*Subscription) String ¶ added in v0.2.0
func (s *Subscription) String() string
String returns the globally unique printable name of the subscription.
type SubscriptionConfig ¶
type SubscriptionConfig struct { Topic *Topic PushConfig PushConfig // The default maximum time after a subscriber receives a message before // the subscriber should acknowledge the message. Note: messages which are // obtained via a MessageIterator need not be acknowledged within this // deadline, as the deadline will be automatically extended. AckDeadline time.Duration }
Subscription config contains the configuration of a subscription.
type SubscriptionIterator ¶
type SubscriptionIterator struct {
// contains filtered or unexported fields
}
SubscriptionIterator is an iterator that returns a series of subscriptions.
func (*SubscriptionIterator) Next ¶
func (subs *SubscriptionIterator) Next() (*Subscription, error)
Next returns the next subscription. If there are no more subscriptions, Done will be returned.
Example ¶
package main import ( "fmt" "cloud.google.com/go/pubsub" "golang.org/x/net/context" "google.golang.org/api/iterator" ) func main() { ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } // List all subscriptions of the project. it := client.Subscriptions(ctx) for { sub, err := it.Next() if err == iterator.Done { break } if err != nil { // TODO: Handle error. } fmt.Println(sub) } }
Output:
type Topic ¶
type Topic struct {
// contains filtered or unexported fields
}
Topic is a reference to a PubSub topic.
func (*Topic) Delete ¶
Delete deletes the topic.
Example ¶
package main import ( "cloud.google.com/go/pubsub" "golang.org/x/net/context" ) func main() { ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } topic := client.Topic("topicName") if err := topic.Delete(ctx); err != nil { // TODO: Handle error. } }
Output:
func (*Topic) Exists ¶
Exists reports whether the topic exists on the server.
Example ¶
package main import ( "cloud.google.com/go/pubsub" "golang.org/x/net/context" ) func main() { ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } topic := client.Topic("topicName") ok, err := topic.Exists(ctx) if err != nil { // TODO: Handle error. } if !ok { // Topic doesn't exist. } }
Output:
func (*Topic) Publish ¶
Publish publishes the supplied Messages to the topic. If successful, the server-assigned message IDs are returned in the same order as the supplied Messages. At most MaxPublishBatchSize messages may be supplied.
Example ¶
package main import ( "fmt" "cloud.google.com/go/pubsub" "golang.org/x/net/context" ) func main() { ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } topic := client.Topic("topicName") msgIDs, err := topic.Publish(ctx, &pubsub.Message{ Data: []byte("hello world"), }) if err != nil { // TODO: Handle error. } fmt.Printf("Published a message with a message ID: %s\n", msgIDs[0]) }
Output:
func (*Topic) String ¶ added in v0.2.0
String returns the printable globally unique name for the topic.
func (*Topic) Subscriptions ¶
func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator
Subscriptions returns an iterator which returns the subscriptions for this topic.
Example ¶
package main import ( "cloud.google.com/go/pubsub" "golang.org/x/net/context" ) func main() { ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } topic := client.Topic("topic-name") // List all subscriptions of the topic (maybe of multiple projects). for subs := topic.Subscriptions(ctx); ; { sub, err := subs.Next() if err == pubsub.Done { break } if err != nil { // TODO: Handle error. } _ = sub // TODO: use the subscription. } }
Output:
type TopicIterator ¶
type TopicIterator struct {
// contains filtered or unexported fields
}
TopicIterator is an iterator that returns a series of topics.
func (*TopicIterator) Next ¶
func (tps *TopicIterator) Next() (*Topic, error)
Next returns the next topic. If there are no more topics, Done will be returned.
Example ¶
package main import ( "fmt" "cloud.google.com/go/pubsub" "golang.org/x/net/context" "google.golang.org/api/iterator" ) func main() { ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } // List all topics. it := client.Topics(ctx) for { t, err := it.Next() if err == iterator.Done { break } if err != nil { // TODO: Handle error. } fmt.Println(t) } }
Output: