README ¶
msgbus
A real-time message bus server and library written in Go.
Features
- Simple HTTP API
- Simple command-line client
- In memory queues
- WebSockets for real-time messages
- Pull and Push model
Install
$ go install git.mills.io/prologic/msgbus/cmd/...
Use Cases
- As a simple generic webhook
You can use msgbus as a simple generic webhook. For example in my dockerfiles repo I have hooked up Prometheus's AlertManager to send alert notifications to an IRC channel using some simple shell scripts.
See: alert
- As a general-purpose message / event bus that supports pub/sub as well as pulling messages synchronously.
Usage (library)
Install the package into your project:
$ go get git.mills.io/prologic/msgbus
Use the MessageBus
type either directly:
package main
import (
"log"
"git.mills.io/prologic/msgbus"
)
func main() {
m := msgbus.New()
m.Put("foo", m.NewMessage([]byte("Hello World!")))
msg, ok := m.Get("foo")
if !ok {
log.Printf("No more messages in queue: foo")
} else {
log.Printf
"Received message: id=%s topic=%s payload=%s",
msg.ID, msg.Topic, msg.Payload,
)
}
}
Running this example should yield something like this:
$ go run examples/hello.go
2017/08/09 03:01:54 [msgbus] PUT id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] NotifyAll id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] GET topic=foo
2017/08/09 03:01:54 Received message: id=%!s(uint64=0) topic=foo payload=Hello World!
See the godoc for further documentation and other examples.
Usage (tool)
Run the message bus daemon/server:
$ msgbusd
2017/08/07 01:11:16 [msgbus] Subscribe id=[::1]:55341 topic=foo
2017/08/07 01:11:22 [msgbus] PUT id=0 topic=foo payload=hi
2017/08/07 01:11:22 [msgbus] NotifyAll id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] PUT id=1 topic=foo payload=bye
2017/08/07 01:11:26 [msgbus] NotifyAll id=1 topic=foo payload=bye
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo
Subscribe to a topic using the message bus client:
$ msgbus sub foo
2017/08/07 01:11:22 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] received message: id=1 topic=foo payload=bye
Send a few messages with the message bus client:
$ msgbus pub foo hi
$ msgbus pub foo bye
You can also manually pull messages using the client:
$ msgbus pull foo
2017/08/07 01:11:33 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:33 [msgbus] received message: id=1 topic=foo payload=bye
This is slightly different from a listening subscriber (using websockets) where messages are pulled directly.
Usage (HTTP)
Run the message bus daemon/server:
$ msgbusd
2018/03/25 13:21:18 msgbusd listening on :8000
Send a message with using curl
:
$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello
Pull the messages off the "hello" queue using curl
:
$ curl -q -o - http://localhost:8000/hello
{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","created":"2018-03-25T13:18:38.732465-07:00"}
Decode the payload:
$ echo 'eyJtZXNzYWdlIjogImhlbGxvIn0=' | base64 -d
{"message": "hello"}
API
GET /
List all known topics/queues.
Example:
$ curl -q -o - http://localhost:8000/ | jq '.'
{
"hello": {
"name": "hello",
"ttl": 60000000000,
"seq": 1,
"created": "2018-05-07T23:44:25.681392205-07:00"
}
}
POST|PUT /topic
Post a new message to the queue named by <topic>
.
NB: Either POST
or PUT
methods can be used here.
Example:
$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello
message successfully published to hello with sequence 1
GET /topic
Get the next message of the queue named by <topic>
.
- If the topic is not found. Returns:
404 Not Found
- If the Websockets
Upgrade
header is found, upgrades to a websocket channel and subscribes to the topic<topic>
. Each new message published to the topic<topic>
are instantly published to all subscribers.
Example:
$ curl -q -o - http://localhost:8000/hello
{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","created":"2018-03-25T13:18:38.732465-07:00"}
DELETE /topic
Deletes a queue named by <topic>
.
Not implemented.
Related Projects
- je -- A distributed job execution engine for the execution of batch jobs, workflows, remediations and more.
License
msgbus is licensed under the MIT License
Documentation ¶
Index ¶
- Constants
- Variables
- func FullVersion() string
- func GenerateULID() (string, error)
- func MustGenerateULID() string
- func SafeParseInt64(s string, d int64) int64
- type Client
- type HandlerFunc
- type Message
- type MessageBus
- func (mb *MessageBus) Get(t *Topic) (Message, bool)
- func (mb *MessageBus) Len() int
- func (mb *MessageBus) Metrics() *Metrics
- func (mb *MessageBus) NewMessage(topic *Topic, payload []byte) Message
- func (mb *MessageBus) NewTopic(topic string) *Topic
- func (mb *MessageBus) Put(message Message) error
- func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan Message
- func (mb *MessageBus) Unsubscribe(id, topic string)
- type Metrics
- func (m *Metrics) Counter(subsystem, name string) prometheus.Counter
- func (m *Metrics) CounterVec(subsystem, name string) *prometheus.CounterVec
- func (m *Metrics) Gauge(subsystem, name string) prometheus.Gauge
- func (m *Metrics) GaugeVec(subsystem, name string) *prometheus.GaugeVec
- func (m *Metrics) Handler() http.Handler
- func (m *Metrics) NewCounter(subsystem, name, help string) prometheus.Counter
- func (m *Metrics) NewCounterFunc(subsystem, name, help string, f func() float64) prometheus.CounterFunc
- func (m *Metrics) NewCounterVec(subsystem, name, help string, labels []string) *prometheus.CounterVec
- func (m *Metrics) NewGauge(subsystem, name, help string) prometheus.Gauge
- func (m *Metrics) NewGaugeFunc(subsystem, name, help string, f func() float64) prometheus.GaugeFunc
- func (m *Metrics) NewGaugeVec(subsystem, name, help string, labels []string) *prometheus.GaugeVec
- func (m *Metrics) NewSummary(subsystem, name, help string) prometheus.Summary
- func (m *Metrics) NewSummaryVec(subsystem, name, help string, labels []string) *prometheus.SummaryVec
- func (m *Metrics) Run(addr string)
- func (m *Metrics) Summary(subsystem, name string) prometheus.Summary
- func (m *Metrics) SummaryVec(subsystem, name string) *prometheus.SummaryVec
- type Option
- type Options
- type Queue
- func (q *Queue) Empty() bool
- func (q *Queue) ForEach(f func(elem interface{}) error) error
- func (q *Queue) Full() bool
- func (q *Queue) Len() int
- func (q *Queue) MaxLen() int
- func (q *Queue) Peek() interface{}
- func (q *Queue) Pop() interface{}
- func (q *Queue) Push(elem interface{})
- func (q *Queue) Size() int
- type SubscribeOption
- type SubscriberConfig
- type SubscriberOptions
- type Subscribers
- func (subs *Subscribers) AddSubscriber(id string) chan Message
- func (subs *Subscribers) GetSubscriber(id string) (chan Message, bool)
- func (subs *Subscribers) HasSubscriber(id string) bool
- func (subs *Subscribers) Len() int
- func (subs *Subscribers) NotifyAll(message Message) int
- func (subs *Subscribers) RemoveSubscriber(id string)
- type Topic
Constants ¶
const ( // DefaultBind is the default bind address DefaultBind = ":8000" // DefaultLogPath is the default path to write logs to (wal) DefaultLogPath = "./logs" // DefaultMaxQueueSize is the default maximum size of queues DefaultMaxQueueSize = 1024 // ~8MB per queue (1000 * 4KB) // DefaultMaxPayloadSize is the default maximum payload size DefaultMaxPayloadSize = 8192 // 8KB // DefaultBufferLength is the default buffer length for subscriber chans DefaultBufferLength = 256 // DefaultMetrics is the default for whether to enable metrics DefaultMetrics = false // DefaultNoSync is the default for whether to disable faync after writing // messages to the write-ahead-log (wal) files. The default is `false` which // is safer and will prevent corruption in event of crahses or power failure, // but is slower. DefaultNoSync = false )
Variables ¶
var ( // Version release version Version = "0.1.1" // Commit will be overwritten automatically by the build system Commit = "HEAD" )
var DefObjectives = map[float64]float64{
0.50: 0.05,
0.90: 0.01,
0.95: 0.005,
0.99: 0.001,
}
DefObjectives ...
var ( // BufferFull is logged in Subscribe() when a subscriber's // buffer is full and messages can no longer be enqueued for delivery ErrBufferFull = errors.New("error: subscriber buffer full") )
Functions ¶
func FullVersion ¶
func FullVersion() string
FullVersion returns the full version, build and commit hash
func GenerateULID ¶ added in v0.1.16
GenerateULID generates a new unique identifer
func MustGenerateULID ¶ added in v0.1.16
func MustGenerateULID() string
MustGenerateULID generates a new unique identifer or fails
func SafeParseInt64 ¶ added in v0.1.15
SafeParseInt64 ...
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client ...
type Message ¶
type Message struct { ID int64 `json:"id"` Topic *Topic `json:"topic"` Payload []byte `json:"payload"` Created time.Time `json:"created"` }
Message ...
func LoadMessage ¶ added in v0.1.15
type MessageBus ¶
MessageBus ...
func NewMessageBus ¶
func NewMessageBus(opts ...Option) (*MessageBus, error)
NewMessageBus creates a new message bus with the provided options
func (*MessageBus) NewMessage ¶
func (mb *MessageBus) NewMessage(topic *Topic, payload []byte) Message
NewMessage ...
func (*MessageBus) ServeHTTP ¶
func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request)
func (*MessageBus) Subscribe ¶
func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan Message
Subscribe ...
func (*MessageBus) Unsubscribe ¶
func (mb *MessageBus) Unsubscribe(id, topic string)
Unsubscribe ...
type Metrics ¶
Metrics ...
func (*Metrics) Counter ¶
func (m *Metrics) Counter(subsystem, name string) prometheus.Counter
Counter ...
func (*Metrics) CounterVec ¶ added in v0.1.2
func (m *Metrics) CounterVec(subsystem, name string) *prometheus.CounterVec
CounterVec ...
func (*Metrics) GaugeVec ¶
func (m *Metrics) GaugeVec(subsystem, name string) *prometheus.GaugeVec
GaugeVec ...
func (*Metrics) NewCounter ¶
func (m *Metrics) NewCounter(subsystem, name, help string) prometheus.Counter
NewCounter ...
func (*Metrics) NewCounterFunc ¶
func (m *Metrics) NewCounterFunc(subsystem, name, help string, f func() float64) prometheus.CounterFunc
NewCounterFunc ...
func (*Metrics) NewCounterVec ¶ added in v0.1.2
func (m *Metrics) NewCounterVec(subsystem, name, help string, labels []string) *prometheus.CounterVec
NewCounterVec ...
func (*Metrics) NewGauge ¶
func (m *Metrics) NewGauge(subsystem, name, help string) prometheus.Gauge
NewGauge ...
func (*Metrics) NewGaugeFunc ¶
func (m *Metrics) NewGaugeFunc(subsystem, name, help string, f func() float64) prometheus.GaugeFunc
NewGaugeFunc ...
func (*Metrics) NewGaugeVec ¶
func (m *Metrics) NewGaugeVec(subsystem, name, help string, labels []string) *prometheus.GaugeVec
NewGaugeVec ...
func (*Metrics) NewSummary ¶
func (m *Metrics) NewSummary(subsystem, name, help string) prometheus.Summary
NewSummary ...
func (*Metrics) NewSummaryVec ¶
func (m *Metrics) NewSummaryVec(subsystem, name, help string, labels []string) *prometheus.SummaryVec
NewSummaryVec ...
func (*Metrics) Summary ¶
func (m *Metrics) Summary(subsystem, name string) prometheus.Summary
Summary ...
func (*Metrics) SummaryVec ¶
func (m *Metrics) SummaryVec(subsystem, name string) *prometheus.SummaryVec
SummaryVec ...
type Option ¶ added in v0.1.15
func WithBufferLength ¶ added in v0.1.15
func WithLogPath ¶ added in v0.1.15
func WithMaxPayloadSize ¶ added in v0.1.15
func WithMaxQueueSize ¶ added in v0.1.15
func WithMetrics ¶ added in v0.1.15
func WithNoSync ¶ added in v0.1.15
type Options ¶
type Options struct { LogPath string BufferLength int MaxQueueSize int MaxPayloadSize int Metrics bool NoSync bool }
Options ...
func NewDefaultOptions ¶ added in v0.1.15
func NewDefaultOptions() *Options
type Queue ¶
Queue represents a single instance of a bounded queue data structure with access to both side. If maxlen is non-zero the queue is bounded otherwise unbounded.
func (*Queue) ForEach ¶ added in v0.1.13
ForEach applys the function `f` over each item in the queue for read-only access into the queue in O(n) time for indexining into the queue.
func (*Queue) Peek ¶
func (q *Queue) Peek() interface{}
Peek returns the element at the front of the queue.
func (*Queue) Pop ¶
func (q *Queue) Pop() interface{}
Pop removes and returns the element from the front of the queue.
type SubscribeOption ¶ added in v0.1.13
type SubscribeOption func(*SubscriberOptions)
SubscribeOption ...
func WithIndex ¶ added in v0.1.13
func WithIndex(index int64) SubscribeOption
WithIndex sets the index to start subscribing from
type SubscriberConfig ¶ added in v0.1.13
type SubscriberConfig struct {
BufferLength int
}
SubscribersConfig ...
type SubscriberOptions ¶ added in v0.1.13
type SubscriberOptions struct {
Index int64
}
SubscriberOptions ...
type Subscribers ¶ added in v0.1.13
Subscribers ...
func NewSubscribers ¶ added in v0.1.13
func NewSubscribers(config *SubscriberConfig) *Subscribers
NewSubscribers ...
func (*Subscribers) AddSubscriber ¶ added in v0.1.13
func (subs *Subscribers) AddSubscriber(id string) chan Message
AddSubscriber ...
func (*Subscribers) GetSubscriber ¶ added in v0.1.13
func (subs *Subscribers) GetSubscriber(id string) (chan Message, bool)
GetSubscriber ...
func (*Subscribers) HasSubscriber ¶ added in v0.1.13
func (subs *Subscribers) HasSubscriber(id string) bool
HasSubscriber ...
func (*Subscribers) NotifyAll ¶ added in v0.1.13
func (subs *Subscribers) NotifyAll(message Message) int
NotifyAll ...
func (*Subscribers) RemoveSubscriber ¶ added in v0.1.13
func (subs *Subscribers) RemoveSubscriber(id string)
RemoveSubscriber ...