pubsub

v2.0.0
Published: Feb 7, 2022 License: LGPL-3.0 Imports: 10 Imported by: 31

README

pubsub

import "github.com/juju/pubsub/v2"

Package pubsub provides publish and subscribe functionality within a single process.

A message as far as a hub is concerned is defined by a topic, and a data blob. All subscribers that match the published topic are notified, and have their callback function called with both the topic and the data blob.

All subscribers get their own goroutine. This way slow consumers do not slow down the act of publishing, and slow consumers do not interfere with other consumers. Subscribers are guaranteed to get the messages that match their topic matcher in the order that the messages were published to the hub.

This package defines two types of hubs.

  • Simple hubs
  • Structured hubs

Simple hubs pass the data blob to the subscribers untouched. Structured hubs serialize the data blob into a map[string]interface{} using the marshaller the hub was created with. Subscription handlers for structured hubs can define a structure for the data blob to be unmarshalled into.

Handler functions for a structured hub can get all the published data available by defining a callback with the signature:

func (Topic, map[string]interface{})

Or alternatively, define a struct type, and use that type as the second argument.

func (Topic, SomeStruct, error)

The structured hub will try to unmarshal the published data into the specified struct. If unmarshalling fails, the error is passed to the callback as the error parameter.

Variables

var JSONMarshaller = &jsonMarshaller{}

JSONMarshaller simply wraps the json.Marshal and json.Unmarshal calls for the Marshaller interface.

type Marshaller

type Marshaller interface {
    // Marshal converts the argument into a byte stream that it can then Unmarshal.
    Marshal(interface{}) ([]byte, error)

    // Unmarshal attempts to convert the byte stream into type passed in as the
    // second arg.
    Unmarshal([]byte, interface{}) error
}

Marshaller defines the Marshal and Unmarshal methods used to serialize and deserialize the structures used in Publish and Subscription handlers of the structured hub.

type Multiplexer

type Multiplexer interface {
    TopicMatcher
    Add(matcher TopicMatcher, handler interface{}) error
}

Multiplexer allows multiple subscriptions to be made sharing a single message queue from the hub. This means that all the messages for the various subscriptions are called back in the order that the messages were published. If more than one handler is added to the Multiplexer that matches any given topic, the handlers are called back one after the other in the order that they were added.

type RegexpMatcher

type RegexpMatcher regexp.Regexp

RegexpMatcher allows standard regular expressions to be used as TopicMatcher values. A RegexpMatcher can be created with the shorthand function MatchRegexp, which wraps regexp.MustCompile.

func (*RegexpMatcher) Match
func (m *RegexpMatcher) Match(topic Topic) bool

Match implements TopicMatcher.

The topic matches if the regular expression matches the topic.

type SimpleHub

type SimpleHub struct {
    // contains filtered or unexported fields
}

SimpleHub provides the base functionality of dealing with subscribers, and the notification of subscribers of events.

func NewSimpleHub
func NewSimpleHub(config *SimpleHubConfig) *SimpleHub

NewSimpleHub returns a new SimpleHub instance.

A simple hub does not touch the data that is passed through to Publish. This data is passed through to each Subscriber. Note that all subscribers are notified in parallel, and that no modification should be done to the data or data races will occur.

func (*SimpleHub) Publish
func (h *SimpleHub) Publish(topic Topic, data interface{}) <-chan struct{}

Publish will notify all the subscribers that are interested by calling their handler function.

The data is passed through to each Subscriber untouched. Note that all subscribers are notified in parallel, and that no modification should be done to the data or data races will occur.

The channel return value is closed when all the subscribers have been notified of the event.

func (*SimpleHub) Subscribe
func (h *SimpleHub) Subscribe(matcher TopicMatcher, handler func(Topic, interface{})) Unsubscriber

Subscribe takes a topic matcher, and a handler function. If the matcher matches the published topic, the handler function is called with the published Topic and the associated data.

The handler function will be called with all matching published events until the Unsubscribe method on the Unsubscriber is called.

type SimpleHubConfig

type SimpleHubConfig struct {
    // LogModule allows for overriding the default logging module.
    // The default value is "pubsub.simple".
    LogModule string
}

SimpleHubConfig is the argument struct for NewSimpleHub.

type StructuredHub

type StructuredHub struct {
    // contains filtered or unexported fields
}

StructuredHub allows the handler functions to accept either structures or map[string]interface{}. The published structure does not need to match the structures of the subscribers. The structures are marshalled using the Marshaller defined in the StructuredHubConfig. If one is not specified, the marshalling is handled by the standard json library.

func NewStructuredHub
func NewStructuredHub(config *StructuredHubConfig) *StructuredHub

NewStructuredHub returns a new StructuredHub instance.

func (*StructuredHub) NewMultiplexer
func (h *StructuredHub) NewMultiplexer() (Unsubscriber, Multiplexer, error)

NewMultiplexer creates a new multiplexer for the hub and subscribes it. Unsubscribing the multiplexer stops calls for all handlers added. Only structured hubs support multiplexer.

func (*StructuredHub) Publish
func (h *StructuredHub) Publish(topic Topic, data interface{}) (<-chan struct{}, error)

Publish will notify all the subscribers that are interested by calling their handler function.

The data is serialized out using the marshaller and then back into a map[string]interface{}. If there is an error marshalling the data, Publish fails with an error. The resulting map is then updated with any annotations provided. The annotated values are only set if the specified field is missing or empty. After the annotations are set, the PostProcess function is called if one was specified. The resulting map is then passed to each of the subscribers.

Subscribers are notified in parallel, so the data must not be modified or data races will occur.

The channel return value is closed when all the subscribers have been notified of the event.

func (*StructuredHub) Subscribe
func (h *StructuredHub) Subscribe(matcher TopicMatcher, handler interface{}) (Unsubscriber, error)

Subscribe takes a topic matcher, and a handler function. If the matcher matches the published topic, the handler function is called with the published Topic and the associated data.

The handler function will be called with all matching published events until the Unsubscribe method on the Unsubscriber is called.

The handler function must have the signature:

`func(Topic, map[string]interface{})`

or

`func(Topic, SomeStruct, error)`

where SomeStruct is any structure. The map[string]interface{} from the Publish call is unmarshalled into the SomeStruct structure. If unmarshalling fails, the handler is called with a zeroed structure and the unmarshalling error.

type StructuredHubConfig

type StructuredHubConfig struct {
    // LogModule allows for overriding the default logging module.
    // The default value is "pubsub.structured".
    LogModule string

    // Marshaller defines how the structured hub will convert from structures to
    // a map[string]interface{} and back. If this is not specified, the
    // `JSONMarshaller` is used.
    Marshaller Marshaller

    // Annotations are added to each message that is published if and only if
    // the values are not already set.
    Annotations map[string]interface{}

    // PostProcess allows the caller to modify the resulting
    // map[string]interface{}. This is useful when a dynamic value, such as a
    // timestamp is added to the map, or when other type conversions are
    // necessary across all the values in the map.
    PostProcess func(map[string]interface{}) (map[string]interface{}, error)
}

StructuredHubConfig is the argument struct for NewStructuredHub.

type Topic

type Topic string

Topic represents a message that can be subscribed to.

func (Topic) Match
func (t Topic) Match(topic Topic) bool

Match implements TopicMatcher. One topic matches another if they are equal.

type TopicMatcher

type TopicMatcher interface {
    Match(Topic) bool
}

TopicMatcher defines the Match method that is used to determine if the subscriber should be notified about a particular message.

var MatchAll TopicMatcher = (*allMatcher)(nil)

MatchAll is a topic matcher that matches all topics.

func MatchRegexp
func MatchRegexp(expression string) TopicMatcher

MatchRegexp expects a valid regular expression. If the expression passed in is not valid, the function panics. The expected use of this is to be able to do something like:

hub.Subscribe(pubsub.MatchRegexp("prefix.*suffix"), handler)

type Unsubscriber

type Unsubscriber interface {
    Unsubscribe()
}

Unsubscriber provides a way to stop receiving handler callbacks. Unsubscribing from a hub will also mark any pending notifications as done, and the handler will not be called for them.


Generated by godoc2md

Documentation

Overview

Package pubsub provides publish and subscribe functionality within a single process.

A message as far as a hub is concerned is defined by a topic, and a data blob. All subscribers that match the published topic are notified, and have their callback function called with both the topic and the data blob.

All subscribers get their own goroutine. This way slow consumers do not slow down the act of publishing, and slow consumers do not interfere with other consumers. Subscribers are guaranteed to get the messages that match their topic matcher in the order that the messages were published to the hub.
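The per-subscriber goroutine model described above can be sketched with one queue and one goroutine per subscriber. This is a minimal illustration using only the standard library, not this package's implementation; the `subscriber` type, its `stop` method, and the bounded channel are assumptions made for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber is a hypothetical type for this sketch: it owns a queue and a
// goroutine that drains the queue in order.
type subscriber struct {
	queue chan string
	done  sync.WaitGroup
}

// newSubscriber starts the goroutine that calls handler for each queued
// message. The bounded buffer is a simplification: a full buffer would block
// the sender, which a real hub would want to avoid.
func newSubscriber(handler func(string)) *subscriber {
	s := &subscriber{queue: make(chan string, 16)}
	s.done.Add(1)
	go func() {
		defer s.done.Done()
		for msg := range s.queue {
			handler(msg)
		}
	}()
	return s
}

// stop closes the queue and waits for the handler goroutine to finish.
func (s *subscriber) stop() {
	close(s.queue)
	s.done.Wait()
}

// deliver sends msgs to a single subscriber and returns what its handler saw.
func deliver(msgs []string) []string {
	var got []string
	s := newSubscriber(func(m string) { got = append(got, m) })
	for _, m := range msgs {
		s.queue <- m
	}
	s.stop()
	return got
}

func main() {
	fmt.Println(deliver([]string{"first", "second", "third"}))
}
```

Because each subscriber drains its own queue from a single goroutine, messages reach the handler in the order they were queued, and a slow handler only delays its own queue.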

This package defines two types of hubs.

  • Simple hubs
  • Structured hubs

Simple hubs pass the data blob to the subscribers untouched. Structured hubs serialize the data blob into a `map[string]interface{}` using the marshaller the hub was created with. Subscription handlers for structured hubs can define a structure for the data blob to be unmarshalled into.

Handler functions for a structured hub can get all the published data available by defining a callback with the signature:

func (Topic, map[string]interface{})

Or alternatively, define a struct type, and use that type as the second argument.

func (Topic, SomeStruct, error)

The structured hub will try to unmarshal the published data into the specified struct. If unmarshalling fails, the error is passed to the callback as the error parameter.
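The conversion of a published value into a `map[string]interface{}` can be illustrated with the standard `encoding/json` package. This is a sketch of the general technique, assuming a JSON marshaller; the `event` type and `toMap` helper are hypothetical and not part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a hypothetical published structure.
type event struct {
	Name  string `json:"name"`
	Count int    `json:"count"`
}

// toMap marshals v to JSON and unmarshals the bytes into a generic map.
func toMap(v interface{}) (map[string]interface{}, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	var m map[string]interface{}
	if err := json.Unmarshal(raw, &m); err != nil {
		return nil, err
	}
	return m, nil
}

func main() {
	m, err := toMap(event{Name: "started", Count: 2})
	fmt.Println(m, err)
}
```

Note that with `encoding/json`, numbers decoded into an `interface{}` arrive as `float64`, so a handler that takes the raw map should expect that type for numeric fields.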

Index

Constants

This section is empty.

Variables

var JSONMarshaller = &jsonMarshaller{}

JSONMarshaller simply wraps the json.Marshal and json.Unmarshal calls for the Marshaller interface.

Functions

func MatchAll

func MatchAll(_ string) bool

MatchAll is a topic matcher that matches all topics.

func MatchRegexp

func MatchRegexp(expression string) func(topic string) bool

MatchRegexp expects a valid regular expression. If the expression passed in is not valid, the function panics. The expected use of this is to be able to do something like:

hub.SubscribeMatch(pubsub.MatchRegexp("prefix.*suffix"), handler)
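A matcher with the same shape as `MatchRegexp` can be built on `regexp.MustCompile`, which panics on an invalid expression. The following is a sketch under that assumption; `matchRegexp` is a local stand-in, and whether the package matches unanchored (as `MatchString` does here) is not stated in this documentation.

```go
package main

import (
	"fmt"
	"regexp"
)

// matchRegexp is a stand-in with the same signature as the documented
// function. regexp.MustCompile panics if expression is not valid.
func matchRegexp(expression string) func(topic string) bool {
	re := regexp.MustCompile(expression)
	// MatchString reports whether the topic contains any match, so the
	// expression is not anchored unless it uses ^ and $ itself.
	return re.MatchString
}

func main() {
	match := matchRegexp("prefix.*suffix")
	fmt.Println(match("prefix.middle.suffix"), match("other"))
}
```

In this sketch, an expression such as `^prefix.*suffix$` would be needed to reject topics that merely contain the pattern.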

func Wait

func Wait(done func()) <-chan struct{}

Wait takes the returning function from Publish and returns a channel. The channel is closed once the returning function is done.
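Turning a blocking function into a channel makes it usable in a `select`, for example to apply a timeout. The sketch below shows one way a function of this shape could be written; `wait` is a local stand-in rather than the package's code, and `publishDone` is a hypothetical placeholder for the function returned by Publish.

```go
package main

import (
	"fmt"
	"time"
)

// wait runs done in its own goroutine and closes the returned channel once
// done has returned.
func wait(done func()) <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		done()
	}()
	return ch
}

func main() {
	// publishDone stands in for the blocking function returned by Publish.
	publishDone := func() { time.Sleep(10 * time.Millisecond) }

	select {
	case <-wait(publishDone):
		fmt.Println("all callbacks completed")
	case <-time.After(time.Second):
		fmt.Println("timed out waiting for callbacks")
	}
}
```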

Types

type Logger

type Logger interface {
	Errorf(format string, values ...interface{})
	Debugf(format string, values ...interface{})
	Tracef(format string, values ...interface{})
}

Logger represents methods called by this package to a logging system.
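Any type with these three methods satisfies the interface. As an illustration, the sketch below adapts the standard library `log.Logger`; the `stdLogger` type and the level prefixes are assumptions for the example, and the interface is repeated locally so the code compiles on its own.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// Logger mirrors the interface documented above so the sketch compiles alone.
type Logger interface {
	Errorf(format string, values ...interface{})
	Debugf(format string, values ...interface{})
	Tracef(format string, values ...interface{})
}

// stdLogger is a hypothetical adapter onto the standard library logger.
type stdLogger struct {
	out *log.Logger
}

func (l stdLogger) Errorf(format string, values ...interface{}) {
	l.out.Printf("ERROR "+format, values...)
}

func (l stdLogger) Debugf(format string, values ...interface{}) {
	l.out.Printf("DEBUG "+format, values...)
}

func (l stdLogger) Tracef(format string, values ...interface{}) {
	l.out.Printf("TRACE "+format, values...)
}

// render logs one line at each level into a buffer and returns the text.
func render() string {
	var buf bytes.Buffer
	var logger Logger = stdLogger{out: log.New(&buf, "", 0)}
	logger.Errorf("failed: %d", 1)
	logger.Debugf("queued %s", "topic")
	logger.Tracef("tick")
	return buf.String()
}

func main() {
	fmt.Print(render())
}
```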

type Marshaller

type Marshaller interface {
	// Marshal converts the argument into a byte stream that it can then Unmarshal.
	Marshal(interface{}) ([]byte, error)

	// Unmarshal attempts to convert the byte stream into type passed in as the
	// second arg.
	Unmarshal([]byte, interface{}) error
}

Marshaller defines the Marshal and Unmarshal methods used to serialize and deserialize the structures used in Publish and Subscription handlers of the structured hub.
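One reason to supply a custom Marshaller is to change how values are decoded. The sketch below is a hypothetical implementation that keeps JSON numbers as `json.Number` instead of `float64` by using `json.Decoder.UseNumber`. The `numberMarshaller` type and `roundTrip` helper are assumptions for the example, and whether the hub's handlers cope with `json.Number` values has not been verified here.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// Marshaller mirrors the interface documented above so the sketch compiles alone.
type Marshaller interface {
	Marshal(interface{}) ([]byte, error)
	Unmarshal([]byte, interface{}) error
}

// numberMarshaller is a hypothetical Marshaller that preserves numeric
// precision when decoding into interface{} values.
type numberMarshaller struct{}

func (numberMarshaller) Marshal(v interface{}) ([]byte, error) {
	return json.Marshal(v)
}

func (numberMarshaller) Unmarshal(data []byte, v interface{}) error {
	dec := json.NewDecoder(bytes.NewReader(data))
	dec.UseNumber() // decode numbers as json.Number rather than float64
	return dec.Decode(v)
}

// roundTrip marshals in and unmarshals the bytes into out.
func roundTrip(m Marshaller, in, out interface{}) error {
	raw, err := m.Marshal(in)
	if err != nil {
		return err
	}
	return m.Unmarshal(raw, out)
}

func main() {
	var out map[string]interface{}
	err := roundTrip(numberMarshaller{}, map[string]int{"answer": 42}, &out)
	fmt.Printf("%v (%T) %v\n", out["answer"], out["answer"], err)
}
```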

type Metrics

type Metrics interface {
	// Subscribed metric increments to show the number of subscriptions per
	// hub.
	Subscribed()

	// Unsubscribed metric decrements the number of subscriptions the hub has.
	Unsubscribed()

	// Published metric is used to increment how many published messages are
	// sent per topic.
	Published(topic string)

	// Enqueued metric increments the number of messages a subscriber
	// currently has. This can be used to see if a subscriber has a backlog of
	// messages piling up.
	Enqueued(ident string)

	// Dequeued metric decrements the number of messages the subscriber has
	// in the pending queue. This doesn't tell you if the message was consumed
	// via the callback, or how long it took.
	Dequeued(ident string)

	// Consumed metric increments the number of consumed messages the subscriber
	// has consumed since a message was enqueued.
	Consumed(ident string, duration time.Duration)
}

Metrics represents methods for collecting information about the internal state of the pubsub.
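A simple in-memory collector is often enough for tests or debugging. The following sketch is one possible implementation, not something this package provides; the `countingMetrics` type and its fields are hypothetical, and `ident` is treated as an opaque subscriber identifier.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Metrics mirrors the interface documented above so the sketch compiles alone.
type Metrics interface {
	Subscribed()
	Unsubscribed()
	Published(topic string)
	Enqueued(ident string)
	Dequeued(ident string)
	Consumed(ident string, duration time.Duration)
}

// countingMetrics is a hypothetical in-memory collector guarded by a mutex.
type countingMetrics struct {
	mu            sync.Mutex
	subscriptions int
	published     map[string]int
	pending       map[string]int
	consumed      map[string]int
	busy          map[string]time.Duration
}

func newCountingMetrics() *countingMetrics {
	return &countingMetrics{
		published: map[string]int{},
		pending:   map[string]int{},
		consumed:  map[string]int{},
		busy:      map[string]time.Duration{},
	}
}

// update runs fn while holding the lock.
func (m *countingMetrics) update(fn func()) {
	m.mu.Lock()
	defer m.mu.Unlock()
	fn()
}

func (m *countingMetrics) Subscribed()            { m.update(func() { m.subscriptions++ }) }
func (m *countingMetrics) Unsubscribed()          { m.update(func() { m.subscriptions-- }) }
func (m *countingMetrics) Published(topic string) { m.update(func() { m.published[topic]++ }) }
func (m *countingMetrics) Enqueued(ident string)  { m.update(func() { m.pending[ident]++ }) }
func (m *countingMetrics) Dequeued(ident string)  { m.update(func() { m.pending[ident]-- }) }

func (m *countingMetrics) Consumed(ident string, duration time.Duration) {
	m.update(func() {
		m.consumed[ident]++
		m.busy[ident] += duration
	})
}

// Compile-time check that countingMetrics satisfies Metrics.
var _ Metrics = (*countingMetrics)(nil)

func main() {
	m := newCountingMetrics()
	m.Subscribed()
	m.Published("topic")
	m.Enqueued("subscriber-1")
	m.Dequeued("subscriber-1")
	m.Consumed("subscriber-1", 5*time.Millisecond)
	fmt.Println(m.subscriptions, m.published["topic"], m.pending["subscriber-1"], m.consumed["subscriber-1"])
}
```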

type Multiplexer

type Multiplexer interface {
	Add(topic string, handler interface{}) error
	AddMatch(matcher func(string) bool, handler interface{}) error
	Unsubscribe()
}

Multiplexer allows multiple subscriptions to be made sharing a single message queue from the hub. This means that all the messages for the various subscriptions are called back in the order that the messages were published. If more than one handler is added to the Multiplexer that matches any given topic, the handlers are called back one after the other in the order that they were added.

type SimpleHub

type SimpleHub struct {
	// contains filtered or unexported fields
}

SimpleHub provides the base functionality of dealing with subscribers, and the notification of subscribers of events.

func NewSimpleHub

func NewSimpleHub(config *SimpleHubConfig) *SimpleHub

NewSimpleHub returns a new SimpleHub instance.

A simple hub does not touch the data that is passed through to Publish. This data is passed through to each Subscriber. Note that all subscribers are notified in parallel, and that no modification should be done to the data or data races will occur.

func (*SimpleHub) NewMultiplexer

func (h *SimpleHub) NewMultiplexer() SimpleMultiplexer

NewMultiplexer creates a new multiplexer for the hub and subscribes it. Unsubscribing the multiplexer stops calls for all handlers added.

func (*SimpleHub) Publish

func (h *SimpleHub) Publish(topic string, data interface{}) func()

Publish will notify all the subscribers that are interested by calling their handler function.

The data is passed through to each Subscriber untouched. Note that all subscribers are notified in parallel, and that no modification should be done to the data or data races will occur.

The returned function, when called, blocks until all callbacks have completed.

func (*SimpleHub) Subscribe

func (h *SimpleHub) Subscribe(topic string, handler func(string, interface{})) func()

Subscribe to a topic with a handler function. If the topic is the same as the published topic, the handler function is called with the published topic and the associated data.

The return value is a function that will unsubscribe the caller from the hub, for this subscription.

func (*SimpleHub) SubscribeMatch

func (h *SimpleHub) SubscribeMatch(matcher func(string) bool, handler func(string, interface{})) func()

SubscribeMatch takes a function that determines whether the topic matches, and a handler function. If the matcher matches the published topic, the handler function is called with the published topic and the associated data.

The return value is a function that will unsubscribe the caller from the hub, for this subscription.
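Returning a function to unsubscribe is a common closure pattern: the closure captures whatever is needed to remove the subscription. The sketch below shows the pattern with a tiny synchronous stand-in; the `hub` type here is hypothetical and much simpler than the real hub, which notifies subscribers on their own goroutines.

```go
package main

import (
	"fmt"
	"sync"
)

type subscription struct {
	match   func(string) bool
	handler func(string, interface{})
}

// hub is a hypothetical, synchronous stand-in used only for this sketch.
type hub struct {
	mu     sync.Mutex
	nextID int
	subs   map[int]subscription
}

func newHub() *hub { return &hub{subs: map[int]subscription{}} }

// subscribeMatch registers the handler and returns a function that removes it.
func (h *hub) subscribeMatch(matcher func(string) bool, handler func(string, interface{})) func() {
	h.mu.Lock()
	defer h.mu.Unlock()
	id := h.nextID
	h.nextID++
	h.subs[id] = subscription{match: matcher, handler: handler}
	return func() {
		h.mu.Lock()
		defer h.mu.Unlock()
		delete(h.subs, id)
	}
}

// subscribe is subscribeMatch with an exact topic comparison.
func (h *hub) subscribe(topic string, handler func(string, interface{})) func() {
	return h.subscribeMatch(func(t string) bool { return t == topic }, handler)
}

// publish collects matching handlers under the lock, then calls them.
func (h *hub) publish(topic string, data interface{}) {
	h.mu.Lock()
	var matched []func(string, interface{})
	for _, s := range h.subs {
		if s.match(topic) {
			matched = append(matched, s.handler)
		}
	}
	h.mu.Unlock()
	for _, handler := range matched {
		handler(topic, data)
	}
}

func main() {
	h := newHub()
	unsubscribe := h.subscribe("started", func(topic string, data interface{}) {
		fmt.Println("received", topic, data)
	})
	h.publish("started", 1) // the handler runs
	unsubscribe()
	h.publish("started", 2) // no handler is left to call
}
```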

type SimpleHubConfig

type SimpleHubConfig struct {
	// Logger allows specifying a logging implementation for debug
	// and trace level messages emitted from the hub.
	Logger Logger
	// Metrics allows the passing in of a metrics collector.
	Metrics Metrics
	// Clock defines a clock to help improve test coverage.
	Clock clock.Clock
}

SimpleHubConfig is the argument struct for NewSimpleHub.

type SimpleMultiplexer

type SimpleMultiplexer interface {
	Add(topic string, handler func(string, interface{}))
	AddMatch(matcher func(string) bool, handler func(string, interface{}))
	Unsubscribe()
}

SimpleMultiplexer allows multiple subscriptions to be made sharing a single message queue from the hub. This means that all the messages for the various subscriptions are called back in the order that the messages were published. If more than one handler is added to the SimpleMultiplexer that matches any given topic, the handlers are called back one after the other in the order that they were added.
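The ordering guarantee follows from keeping the handlers in a single ordered list and walking it for each message. The sketch below illustrates that idea only; the `mux` type and its `dispatch` method are hypothetical and do not reflect how the package connects a multiplexer to a hub.

```go
package main

import "fmt"

type entry struct {
	match   func(string) bool
	handler func(string, interface{})
}

// mux is a hypothetical stand-in that keeps handlers in the order added.
type mux struct {
	entries []entry
}

// Add registers a handler for an exact topic.
func (m *mux) Add(topic string, handler func(string, interface{})) {
	m.AddMatch(func(t string) bool { return t == topic }, handler)
}

// AddMatch registers a handler for any topic accepted by matcher.
func (m *mux) AddMatch(matcher func(string) bool, handler func(string, interface{})) {
	m.entries = append(m.entries, entry{match: matcher, handler: handler})
}

// dispatch calls every matching handler, one after another, in add order.
func (m *mux) dispatch(topic string, data interface{}) {
	for _, e := range m.entries {
		if e.match(topic) {
			e.handler(topic, data)
		}
	}
}

// callOrder records which handlers run for a message on topic "a".
func callOrder() []string {
	var order []string
	m := &mux{}
	m.Add("a", func(string, interface{}) { order = append(order, "exact") })
	m.AddMatch(func(string) bool { return true }, func(string, interface{}) { order = append(order, "all") })
	m.Add("b", func(string, interface{}) { order = append(order, "other") })
	m.dispatch("a", nil)
	return order
}

func main() {
	fmt.Println(callOrder())
}
```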

type StructuredHub

type StructuredHub struct {
	// contains filtered or unexported fields
}

StructuredHub allows the handler functions to accept either structures or map[string]interface{}. The published structure does not need to match the structures of the subscribers. The structures are marshalled using the Marshaller defined in the StructuredHubConfig. If one is not specified, the marshalling is handled by the standard json library.

func NewStructuredHub

func NewStructuredHub(config *StructuredHubConfig) *StructuredHub

NewStructuredHub returns a new StructuredHub instance.

func (*StructuredHub) NewMultiplexer

func (h *StructuredHub) NewMultiplexer() (Multiplexer, error)

NewMultiplexer creates a new multiplexer for the hub and subscribes it. Unsubscribing the multiplexer stops calls for all handlers added.

func (*StructuredHub) Publish

func (h *StructuredHub) Publish(topic string, data interface{}) (func(), error)

Publish will notify all the subscribers that are interested by calling their handler function.

The data is serialized out using the marshaller and then back into a map[string]interface{}. If there is an error marshalling the data, Publish fails with an error. The resulting map is then updated with any annotations provided. The annotated values are only set if the specified field is missing or empty. After the annotations are set, the PostProcess function is called if one was specified. The resulting map is then passed to each of the subscribers.
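The rule that annotations only fill in missing or empty fields can be sketched as a small map merge. This is an illustration, not the package's code: the `annotate` helper is hypothetical, and treating only `nil` and the empty string as "empty" is an assumption of this sketch.

```go
package main

import "fmt"

// annotate copies each annotation into msg unless msg already holds a
// non-empty value for that key. Treating nil and "" as empty is an
// assumption of this sketch.
func annotate(msg, annotations map[string]interface{}) {
	for key, value := range annotations {
		current, ok := msg[key]
		if !ok || current == nil || current == "" {
			msg[key] = value
		}
	}
}

func main() {
	msg := map[string]interface{}{"origin": "", "id": "abc"}
	annotate(msg, map[string]interface{}{"origin": "machine-0", "id": "default"})
	fmt.Println(msg["origin"], msg["id"])
}
```

Here the empty `origin` is replaced by the annotation while the existing `id` is left alone.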

Subscribers are notified in parallel, so the data must not be modified or data races will occur.

The returned function, when called, blocks until all callbacks have completed.

func (*StructuredHub) Subscribe

func (h *StructuredHub) Subscribe(topic string, handler interface{}) (func(), error)

Subscribe takes a topic with a handler function. If the topic is the same as the published topic, the handler function is called with the published topic and the associated data.

The function return value is a function that will unsubscribe the caller from the hub, for this subscription.

The handler function must have the signature:

`func(string, map[string]interface{})`

or

`func(string, SomeStruct, error)`

where `SomeStruct` is any structure.

If the handler function does not match one of these signatures, the Subscribe function returns an error.

The map[string]interface{} from the Publish call is unmarshalled into the `SomeStruct` structure. If unmarshalling fails, the handler is called with a zeroed structure and the unmarshalling error.
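The zeroed-structure-plus-error behaviour can be illustrated with a JSON round trip from the map into a struct. This is a sketch assuming a JSON marshaller; the `started` type and `decode` helper are hypothetical names for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// started is a hypothetical subscriber-side structure.
type started struct {
	Name  string `json:"name"`
	Count int    `json:"count"`
}

// decode converts the generic map into a started value by round-tripping
// through JSON. On failure it returns the zero value together with the error.
func decode(m map[string]interface{}) (started, error) {
	raw, err := json.Marshal(m)
	if err != nil {
		return started{}, err
	}
	var out started
	if err := json.Unmarshal(raw, &out); err != nil {
		return started{}, err
	}
	return out, nil
}

func main() {
	good, err := decode(map[string]interface{}{"name": "worker", "count": 3})
	fmt.Println(good, err)

	// "count" holds a string, which cannot be unmarshalled into an int.
	bad, err := decode(map[string]interface{}{"name": "worker", "count": "three"})
	fmt.Println(bad, err != nil)
}
```

A handler with the three-argument signature should therefore check the error before using the structure.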

func (*StructuredHub) SubscribeMatch

func (h *StructuredHub) SubscribeMatch(matcher func(string) bool, handler interface{}) (func(), error)

SubscribeMatch takes a function that determines whether the topic matches, and a handler function. If the matcher matches the published topic, the handler function is called with the published topic and the associated data.

All other aspects of the function are the same as the `Subscribe` method.

type StructuredHubConfig

type StructuredHubConfig struct {
	// Logger allows specifying a logging implementation for debug
	// and trace level messages emitted from the hub.
	Logger Logger

	// Metrics allows the passing in of a metrics collector.
	Metrics Metrics

	// Clock defines a clock to help improve test coverage.
	Clock clock.Clock

	// Marshaller defines how the structured hub will convert from structures to
	// a map[string]interface{} and back. If this is not specified, the
	// `JSONMarshaller` is used.
	Marshaller Marshaller

	// Annotations are added to each message that is published if and only if
	// the values are not already set.
	Annotations map[string]interface{}

	// PostProcess allows the caller to modify the resulting
	// map[string]interface{}. This is useful when a dynamic value, such as a
	// timestamp is added to the map, or when other type conversions are
	// necessary across all the values in the map.
	PostProcess func(map[string]interface{}) (map[string]interface{}, error)
}

StructuredHubConfig is the argument struct for NewStructuredHub.
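As an example of the dynamic-value use case mentioned for PostProcess, the sketch below builds a function with the PostProcess signature that stamps each message with a time. The `withTimestamp` and `fixedClock` helpers, the `"timestamp"` key, and the RFC 3339 format are all assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// withTimestamp returns a function with the PostProcess signature that sets a
// "timestamp" key. The key name and the injected clock are hypothetical.
func withTimestamp(now func() time.Time) func(map[string]interface{}) (map[string]interface{}, error) {
	return func(m map[string]interface{}) (map[string]interface{}, error) {
		m["timestamp"] = now().UTC().Format(time.RFC3339)
		return m, nil
	}
}

// fixedClock returns a constant time so the result is reproducible.
func fixedClock() time.Time {
	return time.Date(2022, time.February, 7, 0, 0, 0, 0, time.UTC)
}

func main() {
	post := withTimestamp(fixedClock)
	out, err := post(map[string]interface{}{"name": "started"})
	fmt.Println(out["timestamp"], err)
}
```

In real use, `time.Now` would be passed instead of `fixedClock`, and the resulting function assigned to the `PostProcess` field.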
