googlecloud

package
v1.0.15
Warning

This package is not in the latest version of its module.
Published: Feb 27, 2023 License: MIT Imports: 13 Imported by: 0

Documentation

Constants

const (
	// ProviderName identifies the provider as Google Cloud Pub/Sub in logs.
	ProviderName = "google_cloud_pubsub"
)
const UUIDHeaderKey = "_watermill_message_uuid"

UUIDHeaderKey is the key of the Pub/Sub attribute that carries the Watermill message UUID.

Variables

var (
	// ErrPublisherClosed happens when trying to publish to a topic while the publisher is closed or closing.
	ErrPublisherClosed = errors.New("publisher is closed")
	// ErrTopicDoesNotExist happens when trying to publish or subscribe to a topic that doesn't exist.
	ErrTopicDoesNotExist = errors.New("topic does not exist")
	// ErrConnectTimeout happens when the Google Cloud PubSub connection context times out.
	ErrConnectTimeout = errors.New("connect timeout")
)
var (
	// ErrSubscriberClosed happens when trying to subscribe to a new topic while the subscriber is closed or closing.
	ErrSubscriberClosed = errors.New("subscriber is closed")
	// ErrSubscriptionDoesNotExist happens when trying to use a subscription that does not exist.
	ErrSubscriptionDoesNotExist = errors.New("subscription does not exist")
	// ErrUnexpectedTopic happens when the subscription resolved from SubscriptionNameFn is for a different topic than expected.
	ErrUnexpectedTopic = errors.New("requested subscription already exists, but for other topic than expected")
)
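
Because these are sentinel values created with errors.New, callers can test for them with errors.Is, which also matches a sentinel that has been wrapped with %w. The sketch below is self-contained and illustrative only: it redeclares two of the sentinels locally, and `subscribe` and `classify` are hypothetical helpers that are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of two sentinels so the sketch compiles on its own.
var (
	ErrSubscriberClosed         = errors.New("subscriber is closed")
	ErrSubscriptionDoesNotExist = errors.New("subscription does not exist")
)

// subscribe is a hypothetical stand-in that fails with a sentinel,
// wrapped with extra context in one of the two cases.
func subscribe(name string, closed bool) error {
	if closed {
		return ErrSubscriberClosed
	}
	return fmt.Errorf("subscription %q: %w", name, ErrSubscriptionDoesNotExist)
}

// classify maps an error to a short action label for the caller.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrSubscriberClosed):
		return "stop"
	case errors.Is(err, ErrSubscriptionDoesNotExist):
		return "create-subscription"
	default:
		return "retry"
	}
}

func main() {
	fmt.Println(classify(subscribe("orders", false)))
	fmt.Println(classify(subscribe("orders", true)))
}
```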

Functions

func TopicSubscriptionName

func TopicSubscriptionName(topic string) string

TopicSubscriptionName uses the topic name as the subscription name.

Types

type DefaultMarshalerUnmarshaler

type DefaultMarshalerUnmarshaler struct{}

DefaultMarshalerUnmarshaler implements Marshaler and Unmarshaler as follows: all Google Cloud Pub/Sub attributes map to Watermill Message metadata, and the Watermill Message UUID is carried in the attribute whose key is `UUIDHeaderKey`.
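
The mapping described above can be sketched without the real client types. In the example below, `watermillMessage` and `pubsubMessage` are simplified stand-ins for message.Message and pubsub.Message, and `marshal` and `unmarshal` are hypothetical helpers. Dropping the UUID attribute from the metadata on the way back is an assumption of this sketch, not a documented guarantee.

```go
package main

import "fmt"

// UUIDHeaderKey is copied from the package constant.
const UUIDHeaderKey = "_watermill_message_uuid"

// watermillMessage is a simplified stand-in for message.Message.
type watermillMessage struct {
	UUID     string
	Metadata map[string]string
	Payload  []byte
}

// pubsubMessage is a simplified stand-in for pubsub.Message.
type pubsubMessage struct {
	Data       []byte
	Attributes map[string]string
}

// marshal copies metadata into attributes and stores the UUID
// under UUIDHeaderKey.
func marshal(msg watermillMessage) pubsubMessage {
	attrs := make(map[string]string, len(msg.Metadata)+1)
	for k, v := range msg.Metadata {
		attrs[k] = v
	}
	attrs[UUIDHeaderKey] = msg.UUID
	return pubsubMessage{Data: msg.Payload, Attributes: attrs}
}

// unmarshal restores the UUID from its attribute and treats every
// other attribute as metadata (assumption: the UUID key is not kept
// in the metadata).
func unmarshal(in pubsubMessage) watermillMessage {
	out := watermillMessage{Payload: in.Data, Metadata: map[string]string{}}
	for k, v := range in.Attributes {
		if k == UUIDHeaderKey {
			out.UUID = v
			continue
		}
		out.Metadata[k] = v
	}
	return out
}

func main() {
	in := watermillMessage{
		UUID:     "abc-123",
		Metadata: map[string]string{"tenant": "acme"},
		Payload:  []byte("hello"),
	}
	wire := marshal(in)
	out := unmarshal(wire)
	fmt.Println(wire.Attributes[UUIDHeaderKey], out.UUID, out.Metadata["tenant"])
}
```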

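func (DefaultMarshalerUnmarshaler) Marshal

func (DefaultMarshalerUnmarshaler) Marshal(topic string, msg *message.Message) (*pubsub.Message, error)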

func (DefaultMarshalerUnmarshaler) Unmarshal

func (DefaultMarshalerUnmarshaler) Unmarshal(pubsubMsg *pubsub.Message) (*message.Message, error)

type ExtractOrderingKey

type ExtractOrderingKey func(orderingKey string, msg *message.Message) error

type GenerateOrderingKey

type GenerateOrderingKey func(topic string, msg *message.Message) (string, error)

type Marshaler

type Marshaler interface {
	Marshal(topic string, msg *message.Message) (*pubsub.Message, error)
}

Marshaler transforms a Watermill Message into the Google Cloud client library Message.

func NewOrderingMarshaler

func NewOrderingMarshaler(generateOrderingKey GenerateOrderingKey) Marshaler
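
A function matching the GenerateOrderingKey signature decides which ordering key a message is published with. The sketch below shows one possible shape, reading the key from a metadata field. It is self-contained: `message` is a simplified stand-in for message.Message, and `byMetadata` and the `customer_id` field are hypothetical names chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a simplified stand-in for Watermill's message.Message.
type message struct {
	UUID     string
	Metadata map[string]string
}

// generateOrderingKey mirrors the GenerateOrderingKey signature.
type generateOrderingKey func(topic string, msg *message) (string, error)

// byMetadata returns a generator that uses one metadata field as the
// ordering key and fails when the field is missing or empty.
func byMetadata(field string) generateOrderingKey {
	return func(topic string, msg *message) (string, error) {
		key, ok := msg.Metadata[field]
		if !ok || key == "" {
			return "", errors.New("missing ordering metadata: " + field)
		}
		return key, nil
	}
}

func main() {
	gen := byMetadata("customer_id")
	key, err := gen("orders", &message{
		UUID:     "abc-123",
		Metadata: map[string]string{"customer_id": "42"},
	})
	fmt.Println(key, err)
}
```

With the real package, a function of this shape would be passed to NewOrderingMarshaler and the result assigned to the Marshaler field of PublisherConfig.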

type MarshalerUnmarshaler

type MarshalerUnmarshaler interface {
	Marshaler
	Unmarshaler
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)

func (*Publisher) Close

func (p *Publisher) Close() error

Close notifies the Publisher to stop processing messages, send all the remaining messages and close the connection.

func (*Publisher) Publish

func (p *Publisher) Publish(topic string, messages ...*message.Message) error

Publish publishes a set of messages on a Google Cloud Pub/Sub topic. It blocks until all the messages are successfully published or an error occurs.

To receive messages published to a topic, you must create a subscription to that topic. Only messages published to the topic after the subscription is created are available to subscriber applications.

See https://cloud.google.com/pubsub/docs/publisher to find out more about how Google Cloud Pub/Sub Publishers work.

type PublisherConfig

type PublisherConfig struct {
	// ProjectID is the Google Cloud project ID.
	ProjectID string

	// If false (default), `Publisher` tries to create a topic if there is none with the requested name.
	// Otherwise, trying to publish to a non-existent topic results in `ErrTopicDoesNotExist`.
	DoNotCreateTopicIfMissing bool
	// Enables message ordering on the topic.
	EnableMessageOrdering bool
	// Enables automatically resuming publishing after a publish error when message ordering is enabled.
	EnableMessageOrderingAutoResumePublishOnError bool

	// ConnectTimeout defines the timeout for connecting to Pub/Sub
	ConnectTimeout time.Duration
	// PublishTimeout defines the timeout for publishing messages.
	PublishTimeout time.Duration

	// Settings for cloud.google.com/go/pubsub client library.
	PublishSettings *pubsub.PublishSettings
	ClientOptions   []option.ClientOption

	Marshaler Marshaler
}

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber attaches to a Google Cloud Pub/Sub subscription and returns a Go channel with messages from the topic. Be aware that in Google Cloud Pub/Sub, only messages sent after the subscription was created can be consumed.

For more info on how Google Cloud Pub/Sub Subscribers work, check https://cloud.google.com/pubsub/docs/subscriber.

func NewSubscriber

func NewSubscriber(
	config SubscriberConfig,
	logger watermill.LoggerAdapter,
) (*Subscriber, error)

func (*Subscriber) Close

func (s *Subscriber) Close() error

Close notifies the Subscriber to stop processing messages on all subscriptions, close all the output channels and terminate the connection.

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

Subscribe consumes messages from Google Cloud Pub/Sub and outputs them as Watermill Message objects on the returned channel.

In Google Cloud Pub/Sub, it is impossible to subscribe directly to a topic. Instead, a *subscription* is used. Each subscription has one topic, but there may be multiple subscriptions to one topic (with different names).

The `topic` argument is transformed into a subscription name with the configured `GenerateSubscriptionName` function. By default, if the subscription or topic doesn't exist, it is created. This behavior may be changed in the config.

Be aware that in Google Cloud Pub/Sub, only messages sent after the subscription was created can be consumed.

See https://cloud.google.com/pubsub/docs/subscriber to find out more about how Google Cloud Pub/Sub Subscriptions work.

func (*Subscriber) SubscribeInitialize

func (s *Subscriber) SubscribeInitialize(topic string) (err error)

type SubscriberConfig

type SubscriberConfig struct {
	// GenerateSubscriptionName generates subscription name for a given topic.
	// The subscription connects the topic to a subscriber application that receives and processes
	// messages published to the topic.
	//
	// By default, subscriptions expire after 31 days of inactivity.
	//
	// A topic can have multiple subscriptions, but a given subscription belongs to a single topic.
	GenerateSubscriptionName SubscriptionNameFn

	// ProjectID is the Google Cloud project ID.
	ProjectID string

	// TopicProjectID is an optional configuration value representing
	// the Google Cloud project ID of the underlying topic.
	// This can be helpful when the subscription is linked to a topic in another project.
	TopicProjectID string

	// If false (default), `Subscriber` tries to create a subscription if there is none with the requested name.
	// Otherwise, trying to use non-existent subscription results in `ErrSubscriptionDoesNotExist`.
	DoNotCreateSubscriptionIfMissing bool

	// If false (default), `Subscriber` tries to create a topic if there is none with the requested name
	// and it is trying to create a new subscription with this topic name.
	// Otherwise, trying to create a subscription on non-existent topic results in `ErrTopicDoesNotExist`.
	DoNotCreateTopicIfMissing bool

	// Deprecated: ConnectTimeout is no longer used; set a timeout on the context passed to Subscribe() instead.
	ConnectTimeout time.Duration

	// InitializeTimeout defines the timeout for initializing topics.
	InitializeTimeout time.Duration

	// Settings for cloud.google.com/go/pubsub client library.
	ReceiveSettings    pubsub.ReceiveSettings
	SubscriptionConfig pubsub.SubscriptionConfig
	ClientOptions      []option.ClientOption

	// Unmarshaler transforms the client library format into watermill/message.Message.
	// Use a custom unmarshaler if needed, otherwise the default Unmarshaler should cover most use cases.
	Unmarshaler Unmarshaler
}

type SubscriptionNameFn

type SubscriptionNameFn func(topic string) string

func TopicSubscriptionNameWithSuffix

func TopicSubscriptionNameWithSuffix(suffix string) SubscriptionNameFn

TopicSubscriptionNameWithSuffix uses the topic name with a chosen suffix as the subscription name.
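
Since a topic can have several subscriptions with different names, a suffix is a simple way to give each consuming service its own subscription. The sketch below re-implements the two naming helpers locally, under the assumption that the suffix is appended to the topic name as is. The lowercase names and the `-billing` suffix are illustrative, not part of the package.

```go
package main

import "fmt"

// subscriptionNameFn mirrors the SubscriptionNameFn type.
type subscriptionNameFn func(topic string) string

// topicSubscriptionName uses the topic name unchanged.
func topicSubscriptionName(topic string) string {
	return topic
}

// topicSubscriptionNameWithSuffix appends a fixed suffix to the topic
// name (assumption: plain concatenation, no separator added).
func topicSubscriptionNameWithSuffix(suffix string) subscriptionNameFn {
	return func(topic string) string {
		return topic + suffix
	}
}

func main() {
	billing := topicSubscriptionNameWithSuffix("-billing")
	fmt.Println(topicSubscriptionName("orders"))
	fmt.Println(billing("orders"))
}
```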

type Unmarshaler

type Unmarshaler interface {
	Unmarshal(*pubsub.Message) (*message.Message, error)
}

Unmarshaler transforms a Google Cloud client library Message into a Watermill Message.

func NewOrderingUnmarshaler

func NewOrderingUnmarshaler(extractOrderingKey ExtractOrderingKey) Unmarshaler
