README
¶
PubSub provides a simple helper library for doing publish and subscribe style asynchronous tasks in Go, usually in a web or micro service. PubSub allows you to write publishers and subscribers, fully typed, and swap out providers (Google Cloud PubSub, AWS SQS etc) as required.
PubSub also abstracts away the creation of the queues and their subscribers, so you shouldn't have to write any cloud specific code, but still gives you options to set concurrency, deadlines, error handling etc.
Middleware is also included, including logging, tracing and error handling!
Table of Contents
Example
Here's a basic example using Nats streaming server and a basic subscriber function that prints hello.
To publish messages, you can call Publish
, you can publish a Protobuf or JSON serializable object (i.e, most Go objects).
Publish is Protobuf by default..
Publisher
pubsub.Publish(ctx, "topic-name", &User{Id: "usr_0001"})
to publish a JSON object
pubsub.PublishJSON(ctx, "topic-name", &User{Id: "usr_0001"})
This can be useful if the application subscribing isn't good with Protobuf or is external to your company for example. However Protobuf is recommended for speed, type safety and forwards compatability.
Subscriber
Subscribing to a topic is done with a single function, you'll receive a context, the object that was in the queue and the pubsub message, which includes some metadata and timing information, should you need it.
func PrintHello(ctx context.Context, msg *HelloMsg, m *pubsub.Msg) error {
fmt.Printf("Message received %+v\n\n", m)
fmt.Printf(msg.Greeting + " " + msg.Name + "\n")
return nil
}
First though, you need to "Setup" your subscribers
type Subscriber struct{}
func (s *Subscriber) Setup(c *pubsub.Client) {
c.On(pubsub.HandlerOptions{
Topic: HelloTopic,
Name: "print-hello",
Handler: PrintHello,
AutoAck: true,
JSON: true,
})
}
pubsub.Subscribe(&Subscriber{})
Full Example
You can see a full example in the example folder.
Middleware
Default
PubSub provides a helper to setup the default middleware.
At the time of writing this includes, Logrus, Opentracing, Prometheus, Recovery (Handles panics) and Audit Logging
To use this, simple include it when initialising PubSub
pubsub.SetClient(&pubsub.Client{
ServiceName: "my-service-name",
Provider: provider,
Middleware: defaults.Middleware,
})
You can optionaly provide a recovery handler too.
pubsub.SetClient(&pubsub.Client{
ServiceName: "my-service-name",
Provider: provider,
Middleware: defaults.MiddlewareWithRecovery(func(p interface{}) (err error){
// log p or report to an error reporter
}),
})
Logrus
When enabled, the Logrus middleware will output something similar to below. Note that the level is DEBUG
by default. To see the logs, you'll need to set logrus.SetLevel(logrus.DebugLevel)
or use something like github.com/lileio/Logr which can set it from ENV variables.
time="2019-09-23T12:46:13Z" level=debug msg="Google Pubsub: Publishing"
time="2019-09-23T12:46:13Z" level=debug msg="Google Pubsub: Publish confirmed"
time="2019-09-23T12:46:13Z" level=debug msg="Published PubSub Msg" component=pubsub duration=143.545203ms metadata="map[x-b3-parentspanid:622cff2be9102141 x-b3-sampled:1 x-b3-flags:0 x-audit-user:xxxx@example.co.uk x-b3-traceid:4275176f2f7f729257887d1e4853498d x-b3-spanid:017e28147c6c3704]" topic=hello.world
time="2019-09-23T12:46:16Z" level=debug msg="Processed PubSub Msg" component=pubsub duration=1.702259988s handler=function_name id=734207593944188 metadata="map[x-b3-traceid:4275176f2f7f729257887d1e4853498d x-b3-spanid:017e28147c6c3704 x-audit-user:xxxx@example.co.uk x-b3-parentspanid:622cff2be9102141 x-b3-flags:0 x-b3-sampled:1]" topic=hello.work
Opentracing
The Opentracing middle adds tags ands logs to spans which will later be sent to something like Zipkin or Jaeger when setup in the application. Note that the Opentracing middleware only adds things to the context but isn't responsible for setting up Opentracing and it's reporting, for that, see here.
Prometheus
The Prometheus middleware includes some counters and histograms to help with monitoring, you can see there here but these include.
pubsub_message_published_total{topic,service}
pubsub_outgoing_bytes{topic,service}
pubsub_publish_durations_histogram_seconds
pubsub_server_handled_total{"topic", "service", "success"}
pubsub_incoming_bytes{"topic", "service"}
pubsub_subscribe_durations_histogram_seconds{"topic", "service"}
Here's an example query to get messages handled (by a subscriber) every minute, make sure your prometheus step is also 1m
sum(increase(pubsub_server_handled_total[1m])) by (topic, success)
Providers
Google Cloud PubSub
To setup a Google Cloud client, you can do the following..
pubsub.SetClient(&pubsub.Client{
ServiceName: "my-service-name",
Provider: google.NewGoogleCloud('projectid'),
Middleware: defaults.Middleware,
})
If you're on Google Cloud vms you're environment likely already has credentials setup, but locally you can set them up with default credentials, if you're on Kubernetes, I reccomend setting up service account and then making a secret file and setting the GOOGLE_APPLICATION_CREDENTIALS
to the filepath of that JSON secret key.
The Google PubSub provider is tested heavily in production by Echo and works well, we have however noticed some strange behaviour from Google subscribers, as they try to be clever and balance traffic and other strange things. For example, if you want to only process 2 messages at a time, and don't process the two you're given, then can often result in a pause before more messages are sent to you, this can be hard to debug as a queue builds up, but often fixes itself.
Nats Streaming Server
To setup a Nats Streaming client, you can do the following. Optionally passing options for the original client
pubsub.SetClient(&pubsub.Client{
ServiceName: "my-service-name",
Provider: nats.NewNats('clustername', opts),
Middleware: defaults.Middleware,
})
Note this driver is for Nats Streaming, and not for plain Nats.
AWS SQS/SNS
Currently there is no provider for AWS SNS and SQS. Please feel free to make a pull request!
Kafka
There's an experimental provider for Kafka available here, but it's limiting in options you can override. I'd love to see someone take this on and help it become more bullet proof. But things like retries are hard.
Documentation
¶
Overview ¶
Package pubsub implements publish subscriber patterns for usage in Golang
Index ¶
- func AddPublisherClient(cli *Client)
- func SetClient(cli *Client)
- func Shutdown()
- func Subscribe(s Subscriber)
- func WaitForAllPublishing()
- type Client
- type Handler
- type HandlerOptions
- type MessageWrapper
- func (*MessageWrapper) Descriptor() ([]byte, []int)
- func (m *MessageWrapper) GetData() []byte
- func (m *MessageWrapper) GetMetadata() map[string]string
- func (m *MessageWrapper) GetPublishTime() *timestamp.Timestamp
- func (*MessageWrapper) ProtoMessage()
- func (m *MessageWrapper) Reset()
- func (m *MessageWrapper) String() string
- func (m *MessageWrapper) XXX_DiscardUnknown()
- func (m *MessageWrapper) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *MessageWrapper) XXX_Merge(src proto.Message)
- func (m *MessageWrapper) XXX_Size() int
- func (m *MessageWrapper) XXX_Unmarshal(b []byte) error
- type Middleware
- type Msg
- type MsgHandler
- type NoopProvider
- type Provider
- type PublishHandler
- type PublishResult
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AddPublisherClient ¶ added in v2.3.1
func AddPublisherClient(cli *Client)
AddPublisherClient allows another client to bet set, for publishing only
func SetClient ¶
func SetClient(cli *Client)
SetClient sets the global pubsub client, useful in tests
func Subscribe ¶
func Subscribe(s Subscriber)
Subscribe starts a run loop with a Subscriber that listens to topics and waits for a syscall.SIGINT or syscall.SIGTERM
func WaitForAllPublishing ¶ added in v2.3.1
func WaitForAllPublishing()
WaitForAllPublishing waits for all in flight publisher messages to go, before returning
Types ¶
type Client ¶
type Client struct { ServiceName string Provider Provider Middleware []Middleware }
Client holds a reference to a Provider
func GetClient ¶ added in v2.6.0
func GetClient() *Client
GetClient get the global pubsub client, useful in tests
func (Client) On ¶
func (c Client) On(opts HandlerOptions)
On takes HandlerOptions and subscribes to a topic, waiting for a protobuf message calling the function when a message is received
type Handler ¶
type Handler interface{}
Handler is a specific callback used for Subscribe in the format of.. func(ctx context.Context, obj proto.Message, msg *Msg) error for example, you can unmarshal a custom type.. func(ctx context.Context, accounts accounts.Account, msg *Msg) error you can also unmarshal a JSON object by supplying any type of interface{} func(ctx context.Context, accounts models.SomeJSONAccount, msg *Msg) error
type HandlerOptions ¶
type HandlerOptions struct { // The topic to subscribe to Topic string // The name of this subscriber/function Name string // The name of this subscriber/function's service ServiceName string // The function to invoke Handler Handler // A message deadline/timeout Deadline time.Duration // Concurrency sets the maximum number of msgs to be run concurrently // default: 20 Concurrency int // Auto Ack the message automatically if return err == nil AutoAck bool // Decode JSON objects from pubsub instead of protobuf JSON bool // StartFromBeginning starts a new subscriber from // the beginning of messages available, if supported StartFromBeginning bool // Unique subscriber means that all subscribers will receive all messages Unique bool }
HandlerOptions defines the options for a subscriber handler
type MessageWrapper ¶
type MessageWrapper struct { Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` Metadata map[string]string `` /* 157-byte string literal not displayed */ PublishTime *timestamp.Timestamp `protobuf:"bytes,4,opt,name=publish_time,json=publishTime,proto3" json:"publish_time,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
Msg is a wrapper message that alllows us to keep metadata and other different and useful information across all providers
func (*MessageWrapper) Descriptor ¶
func (*MessageWrapper) Descriptor() ([]byte, []int)
func (*MessageWrapper) GetData ¶
func (m *MessageWrapper) GetData() []byte
func (*MessageWrapper) GetMetadata ¶
func (m *MessageWrapper) GetMetadata() map[string]string
func (*MessageWrapper) GetPublishTime ¶
func (m *MessageWrapper) GetPublishTime() *timestamp.Timestamp
func (*MessageWrapper) ProtoMessage ¶
func (*MessageWrapper) ProtoMessage()
func (*MessageWrapper) Reset ¶
func (m *MessageWrapper) Reset()
func (*MessageWrapper) String ¶
func (m *MessageWrapper) String() string
func (*MessageWrapper) XXX_DiscardUnknown ¶
func (m *MessageWrapper) XXX_DiscardUnknown()
func (*MessageWrapper) XXX_Marshal ¶
func (m *MessageWrapper) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*MessageWrapper) XXX_Merge ¶
func (m *MessageWrapper) XXX_Merge(src proto.Message)
func (*MessageWrapper) XXX_Size ¶
func (m *MessageWrapper) XXX_Size() int
func (*MessageWrapper) XXX_Unmarshal ¶
func (m *MessageWrapper) XXX_Unmarshal(b []byte) error
type Middleware ¶
type Middleware interface { SubscribeInterceptor(opts HandlerOptions, next MsgHandler) MsgHandler PublisherMsgInterceptor(serviceName string, next PublishHandler) PublishHandler }
Middleware is an interface to provide subscriber and publisher interceptors
type Msg ¶
type Msg struct { ID string Metadata map[string]string Data []byte PublishTime *time.Time Ack func() Nack func() }
Msg is a lile representation of a pub sub message
type MsgHandler ¶
MsgHandler is the internal or raw message handler
type NoopProvider ¶
type NoopProvider struct{}
NoopProvider is a simple provider that does nothing, for testing, defaults
func (NoopProvider) Subscribe ¶
func (np NoopProvider) Subscribe(opts HandlerOptions, h MsgHandler)
Subscribe does nothing
type Provider ¶
type Provider interface { Publish(ctx context.Context, topic string, m *Msg) error Subscribe(opts HandlerOptions, handler MsgHandler) Shutdown() }
Provider is generic interface for a pub sub provider
type PublishHandler ¶
PublishHandler wraps a call to publish, for interception
type PublishResult ¶
type PublishResult struct { Ready chan struct{} Err error }
A PublishResult holds the result from a call to Publish.
func Publish ¶
Publish is a convenience message which publishes to the current (global) publisher as protobuf
func PublishJSON ¶
func PublishJSON(ctx context.Context, topic string, obj interface{}) *PublishResult
PublishJSON is a convenience message which publishes to the current (global) publisher as JSON
type Subscriber ¶
type Subscriber interface { // Setup is a required method that allows the subscriber service to add handlers // and perform any setup if required, this is usually called by pubsub upon start Setup(*Client) }
Subscriber is a service that listens to events and registers handlers for those events