Documentation ¶
Overview ¶
Package store encompasses access handling for the stream store
Index ¶
- Variables
- type Option
- type Provider
- func (p *Provider) Entity(streamID int, entityID uint64) (*models.Entity, error)
- func (p *Provider) EntityEventSource() models.EntityEventSource
- func (p *Provider) Serve()
- func (p *Provider) Stop()
- func (p *Provider) Stream(streamID int) (*models.Stream, error)
- func (p *Provider) StreamEventSource() models.StreamEventSource
- func (p *Provider) Streams() ([]models.Stream, error)
- func (p *Provider) UpdatesChan() chan<- Update
- type Streams
- type Update
Constants ¶
This section is empty.
Variables ¶
var ErrRequestTimedOut = errors.New("request timed out")
ErrRequestTimedOut is returned if the store is taking too long to return from query
Functions ¶
This section is empty.
Types ¶
type Option ¶
type Option func(p *providerConfig)
Option defines an optional configuration parameter to the constructor of the Provider
func WithEventBufferSize ¶
WithEventBufferSize sets the size of the outgoing event buffer. There shouldn't be any real reason to change this unless the event consumer is very slow at consuming updates.
The default value is 10000.
func WithQueryTimeout ¶
WithQueryTimeout sets the timeout for read accesses to the store provider. Since all store acceses happen on the same goroutine, it is possible for misbehaving updates to block reads. The query methods on the provider will timeout after this duration if this scenario happens.
The default value is 5 seconds.
func WithRequestBufferSize ¶
WithRequestBufferSize sets the size of the internal request queue. be any real reason to change this unless the the core API is getting hammered with requests.
The default value is 10.
func WithUpdateBufferSize ¶
WithUpdateBufferSize sets the size of the store provider channel that is responsible for receiving updates. There shouldn't be any real reason to change this unless the provider is very slow at consuming updates.
The default value is 10000.
type Provider ¶
type Provider struct {
// contains filtered or unexported fields
}
Provider provides access to the store. It runs as a long running service that updates the store in response to update events. All updates and accesses are handled in an evented loop to serialize changes to the store for thread safety. Provider also emits events for updates made to the store.
func NewProvider ¶
NewProvider creates a new store provider. It will also initialize the store. Options can optionally be provided like follows:
provider = store.NewProvider( logger, store.WithQueryTimeout(10*time.Millisecond), store.WithUpdateBufferSize(10), store.WithEventBufferSize(10), store.WithRequestBufferSize(10), )
func (*Provider) Entity ¶
Entity returns a specific entity in a specific from the store, queried by streamID and entityID. It returns an error if the stream ID is not found or if the entityID is not found in the stream. This query will return an error if the request exceeds the timeout duration.
func (*Provider) EntityEventSource ¶
func (p *Provider) EntityEventSource() models.EntityEventSource
EntityEventSource returns an event source that allows consumers to subscribe to entity events
func (*Provider) Serve ¶
func (p *Provider) Serve()
Serve runs the main loop for the provider. It runs inside a goroutine as a service and is responsible for exclusively handling all reads and updates, including serving read requests.
func (*Provider) Stop ¶
func (p *Provider) Stop()
Stop will shutdown this service and wait on it to stop before returning
func (*Provider) Stream ¶
Stream returns a specific stream from the store, queried by streamID. This query will return an error if the request exceeds the timeout duration.
func (*Provider) StreamEventSource ¶
func (p *Provider) StreamEventSource() models.StreamEventSource
StreamEventSource returns an event source that allows consumers to subscribe to stream events
func (*Provider) Streams ¶
Streams returns all the streams from the internal store. This query will return an error if the request exceeds the timeout duration.
func (*Provider) UpdatesChan ¶
UpdatesChan returns a channel on which other services can send store updates
type Streams ¶
Streams defines the structure of the data store in the store provider and is consumed by updates for modification
type Update ¶
type Update interface {
ModifyStore(streams *Streams) ([]models.StreamEvent, []models.EntityEvent, error)
}
Update defines the interface for making modifications to the streams store and emitting the resulting stream and entity events. ModifyStore is expected to run in a thread safe environment so it doesn't compete with other updates running at the same time. It is assumed that the resulting stream events are applied first and in order. Then the resulting entity events are applied in order.