pubsub

package
v0.0.0-...-6e5c0fe Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2025 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package pubsub provides a generic way to batch pubsub pull notifications.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Options

type Options struct {
	// ReceiveDuration limits the duration of the Process() execution.
	//
	// It actually determines how long to receive pubsub messages for.
	ReceiveDuration time.Duration

	// MaxBatchSize limits how many messages to process in a single batch.
	MaxBatchSize int

	// ConcurrentBatches controls the number of batches being processed
	// concurrently.
	ConcurrentBatches int
}

Options control the operation of a PullingBatchProcessor.

type ProcessBatchFunc

type ProcessBatchFunc func(context.Context, []*pubsub.Message) error

ProcessBatchFunc is the signature that the batch processing function needs to have.

type PullingBatchProcessor

type PullingBatchProcessor struct {
	// ProcessBatch is a function to handle one batch of messages.
	//
	// The messages aren't yet ack-ed when they are passed to this func.
	// The func is allowed to Ack or Nack them as it sees fit.
	// This can be useful, for example, if processing some messages in a batch
	// succeeds and fails on others.
	//
	// As a fail-safe, PullingBatchProcessor will **always** call Nack() or
	// Ack() on all the messages after ProcessBatch completes.
	// This is fine because PubSub client ignores any subsequent calls to Nack
	// or Ack, thus the fail-safe won't override any Nack/Ack previously issued
	// by the ProcessBatch.
	//
	// The fail-safe uses Nack() if the error returned by the ProcessBatch is
	// transient, thus asking for re-delivery and an eventual retry.
	// Ack() is used otherwise, which prevents retries on permanent errors.
	ProcessBatch ProcessBatchFunc

	// ProjectID is the project id for the subscription below.
	ProjectID string

	// SubID is the id of the pubsub subscription to pull messages from.
	// It must exist.
	SubID string

	// Options are optional. Only nonzero fields will be applied.
	Options Options
}

PullingBatchProcessor batches notifications pulled from a pubsub subscription, and calls a custom process function on each batch.

Provides an endpoint to be called by e.g. a cron job that starts the message pulling and processing cycle for the specified time. (See .Process())

func (*PullingBatchProcessor) Process

func (pbp *PullingBatchProcessor) Process(ctx context.Context) error

Process is the endpoint that (e.g. by cron job) should be periodically hit to operate the PullingBatchProcessor.

It creates the pubsub client and processes notifications for up to Options.CronRunTime.

func (*PullingBatchProcessor) Validate

func (pbp *PullingBatchProcessor) Validate() error

Validate checks missing required fields and normalizes options.

type TestPSServer

type TestPSServer struct {
	*pstest.Server

	Client *pubsub.Client
	ProjID string
	SubID  string
	// contains filtered or unexported fields
}

TestPSServer embeds a pubsub test server for testing PullingBatchProcessor.

It also exposes a client that can be used to talk to the test server and the fields needed to retrieve messages from a subscription.

func NewTestPSServer

func NewTestPSServer(ctx context.Context) (*TestPSServer, error)

NewTestPSServer starts a test pubsub server, binds a client to it, creates a default topic and subscription.

func (*TestPSServer) Close

func (mps *TestPSServer) Close() error

Close closes the embedded server and also the pubsub client and underlying grpc connection.

func (*TestPSServer) PublishTestMessages

func (mps *TestPSServer) PublishTestMessages(n int) stringset.Set

PublishTestMessages puts `n` distinct messages into the test servers default topic.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL