Documentation ¶
Overview ¶
A package with a Backend interface for storing streams of data and it's implementations for different storage engines.
Index ¶
- func Copy(drain stream.Stream, sink Stream) (error, bool)
- func CopyDirect(drain stream.Stream, sink *BufferedStream) (error, bool)
- func CreatorTypes() []string
- func NewHandler(b Backend, errorCb func(error)) http.Handler
- func RegisterCreator(btype string, creator BackendCreator) error
- func RegisterDefault()
- type Backend
- type BackendCreator
- type BackendStream
- type BufferedStream
- type Stream
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Copy ¶
Copy a producer stream to a consumer stream.
Returns an optional error and, if error is not nil, a flag indicating whether this error had happened in the consumer (false) or the producer (true).
func CopyDirect ¶
func CopyDirect(drain stream.Stream, sink *BufferedStream) (error, bool)
An analog of Copy for BufferedStream sink.
func NewHandler ¶
Create a http.Handler that maps URLs from HTTP backend to a methods of an object implementing Backend interface.
func RegisterCreator ¶
func RegisterCreator(btype string, creator BackendCreator) error
Register backend creator by backend type.
Types ¶
type Backend ¶
type Backend interface { // Get backend configuration. Config() (interface{}, error) // List all available streams. Streams() ([]string, error) // Get a BackendStream for the stream by it's name. GetStream(name string) (BackendStream, error) // Delete all streams and supporting databases. Drop() error // Close the backend handler. Close() error }
Backend is an interface for stream storage system.
func NewDir ¶
Create a backend that stores pushed events in files in a specified directory, one file per stream.
func NewHttp ¶
Create a remote http backend/ It doesn't store anything and just send commants to a remote server via HTTP.
type BackendCreator ¶
BackendCreator is a function that created a specific type of backend from config.
type BackendStream ¶
type BackendStream interface { Stream // Convert a relative interval into an absolute: like (0, -1) into (0, Len()). Interval(from int, to int) (uint, uint, error) // Read a range of events from the stream. Read(from uint, to uint) (stream.Stream, error) // Delete a range of events from the stream. Del(from uint, to uint) (bool, error) // Get a number of events in the stream. Len() (uint, error) }
BackendStream is an interface for manipulating stream of events stored on a backend.
type BufferedStream ¶
type BufferedStream struct {
// contains filtered or unexported fields
}
Buffered stream is an implementation of a stream that collects all pushed events in a buffer until Start() is called. After that it flushes all events in the buffer to a base stream and then adds all the new incoming events directly to the base stream.
The usecase for it is event subscription to a golfstream service. When you add a subscriber golfstream returns you a stream representing a range from backend stream's history and you might want to push the history to this subscriber befire any new events.
Then you wrap your subscriber with BufferedStream before adding it to the golfstream service, manually push the history and call Start() like this:
bsub := backend.Buffered(sub, 0) history, _ := sback.AddSub(bstrName, bsub, 0, -1) go func() { _, _ := backend.CopyDirect(history, bsub) bsub.Start() }()
func Buffered ¶
func Buffered(s Stream, buf int) *BufferedStream
Create a buffered stream from a stream.
Using original stream after creating a bufferes stream from it is unsafe.
func (*BufferedStream) Add ¶
func (self *BufferedStream) Add(evt stream.Event) error
And implementation of backend.Stream.Add for BufferedStream.
func (*BufferedStream) Close ¶
func (self *BufferedStream) Close() error
And implementation of backend.Stream.Close for BufferedStream.
func (*BufferedStream) Start ¶
func (self *BufferedStream) Start()
Tell the buffered stream that it can safely push to the the base stream now.