Documentation ¶
Overview ¶
Package pubsub provides an easy way to publish and receive Google Cloud Pub/Sub messages, hiding the details of the underlying server RPCs. Google Cloud Pub/Sub is a many-to-many, asynchronous messaging system that decouples senders and receivers.
Note: This package is experimental and may make backwards-incompatible changes.
More information about Google Cloud Pub/Sub is available at https://cloud.google.com/pubsub/docs
Publishing ¶
Google Cloud Pub/Sub messages are published to topics. Topics may be created using the pubsub package like so:
topic, err := pubsubClient.NewTopic(context.Background(), "topic-name")
Messages may then be published to a topic:
msgIDs, err := topic.Publish(ctx, &pubsub.Message{
	Data: []byte("payload"),
})
Receiving ¶
To receive messages published to a topic, clients create subscriptions to the topic. There may be more than one subscription per topic; each message that is published to the topic will be delivered to all of its subscriptions.
Subscriptions may be created like so:
sub, err := pubsubClient.NewSubscription(context.Background(), "sub-name", topic, 0, nil)
Messages are then consumed from a subscription via an iterator:
// Construct the iterator
it, err := sub.Pull(context.Background())
if err != nil {
	// handle err ...
}
defer it.Stop()

// Consume N messages
for i := 0; i < N; i++ {
	msg, err := it.Next()
	if err == pubsub.Done {
		break
	}
	if err != nil {
		// handle err ...
		break
	}

	log.Print("got message: ", string(msg.Data))
	msg.Done(true)
}
The message iterator returns messages one at a time, fetching batches of messages behind the scenes as needed. Once client code has processed a message, it must call Message.Done; otherwise the message will eventually be redelivered. For more information and configuration options, see "Deadlines" below.
Note: It is possible for Messages to be redelivered, even if Message.Done has been called. Client code must be robust to multiple deliveries of messages.
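One common way to tolerate redelivery is to make processing idempotent by remembering which message IDs have already been handled. The following is a minimal sketch of that idea using only the standard library. The seenSet type and its firstDelivery method are hypothetical helpers, not part of the pubsub package, and the deliveries are simulated with plain strings standing in for Message.ID.

```go
package main

import (
	"fmt"
	"sync"
)

// seenSet records message IDs that have already been processed.
// It is a hypothetical helper, not part of the pubsub package.
type seenSet struct {
	mu  sync.Mutex
	ids map[string]bool
}

func newSeenSet() *seenSet {
	return &seenSet{ids: make(map[string]bool)}
}

// firstDelivery reports whether id has not been seen before, and records it.
func (s *seenSet) firstDelivery(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.ids[id] {
		return false
	}
	s.ids[id] = true
	return true
}

func main() {
	seen := newSeenSet()
	// Simulated deliveries; "m1" arrives twice.
	for _, id := range []string{"m1", "m2", "m1"} {
		if seen.firstDelivery(id) {
			fmt.Println("processing", id)
		} else {
			fmt.Println("skipping duplicate", id)
		}
	}
}
```

An in-memory set like this grows without bound and is lost on restart, so a long-running consumer would likely need persistent storage with some form of expiry instead.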
Deadlines ¶
The default pubsub deadlines are suitable for most use cases, but may be overridden. This section describes the tradeoffs that should be considered when overriding the defaults.
Behind the scenes, each message returned by the Pub/Sub server has an associated lease, known as an "ACK deadline". Unless a message is acknowledged within the ACK deadline, or the client requests that the ACK deadline be extended, the message will become eligible for redelivery. As a convenience, the pubsub package will automatically extend deadlines until either:
- Message.Done is called, or
- the "MaxExtension" period elapses from the time the message is fetched from the server.
The initial ACK deadline given to each message defaults to 10 seconds, but may be overridden during subscription creation. Selecting an ACK deadline is a tradeoff between message redelivery latency and RPC volume. If the pubsub package fails to acknowledge or extend a message (e.g. due to unexpected termination of the process), a shorter ACK deadline will generally result in faster message redelivery by the Pub/Sub system. However, a short ACK deadline may also increase the number of deadline extension RPCs that the pubsub package sends to the server.
The default max extension period is DefaultMaxExtension, and can be overridden by passing a MaxExtension option to Subscription.Pull. Selecting a max extension period is a tradeoff between the speed at which client code must process messages, and the redelivery delay if messages fail to be acknowledged (e.g. because client code neglects to do so). Using a large MaxExtension increases the available time for client code to process messages. However, if the client code neglects to call Message.Done, a large MaxExtension will increase the delay before the message is redelivered.
Example (Auth) ¶
package main

import (
	"io/ioutil"
	"log"

	"golang.org/x/net/context"
	"golang.org/x/oauth2"
	"golang.org/x/oauth2/google"
	"google.golang.org/cloud"
	"google.golang.org/cloud/pubsub"
)

// Example_auth returns an authorized context for use with the pubsub package.
func Example_auth() context.Context {
	// Initialize an authorized context with Google Developers Console
	// JSON key. Read the google package examples to learn more about
	// different authorization flows you can use.
	// http://godoc.org/golang.org/x/oauth2/google
	jsonKey, err := ioutil.ReadFile("/path/to/json/keyfile.json")
	if err != nil {
		log.Fatal(err)
	}
	conf, err := google.JWTConfigFromJSON(
		jsonKey,
		pubsub.ScopeCloudPlatform,
		pubsub.ScopePubSub,
	)
	if err != nil {
		log.Fatal(err)
	}
	ctx := cloud.NewContext("project-id", conf.Client(oauth2.NoContext))
	// See the other samples to learn how to use the context.
	return ctx
}
Index ¶
- Constants
- Variables
- func Ack(ctx context.Context, sub string, id ...string) error (deprecated)
- func CreateSub(ctx context.Context, name string, topic string, deadline time.Duration, ...) error (deprecated)
- func CreateTopic(ctx context.Context, name string) error (deprecated)
- func DeleteSub(ctx context.Context, name string) error (deprecated)
- func DeleteTopic(ctx context.Context, name string) error (deprecated)
- func ModifyAckDeadline(ctx context.Context, sub string, id string, deadline time.Duration) error (deprecated)
- func ModifyPushEndpoint(ctx context.Context, sub, endpoint string) error (deprecated)
- func Publish(ctx context.Context, topic string, msgs ...*Message) ([]string, error) (deprecated)
- func SubExists(ctx context.Context, name string) (bool, error) (deprecated)
- func TopicExists(ctx context.Context, name string) (bool, error) (deprecated)
- type Client
- func (c *Client) NewSubscription(ctx context.Context, name string, topic *Topic, ackDeadline time.Duration, ...) (*Subscription, error)
- func (c *Client) NewTopic(ctx context.Context, name string) (*Topic, error)
- func (c *Client) Subscription(name string) *Subscription
- func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator
- func (c *Client) Topic(name string) *Topic
- func (c *Client) Topics(ctx context.Context) *TopicIterator
- type Iterator
- type Message
- type PullOption
- type PushConfig
- type Subscription
- func (s *Subscription) Config(ctx context.Context) (*SubscriptionConfig, error)
- func (s *Subscription) Delete(ctx context.Context) error
- func (s *Subscription) Exists(ctx context.Context) (bool, error)
- func (s *Subscription) ModifyPushConfig(ctx context.Context, conf *PushConfig) error
- func (s *Subscription) Name() string
- func (s *Subscription) Pull(ctx context.Context, opts ...PullOption) (*Iterator, error)
- type SubscriptionConfig
- type SubscriptionIterator
- type Topic
- type TopicIterator
Constants ¶
const (
	// ScopePubSub grants permissions to view and manage Pub/Sub
	// topics and subscriptions.
	ScopePubSub = "https://www.googleapis.com/auth/pubsub"

	// ScopeCloudPlatform grants permissions to view and manage your data
	// across Google Cloud Platform services.
	ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform"
)
const DefaultMaxExtension = 10 * time.Minute
The default period for which to automatically extend Message acknowledgement deadlines.
const DefaultMaxPrefetch = 100
The default maximum number of messages that are prefetched from the server.
const MaxPublishBatchSize = 1000
Variables ¶
var Done = errors.New("no more messages")
Done is returned when an iteration is complete.
Functions ¶
func CreateSub
deprecated
func CreateSub(ctx context.Context, name string, topic string, deadline time.Duration, endpoint string) error
CreateSub creates a Pub/Sub subscription on the backend.
Deprecated: Use Client.NewSubscription instead.
A subscription should subscribe to an existing topic.
Messages that have not been acknowledged will be made available to the subscription again when the default acknowledgement deadline is reached. You can override the default deadline by providing a non-zero deadline. The deadline must not be specified to a precision finer than one second.
As new messages are queued on the subscription, you may receive push notifications about the new arrivals. To receive notifications of new messages in the queue, specify an endpoint callback URL. If endpoint is an empty string, the backend will not notify the client of new messages.
If the subscription already exists an error will be returned.
func CreateTopic
deprecated
func DeleteTopic
deprecated
func ModifyAckDeadline
deprecated
ModifyAckDeadline modifies the acknowledgement deadline for the messages retrieved from the specified subscription. Deadline must not be specified to precision greater than one second.
Deprecated: Use Subscription.Pull instead, which automatically extends ack deadlines.
func ModifyPushEndpoint
deprecated
func Publish
deprecated
Publish publishes messages to the topic's subscribers. It returns message IDs upon success.
Deprecated: Use Topic.Publish instead.
Example ¶
package main

import (
	"io/ioutil"
	"log"

	"golang.org/x/net/context"
	"golang.org/x/oauth2"
	"golang.org/x/oauth2/google"
	"google.golang.org/cloud"
	"google.golang.org/cloud/pubsub"
)

func Example_auth() context.Context {
	jsonKey, err := ioutil.ReadFile("/path/to/json/keyfile.json")
	if err != nil {
		log.Fatal(err)
	}
	conf, err := google.JWTConfigFromJSON(
		jsonKey,
		pubsub.ScopeCloudPlatform,
		pubsub.ScopePubSub,
	)
	if err != nil {
		log.Fatal(err)
	}
	ctx := cloud.NewContext("project-id", conf.Client(oauth2.NoContext))
	return ctx
}

func main() {
	ctx := Example_auth()

	msgIDs, err := pubsub.Publish(ctx, "topic1", &pubsub.Message{
		Data: []byte("hello world"),
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("Published a message with a message id: %s\n", msgIDs[0])
}
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a Google Pub/Sub client, which may be used to perform Pub/Sub operations with a project. It must be constructed via NewClient.
func (*Client) NewSubscription ¶
func (c *Client) NewSubscription(ctx context.Context, name string, topic *Topic, ackDeadline time.Duration, pushConfig *PushConfig) (*Subscription, error)
NewSubscription creates a new subscription to a topic.
name is the name of the subscription to create. It must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and must not start with "goog".
topic is the topic from which the subscription should receive messages. It need not belong to the same project as the subscription.
ackDeadline is the maximum time after a subscriber receives a message before the subscriber should acknowledge the message. It must be between 10 and 600 seconds (inclusive), and is rounded down to the nearest second. If the provided ackDeadline is 0, then the default value of 10 seconds is used. Note: messages which are obtained via an Iterator need not be acknowledged within this deadline, as the deadline will be automatically extended.
pushConfig may be set to configure this subscription for push delivery.
If the subscription already exists an error will be returned.
func (*Client) NewTopic ¶
NewTopic creates a new topic. The specified topic name must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and must not start with "goog". If the topic already exists an error will be returned.
func (*Client) Subscription ¶
func (c *Client) Subscription(name string) *Subscription
Subscription creates a reference to a subscription.
func (*Client) Subscriptions ¶
func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator
Subscriptions returns an iterator which returns all of the subscriptions for the client's project.
type Iterator ¶
type Iterator struct {
// contains filtered or unexported fields
}
func (*Iterator) Next ¶
Next returns the next Message to be processed. The caller must call Message.Done when finished with it. Once Stop has been called, calls to Next will return Done.
func (*Iterator) Stop ¶
func (it *Iterator) Stop()
Client code must call Stop on an Iterator when finished with it. Stop will block until Done has been called on all Messages that have been returned by Next, or until the context with which the Iterator was created is cancelled or exceeds its deadline. Stop need only be called once, but may be called multiple times from multiple goroutines.
type Message ¶
type Message struct {
	// ID identifies this message.
	// This ID is assigned by the server and is populated for Messages obtained from a subscription.
	// It is otherwise ignored.
	ID string

	// Data is the actual data in the message.
	Data []byte

	// Attributes represents the key-value pairs the current message
	// is labelled with.
	Attributes map[string]string

	// AckID is the identifier to acknowledge this message.
	AckID string
	// contains filtered or unexported fields
}
Message represents a Pub/Sub message.
func Pull
deprecated
Pull pulls up to n messages from the subscription. n must not be larger than 100.
Deprecated: Use Subscription.Pull instead.
Example ¶
package main

import (
	"io/ioutil"
	"log"

	"golang.org/x/net/context"
	"golang.org/x/oauth2"
	"golang.org/x/oauth2/google"
	"google.golang.org/cloud"
	"google.golang.org/cloud/pubsub"
)

func Example_auth() context.Context {
	jsonKey, err := ioutil.ReadFile("/path/to/json/keyfile.json")
	if err != nil {
		log.Fatal(err)
	}
	conf, err := google.JWTConfigFromJSON(
		jsonKey,
		pubsub.ScopeCloudPlatform,
		pubsub.ScopePubSub,
	)
	if err != nil {
		log.Fatal(err)
	}
	ctx := cloud.NewContext("project-id", conf.Client(oauth2.NoContext))
	return ctx
}

func main() {
	ctx := Example_auth()

	// E.g. c.CreateSub("sub1", "topic1", time.Duration(0), "")
	msgs, err := pubsub.Pull(ctx, "sub1", 1)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("New message arrived: %v\n", msgs[0])
	if err := pubsub.Ack(ctx, "sub1", msgs[0].AckID); err != nil {
		log.Fatal(err)
	}
	log.Println("Acknowledged message")
}
func PullWait
deprecated
PullWait pulls up to n messages from the subscription. If there are no messages in the queue, it will wait until at least one message is available or a timeout occurs. n must not be larger than 100.
Deprecated: Use Subscription.Pull instead.
func (*Message) Done ¶
Done completes the processing of a Message that was returned from an Iterator. ack indicates whether the message should be acknowledged. Client code must call Done when finished for each Message returned by an iterator. Done may only be called on Messages returned by an iterator. If message acknowledgement fails, the Message will be redelivered. Calls to Done have no effect after the first call.
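The "no effect after the first call" behavior is commonly achieved with sync.Once. The sketch below shows that pattern on a stand-in type. trackedMessage is hypothetical, and nothing here implies that the pubsub package implements Message.Done this way.

```go
package main

import (
	"fmt"
	"sync"
)

// trackedMessage is a hypothetical stand-in for a message whose Done
// method takes effect only on the first call.
type trackedMessage struct {
	once  sync.Once
	acked bool // outcome recorded by the first Done call
	calls int  // number of Done calls that took effect
}

// Done records the ack decision once; later calls are ignored.
func (m *trackedMessage) Done(ack bool) {
	m.once.Do(func() {
		m.acked = ack
		m.calls++
	})
}

func main() {
	m := &trackedMessage{}
	m.Done(true)
	m.Done(false) // ignored: the first call already decided the outcome
	fmt.Println("acked:", m.acked, "effective calls:", m.calls)
}
```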
type PullOption ¶
type PullOption interface {
// contains filtered or unexported methods
}
A PullOption is an optional argument to Subscription.Pull.
func MaxExtension ¶
func MaxExtension(duration time.Duration) PullOption
MaxExtension returns a PullOption that limits how long ack deadlines are extended for.
An Iterator will automatically extend the ack deadline of all fetched Messages for the duration specified. Automatic deadline extension may be disabled by specifying a duration of 0.
func MaxPrefetch ¶
func MaxPrefetch(num int) PullOption
MaxPrefetch returns a PullOption that limits Message prefetching.
For performance reasons, the pubsub library may prefetch a pool of Messages to be returned serially from Iterator.Next. MaxPrefetch is used to limit the size of this pool.
If num is less than 1, it will be treated as if it were 1.
type PushConfig ¶
type PushConfig struct {
	// A URL locating the endpoint to which messages should be pushed.
	Endpoint string

	// Endpoint configuration attributes. See https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions#PushConfig.FIELDS.attributes for more details.
	Attributes map[string]string
}
PushConfig contains configuration for subscriptions that operate in push mode.
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription is a reference to a PubSub subscription.
func (*Subscription) Config ¶
func (s *Subscription) Config(ctx context.Context) (*SubscriptionConfig, error)
Config fetches the current configuration for the subscription.
func (*Subscription) Delete ¶
func (s *Subscription) Delete(ctx context.Context) error
Delete deletes the subscription.
func (*Subscription) Exists ¶
func (s *Subscription) Exists(ctx context.Context) (bool, error)
Exists reports whether the subscription exists on the server.
func (*Subscription) ModifyPushConfig ¶
func (s *Subscription) ModifyPushConfig(ctx context.Context, conf *PushConfig) error
ModifyPushConfig updates the endpoint URL and other attributes of a push subscription.
func (*Subscription) Name ¶
func (s *Subscription) Name() string
Name returns the globally unique name for the subscription.
func (*Subscription) Pull ¶
func (s *Subscription) Pull(ctx context.Context, opts ...PullOption) (*Iterator, error)
Pull returns an Iterator that can be used to fetch Messages. The Iterator will automatically extend the ack deadline of all fetched Messages, for the period specified by DefaultMaxExtension. This may be overridden by supplying a MaxExtension pull option.
If ctx is cancelled or exceeds its deadline, outstanding acks or deadline extensions will fail.
The caller must call Stop on the Iterator once finished with it.
type SubscriptionConfig ¶
type SubscriptionConfig struct {
	Topic      *Topic
	PushConfig PushConfig

	// The default maximum time after a subscriber receives a message
	// before the subscriber should acknowledge the message. Note:
	// messages which are obtained via an Iterator need not be acknowledged
	// within this deadline, as the deadline will be automatically
	// extended.
	AckDeadline time.Duration
}
SubscriptionConfig contains the configuration of a subscription.
type SubscriptionIterator ¶
type SubscriptionIterator struct {
// contains filtered or unexported fields
}
SubscriptionIterator is an iterator that returns a series of subscriptions.
func (*SubscriptionIterator) Next ¶
func (subs *SubscriptionIterator) Next() (*Subscription, error)
Next returns the next subscription. If there are no more subscriptions, Done will be returned.
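The Done sentinel leads to a characteristic loop shape: call Next until it returns Done, and treat any other error as a failure. The sketch below shows that loop against a hypothetical in-memory iterator. nameIterator, errDone, and collect are illustrative stand-ins for SubscriptionIterator, pubsub.Done, and client code respectively.

```go
package main

import (
	"errors"
	"fmt"
)

// errDone plays the role of pubsub.Done in this sketch.
var errDone = errors.New("no more messages")

// nameIterator is a hypothetical iterator over a fixed list of names.
type nameIterator struct {
	names []string
}

// Next returns the next name, or errDone when the list is exhausted.
func (it *nameIterator) Next() (string, error) {
	if len(it.names) == 0 {
		return "", errDone
	}
	n := it.names[0]
	it.names = it.names[1:]
	return n, nil
}

// collect drains the iterator, stopping cleanly on errDone and
// returning any other error to the caller.
func collect(it *nameIterator) ([]string, error) {
	var out []string
	for {
		n, err := it.Next()
		if err == errDone {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, n)
	}
}

func main() {
	names, err := collect(&nameIterator{names: []string{"sub-a", "sub-b"}})
	fmt.Println(names, err)
}
```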
type Topic ¶
type Topic struct {
// contains filtered or unexported fields
}
Topic is a reference to a PubSub topic.
func (*Topic) Publish ¶
Publish publishes the supplied Messages to the topic. If successful, the server-assigned message IDs are returned in the same order as the supplied Messages. At most MaxPublishBatchSize messages may be supplied.
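When more than MaxPublishBatchSize messages need to be sent, callers can split them into batches before calling Publish. The sketch below shows the splitting step only. The message type and the batches function are hypothetical stand-ins, and the limit of 1000 is taken from the MaxPublishBatchSize constant listed on this page.

```go
package main

import "fmt"

// maxBatch mirrors MaxPublishBatchSize from this page.
const maxBatch = 1000

// message is a hypothetical stand-in for pubsub.Message.
type message struct {
	Data []byte
}

// batches splits msgs into consecutive slices of at most size elements.
// The returned slices share the backing array of msgs.
func batches(msgs []*message, size int) [][]*message {
	if size < 1 {
		size = 1 // guard against a non-positive size, which would never advance
	}
	var out [][]*message
	for len(msgs) > size {
		out = append(out, msgs[:size])
		msgs = msgs[size:]
	}
	if len(msgs) > 0 {
		out = append(out, msgs)
	}
	return out
}

func main() {
	msgs := make([]*message, 2500)
	for i := range msgs {
		msgs[i] = &message{Data: []byte("payload")}
	}
	for i, b := range batches(msgs, maxBatch) {
		// Each batch would be passed to a single Publish call.
		fmt.Printf("batch %d: %d messages\n", i, len(b))
	}
}
```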
func (*Topic) Subscriptions ¶
func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator
Subscriptions returns an iterator which returns the subscriptions for this topic.
type TopicIterator ¶
type TopicIterator struct {
// contains filtered or unexported fields
}
TopicIterator is an iterator that returns a series of topics.
func (*TopicIterator) Next ¶
func (tps *TopicIterator) Next() (*Topic, error)
Next returns the next topic. If there are no more topics, Done will be returned.