engine

package
v0.0.0-...-e0e2f15
Published: Mar 13, 2019 License: MIT Imports: 7 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultReadCloser

func DefaultReadCloser(snapshotFile string) (io.ReadCloser, error)

func DefaultWriteCloser

func DefaultWriteCloser(snapshotFile string) (io.WriteCloser, error)

func SnapshotTimer

func SnapshotTimer(
	ctx context.Context,
	interval time.Duration,
	newEncoder EncoderFactory,
	newWriteCloser WriteCloserFactory,
) <-chan Event

SnapshotTimer produces Snapshot Events on the returned channel until ctx is Done. `interval` specifies how often to produce events. `newEncoder` is a factory that builds encoders to serialize the Engine state. `newWriteCloser` is a factory that builds io.WriteClosers to save the serialized state to.

Types

type Channel

type Channel struct {
	// Buffer is an array of arbitrary messages.
	Buffer [][]byte

	// Size is the maximum number of messages to buffer in this queue at any
	// given time.
	Size uint
	// contains filtered or unexported fields
}

Channel manages the state for a queue of messages.

func NewChannel

func NewChannel(size uint) *Channel

NewChannel creates a Channel that contains up to `size` messages.

func (*Channel) Copy

func (q *Channel) Copy() *Channel

Copy returns a deep copy of the queue and its messages.
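For a `[][]byte` buffer, a deep copy has to duplicate every message, not just the outer slice. The sketch below shows one way to do that. `deepCopy` is a hypothetical stand-in for the method, and the local `Channel` type carries only the exported fields listed above.

```go
package main

import "fmt"

// Channel mirrors the exported fields documented above; the unexported
// fields of the real type are omitted in this sketch.
type Channel struct {
	Buffer [][]byte
	Size   uint
}

// deepCopy is a hypothetical stand-in for (*Channel).Copy. It duplicates the
// outer slice and every message so the copy shares no memory with the source.
func deepCopy(q *Channel) *Channel {
	out := &Channel{
		Buffer: make([][]byte, len(q.Buffer)),
		Size:   q.Size,
	}
	for i, msg := range q.Buffer {
		out.Buffer[i] = append([]byte(nil), msg...)
	}
	return out
}

func main() {
	orig := &Channel{Buffer: [][]byte{[]byte("hello")}, Size: 4}
	cp := deepCopy(orig)
	cp.Buffer[0][0] = 'j' // mutate the copy only

	fmt.Printf("%s %s\n", orig.Buffer[0], cp.Buffer[0]) // prints "hello jello"
}
```

Copying only the outer slice would leave both queues pointing at the same message bytes, so a write through one would be visible through the other.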

type ChannelDetail

type ChannelDetail struct {
	Name string `json:"name"`
	Size uint   `json:"size"`
	Max  uint   `json:"max"`
}

ChannelDetail represents current stats for a particular queue.

type Config

type Config struct {
	// contains filtered or unexported fields
}

Config captures configurable options for the Engine.

type DataStore

type DataStore struct {
	Channels map[string]*Channel
}

DataStore contains the Engine's current state.

func NewDataStore

func NewDataStore() *DataStore

NewDataStore initializes a new DataStore.

func (*DataStore) Copy

func (ds *DataStore) Copy() *DataStore

Copy returns a new datastore that is a deep copy of the current datastore.

type Decoder

type Decoder interface {
	Decode(interface{}) error
}

func DefaultDecoder

func DefaultDecoder(reader io.Reader) (Decoder, error)

type Encoder

type Encoder interface {
	Encode(interface{}) error
}

Encoder is a generalized interface implemented by Go's various encoding libraries.

func DefaultEncoder

func DefaultEncoder(writer io.Writer) (Encoder, error)

DefaultEncoder returns a new gob.Encoder for the provided writer.
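Because Encoder only requires an `Encode` method, the standard library's `*gob.Encoder` and `*json.Encoder` both satisfy it as-is. The sketch below redeclares the interface and factory type locally and shows two factories with the EncoderFactory shape. `gobEncoder`, `jsonEncoder`, and `encodeWith` are illustrative names, not part of the package.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"encoding/json"
	"fmt"
	"io"
)

// Local copies of the documented types so the sketch compiles on its own.
type Encoder interface {
	Encode(interface{}) error
}

type EncoderFactory func(io.Writer) (Encoder, error)

// gobEncoder and jsonEncoder are hypothetical factories; both standard
// library encoders already have the required Encode method.
func gobEncoder(w io.Writer) (Encoder, error)  { return gob.NewEncoder(w), nil }
func jsonEncoder(w io.Writer) (Encoder, error) { return json.NewEncoder(w), nil }

// encodeWith serializes v into memory using whichever factory is supplied.
func encodeWith(newEncoder EncoderFactory, v interface{}) ([]byte, error) {
	var buf bytes.Buffer
	enc, err := newEncoder(&buf)
	if err != nil {
		return nil, err
	}
	if err := enc.Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	state := map[string]int{"jobs": 3}

	jsonBytes, err := encodeWith(jsonEncoder, state)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(jsonBytes)) // prints {"jobs":3} followed by a newline

	gobBytes, err := encodeWith(gobEncoder, state)
	if err != nil {
		panic(err)
	}
	var decoded map[string]int
	if err := gob.NewDecoder(bytes.NewReader(gobBytes)).Decode(&decoded); err != nil {
		panic(err)
	}
	fmt.Println(decoded) // prints map[jobs:3]
}
```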

type EncoderFactory

type EncoderFactory func(io.Writer) (Encoder, error)

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine manages a list of queues and a continuous stream of events.

func New

func New(opts ...Opt) *Engine

New returns a new instance of the Bifrost Engine.

func (*Engine) GetChannelDetails

func (eng *Engine) GetChannelDetails(name string) (ChannelDetail, bool)

func (*Engine) ListChannels

func (eng *Engine) ListChannels() []ChannelDetail

func (*Engine) Process

func (eng *Engine) Process(ctx context.Context, eventCh <-chan Event)

Process multiplexes the provided eventCh onto the Engine's main event channel until ctx is finished.

func (*Engine) Run

func (eng *Engine) Run() error

Run starts the engine processing messages.

func (*Engine) Stats

func (eng *Engine) Stats() StatsAPI

Stats returns an interface that only exposes methods for querying the engine about its current statistics.

func (*Engine) With

func (eng *Engine) With(opts ...Opt) *Engine

With applies the list of Opts to the engine. It returns the engine to make it chainable.

type Event

type Event interface {
	Transition(*DataStore)
}

Event is an action that causes the engine to transition from one state to the next.

func AddChannel

func AddChannel(name string, size uint) Event

AddChannel returns an Event that will create a new channel with the appropriate name and size.

func Pop

func Pop(ctx context.Context, queueName string) (Event, <-chan []byte)

Pop returns an Event that will read the next message from the specified queue and write it to the returned channel.

func PopNow

func PopNow(queueName string) (Event, <-chan []byte)

PopNow returns an Event that will pop the next item off the queue, but fails immediately if no messages are present to be read.

func PushMessage

func PushMessage(name string, message []byte) (Event, <-chan bool)

PushMessage creates an event that sends the message to the named channel. It also returns a confirmation channel that it will write to once the message has been successfully added to the queue. If the message cannot be added, the confirmation channel will be closed without sending a value.
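The confirmation-channel contract described above (a value on success, a close without a value on failure) can be sketched as follows. `pushMessage` is a hypothetical re-creation, the types are local stand-ins, and the two failure conditions used here (unknown channel, full buffer) are assumptions rather than documented behaviour.

```go
package main

import "fmt"

// Local stand-ins for the documented types.
type Channel struct {
	Buffer [][]byte
	Size   uint
}

type DataStore struct {
	Channels map[string]*Channel
}

type Event interface {
	Transition(*DataStore)
}

type EventFn func(*DataStore)

func (fn EventFn) Transition(ds *DataStore) { fn(ds) }

// pushMessage is a hypothetical sketch of the contract: the returned channel
// yields true once the message is queued and is closed without a value when
// the message cannot be added.
func pushMessage(name string, message []byte) (Event, <-chan bool) {
	confirm := make(chan bool, 1) // buffered so Transition never blocks
	ev := EventFn(func(ds *DataStore) {
		defer close(confirm)
		ch, ok := ds.Channels[name]
		if !ok || uint(len(ch.Buffer)) >= ch.Size {
			return // assumed failure cases: unknown channel or full buffer
		}
		ch.Buffer = append(ch.Buffer, message)
		confirm <- true
	})
	return ev, confirm
}

func main() {
	ds := &DataStore{Channels: map[string]*Channel{"jobs": {Size: 1}}}
	for _, msg := range []string{"first", "second"} {
		ev, confirm := pushMessage("jobs", []byte(msg))
		ev.Transition(ds)
		_, ok := <-confirm // ok is false when the channel closed without a value
		fmt.Println(msg, ok)
	}
	// prints:
	// first true
	// second false
}
```

Callers can tell the two outcomes apart with the two-value receive form, since a closed channel yields `ok == false`.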

func RemoveChannel

func RemoveChannel(name string) Event

RemoveChannel returns an Event that removes the named channel from the engine.

func Snapshot

func Snapshot(encoder Encoder, done chan struct{}) Event

Snapshot returns a new Snapshot event that will use the provided encoder to serialize the Engine state. It will close() the `done` channel when the snapshot is complete.

type EventFn

type EventFn func(*DataStore)

EventFn implements the Event interface, allowing you to use simple functions instead of objects as events.

func (EventFn) Transition

func (fn EventFn) Transition(ds *DataStore)

Transition invokes the function.
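EventFn follows the same adapter idea as `http.HandlerFunc`: a function type with a method that calls itself. The sketch below redeclares the documented types locally and uses a closure as an event. `addChannel` and `apply` are hypothetical helpers, and ignoring an already existing channel is an assumption made for the example.

```go
package main

import "fmt"

// Local copies of the documented types so the sketch compiles on its own.
type Channel struct {
	Buffer [][]byte
	Size   uint
}

type DataStore struct {
	Channels map[string]*Channel
}

type Event interface {
	Transition(*DataStore)
}

type EventFn func(*DataStore)

// Transition invokes the function, which is what makes EventFn an Event.
func (fn EventFn) Transition(ds *DataStore) { fn(ds) }

// addChannel is a hypothetical event constructor built from a closure.
func addChannel(name string, size uint) Event {
	return EventFn(func(ds *DataStore) {
		if _, exists := ds.Channels[name]; !exists {
			ds.Channels[name] = &Channel{Size: size}
		}
	})
}

// apply runs each event against the store in order.
func apply(ds *DataStore, events ...Event) {
	for _, ev := range events {
		ev.Transition(ds)
	}
}

func main() {
	ds := &DataStore{Channels: map[string]*Channel{}}
	apply(ds, addChannel("jobs", 8), addChannel("logs", 2))
	fmt.Println(len(ds.Channels), ds.Channels["jobs"].Size) // prints 2 8
}
```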

type Opt

type Opt func(Config) Config

func Context

func Context(ctx context.Context) Opt

Context sets the Engine's context; the Engine will exit when the context is `Done()`.

func SnapshotFilename

func SnapshotFilename(name string) Opt

SnapshotFilename specifies where to save snapshots.

func SnapshotInterval

func SnapshotInterval(i time.Duration) Opt

SnapshotInterval specifies how often to write the current engine state to the file system.

type ReadCloserFactory

type ReadCloserFactory func() (io.ReadCloser, error)

func DefaultReadCloserFactory

func DefaultReadCloserFactory(snapshotFile string) ReadCloserFactory

type StatsAPI

type StatsAPI interface {
	GetChannelDetails(string) (ChannelDetail, bool)
	ListChannels() []ChannelDetail
}

StatsAPI provides methods to access up-to-date statistics about the Engine's current state.

type WriteCloserFactory

type WriteCloserFactory func() (io.WriteCloser, error)

func DefaultWriteCloserFactory

func DefaultWriteCloserFactory(snapshotFile string) WriteCloserFactory

DefaultWriteCloserFactory returns a new WriteCloserFactory that will write to the file specified by `snapshotFile`. If the file already exists, it will be copied to a file ending in `.bkp` before writing the new file.
