msgbus

package module
v0.1.22 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 24, 2024 License: MIT Imports: 24 Imported by: 3

README

msgbus

Build Status Go Reference

A real-time message bus server and library written in Go.

Features

  • Simple HTTP API
  • Simple command-line client
  • In memory queues
  • WebSockets for real-time messages
  • Pull and Push model

Install

$ go install git.mills.io/prologic/msgbus/cmd/...

Use Cases

  • As a simple generic webhook

You can use msgbus as a simple generic webhook. For example in my dockerfiles repo I have hooked up Prometheus's AlertManager to send alert notifications to an IRC channel using some simple shell scripts.

See: alert

  • As a general-purpose message / event bus that supports pub/sub as well as pulling messages synchronously.

Usage (library)

Install the package into your project:

$ go get git.mills.io/prologic/msgbus

Use the MessageBus type either directly:

package main

import (
    "log"

    "git.mills.io/prologic/msgbus"
)

func main() {
    m := msgbus.New()
    m.Put("foo", m.NewMessage([]byte("Hello World!")))

    msg, ok := m.Get("foo")
    if !ok {
        log.Printf("No more messages in queue: foo")
    } else {
        log.Printf
	    "Received message: id=%s topic=%s payload=%s",
	    msg.ID, msg.Topic, msg.Payload,
	)
    }
}

Running this example should yield something like this:

$ go run examples/hello.go
2017/08/09 03:01:54 [msgbus] PUT id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] NotifyAll id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] GET topic=foo
2017/08/09 03:01:54 Received message: id=%!s(uint64=0) topic=foo payload=Hello World!

See the godoc for further documentation and other examples.

Usage (tool)

Run the message bus daemon/server:

$ msgbusd
2017/08/07 01:11:16 [msgbus] Subscribe id=[::1]:55341 topic=foo
2017/08/07 01:11:22 [msgbus] PUT id=0 topic=foo payload=hi
2017/08/07 01:11:22 [msgbus] NotifyAll id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] PUT id=1 topic=foo payload=bye
2017/08/07 01:11:26 [msgbus] NotifyAll id=1 topic=foo payload=bye
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo

Subscribe to a topic using the message bus client:

$ msgbus sub foo
2017/08/07 01:11:22 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] received message: id=1 topic=foo payload=bye

Send a few messages with the message bus client:

$ msgbus pub foo hi
$ msgbus pub foo bye

You can also manually pull messages using the client:

$ msgbus pull foo
2017/08/07 01:11:33 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:33 [msgbus] received message: id=1 topic=foo payload=bye

This is slightly different from a listening subscriber (using websockets) where messages are pulled directly.

Usage (HTTP)

Run the message bus daemon/server:

$ msgbusd
2018/03/25 13:21:18 msgbusd listening on :8000

Send a message with using curl:

$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello

Pull the messages off the "hello" queue using curl:

