Documentation ¶
Index ¶
- Constants
- Variables
- func TopicSubscriptionName(topic string) string
- type DefaultMarshalerUnmarshaler
- type ExtractOrderingKey
- type GenerateOrderingKey
- type Marshaler
- type MarshalerUnmarshaler
- type Publisher
- type PublisherConfig
- type Subscriber
- type SubscriberConfig
- type SubscriptionNameFn
- type Unmarshaler
Constants ¶
const (
// ProviderName identifies the provider as Google Cloud Pub/Sub in logs.
ProviderName = "google_cloud_pubsub"
)
const UUIDHeaderKey = "_watermill_message_uuid"
UUIDHeaderKey is the key of the Pub/Sub attribute that carries Waterfall UUID.
Variables ¶
var ( // ErrPublisherClosed happens when trying to publish to a topic while the publisher is closed or closing. ErrPublisherClosed = errors.New("publisher is closed") // ErrTopicDoesNotExist happens when trying to publish or subscribe to a topic that doesn't exist. ErrTopicDoesNotExist = errors.New("topic does not exist") // ErrConnectTimeout happens when the Google Cloud PubSub connection context times out. ErrConnectTimeout = errors.New("connect timeout") )
var ( // ErrSubscriberClosed happens when trying to subscribe to a new topic while the subscriber is closed or closing. ErrSubscriberClosed = errors.New("subscriber is closed") // ErrSubscriptionDoesNotExist happens when trying to use a subscription that does not exist. ErrSubscriptionDoesNotExist = errors.New("subscription does not exist") // ErrUnexpectedTopic happens when the subscription resolved from SubscriptionNameFn is for a different topic than expected. ErrUnexpectedTopic = errors.New("requested subscription already exists, but for other topic than expected") )
Functions ¶
func TopicSubscriptionName ¶
TopicSubscriptionName uses the topic name as the subscription name.
Types ¶
type DefaultMarshalerUnmarshaler ¶
type DefaultMarshalerUnmarshaler struct{}
DefaultMarshalerUnmarshaler implements Marshaler and Unmarshaler in the following way: All Google Cloud Pub/Sub attributes are equivalent to Waterfall Message metadata. Waterfall Message UUID is equivalent to an attribute with `UUIDHeaderKey` as key.
type ExtractOrderingKey ¶
type GenerateOrderingKey ¶
type Marshaler ¶
Marshaler transforms a Waterfall Message into the Google Cloud client library Message.
func NewOrderingMarshaler ¶
func NewOrderingMarshaler(generateOrderingKey GenerateOrderingKey) Marshaler
type MarshalerUnmarshaler ¶
type MarshalerUnmarshaler interface { Marshaler Unmarshaler }
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func NewPublisher ¶
func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)
func (*Publisher) Close ¶
Close notifies the Publisher to stop processing messages, send all the remaining messages and close the connection.
func (*Publisher) Publish ¶
Publish publishes a set of messages on a Google Cloud Pub/Sub topic. It blocks until all the messages are successfully published or an error occurred.
To receive messages published to a topic, you must create a subscription to that topic. Only messages published to the topic after the subscription is created are available to subscriber applications.
See https://cloud.google.com/pubsub/docs/publisher to find out more about how Google Cloud Pub/Sub Publishers work.
type PublisherConfig ¶
type PublisherConfig struct { // ProjectID is the Google Cloud Engine project ID. ProjectID string // If false (default), `Publisher` tries to create a topic if there is none with the requested name. // Otherwise, trying to subscribe to non-existent subscription results in `ErrTopicDoesNotExist`. DoNotCreateTopicIfMissing bool // Enables the topic message ordering EnableMessageOrdering bool // Enables automatic resume publish upon error EnableMessageOrderingAutoResumePublishOnError bool // ConnectTimeout defines the timeout for connecting to Pub/Sub ConnectTimeout time.Duration // PublishTimeout defines the timeout for publishing messages. PublishTimeout time.Duration // Settings for cloud.google.com/go/pubsub client library. PublishSettings *pubsub.PublishSettings ClientOptions []option.ClientOption Marshaler Marshaler }
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber attaches to a Google Cloud Pub/Sub subscription and returns a Go channel with messages from the topic. Be aware that in Google Cloud Pub/Sub, only messages sent after the subscription was created can be consumed.
For more info on how Google Cloud Pub/Sub Subscribers work, check https://cloud.google.com/pubsub/docs/subscriber.
func NewSubscriber ¶
func NewSubscriber( config SubscriberConfig, logger watermill.LoggerAdapter, ) (*Subscriber, error)
func (*Subscriber) Close ¶
func (s *Subscriber) Close() error
Close notifies the Subscriber to stop processing messages on all subscriptions, close all the output channels and terminate the connection.
func (*Subscriber) Subscribe ¶
Subscribe consumes Google Cloud Pub/Sub and outputs them as Waterfall Message objects on the returned channel.
In Google Cloud Pub/Sub, it is impossible to subscribe directly to a topic. Instead, a *subscription* is used. Each subscription has one topic, but there may be multiple subscriptions to one topic (with different names).
The `topic` argument is transformed into subscription name with the configured `GenerateSubscriptionName` function. By default, if the subscription or topic don't exist, the are created. This behavior may be changed in the config.
Be aware that in Google Cloud Pub/Sub, only messages sent after the subscription was created can be consumed.
See https://cloud.google.com/pubsub/docs/subscriber to find out more about how Google Cloud Pub/Sub Subscriptions work.
func (*Subscriber) SubscribeInitialize ¶
func (s *Subscriber) SubscribeInitialize(topic string) (err error)
type SubscriberConfig ¶
type SubscriberConfig struct { // GenerateSubscriptionName generates subscription name for a given topic. // The subscription connects the topic to a subscriber application that receives and processes // messages published to the topic. // // By default, subscriptions expire after 31 days of inactivity. // // A topic can have multiple subscriptions, but a given subscription belongs to a single topic. GenerateSubscriptionName SubscriptionNameFn // ProjectID is the Google Cloud Engine project ID. ProjectID string // TopicProjectID is an optionnal configuration value representing // the underlying topic Google Cloud Engine project ID. // This can be helpful when subscription is linked to a topic for another project. TopicProjectID string // If false (default), `Subscriber` tries to create a subscription if there is none with the requested name. // Otherwise, trying to use non-existent subscription results in `ErrSubscriptionDoesNotExist`. DoNotCreateSubscriptionIfMissing bool // If false (default), `Subscriber` tries to create a topic if there is none with the requested name // and it is trying to create a new subscription with this topic name. // Otherwise, trying to create a subscription on non-existent topic results in `ErrTopicDoesNotExist`. DoNotCreateTopicIfMissing bool // deprecated: ConnectTimeout is no longer used, please use timeout on context in Subscribe() method ConnectTimeout time.Duration // InitializeTimeout defines the timeout for initializing topics. InitializeTimeout time.Duration // Settings for cloud.google.com/go/pubsub client library. ReceiveSettings pubsub.ReceiveSettings SubscriptionConfig pubsub.SubscriptionConfig ClientOptions []option.ClientOption // Unmarshaler transforms the client library format into watermill/message.Message. // Use a custom unmarshaler if needed, otherwise the default Unmarshaler should cover most use cases. Unmarshaler Unmarshaler }
type SubscriptionNameFn ¶
func TopicSubscriptionNameWithSuffix ¶
func TopicSubscriptionNameWithSuffix(suffix string) SubscriptionNameFn
TopicSubscriptionNameWithSuffix uses the topic name with a chosen suffix as the subscription name.
type Unmarshaler ¶
Unmarshaler transforms a Google Cloud client library Message into the Waterfall Message.
func NewOrderingUnmarshaler ¶
func NewOrderingUnmarshaler(extractOrderingKey ExtractOrderingKey) Unmarshaler