Documentation ¶
Index ¶
- Variables
- type Condition
- type ConditionType
- type Core
- func (c *Core) GetPastEvents(manifestID string, from, to *time.Time) ([]data.Event, error)
- func (c *Core) GetStatus(manifestID string) (Status, error)
- func (c *Core) HandleMessage(msg event.StreamMessage)
- func (c *Core) IsHealthy() bool
- func (c *Core) Start(ctx context.Context) error
- func (c *Core) SubscribeEvents(ctx context.Context, manifestID string, lastEvtID *uuid.UUID, from *time.Time) ([]data.Event, <-chan data.Event, error)
- func (c *Core) Use(reducers ...Reducer) *Core
- type CoreOptions
- type Record
- type RecordStorage
- type Reducer
- type ReducerFunc
- type Status
- type StreamingOptions
Constants ¶
This section is empty.
Variables ¶
View Source
var ErrEventNotFound = errors.New("event not found")
View Source
var ErrStreamNotFound = errors.New("stream not found")
Functions ¶
This section is empty.
Types ¶
type Condition ¶
type Condition struct { Type ConditionType `json:"type,omitempty"` Status *bool `json:"status"` Frequency stats.ByWindow `json:"frequency,omitempty"` LastProbeTime *time.Time `json:"lastProbeTime"` LastTransitionTime *time.Time `json:"lastTransitionsTime"` }
func NewCondition ¶
type ConditionType ¶
type ConditionType string
type Core ¶
type Core struct {
// contains filtered or unexported fields
}
func NewCore ¶
func NewCore(opts CoreOptions, consumer event.StreamConsumer) *Core
func (*Core) GetPastEvents ¶ added in v0.1.0
func (*Core) HandleMessage ¶
func (c *Core) HandleMessage(msg event.StreamMessage)
func (*Core) SubscribeEvents ¶ added in v0.1.0
type CoreOptions ¶
type CoreOptions struct { Streaming StreamingOptions StartTimeOffset time.Duration MemoryRecordsTtl time.Duration }
type Record ¶
type Record struct { ID string Conditions []ConditionType sync.RWMutex PastEvents []data.Event EventsByID map[uuid.UUID]data.Event EventSubs []chan<- data.Event ReducersState map[int]interface{} LastStatus Status // contains filtered or unexported fields }
func NewRecord ¶
func NewRecord(id string, conditions []ConditionType) *Record
type RecordStorage ¶
type RecordStorage struct {
// contains filtered or unexported fields
}
func (*RecordStorage) GetOrCreate ¶
func (s *RecordStorage) GetOrCreate(id string, conditions []ConditionType) *Record
func (*RecordStorage) StartCleanupLoop ¶ added in v0.1.0
func (s *RecordStorage) StartCleanupLoop(ctx context.Context, ttl time.Duration)
type Reducer ¶
type Reducer interface { Bindings() []event.BindingArgs Conditions() []ConditionType Reduce(current Status, state interface{}, evt data.Event) (Status, interface{}) }
type ReducerFunc ¶
func (ReducerFunc) Bindings ¶
func (f ReducerFunc) Bindings() []event.BindingArgs
func (ReducerFunc) Conditions ¶
func (f ReducerFunc) Conditions() []ConditionType
type Status ¶
type Status struct { ID string `json:"id"` Healthy Condition `json:"healthy"` Conditions []*Condition `json:"conditions"` }
func (Status) GetCondition ¶
func (s Status) GetCondition(condType ConditionType) *Condition
type StreamingOptions ¶
type StreamingOptions struct {
Stream, ConsumerName string
event.RawStreamOptions
}
Purposedly made of built-in types only to bind directly to cli flags.
Click to show internal directories.
Click to hide internal directories.