$ curl -q -o - http://localhost:8000/hello
{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","created":"2018-03-25T13:18:38.732465-07:00"}

Decode the payload:

$ echo 'eyJtZXNzYWdlIjogImhlbGxvIn0=' | base64 -d
{"message": "hello"}

API

GET /

List all known topics/queues.

Example:

$ curl -q -o - http://localhost:8000/ | jq '.'
{
  "hello": {
    "name": "hello",
    "ttl": 60000000000,
    "seq": 1,
    "created": "2018-05-07T23:44:25.681392205-07:00"
  }
}

POST|PUT /topic

Post a new message to the queue named by <topic>.

NB: Either POST or PUT methods can be used here.

Example:

$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello
message successfully published to hello with sequence 1

GET /topic

Get the next message of the queue named by <topic>.

  • If the topic is not found. Returns: 404 Not Found
  • If the Websockets Upgrade header is found, upgrades to a websocket channel and subscribes to the topic <topic>. Each new message published to the topic <topic> are instantly published to all subscribers.

Example:

$ curl -q -o - http://localhost:8000/hello
{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","created":"2018-03-25T13:18:38.732465-07:00"}

DELETE /topic

Deletes a queue named by <topic>.

Not implemented.

  • je -- A distributed job execution engine for the execution of batch jobs, workflows, remediations and more.

License

msgbus is licensed under the MIT License

Documentation

Overview

Package msgbus is a real-time message bus server and library written in Go

Index

Constants

View Source
const (
	// DefaultBind is the default bind address
	DefaultBind = ":8000"

	// DefaultLogPath is the default path to write logs to (wal)
	DefaultLogPath = "./logs"

	// DefaultMaxQueueSize is the default maximum size of queues
	DefaultMaxQueueSize = 1024 // ~8MB per queue (1000 * 4KB)

	// DefaultMaxPayloadSize is the default maximum payload size
	DefaultMaxPayloadSize = 8192 // 8KB

	// DefaultBufferLength is the default buffer length for subscriber chans
	DefaultBufferLength = 256

	// DefaultMetrics is the default for whether to enable metrics
	DefaultMetrics = false

	// DefaultNoSync is the default for whether to disable faync after writing
	// messages to the write-ahead-log (wal) files. The default is `false` which
	// is safer and will prevent corruption in event of crahses or power failure,
	// but is slower.
	DefaultNoSync = false
)

Variables

View Source
var (
	// Version release version
	Version = "0.1.1"

	// Commit will be overwritten automatically by the build system
	Commit = "HEAD"
)
View Source
var DefObjectives = map[float64]float64{
	0.50: 0.05,
	0.90: 0.01,
	0.95: 0.005,
	0.99: 0.001,
}

DefObjectives ...

View Source
var (
	// ErrBufferFull is logged in Subscribe() when a subscriber's
	// buffer is full and messages can no longer be enqueued for delivery
	ErrBufferFull = errors.New("error: subscriber buffer full")
)

Functions

func FullVersion

func FullVersion() string

FullVersion returns the full version, build and commit hash

func GenerateULID added in v0.1.16

func GenerateULID() (string, error)

GenerateULID generates a new unique identifer

func MustGenerateULID added in v0.1.16

func MustGenerateULID() string

MustGenerateULID generates a new unique identifer or fails

func SafeParseInt64 added in v0.1.15

func SafeParseInt64(s string, d int64) int64

SafeParseInt64 ...

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client ...

func NewClient

func NewClient(conn *websocket.Conn, topic *Topic, index int64, bus *MessageBus) *Client

NewClient ...

func (*Client) Start

func (c *Client) Start()

Start ...

type HandlerFunc

type HandlerFunc func(msg *Message) error

HandlerFunc ...

type Message

type Message struct {
	ID      int64     `json:"id"`
	Topic   *Topic    `json:"topic"`
	Payload []byte    `json:"payload"`
	Created time.Time `json:"created"`
}

Message ...

func LoadMessage added in v0.1.15

func LoadMessage(data []byte) (m Message, err error)

LoadMessage unmarshals a byte slice into a Message struct using msgpack. It returns the unmarshaled Message and any error encountered during the unmarshaling process.

func (Message) Bytes added in v0.1.15

func (m Message) Bytes() ([]byte, error)

Bytes marshals the Message into a byte slice using msgpack. It returns the marshalled byte slice and any error encountered during the marshaling process.

func (Message) String added in v0.1.21

func (m Message) String() string

type MessageBus

type MessageBus struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

MessageBus ...

func NewMessageBus

func NewMessageBus(opts ...Option) (*MessageBus, error)

NewMessageBus creates a new message bus with the provided options

func (*MessageBus) Get

func (mb *MessageBus) Get(t *Topic) (Message, bool)

Get ...

func (*MessageBus) Len

func (mb *MessageBus) Len() int

Len ...

func (*MessageBus) Metrics

func (mb *MessageBus) Metrics() *Metrics

Metrics ...

func (*MessageBus) NewMessage

func (mb *MessageBus) NewMessage(topic *Topic, payload []byte) Message

NewMessage ...

func (*MessageBus) NewTopic

func (mb *MessageBus) NewTopic(topic string) *Topic

NewTopic ...

func (*MessageBus) Put

func (mb *MessageBus) Put(message Message) error

Put ...

func (*MessageBus) ServeHTTP

func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*MessageBus) Subscribe

func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan Message

Subscribe ...

func (*MessageBus) Unsubscribe

func (mb *MessageBus) Unsubscribe(id, topic string)

Unsubscribe ...

type Metrics

type Metrics struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Metrics ...

func NewMetrics

func NewMetrics(namespace string) *Metrics

NewMetrics ...

func (*Metrics) Counter

func (m *Metrics) Counter(subsystem, name string) prometheus.Counter

Counter ...

func (*Metrics) CounterVec added in v0.1.2

func (m *Metrics) CounterVec(subsystem, name string) *prometheus.CounterVec

CounterVec ...

func (*Metrics) Gauge

func (m *Metrics) Gauge(subsystem, name string) prometheus.Gauge

Gauge ...

func (*Metrics) GaugeVec

func (m *Metrics) GaugeVec(subsystem, name string) *prometheus.GaugeVec

GaugeVec ...

func (*Metrics) Handler

func (m *Metrics) Handler() http.Handler

Handler ...

func (*Metrics) NewCounter

func (m *Metrics) NewCounter(subsystem, name, help string) prometheus.Counter

NewCounter ...

func (*Metrics) NewCounterFunc

func (m *Metrics) NewCounterFunc(subsystem, name, help string, f func() float64) prometheus.CounterFunc

NewCounterFunc ...

func (*Metrics) NewCounterVec added in v0.1.2

func (m *Metrics) NewCounterVec(subsystem, name, help string, labels []string) *prometheus.CounterVec

NewCounterVec ...

func (*Metrics) NewGauge

func (m *Metrics) NewGauge(subsystem, name, help string) prometheus.Gauge

NewGauge ...

func (*Metrics) NewGaugeFunc

func (m *Metrics) NewGaugeFunc(subsystem, name, help string, f func() float64) prometheus.GaugeFunc

NewGaugeFunc ...

func (*Metrics) NewGaugeVec

func (m *Metrics) NewGaugeVec(subsystem, name, help string, labels []string) *prometheus.GaugeVec

NewGaugeVec ...

func (*Metrics) NewSummary

func (m *Metrics) NewSummary(subsystem, name, help string) prometheus.Summary

NewSummary ...

func (*Metrics) NewSummaryVec

func (m *Metrics) NewSummaryVec(subsystem, name, help string, labels []string) *prometheus.SummaryVec

NewSummaryVec ...

func (*Metrics) Run

func (m *Metrics) Run(addr string)

Run ...

func (*Metrics) Summary

func (m *Metrics) Summary(subsystem, name string) prometheus.Summary

Summary ...

func (*Metrics) SummaryVec

func (m *Metrics) SummaryVec(subsystem, name string) *prometheus.SummaryVec

SummaryVec ...

type Option added in v0.1.15

type Option func(opts *Options) error

Option is an option

func WithBufferLength added in v0.1.15

func WithBufferLength(bufferLength int) Option

WithBufferLength sets the buffer length for subscriber chans. This should be a power of two.

func WithLogPath added in v0.1.15

func WithLogPath(logPath string) Option

WithLogPath sets the path to write logs to (wal).

This will create the directory if it does not already exist.

func WithMaxPayloadSize added in v0.1.15

func WithMaxPayloadSize(maxPayloadSize int) Option

WithMaxPayloadSize sets the maximum payload size for messages. If a message exceeds this size, an error will be returned.

func WithMaxQueueSize added in v0.1.15

func WithMaxQueueSize(maxQueueSize int) Option

WithMaxQueueSize sets the maximum size of a queue. If a queue is full attempting to publish a message to it will result in an error.

func WithMetrics added in v0.1.15

func WithMetrics(metrics bool) Option

WithMetrics enables or disables metrics for the message bus. Metrics are enabled by default and should not be disabled unless you know what you are doing.

func WithNoSync added in v0.1.15

func WithNoSync(noSync bool) Option

WithNoSync disables or enables fsync after writing messages to the write-ahead-log (wal) files. The default is `false` which is safer and will prevent corruption in event of crahses or power failure, but is slower.

type Options

type Options struct {
	LogPath string

	BufferLength   int
	MaxQueueSize   int
	MaxPayloadSize int

	Metrics bool
	NoSync  bool
}

Options ...

func NewDefaultOptions added in v0.1.15

func NewDefaultOptions() *Options

NewDefaultOptions returns a new Options with default values set.

type Queue

type Queue struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Queue represents a single instance of a bounded queue data structure with access to both side. If maxLen is non-zero the queue is bounded otherwise unbounded.

func NewQueue added in v0.1.2

func NewQueue(maxLen ...int) *Queue

NewQueue creates a new instance of Queue with the provided maxLen

func (*Queue) Empty added in v0.1.2

func (q *Queue) Empty() bool

Empty returns true if the queue is empty false otherwise

func (*Queue) ForEach added in v0.1.13

func (q *Queue) ForEach(f func(elem interface{}) error) error

ForEach applies the function `f` over each item in the queue for read-only access into the queue in O(n) time.

func (*Queue) Full added in v0.1.2

func (q *Queue) Full() bool

Full returns true if the queue is full false otherwise

func (*Queue) Len

func (q *Queue) Len() int

Len returns the number of elements currently stored in the queue.

func (*Queue) MaxLen added in v0.1.2

func (q *Queue) MaxLen() int

MaxLen returns the maxLen of the queue

func (*Queue) Peek

func (q *Queue) Peek() interface{}

Peek returns the element at the front of the queue.

func (*Queue) Pop

func (q *Queue) Pop() interface{}

Pop removes and returns the element from the front of the queue.

func (*Queue) Push

func (q *Queue) Push(elem interface{})

Push appends an element to the back of the queue.

func (*Queue) Size added in v0.1.2

func (q *Queue) Size() int

Size returns the current size of the queue

type SubscribeOption added in v0.1.13

type SubscribeOption func(*SubscriberOptions)

SubscribeOption ...

func WithIndex added in v0.1.13

func WithIndex(index int64) SubscribeOption

WithIndex sets the index to start subscribing from

type SubscriberConfig added in v0.1.13

type SubscriberConfig struct {
	BufferLength int
}

SubscriberConfig is a configuration struct for Subscribers

type SubscriberOptions added in v0.1.13

type SubscriberOptions struct {
	Index int64
}

SubscriberOptions ...

type Subscribers added in v0.1.13

type Subscribers struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Subscribers is a thread-safe map of subscribers

func NewSubscribers added in v0.1.13

func NewSubscribers(config *SubscriberConfig) *Subscribers

NewSubscribers ...

func (*Subscribers) AddSubscriber added in v0.1.13

func (subs *Subscribers) AddSubscriber(id string) chan Message

AddSubscriber ...

func (*Subscribers) GetSubscriber added in v0.1.13

func (subs *Subscribers) GetSubscriber(id string) (chan Message, bool)

GetSubscriber ...

func (*Subscribers) HasSubscriber added in v0.1.13

func (subs *Subscribers) HasSubscriber(id string) bool

HasSubscriber ...

func (*Subscribers) Len added in v0.1.13

func (subs *Subscribers) Len() int

Len ...

func (*Subscribers) NotifyAll added in v0.1.13

func (subs *Subscribers) NotifyAll(message Message) int

NotifyAll ...

func (*Subscribers) RemoveSubscriber added in v0.1.13

func (subs *Subscribers) RemoveSubscriber(id string)

RemoveSubscriber ...

type Topic

type Topic struct {
	Name     string    `json:"name"`
	Sequence int64     `json:"seq"`
	Created  time.Time `json:"created"`
}

Topic ...

func (*Topic) String added in v0.1.2

func (t *Topic) String() string

Directories

Path Synopsis
Package client is a client library for the msgbus server
Package client is a client library for the msgbus server
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL