Documentation ¶
Overview ¶
Package msgbus is a real-time message bus server and library written in Go
Index ¶
- Constants
- Variables
- func FullVersion() string
- func GenerateULID() (string, error)
- func MustGenerateULID() string
- func SafeParseInt64(s string, d int64) int64
- type Client
- type HandlerFunc
- type Message
- type MessageBus
- func (mb *MessageBus) Get(t *Topic) (Message, bool)
- func (mb *MessageBus) Len() int
- func (mb *MessageBus) Metrics() *Metrics
- func (mb *MessageBus) NewMessage(topic *Topic, payload []byte) Message
- func (mb *MessageBus) NewTopic(topic string) *Topic
- func (mb *MessageBus) Put(message Message) error
- func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan Message
- func (mb *MessageBus) Unsubscribe(id, topic string)
- type Metrics
- func (m *Metrics) Counter(subsystem, name string) prometheus.Counter
- func (m *Metrics) CounterVec(subsystem, name string) *prometheus.CounterVec
- func (m *Metrics) Gauge(subsystem, name string) prometheus.Gauge
- func (m *Metrics) GaugeVec(subsystem, name string) *prometheus.GaugeVec
- func (m *Metrics) Handler() http.Handler
- func (m *Metrics) NewCounter(subsystem, name, help string) prometheus.Counter
- func (m *Metrics) NewCounterFunc(subsystem, name, help string, f func() float64) prometheus.CounterFunc
- func (m *Metrics) NewCounterVec(subsystem, name, help string, labels []string) *prometheus.CounterVec
- func (m *Metrics) NewGauge(subsystem, name, help string) prometheus.Gauge
- func (m *Metrics) NewGaugeFunc(subsystem, name, help string, f func() float64) prometheus.GaugeFunc
- func (m *Metrics) NewGaugeVec(subsystem, name, help string, labels []string) *prometheus.GaugeVec
- func (m *Metrics) NewSummary(subsystem, name, help string) prometheus.Summary
- func (m *Metrics) NewSummaryVec(subsystem, name, help string, labels []string) *prometheus.SummaryVec
- func (m *Metrics) Run(addr string)
- func (m *Metrics) Summary(subsystem, name string) prometheus.Summary
- func (m *Metrics) SummaryVec(subsystem, name string) *prometheus.SummaryVec
- type Option
- type Options
- type Queue
- func (q *Queue) Empty() bool
- func (q *Queue) ForEach(f func(elem interface{}) error) error
- func (q *Queue) Full() bool
- func (q *Queue) Len() int
- func (q *Queue) MaxLen() int
- func (q *Queue) Peek() interface{}
- func (q *Queue) Pop() interface{}
- func (q *Queue) Push(elem interface{})
- func (q *Queue) Size() int
- type SubscribeOption
- type SubscriberConfig
- type SubscriberOptions
- type Subscribers
- func (subs *Subscribers) AddSubscriber(id string) chan Message
- func (subs *Subscribers) GetSubscriber(id string) (chan Message, bool)
- func (subs *Subscribers) HasSubscriber(id string) bool
- func (subs *Subscribers) Len() int
- func (subs *Subscribers) NotifyAll(message Message) int
- func (subs *Subscribers) RemoveSubscriber(id string)
- type Topic
Constants ¶
const ( // DefaultBind is the default bind address DefaultBind = ":8000" // DefaultLogPath is the default path to write logs to (wal) DefaultLogPath = "./logs" // DefaultMaxQueueSize is the default maximum size of queues DefaultMaxQueueSize = 1024 // ~8MB per queue (1000 * 4KB) // DefaultMaxPayloadSize is the default maximum payload size DefaultMaxPayloadSize = 8192 // 8KB // DefaultBufferLength is the default buffer length for subscriber chans DefaultBufferLength = 256 // DefaultMetrics is the default for whether to enable metrics DefaultMetrics = false // DefaultNoSync is the default for whether to disable faync after writing // messages to the write-ahead-log (wal) files. The default is `false` which // is safer and will prevent corruption in event of crahses or power failure, // but is slower. DefaultNoSync = false )
Variables ¶
var ( // Version release version Version = "0.1.1" // Commit will be overwritten automatically by the build system Commit = "HEAD" )
var DefObjectives = map[float64]float64{
0.50: 0.05,
0.90: 0.01,
0.95: 0.005,
0.99: 0.001,
}
DefObjectives ...
var ( // ErrBufferFull is logged in Subscribe() when a subscriber's // buffer is full and messages can no longer be enqueued for delivery ErrBufferFull = errors.New("error: subscriber buffer full") )
Functions ¶
func FullVersion ¶
func FullVersion() string
FullVersion returns the full version, build and commit hash
func GenerateULID ¶ added in v0.1.16
GenerateULID generates a new unique identifer
func MustGenerateULID ¶ added in v0.1.16
func MustGenerateULID() string
MustGenerateULID generates a new unique identifer or fails
func SafeParseInt64 ¶ added in v0.1.15
SafeParseInt64 ...
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client ...
type Message ¶
type Message struct { ID int64 `json:"id"` Topic *Topic `json:"topic"` Payload []byte `json:"payload"` Created time.Time `json:"created"` }
Message ...
func LoadMessage ¶ added in v0.1.15
LoadMessage unmarshals a byte slice into a Message struct using msgpack. It returns the unmarshaled Message and any error encountered during the unmarshaling process.
type MessageBus ¶
MessageBus ...
func NewMessageBus ¶
func NewMessageBus(opts ...Option) (*MessageBus, error)
NewMessageBus creates a new message bus with the provided options
func (*MessageBus) NewMessage ¶
func (mb *MessageBus) NewMessage(topic *Topic, payload []byte) Message
NewMessage ...
func (*MessageBus) ServeHTTP ¶
func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request)
func (*MessageBus) Subscribe ¶
func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan Message
Subscribe ...
func (*MessageBus) Unsubscribe ¶
func (mb *MessageBus) Unsubscribe(id, topic string)
Unsubscribe ...
type Metrics ¶
Metrics ...
func (*Metrics) Counter ¶
func (m *Metrics) Counter(subsystem, name string) prometheus.Counter
Counter ...
func (*Metrics) CounterVec ¶ added in v0.1.2
func (m *Metrics) CounterVec(subsystem, name string) *prometheus.CounterVec
CounterVec ...
func (*Metrics) GaugeVec ¶
func (m *Metrics) GaugeVec(subsystem, name string) *prometheus.GaugeVec
GaugeVec ...
func (*Metrics) NewCounter ¶
func (m *Metrics) NewCounter(subsystem, name, help string) prometheus.Counter
NewCounter ...
func (*Metrics) NewCounterFunc ¶
func (m *Metrics) NewCounterFunc(subsystem, name, help string, f func() float64) prometheus.CounterFunc
NewCounterFunc ...
func (*Metrics) NewCounterVec ¶ added in v0.1.2
func (m *Metrics) NewCounterVec(subsystem, name, help string, labels []string) *prometheus.CounterVec
NewCounterVec ...
func (*Metrics) NewGauge ¶
func (m *Metrics) NewGauge(subsystem, name, help string) prometheus.Gauge
NewGauge ...
func (*Metrics) NewGaugeFunc ¶
func (m *Metrics) NewGaugeFunc(subsystem, name, help string, f func() float64) prometheus.GaugeFunc
NewGaugeFunc ...
func (*Metrics) NewGaugeVec ¶
func (m *Metrics) NewGaugeVec(subsystem, name, help string, labels []string) *prometheus.GaugeVec
NewGaugeVec ...
func (*Metrics) NewSummary ¶
func (m *Metrics) NewSummary(subsystem, name, help string) prometheus.Summary
NewSummary ...
func (*Metrics) NewSummaryVec ¶
func (m *Metrics) NewSummaryVec(subsystem, name, help string, labels []string) *prometheus.SummaryVec
NewSummaryVec ...
func (*Metrics) Summary ¶
func (m *Metrics) Summary(subsystem, name string) prometheus.Summary
Summary ...
func (*Metrics) SummaryVec ¶
func (m *Metrics) SummaryVec(subsystem, name string) *prometheus.SummaryVec
SummaryVec ...
type Option ¶ added in v0.1.15
Option is an option
func WithBufferLength ¶ added in v0.1.15
WithBufferLength sets the buffer length for subscriber chans. This should be a power of two.
func WithLogPath ¶ added in v0.1.15
WithLogPath sets the path to write logs to (wal).
This will create the directory if it does not already exist.
func WithMaxPayloadSize ¶ added in v0.1.15
WithMaxPayloadSize sets the maximum payload size for messages. If a message exceeds this size, an error will be returned.
func WithMaxQueueSize ¶ added in v0.1.15
WithMaxQueueSize sets the maximum size of a queue. If a queue is full attempting to publish a message to it will result in an error.
func WithMetrics ¶ added in v0.1.15
WithMetrics enables or disables metrics for the message bus. Metrics are enabled by default and should not be disabled unless you know what you are doing.
func WithNoSync ¶ added in v0.1.15
WithNoSync disables or enables fsync after writing messages to the write-ahead-log (wal) files. The default is `false` which is safer and will prevent corruption in event of crahses or power failure, but is slower.
type Options ¶
type Options struct { LogPath string BufferLength int MaxQueueSize int MaxPayloadSize int Metrics bool NoSync bool }
Options ...
func NewDefaultOptions ¶ added in v0.1.15
func NewDefaultOptions() *Options
NewDefaultOptions returns a new Options with default values set.
type Queue ¶
Queue represents a single instance of a bounded queue data structure with access to both side. If maxLen is non-zero the queue is bounded otherwise unbounded.
func (*Queue) ForEach ¶ added in v0.1.13
ForEach applies the function `f` over each item in the queue for read-only access into the queue in O(n) time.
func (*Queue) Peek ¶
func (q *Queue) Peek() interface{}
Peek returns the element at the front of the queue.
func (*Queue) Pop ¶
func (q *Queue) Pop() interface{}
Pop removes and returns the element from the front of the queue.
type SubscribeOption ¶ added in v0.1.13
type SubscribeOption func(*SubscriberOptions)
SubscribeOption ...
func WithIndex ¶ added in v0.1.13
func WithIndex(index int64) SubscribeOption
WithIndex sets the index to start subscribing from
type SubscriberConfig ¶ added in v0.1.13
type SubscriberConfig struct {
BufferLength int
}
SubscriberConfig is a configuration struct for Subscribers
type SubscriberOptions ¶ added in v0.1.13
type SubscriberOptions struct {
Index int64
}
SubscriberOptions ...
type Subscribers ¶ added in v0.1.13
Subscribers is a thread-safe map of subscribers
func NewSubscribers ¶ added in v0.1.13
func NewSubscribers(config *SubscriberConfig) *Subscribers
NewSubscribers ...
func (*Subscribers) AddSubscriber ¶ added in v0.1.13
func (subs *Subscribers) AddSubscriber(id string) chan Message
AddSubscriber ...
func (*Subscribers) GetSubscriber ¶ added in v0.1.13
func (subs *Subscribers) GetSubscriber(id string) (chan Message, bool)
GetSubscriber ...
func (*Subscribers) HasSubscriber ¶ added in v0.1.13
func (subs *Subscribers) HasSubscriber(id string) bool
HasSubscriber ...
func (*Subscribers) NotifyAll ¶ added in v0.1.13
func (subs *Subscribers) NotifyAll(message Message) int
NotifyAll ...
func (*Subscribers) RemoveSubscriber ¶ added in v0.1.13
func (subs *Subscribers) RemoveSubscriber(id string)
RemoveSubscriber ...