Documentation ¶
Index ¶
- func DefaultReadCloser(snapshotFile string) (io.ReadCloser, error)
- func DefaultWriteCloser(snapshotFile string) (io.WriteCloser, error)
- func SnapshotTimer(ctx context.Context, interval time.Duration, newEncoder EncoderFactory, ...) <-chan Event
- type Channel
- type ChannelDetail
- type Config
- type DataStore
- type Decoder
- type Encoder
- type EncoderFactory
- type Engine
- func (eng *Engine) GetChannelDetails(name string) (ChannelDetail, bool)
- func (eng *Engine) ListChannels() []ChannelDetail
- func (eng *Engine) Process(ctx context.Context, eventCh <-chan Event)
- func (eng *Engine) Run() error
- func (eng *Engine) Stats() StatsAPI
- func (eng *Engine) With(opts ...Opt) *Engine
- type Event
- func AddChannel(name string, size uint) Event
- func Pop(ctx context.Context, queueName string) (Event, <-chan []byte)
- func PopNow(queueName string) (Event, <-chan []byte)
- func PushMessage(name string, message []byte) (Event, <-chan bool)
- func RemoveChannel(name string) Event
- func Snapshot(encoder Encoder, done chan struct{}) Event
- type EventFn
- type Opt
- type ReadCloserFactory
- type StatsAPI
- type WriteCloserFactory
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DefaultReadCloser ¶
func DefaultReadCloser(snapshotFile string) (io.ReadCloser, error)
func DefaultWriteCloser ¶
func DefaultWriteCloser(snapshotFile string) (io.WriteCloser, error)
func SnapshotTimer ¶
func SnapshotTimer( ctx context.Context, interval time.Duration, newEncoder EncoderFactory, newWriteCloser WriteCloserFactory, ) <-chan Event
SnapshotTimer produces Snapshot Events on the returned channel until ctx is Done. `interval specifies how often to produce events. `newEncoder` is a factory that builds encoders to serialize the Engine State `newWriteCloser` is a factory that builds io.WriteClosers to save the serialized state to.
Types ¶
type Channel ¶
type Channel struct { // Buffer is an array of arbitrary messages. Buffer [][]byte // Size is the maximum number of messages to buffer in this queue at any // given time. Size uint // contains filtered or unexported fields }
Channel manages the state for a queue of messages.
func NewChannel ¶
NewChannel creates a Channel that contains up to `size` messages.
type ChannelDetail ¶
type ChannelDetail struct { Name string `json:"name"` Size uint `json:"size"` Max uint `json:"max"` }
ChannelDetail represents current stats for a particular queue.
type Config ¶
type Config struct {
// contains filtered or unexported fields
}
Config captures configurable options for the Engine.
type Encoder ¶
type Encoder interface {
Encode(interface{}) error
}
Encoder is a generalized interface implemented by Go's various encoding libraries.
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine manages a list of queues and a continuous stream of events.
func (*Engine) GetChannelDetails ¶
func (eng *Engine) GetChannelDetails(name string) (ChannelDetail, bool)
func (*Engine) ListChannels ¶
func (eng *Engine) ListChannels() []ChannelDetail
func (*Engine) Process ¶
Process multiplexes the provided eventCh onto the Engine's main event channel until ctx is finished.
type Event ¶
type Event interface {
Transition(*DataStore)
}
Event is an action that causes the engine to transition from one state to the next.
func AddChannel ¶
AddChannel returns an Event that will create a new channel with the appropriate name and size.
func Pop ¶
Pop returns an Event that will read the next message from the specified queue and write it to the returned channel.
func PopNow ¶
PopNow returns an event that will pop the next item off the queue, but fails immediately if no messages are preseent to be read.
func PushMessage ¶
PushMessage creates an event that sends the message to the named channel. It also returns a confirmation channel that it will write to once the message has been successfully added to the queue. If the message cannot be added, the confirmation channel will be closed without sending a value.
func RemoveChannel ¶
RemoveChannel returns an Event that removes the named channel from the engine.
type EventFn ¶
type EventFn func(*DataStore)
EventFn implement the Event interface, allowing you to use simple functions instead of objects as events.
func (EventFn) Transition ¶
Transition Invokes the function.
type Opt ¶
func SnapshotFilename ¶
SnapshotFilename specifies where to save snapshots.
func SnapshotInterval ¶
SnapshotInterval specifies how often to write the current engine state to the file system.
type ReadCloserFactory ¶
type ReadCloserFactory func() (io.ReadCloser, error)
func DefaultReadCloserFactory ¶
func DefaultReadCloserFactory(snapshotFile string) ReadCloserFactory
type StatsAPI ¶
type StatsAPI interface { GetChannelDetails(string) (ChannelDetail, bool) ListChannels() []ChannelDetail }
StatsAPI provides methods to access up-to-date statistics about the Engine's current state.
type WriteCloserFactory ¶
type WriteCloserFactory func() (io.WriteCloser, error)
func DefaultWriteCloserFactory ¶
func DefaultWriteCloserFactory(snapshotFile string) WriteCloserFactory
DefaultWriteCloserFactory returns a new WriteCloserFactory that will write to the file specified by `snapshotFile`. If the file already exists, it will be copied to a file ending in `.bkp` before writing the new file.