Documentation ¶
Overview ¶
Package pubsub provides a generic way to batch pubsub pull notifications.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Options ¶
type Options struct { // ReceiveDuration limits the duration of the Process() execution. // // It actually determines how long to receive pubsub messages for. ReceiveDuration time.Duration // MaxBatchSize limits how many messages to process in a single batch. MaxBatchSize int // ConcurrentBatches controls the number of batches being processed // concurrently. ConcurrentBatches int }
Options control the operation of a PullingBatchProcessor.
type ProcessBatchFunc ¶
ProcessBatchFunc is the signature that the batch processing function needs to have.
type PullingBatchProcessor ¶
type PullingBatchProcessor struct { // ProcessBatch is a function to handle one batch of messages. // // The messages aren't yet ack-ed when they are passed to this func. // The func is allowed to Ack or Nack them as it sees fit. // This can be useful, for example, if processing some messages in a batch // succeeds and fails on others. // // As a fail-safe, PullingBatchProcessor will **always** call Nack() or // Ack() on all the messages after ProcessBatch completes. // This is fine because PubSub client ignores any subsequent calls to Nack // or Ack, thus the fail-safe won't override any Nack/Ack previously issued // by the ProcessBatch. // // The fail-safe uses Nack() if the error returned by the ProcessBatch is // transient, thus asking for re-delivery and an eventual retry. // Ack() is used otherwise, which prevents retries on permanent errors. ProcessBatch ProcessBatchFunc // ProjectID is the project id for the subscription below. ProjectID string // SubID is the id of the pubsub subscription to pull messages from. // It must exist. SubID string // Options are optional. Only nonzero fields will be applied. Options Options }
PullingBatchProcessor batches notifications pulled from a pubsub subscription, and calls a custom process function on each batch.
Provides an endpoint to be called by e.g. a cron job that starts the message pulling and processing cycle for the specified time. (See .Process())
func (*PullingBatchProcessor) Process ¶
func (pbp *PullingBatchProcessor) Process(ctx context.Context) error
Process is the endpoint that (e.g. by cron job) should be periodically hit to operate the PullingBatchProcessor.
It creates the pubsub client and processes notifications for up to Options.CronRunTime.
func (*PullingBatchProcessor) Validate ¶
func (pbp *PullingBatchProcessor) Validate() error
Validate checks missing required fields and normalizes options.
type TestPSServer ¶
type TestPSServer struct { *pstest.Server Client *pubsub.Client ProjID string SubID string // contains filtered or unexported fields }
TestPSServer embeds a pubsub test server for testing PullingBatchProcessor.
It also exposes a client that can be used to talk to the test server and the fields needed to retrieve messages from a subscription.
func NewTestPSServer ¶
func NewTestPSServer(ctx context.Context) (*TestPSServer, error)
NewTestPSServer starts a test pubsub server, binds a client to it, creates a default topic and subscription.
func (*TestPSServer) Close ¶
func (mps *TestPSServer) Close() error
Close closes the embedded server and also the pubsub client and underlying grpc connection.
func (*TestPSServer) PublishTestMessages ¶
func (mps *TestPSServer) PublishTestMessages(n int) stringset.Set
PublishTestMessages puts `n` distinct messages into the test servers default topic.