pubsub

package
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2023 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Package pubsub provides pub-sub functionality. It provides a Hub for simple subscription & broadcasting management. It also provides a Broker for path-based topic routing to pub-sub event handlers based on the github.com/labstack/echo/v4 framework's routing system, as well as orchestration of on-demand publishers.

Index

Constants

View Source
const (
	MethodPub     = "PUB"
	MethodSub     = "SUB"
	MethodUnsub   = "UNSUB"
	RouteNotFound = "pubsub_route_not_found"
)

Pub-sub broker event methods.

Variables

This section is empty.

Functions

func MethodNotAllowedHandler added in v0.4.0

func MethodNotAllowedHandler[HandlerContext Context](c HandlerContext) error

MethodNotAllowedHandler is the fallback handler used for handling pub-sub broker events with methods lacking a matching broker handler. Analogous to Echo's MethodNotAllowedHandler.

func NotFoundHandler added in v0.4.0

func NotFoundHandler[HandlerContext Context](c HandlerContext) error

NotFoundHandler is the fallback handler used for handling pub-sub broker events on topics lacking a matching broker handler. Analogous to Echo's NotFoundHandler.

Types

type BroadcastingChange

type BroadcastingChange struct {
	Added   []string
	Removed []string
}

BroadcastingChange is an event record listing all topics which were added to the Hub and all topics which were removed from the Hub since the last emitted BroadcastingChange.

type Broker added in v0.4.0

type Broker[HandlerContext Context, Message any] struct {
	// contains filtered or unexported fields
}

Broker is the top-level pub-sub framework for routing pub-sub events to handlers. Analogous to Echo's Echo.

func NewBroker added in v0.4.0

func NewBroker[HandlerContext Context, Message any](logger Logger) *Broker[HandlerContext, Message]

NewBroker creates an instance of Broker.

func (*Broker[HandlerContext, Message]) Add added in v0.4.0

func (b *Broker[HandlerContext, Message]) Add(
	method, topic string, handler HandlerFunc[HandlerContext],
	middleware ...MiddlewareFunc[HandlerContext],
) *Route

Add registers a new route for a pub-sub broker event method and topic with matching handler in the router, with optional route-level middleware. Analogous to Echo's Echo.Add.

func (*Broker[HandlerContext, Message]) CancelPub added in v0.4.0

func (b *Broker[HandlerContext, Message]) CancelPub(topic string)

CancelPub cancels the PUB handler goroutine for the topic started by Broker.TriggerPub. Useful for writing alternatives to Broker.Serve.

func (*Broker[HandlerContext, Message]) GetHandler added in v0.4.0

func (b *Broker[HandlerContext, Message]) GetHandler(
	method string, topic string, c *RouterContext[HandlerContext],
) HandlerFunc[HandlerContext]

GetHandler returns the handler associated in the router with the method and topic, with middleware (specified by Broker.Use) applied afterwards, and with the provided router context modified based on routing results. This is useful in combination with Broker.NewBrokerContext for triggering handlers for custom pub-sub broker event methods.

func (*Broker[HandlerContext, Message]) Hub added in v0.4.0

func (b *Broker[HandlerContext, Message]) Hub() *Hub[[]Message]

Hub returns the associated pub-sub Hub.

func (*Broker[HandlerContext, Message]) NewBrokerContext added in v0.4.0

func (b *Broker[HandlerContext, Message]) NewBrokerContext(
	ctx context.Context, method, topic string,
) *BrokerContext[HandlerContext, Message]

NewBrokerContext creates a new pub-sub broker event context. This is useful in combination with Broker.GetHandler for triggering handlers for custom pub-sub broker event methods.

func (*Broker[HandlerContext, Message]) PUB added in v0.4.0

func (b *Broker[HandlerContext, Message]) PUB(
	topic string, h HandlerFunc[HandlerContext], m ...MiddlewareFunc[HandlerContext],
) *Route

PUB registers a new PUB route for a topic with matching handler in the router, with optional route-level middleware. Refer to Broker.Serve for details on how PUB handlers are used.

func (*Broker[HandlerContext, Message]) SUB added in v0.4.0

func (b *Broker[HandlerContext, Message]) SUB(
	topic string, h HandlerFunc[HandlerContext], m ...MiddlewareFunc[HandlerContext],
) *Route

SUB registers a new SUB route for a topic with matching handler in the router, with optional route-level middleware. Refer to Broker.Subscribe for details on how SUB handlers are used.

func (*Broker[HandlerContext, Message]) Serve added in v0.4.0

