Documentation ¶
Overview ¶
Package persistence is a generated protocol buffer package.
It is generated from these files:
testprotos.proto
It has these top-level messages:
TestEvent TestSnapshot
Index ¶
- Variables
- func CreateDBConnection(url *url.URL) (*sqlx.DB, error)
- func ResetTestDB(dbPath string)
- func TestDBURL() (*url.URL, string)
- func Using(provider Provider) func(next actor.ActorFunc) actor.ActorFunc
- type InMemoryProvider
- func (provider *InMemoryProvider) GetEvents(actorName string, eventIndexStart int, callback func(index int, e interface{}))
- func (provider *InMemoryProvider) GetSnapshot(actorName string) (snapshot interface{}, eventIndex int, ok bool)
- func (provider *InMemoryProvider) GetSnapshotInterval() int
- func (provider *InMemoryProvider) PersistEvent(actorName string, eventIndex int, event proto.Message)
- func (provider *InMemoryProvider) PersistSnapshot(actorName string, eventIndex int, snapshot proto.Message)
- func (provider *InMemoryProvider) Restart()
- type Mixin
- type Provider
- type ProviderState
- type StreamCallBack
- type StreamEvent
- type StreamPredicate
- type StreamingProvider
- type StreamingProviderState
- type TestEvent
- type TestSnapshot
Constants ¶
This section is empty.
Variables ¶
var ErrMarshalling = fmt.Errorf("Persistence provider failed with marshalling error")
ErrMarshalling will be provided to panic on marshalling failures
var ErrPersistenceFailed = fmt.Errorf("Persistence provider failed to persist event")
ErrPersistenceFailed is the panic reason if PersistEvent fails to write to persistence provider
var ErrPersistingSnapshot = fmt.Errorf("Persistence provider failed to persist snapshot")
ErrPersistingSnapshot will be provided to panic on PersistSnapshot failures
var ErrReadingEvents = fmt.Errorf("Persistence provider failed to read events")
ErrReadingEvents is the panic reason if GetEvents fails to read from persistence provider
Functions ¶
func CreateDBConnection ¶
CreateDBConnection sets up a DB connection and ensures required tables exist
Types ¶
type InMemoryProvider ¶
type InMemoryProvider struct {
// contains filtered or unexported fields
}
InMemoryProvider is a proto.actor persistence provider
func (*InMemoryProvider) GetEvents ¶
func (provider *InMemoryProvider) GetEvents(actorName string, eventIndexStart int, callback func(index int, e interface{}))
GetEvents implements ProviderState.GetEvents
func (*InMemoryProvider) GetSnapshot ¶
func (provider *InMemoryProvider) GetSnapshot(actorName string) (snapshot interface{}, eventIndex int, ok bool)
GetSnapshot implements ProviderState.GetSnapshot
func (*InMemoryProvider) GetSnapshotInterval ¶
func (provider *InMemoryProvider) GetSnapshotInterval() int
GetSnapshotInterval implements ProviderState.GetSnapshotInterval
func (*InMemoryProvider) PersistEvent ¶
func (provider *InMemoryProvider) PersistEvent(actorName string, eventIndex int, event proto.Message)
PersistEvent implements ProviderState.PersistEvent
func (*InMemoryProvider) PersistSnapshot ¶
func (provider *InMemoryProvider) PersistSnapshot(actorName string, eventIndex int, snapshot proto.Message)
PersistSnapshot implements ProviderState.PersistSnapshot
func (*InMemoryProvider) Restart ¶
func (provider *InMemoryProvider) Restart()
Restart implements ProviderState.Restart
type Mixin ¶
type Mixin struct {
// contains filtered or unexported fields
}
Mixin is the persistence mixin for actors
func (*Mixin) PersistReceive ¶
PersistReceive saves an event to the actor's journal
func (*Mixin) PersistSnapshot ¶
PersistSnapshot overwrites an actor's current snapshot
func (*Mixin) Recovering ¶
Recovering indicates whether this actor is recovering (in which case all messages are replays) or not
type Provider ¶
type Provider interface {
GetState() ProviderState
}
Provider is the abstraction used for persistence
type ProviderState ¶
type ProviderState interface { Restart() GetSnapshotInterval() int GetSnapshot(actorName string) (snapshot interface{}, eventIndex int, ok bool) GetEvents(actorName string, eventIndexStart int, callback func(messageIndex int, e interface{})) PersistEvent(actorName string, eventIndex int, event proto.Message) PersistSnapshot(actorName string, eventIndex int, snapshot proto.Message) }
ProviderState is the contract with a given persistence provider
func NewInMemoryProvider ¶
func NewInMemoryProvider(snapshotInterval int) ProviderState
NewInMemoryProvider creates a new in-memory provider
func NewSQLProvider ¶
func NewSQLProvider(db *sqlx.DB, snapshotInterval int) (ProviderState, error)
NewSQLProvider creates a journal/snapshot provider with an SQL db backing it
type StreamCallBack ¶
type StreamCallBack func(event *StreamEvent)
StreamCallBack is a callback when an event is delivered to a persistence stream
type StreamEvent ¶
StreamEvent adds metadata to an actor's message
type StreamPredicate ¶
type StreamPredicate func(event *StreamEvent) bool
StreamPredicate filters a stream
type StreamingProvider ¶
type StreamingProvider struct {
// contains filtered or unexported fields
}
StreamingProvider is a wrapper for an existing provider that adds event streaming support
func NewStreamingProvider ¶
func NewStreamingProvider(target ProviderState) *StreamingProvider
NewStreamingProvider wraps an existing provider to provide a stream of events
func (*StreamingProvider) GetEventStream ¶
func (p *StreamingProvider) GetEventStream() *eventstream.EventStream
GetEventStream returns the underlying proto.actor stream for a streaming provider
func (*StreamingProvider) GetState ¶
func (p *StreamingProvider) GetState() ProviderState
GetState returns the persistence.ProviderState associated with this provider
func (*StreamingProvider) GetStreamingState ¶
func (p *StreamingProvider) GetStreamingState() StreamingProviderState
GetStreamingState returns the persistence.StreamingProviderState associated with this provider
type StreamingProviderState ¶
type StreamingProviderState interface { ProviderState // StreamNewEvents sends any events that match the predicate as they arrive to fn StreamNewEvents(predicate StreamPredicate, fn StreamCallBack) *eventstream.Subscription // SubscribeActorJournal streams all events that were persisted as they were saved by a given actor after fromIndex, and continues to stream any new events to the subscription SubscribeActorJournal(persistenceName string, fromIndex int, fn StreamCallBack) *eventstream.Subscription // QueryActorJournal searches the actor journal from a given index and sends all events that match predicate to fn QueryActorJournal(persistenceName string, fromIndex int, predicate StreamPredicate, fn StreamCallBack) // UnsubscribeStream closes a subscription UnsubscribeStream(sub *eventstream.Subscription) }
StreamingProviderState provides streaming access to an actor's events (including historical journal)
type TestEvent ¶
type TestEvent struct { Index uint32 `protobuf:"varint,1,opt,name=index" json:"index,omitempty"` StringVal string `protobuf:"bytes,2,opt,name=string_val,json=stringVal" json:"string_val,omitempty"` }
func (*TestEvent) Descriptor ¶
func (*TestEvent) GetStringVal ¶
func (*TestEvent) ProtoMessage ¶
func (*TestEvent) ProtoMessage()
type TestSnapshot ¶
type TestSnapshot struct { StringVal string `protobuf:"bytes,1,opt,name=string_val,json=stringVal" json:"string_val,omitempty"` Index uint32 `protobuf:"varint,2,opt,name=index" json:"index,omitempty"` }
func (*TestSnapshot) Descriptor ¶
func (*TestSnapshot) Descriptor() ([]byte, []int)
func (*TestSnapshot) GetIndex ¶
func (m *TestSnapshot) GetIndex() uint32
func (*TestSnapshot) GetStringVal ¶
func (m *TestSnapshot) GetStringVal() string
func (*TestSnapshot) ProtoMessage ¶
func (*TestSnapshot) ProtoMessage()
func (*TestSnapshot) Reset ¶
func (m *TestSnapshot) Reset()
func (*TestSnapshot) String ¶
func (m *TestSnapshot) String() string