Documentation ¶
Overview ¶
Package pubsublite provides an easy way to publish and receive messages using the Pub/Sub Lite service.
Google Pub/Sub services are designed to provide reliable, many-to-many, asynchronous messaging between applications. Publisher applications can send messages to a topic and other applications can subscribe to that topic to receive the messages. By decoupling senders and receivers, Google Pub/Sub allows developers to communicate between independently written applications.
Compared to Cloud Pub/Sub, Pub/Sub Lite provides partitioned data storage with predefined throughput and storage capacity. Guidance on how to choose between Cloud Pub/Sub and Pub/Sub Lite is available at https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite.
More information about Pub/Sub Lite is available at https://cloud.google.com/pubsub/lite.
See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts, connection pooling and similar aspects of this package.
Introduction ¶
Examples can be found at https://pkg.go.dev/cloud.google.com/go/pubsublite#pkg-examples and https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-examples.
Complete sample programs can be found at https://github.com/GoogleCloudPlatform/golang-samples/tree/master/pubsublite.
The cloud.google.com/go/pubsublite/pscompat subpackage contains clients for publishing and receiving messages, which have similar interfaces to their pubsub.Topic and pubsub.Subscription counterparts in cloud.google.com/go/pubsub. The following examples demonstrate how to declare common interfaces: https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewPublisherClient-Interface and https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewSubscriberClient-Interface.
The following imports are required for code snippets below:
import (
	"context"
	"log"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite"
	"cloud.google.com/go/pubsublite/pscompat"
	"google.golang.org/api/option"
)
Creating Topics ¶
Messages are published to topics. Pub/Sub Lite topics may be created like so:
ctx := context.Background()
const topicPath = "projects/my-project/locations/us-central1-c/topics/my-topic"
topicConfig := pubsublite.TopicConfig{
	Name:                       topicPath,
	PartitionCount:             1,
	PublishCapacityMiBPerSec:   4,
	SubscribeCapacityMiBPerSec: 4,
	PerPartitionBytes:          30 * 1024 * 1024 * 1024, // 30 GiB
	RetentionDuration:          pubsublite.InfiniteRetention,
}
adminClient, err := pubsublite.NewAdminClient(ctx, "us-central1")
if err != nil {
	// TODO: Handle error.
}
if _, err = adminClient.CreateTopic(ctx, topicConfig); err != nil {
	// TODO: Handle error.
}
Close must be called to release resources when an AdminClient is no longer required.
See https://cloud.google.com/pubsub/lite/docs/topics for more information about how Pub/Sub Lite topics are configured.
See https://cloud.google.com/pubsub/lite/docs/locations for the list of locations where Pub/Sub Lite is available.
Publishing ¶
Pub/Sub Lite uses gRPC streams extensively for high throughput. For more differences between the Pub/Sub Lite and Cloud Pub/Sub clients, see https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat.
To publish messages to a topic, first create a PublisherClient:
publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
if err != nil {
	// TODO: Handle error.
}
Then call Publish:
result := publisher.Publish(ctx, &pubsub.Message{Data: []byte("payload")})
Publish queues the message for publishing and returns immediately. When enough messages have accumulated, or enough time has elapsed, the batch of messages is sent to the Pub/Sub Lite service. Thresholds for batching can be configured in PublishSettings.
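The flush decision described above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: the batchSettings type, its field names, and shouldFlush are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// batchSettings is a hypothetical stand-in for batching thresholds.
type batchSettings struct {
	countThreshold int           // flush once this many messages are queued
	delayThreshold time.Duration // flush once the oldest message has waited this long
}

// shouldFlush reports whether a pending batch should be sent, given how many
// messages are queued and how long the oldest one has been waiting.
func shouldFlush(s batchSettings, queued int, waited time.Duration) bool {
	if queued == 0 {
		return false // nothing to send
	}
	return queued >= s.countThreshold || waited >= s.delayThreshold
}

func main() {
	s := batchSettings{countThreshold: 100, delayThreshold: 10 * time.Millisecond}
	fmt.Println(shouldFlush(s, 100, 0))                 // enough messages accumulated
	fmt.Println(shouldFlush(s, 1, 20*time.Millisecond)) // enough time elapsed
	fmt.Println(shouldFlush(s, 1, time.Millisecond))    // neither threshold reached
}
```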
Publish returns a PublishResult, which behaves like a future; its Get method blocks until the message has been sent (or has failed to be sent) to the service:
id, err := result.Get(ctx)
if err != nil {
	// TODO: Handle error.
}
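To make the future-like behavior concrete, here is a small sketch built on a channel. The publishFuture type and its methods are hypothetical and only illustrate the pattern; they do not show how PublishResult is implemented.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// publishFuture is a hypothetical future: it is resolved exactly once with
// either a message ID or an error.
type publishFuture struct {
	done chan struct{} // closed when the result is available
	id   string
	err  error
}

func newPublishFuture() *publishFuture {
	return &publishFuture{done: make(chan struct{})}
}

// resolve records the outcome and unblocks all waiters. Call it only once.
func (f *publishFuture) resolve(id string, err error) {
	f.id, f.err = id, err
	close(f.done)
}

// get blocks until the future is resolved or ctx is done.
func (f *publishFuture) get(ctx context.Context) (string, error) {
	select {
	case <-f.done:
		return f.id, f.err
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// publishAndWait resolves a future from another goroutine and waits for it.
func publishAndWait(id string, fail bool) (string, error) {
	f := newPublishFuture()
	go func() {
		if fail {
			f.resolve("", errors.New("send failed"))
			return
		}
		f.resolve(id, nil)
	}()
	return f.get(context.Background())
}

func main() {
	id, err := publishAndWait("msg-1", false)
	fmt.Println(id, err) // prints "msg-1 <nil>"
}
```

Passing a context to get lets a caller stop waiting without affecting the pending send.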
Once you've finished publishing all messages, call Stop to flush all messages to the service and close gRPC streams. The PublisherClient can no longer be used after it has been stopped or has terminated due to a permanent error.
publisher.Stop()
PublisherClients are expected to be long-lived and used for the duration of the application, rather than for publishing small batches of messages. Stop must be called to release resources when a PublisherClient is no longer required.
See https://cloud.google.com/pubsub/lite/docs/publishing for more information about publishing.
Creating Subscriptions ¶
To receive messages published to a topic, create a subscription to the topic. There may be more than one subscription per topic; each message that is published to the topic will be delivered to all of its subscriptions.
Pub/Sub Lite subscriptions may be created like so:
const subscriptionPath = "projects/my-project/locations/us-central1-c/subscriptions/my-subscription"
subscriptionConfig := pubsublite.SubscriptionConfig{
	Name:                subscriptionPath,
	Topic:               topicPath,
	DeliveryRequirement: pubsublite.DeliverImmediately,
}
if _, err = adminClient.CreateSubscription(ctx, subscriptionConfig); err != nil {
	// TODO: Handle error.
}
See https://cloud.google.com/pubsub/lite/docs/subscriptions for more information about how subscriptions are configured.
Receiving ¶
To receive messages for a subscription, first create a SubscriberClient:
subscriber, err := pscompat.NewSubscriberClient(ctx, subscriptionPath)
if err != nil {
	// TODO: Handle error.
}
Messages are then consumed from a subscription via callback. The callback may be invoked concurrently by multiple goroutines (one per partition that the subscriber client is connected to).
cctx, cancel := context.WithCancel(ctx)
err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
	log.Printf("Got message: %s", m.Data)
	m.Ack()
})
if err != nil {
	// TODO: Handle error.
}
Receive blocks until either the context is canceled or a permanent error occurs. To terminate a call to Receive, cancel its context:
cancel()
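Because the callback may run on several goroutines at once, any state it touches needs to be safe for concurrent use. The sketch below illustrates this with a hypothetical receive function (not the library's Receive) that starts one goroutine per partition and blocks until its context is canceled; the callback counts messages with an atomic counter.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// receive is a hypothetical stand-in for a blocking Receive call. It starts
// one goroutine per partition, invokes callback for each message, and returns
// only after ctx is canceled.
func receive(ctx context.Context, partitions [][]string, callback func(msg string)) error {
	var wg sync.WaitGroup
	for _, msgs := range partitions {
		wg.Add(1)
		go func(msgs []string) {
			defer wg.Done()
			for _, m := range msgs {
				callback(m) // may run concurrently with other partitions
			}
			<-ctx.Done() // keep "streaming" until the caller cancels
		}(msgs)
	}
	wg.Wait()
	return ctx.Err()
}

// countMessages receives every message, counting them with an atomic counter
// because the callback is shared across goroutines.
func countMessages(partitions [][]string) int {
	total := 0
	for _, msgs := range partitions {
		total += len(msgs)
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if total == 0 {
		cancel() // nothing to wait for
	}
	var seen int64
	_ = receive(ctx, partitions, func(string) {
		if atomic.AddInt64(&seen, 1) == int64(total) {
			cancel() // all messages handled: terminate receive
		}
	})
	return int(atomic.LoadInt64(&seen))
}

func main() {
	fmt.Println(countMessages([][]string{{"a", "b"}, {"c"}})) // prints 3
}
```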
Clients must call pubsub.Message.Ack() or pubsub.Message.Nack() for every message received. Pub/Sub Lite does not have ACK deadlines, nor does it actually have the concept of NACK: by default, calling Nack terminates the SubscriberClient. This is because, in Pub/Sub Lite, only a single subscriber for a given subscription is connected to any partition at a time, so there is no other client that may be able to handle the message.
See https://cloud.google.com/pubsub/lite/docs/subscribing for more information about receiving messages.
gRPC Connection Pools ¶
Pub/Sub Lite utilizes gRPC streams extensively. gRPC allows a maximum of 100 streams per connection. Internally, the library uses a default connection pool size of 8, which supports up to 800 topic partitions. To alter the connection pool size, pass a ClientOption to pscompat.NewPublisherClient and pscompat.NewSubscriberClient:
pub, err := pscompat.NewPublisherClient(ctx, topicPath, option.WithGRPCConnectionPool(10))
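The arithmetic behind these figures can be sketched as a ceiling division. This is only an estimate under the assumption, implied by the 800-partition figure above, that each partition uses one stream; poolSizeFor is a hypothetical helper, not part of the library.

```go
package main

import "fmt"

// maxStreamsPerConn is the per-connection stream limit quoted above.
const maxStreamsPerConn = 100

// poolSizeFor returns the smallest connection pool size that can carry the
// given number of partitions, assuming one stream per partition.
func poolSizeFor(partitions int) int {
	if partitions <= 0 {
		return 1
	}
	return (partitions + maxStreamsPerConn - 1) / maxStreamsPerConn // ceiling division
}

func main() {
	fmt.Println(poolSizeFor(800))  // prints 8
	fmt.Println(poolSizeFor(1000)) // prints 10
}
```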
Index ¶
- Constants
- type AdminClient
- func (ac *AdminClient) Close() error
- func (ac *AdminClient) CreateReservation(ctx context.Context, config ReservationConfig) (*ReservationConfig, error)
- func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig, ...) (*SubscriptionConfig, error)
- func (ac *AdminClient) CreateTopic(ctx context.Context, config TopicConfig) (*TopicConfig, error)
- func (ac *AdminClient) DeleteReservation(ctx context.Context, reservation string) error
- func (ac *AdminClient) DeleteSubscription(ctx context.Context, subscription string) error
- func (ac *AdminClient) DeleteTopic(ctx context.Context, topic string) error
- func (ac *AdminClient) Reservation(ctx context.Context, reservation string) (*ReservationConfig, error)
- func (ac *AdminClient) ReservationTopics(ctx context.Context, reservation string) *TopicPathIterator
- func (ac *AdminClient) Reservations(ctx context.Context, parent string) *ReservationIterator
- func (ac *AdminClient) SeekSubscription(ctx context.Context, subscription string, target SeekTarget, ...) (*SeekSubscriptionOperation, error)
- func (ac *AdminClient) Subscription(ctx context.Context, subscription string) (*SubscriptionConfig, error)
- func (ac *AdminClient) Subscriptions(ctx context.Context, parent string) *SubscriptionIterator
- func (ac *AdminClient) Topic(ctx context.Context, topic string) (*TopicConfig, error)
- func (ac *AdminClient) TopicPartitionCount(ctx context.Context, topic string) (int, error)
- func (ac *AdminClient) TopicSubscriptions(ctx context.Context, topic string) *SubscriptionPathIterator
- func (ac *AdminClient) Topics(ctx context.Context, parent string) *TopicIterator
- func (ac *AdminClient) UpdateReservation(ctx context.Context, config ReservationConfigToUpdate) (*ReservationConfig, error)
- func (ac *AdminClient) UpdateSubscription(ctx context.Context, config SubscriptionConfigToUpdate) (*SubscriptionConfig, error)
- func (ac *AdminClient) UpdateTopic(ctx context.Context, config TopicConfigToUpdate) (*TopicConfig, error)
- type BacklogLocation
- type CreateSubscriptionOption
- type DeliveryRequirement
- type EventTime
- type ExportConfig
- type ExportConfigToUpdate
- type ExportDestinationConfig
- type ExportState
- type OperationMetadata
- type PubSubDestinationConfig
- type PublishTime
- type ReservationConfig
- type ReservationConfigToUpdate
- type ReservationIterator
- type SeekSubscriptionOperation
- type SeekSubscriptionOption
- type SeekSubscriptionResult
- type SeekTarget
- type SubscriptionConfig
- type SubscriptionConfigToUpdate
- type SubscriptionIterator
- type SubscriptionPathIterator
- type TopicConfig
- type TopicConfigToUpdate
- type TopicIterator
- type TopicPathIterator
Examples ¶
- AdminClient.CreateReservation
- AdminClient.CreateSubscription
- AdminClient.CreateSubscription (AtTargetLocation)
- AdminClient.CreateSubscription (ExportToPubSub)
- AdminClient.CreateTopic
- AdminClient.DeleteReservation
- AdminClient.DeleteSubscription
- AdminClient.DeleteTopic
- AdminClient.ReservationTopics
- AdminClient.Reservations
- AdminClient.SeekSubscription
- AdminClient.Subscriptions
- AdminClient.TopicSubscriptions
- AdminClient.Topics
- AdminClient.UpdateReservation
- AdminClient.UpdateSubscription
- AdminClient.UpdateTopic
Constants ¶
const InfiniteRetention = time.Duration(-1)
InfiniteRetention is a sentinel used in topic configs to denote an infinite retention duration (i.e. retain messages as long as there is available storage).
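Since the sentinel is a negative duration, code that displays or compares retention values should check for it explicitly. A minimal sketch, using a local constant that mirrors the value shown above and a hypothetical describeRetention helper:

```go
package main

import (
	"fmt"
	"time"
)

// infiniteRetention mirrors the sentinel value shown above.
const infiniteRetention = time.Duration(-1)

// describeRetention renders a retention duration for display, treating the
// sentinel as "infinite" instead of a negative duration.
func describeRetention(d time.Duration) string {
	if d == infiniteRetention {
		return "infinite"
	}
	return d.String()
}

func main() {
	fmt.Println(describeRetention(infiniteRetention)) // prints "infinite"
	fmt.Println(describeRetention(24 * time.Hour))    // prints "24h0m0s"
}
```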
Types ¶
type AdminClient ¶
type AdminClient struct {
// contains filtered or unexported fields
}
AdminClient provides admin operations for Pub/Sub Lite resources within a Google Cloud region. The location (region or zone) component of resource paths must be within this region. See https://cloud.google.com/pubsub/lite/docs/locations for the list of regions and zones where Pub/Sub Lite is available.
An AdminClient may be shared by multiple goroutines.
Close must be called to release resources when an AdminClient is no longer required. If the client is available for the lifetime of the program, then Close need not be called at exit.
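Because the client is created for a region while resource paths may name a zone, it can be handy to derive the region from a location string. The helper below is a hypothetical sketch, not part of the package. It assumes zones are named "<region>-<suffix>" (as in "us-central1-c") and that region names contain exactly one hyphen.

```go
package main

import (
	"fmt"
	"strings"
)

// regionOf returns the region for a location string. It assumes zones are
// named "<region>-<suffix>" and that region names contain exactly one hyphen.
func regionOf(location string) string {
	parts := strings.Split(location, "-")
	if len(parts) == 3 {
		return parts[0] + "-" + parts[1] // strip the zone suffix
	}
	return location // already a region (or an unrecognized format)
}

func main() {
	fmt.Println(regionOf("us-central1-c")) // prints "us-central1"
	fmt.Println(regionOf("us-central1"))   // prints "us-central1"
}
```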
func NewAdminClient ¶
func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*AdminClient, error)
NewAdminClient creates a new Pub/Sub Lite client to perform admin operations for resources within a given region.
func (*AdminClient) Close ¶
func (ac *AdminClient) Close() error
Close releases any resources held by the client when it is no longer required. If the client is available for the lifetime of the program, then Close need not be called at exit.
func (*AdminClient) CreateReservation ¶ added in v1.2.0
func (ac *AdminClient) CreateReservation(ctx context.Context, config ReservationConfig) (*ReservationConfig, error)
CreateReservation creates a new reservation from the given config. If the reservation already exists an error will be returned.
Example ¶
This example demonstrates how to create a new reservation. See https://cloud.google.com/pubsub/lite/docs/locations for the list of regions where Pub/Sub Lite is available.
package main

import (
	"context"

	"cloud.google.com/go/pubsublite"
)

func main() {
	ctx := context.Background()
	admin, err := pubsublite.NewAdminClient(ctx, "region")
	if err != nil {
		// TODO: Handle error.
	}
	defer admin.Close()

	reservationConfig := pubsublite.ReservationConfig{
		Name:               "projects/my-project/locations/region/reservations/my-reservation",
		ThroughputCapacity: 10,
	}
	_, err = admin.CreateReservation(ctx, reservationConfig)
	if err != nil {
		// TODO: Handle error.
	}
}
func (*AdminClient) CreateSubscription ¶
func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig, opts ...CreateSubscriptionOption) (*SubscriptionConfig, error)
CreateSubscription creates a new subscription from the given config. If the subscription already exists an error will be returned.
By default, a new subscription will only receive messages published after the subscription was created. Use AtTargetLocation to initialize the subscription to another location within the message backlog.
Example ¶
This example demonstrates how to create a new subscription for a topic. See https://cloud.google.com/pubsub/lite/docs/subscriptions for more information about how subscriptions are configured.
package main

import (
	"context"

	"cloud.google.com/go/pubsublite"
)

func main() {
	ctx := context.Background()
	// NOTE: resources must be located within this region.
	admin, err := pubsublite.NewAdminClient(ctx, "region")
	if err != nil {
		// TODO: Handle error.
	}
	defer admin.Close()

	subscriptionConfig := pubsublite.SubscriptionConfig{
		Name:  "projects/my-project/locations/region-or-zone/subscriptions/my-subscription",
		Topic: "projects/my-project/locations/region-or-zone/topics/my-topic",
		// Do not wait for a published message to be successfully written to storage
		// before delivering it to subscribers.
		DeliveryRequirement: pubsublite.DeliverImmediately,
	}
	_, err = admin.CreateSubscription(ctx, subscriptionConfig)
	if err != nil {
		// TODO: Handle error.
	}
}
Example (AtTargetLocation) ¶
This example demonstrates how to create a new subscription initialized to a specified target location within the message backlog. The target location can be a BacklogLocation, PublishTime or EventTime.
package main

import (
	"context"

	"cloud.google.com/go/pubsublite"
)

func main() {
	ctx := context.Background()
	// NOTE: resources must be located within this region.
	admin, err := pubsublite.NewAdminClient(ctx, "region")
	if err != nil {
		// TODO: Handle error.
	}
	defer admin.Close()

	subscriptionConfig := pubsublite.SubscriptionConfig{
		Name:  "projects/my-project/locations/region-or-zone/subscriptions/my-subscription",
		Topic: "projects/my-project/locations/region-or-zone/topics/my-topic",
		// Do not wait for a published message to be successfully written to storage
		// before delivering it to subscribers.
		DeliveryRequirement: pubsublite.DeliverImmediately,
	}
	// Initialize the subscription to the oldest retained messages for each
	// partition.
	targetLocation := pubsublite.AtTargetLocation(pubsublite.Beginning)
	_, err = admin.CreateSubscription(ctx, subscriptionConfig, targetLocation)
	if err != nil {
		// TODO: Handle error.
	}
}
Example (ExportToPubSub) ¶
This example demonstrates how to create a new subscription that exports messages to a Pub/Sub topic. See https://cloud.google.com/pubsub/lite/docs/export-pubsub for more information about how export subscriptions are configured.
package main

import (
	"context"

	"cloud.google.com/go/pubsublite"
)

func main() {
	ctx := context.Background()
	// NOTE: resources must be located within this region.
	admin, err := pubsublite.NewAdminClient(ctx, "region")
	if err != nil {
		// TODO: Handle error.
	}
	defer admin.Close()

	subscriptionConfig := pubsublite.SubscriptionConfig{
		Name:  "projects/my-project/locations/region-or-zone/subscriptions/my-subscription",
		Topic: "projects/my-project/locations/region-or-zone/topics/my-topic",
		// Deliver a published message to subscribers after it has been successfully
		// written to storage.
		DeliveryRequirement: pubsublite.DeliverAfterStored,
		ExportConfig: &pubsublite.ExportConfig{
			DesiredState: pubsublite.ExportActive,
			// Configure an export subscription to a Pub/Sub topic.
			Destination: &pubsublite.PubSubDestinationConfig{
				Topic: "projects/my-project/topics/destination-pubsub-topic",
			},
			// Optional Lite topic to receive messages that cannot be exported to the
			// destination.
			DeadLetterTopic: "projects/my-project/locations/region-or-zone/topics/dead-letter-topic",
		},
	}
	_, err = admin.CreateSubscription(ctx, subscriptionConfig)
	if err != nil {
		// TODO: Handle error.
	}
}
func (*AdminClient) CreateTopic ¶
func (ac *AdminClient) CreateTopic(ctx context.Context, config TopicConfig) (*TopicConfig, error)
CreateTopic creates a new topic from the given config. If the topic already exists an error will be returned.
Example ¶
This example demonstrates how to create a new topic. Topics may be regional or zonal. See https://cloud.google.com/pubsub/lite/docs/topics for more information about how Pub/Sub Lite topics are configured. See https://cloud.google.com/pubsub/lite/docs/locations for the list of regions and zones where Pub/Sub Lite is available.
package main

import (
	"context"

	"cloud.google.com/go/pubsublite"
)

func main() {
	ctx := context.Background()
	// NOTE: resources must be located within this region.
	admin, err := pubsublite.NewAdminClient(ctx, "region")
	if err != nil {
		// TODO: Handle error.
	}
	defer admin.Close()

	const gib = 1 << 30
	topicConfig := pubsublite.TopicConfig{
		Name:                       "projects/my-project/locations/region-or-zone/topics/my-topic",
		PartitionCount:             2,        // Must be at least 1.
		PublishCapacityMiBPerSec:   4,        // Must be 4-16 MiB/s.
		SubscribeCapacityMiBPerSec: 8,        // Must be 4-32 MiB/s.
		PerPartitionBytes:          30 * gib, // Must be 30 GiB-10 TiB.
		// Retain messages indefinitely as long as there is available storage.
		RetentionDuration: pubsublite.InfiniteRetention,
	}
	_, err = admin.CreateTopic(ctx, topicConfig)
	if err != nil {
		// TODO: Handle error.
	}
}
func (*AdminClient) DeleteReservation ¶ added in v1.2.0
func (ac *AdminClient) DeleteReservation(ctx context.Context, reservation string) error
DeleteReservation deletes a reservation. A valid reservation path has the format: "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID".
Example ¶
package main

import (
	"context"

	"cloud.google.com/go/pubsublite"
)

func main() {
	ctx := context.Background()
	admin, err := pubsublite.NewAdminClient(ctx, "region")
	if err != nil {
		// TODO: Handle error.
	}
	defer admin.Close()

	const reservation = "projects/my-project/locations/region/reservations/my-reservation"
	if err := admin.DeleteReservation(ctx, reservation); err != nil {
		// TODO: Handle error.
	}
}
func (*AdminClient) DeleteSubscription ¶
func (ac *AdminClient) DeleteSubscription(ctx context.Context, subscription string) error
DeleteSubscription deletes a subscription. A valid subscription path has the format: "projects/PROJECT_ID/locations/LOCATION/subscriptions/SUBSCRIPTION_ID".
Example ¶
package main

import (
	"context"

	"cloud.google.com/go/pubsublite"
)

func main() {
	ctx := context.Background()
	// NOTE: resources must be located within this region.
	admin, err := pubsublite.NewAdminClient(ctx, "region")
	if err != nil {
		// TODO: Handle error.
	}
	defer admin.Close()

	const subscription = "projects/my-project/locations/region-or-zone/subscriptions/my-subscription"
	if err := admin.DeleteSubscription(ctx, subscription); err != nil {
		// TODO: Handle error.
	}
}
func (*AdminClient) DeleteTopic ¶
func (ac *AdminClient) DeleteTopic(ctx context.Context, topic string) error
DeleteTopic deletes a topic. A valid topic path has the format: "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID".
Example ¶
package main

import (
	"context"

	"cloud.google.com/go/pubsublite"
)

func main() {
	ctx := context.Background()
	// NOTE: resources must be located within this region.
	admin, err := pubsublite.NewAdminClient(ctx, "region")
	if err != nil {
		// TODO: Handle error.
	}
	defer admin.Close()

	const topic = "projects/my-project/locations/region-or-zone/topics/my-topic"
	if err := admin.DeleteTopic(ctx, topic); err != nil {
		// TODO: Handle error.
	}
}
func (*AdminClient) Reservation ¶ added in v1.2.0
func (ac *AdminClient) Reservation(ctx context.Context, reservation string) (*ReservationConfig, error)
Reservation retrieves the configuration of a reservation. A valid reservation name has the format: "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID".
func (*AdminClient) ReservationTopics ¶ added in v1.2.0
func (ac *AdminClient) ReservationTopics(ctx context.Context, reservation string) *TopicPathIterator
ReservationTopics retrieves the list of topic paths for a reservation. A valid reservation path has the format: "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID".
Example ¶
package main

import (
	"context"
	"fmt"

	"cloud.google.com/go/pubsublite"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	admin, err := pubsublite.NewAdminClient(ctx, "region")
	if err != nil {
		// TODO: Handle error.
	}
	defer admin.Close()

	// List the paths of all topics using a reservation.
	const reservation = "projects/my-project/locations/region/reservations/my-reservation"
	it := admin.ReservationTopics(ctx, reservation)
	for {
		topicPath, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			// TODO: Handle error.
		}
		fmt.Println(topicPath)
	}
}
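The list methods in this package all follow the same iteration pattern: call Next until it returns a sentinel error. The standalone sketch below imitates that pattern with the standard library only; errDone, pathIterator, and collect are hypothetical names standing in for iterator.Done and the real iterator types.

```go
package main

import (
	"errors"
	"fmt"
)

// errDone is a hypothetical sentinel playing the role of iterator.Done.
var errDone = errors.New("no more items in iterator")

// pathIterator is a hypothetical iterator over a fixed list of paths.
type pathIterator struct {
	paths []string
	next  int
}

// Next returns the next path, or errDone once all paths have been returned.
func (it *pathIterator) Next() (string, error) {
	if it.next >= len(it.paths) {
		return "", errDone
	}
	p := it.paths[it.next]
	it.next++
	return p, nil
}

// collect drains the iterator, stopping cleanly at errDone and returning
// any other error to the caller.
func collect(it *pathIterator) ([]string, error) {
	var out []string
	for {
		p, err := it.Next()
		if errors.Is(err, errDone) {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, p)
	}
}

func main() {
	it := &pathIterator{paths: []string{"topics/a", "topics/b"}}
	paths, err := collect(it)
	fmt.Println(paths, err) // prints "[topics/a topics/b] <nil>"
}
```

Returning on any error other than the sentinel avoids looping forever when a request fails.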
func (*AdminClient) Reservations ¶ added in v1.2.0
func (ac *AdminClient) Reservations(ctx context.Context, parent string) *ReservationIterator
Reservations retrieves the list of reservation configs for a given project and region. A valid parent path has the format: "projects/PROJECT_ID/locations/REGION".
Example ¶
package main

import (
	"context"
	"fmt"

	"cloud.google.com/go/pubsublite"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	admin, err := pubsublite.NewAdminClient(ctx, "region")
	if err != nil {
		// TODO: Handle error.
	}
	defer admin.Close()

	// List the configs of all reservations in the given region for the project.
	it := admin.Reservations(ctx, "projects/my-project/locations/region")
	for {
		reservation, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			// TODO: Handle error.
		}
		fmt.Println(reservation)
	}
}
func (*AdminClient) SeekSubscription ¶ added in v1.1.0
func (ac *AdminClient) SeekSubscription(ctx context.Context, subscription string, target SeekTarget, opts ...SeekSubscriptionOption) (*SeekSubscriptionOperation, error)
SeekSubscription initiates an out-of-band seek for a subscription to a specified target, which may be timestamps or named positions within the message backlog. A valid subscription path has the format: "projects/PROJECT_ID/locations/LOCATION/subscriptions/SUBSCRIPTION_ID".
See https://cloud.google.com/pubsub/lite/docs/seek for more information.
Example ¶
package main

import (
	"context"
	"fmt"

	"cloud.google.com/go/pubsublite"
)

func main() {
	ctx := context.Background()
	// NOTE: resources must be located within this region.
	admin, err := pubsublite.NewAdminClient(ctx, "region")
	if err != nil {
		// TODO: Handle error.
	}
	defer admin.Close()

	const subscription = "projects/my-project/locations/region-or-zone/subscriptions/my-subscription"
	seekOp, err := admin.SeekSubscription(ctx, subscription, pubsublite.Beginning)
	if err != nil {
		// TODO: Handle error.
	}

	// Optional: Wait for the seek operation to complete, which indicates when
	// subscribers for all partitions are receiving messages from the seek target.
	_, err = seekOp.Wait(ctx)
	if err != nil {
		// TODO: Handle error.
	}
	metadata, err := seekOp.Metadata()
	if err != nil {
		// TODO: Handle error.
	}
	fmt.Println(metadata)
}
func (*AdminClient) Subscription ¶
func (ac *AdminClient) Subscription(ctx context.Context, subscription string) (*SubscriptionConfig, error)
Subscription retrieves the configuration of a subscription. A valid subscription name has the format: "projects/PROJECT_ID/locations/LOCATION/subscriptions/SUBSCRIPTION_ID".
func (*AdminClient) Subscriptions ¶
func (ac *AdminClient) Subscriptions(ctx context.Context, parent string) *SubscriptionIterator
Subscriptions retrieves the list of subscription configs for a given project and location (region or zone). A valid parent path has the format: "projects/PROJECT_ID/locations/LOCATION".
Example ¶
package main

import (
	"context"
	"fmt"

	"cloud.google.com/go/pubsublite"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	// NOTE: resources must be located within this region.
	admin, err := pubsublite.NewAdminClient(ctx, "region")
	if err != nil {
		// TODO: Handle error.
	}
	defer admin.Close()

	// List the configs of all subscriptions in the given region or zone for the project.
	it := admin.Subscriptions(ctx, "projects/my-project/locations/region-or-zone")
	for {
		subscriptionConfig, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			// TODO: Handle error.
		}
		fmt.Println(subscriptionConfig)
	}
}
func (*AdminClient) Topic ¶
func (ac *AdminClient) Topic(ctx context.Context, topic string) (*TopicConfig, error)
Topic retrieves the configuration of a topic. A valid topic path has the format: "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID".
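The path format above can be checked before making a request. The following is a minimal sketch of such a check; topicPath and parseTopicPath are hypothetical helpers written for illustration and are not provided by this package.

```go
package main

import (
	"fmt"
	"strings"
)

// topicPath holds the components of a topic path.
type topicPath struct {
	Project, Location, TopicID string
}

// parseTopicPath splits a path of the form
// "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID".
func parseTopicPath(path string) (topicPath, error) {
	parts := strings.Split(path, "/")
	if len(parts) != 6 || parts[0] != "projects" || parts[2] != "locations" || parts[4] != "topics" {
		return topicPath{}, fmt.Errorf("invalid topic path %q", path)
	}
	for _, i := range []int{1, 3, 5} {
		if parts[i] == "" {
			return topicPath{}, fmt.Errorf("invalid topic path %q: empty component", path)
		}
	}
	return topicPath{Project: parts[1], Location: parts[3], TopicID: parts[5]}, nil
}

func main() {
	p, err := parseTopicPath("projects/my-project/locations/us-central1-c/topics/my-topic")
	fmt.Println(p.Project, p.Location, p.TopicID, err) // prints "my-project us-central1-c my-topic <nil>"
}
```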
func (*AdminClient) TopicPartitionCount ¶ added in v0.6.0
func (ac *AdminClient) TopicPartitionCount(ctx context.Context, topic string) (int, error)
TopicPartitionCount returns the number of partitions for a topic. A valid topic path has the format: "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID".
func (*AdminClient) TopicSubscriptions ¶
func (ac *AdminClient) TopicSubscriptions(ctx context.Context, topic string) *SubscriptionPathIterator
TopicSubscriptions retrieves the list of subscription paths for a topic. A valid topic path has the format: "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID".
Example ¶
package main

import (
	"context"
	"fmt"

	"cloud.google.com/go/pubsublite"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	// NOTE: resources must be located within this region.
	admin, err := pubsublite.NewAdminClient(ctx, "region")
	if err != nil {
		// TODO: Handle error.
	}
	defer admin.Close()

	// List the paths of all subscriptions of a topic.
	const topic = "projects/my-project/locations/region-or-zone/topics/my-topic"
	it := admin.TopicSubscriptions(ctx, topic)
	for {
		subscriptionPath, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			// TODO: Handle error.
		}
		fmt.Println(subscriptionPath)
	}
}
func (*AdminClient) Topics ¶
func (ac *AdminClient) Topics(ctx context.Context, parent string) *TopicIterator
Topics retrieves the list of topic configs for a given project and location (region or zone). A valid parent path has the format: "projects/PROJECT_ID/locations/LOCATION".
Example ¶
package main

import (
	"context"
	"fmt"

	"cloud.google.com/go/pubsublite"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	// NOTE: resources must be located within this region.
	admin, err := pubsublite.NewAdminClient(ctx, "region")
	if err != nil {
		// TODO: Handle error.
	}
	defer admin.Close()

	// List the configs of all topics in the given region or zone for the project.
	it := admin.Topics(ctx, "projects/my-project/locations/region-or-zone")
	for {
		topicConfig, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			// TODO: Handle error.
		}
		fmt.Println(topicConfig)
	}
}
func (*AdminClient) UpdateReservation ¶ added in v1.2.0
func (ac *AdminClient) UpdateReservation(ctx context.Context, config ReservationConfigToUpdate) (*ReservationConfig, error)
UpdateReservation updates an existing reservation from the given config and returns the new reservation config. UpdateReservation returns an error if no fields were modified.
Example ¶
package main

import (
	"context"

	"cloud.google.com/go/pubsublite"
)

func main() {
	ctx := context.Background()
	admin, err := pubsublite.NewAdminClient(ctx, "region")
	if err != nil {
		// TODO: Handle error.
	}
	defer admin.Close()

	updateConfig := pubsublite.ReservationConfigToUpdate{
		Name:               "projects/my-project/locations/region/reservations/my-reservation",
		ThroughputCapacity: 20,
	}
	_, err = admin.UpdateReservation(ctx, updateConfig)
	if err != nil {
		// TODO: Handle error.
	}
}
func (*AdminClient) UpdateSubscription ¶
func (ac *AdminClient) UpdateSubscription(ctx context.Context, config SubscriptionConfigToUpdate) (*SubscriptionConfig, error)
UpdateSubscription updates an existing subscription from the given config and returns the new subscription config. UpdateSubscription returns an error if no fields were modified.
Example ¶
package main

import (
	"context"

	"cloud.google.com/go/pubsublite"
)

func main() {
	ctx := context.Background()
	// NOTE: resources must be located within this region.
	admin, err := pubsublite.NewAdminClient(ctx, "region")
	if err != nil {
		// TODO: Handle error.
	}
	defer admin.Close()

	updateConfig := pubsublite.SubscriptionConfigToUpdate{
		Name: "projects/my-project/locations/region-or-zone/subscriptions/my-subscription",
		// Deliver a published message to subscribers after it has been successfully
		// written to storage.
		DeliveryRequirement: pubsublite.DeliverAfterStored,
	}
	_, err = admin.UpdateSubscription(ctx, updateConfig)
	if err != nil {
		// TODO: Handle error.
	}
}
func (*AdminClient) UpdateTopic ¶
func (ac *AdminClient) UpdateTopic(ctx context.Context, config TopicConfigToUpdate) (*TopicConfig, error)
UpdateTopic updates an existing topic from the given config and returns the new topic config. UpdateTopic returns an error if no fields were modified.
Example ¶
package main

import (
	"context"
	"time"

	"cloud.google.com/go/pubsublite"
)

func main() {
	ctx := context.Background()
	// NOTE: resources must be located within this region.
	admin, err := pubsublite.NewAdminClient(ctx, "region")
	if err != nil {
		// TODO: Handle error.
	}
	defer admin.Close()

	updateConfig := pubsublite.TopicConfigToUpdate{
		Name:                       "projects/my-project/locations/region-or-zone/topics/my-topic",
		PartitionCount:             3, // Only increases currently supported.
		PublishCapacityMiBPerSec:   8,
		SubscribeCapacityMiBPerSec: 16,
		RetentionDuration:          24 * time.Hour, // Garbage collect messages older than 24 hours.
	}
	_, err = admin.UpdateTopic(ctx, updateConfig)
	if err != nil {
		// TODO: Handle error.
	}
}
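The update methods return an error when no fields were modified. One way to picture this is a request type in which zero values mean "leave unchanged". The sketch below is hypothetical throughout (the topicUpdate type, the field-name strings, and changedFields are invented for illustration) and does not describe how the library builds its requests.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// topicUpdate is a hypothetical update request in which zero values mean
// "leave this field unchanged".
type topicUpdate struct {
	PartitionCount    int
	RetentionDuration time.Duration
}

// changedFields lists the fields that the update would modify and returns an
// error if it would modify nothing.
func changedFields(u topicUpdate) ([]string, error) {
	var fields []string
	if u.PartitionCount > 0 {
		fields = append(fields, "partition_count")
	}
	if u.RetentionDuration != 0 {
		fields = append(fields, "retention_duration")
	}
	if len(fields) == 0 {
		return nil, errors.New("no fields to update")
	}
	return fields, nil
}

func main() {
	fields, err := changedFields(topicUpdate{PartitionCount: 3})
	fmt.Println(fields, err) // prints "[partition_count] <nil>"
	_, err = changedFields(topicUpdate{})
	fmt.Println(err) // prints "no fields to update"
}
```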
type BacklogLocation ¶ added in v0.8.0
type BacklogLocation int
BacklogLocation refers to a location with respect to the message backlog. It implements the SeekTarget interface.
const (
	// End refers to the location past all currently published messages. End
	// skips the entire message backlog.
	End BacklogLocation = iota + 1

	// Beginning refers to the location of the oldest retained message.
	Beginning
)
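Starting the constants at iota + 1 keeps the zero value free, so an unset BacklogLocation cannot be mistaken for End. The sketch below reproduces the declaration with a hypothetical local type to show the resulting values.

```go
package main

import "fmt"

// location is a hypothetical enum declared the same way as BacklogLocation.
type location int

const (
	end       location = iota + 1 // 1
	beginning                     // 2
)

// String names the location; the zero value is reported as unspecified.
func (l location) String() string {
	switch l {
	case end:
		return "End"
	case beginning:
		return "Beginning"
	default:
		return "Unspecified"
	}
}

func main() {
	var unset location
	fmt.Println(unset, end, beginning)    // prints "Unspecified End Beginning"
	fmt.Println(int(end), int(beginning)) // prints "1 2"
}
```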
type CreateSubscriptionOption ¶ added in v0.8.0
type CreateSubscriptionOption interface {
// contains filtered or unexported methods
}
CreateSubscriptionOption is an option for AdminClient.CreateSubscription.
func AtTargetLocation ¶ added in v1.6.0
func AtTargetLocation(target SeekTarget) CreateSubscriptionOption
AtTargetLocation specifies the target location within the message backlog that a new subscription should be initialized to.
An additional seek request is initiated if the target location is a publish or event time. If the seek fails, the created subscription is not deleted.
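CreateSubscriptionOption is an interface with unexported methods, a common Go shape for options that only the defining package can implement. The following is a generic sketch of that pattern under assumed names (createSettings, createOption, atTarget, resolveSettings); it does not show the package's internals. The default mirrors the documented behavior that a new subscription only receives messages published after it was created.

```go
package main

import "fmt"

// createSettings collects the effects of all options (hypothetical).
type createSettings struct {
	target string // where the new subscription starts in the backlog
}

// createOption is a sealed option interface: the unexported method prevents
// other packages from implementing it.
type createOption interface {
	apply(*createSettings)
}

type targetOption struct{ target string }

func (o targetOption) apply(s *createSettings) { s.target = o.target }

// atTarget returns an option that sets the initial backlog position.
func atTarget(target string) createOption { return targetOption{target: target} }

// resolveSettings applies options over the default of starting at the end of
// the backlog, so only newly published messages are received.
func resolveSettings(opts ...createOption) createSettings {
	s := createSettings{target: "end"}
	for _, o := range opts {
		o.apply(&s)
	}
	return s
}

func main() {
	fmt.Println(resolveSettings().target)                      // prints "end"
	fmt.Println(resolveSettings(atTarget("beginning")).target) // prints "beginning"
}
```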
func StartingOffset ¶ added in v0.8.0
func StartingOffset(location BacklogLocation) CreateSubscriptionOption
StartingOffset specifies the offset at which a newly created subscription will start receiving messages.
Deprecated: This is equivalent to calling AtTargetLocation with a BacklogLocation and will be removed in the next major version.
type DeliveryRequirement ¶
type DeliveryRequirement int
DeliveryRequirement specifies when a subscription should send messages to subscribers relative to persistence in storage.
const (
	// UnspecifiedDeliveryRequirement represents an unset delivery requirement.
	UnspecifiedDeliveryRequirement DeliveryRequirement = iota

	// DeliverImmediately means the server will not wait for a published message
	// to be successfully written to storage before delivering it to subscribers.
	DeliverImmediately

	// DeliverAfterStored means the server will not deliver a published message to
	// subscribers until the message has been successfully written to storage.
	// This will result in higher end-to-end latency, but consistent delivery.
	DeliverAfterStored
)
type EventTime ¶ added in v1.1.0
type EventTime time.Time
EventTime is a message event timestamp. It implements the SeekTarget interface.
type ExportConfig ¶ added in v1.6.0
type ExportConfig struct {
	// The desired state of this export subscription. This should only be set to
	// ExportActive or ExportPaused.
	DesiredState ExportState

	// This is an output only field that reports the current export state. It is
	// ignored if set in any requests.
	CurrentState ExportState

	// The path of an optional Pub/Sub Lite topic to receive messages that cannot
	// be exported to the destination, in the format:
	// "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID".
	// Must be within the same project and location as the subscription.
	DeadLetterTopic string

	// The destination to export messages to.
	Destination ExportDestinationConfig
}
ExportConfig describes the properties of a Pub/Sub Lite export subscription, which configures the service to write messages to a destination.
type ExportConfigToUpdate ¶ added in v1.6.0
type ExportConfigToUpdate struct {
	// If non-zero, updates the desired state. This should only be set to
	// ExportActive or ExportPaused.
	DesiredState ExportState

	// The path of an optional Pub/Sub Lite topic to receive messages that cannot
	// be exported to the destination, in the format:
	// "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID".
	// Must be within the same project and location as the subscription.
	DeadLetterTopic optional.String

	// If non-nil, updates the export destination configuration.
	Destination ExportDestinationConfig
}
ExportConfigToUpdate specifies the properties to update for an export subscription.
type ExportDestinationConfig ¶ added in v1.6.0
type ExportDestinationConfig interface {
// contains filtered or unexported methods
}
ExportDestinationConfig is the configuration for exporting to a destination. Implemented by *PubSubDestinationConfig.
type ExportState ¶ added in v1.6.0
type ExportState int
ExportState specifies the desired state of an export subscription.
const (
	// UnspecifiedExportState represents an unset export state.
	UnspecifiedExportState ExportState = iota

	// ExportActive specifies that export processing should be enabled.
	ExportActive

	// ExportPaused specifies that export processing should be suspended.
	ExportPaused

	// ExportPermissionDenied specifies that messages cannot be exported due to
	// permission denied errors. Output only.
	ExportPermissionDenied

	// ExportResourceNotFound specifies that messages cannot be exported due to
	// missing resources. Output only.
	ExportResourceNotFound
)
type OperationMetadata ¶ added in v1.1.0
type OperationMetadata struct {
	// The target of the operation. For example, targets of seeks are
	// subscriptions, structured like:
	// "projects/PROJECT_ID/locations/LOCATION/subscriptions/SUBSCRIPTION_ID"
	Target string

	// The verb describing the kind of operation.
	Verb string

	// The time the operation was created.
	CreateTime time.Time

	// The time the operation finished running. Is zero if the operation has not
	// completed.
	EndTime time.Time
}
OperationMetadata stores metadata for long-running operations.
type PubSubDestinationConfig ¶ added in v1.6.0
type PubSubDestinationConfig struct {
	// The path of a Pub/Sub topic, in the format:
	// "projects/PROJECT_ID/topics/TOPIC_ID".
	Topic string
}
PubSubDestinationConfig configures messages to be exported to a Pub/Sub topic. Implements the ExportDestinationConfig interface.
See https://cloud.google.com/pubsub/lite/docs/export-pubsub for more information about how export subscriptions to Pub/Sub are configured.
type PublishTime ¶ added in v1.1.0
type PublishTime time.Time
PublishTime is a message publish timestamp. It implements the SeekTarget interface.
type ReservationConfig ¶ added in v1.2.0
type ReservationConfig struct {
	// The full path of the reservation, in the format:
	// "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID".
	//
	// - PROJECT_ID: The project ID (e.g. "my-project") or the project number
	//   (e.g. "987654321") can be provided.
	// - REGION: The Google Cloud region (e.g. "us-central1") for the reservation.
	//   See https://cloud.google.com/pubsub/lite/docs/locations for the list of
	//   regions where Pub/Sub Lite is available.
	// - RESERVATION_ID: The ID of the reservation (e.g. "my-reservation"). See
	//   https://cloud.google.com/pubsub/docs/admin#resource_names for information
	//   about valid reservation IDs.
	Name string

	// The reserved throughput capacity. Every unit of throughput capacity is
	// equivalent to 1 MiB/s of published messages or 2 MiB/s of subscribed
	// messages.
	//
	// Any topics which are declared as using capacity from a reservation will
	// consume resources from this reservation instead of being charged
	// individually.
	ThroughputCapacity int
}
ReservationConfig describes the properties of a Pub/Sub Lite reservation.
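The unit ratios in the ThroughputCapacity comment can be turned into a rough sizing calculation. The following is a minimal sketch, not part of this package: reservationUnits is a hypothetical helper, and it assumes that publish and subscribe usage simply add together and that a partial unit is rounded up.

```go
package main

import "fmt"

// reservationUnits estimates how many throughput capacity units a reservation
// would need, using the documented ratios: one unit per 1 MiB/s of published
// messages and one unit per 2 MiB/s of subscribed messages.
// Assumption (not stated by the package): the two shares are additive.
func reservationUnits(publishMiBPerSec, subscribeMiBPerSec int) int {
	// Round the subscribe share up so that a partial unit is still reserved.
	subscribeUnits := (subscribeMiBPerSec + 1) / 2
	return publishMiBPerSec + subscribeUnits
}

func main() {
	// Hypothetical workload: 4 partitions, each publishing at 4 MiB/s and
	// being read at 8 MiB/s.
	partitions := 4
	units := reservationUnits(partitions*4, partitions*8)
	fmt.Println("throughput capacity units:", units)
}
```

The result would be a candidate value for ThroughputCapacity; actual sizing should follow the Pub/Sub Lite capacity guidance.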
type ReservationConfigToUpdate ¶ added in v1.2.0
type ReservationConfigToUpdate struct {
	// The full path of the reservation to update, in the format:
	// "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID".
	// Required.
	Name string

	// If non-zero, updates the throughput capacity.
	ThroughputCapacity int
}
ReservationConfigToUpdate specifies the properties to update for a reservation.
type ReservationIterator ¶ added in v1.2.0
type ReservationIterator struct {
// contains filtered or unexported fields
}
ReservationIterator is an iterator that returns a list of reservation configs.
func (*ReservationIterator) Next ¶ added in v1.2.0
func (r *ReservationIterator) Next() (*ReservationConfig, error)
Next returns the next reservation config. The second return value will be iterator.Done if there are no more reservation configs.
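All iterators in this package share the same Next contract, so the consuming loop looks the same for each. The sketch below shows that loop using only the standard library: errDone, reservationConfig and reservationIterator are local stand-ins (assumptions for illustration) for iterator.Done, ReservationConfig and ReservationIterator, so that the example runs without a Google Cloud project.

```go
package main

import (
	"errors"
	"fmt"
)

// errDone is a local stand-in for iterator.Done.
var errDone = errors.New("no more items in iterator")

// reservationConfig is a stand-in carrying only the fields used here.
type reservationConfig struct {
	Name               string
	ThroughputCapacity int
}

// reservationIterator is a stand-in that serves items from memory.
type reservationIterator struct {
	items []reservationConfig
	pos   int
}

// Next returns the next config, or errDone once the items are exhausted.
func (it *reservationIterator) Next() (*reservationConfig, error) {
	if it.pos >= len(it.items) {
		return nil, errDone
	}
	item := &it.items[it.pos]
	it.pos++
	return item, nil
}

// collectNames drains the iterator, treating errDone as normal termination
// and any other error as a failure.
func collectNames(it *reservationIterator) ([]string, error) {
	var names []string
	for {
		cfg, err := it.Next()
		if errors.Is(err, errDone) {
			return names, nil
		}
		if err != nil {
			return names, err
		}
		names = append(names, cfg.Name)
	}
}

func main() {
	it := &reservationIterator{items: []reservationConfig{
		{Name: "projects/my-project/locations/us-central1/reservations/a", ThroughputCapacity: 4},
		{Name: "projects/my-project/locations/us-central1/reservations/b", ThroughputCapacity: 8},
	}}
	names, err := collectNames(it)
	fmt.Println(names, err)
}
```

With the real client, the same loop shape applies, checking the returned error against iterator.Done before treating it as a failure.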
type SeekSubscriptionOperation ¶ added in v1.1.0
type SeekSubscriptionOperation struct {
// contains filtered or unexported fields
}
SeekSubscriptionOperation manages a long-running seek operation from AdminClient.SeekSubscription.
func (*SeekSubscriptionOperation) Done ¶ added in v1.1.0
func (s *SeekSubscriptionOperation) Done() bool
Done returns whether the seek operation has completed.
func (*SeekSubscriptionOperation) Metadata ¶ added in v1.1.0
func (s *SeekSubscriptionOperation) Metadata() (*OperationMetadata, error)
Metadata returns metadata associated with the seek operation. To get the latest metadata, call this method after a successful call to Wait.
func (*SeekSubscriptionOperation) Name ¶ added in v1.1.0
func (s *SeekSubscriptionOperation) Name() string
Name returns the path of the seek operation, in the format: "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID".
func (*SeekSubscriptionOperation) Wait ¶ added in v1.1.0
func (s *SeekSubscriptionOperation) Wait(ctx context.Context) (*SeekSubscriptionResult, error)
Wait polls until the seek operation is complete and returns one of the following:
- A SeekSubscriptionResult and nil error if the operation is complete and succeeded.
- An error containing the failure reason if the operation is complete and failed.
- An error if polling the operation status failed due to a non-retryable error.
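The polling behaviour described above can be pictured with a generic loop. This is a minimal sketch of the general technique, not the library's implementation: pollUntilDone, simulate and simulateFailure are hypothetical names, and the operation is faked with a closure. Unlike Wait, the sketch does not distinguish an operation failure from a polling failure.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errPoll = errors.New("poll failed")

// pollUntilDone calls poll at the given interval until it reports completion,
// returns an error, or ctx is cancelled.
func pollUntilDone(ctx context.Context, interval time.Duration, poll func() (bool, error)) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		done, err := poll()
		if err != nil {
			return err // treated here as a non-retryable failure
		}
		if done {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// Interval elapsed; poll again.
		}
	}
}

// simulate polls a fake operation that completes on the n-th poll and
// reports how many polls were made.
func simulate(n int) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	calls := 0
	err := pollUntilDone(ctx, time.Millisecond, func() (bool, error) {
		calls++
		return calls >= n, nil
	})
	return calls, err
}

// simulateFailure polls a fake operation whose status check always fails.
func simulateFailure() error {
	return pollUntilDone(context.Background(), time.Millisecond, func() (bool, error) {
		return false, errPoll
	})
}

func main() {
	calls, err := simulate(3)
	fmt.Println("polls:", calls, "error:", err)
	fmt.Println("failure:", simulateFailure())
}
```

Passing a context with a deadline to Wait bounds the total polling time in the same way the timeout does in this sketch.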
type SeekSubscriptionOption ¶ added in v1.1.0
type SeekSubscriptionOption interface{}
SeekSubscriptionOption is reserved for future options.
type SeekSubscriptionResult ¶ added in v1.1.0
type SeekSubscriptionResult struct{}
SeekSubscriptionResult is the result of a seek subscription operation. Currently empty.
type SeekTarget ¶ added in v1.1.0
type SeekTarget interface {
// contains filtered or unexported methods
}
SeekTarget is the target location to seek a subscription to. Implemented by BacklogLocation, PublishTime, and EventTime.
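An interface with only unexported methods can be implemented solely by types in its own package, which keeps the set of seek targets closed. The sketch below illustrates that pattern with lower-case stand-in types; the method name isSeekTarget and the helper needsTimeSeek are assumptions for illustration, not the package's actual internals. The helper mirrors the note on AtTargetLocation that publish and event times require an additional seek request.

```go
package main

import (
	"fmt"
	"time"
)

// seekTarget is a stand-in for SeekTarget. The unexported method prevents
// types outside this package from implementing it.
type seekTarget interface {
	isSeekTarget()
}

// backlogLocation mirrors BacklogLocation. Starting at iota + 1 keeps the
// zero value from being mistaken for a valid location.
type backlogLocation int

const (
	end backlogLocation = iota + 1
	beginning
)

// publishTime and eventTime mirror the time-based targets.
type publishTime time.Time
type eventTime time.Time

func (backlogLocation) isSeekTarget() {}
func (publishTime) isSeekTarget()     {}
func (eventTime) isSeekTarget()       {}

// needsTimeSeek reports whether the target is a publish or event time rather
// than a fixed backlog location.
func needsTimeSeek(target seekTarget) bool {
	switch target.(type) {
	case publishTime, eventTime:
		return true
	default:
		return false
	}
}

func main() {
	targets := []seekTarget{end, beginning, publishTime(time.Now()), eventTime(time.Now())}
	for _, t := range targets {
		fmt.Printf("%T needs a time-based seek: %v\n", t, needsTimeSeek(t))
	}
}
```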
type SubscriptionConfig ¶
type SubscriptionConfig struct {
	// The full path of the subscription, in the format:
	// "projects/PROJECT_ID/locations/LOCATION/subscriptions/SUBSCRIPTION_ID".
	//
	// - PROJECT_ID: The project ID (e.g. "my-project") or the project number
	//   (e.g. "987654321") can be provided.
	// - LOCATION: The Google Cloud region (e.g. "us-central1") or zone
	//   (e.g. "us-central1-a") of the corresponding topic.
	// - SUBSCRIPTION_ID: The ID of the subscription (e.g. "my-subscription"). See
	//   https://cloud.google.com/pubsub/docs/admin#resource_names for information
	//   about valid subscription IDs.
	Name string

	// The path of the topic that this subscription is attached to, in the format:
	// "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID". This cannot be
	// changed after creation.
	Topic string

	// Whether a message should be delivered to subscribers immediately after it
	// has been published or after it has been successfully written to storage.
	DeliveryRequirement DeliveryRequirement

	// If non-nil, configures this subscription to export messages from the
	// associated topic to a destination. The ExportConfig cannot be removed after
	// creation of the subscription, however its properties can be changed.
	ExportConfig *ExportConfig
}
SubscriptionConfig describes the properties of a Pub/Sub Lite subscription, which is attached to a Pub/Sub Lite topic. See https://cloud.google.com/pubsub/lite/docs/subscriptions for more information about how subscriptions are configured.
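Subscription paths follow the fixed six-segment layout given in the Name comment. The following sketch shows one way to build and check such a path with the standard library; subscriptionPath and parseSubscriptionPath are hypothetical helpers written for this example, and they check only the layout, not the rules for valid IDs or locations.

```go
package main

import (
	"fmt"
	"strings"
)

// subscriptionPath holds the variable segments of a subscription path.
type subscriptionPath struct {
	Project  string
	Location string
	ID       string
}

// String renders the path in the documented format.
func (p subscriptionPath) String() string {
	return fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", p.Project, p.Location, p.ID)
}

// parseSubscriptionPath splits a path of the form
// "projects/PROJECT_ID/locations/LOCATION/subscriptions/SUBSCRIPTION_ID".
func parseSubscriptionPath(path string) (subscriptionPath, error) {
	parts := strings.Split(path, "/")
	if len(parts) != 6 || parts[0] != "projects" || parts[2] != "locations" || parts[4] != "subscriptions" {
		return subscriptionPath{}, fmt.Errorf("invalid subscription path %q", path)
	}
	if parts[1] == "" || parts[3] == "" || parts[5] == "" {
		return subscriptionPath{}, fmt.Errorf("empty segment in subscription path %q", path)
	}
	return subscriptionPath{Project: parts[1], Location: parts[3], ID: parts[5]}, nil
}

func main() {
	p, err := parseSubscriptionPath("projects/my-project/locations/us-central1-a/subscriptions/my-subscription")
	fmt.Println(p.Project, p.Location, p.ID, err)
}
```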
type SubscriptionConfigToUpdate ¶
type SubscriptionConfigToUpdate struct {
	// The full path of the subscription to update, in the format:
	// "projects/PROJECT_ID/locations/LOCATION/subscriptions/SUBSCRIPTION_ID".
	// Required.
	Name string

	// If non-zero, updates the message delivery requirement.
	DeliveryRequirement DeliveryRequirement

	// If non-nil, updates export config properties.
	ExportConfig *ExportConfigToUpdate
}
SubscriptionConfigToUpdate specifies the properties to update for a subscription.
type SubscriptionIterator ¶
type SubscriptionIterator struct {
// contains filtered or unexported fields
}
SubscriptionIterator is an iterator that returns a list of subscription configs.
func (*SubscriptionIterator) Next ¶
func (s *SubscriptionIterator) Next() (*SubscriptionConfig, error)
Next returns the next subscription config. The second return value will be iterator.Done if there are no more subscription configs.
type SubscriptionPathIterator ¶
type SubscriptionPathIterator struct {
// contains filtered or unexported fields
}
SubscriptionPathIterator is an iterator that returns a list of subscription paths.
func (*SubscriptionPathIterator) Next ¶
func (sp *SubscriptionPathIterator) Next() (string, error)
Next returns the next subscription path, which has the format: "projects/PROJECT_ID/locations/LOCATION/subscriptions/SUBSCRIPTION_ID". The second return value will be iterator.Done if there are no more subscription paths.
type TopicConfig ¶
type TopicConfig struct {
	// The full path of the topic, in the format:
	// "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID".
	//
	// - PROJECT_ID: The project ID (e.g. "my-project") or the project number
	//   (e.g. "987654321") can be provided.
	// - LOCATION: The Google Cloud region (e.g. "us-central1") or zone
	//   (e.g. "us-central1-a") where the topic is located.
	//   See https://cloud.google.com/pubsub/lite/docs/locations for the list of
	//   regions and zones where Pub/Sub Lite is available.
	// - TOPIC_ID: The ID of the topic (e.g. "my-topic"). See
	//   https://cloud.google.com/pubsub/docs/admin#resource_names for information
	//   about valid topic IDs.
	Name string

	// The number of partitions in the topic. Must be at least 1. Can be increased
	// after creation, but not decreased.
	PartitionCount int

	// Publish throughput capacity per partition in MiB/s.
	// Must be >= 4 and <= 16.
	PublishCapacityMiBPerSec int

	// Subscribe throughput capacity per partition in MiB/s.
	// Must be >= 4 and <= 32.
	SubscribeCapacityMiBPerSec int

	// The provisioned storage, in bytes, per partition. If the number of bytes
	// stored in any of the topic's partitions grows beyond this value, older
	// messages will be dropped to make room for newer ones, regardless of the
	// value of `RetentionDuration`. Must be >= 30 GiB.
	PerPartitionBytes int64

	// How long a published message is retained. If set to `InfiniteRetention`,
	// messages will be retained as long as the bytes retained for each partition
	// is below `PerPartitionBytes`. Otherwise, must be > 0.
	RetentionDuration time.Duration

	// The path of the reservation to use for this topic's throughput capacity, in
	// the format:
	// "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID".
	ThroughputReservation string
}
TopicConfig describes the properties of a Pub/Sub Lite topic. See https://cloud.google.com/pubsub/lite/docs/topics for more information about how topics are configured.
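The numeric bounds listed in the field comments can be checked on the client side before a create request is sent. The sketch below does so against a local stand-in struct; topicSettings and validate are hypothetical and not part of this package, the service remains the authority on what is accepted, and the retention rules are left out.

```go
package main

import "fmt"

// gib is the number of bytes in one GiB.
const gib = int64(1) << 30

// topicSettings is a stand-in holding only the numeric TopicConfig fields.
type topicSettings struct {
	PartitionCount             int
	PublishCapacityMiBPerSec   int
	SubscribeCapacityMiBPerSec int
	PerPartitionBytes          int64
}

// validate checks the bounds documented on TopicConfig and returns the first
// violation found, or nil.
func (s topicSettings) validate() error {
	if s.PartitionCount < 1 {
		return fmt.Errorf("partition count %d: must be at least 1", s.PartitionCount)
	}
	if s.PublishCapacityMiBPerSec < 4 || s.PublishCapacityMiBPerSec > 16 {
		return fmt.Errorf("publish capacity %d MiB/s: must be >= 4 and <= 16", s.PublishCapacityMiBPerSec)
	}
	if s.SubscribeCapacityMiBPerSec < 4 || s.SubscribeCapacityMiBPerSec > 32 {
		return fmt.Errorf("subscribe capacity %d MiB/s: must be >= 4 and <= 32", s.SubscribeCapacityMiBPerSec)
	}
	if s.PerPartitionBytes < 30*gib {
		return fmt.Errorf("per-partition storage %d bytes: must be >= 30 GiB", s.PerPartitionBytes)
	}
	return nil
}

func main() {
	s := topicSettings{
		PartitionCount:             2,
		PublishCapacityMiBPerSec:   4,
		SubscribeCapacityMiBPerSec: 8,
		PerPartitionBytes:          30 * gib,
	}
	fmt.Println("validation error:", s.validate())
}
```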
type TopicConfigToUpdate ¶
type TopicConfigToUpdate struct {
	// The full path of the topic to update, in the format:
	// "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID". Required.
	Name string

	// If non-zero, will update the number of partitions in the topic.
	// Set value must be >= 1. The number of partitions can only be increased, not
	// decreased.
	PartitionCount int

	// If non-zero, will update the publish throughput capacity per partition.
	// Set value must be >= 4 and <= 16.
	PublishCapacityMiBPerSec int

	// If non-zero, will update the subscribe throughput capacity per partition.
	// Set value must be >= 4 and <= 32.
	SubscribeCapacityMiBPerSec int

	// If non-zero, will update the provisioned storage per partition.
	// Set value must be >= 30 GiB.
	PerPartitionBytes int64

	// If specified, will update how long a published message is retained. To
	// clear a retention duration (i.e. retain messages as long as there is
	// available storage), set this to `InfiniteRetention`.
	RetentionDuration optional.Duration

	// The path of the reservation to use for this topic's throughput capacity, in
	// the format:
	// "projects/PROJECT_ID/locations/REGION/reservations/RESERVATION_ID".
	ThroughputReservation optional.String
}
TopicConfigToUpdate specifies the properties to update for a topic.
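The update structs rely on zero values to mean "leave unchanged". The sketch below illustrates that convention with a local stand-in; topicUpdate and fieldsToUpdate are hypothetical, the returned labels are illustrative rather than the field names sent to the service, and a *time.Duration stands in for optional.Duration so that "not specified" can be told apart from any concrete duration.

```go
package main

import (
	"fmt"
	"time"
)

// topicUpdate is a stand-in for a subset of TopicConfigToUpdate.
type topicUpdate struct {
	PartitionCount             int
	PublishCapacityMiBPerSec   int
	SubscribeCapacityMiBPerSec int
	PerPartitionBytes          int64
	RetentionDuration          *time.Duration // nil means "not specified"
}

// fieldsToUpdate lists the fields that carry a value and would therefore be
// changed by the update. Zero-valued fields are skipped.
func fieldsToUpdate(u topicUpdate) []string {
	var fields []string
	if u.PartitionCount != 0 {
		fields = append(fields, "PartitionCount")
	}
	if u.PublishCapacityMiBPerSec != 0 {
		fields = append(fields, "PublishCapacityMiBPerSec")
	}
	if u.SubscribeCapacityMiBPerSec != 0 {
		fields = append(fields, "SubscribeCapacityMiBPerSec")
	}
	if u.PerPartitionBytes != 0 {
		fields = append(fields, "PerPartitionBytes")
	}
	if u.RetentionDuration != nil {
		fields = append(fields, "RetentionDuration")
	}
	return fields
}

func main() {
	retention := 24 * time.Hour
	u := topicUpdate{PartitionCount: 3, RetentionDuration: &retention}
	fmt.Println(fieldsToUpdate(u))
}
```

Because zero means "unchanged", none of the integer fields can be updated to zero, which is consistent with each of them having a documented minimum above zero.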
type TopicIterator ¶
type TopicIterator struct {
// contains filtered or unexported fields
}
TopicIterator is an iterator that returns a list of topic configs.
func (*TopicIterator) Next ¶
func (t *TopicIterator) Next() (*TopicConfig, error)
Next returns the next topic config. The second return value will be iterator.Done if there are no more topic configs.
type TopicPathIterator ¶ added in v1.2.0
type TopicPathIterator struct {
// contains filtered or unexported fields
}
TopicPathIterator is an iterator that returns a list of topic paths.
func (*TopicPathIterator) Next ¶ added in v1.2.0
func (sp *TopicPathIterator) Next() (string, error)
Next returns the next topic path, which has the format: "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID". The second return value will be iterator.Done if there are no more topic paths.
Directories ¶
Path | Synopsis |
---|---|
 | Package pubsublite is an auto-generated package for the Pub/Sub Lite API. |
pscompat | Package pscompat contains clients for publishing and subscribing using the Pub/Sub Lite service. |