pubsub

package
v0.0.0-...-d3da680 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 4, 2024 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Overview

Package pubsub allows registering handlers that are called by Cloud Pub/Sub.

The HTTP endpoints exposed by this module perform necessary authorization checks and route requests to registered handlers, collecting monitoring metrics from them.

Note that you still need to configure Cloud Pub/Sub push subscriptions yourself. By default, registered handlers are exposed as "/internal/pubsub/<handler-id>" POST endpoints. Use this URL path when configuring Cloud Pub/Sub subscriptions.

Cloud Pub/Sub push subscriptions used with this module must be configured to send wrapped messages. See https://cloud.google.com/pubsub/docs/push for details.
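To illustrate what a wrapped message looks like on the wire, here is a minimal standard-library sketch that decodes the JSON envelope described in the Cloud Pub/Sub push documentation. The names pushEnvelope, decodeEnvelope and sampleBody are hypothetical and exist only for this example; it does not show how this package parses requests internally.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// pushEnvelope mirrors the wrapped push request body documented by Cloud
// Pub/Sub. The type is hypothetical and is not part of the pubsub package.
type pushEnvelope struct {
	Message struct {
		Attributes  map[string]string `json:"attributes"`
		Data        []byte            `json:"data"` // base64 in JSON, decoded by encoding/json
		MessageID   string            `json:"messageId"`
		PublishTime time.Time         `json:"publishTime"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

// decodeEnvelope parses a wrapped push request body.
func decodeEnvelope(body []byte) (*pushEnvelope, error) {
	var env pushEnvelope
	if err := json.Unmarshal(body, &env); err != nil {
		return nil, fmt.Errorf("bad push envelope: %w", err)
	}
	return &env, nil
}

// sampleBody is an illustrative request body; "aGVsbG8=" is base64 for "hello".
const sampleBody = `{
  "message": {
    "attributes": {"kind": "build"},
    "data": "aGVsbG8=",
    "messageId": "2070443601311540",
    "publishTime": "2021-02-26T19:13:55.749Z"
  },
  "subscription": "projects/myproject/subscriptions/mysubscription"
}`

func main() {
	env, err := decodeEnvelope([]byte(sampleBody))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s from %s\n", env.Message.Data, env.Subscription)
}
```

The envelope fields correspond to the Data, Subscription, MessageID, PublishTime and Attributes fields of the Message type documented below.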

Constants

This section is empty.

Variables

var (
	// Ignore is an error tag used to indicate that the handler wants the
	// pub/sub message to be dropped as it is not useful (e.g. represents
	// an event that the service does not ingest, such as a build state changing
	// to running when the service is only interested in build completions,
	// or a replay of a previously accepted message).
	//
	// This results in the pub/sub push handler returning status 204
	// (No Content) as opposed to status 200. This is particularly
	// useful for allowing SLOs to be defined over useful messages only.
	Ignore = errors.BoolTag{Key: errors.NewTagKey("the message should be dropped as not useful")}
)
var ModuleName = module.RegisterName("go.chromium.org/luci/server/pubsub")

ModuleName can be used to refer to this module when declaring dependencies.

Functions

func NewModule

func NewModule(opts *ModuleOptions) module.Module

NewModule returns a server module that sets up a pubsub dispatcher.

func NewModuleFromFlags

func NewModuleFromFlags() module.Module

NewModuleFromFlags is a variant of NewModule that initializes options through command line flags.

Calling this function registers flags in flag.CommandLine. They are usually parsed in server.Main(...).

func RegisterHandler

func RegisterHandler(id string, h Handler)

RegisterHandler is a shortcut for Default.RegisterHandler.

func RegisterJSONPBHandler

func RegisterJSONPBHandler[T any, TP interface {
	*T
	proto.Message
}](id string, h func(context.Context, Message, TP) error)

RegisterJSONPBHandler is a shortcut for Default.RegisterHandler(JSONPB(...)).

It can be used to register handlers that expect JSONPB-serialized protos of a concrete type.

func RegisterWirePBHandler

func RegisterWirePBHandler[T any, TP interface {
	*T
	proto.Message
}](id string, h func(context.Context, Message, TP) error)

RegisterWirePBHandler is a shortcut for Default.RegisterHandler(WirePB(...)).

It can be used to register handlers that expect wirepb-serialized protos of a concrete type.

Types

type Dispatcher

type Dispatcher struct {
	// AuthorizedCallers is a list of service accounts Cloud Pub/Sub may use to
	// call pub/sub HTTP endpoints.
	//
	// See https://cloud.google.com/pubsub/docs/authenticate-push-subscriptions for details.
	AuthorizedCallers []string

	// DisableAuth can be used to disable authentication on HTTP endpoints.
	//
	// This is useful when running in development mode on localhost or in tests.
	DisableAuth bool
	// contains filtered or unexported fields
}

Dispatcher routes requests from Cloud Pub/Sub to registered handlers.

var Default Dispatcher

Default is a dispatcher installed into the server when using NewModule or NewModuleFromFlags.

The module takes care of configuring this dispatcher based on the server environment and module's options.

You still need to register your handlers in it using RegisterHandler and configure Cloud Pub/Sub subscriptions that push to them.

func (*Dispatcher) InstallPubSubRoutes

func (d *Dispatcher) InstallPubSubRoutes(r *router.Router, prefix string)

InstallPubSubRoutes installs routes that handle requests from Cloud Pub/Sub.

func (*Dispatcher) RegisterHandler

func (d *Dispatcher) RegisterHandler(id string, h Handler)

RegisterHandler registers a callback called to handle a pubsub message.

The handler can be invoked via POST requests to "<serving-prefix>/<id>" (usually "/internal/pubsub/<id>"). This is the push endpoint that should be used when configuring Cloud Pub/Sub subscriptions. The Pub/Sub push subscription must be configured to send wrapped messages.

The ID must match `[a-zA-Z0-9_\-.]{1,100}`; RegisterHandler panics otherwise. It also panics if a handler with the same ID is already registered.
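The registration rules can be sketched with the standard library alone. The registry type and the panics helper below are hypothetical stand-ins, not the Dispatcher itself, and anchoring the documented pattern to the whole string is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"regexp"
	"sync"
)

// handlerIDRe is the documented ID pattern. Anchoring it with ^ and $ is an
// assumption of this sketch.
var handlerIDRe = regexp.MustCompile(`^[a-zA-Z0-9_\-.]{1,100}$`)

// registry is a hypothetical stand-in for the handler table of a Dispatcher.
type registry struct {
	mu       sync.Mutex
	handlers map[string]func() error
}

// register panics on a malformed or duplicate ID, as the documentation says
// RegisterHandler does.
func (r *registry) register(id string, h func() error) {
	if !handlerIDRe.MatchString(id) {
		panic(fmt.Sprintf("bad handler ID %q", id))
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, dup := r.handlers[id]; dup {
		panic(fmt.Sprintf("handler %q is already registered", id))
	}
	if r.handlers == nil {
		r.handlers = map[string]func() error{}
	}
	r.handlers[id] = h
}

// panics reports whether f panicked.
func panics(f func()) (p bool) {
	defer func() { p = recover() != nil }()
	f()
	return false
}

func main() {
	noop := func() error { return nil }
	r := &registry{}
	r.register("build-events.v1", noop)
	fmt.Println("duplicate panics:", panics(func() { r.register("build-events.v1", noop) }))
	fmt.Println("slash panics:", panics(func() { r.register("build/events", noop) }))
}
```

Because a bad ID is a programming error rather than a runtime condition, panicking at registration time surfaces the mistake at process startup instead of on the first pushed message.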

type Handler

type Handler func(ctx context.Context, message Message) error

Handler is called to handle a pub/sub message.

Transient errors are transformed into HTTP 500 replies to Cloud Pub/Sub, which may trigger a retry based on the pub/sub subscription retry policy. Returning a non-transient error results in an error-level log message and an HTTP 202 reply, which does not trigger a retry.

func JSONPB

func JSONPB[T any, TP interface {
	*T
	proto.Message
}](handler func(context.Context, Message, TP) error) Handler

JSONPB wraps a handler by deserializing messages as JSONPB protobufs before passing them to the handler.

func WirePB

func WirePB[T any, TP interface {
	*T
	proto.Message
}](handler func(context.Context, Message, TP) error) Handler

WirePB wraps a handler by deserializing messages as protobufs in wire encoding before passing them to the handler.

type Message

type Message struct {
	// Data is the PubSub message payload.
	Data []byte
	// Subscription is the full subscription name that pushed the message.
	// Format: projects/myproject/subscriptions/mysubscription.
	Subscription string
	// MessageID is the PubSub message ID.
	MessageID string
	// PublishTime is when the message was published.
	PublishTime time.Time
	// Attributes are the PubSub attributes of the published message.
	// Guaranteed to be non-nil.
	Attributes map[string]string
	// Query is the query part of the HTTP request string.
	// Guaranteed to be non-nil.
	Query url.Values
}

type ModuleOptions

type ModuleOptions struct {
	// Dispatcher is a dispatcher to use.
	//
	// Default is the global Default instance.
	Dispatcher *Dispatcher

	// ServingPrefix is a URL path prefix to serve registered pubsub handlers from.
	//
	// POSTs to a URL under this prefix (regardless of which one) will be treated
	// as Cloud Pub/Sub message pushes.
	//
	// Must start with "/internal/". Default is "/internal/pubsub".
	ServingPrefix string

	// AuthorizedCallers is a list of service accounts Cloud Pub/Sub may use to
	// call pub/sub HTTP endpoints.
	//
	// See https://cloud.google.com/pubsub/docs/authenticate-push-subscriptions for details.
	//
	// This must be specified for Cloud Pub/Sub messages to be accepted.
	//
	// Default is an empty list.
	AuthorizedCallers []string
}

ModuleOptions contains the configuration of the pubsub server module.

It is used to initialize the Default dispatcher.

func (*ModuleOptions) Register

func (o *ModuleOptions) Register(f *flag.FlagSet)

Register registers the command line flags.

Mutates `o` by populating defaults.
