Documentation ¶
Overview ¶
Package pubsub provides default implementations for using Google Cloud PubSub with the runtimes in the queue package.
Types ¶
type BatchAcknowledgeProcessor ¶
type BatchAcknowledgeProcessor struct {
// contains filtered or unexported fields
}
BatchAcknowledgeProcessor concurrently processes and acknowledges PubSub messages.
func NewBatchAcknowledgeProcessor ¶
func NewBatchAcknowledgeProcessor(opts ...BatchAcknowledgeProcessorOption) *BatchAcknowledgeProcessor
NewBatchAcknowledgeProcessor returns a fully initialized BatchAcknowledgeProcessor.
func (*BatchAcknowledgeProcessor) Process ¶
func (p *BatchAcknowledgeProcessor) Process(ctx context.Context, msgs []*pubsubpb.ReceivedMessage) error
Process implements the queue.Processor interface.
Each message is processed concurrently by the processor that was supplied to the BatchAcknowledgeProcessor when it was created. If the inner processor returns an error for a message, that message is not acknowledged in PubSub and is redelivered for processing after the VisibilityTimeout (in PubSub terms, the subscription's acknowledgement deadline) expires. If no error is returned, the message is collected together with the other successfully processed messages from the slice, msgs, and all of them are acknowledged in a single Acknowledge request to PubSub.
type BatchAcknowledgeProcessorOption ¶
type BatchAcknowledgeProcessorOption interface {
// contains filtered or unexported methods
}
BatchAcknowledgeProcessorOption are options for configuring the BatchAcknowledgeProcessor.
func Processor ¶
func Processor(p queue.Processor[*pubsubpb.ReceivedMessage]) BatchAcknowledgeProcessorOption
Processor configures the underlying single message queue.Processor which the BatchAcknowledgeProcessor calls when concurrently processing a batch of messages.
type CommonOption ¶
type CommonOption interface {
	ConsumerOption
	BatchAcknowledgeProcessorOption
}
CommonOption are options common to all Google Cloud PubSub related consumers and processors.
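One way an option can be "common" is for a single value to satisfy both option interfaces through embedding. The sketch below illustrates that general pattern under assumptions: every type, method and function name in it (consumerConfig, applyConsumer, subscription and so on) is hypothetical, since the package's real option methods are unexported and not shown here.

```go
package main

import "fmt"

// consumerConfig and processorConfig are hypothetical internal settings.
type consumerConfig struct{ subscription string }
type processorConfig struct{ subscription string }

// The option interfaces have unexported methods, so only this package
// can implement them.
type consumerOption interface{ applyConsumer(*consumerConfig) }
type processorOption interface{ applyProcessor(*processorConfig) }

// commonOption embeds both interfaces, mirroring the shape of CommonOption.
type commonOption interface {
	consumerOption
	processorOption
}

// subscriptionOption implements both apply methods, so it is a commonOption.
type subscriptionOption string

func (s subscriptionOption) applyConsumer(c *consumerConfig)   { c.subscription = string(s) }
func (s subscriptionOption) applyProcessor(p *processorConfig) { p.subscription = string(s) }

// subscription is a hypothetical analogue of the Subscription option.
func subscription(s string) commonOption { return subscriptionOption(s) }

func newConsumerConfig(opts ...consumerOption) consumerConfig {
	var cfg consumerConfig
	for _, o := range opts {
		o.applyConsumer(&cfg)
	}
	return cfg
}

func newProcessorConfig(opts ...processorOption) processorConfig {
	var cfg processorConfig
	for _, o := range opts {
		o.applyProcessor(&cfg)
	}
	return cfg
}

func main() {
	sub := subscription("my-subscription")
	// The same option value is accepted by both constructors.
	fmt.Println(newConsumerConfig(sub).subscription)
	fmt.Println(newProcessorConfig(sub).subscription)
}
```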
func Client ¶
func Client(c *pubsub.SubscriberClient) CommonOption
Client configures the underlying PubSub client.
func LogHandler ¶
func LogHandler(h slog.Handler) CommonOption
LogHandler configures the underlying slog.Handler.
func Subscription ¶
func Subscription(s string) CommonOption
Subscription configures the PubSub subscription ID.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer consumes messages from Google Cloud PubSub.
func NewConsumer ¶
func NewConsumer(opts ...ConsumerOption) *Consumer
NewConsumer returns a fully initialized Consumer.
func (*Consumer) Consume ¶
Consume implements the queue.Consumer interface.
Consume sends a PullRequest to Google Cloud PubSub using the configured options (e.g. the maximum number of messages). An error is returned only when the PubSub request fails or PubSub returns zero messages. In the zero-message case the error is queue.ErrNoItem, which lets the queue-based runtimes treat the empty response as a signal to retry consuming rather than as a failure.
type ConsumerOption ¶
type ConsumerOption interface {
// contains filtered or unexported methods
}
ConsumerOption are options for configuring the Consumer.
func MaxNumOfMessages ¶
func MaxNumOfMessages(n int32) ConsumerOption
MaxNumOfMessages defines the maximum number of messages which Google Cloud PubSub will return in a single response.
PubSub never returns more messages than this value (however, fewer messages might be returned). Must be a positive integer.