Documentation
¶
Index ¶
- Constants
- func TrimLeftBytes(str string, maxBytes int) string
- func WithChannel(ch *Channel) func(*PubSub)
- func WithDeadLetter(ch *Channel) func(*PubSub)
- type Channel
- type Option
- type PubSub
- func (s *PubSub) Channel(channelID string) *Channel
- func (s *PubSub) Close() error
- func (s *PubSub) Configure(env *goboot.AppEnv) error
- func (s *PubSub) CreateAll() error
- func (s *PubSub) DeleteAll() error
- func (s *PubSub) DeleteChannel(channel string) error
- func (s *PubSub) EnsureSubscription(topicID string, subID string) error
- func (s *PubSub) EnsureTopic(topicID string) error
- func (s *PubSub) Init() error
- func (s *PubSub) Name() string
- func (s *PubSub) PublishEvent(ctx context.Context, channel string, eventName string, payload any) error
- func (s *PubSub) Receive(ctx context.Context, channel string, f func(context.Context, *RichMessage)) error
- func (s *PubSub) ReceiveNr(ctx context.Context, channel string, nrOfMessages int) ([]*RichMessage, error)
- func (s *PubSub) TryPublishEvent(ctx context.Context, channel string, eventName string, payload any)
- type RichMessage
- func (msg *RichMessage) DeadLetter(ctx context.Context, cause error) error
- func (msg *RichMessage) RetryableError(ctx context.Context, cause error) error
- func (msg *RichMessage) TryDeadLetter(ctx context.Context, cause error)
- func (msg *RichMessage) TryRetryableError(ctx context.Context, cause error)
Constants ¶
const ( DefaultDeadLetterName = "dead-letter" RetryDelay = time.Minute * 2 AckDeadline = 10 * time.Second MaxAttributeLength = 1024 )
defaultDeadLetterName is the name used to identity the dead letter channel if no other name was defined.
Variables ¶
This section is empty.
Functions ¶
func TrimLeftBytes ¶
TrimLeftBytes trims a string from the left until the string has max X bytes. Removes any invalid runes at the end.
func WithChannel ¶
WithChannel option adds a channel with a topic and a subscription.
The channel name is a self-chosen name separate from the topicID and subscriptionID to more easily reference the subscription in the rest of your codebase.
If you're not intending to receive any messages you can leave the subscriptionID empty. Be aware any messages sent to a topic without any subscription are essentially lost.
func WithDeadLetter ¶
WithDeadLetter option adds a deadletter channel to the Pub/Sub service.
The topic and optional subscription are automatically created if they don't exist already just like a regular channel.
Without a dead letter channel messages will get NACKed on error and retried until Google pubsub automatically removes them after 7 days. This can quickly fill up your queues so you're highly advised to always add a dead letter channel.
A RichMessage will get sent to the dead-letter channel if an unrecoverable error occurred or if the max message age has expired.
Like a normal channel the subscriptionID is optional but be aware messages sent to a topic without any subscriptions are dropped immediately. When the channel name is left empty the default name "dead-letter" is used instead.
Types ¶
type Channel ¶
type Channel struct { ID string TopicID string SubscriptionID string // MaxRetryAge is the time since publishing the message within a recoverable error // is still NACK'ed rather than dead-lettered. // // The default MaxRetryAge is 2 minutes. // // The max age prevents messages from being requeued and retried thousands of times // until Google pubsub deletes them automatically after 7 days. // // When no dead letter channel is configured a message will always be NACK'ed upon a // recoverable error. MaxRetryAge time.Duration }
Channel is a message channel containing a topic ID and optionally a subscription.
type PubSub ¶
type PubSub struct { *pubsub.Client Channels map[string]*Channel // DeadLetter is the channel used for dead letter messages. DeadLetterChannel *Channel // contains filtered or unexported fields }
PubSub adds some utility methods to the Google cloud PubSub such ensuring a topic and subscription exists and deadlettering.
It represents subscriptions and topics as a single message Channel as from an application perspective.
func NewPubSubService ¶
NewPubSubService configures a new Service and connects to the pubsub server.
func (*PubSub) Close ¶
Close releases any resources held by the pubsub Service such as memory and goroutines.
func (*PubSub) Configure ¶
Configure implements the AppService interface and instantiates the client connection to gcloud pubsub.
func (*PubSub) DeleteAll ¶
DeleteAll deletes all topics and subscriptions of all configured channels, including the dead-letter channel.
func (*PubSub) DeleteChannel ¶
DeleteChannel deletes the pubsub topic and subscription if they exist. If they don't exist nothing happens.
func (*PubSub) EnsureSubscription ¶
EnsureSubscription creates a subscription for specified topic. The topic must already exist.
In most cases you should use CreateAll instead.
The subscription is created with an ACK deadline of 10 seconds, meaning the message must be ACK'ed or NACK'ed within 10 seconds or else it will be re-delivered.
func (*PubSub) EnsureTopic ¶
EnsureTopic creates a topic with specified ID if it doesn't exist already. In most cases you should use CreateAll instead.
func (*PubSub) PublishEvent ¶
func (s *PubSub) PublishEvent(ctx context.Context, channel string, eventName string, payload any) error
PublishEvent publishes a message to the channel's topic and waits for it to be published on the server.
Google's pubsub batching is disabled by default which is only useful in very high-throughput use cases.
func (*PubSub) Receive ¶
func (s *PubSub) Receive(ctx context.Context, channel string, f func(context.Context, *RichMessage)) error
Receive starts receiving messages on specified channel.
It is similar to a normal google pubsub subscription receiver but returns RichMessages in specified callback.
type RichMessage ¶
RichMessage embeds the raw gcloud pubsub message with additional details and functions.
The PubSubRichMessage primarily helps handling retryable and unrecoverable errors.
func (*RichMessage) DeadLetter ¶
func (msg *RichMessage) DeadLetter(ctx context.Context, cause error) error
DeadLetter publishes a copy of a message to the deadletter channel and ACK's the original message.
If for some reason deadlettering the message failed an error is logged and the original message is NACK'ed.
The dead letter message adds extra attributes to the original message.
The method returns an error if neither neither ACKing or NACKing is possible.
func (*RichMessage) RetryableError ¶
func (msg *RichMessage) RetryableError(ctx context.Context, cause error) error
RetryableError will NACK a message if it is within the max retry timespan, otherwise it will sent the message to a deadletter channel.
Returns an error if no deadlettering the message failed.
func (*RichMessage) TryDeadLetter ¶
func (msg *RichMessage) TryDeadLetter(ctx context.Context, cause error)
TryDeadLetter is the same as DeadLetter but logs any error rather than returning it.
Messages will be redelivered automatically if not ACKed or NACKed in time.
func (*RichMessage) TryRetryableError ¶
func (msg *RichMessage) TryRetryableError(ctx context.Context, cause error)
TryRetryableError is the same as RetryableError but logs any error rather than returning it.
Messages will be redelivered automatically if not ACKed or NACKed in time.