Documentation ¶
Index ¶
- Constants
- Variables
- func FullVersion() string
- func GenerateULID() (string, error)
- func MustGenerateULID() string
- func SafeParseInt64(s string, d int64) int64
- type Client
- type HandlerFunc
- type Message
- type MessageBus
- func (mb *MessageBus) Get(t *Topic) (Message, bool)
- func (mb *MessageBus) Len() int
- func (mb *MessageBus) Metrics() *Metrics
- func (mb *MessageBus) NewMessage(topic *Topic, payload []byte) Message
- func (mb *MessageBus) NewTopic(topic string) *Topic
- func (mb *MessageBus) Put(message Message) error
- func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan Message
- func (mb *MessageBus) Unsubscribe(id, topic string)
- type Metrics
- func (m *Metrics) Counter(subsystem, name string) prometheus.Counter
- func (m *Metrics) CounterVec(subsystem, name string) *prometheus.CounterVec
- func (m *Metrics) Gauge(subsystem, name string) prometheus.Gauge
- func (m *Metrics) GaugeVec(subsystem, name string) *prometheus.GaugeVec
- func (m *Metrics) Handler() http.Handler
- func (m *Metrics) NewCounter(subsystem, name, help string) prometheus.Counter
- func (m *Metrics) NewCounterFunc(subsystem, name, help string, f func() float64) prometheus.CounterFunc
- func (m *Metrics) NewCounterVec(subsystem, name, help string, labels []string) *prometheus.CounterVec
- func (m *Metrics) NewGauge(subsystem, name, help string) prometheus.Gauge
- func (m *Metrics) NewGaugeFunc(subsystem, name, help string, f func() float64) prometheus.GaugeFunc
- func (m *Metrics) NewGaugeVec(subsystem, name, help string, labels []string) *prometheus.GaugeVec
- func (m *Metrics) NewSummary(subsystem, name, help string) prometheus.Summary
- func (m *Metrics) NewSummaryVec(subsystem, name, help string, labels []string) *prometheus.SummaryVec
- func (m *Metrics) Run(addr string)
- func (m *Metrics) Summary(subsystem, name string) prometheus.Summary
- func (m *Metrics) SummaryVec(subsystem, name string) *prometheus.SummaryVec
- type Option
- type Options
- type Queue
- func (q *Queue) Empty() bool
- func (q *Queue) ForEach(f func(elem interface{}) error) error
- func (q *Queue) Full() bool
- func (q *Queue) Len() int
- func (q *Queue) MaxLen() int
- func (q *Queue) Peek() interface{}
- func (q *Queue) Pop() interface{}
- func (q *Queue) Push(elem interface{})
- func (q *Queue) Size() int
- type SubscribeOption
- type SubscriberConfig
- type SubscriberOptions
- type Subscribers
- func (subs *Subscribers) AddSubscriber(id string) chan Message
- func (subs *Subscribers) GetSubscriber(id string) (chan Message, bool)
- func (subs *Subscribers) HasSubscriber(id string) bool
- func (subs *Subscribers) Len() int
- func (subs *Subscribers) NotifyAll(message Message) int
- func (subs *Subscribers) RemoveSubscriber(id string)
- type Topic
Constants ¶
const ( // DefaultBind is the default bind address DefaultBind = ":8000" // DefaultLogPath is the default path to write logs to (wal) DefaultLogPath = "./logs" // DefaultMaxQueueSize is the default maximum size of queues DefaultMaxQueueSize = 1024 // ~8MB per queue (1000 * 4KB) // DefaultMaxPayloadSize is the default maximum payload size DefaultMaxPayloadSize = 8192 // 8KB // DefaultBufferLength is the default buffer length for subscriber chans DefaultBufferLength = 256 // DefaultMetrics is the default for whether to enable metrics DefaultMetrics = false // DefaultNoSync is the default for whether to disable faync after writing // messages to the write-ahead-log (wal) files. The default is `false` which // is safer and will prevent corruption in event of crahses or power failure, // but is slower. DefaultNoSync = false )
Variables ¶
var ( // Version release version Version = "0.1.1" // Commit will be overwritten automatically by the build system Commit = "HEAD" )
var DefObjectives = map[float64]float64{
0.50: 0.05,
0.90: 0.01,
0.95: 0.005,
0.99: 0.001,
}
DefObjectives ...
var ( // BufferFull is logged in Subscribe() when a subscriber's // buffer is full and messages can no longer be enqueued for delivery ErrBufferFull = errors.New("error: subscriber buffer full") )
Functions ¶
func FullVersion ¶
func FullVersion() string
FullVersion returns the full version, build and commit hash
func GenerateULID ¶ added in v0.1.16
GenerateULID generates a new unique identifer
func MustGenerateULID ¶ added in v0.1.16
func MustGenerateULID() string
MustGenerateULID generates a new unique identifer or fails
func SafeParseInt64 ¶ added in v0.1.15
SafeParseInt64 ...
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client ...
type Message ¶
type Message struct { ID int64 `json:"id"` Topic *Topic `json:"topic"` Payload []byte `json:"payload"` Created time.Time `json:"created"` }
Message ...
func LoadMessage ¶ added in v0.1.15
type MessageBus ¶
MessageBus ...
func NewMessageBus ¶
func NewMessageBus(opts ...Option) (*MessageBus, error)
NewMessageBus creates a new message bus with the provided options
func (*MessageBus) NewMessage ¶
func (mb *MessageBus) NewMessage(topic *Topic, payload []byte) Message
NewMessage ...
func (*MessageBus) ServeHTTP ¶
func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request)
func (*MessageBus) Subscribe ¶
func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan Message
Subscribe ...
func (*MessageBus) Unsubscribe ¶
func (mb *MessageBus) Unsubscribe(id, topic string)
Unsubscribe ...
type Metrics ¶
Metrics ...
func (*Metrics) Counter ¶
func (m *Metrics) Counter(subsystem, name string) prometheus.Counter
Counter ...
func (*Metrics) CounterVec ¶ added in v0.1.2
func (m *Metrics) CounterVec(subsystem, name string) *prometheus.CounterVec
CounterVec ...
func (*Metrics) GaugeVec ¶
func (m *Metrics) GaugeVec(subsystem, name string) *prometheus.GaugeVec
GaugeVec ...
func (*Metrics) NewCounter ¶
func (m *Metrics) NewCounter(subsystem, name, help string) prometheus.Counter
NewCounter ...
func (*Metrics) NewCounterFunc ¶
func (m *Metrics) NewCounterFunc(subsystem, name, help string, f func() float64) prometheus.CounterFunc
NewCounterFunc ...
func (*Metrics) NewCounterVec ¶ added in v0.1.2
func (m *Metrics) NewCounterVec(subsystem, name, help string, labels []string) *prometheus.CounterVec
NewCounterVec ...
func (*Metrics) NewGauge ¶
func (m *Metrics) NewGauge(subsystem, name, help string) prometheus.Gauge
NewGauge ...
func (*Metrics) NewGaugeFunc ¶
func (m *Metrics) NewGaugeFunc(subsystem, name, help string, f func() float64) prometheus.GaugeFunc
NewGaugeFunc ...
func (*Metrics) NewGaugeVec ¶
func (m *Metrics) NewGaugeVec(subsystem, name, help string, labels []string) *prometheus.GaugeVec
NewGaugeVec ...
func (*Metrics) NewSummary ¶
func (m *Metrics) NewSummary(subsystem, name, help string) prometheus.Summary
NewSummary ...
func (*Metrics) NewSummaryVec ¶
func (m *Metrics) NewSummaryVec(subsystem, name, help string, labels []string) *prometheus.SummaryVec
NewSummaryVec ...
func (*Metrics) Summary ¶
func (m *Metrics) Summary(subsystem, name string) prometheus.Summary
Summary ...
func (*Metrics) SummaryVec ¶
func (m *Metrics) SummaryVec(subsystem, name string) *prometheus.SummaryVec
SummaryVec ...
type Option ¶ added in v0.1.15
func WithBufferLength ¶ added in v0.1.15
func WithLogPath ¶ added in v0.1.15
func WithMaxPayloadSize ¶ added in v0.1.15
func WithMaxQueueSize ¶ added in v0.1.15
func WithMetrics ¶ added in v0.1.15
func WithNoSync ¶ added in v0.1.15
type Options ¶
type Options struct { LogPath string BufferLength int MaxQueueSize int MaxPayloadSize int Metrics bool NoSync bool }
Options ...
func NewDefaultOptions ¶ added in v0.1.15
func NewDefaultOptions() *Options
type Queue ¶
Queue represents a single instance of a bounded queue data structure with access to both side. If maxlen is non-zero the queue is bounded otherwise unbounded.
func (*Queue) ForEach ¶ added in v0.1.13
ForEach applys the function `f` over each item in the queue for read-only access into the queue in O(n) time for indexining into the queue.
func (*Queue) Peek ¶
func (q *Queue) Peek() interface{}
Peek returns the element at the front of the queue.
func (*Queue) Pop ¶
func (q *Queue) Pop() interface{}
Pop removes and returns the element from the front of the queue.
type SubscribeOption ¶ added in v0.1.13
type SubscribeOption func(*SubscriberOptions)
SubscribeOption ...
func WithIndex ¶ added in v0.1.13
func WithIndex(index int64) SubscribeOption
WithIndex sets the index to start subscribing from
type SubscriberConfig ¶ added in v0.1.13
type SubscriberConfig struct {
BufferLength int
}
SubscribersConfig ...
type SubscriberOptions ¶ added in v0.1.13
type SubscriberOptions struct {
Index int64
}
SubscriberOptions ...
type Subscribers ¶ added in v0.1.13
Subscribers ...
func NewSubscribers ¶ added in v0.1.13
func NewSubscribers(config *SubscriberConfig) *Subscribers
NewSubscribers ...
func (*Subscribers) AddSubscriber ¶ added in v0.1.13
func (subs *Subscribers) AddSubscriber(id string) chan Message
AddSubscriber ...
func (*Subscribers) GetSubscriber ¶ added in v0.1.13
func (subs *Subscribers) GetSubscriber(id string) (chan Message, bool)
GetSubscriber ...
func (*Subscribers) HasSubscriber ¶ added in v0.1.13
func (subs *Subscribers) HasSubscriber(id string) bool
HasSubscriber ...
func (*Subscribers) NotifyAll ¶ added in v0.1.13
func (subs *Subscribers) NotifyAll(message Message) int
NotifyAll ...
func (*Subscribers) RemoveSubscriber ¶ added in v0.1.13
func (subs *Subscribers) RemoveSubscriber(id string)
RemoveSubscriber ...