func (b *Broker[HandlerContext, Message]) Serve(
	ctx context.Context, hc HandlerContextMaker[HandlerContext, Message],
) error

Serve launches and cancels PUB handlers based on the appearance and disappearance of subscriptions for the PUB handlers' corresponding topics. The PUB handler for a topic is started in a goroutine when a new subscription is added to the broker (or to the broker's Hub) on a topic which previously did not have associated subscriptions; and its context is canceled upon removal of the only remaining subscription for the topic. This way, exactly one instance of the PUB handler for a topic is run exactly when there is at least one subscriber on that topic. The Broker should not be used after the Serve method finishes running.

func (*Broker[HandlerContext, Message]) Subscribe added in v0.4.0

func (b *Broker[HandlerContext, Message]) Subscribe(
	ctx context.Context, topic string, hc HandlerContextMaker[HandlerContext, Message],
	broadcastHandler func(ctx context.Context, messages []Message) error,
) (finished <-chan struct{})

Subscribe runs the SUB handler for the topic and, if it does not produce an error, adds a subscription to the broker's Hub with a callback function to handle messages broadcast over the broker's Hub. When the context is canceled, the UNSUB handler is run. Any messages published on the broker's Hub (e.g. messages broadcast from [Context.Publish], [Context.Broadcast], or Hub.Broadcast) will be passed to the broadcast handler callback function.

func (*Broker[HandlerContext, Message]) TriggerPub added in v0.4.0

func (b *Broker[HandlerContext, Message]) TriggerPub(
	ctx context.Context, topic string, hc HandlerContextMaker[HandlerContext, Message],
) error

TriggerPub looks up and launches a goroutine running the PUB handler associated with the topic. It returns an error if the handler for that topic already started but has not yet been canceled by a call to Broker.CancelPub. Useful for writing alternatives to Broker.Serve.

func (*Broker[HandlerContext, Message]) TriggerSub added in v0.4.0

func (b *Broker[HandlerContext, Message]) TriggerSub(
	ctx context.Context, topic string, hc HandlerContextMaker[HandlerContext, Message],
) error

TriggerSub looks up and runs the SUB handler associated with the topic.

func (*Broker[HandlerContext, Message]) TriggerUnsub added in v0.4.0

func (b *Broker[HandlerContext, Message]) TriggerUnsub(
	ctx context.Context, topic string, hc HandlerContextMaker[HandlerContext, Message],
)

TriggerUnsub looks up and runs the UNSUB handler associated with the topic.

func (*Broker[HandlerContext, Message]) UNSUB added in v0.4.0

func (b *Broker[HandlerContext, Message]) UNSUB(
	topic string, h HandlerFunc[HandlerContext], m ...MiddlewareFunc[HandlerContext],
) *Route

UNSUB registers a new UNSUB route for a topic with matching handler in the router, with optional route-level middleware. Refer to Broker.Subscribe for details on how UNSUB handlers are used.

func (*Broker[HandlerContext, Message]) Use added in v0.4.0

func (b *Broker[HandlerContext, Message]) Use(middleware ...MiddlewareFunc[HandlerContext])

Use adds middleware to the chain which is run after the router. Analogous to Echo's Echo.Use.

type BrokerContext added in v0.4.0

type BrokerContext[HandlerContext Context, Message any] struct {
	// contains filtered or unexported fields
}

BrokerContext represents the broker's portion of the context of the current pub-sub broker event. Usually should be an embedded type in HandlerContext, which is analogous to Echo's Context.

func (*BrokerContext[HandlerContext, Message]) Broadcast added in v0.4.2

func (c *BrokerContext[HandlerContext, Message]) Broadcast(topic string, messages ...Message)

Broadcast broadcasts the messages over the associated broker's pub-sub Hub, on the specified topic.

func (*BrokerContext[HandlerContext, Message]) Context added in v0.4.0

func (c *BrokerContext[HandlerContext, Message]) Context() context.Context

Context returns the associated cancellation context.Context. Analogous to Echo's Context.Request().Context.

func (*BrokerContext[HandlerContext, Message]) Hub added in v0.4.0

func (c *BrokerContext[HandlerContext, Message]) Hub() *Hub[[]Message]

Hub returns the associated broker's pub-sub Hub.

func (*BrokerContext[HandlerContext, Message]) Logger added in v0.4.0

func (c *BrokerContext[HandlerContext, Message]) Logger() Logger

Logger returns the associated Logger instance. Analogous to Echo's Context.Logger.

