msgbus

package module
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2022 License: MIT Imports: 12 Imported by: 3

README

msgbus

Build Status CodeCov Go Report Card GoDoc GitHub license Sourcegraph

A real-time message bus server and library written in Go.

Features

  • Simple HTTP API
  • Simple command-line client
  • In memory queues
  • WebSockets for real-time messages
  • Pull and Push model

Install

$ go install git.mills.io/prologic/msgbus/...

Use Cases

  • As a simple generic webhook

You can use msgbus as a simple generic webhook. For example in my dockerfiles repo I have hooked up Prometheus's AlertManager to send alert notifications to an IRC channel using some simple shell scripts.

See: alert

  • As a general-purpose message / event bus that supports pub/sub as well as pulling messages synchronously.

Usage (library)

Install the package into your project:

$ go get git.mills.io/prologic/msgbus

Use the MessageBus type either directly:

package main

import (
    "log"

    "git.mills.io/prologic/msgbus"
)

func main() {
    m := msgbus.New()
    m.Put("foo", m.NewMessage([]byte("Hello World!")))

    msg, ok := m.Get("foo")
    if !ok {
        log.Printf("No more messages in queue: foo")
    } else {
        log.Printf
	    "Received message: id=%s topic=%s payload=%s",
	    msg.ID, msg.Topic, msg.Payload,
	)
    }
}

Running this example should yield something like this:

$ go run examples/hello.go
2017/08/09 03:01:54 [msgbus] PUT id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] NotifyAll id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] GET topic=foo
2017/08/09 03:01:54 Received message: id=%!s(uint64=0) topic=foo payload=Hello World!

See the godoc for further documentation and other examples.

Usage (tool)

Run the message bus daemon/server:

$ msgbusd
2017/08/07 01:11:16 [msgbus] Subscribe id=[::1]:55341 topic=foo
2017/08/07 01:11:22 [msgbus] PUT id=0 topic=foo payload=hi
2017/08/07 01:11:22 [msgbus] NotifyAll id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] PUT id=1 topic=foo payload=bye
2017/08/07 01:11:26 [msgbus] NotifyAll id=1 topic=foo payload=bye
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo

Subscribe to a topic using the message bus client:

$ msgbus sub foo
2017/08/07 01:11:22 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] received message: id=1 topic=foo payload=bye

Send a few messages with the message bus client:

$ msgbus pub foo hi
$ msgbus pub foo bye

You can also manually pull messages using the client:

$ msgbus pull foo
2017/08/07 01:11:33 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:33 [msgbus] received message: id=1 topic=foo payload=bye

This is slightly different from a listening subscriber (using websockets) where messages are pulled directly.

Usage (HTTP)

Run the message bus daemon/server:

$ msgbusd
2018/03/25 13:21:18 msgbusd listening on :8000

Send a message with using curl:

$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello

Pull the messages off the "hello" queue using curl:

$ curl -q -o - http://localhost:8000/hello
{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","created":"2018-03-25T13:18:38.732465-07:00"}

Decode the payload:

$ echo 'eyJtZXNzYWdlIjogImhlbGxvIn0=' | base64 -d
{"message": "hello"}

API

GET /

List all known topics/queues.

Example:

$ curl -q -o - http://localhost:8000/ | jq '.'
{
  "hello": {
    "name": "hello",
    "ttl": 60000000000,
    "seq": 1,
    "created": "2018-05-07T23:44:25.681392205-07:00"
  }
}

POST|PUT /topic

Post a new message to the queue named by <topic>.

NB: Either POST or PUT methods can be used here.

Example:

$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello
message successfully published to hello with sequence 1

GET /topic

Get the next message of the queue named by <topic>.

  • If the topic is not found. Returns: 404 Not Found
  • If the Websockets Upgrade header is found, upgrades to a websocket channel and subscribes to the topic <topic>. Each new message published to the topic <topic> are instantly published to all subscribers.

Example:

$ curl -q -o - http://localhost:8000/hello
{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","created":"2018-03-25T13:18:38.732465-07:00"}

DELETE /topic

Deletes a queue named by <topic>.

Not implemented.

  • je -- A distributed job execution engine for the execution of batch jobs, workflows, remediations and more.

License

msgbus is licensed under the MIT License

Documentation

Index

Constants

View Source
const (
	// DefaultMaxQueueSize is the default maximum size of queues
	DefaultMaxQueueSize = 1000 // ~4MB per queue (1000 * 4KB)

	// DefaultMaxPayloadSize is the default maximum payload size
	DefaultMaxPayloadSize = 8192 // 8KB

	// DefaultBufferLength is the default buffer length for subscriber chans
	DefaultBufferLength = 100
)

Variables

View Source
var (
	// Version release version
	Version = "0.1.1"

	// Build will be overwritten automatically by the build system
	Build = "dev"

	// GitCommit will be overwritten automatically by the build system
	GitCommit = "HEAD"
)
View Source
var DefObjectives = map[float64]float64{
	0.50: 0.05,
	0.90: 0.01,
	0.95: 0.005,
	0.99: 0.001,
}

DefObjectives ...

Functions

func FullVersion

func FullVersion() string

FullVersion returns the full version, build and commit hash

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client ...

func NewClient

func NewClient(conn *websocket.Conn, topic *Topic, bus *MessageBus) *Client

NewClient ...

func (*Client) Start

func (c *Client) Start()

Start ...

type HandlerFunc

type HandlerFunc func(msg *Message) error

HandlerFunc ...

type ListenerOptions added in v0.1.2

type ListenerOptions struct {
	BufferLength int
}

ListenerOptions ...

type Listeners

type Listeners struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Listeners ...

func NewListeners

func NewListeners(options *ListenerOptions) *Listeners

NewListeners ...

func (*Listeners) Add

func (ls *Listeners) Add(id string) chan Message

Add ...

func (*Listeners) Exists

func (ls *Listeners) Exists(id string) bool

Exists ...

func (*Listeners) Get

func (ls *Listeners) Get(id string) (chan Message, bool)

Get ...

func (*Listeners) Length

func (ls *Listeners) Length() int

Length ...

func (*Listeners) NotifyAll

func (ls *Listeners) NotifyAll(message Message) int

NotifyAll ...

func (*Listeners) Remove

func (ls *Listeners) Remove(id string)

Remove ...

type Message

type Message struct {
	ID      uint64    `json:"id"`
	Topic   *Topic    `json:"topic"`
	Payload []byte    `json:"payload"`
	Created time.Time `json:"created"`
}

Message ...

type MessageBus

type MessageBus struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

MessageBus ...

func New added in v0.1.2

func New(options *Options) *MessageBus

New ...

func (*MessageBus) Get

func (mb *MessageBus) Get(t *Topic) (Message, bool)

Get ...

func (*MessageBus) Len

func (mb *MessageBus) Len() int

Len ...

func (*MessageBus) Metrics

func (mb *MessageBus) Metrics() *Metrics

Metrics ...

func (*MessageBus) NewMessage

func (mb *MessageBus) NewMessage(topic *Topic, payload []byte) Message

NewMessage ...

func (*MessageBus) NewTopic

func (mb *MessageBus) NewTopic(topic string) *Topic

NewTopic ...

func (*MessageBus) Put

func (mb *MessageBus) Put(message Message)

Put ...

func (*MessageBus) ServeHTTP

func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*MessageBus) Subscribe

func (mb *MessageBus) Subscribe(id, topic string) chan Message

Subscribe ...

func (*MessageBus) Unsubscribe

func (mb *MessageBus) Unsubscribe(id, topic string)

Unsubscribe ...

type Metrics

type Metrics struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Metrics ...

func NewMetrics

func NewMetrics(namespace string) *Metrics

NewMetrics ...

func (*Metrics) Counter

func (m *Metrics) Counter(subsystem, name string) prometheus.Counter

Counter ...

func (*Metrics) CounterVec added in v0.1.2

func (m *Metrics) CounterVec(subsystem, name string) *prometheus.CounterVec

CounterVec ...

func (*Metrics) Gauge

func (m *Metrics) Gauge(subsystem, name string) prometheus.Gauge

Gauge ...

func (*Metrics) GaugeVec

func (m *Metrics) GaugeVec(subsystem, name string) *prometheus.GaugeVec

GaugeVec ...

func (*Metrics) Handler

func (m *Metrics) Handler() http.Handler

Handler ...

func (*Metrics) NewCounter

func (m *Metrics) NewCounter(subsystem, name, help string) prometheus.Counter

NewCounter ...

func (*Metrics) NewCounterFunc

func (m *Metrics) NewCounterFunc(subsystem, name, help string, f func() float64) prometheus.CounterFunc

NewCounterFunc ...

func (*Metrics) NewCounterVec added in v0.1.2

func (m *Metrics) NewCounterVec(subsystem, name, help string, labels []string) *prometheus.CounterVec

NewCounterVec ...

func (*Metrics) NewGauge

func (m *Metrics) NewGauge(subsystem, name, help string) prometheus.Gauge

NewGauge ...

func (*Metrics) NewGaugeFunc

func (m *Metrics) NewGaugeFunc(subsystem, name, help string, f func() float64) prometheus.GaugeFunc

NewGaugeFunc ...

func (*Metrics) NewGaugeVec

func (m *Metrics) NewGaugeVec(subsystem, name, help string, labels []string) *prometheus.GaugeVec

NewGaugeVec ...

func (*Metrics) NewSummary

func (m *Metrics) NewSummary(subsystem, name, help string) prometheus.Summary

NewSummary ...

func (*Metrics) NewSummaryVec

func (m *Metrics) NewSummaryVec(subsystem, name, help string, labels []string) *prometheus.SummaryVec

NewSummaryVec ...

func (*Metrics) Run

func (m *Metrics) Run(addr string)

Run ...

func (*Metrics) Summary

func (m *Metrics) Summary(subsystem, name string) prometheus.Summary

Summary ...

func (*Metrics) SummaryVec

func (m *Metrics) SummaryVec(subsystem, name string) *prometheus.SummaryVec

SummaryVec ...

type Options

type Options struct {
	BufferLength   int
	MaxQueueSize   int
	MaxPayloadSize int
	WithMetrics    bool
}

Options ...

type Queue

type Queue struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Queue represents a single instance of a bounded queue data structure with access to both side. If maxlen is non-zero the queue is bounded otherwise unbounded.

func NewQueue added in v0.1.2

func NewQueue(maxlen ...int) *Queue

NewQueue creates a new instance of Queue with the provided maxlen

func (*Queue) Empty added in v0.1.2

func (q *Queue) Empty() bool

Empty returns true if the queue is empty false otherwise

func (*Queue) Full added in v0.1.2

func (q *Queue) Full() bool

Full returns true if the queue is full false otherwise

func (*Queue) Len

func (q *Queue) Len() int

Len returns the number of elements currently stored in the queue.

func (*Queue) MaxLen added in v0.1.2

func (q *Queue) MaxLen() int

MaxLen returns the maxlen of the queue

func (*Queue) Peek

func (q *Queue) Peek() interface{}

Peek returns the element at the front of the queue.

func (*Queue) Pop

func (q *Queue) Pop() interface{}

Pop removes and returns the element from the front of the queue.

func (*Queue) Push

func (q *Queue) Push(elem interface{})

Push appends an element to the back of the queue.

func (*Queue) Size added in v0.1.2

func (q *Queue) Size() int

Size returns the current size of the queue

type Topic

type Topic struct {
	Name     string    `json:"name"`
	Sequence uint64    `json:"seq"`
	Created  time.Time `json:"created"`
}

Topic ...

func (*Topic) String added in v0.1.2

func (t *Topic) String() string

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL