gcppubsub

package
v0.35.0
Published: Dec 8, 2023 License: Apache-2.0 Imports: 25 Imported by: 88

Documentation

Overview

Package gcppubsub provides a pubsub implementation that uses GCP PubSub. Use OpenTopic to construct a *pubsub.Topic, and/or OpenSubscription to construct a *pubsub.Subscription.

URLs

For pubsub.OpenTopic and pubsub.OpenSubscription, gcppubsub registers for the scheme "gcppubsub". The default URL opener creates a connection using default credentials from the environment, as described in https://cloud.google.com/docs/authentication/production. To customize the URL opener, or for more details on the URL format, see URLOpener. See https://gocloud.dev/concepts/urls/ for background information.

The GCP Pub/Sub emulator is supported, as described at https://cloud.google.com/pubsub/docs/emulator. When the environment variable PUBSUB_EMULATOR_HOST is set, the driver connects to the specified emulator host by default.
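
The emulator fallback can be pictured as a small selection step. The sketch below is illustrative only and is not the driver's code: the helper chooseEndpoint and the use of "pubsub.googleapis.com:443" as the fallback address are assumptions introduced here.

```go
package main

import (
	"fmt"
	"os"
)

// defaultEndpoint is the public Pub/Sub gRPC address; treating it as the
// fallback here is an assumption made for illustration.
const defaultEndpoint = "pubsub.googleapis.com:443"

// chooseEndpoint (hypothetical helper) returns the emulator host when one is
// configured, and the default endpoint otherwise.
func chooseEndpoint(emulatorHost string) string {
	if emulatorHost != "" {
		return emulatorHost
	}
	return defaultEndpoint
}

func main() {
	// For example, PUBSUB_EMULATOR_HOST=localhost:8085 selects a local emulator.
	fmt.Println(chooseEndpoint(os.Getenv("PUBSUB_EMULATOR_HOST")))
}
```

Keeping the decision in a pure function makes it easy to check both branches without touching the process environment.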

Message Delivery Semantics

GCP Pub/Sub supports at-least-once semantics; applications must call Message.Ack after processing a message, or it will be redelivered. See https://godoc.org/gocloud.dev/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more background.
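
To see why the Ack call matters, here is a toy model of at-least-once delivery. It does not use the pubsub package: the message type and the redeliver function are hypothetical stand-ins that only mimic the rule "anything not acknowledged comes back".

```go
package main

import "fmt"

// message is a toy stand-in for a delivered message; it is not pubsub.Message.
type message struct {
	id    int
	acked bool
}

// Ack marks the message as processed so that it is not delivered again.
func (m *message) Ack() { m.acked = true }

// redeliver returns the messages that would be delivered again: every
// message that was handed out but never acknowledged.
func redeliver(delivered []*message) []*message {
	var pending []*message
	for _, m := range delivered {
		if !m.acked {
			pending = append(pending, m)
		}
	}
	return pending
}

func main() {
	batch := []*message{{id: 1}, {id: 2}, {id: 3}}

	// The application processes and acknowledges messages 1 and 3 only.
	batch[0].Ack()
	batch[2].Ack()

	for _, m := range redeliver(batch) {
		fmt.Println("redelivered:", m.id) // prints "redelivered: 2"
	}
}
```

In a real receive loop the same rule implies that handlers should be idempotent, since a message whose Ack is lost or skipped will be seen again.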

As

gcppubsub exposes the following types for As:

  • Topic: *raw.PublisherClient
  • Subscription: *raw.SubscriberClient
  • Message.BeforeSend: *pb.PubsubMessage
  • Message.AfterSend: *string for the pb.PublishResponse.MessageIds entry corresponding to the message.
  • Message: *pb.PubsubMessage, *pb.ReceivedMessage
  • Error: *google.golang.org/grpc/status.Status
Example (OpenSubscriptionFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/gcppubsub"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	subscription, err := pubsub.OpenSubscription(ctx,
		"gcppubsub://projects/my-project/subscriptions/my-subscription")
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}

Example (OpenTopicFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/gcppubsub"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	topic, err := pubsub.OpenTopic(ctx, "gcppubsub://projects/myproject/topics/mytopic")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}

Constants

const Scheme = "gcppubsub"

Scheme is the URL scheme gcppubsub registers its URLOpeners under on pubsub.DefaultMux.

Variables

var Set = wire.NewSet(
	Dial,
	PublisherClient,
	SubscriberClient,
	wire.Struct(new(SubscriptionOptions)),
	wire.Struct(new(TopicOptions)),
	wire.Struct(new(URLOpener), "Conn", "TopicOptions", "SubscriptionOptions"),
)

Set holds Wire providers for this package.

Functions

func Dial

func Dial(ctx context.Context, ts gcp.TokenSource) (*grpc.ClientConn, func(), error)

Dial opens a gRPC connection to the GCP Pub/Sub API.

The second return value is a function that can be called to clean up the connection opened by Dial.

func OpenSubscription

func OpenSubscription(client *raw.SubscriberClient, projectID gcp.ProjectID, subscriptionName string, opts *SubscriptionOptions) *pubsub.Subscription

OpenSubscription returns a *pubsub.Subscription backed by an existing GCP PubSub subscription subscriptionName in the given projectID. See the package documentation for an example.

Example
package main

import (
	"context"
	"log"

	"gocloud.dev/gcp"
	"gocloud.dev/pubsub/gcppubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// Your GCP credentials.
	// See https://cloud.google.com/docs/authentication/production
	// for more info on alternatives.
	creds, err := gcp.DefaultCredentials(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Open a gRPC connection to the GCP Pub/Sub API.
	conn, cleanup, err := gcppubsub.Dial(ctx, creds.TokenSource)
	if err != nil {
		log.Fatal(err)
	}
	defer cleanup()

	// Construct a SubscriberClient using the connection.
	subClient, err := gcppubsub.SubscriberClient(ctx, conn)
	if err != nil {
		log.Fatal(err)
	}
	defer subClient.Close()

	// Construct a *pubsub.Subscription.
	subscription, err := gcppubsub.OpenSubscriptionByPath(
		subClient, "projects/myprojectID/subscriptions/example-subscription", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}

func OpenSubscriptionByPath added in v0.16.0

func OpenSubscriptionByPath(client *raw.SubscriberClient, subscriptionPath string, opts *SubscriptionOptions) (*pubsub.Subscription, error)

OpenSubscriptionByPath returns a *pubsub.Subscription backed by an existing GCP PubSub subscription. subscriptionPath must be of the form "projects/<projectID>/subscriptions/<subscription>". See the package documentation for an example.
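
The required path form can be checked before calling OpenSubscriptionByPath. The following is a minimal sketch: the regular expression and the helper validSubscriptionPath are assumptions written for illustration, and the package's own validation may be looser or stricter.

```go
package main

import (
	"fmt"
	"regexp"
)

// subscriptionPathRE is an assumed pattern for the documented form
// "projects/<projectID>/subscriptions/<subscription>".
var subscriptionPathRE = regexp.MustCompile(`^projects/[^/]+/subscriptions/[^/]+$`)

// validSubscriptionPath (hypothetical helper) reports whether p has the
// documented form.
func validSubscriptionPath(p string) bool {
	return subscriptionPathRE.MatchString(p)
}

func main() {
	fmt.Println(validSubscriptionPath("projects/myprojectID/subscriptions/example-subscription")) // true
	fmt.Println(validSubscriptionPath("myprojectID/example-subscription"))                         // false
}
```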

func OpenTopic

func OpenTopic(client *raw.PublisherClient, projectID gcp.ProjectID, topicName string, opts *TopicOptions) *pubsub.Topic

OpenTopic returns a *pubsub.Topic backed by an existing GCP PubSub topic in the given projectID. topicName is the last part of the full topic path, e.g., "foo" from "projects/<projectID>/topics/foo". See the package documentation for an example.
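
A project ID and a short topic name combine into the full resource path, using the same "topics" segment that appears in this package's URL examples. The helper topicPath below is hypothetical and only shows the relationship between the two forms.

```go
package main

import "fmt"

// topicPath (hypothetical helper) joins a project ID and a short topic name
// into the full resource path.
func topicPath(projectID, topicName string) string {
	return fmt.Sprintf("projects/%s/topics/%s", projectID, topicName)
}

func main() {
	fmt.Println(topicPath("myprojectID", "foo")) // projects/myprojectID/topics/foo
}
```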

Example
package main

import (
	"context"
	"log"

	"gocloud.dev/gcp"
	"gocloud.dev/pubsub/gcppubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// Your GCP credentials.
	// See https://cloud.google.com/docs/authentication/production
	// for more info on alternatives.
	creds, err := gcp.DefaultCredentials(ctx)
	if err != nil {
		log.Fatal(err)
	}
	// Open a gRPC connection to the GCP Pub/Sub API.
	conn, cleanup, err := gcppubsub.Dial(ctx, creds.TokenSource)
	if err != nil {
		log.Fatal(err)
	}
	defer cleanup()

	// Construct a PublisherClient using the connection.
	pubClient, err := gcppubsub.PublisherClient(ctx, conn)
	if err != nil {
		log.Fatal(err)
	}
	defer pubClient.Close()

	// Construct a *pubsub.Topic.
	topic, err := gcppubsub.OpenTopicByPath(pubClient, "projects/myprojectID/topics/example-topic", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}

func OpenTopicByPath added in v0.16.0

func OpenTopicByPath(client *raw.PublisherClient, topicPath string, opts *TopicOptions) (*pubsub.Topic, error)

OpenTopicByPath returns a *pubsub.Topic backed by an existing GCP PubSub topic. topicPath must be of the form "projects/<projectID>/topics/<topic>". See the package documentation for an example.

func PublisherClient

func PublisherClient(ctx context.Context, conn *grpc.ClientConn) (*raw.PublisherClient, error)

PublisherClient returns a *raw.PublisherClient that can be used in OpenTopic.

func SubscriberClient

func SubscriberClient(ctx context.Context, conn *grpc.ClientConn) (*raw.SubscriberClient, error)

SubscriberClient returns a *raw.SubscriberClient that can be used in OpenSubscription.

Types

type SubscriptionOptions

type SubscriptionOptions struct {
	// MaxBatchSize caps the maximum batch size used when retrieving messages. It defaults to 1000.
	MaxBatchSize int

	// NackLazy determines what Nack does.
	//
	// By default, Nack uses ModifyAckDeadline to set the ack deadline
	// for the nacked message to 0, so that it will be redelivered immediately.
	// Set NackLazy to true to bypass this behavior; Nack will do nothing,
	// and the message will be redelivered after the existing ack deadline
	// expires.
	NackLazy bool

	// ReceiveBatcherOptions adds constraints to the default batching done for receives.
	ReceiveBatcherOptions batcher.Options

	// AckBatcherOptions adds constraints to the default batching done for acks.
	AckBatcherOptions batcher.Options
}

SubscriptionOptions contains configuration for subscriptions.

type TopicOptions

type TopicOptions struct {
	// BatcherOptions adds constraints to the default batching done for sends.
	BatcherOptions batcher.Options
}

TopicOptions contains configuration for topics.

type URLOpener added in v0.12.0

type URLOpener struct {
	// Conn must be set to a non-nil ClientConn authenticated with
	// Cloud Pub/Sub scope or equivalent.
	Conn *grpc.ClientConn

	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}

URLOpener opens GCP Pub/Sub URLs like "gcppubsub://projects/myproject/topics/mytopic" for topics or "gcppubsub://projects/myproject/subscriptions/mysub" for subscriptions.

The shortened forms "gcppubsub://myproject/mytopic" for topics or "gcppubsub://myproject/mysub" for subscriptions are also supported.
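
Both URL forms can be told apart with the standard library alone. The sketch below is one possible way to do it, not the URLOpener's implementation: resolveTopic and fullTopicPathRE are hypothetical names, and the pattern is an assumption based on the documented full form.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
	"regexp"
	"strings"
)

// fullTopicPathRE is an assumed pattern for "projects/<projectID>/topics/<topic>".
var fullTopicPathRE = regexp.MustCompile(`^projects/[^/]+/topics/[^/]+$`)

// resolveTopic (hypothetical helper) maps a gcppubsub topic URL to a project
// ID and topic name, accepting both the full and the shortened form.
func resolveTopic(rawURL string) (projectID, topic string, err error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", "", err
	}
	if u.Scheme != "gcppubsub" {
		return "", "", fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	// In the full form the URL host is the literal "projects", so host and
	// path together spell out the resource path.
	full := path.Join(u.Host, u.Path)
	if fullTopicPathRE.MatchString(full) {
		parts := strings.Split(full, "/")
		return parts[1], parts[3], nil
	}
	// Otherwise treat it as the shortened form: the host is the project ID.
	return u.Host, strings.TrimPrefix(u.Path, "/"), nil
}

func main() {
	for _, s := range []string{
		"gcppubsub://projects/myproject/topics/mytopic",
		"gcppubsub://myproject/mytopic",
	} {
		project, topic, err := resolveTopic(s)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(project, topic) // prints "myproject mytopic" for both URLs
	}
}
```

Note that url.Parse places the first segment after "//" in the Host field, which is why the full form has to be reassembled from host and path before matching.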

The following query parameters are supported:

  • max_recv_batch_size: sets SubscriptionOptions.MaxBatchSize.
  • max_send_batch_size: sets TopicOptions.BatcherOptions.MaxBatchSize.
  • nacklazy: sets SubscriptionOptions.NackLazy. The value must be parseable by `strconv.ParseBool`.

max_recv_batch_size and nacklazy apply to subscriptions; max_send_batch_size applies to topics.
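
As an illustration of how the subscription-related parameters could be read from a URL, here is a sketch that uses only net/url and strconv. The type subscriptionParams and the function parseSubscriptionParams are hypothetical; they mirror the documented parameter names but are not part of gcppubsub.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// subscriptionParams is a hypothetical holder for the subscription-related
// query parameters; it is not a type from gcppubsub.
type subscriptionParams struct {
	MaxBatchSize int
	NackLazy     bool
}

// parseSubscriptionParams (hypothetical helper) reads max_recv_batch_size
// and nacklazy from the query string of rawURL.
func parseSubscriptionParams(rawURL string) (subscriptionParams, error) {
	var p subscriptionParams
	u, err := url.Parse(rawURL)
	if err != nil {
		return p, err
	}
	q := u.Query()
	if s := q.Get("max_recv_batch_size"); s != "" {
		n, err := strconv.Atoi(s)
		if err != nil {
			return p, fmt.Errorf("invalid max_recv_batch_size %q: %v", s, err)
		}
		p.MaxBatchSize = n
	}
	if s := q.Get("nacklazy"); s != "" {
		// The documentation requires a value accepted by strconv.ParseBool.
		b, err := strconv.ParseBool(s)
		if err != nil {
			return p, fmt.Errorf("invalid nacklazy %q: %v", s, err)
		}
		p.NackLazy = b
	}
	return p, nil
}

func main() {
	p, err := parseSubscriptionParams(
		"gcppubsub://projects/myproject/subscriptions/mysub?max_recv_batch_size=100&nacklazy=true")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", p) // {MaxBatchSize:100 NackLazy:true}
}
```

Values such as "1", "t", "true", "0", "f", and "false" are accepted by strconv.ParseBool; anything else, for example "maybe", produces an error in this sketch.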

func (*URLOpener) OpenSubscriptionURL added in v0.12.0

func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)

OpenSubscriptionURL opens a pubsub.Subscription based on u.

func (*URLOpener) OpenTopicURL added in v0.12.0

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on u.