func (*BrokerContext[HandlerContext, Message]) Method added in v0.4.0

func (c *BrokerContext[HandlerContext, Message]) Method() string

Method returns the associated pub-sub broker event method. Analogous to Echo's Context.Request().Method.

func (*BrokerContext[HandlerContext, Message]) Param added in v0.4.0

func (c *BrokerContext[HandlerContext, Message]) Param(name string) string

Param returns the topic's path parameter value by name. Analogous to Echo's Context.Param.

func (*BrokerContext[HandlerContext, Message]) ParamNames added in v0.4.0

func (c *BrokerContext[HandlerContext, Message]) ParamNames() []string

ParamNames returns the path parameter names. Analogous to Echo's Context.ParamNames.

func (*BrokerContext[HandlerContext, Message]) ParamValues added in v0.4.0

func (c *BrokerContext[HandlerContext, Message]) ParamValues() []string

ParamValues returns the path parameter values. Analogous to Echo's Context.ParamValues.

func (*BrokerContext[HandlerContext, Message]) Path added in v0.4.0

func (c *BrokerContext[HandlerContext, Message]) Path() string

Path returns the registered path for the handler. Analogous to Echo's Context.Path.

func (*BrokerContext[HandlerContext, Message]) Publish added in v0.4.0

func (c *BrokerContext[HandlerContext, Message]) Publish(messages ...Message)

Publish broadcasts the messages over the associated broker's pub-sub Hub, on the same topic as the BrokerContext itself.

func (*BrokerContext[HandlerContext, Message]) QueryParams added in v0.5.0

func (c *BrokerContext[HandlerContext, Message]) QueryParams() (url.Values, error)

QueryParams returns the query parameters as url.Values, if the topic can be parsed as a request URI. Analogous to Echo's Context.QueryParams.

func (*BrokerContext[HandlerContext, Message]) RouterContext added in v0.4.0

func (c *BrokerContext[HandlerContext, Message]) RouterContext() *RouterContext[HandlerContext]

RouterContext returns the associated pub-sub broker routing context.

func (*BrokerContext[HandlerContext, Message]) Topic added in v0.4.0

func (c *BrokerContext[HandlerContext, Message]) Topic() string

Topic returns the associated pub-sub broker event topic. Analogous to Echo's Context.Request().URL.RequestURI.

type Context added in v0.4.0

type Context interface {
	// Method returns the handler method for the broker event, such as subscription. Analogous to
	// Echo's Context.Request().Method.
	Method() string
	// Method returns the handler topic for the broker event. Analogous to Echo's
	// Context.Request().URL.RequestURI method.
	Topic() string
}

Context is the type constraint for all broker handler contexts, representing the context of the current pub-sub broker event.

type HandlerContextMaker added in v0.4.0

type HandlerContextMaker[HandlerContext Context, Message any] func(
	c *BrokerContext[HandlerContext, Message],
) HandlerContext

HandlerContextMaker is a function for creating a HandlerContext instance from a BrokerContext instance created by Broker.NewBrokerContext.

type HandlerFunc added in v0.4.0

type HandlerFunc[HandlerContext Context] func(c HandlerContext) error

HandlerFunc defines a function to handle pub-sub events. Analogous to Echo's HandlerFunc.

type HandlerRouter added in v0.4.0

type HandlerRouter[HandlerContext Context] struct {
	// contains filtered or unexported fields
}

HandlerRouter is the registry of routes for event handler routing and topic path parameter parsing.

func NewHandlerRouter added in v0.4.0

func NewHandlerRouter[HandlerContext Context](maxParam *int) *HandlerRouter[HandlerContext]

NewHandlerRouter creates a new instance of HandlerRouter.

func (*HandlerRouter[HandlerContext]) Add added in v0.4.0

func (r *HandlerRouter[HandlerContext]) Add(method, path string, h HandlerFunc[HandlerContext])

Add registers a new route for a pub-sub broker event method and topic with matching handler.

func (*HandlerRouter[HandlerContext]) Find added in v0.4.0

func (r *HandlerRouter[HandlerContext]) Find(
	method, path string, ctx *RouterContext[HandlerContext],
)

Find modifies the router context with the results of attempting to match the method and path to a handler.

type Hub added in v0.4.0

type Hub[Message any] struct {
	// contains filtered or unexported fields
}

Hub coordinates broadcasting of messages between publishers and subscribers.

func NewHub added in v0.4.0

func NewHub[Message any](brChanges chan<- BroadcastingChange, logger Logger) *Hub[Message]

