Documentation ¶
Overview ¶
Package pubsub provides an easy way to publish and receive Google Cloud Pub/Sub messages, hiding the details of the underlying server RPCs. Google Cloud Pub/Sub is a many-to-many, asynchronous messaging system that decouples senders and receivers.
Note: This package is experimental and may make backwards-incompatible changes.
More information about Google Cloud Pub/Sub is available at https://cloud.google.com/pubsub/docs
Publishing ¶
Google Cloud Pub/Sub messages are published to topics. Topics may be created using the pubsub package like so:
topic, err := pubsubClient.CreateTopic(context.Background(), "topic-name")
Messages may then be published to a topic:
res := topic.Publish(ctx, &pubsub.Message{Data: []byte("payload")})
Publish queues the message for publishing and returns immediately. When enough messages have accumulated, or enough time has elapsed, the batch of messages is sent to the Pub/Sub service.
Publish returns a PublishResult, which behaves like a future: its Get method blocks until the message has been sent to the service.
The first time you call Publish on a topic, goroutines are started in the background. To clean up these goroutines, call Stop:
topic.Stop()
Receiving ¶
To receive messages published to a topic, clients create subscriptions to the topic. There may be more than one subscription per topic; each message that is published to the topic will be delivered to all of its subscriptions.
Subscriptions may be created like so:
sub, err := pubsubClient.CreateSubscription(context.Background(), "sub-name", topic, 0, nil)
Messages are then consumed from a subscription via callback.
err := sub.Receive(context.Background(), func(ctx context.Context, m *Message) {
	log.Printf("Got message: %s", m.Data)
	m.Ack()
})
if err != nil {
	// Handle error.
}
The callback is invoked concurrently by multiple goroutines, maximizing throughput. To terminate a call to Receive, cancel its context.
Once client code has processed the message, it must call Message.Ack; otherwise the message will eventually be redelivered. As an optimization, if the client cannot or doesn't want to process the message, it can call Message.Nack to speed redelivery. For more information and configuration options, see "Deadlines" below.
Note: It is possible for Messages to be redelivered, even if Message.Ack has been called. Client code must be robust to multiple deliveries of messages.
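One common way to tolerate redelivery is to make the callback idempotent by remembering which message IDs have already been handled. The following is a minimal sketch under that assumption; the deduper type and its method are hypothetical helpers and are not part of the pubsub package.

```go
package main

import (
	"fmt"
	"sync"
)

// deduper remembers message IDs that have already been processed.
// It is a hypothetical helper, not part of the pubsub package.
type deduper struct {
	mu   sync.Mutex
	seen map[string]bool
}

func newDeduper() *deduper {
	return &deduper{seen: make(map[string]bool)}
}

// firstDelivery reports whether id has not been seen before, and records it.
// It is safe to call from multiple goroutines.
func (d *deduper) firstDelivery(id string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.seen[id] {
		return false
	}
	d.seen[id] = true
	return true
}

func main() {
	d := newDeduper()
	// "m1" arrives twice, as it might after a redelivery.
	for _, id := range []string{"m1", "m2", "m1"} {
		if d.firstDelivery(id) {
			fmt.Println("processing", id)
		} else {
			fmt.Println("skipping duplicate", id)
		}
	}
}
```

In a Receive callback the ID would come from Message.ID. The map in this sketch grows without bound, so a long-running program would likely need an expiry policy or an external store.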
Deadlines ¶
The default pubsub deadlines are suitable for most use cases, but may be overridden. This section describes the tradeoffs that should be considered when overriding the defaults.
Behind the scenes, each message returned by the Pub/Sub server has an associated lease, known as an "ACK deadline". Unless a message is acknowledged within the ACK deadline, or the client requests that the ACK deadline be extended, the message will become eligible for redelivery. As a convenience, the pubsub package will automatically extend deadlines until either:
- Message.Ack or Message.Nack is called, or
- the "MaxExtension" period elapses from the time the message is fetched from the server.
The initial ACK deadline given to each message defaults to 10 seconds, but may be overridden during subscription creation. Selecting an ACK deadline is a tradeoff between message redelivery latency and RPC volume. If the pubsub package fails to acknowledge or extend a message (e.g. due to unexpected termination of the process), a shorter ACK deadline will generally result in faster message redelivery by the Pub/Sub system. However, a short ACK deadline may also increase the number of deadline extension RPCs that the pubsub package sends to the server.
The default max extension period is DefaultReceiveSettings.MaxExtension, and can be overridden by setting Subscription.ReceiveSettings.MaxExtension. Selecting a max extension period is a tradeoff between the speed at which client code must process messages, and the redelivery delay if messages fail to be acknowledged (e.g. because client code neglects to do so). Using a large MaxExtension increases the available time for client code to process messages. However, if the client code neglects to call Message.Ack/Nack, a large MaxExtension will increase the delay before the message is redelivered.
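The interaction between the ACK deadline and MaxExtension can be illustrated with a small calculation. This is only a sketch of the idea, not the package's actual keepalive logic: nextExtension and seconds are hypothetical names, and the choice to shorten the final extension so that it ends exactly at MaxExtension is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// seconds converts a whole number of seconds to a time.Duration.
func seconds(n int) time.Duration { return time.Duration(n) * time.Second }

// nextExtension returns how long to extend a message's ACK deadline when
// elapsed time has passed since the message was fetched. It returns false
// once the maxExtension period is used up, at which point extension stops
// and the message may become eligible for redelivery.
func nextExtension(elapsed, ackDeadline, maxExtension time.Duration) (time.Duration, bool) {
	remaining := maxExtension - elapsed
	if remaining <= 0 {
		return 0, false
	}
	if remaining < ackDeadline {
		// Assumed behavior: shorten the last extension to fit the budget.
		return remaining, true
	}
	return ackDeadline, true
}

func main() {
	for _, elapsed := range []int{0, 25, 30} {
		ext, ok := nextExtension(seconds(elapsed), seconds(10), seconds(30))
		fmt.Printf("elapsed=%ds extend=%v continue=%v\n", elapsed, ext, ok)
	}
}
```

With a 10 second ACK deadline, each extension buys at most 10 seconds, so a shorter deadline means more extension RPCs over the same MaxExtension period.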
Authentication ¶
See examples of authorization and authentication at https://godoc.org/cloud.google.com/go#pkg-examples.
Index ¶
- Constants
- Variables
- type Client
- func (c *Client) Close() error
- func (c *Client) CreateSubscription(ctx context.Context, id string, topic *Topic, ackDeadline time.Duration, ...) (*Subscription, error)
- func (c *Client) CreateTopic(ctx context.Context, id string) (*Topic, error)
- func (c *Client) Subscription(id string) *Subscription
- func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator
- func (c *Client) Topic(id string) *Topic
- func (c *Client) Topics(ctx context.Context) *TopicIterator
- type Message
- type PublishResult
- type PublishSettings
- type PushConfig
- type ReceiveSettings
- type Subscription
- func (s *Subscription) Config(ctx context.Context) (*SubscriptionConfig, error)
- func (s *Subscription) Delete(ctx context.Context) error
- func (s *Subscription) Exists(ctx context.Context) (bool, error)
- func (s *Subscription) IAM() *iam.Handle
- func (s *Subscription) ID() string
- func (s *Subscription) ModifyPushConfig(ctx context.Context, conf *PushConfig) error
- func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error
- func (s *Subscription) String() string
- type SubscriptionConfig
- type SubscriptionIterator
- type Topic
- func (t *Topic) Delete(ctx context.Context) error
- func (t *Topic) Exists(ctx context.Context) (bool, error)
- func (t *Topic) IAM() *iam.Handle
- func (t *Topic) ID() string
- func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult
- func (t *Topic) Stop()
- func (t *Topic) String() string
- func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator
- func (t *Topic) TryPublish(ctx context.Context, msg *Message) *PublishResult
- type TopicIterator
Examples ¶
- Client.CreateSubscription
- Client.CreateTopic
- Client.Subscriptions
- Client.Topics
- NewClient
- Subscription.Config
- Subscription.Delete
- Subscription.Exists
- Subscription.ModifyPushConfig
- Subscription.Receive
- Subscription.Receive (MaxExtension)
- Subscription.Receive (MaxOutstanding)
- SubscriptionIterator.Next
- Topic.Delete
- Topic.Exists
- Topic.Publish
- Topic.Subscriptions
- Topic.TryPublish
- TopicIterator.Next
Constants ¶
const (
	// ScopePubSub grants permissions to view and manage Pub/Sub
	// topics and subscriptions.
	ScopePubSub = "https://www.googleapis.com/auth/pubsub"

	// ScopeCloudPlatform grants permissions to view and manage your data
	// across Google Cloud Platform services.
	ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform"
)
const (
	// The maximum number of messages that can be in a single publish request, as
	// determined by the PubSub service.
	MaxPublishRequestCount = 1000

	// The maximum size of a single publish request in bytes, as determined by the PubSub service.
	MaxPublishRequestBytes = 1e7
)
Variables ¶
var DefaultPublishSettings = PublishSettings{
	DelayThreshold:         1 * time.Millisecond,
	CountThreshold:         100,
	ByteThreshold:          1e6,
	MaxOutstandingMessages: -1,
	MaxOutstandingBytes:    1e9,
}
DefaultPublishSettings holds the default values for topics' PublishSettings.
var DefaultReceiveSettings = ReceiveSettings{
	MaxExtension:           10 * time.Minute,
	MaxOutstandingMessages: 1000,
	MaxOutstandingBytes:    1e9,
}
DefaultReceiveSettings holds the default values for ReceiveSettings.
var ErrOversizedMessage = bundler.ErrOversizedItem
ErrOversizedMessage indicates that a message's size exceeds MaxPublishRequestBytes.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a Google Pub/Sub client scoped to a single project.
Clients should be reused rather than being created as needed. A Client may be shared by multiple goroutines.
func NewClient ¶
NewClient creates a new PubSub client.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	_, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	// See the other examples to learn how to use the Client.
}
func (*Client) Close ¶
Close closes any resources held by the client.
Close need not be called at program exit.
func (*Client) CreateSubscription ¶
func (c *Client) CreateSubscription(ctx context.Context, id string, topic *Topic, ackDeadline time.Duration, pushConfig *PushConfig) (*Subscription, error)
CreateSubscription creates a new subscription on a topic.
id is the name of the subscription to create. It must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and must not start with "goog".
topic is the topic from which the subscription should receive messages. It need not belong to the same project as the subscription.
ackDeadline is the maximum time after a subscriber receives a message before the subscriber should acknowledge the message. It must be between 10 and 600 seconds (inclusive), and is rounded down to the nearest second. If the provided ackDeadline is 0, then the default value of 10 seconds is used. Note: messages which are obtained via Subscription.Receive need not be acknowledged within this deadline, as the deadline will be automatically extended.
pushConfig may be set to configure this subscription for push delivery.
If the subscription already exists an error will be returned.
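The ackDeadline rules described above (0 means the 10 second default, values are rounded down to whole seconds, and the result must lie between 10 and 600 seconds) can be mirrored client-side. The helper below is a hypothetical sketch, not the package's own validation; in particular, checking the range after rounding is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const (
	minAckDeadline     = 10 * time.Second
	maxAckDeadline     = 600 * time.Second
	defaultAckDeadline = 10 * time.Second
)

// ms converts a whole number of milliseconds to a time.Duration.
func ms(n int) time.Duration { return time.Duration(n) * time.Millisecond }

// normalizeAckDeadline applies the documented rules to d: zero selects the
// default, other values are rounded down to the nearest second and then
// checked against the inclusive 10 to 600 second range.
func normalizeAckDeadline(d time.Duration) (time.Duration, error) {
	if d == 0 {
		return defaultAckDeadline, nil
	}
	d = d.Truncate(time.Second)
	if d < minAckDeadline || d > maxAckDeadline {
		return 0, errors.New("ack deadline must be between 10 and 600 seconds")
	}
	return d, nil
}

func main() {
	for _, d := range []time.Duration{0, ms(10500), ms(5000), 601 * time.Second} {
		got, err := normalizeAckDeadline(d)
		fmt.Printf("in=%v out=%v err=%v\n", d, got, err)
	}
}
```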
Example ¶
package main

import (
	"time"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	// Create a new topic with the given name.
	topic, err := client.CreateTopic(ctx, "topicName")
	if err != nil {
		// TODO: Handle error.
	}

	// Create a new subscription to the previously created topic
	// with the given name.
	sub, err := client.CreateSubscription(ctx, "subName", topic, 10*time.Second, nil)
	if err != nil {
		// TODO: Handle error.
	}

	_ = sub // TODO: use the subscription.
}
func (*Client) CreateTopic ¶
CreateTopic creates a new topic. The specified topic ID must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and must not start with "goog". If the topic already exists an error will be returned.
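The naming rules above are easy to check before making an RPC. The following is a hedged sketch of such a check; validID and idPattern are hypothetical names introduced here, and the service remains the authority on what it accepts.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// idPattern encodes the documented character rules: a leading letter
// followed by 2 to 254 allowed characters, for 3 to 255 characters in total.
var idPattern = regexp.MustCompile(`^[A-Za-z][A-Za-z0-9._~+%-]{2,254}$`)

// validID reports whether id satisfies the documented topic and subscription
// naming rules, including the ban on the "goog" prefix.
func validID(id string) bool {
	return idPattern.MatchString(id) && !strings.HasPrefix(id, "goog")
}

func main() {
	for _, id := range []string{"topic-name", "ab", "1topic", "goog-topic", "a.b_c~d+e%f"} {
		fmt.Printf("%-12q valid=%v\n", id, validID(id))
	}
}
```

The same rules are stated for subscription IDs in CreateSubscription, so one helper could serve both.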
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	// Create a new topic with the given name.
	topic, err := client.CreateTopic(ctx, "topicName")
	if err != nil {
		// TODO: Handle error.
	}

	_ = topic // TODO: use the topic.
}
func (*Client) Subscription ¶
func (c *Client) Subscription(id string) *Subscription
Subscription creates a reference to a subscription.
func (*Client) Subscriptions ¶
func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator
Subscriptions returns an iterator which returns all of the subscriptions for the client's project.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	// List all subscriptions of the project.
	it := client.Subscriptions(ctx)
	_ = it // TODO: iterate using Next.
}
func (*Client) Topic ¶
Topic creates a reference to a topic.
If a Topic's Publish method is called, it has background goroutines associated with it. Clean them up by calling Topic.Stop.
Avoid creating many Topic instances if you use them to publish.
func (*Client) Topics ¶
func (c *Client) Topics(ctx context.Context) *TopicIterator
Topics returns an iterator which returns all of the topics for the client's project.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	it := client.Topics(ctx)
	_ = it // TODO: iterate using Next.
}
type Message ¶
type Message struct {
	// ID identifies this message.
	// This ID is assigned by the server and is populated for Messages obtained from a subscription.
	// This field is read-only.
	ID string

	// Data is the actual data in the message.
	Data []byte

	// Attributes represents the key-value pairs the current message
	// is labelled with.
	Attributes map[string]string

	// The time at which the message was published.
	// This is populated by the server for Messages obtained from a subscription.
	// This field is read-only.
	PublishTime time.Time
	// contains filtered or unexported fields
}
Message represents a Pub/Sub message.
func (*Message) Ack ¶
func (m *Message) Ack()
Ack indicates successful processing of a Message passed to the Subscription.Receive callback. It should not be called on any other Message value. If message acknowledgement fails, the Message will be redelivered. Client code must call Ack or Nack when finished for each received Message. Calls to Ack or Nack have no effect after the first call.
func (*Message) Nack ¶
func (m *Message) Nack()
Nack indicates that the client will not or cannot process a Message passed to the Subscription.Receive callback. It should not be called on any other Message value. Nack will result in the Message being redelivered more quickly than if it were allowed to expire. Client code must call Ack or Nack when finished for each received Message. Calls to Ack or Nack have no effect after the first call.
type PublishResult ¶
type PublishResult struct {
// contains filtered or unexported fields
}
A PublishResult holds the result from a call to Publish.
func (*PublishResult) Get ¶
func (r *PublishResult) Get(ctx context.Context) (serverID string, err error)
Get returns the server-generated message ID and/or error result of a Publish call. Get blocks until the Publish call completes or the context is done.
func (*PublishResult) Ready ¶
func (r *PublishResult) Ready() <-chan struct{}
Ready returns a channel that is closed when the result is ready. When the Ready channel is closed, Get is guaranteed not to block.
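The Get and Ready behavior described above is the usual shape of a future built on a channel that is closed when the value is available. The sketch below shows one way such a type could be written using only the standard library; the result type, set, and getNow are hypothetical and do not reflect PublishResult's actual implementation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// result is a minimal future: ready is closed once id and err are set.
type result struct {
	ready chan struct{}
	id    string
	err   error
}

func newResult() *result { return &result{ready: make(chan struct{})} }

// set records the outcome and closes ready. It must be called exactly once.
func (r *result) set(id string, err error) {
	r.id, r.err = id, err
	close(r.ready)
}

// Ready returns a channel that is closed when the result is available.
func (r *result) Ready() <-chan struct{} { return r.ready }

// Get blocks until the result is available or ctx is done.
func (r *result) Get(ctx context.Context) (string, error) {
	// If the result is already available, return it even if ctx is done,
	// so that Get never blocks or fails after Ready is closed.
	select {
	case <-r.ready:
		return r.id, r.err
	default:
	}
	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case <-r.ready:
		return r.id, r.err
	}
}

// getNow polls r without waiting by using an already-canceled context.
func getNow(r *result) (string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return r.Get(ctx)
}

func main() {
	r := newResult()
	_, err := getNow(r)
	fmt.Println("before send:", err)

	go func() {
		time.Sleep(10 * time.Millisecond) // simulate the batch being sent
		r.set("server-id-1", nil)
	}()
	id, err := r.Get(context.Background())
	fmt.Println("after send:", id, err)
}
```

Closing a channel rather than sending on it lets any number of callers wait on Ready and call Get.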
type PublishSettings ¶
type PublishSettings struct {
	// Publish a non-empty batch after this delay has passed.
	DelayThreshold time.Duration

	// Publish a batch when it has this many messages. The maximum is
	// MaxPublishRequestCount.
	CountThreshold int

	// Publish a batch when its size in bytes reaches this value.
	ByteThreshold int

	// MaxOutstandingMessages is the maximum number of messages for which the
	// publish can be outstanding. If MaxOutstandingMessages is 0, it will be
	// treated as if it were DefaultPublishSettings.MaxOutstandingMessages. If the
	// value is negative, then there will be no limit on the number of messages
	// that can be outstanding.
	MaxOutstandingMessages int

	// MaxOutstandingBytes is the maximum size of messages for which the publish
	// can be outstanding. If MaxOutstandingBytes is 0, it will be treated as if it
	// were DefaultPublishSettings.MaxOutstandingBytes. If the value is negative,
	// then there will be no limit on the number of bytes that can be outstanding.
	MaxOutstandingBytes int

	// The number of goroutines that invoke the Publish RPC concurrently.
	// Defaults to a multiple of GOMAXPROCS.
	NumGoroutines int
}
PublishSettings control the bundling of published messages.
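To make the threshold semantics concrete, here is a small sketch of count- and byte-based bundling. It is not the bundler the package uses; the batcher type is hypothetical, it is not safe for concurrent use, and the delay threshold is represented only by the flush method that a timer would call.

```go
package main

import "fmt"

// batcher accumulates messages and hands back a full batch as soon as the
// count threshold or the byte threshold is reached.
type batcher struct {
	countThreshold int
	byteThreshold  int
	pending        [][]byte
	pendingBytes   int
}

// add queues msg. It returns the batch to send if a threshold was reached,
// or nil if the message is still waiting.
func (b *batcher) add(msg []byte) [][]byte {
	b.pending = append(b.pending, msg)
	b.pendingBytes += len(msg)
	if len(b.pending) >= b.countThreshold || b.pendingBytes >= b.byteThreshold {
		return b.flush()
	}
	return nil
}

// flush returns whatever is pending and resets the batcher. A timer firing
// after the delay threshold would call this to send a partial batch.
func (b *batcher) flush() [][]byte {
	batch := b.pending
	b.pending, b.pendingBytes = nil, 0
	return batch
}

func main() {
	b := &batcher{countThreshold: 3, byteThreshold: 100}
	for _, m := range []string{"a", "b", "c", "d"} {
		if batch := b.add([]byte(m)); batch != nil {
			fmt.Println("sending batch of", len(batch))
		}
	}
	fmt.Println("flushed on timer:", len(b.flush()))
}
```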
type PushConfig ¶
type PushConfig struct {
	// A URL locating the endpoint to which messages should be pushed.
	Endpoint string

	// Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details.
	Attributes map[string]string
}
PushConfig contains configuration for subscriptions that operate in push mode.
type ReceiveSettings ¶
type ReceiveSettings struct {
	// MaxExtension is the maximum period for which the Subscription should
	// automatically extend the ack deadline for each message.
	//
	// The Subscription will automatically extend the ack deadline of all
	// fetched Messages for the duration specified. Automatic deadline
	// extension may be disabled by specifying a duration less than 1.
	MaxExtension time.Duration

	// MaxOutstandingMessages is the maximum number of unprocessed messages
	// (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it
	// will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages.
	// If the value is negative, then there will be no limit on the number of
	// unprocessed messages.
	MaxOutstandingMessages int

	// MaxOutstandingBytes is the maximum size of unprocessed messages
	// (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will
	// be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If
	// the value is negative, then there will be no limit on the number of bytes
	// for unprocessed messages.
	MaxOutstandingBytes int
}
ReceiveSettings configure the Receive method. A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
type Subscription ¶
type Subscription struct {
	// Settings for pulling messages. Configure these before calling Receive.
	ReceiveSettings ReceiveSettings
	// contains filtered or unexported fields
}
Subscription is a reference to a PubSub subscription.
func (*Subscription) Config ¶
func (s *Subscription) Config(ctx context.Context) (*SubscriptionConfig, error)
Config fetches the current configuration for the subscription.
Example ¶
package main

import (
	"fmt"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	sub := client.Subscription("subName")
	config, err := sub.Config(ctx)
	if err != nil {
		// TODO: Handle error.
	}
	fmt.Println(config)
}
func (*Subscription) Delete ¶
func (s *Subscription) Delete(ctx context.Context) error
Delete deletes the subscription.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	sub := client.Subscription("subName")
	if err := sub.Delete(ctx); err != nil {
		// TODO: Handle error.
	}
}
func (*Subscription) Exists ¶
func (s *Subscription) Exists(ctx context.Context) (bool, error)
Exists reports whether the subscription exists on the server.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	sub := client.Subscription("subName")
	ok, err := sub.Exists(ctx)
	if err != nil {
		// TODO: Handle error.
	}
	if !ok {
		// Subscription doesn't exist.
	}
}
func (*Subscription) IAM ¶
func (s *Subscription) IAM() *iam.Handle
func (*Subscription) ID ¶
func (s *Subscription) ID() string
ID returns the unique identifier of the subscription within its project.
func (*Subscription) ModifyPushConfig ¶
func (s *Subscription) ModifyPushConfig(ctx context.Context, conf *PushConfig) error
ModifyPushConfig updates the endpoint URL and other attributes of a push subscription.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	sub := client.Subscription("subName")
	if err := sub.ModifyPushConfig(ctx, &pubsub.PushConfig{Endpoint: "https://example.com/push"}); err != nil {
		// TODO: Handle error.
	}
}
func (*Subscription) Receive ¶
Receive calls f with the outstanding messages from the subscription. It blocks until ctx is done, or the service returns a non-retryable error.
The standard way to terminate a Receive is to cancel its context:
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, callback)
// Call cancel from callback, or another goroutine.
If the service returns a non-retryable error, Receive returns that error after all of the outstanding calls to f have returned. If ctx is done, Receive returns nil after all of the outstanding calls to f have returned and all messages have been acknowledged or have expired.
Receive calls f concurrently from multiple goroutines. It is encouraged to process messages synchronously in f, even if that processing is relatively time-consuming; Receive will spawn new goroutines for incoming messages, limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings.
The context passed to f will be canceled when ctx is Done or there is a fatal service error.
Receive will automatically extend the ack deadline of all fetched Messages for the period specified by s.ReceiveSettings.MaxExtension.
Each Subscription may have only one invocation of Receive active at a time.
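The way a limit such as MaxOutstandingMessages bounds the number of concurrent callback invocations can be sketched with a buffered channel used as a counting semaphore. This is an illustration of the general technique only, not how Receive is implemented; processAll is a hypothetical function.

```go
package main

import (
	"fmt"
	"sync"
)

// processAll calls handle once per message in its own goroutine, running at
// most limit calls at the same time. It returns the highest number of
// concurrent calls it observed.
func processAll(msgs []string, limit int, handle func(string)) int {
	sem := make(chan struct{}, limit)
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		active  int
		maxSeen int
	)
	for _, m := range msgs {
		sem <- struct{}{} // blocks while limit handlers are already running
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot for the next message

			mu.Lock()
			active++
			if active > maxSeen {
				maxSeen = active
			}
			mu.Unlock()

			handle(m)

			mu.Lock()
			active--
			mu.Unlock()
		}(m)
	}
	wg.Wait()
	return maxSeen
}

func main() {
	msgs := []string{"a", "b", "c", "d", "e", "f"}
	peak := processAll(msgs, 2, func(m string) {
		// Handlers run concurrently; synchronize access to shared state.
	})
	fmt.Println("peak concurrency stayed within limit:", peak <= 2)
}
```

Because the producer blocks on the semaphore, a slow handler applies back pressure instead of letting goroutines pile up, which is why processing synchronously inside the callback is reasonable.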
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	sub := client.Subscription("subName")
	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		// TODO: Handle message.
		// NOTE: May be called concurrently; synchronize access to shared memory.
		m.Ack()
	})
	if err != context.Canceled {
		// TODO: Handle error.
	}
}
Example (MaxExtension) ¶
This example shows how to configure keepalive so that unacknowledged messages expire quickly, allowing other subscribers to take them.
package main

import (
	"time"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	sub := client.Subscription("subName")
	// This program is expected to process and acknowledge messages in 30 seconds. If
	// not, the Pub/Sub API will assume the message is not acknowledged.
	sub.ReceiveSettings.MaxExtension = 30 * time.Second
	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		// TODO: Handle message.
		m.Ack()
	})
	if err != context.Canceled {
		// TODO: Handle error.
	}
}
Example (MaxOutstanding) ¶
This example shows how to throttle Subscription.Receive, which aims for high throughput by default. By limiting the number of messages and/or bytes being processed at once, you can bound your program's resource consumption.
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	sub := client.Subscription("subName")
	sub.ReceiveSettings.MaxOutstandingMessages = 5
	sub.ReceiveSettings.MaxOutstandingBytes = 10e6
	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		// TODO: Handle message.
		m.Ack()
	})
	if err != context.Canceled {
		// TODO: Handle error.
	}
}
func (*Subscription) String ¶
func (s *Subscription) String() string
String returns the globally unique printable name of the subscription.
type SubscriptionConfig ¶
type SubscriptionConfig struct {
	Topic      *Topic
	PushConfig PushConfig

	// The default maximum time after a subscriber receives a message before
	// the subscriber should acknowledge the message. Note: messages which are
	// obtained via Subscription.Receive need not be acknowledged within this
	// deadline, as the deadline will be automatically extended.
	AckDeadline time.Duration
}
SubscriptionConfig contains the configuration of a subscription.
type SubscriptionIterator ¶
type SubscriptionIterator struct {
// contains filtered or unexported fields
}
SubscriptionIterator is an iterator that returns a series of subscriptions.
func (*SubscriptionIterator) Next ¶
func (subs *SubscriptionIterator) Next() (*Subscription, error)
Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned.
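The Next-until-Done contract can be seen from the implementing side with a tiny in-memory iterator. This sketch uses a local sentinel error, errDone, as a stand-in for iterator.Done; nameIterator and collect are hypothetical names and say nothing about how SubscriptionIterator fetches pages from the service.

```go
package main

import (
	"errors"
	"fmt"
)

// errDone plays the role that iterator.Done plays for the real iterators.
var errDone = errors.New("no more items in iterator")

// nameIterator walks a fixed slice of names.
type nameIterator struct {
	names []string
	next  int
}

// Next returns the next name, or errDone once the slice is exhausted.
func (it *nameIterator) Next() (string, error) {
	if it.next >= len(it.names) {
		return "", errDone
	}
	n := it.names[it.next]
	it.next++
	return n, nil
}

// collect drains it, treating errDone as normal termination and any other
// error as a failure.
func collect(it *nameIterator) ([]string, error) {
	var out []string
	for {
		n, err := it.Next()
		if err == errDone {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, n)
	}
}

func main() {
	names, err := collect(&nameIterator{names: []string{"sub-a", "sub-b"}})
	fmt.Println(names, err)
}
```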
Example ¶
package main

import (
	"fmt"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	// List all subscriptions of the project.
	it := client.Subscriptions(ctx)
	for {
		sub, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			// TODO: Handle error.
		}
		fmt.Println(sub)
	}
}
type Topic ¶
type Topic struct {
	// Settings for publishing messages. All changes must be made before the
	// first call to Publish. The default is DefaultPublishSettings.
	PublishSettings PublishSettings
	// contains filtered or unexported fields
}
Topic is a reference to a PubSub topic.
func (*Topic) Delete ¶
Delete deletes the topic.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	topic := client.Topic("topicName")
	if err := topic.Delete(ctx); err != nil {
		// TODO: Handle error.
	}
}
func (*Topic) Exists ¶
Exists reports whether the topic exists on the server.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	topic := client.Topic("topicName")
	ok, err := topic.Exists(ctx)
	if err != nil {
		// TODO: Handle error.
	}
	if !ok {
		// Topic doesn't exist.
	}
}
func (*Topic) Publish ¶
func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult
Publish publishes msg to the topic asynchronously. Messages are batched and sent according to the topic's PublishSettings.
Publish returns a non-nil PublishResult which will be ready when the message has been sent (or has failed to be sent) to the server.
Publish creates goroutines for batching and sending messages. These goroutines need to be stopped by calling t.Stop(). Once stopped, future calls to Publish or TryPublish will immediately return a PublishResult with an error.
Publish blocks until the number of messages and memory consumed by batching fall below MaxOutstandingMessages and MaxOutstandingBytes, respectively, or until ctx is Done. The ctx argument is used only for this purpose; it is unrelated to the context used by the background goroutines which call the Publish RPC.
Example ¶
package main

import (
	"fmt"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	topic := client.Topic("topicName")
	defer topic.Stop()
	var results []*pubsub.PublishResult
	r := topic.Publish(ctx, &pubsub.Message{
		Data: []byte("hello world"),
	})
	results = append(results, r)
	// Do other work ...
	for _, r := range results {
		id, err := r.Get(ctx)
		if err != nil {
			// TODO: Handle error.
		}
		fmt.Printf("Published a message with a message ID: %s\n", id)
	}
}
func (*Topic) Stop ¶
func (t *Topic) Stop()
Stop sends all remaining published messages and stops the goroutines created for handling publishing. It returns once all outstanding messages have been sent or have failed to be sent.
func (*Topic) Subscriptions ¶
func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator
Subscriptions returns an iterator which returns the subscriptions for this topic.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	topic := client.Topic("topic-name")
	// List all subscriptions of the topic (maybe of multiple projects).
	for subs := topic.Subscriptions(ctx); ; {
		sub, err := subs.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			// TODO: Handle error.
		}
		_ = sub // TODO: use the subscription.
	}
}
func (*Topic) TryPublish ¶
func (t *Topic) TryPublish(ctx context.Context, msg *Message) *PublishResult
TryPublish publishes msg to the topic asynchronously. Messages are batched and sent according to the topic's PublishSettings.
If the number of messages or memory consumed by batching are above MaxOutstandingMessages or MaxOutstandingBytes, respectively, then TryPublish immediately returns nil. Otherwise, TryPublish returns a non-nil PublishResult which will be ready when the message has been sent (or has failed to be sent) to the server.
TryPublish creates goroutines for batching and sending messages. These goroutines need to be stopped by calling t.Stop(). Once stopped, future calls to Publish or TryPublish will immediately return a PublishResult with an error.
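The difference between Publish (which blocks when the outstanding limits are reached) and TryPublish (which gives up immediately) is the difference between a blocking and a non-blocking acquire on a shared limit. The sketch below illustrates that distinction for a message-count limit only; the limiter type is hypothetical and is not the package's flow-control code.

```go
package main

import (
	"context"
	"fmt"
)

// limiter caps the number of outstanding messages using a buffered channel
// as a counting semaphore.
type limiter struct{ slots chan struct{} }

func newLimiter(n int) *limiter { return &limiter{slots: make(chan struct{}, n)} }

// tryAcquire takes a slot without blocking and reports whether it got one.
// This mirrors TryPublish returning nil when the limit is exceeded.
func (l *limiter) tryAcquire() bool {
	select {
	case l.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// acquire blocks until a slot is free or ctx is done, mirroring Publish.
func (l *limiter) acquire(ctx context.Context) error {
	select {
	case l.slots <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// release frees a slot, as would happen once a message has been sent.
func (l *limiter) release() { <-l.slots }

func main() {
	l := newLimiter(1)
	fmt.Println("first try:", l.tryAcquire())
	fmt.Println("second try:", l.tryAcquire())
	l.release()
	fmt.Println("blocking acquire error:", l.acquire(context.Background()))
}
```

Callers of the non-blocking form need a plan for the failure case, such as retrying later or dropping the message.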
Example ¶
package main

import (
	"fmt"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	topic := client.Topic("topicName")
	defer topic.Stop()
	var results []*pubsub.PublishResult
	r := topic.TryPublish(ctx, &pubsub.Message{
		Data: []byte("hello world"),
	})
	// TryPublish returns nil when the outstanding limits are exceeded,
	// so only keep non-nil results.
	if r != nil {
		results = append(results, r)
	}
	// Do other work ...
	for _, r := range results {
		id, err := r.Get(ctx)
		if err != nil {
			// TODO: Handle error.
		}
		fmt.Printf("Published a message with a message ID: %s\n", id)
	}
}
type TopicIterator ¶
type TopicIterator struct {
// contains filtered or unexported fields
}
TopicIterator is an iterator that returns a series of topics.
func (*TopicIterator) Next ¶
func (tps *TopicIterator) Next() (*Topic, error)
Next returns the next topic. If there are no more topics, iterator.Done will be returned.
Example ¶
package main

import (
	"fmt"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	// List all topics.
	it := client.Topics(ctx)
	for {
		t, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			// TODO: Handle error.
		}
		fmt.Println(t)
	}
}
Directories ¶
Path | Synopsis |
---|---|
| Package pubsub is an experimental, auto-generated package for the pubsub API. |
| Package loadtest implements load testing for pubsub, following the interface defined in https://github.com/GoogleCloudPlatform/pubsub/tree/master/load-test-framework/ . |
pb | Package google_pubsub_loadtest is a generated protocol buffer package. |