Documentation ¶
Overview ¶
Package pubsub provides publish-subscribe (pub-sub) functionality. It provides a Hub for simple subscription and broadcast management. It also provides a Broker, which routes path-based topics to pub-sub event handlers using the routing system of the github.com/labstack/echo/v4 framework and orchestrates on-demand publishers.
Index ¶
- Constants
- func MethodNotAllowedHandler[HandlerContext Context](c HandlerContext) error
- func NotFoundHandler[HandlerContext Context](c HandlerContext) error
- type BroadcastingChange
- type Broker
- func (b *Broker[HandlerContext, Message]) Add(method, topic string, handler HandlerFunc[HandlerContext], ...) *Route
- func (b *Broker[HandlerContext, Message]) CancelPub(topic string)
- func (b *Broker[HandlerContext, Message]) GetHandler(method string, topic string, c *RouterContext[HandlerContext]) HandlerFunc[HandlerContext]
- func (b *Broker[HandlerContext, Message]) Hub() *Hub[[]Message]
- func (b *Broker[HandlerContext, Message]) NewBrokerContext(ctx context.Context, method, topic string) *BrokerContext[HandlerContext, Message]
- func (b *Broker[HandlerContext, Message]) PUB(topic string, h HandlerFunc[HandlerContext], ...) *Route
- func (b *Broker[HandlerContext, Message]) SUB(topic string, h HandlerFunc[HandlerContext], ...) *Route
- func (b *Broker[HandlerContext, Message]) Serve(ctx context.Context, hc HandlerContextMaker[HandlerContext, Message]) error
- func (b *Broker[HandlerContext, Message]) Subscribe(ctx context.Context, topic string, ...) (finished <-chan struct{})
- func (b *Broker[HandlerContext, Message]) TriggerPub(ctx context.Context, topic string, ...) error
- func (b *Broker[HandlerContext, Message]) TriggerSub(ctx context.Context, topic string, ...) error
- func (b *Broker[HandlerContext, Message]) TriggerUnsub(ctx context.Context, topic string, ...)
- func (b *Broker[HandlerContext, Message]) UNSUB(topic string, h HandlerFunc[HandlerContext], ...) *Route
- func (b *Broker[HandlerContext, Message]) Use(middleware ...MiddlewareFunc[HandlerContext])
- type BrokerContext
- func (c *BrokerContext[HandlerContext, Message]) Broadcast(topic string, messages ...Message)
- func (c *BrokerContext[HandlerContext, Message]) Context() context.Context
- func (c *BrokerContext[HandlerContext, Message]) Hub() *Hub[[]Message]
- func (c *BrokerContext[HandlerContext, Message]) Logger() Logger
- func (c *BrokerContext[HandlerContext, Message]) Method() string
- func (c *BrokerContext[HandlerContext, Message]) Param(name string) string
- func (c *BrokerContext[HandlerContext, Message]) ParamNames() []string
- func (c *BrokerContext[HandlerContext, Message]) ParamValues() []string
- func (c *BrokerContext[HandlerContext, Message]) Path() string
- func (c *BrokerContext[HandlerContext, Message]) Publish(messages ...Message)
- func (c *BrokerContext[HandlerContext, Message]) QueryParams() (url.Values, error)
- func (c *BrokerContext[HandlerContext, Message]) RouterContext() *RouterContext[HandlerContext]
- func (c *BrokerContext[HandlerContext, Message]) Topic() string
- type Context
- type HandlerContextMaker
- type HandlerFunc
- type HandlerRouter
- type Hub
- type Logger
- type MiddlewareFunc
- type ReceiveFunc
- type Route
- type Router
- type RouterContext
Constants ¶
const (
	MethodPub     = "PUB"
	MethodSub     = "SUB"
	MethodUnsub   = "UNSUB"
	RouteNotFound = "pubsub_route_not_found"
)
Pub-sub broker event methods.
Functions ¶
func MethodNotAllowedHandler ¶ added in v0.4.0
MethodNotAllowedHandler is the fallback handler used for handling pub-sub broker events with methods lacking a matching broker handler. Analogous to Echo's MethodNotAllowedHandler.
func NotFoundHandler ¶ added in v0.4.0
NotFoundHandler is the fallback handler used for handling pub-sub broker events on topics lacking a matching broker handler. Analogous to Echo's NotFoundHandler.
Types ¶
type BroadcastingChange ¶
BroadcastingChange is an event record listing all topics which were added to the Hub and all topics which were removed from the Hub since the last emitted BroadcastingChange.
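As a rough illustration of what such a record carries, the sketch below computes added and removed topics between two snapshots of the set of topics with subscribers. The `topicChange` type, its field names, and `diffTopics` are hypothetical stand-ins written for this example; the actual fields of BroadcastingChange are not shown in this documentation.

```go
package main

import (
	"fmt"
	"sort"
)

// topicChange is a hypothetical stand-in for a BroadcastingChange record.
type topicChange struct {
	Added   []string
	Removed []string
}

// diffTopics compares two snapshots of the set of topics that have at least
// one subscriber and reports which topics appeared and which disappeared.
func diffTopics(before, after map[string]bool) topicChange {
	var change topicChange
	for topic := range after {
		if !before[topic] {
			change.Added = append(change.Added, topic)
		}
	}
	for topic := range before {
		if !after[topic] {
			change.Removed = append(change.Removed, topic)
		}
	}
	// Map iteration order is random, so sort for stable output.
	sort.Strings(change.Added)
	sort.Strings(change.Removed)
	return change
}

func main() {
	before := map[string]bool{"/rooms/1": true, "/rooms/2": true}
	after := map[string]bool{"/rooms/2": true, "/rooms/3": true}
	change := diffTopics(before, after)
	fmt.Println("added:", change.Added, "removed:", change.Removed)
}
```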
type Broker ¶ added in v0.4.0
type Broker[HandlerContext Context, Message any] struct {
	// contains filtered or unexported fields
}
Broker is the top-level pub-sub framework for routing pub-sub events to handlers. Analogous to Echo's Echo.
func (*Broker[HandlerContext, Message]) Add ¶ added in v0.4.0
func (b *Broker[HandlerContext, Message]) Add( method, topic string, handler HandlerFunc[HandlerContext], middleware ...MiddlewareFunc[HandlerContext], ) *Route
Add registers a new route for a pub-sub broker event method and topic with matching handler in the router, with optional route-level middleware. Analogous to Echo's Echo.Add.
func (*Broker[HandlerContext, Message]) CancelPub ¶ added in v0.4.0
CancelPub cancels the PUB handler goroutine for the topic started by Broker.TriggerPub. Useful for writing alternatives to Broker.Serve.
func (*Broker[HandlerContext, Message]) GetHandler ¶ added in v0.4.0
func (b *Broker[HandlerContext, Message]) GetHandler( method string, topic string, c *RouterContext[HandlerContext], ) HandlerFunc[HandlerContext]
GetHandler returns the handler associated in the router with the method and topic, with middleware (specified by Broker.Use) applied afterwards, and with the provided router context modified based on routing results. This is useful in combination with Broker.NewBrokerContext for triggering handlers for custom pub-sub broker event methods.
func (*Broker[HandlerContext, Message]) Hub ¶ added in v0.4.0
Hub returns the associated pub-sub Hub.
func (*Broker[HandlerContext, Message]) NewBrokerContext ¶ added in v0.4.0
func (b *Broker[HandlerContext, Message]) NewBrokerContext( ctx context.Context, method, topic string, ) *BrokerContext[HandlerContext, Message]
NewBrokerContext creates a new pub-sub broker event context. This is useful in combination with Broker.GetHandler for triggering handlers for custom pub-sub broker event methods.
func (*Broker[HandlerContext, Message]) PUB ¶ added in v0.4.0
func (b *Broker[HandlerContext, Message]) PUB( topic string, h HandlerFunc[HandlerContext], m ...MiddlewareFunc[HandlerContext], ) *Route
PUB registers a new PUB route for a topic with matching handler in the router, with optional route-level middleware. Refer to Broker.Serve for details on how PUB handlers are used.
func (*Broker[HandlerContext, Message]) SUB ¶ added in v0.4.0
func (b *Broker[HandlerContext, Message]) SUB( topic string, h HandlerFunc[HandlerContext], m ...MiddlewareFunc[HandlerContext], ) *Route
SUB registers a new SUB route for a topic with matching handler in the router, with optional route-level middleware. Refer to Broker.Subscribe for details on how SUB handlers are used.
func (*Broker[HandlerContext, Message]) Serve ¶ added in v0.4.0
func (b *Broker[HandlerContext, Message]) Serve( ctx context.Context, hc HandlerContextMaker[HandlerContext, Message], ) error
Serve launches and cancels PUB handlers based on the appearance and disappearance of subscriptions for the PUB handlers' corresponding topics. The PUB handler for a topic is started in a goroutine when a subscription is added to the broker (or to the broker's Hub) on a topic which previously had no subscriptions; its context is canceled when the last remaining subscription for the topic is removed. This way, exactly one instance of the PUB handler for a topic runs for as long as there is at least one subscriber on that topic. The Broker should not be used after the Serve method finishes running.
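The start-on-first-subscriber, cancel-on-last-subscriber lifecycle can be sketched with the standard library alone. This is not the package's implementation: the `change` type, `serve`, and `demo` are hypothetical names, and the sketch assumes that topic additions and removals arrive as events on a channel.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// change is a hypothetical event listing topics that gained their first
// subscriber (added) or lost their last subscriber (removed).
type change struct {
	added   []string
	removed []string
}

// serve runs pub in one goroutine per added topic and cancels that
// goroutine's context when the topic is removed.
func serve(ctx context.Context, changes <-chan change, pub func(ctx context.Context, topic string)) {
	cancels := make(map[string]context.CancelFunc)
	var wg sync.WaitGroup
	defer func() {
		// On exit, stop every publisher still running and wait for it.
		for _, cancel := range cancels {
			cancel()
		}
		wg.Wait()
	}()
	for {
		select {
		case <-ctx.Done():
			return
		case c, ok := <-changes:
			if !ok {
				return
			}
			for _, topic := range c.removed {
				if cancel, running := cancels[topic]; running {
					cancel()
					delete(cancels, topic)
				}
			}
			for _, topic := range c.added {
				if _, running := cancels[topic]; running {
					continue // at most one publisher per topic
				}
				pubCtx, cancel := context.WithCancel(ctx)
				cancels[topic] = cancel
				wg.Add(1)
				go func(topic string) {
					defer wg.Done()
					pub(pubCtx, topic)
				}(topic)
			}
		}
	}
}

// demo drives serve through one add/remove cycle and records what the
// publisher observed.
func demo() []string {
	events := make(chan string)
	changes := make(chan change)
	finished := make(chan struct{})
	pub := func(ctx context.Context, topic string) {
		events <- "start " + topic
		<-ctx.Done()
		events <- "stop " + topic
	}
	go func() {
		defer close(finished)
		serve(context.Background(), changes, pub)
	}()
	var log []string
	changes <- change{added: []string{"/a"}}
	log = append(log, <-events)
	changes <- change{removed: []string{"/a"}}
	log = append(log, <-events)
	close(changes)
	<-finished
	return log
}

func main() {
	fmt.Println(demo()) // [start /a stop /a]
}
```

Keeping the cancel functions in a map keyed by topic is what enforces the single-publisher-per-topic property in this sketch.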
func (*Broker[HandlerContext, Message]) Subscribe ¶ added in v0.4.0
func (b *Broker[HandlerContext, Message]) Subscribe( ctx context.Context, topic string, hc HandlerContextMaker[HandlerContext, Message], broadcastHandler func(ctx context.Context, messages []Message) error, ) (finished <-chan struct{})
Subscribe runs the SUB handler for the topic and, if it does not produce an error, adds a subscription to the broker's Hub with a callback function to handle messages broadcast over the broker's Hub. When the context is canceled, the UNSUB handler is run. Any messages published on the broker's Hub (e.g. messages broadcast from BrokerContext.Publish, BrokerContext.Broadcast, or Hub.Broadcast) will be passed to the broadcast handler callback function.
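The ordering of the SUB and UNSUB handlers described above can be sketched as follows. The Hub subscription itself is left out, and `subscribe`, `onSub`, and `onUnsub` are hypothetical names. Closing the returned channel immediately when the SUB handler fails is an assumption of this sketch, not documented behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// subscribe runs onSub first. If it succeeds, onUnsub runs once ctx is
// canceled, and the returned channel is closed afterwards.
func subscribe(ctx context.Context, onSub func() error, onUnsub func()) <-chan struct{} {
	finished := make(chan struct{})
	if err := onSub(); err != nil {
		// Assumption: a rejected subscription finishes right away.
		close(finished)
		return finished
	}
	go func() {
		defer close(finished)
		<-ctx.Done()
		onUnsub()
	}()
	return finished
}

// demo records which handlers ran for an accepted or rejected subscription.
func demo(reject bool) []string {
	var log []string
	ctx, cancel := context.WithCancel(context.Background())
	finished := subscribe(ctx,
		func() error {
			log = append(log, "sub")
			if reject {
				return errors.New("subscription rejected")
			}
			return nil
		},
		func() { log = append(log, "unsub") },
	)
	cancel()
	<-finished
	return log
}

func main() {
	fmt.Println(demo(false)) // [sub unsub]
	fmt.Println(demo(true))  // [sub]
}
```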
func (*Broker[HandlerContext, Message]) TriggerPub ¶ added in v0.4.0
func (b *Broker[HandlerContext, Message]) TriggerPub( ctx context.Context, topic string, hc HandlerContextMaker[HandlerContext, Message], ) error
TriggerPub looks up the PUB handler associated with the topic and launches a goroutine running it. It returns an error if the handler for that topic has already been started and has not yet been canceled by a call to Broker.CancelPub. Useful for writing alternatives to Broker.Serve.
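The pairing of TriggerPub and CancelPub resembles a registry of cancel functions keyed by topic. The sketch below illustrates that idea only: `pubRegistry`, `trigger`, `cancel`, and `errAlreadyRunning` are hypothetical names, and the real methods may differ in details such as what happens when a handler returns on its own.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errAlreadyRunning is a hypothetical error for a duplicate trigger.
var errAlreadyRunning = errors.New("publisher already running for topic")

// pubRegistry tracks one cancel function per running publisher.
type pubRegistry struct {
	mu      sync.Mutex
	cancels map[string]context.CancelFunc
}

func newPubRegistry() *pubRegistry {
	return &pubRegistry{cancels: make(map[string]context.CancelFunc)}
}

// trigger starts pub in a goroutine unless the topic already has a publisher
// that has not been canceled.
func (r *pubRegistry) trigger(ctx context.Context, topic string, pub func(ctx context.Context)) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, running := r.cancels[topic]; running {
		return errAlreadyRunning
	}
	pubCtx, cancel := context.WithCancel(ctx)
	r.cancels[topic] = cancel
	go pub(pubCtx)
	return nil
}

// cancel stops the publisher for the topic, if any, and forgets it.
func (r *pubRegistry) cancel(topic string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if cancel, ok := r.cancels[topic]; ok {
		cancel()
		delete(r.cancels, topic)
	}
}

// demo triggers a topic twice, cancels it, and triggers it again.
func demo() (first, second, third error) {
	r := newPubRegistry()
	ctx, stop := context.WithCancel(context.Background())
	defer stop() // stops any publisher still running when demo returns
	wait := func(ctx context.Context) { <-ctx.Done() }
	first = r.trigger(ctx, "/a", wait)
	second = r.trigger(ctx, "/a", wait)
	r.cancel("/a")
	third = r.trigger(ctx, "/a", wait)
	return first, second, third
}

func main() {
	first, second, third := demo()
	fmt.Println(first, second, third)
}
```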
func (*Broker[HandlerContext, Message]) TriggerSub ¶ added in v0.4.0
func (b *Broker[HandlerContext, Message]) TriggerSub( ctx context.Context, topic string, hc HandlerContextMaker[HandlerContext, Message], ) error
TriggerSub looks up and runs the SUB handler associated with the topic.
func (*Broker[HandlerContext, Message]) TriggerUnsub ¶ added in v0.4.0
func (b *Broker[HandlerContext, Message]) TriggerUnsub( ctx context.Context, topic string, hc HandlerContextMaker[HandlerContext, Message], )
TriggerUnsub looks up and runs the UNSUB handler associated with the topic.
func (*Broker[HandlerContext, Message]) UNSUB ¶ added in v0.4.0
func (b *Broker[HandlerContext, Message]) UNSUB( topic string, h HandlerFunc[HandlerContext], m ...MiddlewareFunc[HandlerContext], ) *Route
UNSUB registers a new UNSUB route for a topic with matching handler in the router, with optional route-level middleware. Refer to Broker.Subscribe for details on how UNSUB handlers are used.
func (*Broker[HandlerContext, Message]) Use ¶ added in v0.4.0
func (b *Broker[HandlerContext, Message]) Use(middleware ...MiddlewareFunc[HandlerContext])
Use adds middleware to the chain which is run after the router. Analogous to Echo's Echo.Use.
type BrokerContext ¶ added in v0.4.0
type BrokerContext[HandlerContext Context, Message any] struct {
	// contains filtered or unexported fields
}
BrokerContext represents the broker's portion of the context of the current pub-sub broker event. It should usually be embedded in the HandlerContext type, which is analogous to Echo's Context.
func (*BrokerContext[HandlerContext, Message]) Broadcast ¶ added in v0.4.2
func (c *BrokerContext[HandlerContext, Message]) Broadcast(topic string, messages ...Message)
Broadcast broadcasts the messages over the associated broker's pub-sub Hub, on the specified topic.
func (*BrokerContext[HandlerContext, Message]) Context ¶ added in v0.4.0
func (c *BrokerContext[HandlerContext, Message]) Context() context.Context
Context returns the associated cancellation context.Context. Analogous to Echo's Context.Request().Context.
func (*BrokerContext[HandlerContext, Message]) Hub ¶ added in v0.4.0
func (c *BrokerContext[HandlerContext, Message]) Hub() *Hub[[]Message]
Hub returns the associated broker's pub-sub Hub.
func (*BrokerContext[HandlerContext, Message]) Logger ¶ added in v0.4.0
func (c *BrokerContext[HandlerContext, Message]) Logger() Logger
Logger returns the associated Logger instance. Analogous to Echo's Context.Logger.
func (*BrokerContext[HandlerContext, Message]) Method ¶ added in v0.4.0
func (c *BrokerContext[HandlerContext, Message]) Method() string
Method returns the associated pub-sub broker event method. Analogous to Echo's Context.Request().Method.
func (*BrokerContext[HandlerContext, Message]) Param ¶ added in v0.4.0
func (c *BrokerContext[HandlerContext, Message]) Param(name string) string
Param returns the topic's path parameter value by name. Analogous to Echo's Context.Param.
func (*BrokerContext[HandlerContext, Message]) ParamNames ¶ added in v0.4.0
func (c *BrokerContext[HandlerContext, Message]) ParamNames() []string
ParamNames returns the path parameter names. Analogous to Echo's Context.ParamNames.
func (*BrokerContext[HandlerContext, Message]) ParamValues ¶ added in v0.4.0
func (c *BrokerContext[HandlerContext, Message]) ParamValues() []string
ParamValues returns the path parameter values. Analogous to Echo's Context.ParamValues.
func (*BrokerContext[HandlerContext, Message]) Path ¶ added in v0.4.0
func (c *BrokerContext[HandlerContext, Message]) Path() string
Path returns the registered path for the handler. Analogous to Echo's Context.Path.
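To make the relationship between Path, ParamNames, ParamValues, and Param concrete, here is a deliberately simplified matcher for Echo-style `:name` path parameters. It is only an illustration: `matchTopic` is a hypothetical helper, and the actual router (based on Echo's) also handles features such as wildcards and route priorities that this sketch ignores.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic matches a topic against a registered path whose segments may be
// named parameters prefixed with ":". It returns the parameter names and
// values in order of appearance.
func matchTopic(path, topic string) (names, values []string, ok bool) {
	pathSegments := strings.Split(strings.Trim(path, "/"), "/")
	topicSegments := strings.Split(strings.Trim(topic, "/"), "/")
	if len(pathSegments) != len(topicSegments) {
		return nil, nil, false
	}
	for i, segment := range pathSegments {
		if strings.HasPrefix(segment, ":") {
			names = append(names, segment[1:])
			values = append(values, topicSegments[i])
			continue
		}
		if segment != topicSegments[i] {
			return nil, nil, false
		}
	}
	return names, values, true
}

func main() {
	names, values, ok := matchTopic("/rooms/:room/users/:user", "/rooms/42/users/alice")
	fmt.Println(names, values, ok) // [room user] [42 alice] true
}
```

In this sketch the registered path corresponds to what Path returns, the two slices correspond to ParamNames and ParamValues, and Param is a lookup of a value by its name.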
func (*BrokerContext[HandlerContext, Message]) Publish ¶ added in v0.4.0
func (c *BrokerContext[HandlerContext, Message]) Publish(messages ...Message)
Publish broadcasts the messages over the associated broker's pub-sub Hub, on the same topic as the BrokerContext itself.
func (*BrokerContext[HandlerContext, Message]) QueryParams ¶ added in v0.5.0
func (c *BrokerContext[HandlerContext, Message]) QueryParams() (url.Values, error)
QueryParams returns the query parameters as url.Values, if the topic can be parsed as a request URI. Analogous to Echo's Context.QueryParams.
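Parsing a topic as a request URI can be done with the standard library's net/url package. The sketch below shows one way to do it; `queryParams` is a hypothetical helper and may not match how the method is implemented.

```go
package main

import (
	"fmt"
	"net/url"
)

// queryParams parses the topic as a request URI and returns its query
// parameters, or an error if the topic is not a valid request URI.
func queryParams(topic string) (url.Values, error) {
	u, err := url.ParseRequestURI(topic)
	if err != nil {
		return nil, err
	}
	return u.Query(), nil
}

func main() {
	params, err := queryParams("/rooms/42/messages?since=100&limit=10")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(params.Get("since"), params.Get("limit")) // 100 10
}
```

A topic without a leading slash or scheme, such as `rooms/42`, is rejected by url.ParseRequestURI, which is why the method returns an error alongside the values.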
func (*BrokerContext[HandlerContext, Message]) RouterContext ¶ added in v0.4.0
func (c *BrokerContext[HandlerContext, Message]) RouterContext() *RouterContext[HandlerContext]
RouterContext returns the associated pub-sub broker routing context.
func (*BrokerContext[HandlerContext, Message]) Topic ¶ added in v0.4.0
func (c *BrokerContext[HandlerContext, Message]) Topic() string
Topic returns the associated pub-sub broker event topic. Analogous to Echo's Context.Request().URL.RequestURI.
type Context ¶ added in v0.4.0
type Context interface {
	// Method returns the handler method for the broker event, such as subscription. Analogous to
	// Echo's Context.Request().Method.
	Method() string
	// Topic returns the handler topic for the broker event. Analogous to Echo's
	// Context.Request().URL.RequestURI method.
	Topic() string
}
Context is the type constraint for all broker handler contexts, representing the context of the current pub-sub broker event.
type HandlerContextMaker ¶ added in v0.4.0
type HandlerContextMaker[HandlerContext Context, Message any] func( c *BrokerContext[HandlerContext, Message], ) HandlerContext
HandlerContextMaker is a function for creating a HandlerContext instance from a BrokerContext instance created by Broker.NewBrokerContext.
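The embedding pattern behind a handler context maker can be sketched with local stand-in types. Nothing below uses the package itself: `eventContext`, `brokerContext`, `appContext`, and `makeAppContext` are hypothetical names mirroring Context, BrokerContext, an application-defined HandlerContext, and a HandlerContextMaker respectively.

```go
package main

import "fmt"

// eventContext mirrors the two methods required by the Context constraint.
type eventContext interface {
	Method() string
	Topic() string
}

// brokerContext is a stand-in for the broker's portion of the event context.
type brokerContext struct {
	method, topic string
}

func (c *brokerContext) Method() string { return c.method }
func (c *brokerContext) Topic() string  { return c.topic }

// appContext embeds the broker context, so Method and Topic are promoted and
// appContext satisfies eventContext while adding application-specific state.
type appContext struct {
	*brokerContext
	Session string
}

// makeAppContext returns a maker function that wraps a broker context in an
// application context carrying the given session.
func makeAppContext(session string) func(c *brokerContext) *appContext {
	return func(c *brokerContext) *appContext {
		return &appContext{brokerContext: c, Session: session}
	}
}

// describe accepts anything satisfying the constraint.
func describe(c eventContext) string {
	return c.Method() + " " + c.Topic()
}

func main() {
	maker := makeAppContext("session-1")
	c := maker(&brokerContext{method: "SUB", topic: "/rooms/42"})
	fmt.Println(describe(c), c.Session) // SUB /rooms/42 session-1
}
```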
type HandlerFunc ¶ added in v0.4.0
HandlerFunc defines a function to handle pub-sub events. Analogous to Echo's HandlerFunc.
type HandlerRouter ¶ added in v0.4.0
type HandlerRouter[HandlerContext Context] struct {
	// contains filtered or unexported fields
}
HandlerRouter is the registry of routes for event handler routing and topic path parameter parsing.
func NewHandlerRouter ¶ added in v0.4.0
func NewHandlerRouter[HandlerContext Context](maxParam *int) *HandlerRouter[HandlerContext]
NewHandlerRouter creates a new instance of HandlerRouter.
func (*HandlerRouter[HandlerContext]) Add ¶ added in v0.4.0
func (r *HandlerRouter[HandlerContext]) Add(method, path string, h HandlerFunc[HandlerContext])
Add registers a new route for a pub-sub broker event method and topic with matching handler.
func (*HandlerRouter[HandlerContext]) Find ¶ added in v0.4.0
func (r *HandlerRouter[HandlerContext]) Find( method, path string, ctx *RouterContext[HandlerContext], )
Find modifies the router context with the results of attempting to match the method and path to a handler.
type Hub ¶ added in v0.4.0
type Hub[Message any] struct {
	// contains filtered or unexported fields
}
Hub coordinates broadcasting of messages between publishers and subscribers.
func NewHub ¶ added in v0.4.0
func NewHub[Message any](brChanges chan<- BroadcastingChange, logger Logger) *Hub[Message]
NewHub creates a Hub. If a send channel of BroadcastingChange events is provided, the Hub will emit events indicating topics added to the Hub (i.e. a subscription is added on a new topic) or removed from the Hub (i.e. all subscriptions on a topic were canceled) as those changes occur.
func (*Hub[Message]) Broadcast ¶ added in v0.4.0
Broadcast emits a message to all subscriptions on the topic, blocking until completion of the receiver callback functions of all subscriptions; those callbacks are run in parallel. Subscriptions whose receiver callbacks returned errors are deactivated.
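The blocking, parallel delivery described above, with failing receivers dropped, can be sketched for a single topic using a sync.WaitGroup. This is an illustration under assumptions, not the Hub's implementation: `topicSubs`, `receiveFunc`, and their methods are hypothetical, and the real ReceiveFunc signature is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// receiveFunc is a hypothetical stand-in for a subscription's callback.
type receiveFunc func(message string) error

// topicSubs holds the active subscriptions for one topic.
type topicSubs struct {
	mu     sync.Mutex
	nextID int
	subs   map[int]receiveFunc
}

func newTopicSubs() *topicSubs {
	return &topicSubs{subs: make(map[int]receiveFunc)}
}

func (t *topicSubs) subscribe(receive receiveFunc) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.subs[t.nextID] = receive
	t.nextID++
}

// broadcast calls every receiver in its own goroutine, waits for all of them,
// and removes the subscriptions whose receivers returned an error. It returns
// the number of subscriptions removed.
func (t *topicSubs) broadcast(message string) (deactivated int) {
	t.mu.Lock()
	snapshot := make(map[int]receiveFunc, len(t.subs))
	for id, receive := range t.subs {
		snapshot[id] = receive
	}
	t.mu.Unlock()

	failed := make(chan int, len(snapshot))
	var wg sync.WaitGroup
	for id, receive := range snapshot {
		wg.Add(1)
		go func(id int, receive receiveFunc) {
			defer wg.Done()
			if err := receive(message); err != nil {
				failed <- id
			}
		}(id, receive)
	}
	wg.Wait()
	close(failed)

	t.mu.Lock()
	defer t.mu.Unlock()
	for id := range failed {
		delete(t.subs, id)
		deactivated++
	}
	return deactivated
}

// demo counts receiver calls after each of two broadcasts.
func demo() (afterFirst, afterSecond int32) {
	var calls int32
	t := newTopicSubs()
	t.subscribe(func(string) error {
		atomic.AddInt32(&calls, 1)
		return nil
	})
	t.subscribe(func(string) error {
		atomic.AddInt32(&calls, 1)
		return errors.New("receiver failed")
	})
	t.broadcast("hello")
	afterFirst = atomic.LoadInt32(&calls)
	t.broadcast("again")
	afterSecond = atomic.LoadInt32(&calls)
	return afterFirst, afterSecond
}

func main() {
	first, second := demo()
	fmt.Println(first, second) // 2 3
}
```

The second broadcast reaches only one receiver because the failing one was deactivated after the first.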
func (*Hub[Message]) Close ¶ added in v0.4.0
func (h *Hub[Message]) Close()
Close cancels all subscriptions and destroys internal resources. If the Hub has an associated send channel of BroadcastingChange events, Close closes and forgets that channel without emitting an event listing all removed topics. The Hub should not be used after it's closed.
func (*Hub[Message]) Subscribe ¶ added in v0.4.0
func (h *Hub[Message]) Subscribe( ctx context.Context, topic string, receive ReceiveFunc[Message], ) (removed <-chan struct{})
Subscribe creates an active subscription on the topic. While the subscription is active, the receive callback function will be called on every message published for that topic. The subscription is active until the callback function returns an error on a message, the context is done, the topic is canceled on the hub, or the hub is closed. The returned channel is closed when the subscription becomes inactive.
type Logger ¶ added in v0.4.0
type Logger interface {
	Print(i ...interface{})
	Printf(format string, args ...interface{})
	Debug(i ...interface{})
	Debugf(format string, args ...interface{})
	Info(i ...interface{})
	Infof(format string, args ...interface{})
	Warn(i ...interface{})
	Warnf(format string, args ...interface{})
	Error(i ...interface{})
	Errorf(format string, args ...interface{})
	Fatal(i ...interface{})
	Fatalf(format string, args ...interface{})
	Panic(i ...interface{})
	Panicf(format string, args ...interface{})
}
Logger is a reduced interface for loggers.
type MiddlewareFunc ¶ added in v0.4.0
type MiddlewareFunc[HandlerContext Context] func( next HandlerFunc[HandlerContext], ) HandlerFunc[HandlerContext]
MiddlewareFunc defines a function to process middleware. Analogous to Echo's MiddlewareFunc.
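A middleware function wraps a handler and returns a new handler. The sketch below uses local stand-in types with the same shape as MiddlewareFunc. The handler signature `func(c C) error` is an assumption inferred from the fallback handlers in the index, and the ordering (first middleware outermost) follows Echo's convention rather than anything stated here. `handlerFunc`, `middlewareFunc`, `applyMiddleware`, and `trace` are hypothetical names.

```go
package main

import "fmt"

// handlerFunc and middlewareFunc are local stand-ins; the handler signature
// is an assumption.
type handlerFunc[C any] func(c C) error

type middlewareFunc[C any] func(next handlerFunc[C]) handlerFunc[C]

// applyMiddleware wraps h so that the first middleware listed runs outermost.
func applyMiddleware[C any](h handlerFunc[C], middleware ...middlewareFunc[C]) handlerFunc[C] {
	for i := len(middleware) - 1; i >= 0; i-- {
		h = middleware[i](h)
	}
	return h
}

// trace returns a middleware that records when it is entered and exited.
func trace(name string, log *[]string) middlewareFunc[string] {
	return func(next handlerFunc[string]) handlerFunc[string] {
		return func(topic string) error {
			*log = append(*log, name+" before")
			err := next(topic)
			*log = append(*log, name+" after")
			return err
		}
	}
}

// run executes a handler wrapped in two tracing middleware functions and
// returns the recorded order of calls.
func run() []string {
	var log []string
	var base handlerFunc[string] = func(topic string) error {
		log = append(log, "handler "+topic)
		return nil
	}
	h := applyMiddleware[string](base, trace("outer", &log), trace("inner", &log))
	_ = h("/rooms/42")
	return log
}

func main() {
	for _, line := range run() {
		fmt.Println(line)
	}
}
```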
type ReceiveFunc ¶ added in v0.4.0
ReceiveFunc is the callback function used to handle each message emitted over a subscription.
type Route ¶ added in v0.4.0
type Route struct {
	Method string `json:"method"`
	Path   string `json:"path"`
	Name   string `json:"name"`
}
Route contains a handler and information for matching against requests. Analogous to Echo's Route.
type Router ¶ added in v0.4.0
type Router[HandlerContext Context] interface {
	PUB(topic string, h HandlerFunc[HandlerContext], m ...MiddlewareFunc[HandlerContext]) *Route
	SUB(topic string, h HandlerFunc[HandlerContext], m ...MiddlewareFunc[HandlerContext]) *Route
	UNSUB(topic string, h HandlerFunc[HandlerContext], m ...MiddlewareFunc[HandlerContext]) *Route
}
Router is the subset of Broker methods for adding handlers to routes.
type RouterContext ¶ added in v0.4.0
type RouterContext[HandlerContext Context] struct {
	// contains filtered or unexported fields
}
RouterContext represents the subset of the context of the current pub-sub broker event related to routing events to handlers.
func (*RouterContext[HandlerContext]) Param ¶ added in v0.4.0
func (c *RouterContext[HandlerContext]) Param(name string) string
Param returns the path parameter value by name. Analogous to Echo's Context.Param.
func (*RouterContext[HandlerContext]) ParamNames ¶ added in v0.4.0
func (c *RouterContext[HandlerContext]) ParamNames() []string
ParamNames returns the path parameter names. Analogous to Echo's Context.ParamNames.
func (*RouterContext[HandlerContext]) ParamValues ¶ added in v0.4.0
func (c *RouterContext[HandlerContext]) ParamValues() []string
ParamValues returns the path parameter values. Analogous to Echo's Context.ParamValues.
func (*RouterContext[HandlerContext]) Path ¶ added in v0.4.0
func (c *RouterContext[HandlerContext]) Path() string
Path returns the registered path for the handler. Analogous to Echo's Context.Path.