NewHub creates a Hub. If a send channel of BroadcastingChange events is provided, the Hub will emit events indicating topics added to the Hub (i.e. a subscription is added on a new topic) or removed from the Hub (i.e. all subscriptions on a topic were canceled) as those changes occur.

func (*Hub[Message]) Broadcast added in v0.4.0

func (h *Hub[Message]) Broadcast(topic string, message Message)

Broadcast emits a message to all subscriptions on the topic, blocking until completion of the receiver callback functions of all subscriptions; those callbacks are run in parallel. Subscriptions whose receiver callbacks returned errors are deactivated.

func (*Hub[Message]) Cancel added in v0.4.0

func (h *Hub[Message]) Cancel(topics ...string)

Cancel cancels all subscriptions on the topics.

func (*Hub[Message]) Close added in v0.4.0

func (h *Hub[Message]) Close()

Close cancels all subscriptions and destroys internal resources. If the Hub has an associated send channel of BroadcastingChange events, Close closes and forgets that channel without emitting an event listing all removed topics. The Hub should not be used after it's closed.

func (*Hub[Message]) Subscribe added in v0.4.0

func (h *Hub[Message]) Subscribe(
	ctx context.Context, topic string, receive ReceiveFunc[Message],
) (removed <-chan struct{})

Subscribe creates an active subscription on the topic. While the subscription is active, the receive callback function will be called on every message published for that topic. The subscription is active until the callback function returns an error on a message, the context is done, the topic is canceled on the hub, or the hub is closed. The returned channel is closed when the subscription becomes inactive.

type Logger added in v0.4.0

type Logger interface {
	Print(i ...interface{})
	Printf(format string, args ...interface{})
	Debug(i ...interface{})
	Debugf(format string, args ...interface{})
	Info(i ...interface{})
	Infof(format string, args ...interface{})
	Warn(i ...interface{})
	Warnf(format string, args ...interface{})
	Error(i ...interface{})
	Errorf(format string, args ...interface{})
	Fatal(i ...interface{})
	Fatalf(format string, args ...interface{})
	Panic(i ...interface{})
	Panicf(format string, args ...interface{})
}

Logger is a reduced interface for loggers.

type MiddlewareFunc added in v0.4.0

type MiddlewareFunc[HandlerContext Context] func(
	next HandlerFunc[HandlerContext],
) HandlerFunc[HandlerContext]

MiddlewareFunc defines a function to process middleware. Analogous to Echo's MiddlewareFunc.

type ReceiveFunc added in v0.4.0

type ReceiveFunc[Message any] func(message Message) error

ReceiveFunc is the callback function used to handle each message emitted over a subscription.

type Route added in v0.4.0

type Route struct {
	Method string `json:"method"`
	Path   string `json:"path"`
	Name   string `json:"name"`
}

Route contains a handler and information for matching against requests. Analogous to Echo's Route.

type Router added in v0.4.0

type Router[HandlerContext Context] interface {
	PUB(topic string, h HandlerFunc[HandlerContext], m ...MiddlewareFunc[HandlerContext]) *Route
	SUB(topic string, h HandlerFunc[HandlerContext], m ...MiddlewareFunc[HandlerContext]) *Route
	UNSUB(topic string, h HandlerFunc[HandlerContext], m ...MiddlewareFunc[HandlerContext]) *Route
}

Router is the subset of Broker methods for adding handlers to routes.

type RouterContext added in v0.4.0

type RouterContext[HandlerContext Context] struct {
	// contains filtered or unexported fields
}

RouterContext represents the subset of the context of the current pub-sub broker event related to routing events to handlers.

func (*RouterContext[HandlerContext]) Param added in v0.4.0

func (c *RouterContext[HandlerContext]) Param(name string) string

Param returns the path parameter value by name. Analogous to Echo's Context.Param.

func (*RouterContext[HandlerContext]) ParamNames added in v0.4.0

func (c *RouterContext[HandlerContext]) ParamNames() []string

ParamNames returns the path parameter names. Analogous to Echo's Context.ParamNames.

func (*RouterContext[HandlerContext]) ParamValues added in v0.4.0

func (c *RouterContext[HandlerContext]) ParamValues() []string

ParamValues returns the path parameter values. Analogous to Echo's Context.ParamValues.

func (*RouterContext[HandlerContext]) Path added in v0.4.0

func (c *RouterContext[HandlerContext]) Path() string

Path returns the registered path for the handler. Analogous to Echo's Context.Path.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL