Documentation ¶
Overview ¶
Package pubsub allows to register handlers called by Cloud Pub/Sub.
The HTTP endpoints exposed by this module perform necessary authorization checks and route requests to registered handlers, collecting monitoring metrics from them.
Note that you still need to configure Cloud Pub/Sub Push subscriptions. By default registered handlers are exposed as "/internal/pubsub/<handler-id>" POST endpoints. This URL path should be used when configuring Cloud Pub/Sub subscriptions.
Cloud Pub/Sub push subscriptions used with this module must be configured to send Wrapped messages. See https://cloud.google.com/pubsub/docs/push for more.
Index ¶
- Variables
- func NewModule(opts *ModuleOptions) module.Module
- func NewModuleFromFlags() module.Module
- func RegisterHandler(id string, h Handler)
- func RegisterJSONPBHandler[T any, TP interface{ ... }](id string, h func(context.Context, Message, TP) error)
- func RegisterWirePBHandler[T any, TP interface{ ... }](id string, h func(context.Context, Message, TP) error)
- type Dispatcher
- type Handler
- type Message
- type ModuleOptions
Constants ¶
This section is empty.
Variables ¶
var ( // Ignore is an error tag used to indicate that the handler wants the // pub/sub message to be dropped as it is not useful (e.g. represents // an event that the service does not ingest, such a build state changing // to running when the service is only interested in build completions, // or a replay of a previous accepted message). // // This results in the pub/sub push handler returning status 204 // (No Content) as opposed to status 200. This is particularly // useful for allowing SLOs to be defined over useful messages only. Ignore = errors.BoolTag{Key: errors.NewTagKey("the message should be dropped as not useful")} )
var ModuleName = module.RegisterName("go.chromium.org/luci/server/pubsub")
ModuleName can be used to refer to this module when declaring dependencies.
Functions ¶
func NewModule ¶
func NewModule(opts *ModuleOptions) module.Module
NewModule returns a server module that sets up a pubsub dispatcher.
func NewModuleFromFlags ¶
NewModuleFromFlags is a variant of NewModule that initializes options through command line flags.
Calling this function registers flags in flag.CommandLine. They are usually parsed in server.Main(...).
func RegisterHandler ¶
RegisterHandler is a shortcut for Default.RegisterHandler.
func RegisterJSONPBHandler ¶
func RegisterJSONPBHandler[T any, TP interface { *T proto.Message }](id string, h func(context.Context, Message, TP) error)
RegisterJSONPBHandler is a shortcut for Default.RegisterHandler(JSONPB(...)).
It can be used to register handlers that expect JSONPB-serialized protos of a concrete type.
func RegisterWirePBHandler ¶
func RegisterWirePBHandler[T any, TP interface { *T proto.Message }](id string, h func(context.Context, Message, TP) error)
RegisterWirePBHandler is a shortcut for Default.RegisterHandler(WirePB(...)).
It can be used to register handlers that expect wirepb-serialized protos of a concrete type.
Types ¶
type Dispatcher ¶
type Dispatcher struct { // AuthorizedCallers is a list of service accounts Cloud Pub/Sub may use to // call pub/sub HTTP endpoints. // // See https://cloud.google.com/pubsub/docs/authenticate-push-subscriptions for details. AuthorizedCallers []string // DisableAuth can be used to disable authentication on HTTP endpoints. // // This is useful when running in development mode on localhost or in tests. DisableAuth bool // contains filtered or unexported fields }
Dispatcher routes requests from Cloud Pub/Sub to registered handlers.
var Default Dispatcher
Default is a dispatcher installed into the server when using NewModule or NewModuleFromFlags.
The module takes care of configuring this dispatcher based on the server environment and module's options.
You still need to register your handlers in it using RegisterHandler and configure Cloud Pub/Sub subscriptions that push to them.
func (*Dispatcher) InstallPubSubRoutes ¶
func (d *Dispatcher) InstallPubSubRoutes(r *router.Router, prefix string)
InstallPubSubRoutes installs routes that handle requests from Cloud Pub/Sub.
func (*Dispatcher) RegisterHandler ¶
func (d *Dispatcher) RegisterHandler(id string, h Handler)
RegisterHandler registers a callback called to handle a pubsub message.
The handler can be invoked via POST requests to "<serving-prefix>/<id>", (usually "/internal/pubsub/<id>"). This is the push endpoint that should be used when configuring Cloud Pub/Sub subscriptions. The Pub/Sub push subscription must be configured to send wrapped messages.
The ID must match `[a-zA-Z0-9_\-.]{1,100}`. Panics otherwise. Panics if a handler with such ID is already registered.
type Handler ¶
Handler is called to handle a pub/sub message.
Transient errors are transformed into HTTP 500 replies to Cloud Pub/Sub, which may trigger a retry based on the pub/sub subscription retry policy. Returning a non-transient error results in a error-level logging message and HTTP 202 reply, which does not trigger a retry.
type Message ¶
type Message struct { // Data is the PubSub message payload. Data []byte // Subscription is the full subscription name that pushed the message. // Format: projects/myproject/subscriptions/mysubscription. Subscription string // MessageID is the PubSub message ID. MessageID string // PublishTime is when the message was published. PublishTime time.Time // Attributes is PubSub message attributes of the published message. // Guaranteed to be non-nil. Attributes map[string]string // Query is the query part of the HTTP request string. // Guaranteed to be non-nil. Query url.Values }
type ModuleOptions ¶
type ModuleOptions struct { // Dispatcher is a dispatcher to use. // // Default is the global Default instance. Dispatcher *Dispatcher // ServingPrefix is a URL path prefix to serve registered pubsub handlers from. // // POSTs to a URL under this prefix (regardless which one) will be treated // as Cloud Pub/Sub message pushes. // // Must start with "/internal/". Default is "/internal/pubsub". ServingPrefix string // AuthorizedCallers is a list of service accounts Cloud Pub/Sub may use to // call pub/sub HTTP endpoints. // // See https://cloud.google.com/pubsub/docs/authenticate-push-subscriptions for details. // // This must be specified for Cloud Pub/Sub messages to be accepted. // // Default is an empty list. AuthorizedCallers []string }
ModuleOptions contain configuration of the pubsub server module.
It will be used to initialize Default dispatcher.
func (*ModuleOptions) Register ¶
func (o *ModuleOptions) Register(f *flag.FlagSet)
Register registers the command line flags.
Mutates `o` by populating defaults.