Documentation ¶
Index ¶
- Variables
- type Condition
- type ConditionType
- type Core
- func (c *Core) GetPastEvents(manifestID string, from, to *time.Time) ([]data.Event, error)
- func (c *Core) GetStatus(manifestID string) (*Status, error)
- func (c *Core) HandleMessage(msg event.StreamMessage)
- func (c *Core) IsHealthy() bool
- func (c *Core) Start(ctx context.Context) error
- func (c *Core) SubscribeEvents(ctx context.Context, manifestID string, lastEvtID *uuid.UUID, from *time.Time) ([]data.Event, <-chan data.Event, error)
- func (c *Core) Use(reducers ...Reducer) *Core
- type CoreOptions
- type MultistreamStatus
- type Record
- type RecordStorage
- type Reducer
- type ReducerFunc
- type Status
- type StreamingOptions
Constants ¶
This section is empty.
Variables ¶
var ErrEventNotFound = errors.New("event not found")
var ErrStreamNotFound = errors.New("stream not found")
Functions ¶
This section is empty.
Types ¶
type Condition ¶
type Condition struct {
	Type               ConditionType  `json:"type,omitempty"`
	Status             *bool          `json:"status"`
	Frequency          stats.ByWindow `json:"frequency,omitempty"`
	LastProbeTime      *time.Time     `json:"lastProbeTime"`
	LastTransitionTime *time.Time     `json:"lastTransitionsTime"`
}
func NewCondition ¶
type ConditionType ¶
type ConditionType string
type Core ¶
type Core struct {
// contains filtered or unexported fields
}
func NewCore ¶
func NewCore(opts CoreOptions, consumer event.StreamConsumer) *Core
func (*Core) GetPastEvents ¶ added in v0.1.0
func (c *Core) GetPastEvents(manifestID string, from, to *time.Time) ([]data.Event, error)
func (*Core) HandleMessage ¶
func (c *Core) HandleMessage(msg event.StreamMessage)
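func (*Core) SubscribeEvents ¶ added in v0.1.0
func (c *Core) SubscribeEvents(ctx context.Context, manifestID string, lastEvtID *uuid.UUID, from *time.Time) ([]data.Event, <-chan data.Event, error)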
type CoreOptions ¶
type CoreOptions struct {
	Streaming        StreamingOptions
	StartTimeOffset  time.Duration
	MemoryRecordsTtl time.Duration
}
type MultistreamStatus ¶ added in v0.2.0
type MultistreamStatus struct {
	Target    data.MultistreamTargetInfo `json:"target"`
	Connected *Condition                 `json:"connected"`
}
type Record ¶
type Record struct {
	ID         string
	Conditions []ConditionType
	sync.RWMutex
	PastEvents    []data.Event
	EventsByID    map[uuid.UUID]data.Event
	EventSubs     []chan<- data.Event
	ReducersState map[int]interface{}
	LastStatus    *Status
	// contains filtered or unexported fields
}
func NewRecord ¶
func NewRecord(id string, conditions []ConditionType) *Record
type RecordStorage ¶
type RecordStorage struct {
// contains filtered or unexported fields
}
func (*RecordStorage) GetOrCreate ¶
func (s *RecordStorage) GetOrCreate(id string, conditions []ConditionType) *Record
func (*RecordStorage) StartCleanupLoop ¶ added in v0.1.0
func (s *RecordStorage) StartCleanupLoop(ctx context.Context, ttl time.Duration)
type Reducer ¶
type Reducer interface {
	Bindings() []event.BindingArgs
	Conditions() []ConditionType
	Reduce(current *Status, state interface{}, evt data.Event) (*Status, interface{})
}
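To illustrate the shape of Reduce, here is a minimal sketch of a reducer that threads its own state through successive calls and returns a new status each time. It is an illustration under assumptions, not the package's implementation: Event and Status are simplified local stand-ins (the real Reduce takes a data.Event, and Status.Healthy is a *Condition), errorCounter and its threshold are hypothetical, and Bindings and Conditions are omitted.

```go
package main

import "fmt"

// Event is a simplified, hypothetical stand-in for data.Event.
type Event struct{ Kind string }

// Status is a simplified stand-in for the package's Status type.
type Status struct {
	ID      string
	Healthy *bool
}

// errorCounter is a hypothetical reducer that counts "error" events and
// reports the stream as unhealthy once the count reaches threshold.
type errorCounter struct{ threshold int }

// Reduce has the same shape as Reducer.Reduce, with the stand-in types.
func (r errorCounter) Reduce(current *Status, state interface{}, evt Event) (*Status, interface{}) {
	count, _ := state.(int) // a nil state on the first event yields 0
	if evt.Kind == "error" {
		count++
	}
	healthy := count < r.threshold
	next := *current // copy instead of mutating the current status
	next.Healthy = &healthy
	return &next, count
}

func main() {
	r := errorCounter{threshold: 2}
	status := &Status{ID: "stream-1"}
	var state interface{}
	for _, evt := range []Event{{Kind: "error"}, {Kind: "ok"}, {Kind: "error"}} {
		status, state = r.Reduce(status, state, evt)
		fmt.Println(evt.Kind, *status.Healthy)
	}
}
```

Returning the updated state alongside the status keeps the reducer value itself stateless, which matches the state parameter and return value in the interface.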
type ReducerFunc ¶
func (ReducerFunc) Bindings ¶
func (f ReducerFunc) Bindings() []event.BindingArgs
func (ReducerFunc) Conditions ¶
func (f ReducerFunc) Conditions() []ConditionType
type Status ¶
type Status struct {
	ID          string               `json:"id"`
	Healthy     *Condition           `json:"healthy"`
	Conditions  []*Condition         `json:"conditions"`
	Multistream []*MultistreamStatus `json:"multistream,omitempty"`
}
Status is a soft-immutable struct. It should never be modified in place, or a lot of things could become inconsistent. To perform any mutation, create a new instance or copy, and beware of the internal slices and pointers, which a plain struct copy still shares with the original.
Use NewMergedStatus to create new status objects with mutated fields. Note that you still need to clone the internal slices before mutating them.
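The copy-before-mutate rule can be sketched as follows. This is a minimal illustration with simplified local stand-ins for Status and Condition (only a few fields are reproduced), and withCondition is a hypothetical helper, not part of the package; in real code NewMergedStatus and ConditionsCopy are the intended tools.

```go
package main

import "fmt"

// Simplified local stand-ins for the package types.
type ConditionType string

type Condition struct {
	Type   ConditionType
	Status *bool
}

type Status struct {
	ID         string
	Conditions []*Condition
}

// withCondition (hypothetical) returns a new Status in which conditions of
// the given type have the given value. The original Status, its slice and
// its Condition values are left untouched.
func withCondition(s *Status, t ConditionType, value bool) *Status {
	next := *s // shallow copy: slices and pointers are still shared
	next.Conditions = make([]*Condition, len(s.Conditions))
	copy(next.Conditions, s.Conditions) // new backing array, same pointers
	for i, c := range next.Conditions {
		if c.Type == t {
			updated := *c // copy the Condition before changing it
			updated.Status = &value
			next.Conditions[i] = &updated
		}
	}
	return &next
}

func main() {
	f := false
	old := &Status{ID: "stream-1", Conditions: []*Condition{{Type: "Active", Status: &f}}}
	updated := withCondition(old, "Active", true)
	fmt.Println(*old.Conditions[0].Status, *updated.Conditions[0].Status) // prints "false true"
}
```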
func NewMergedStatus ¶ added in v0.2.0
func (Status) ConditionsCopy ¶ added in v0.2.0
func (Status) GetCondition ¶
func (s Status) GetCondition(condType ConditionType) *Condition
func (Status) MultistreamCopy ¶ added in v0.2.0
func (s Status) MultistreamCopy() []*MultistreamStatus
type StreamingOptions ¶
type StreamingOptions struct {
Stream, ConsumerName string
event.RawStreamOptions
}
Purposely made of built-in types only, so that its fields can be bound directly to CLI flags.
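Binding such a struct to flags might look like the sketch below, which uses the standard library flag package. The struct is a local stand-in containing only the two string fields (the embedded event.RawStreamOptions is left out), and the flag names, defaults and the parseStreamingFlags helper are assumptions made for illustration.

```go
package main

import (
	"flag"
	"fmt"
)

// StreamingOptions is a local stand-in with only the two string fields;
// the embedded event.RawStreamOptions is omitted.
type StreamingOptions struct {
	Stream, ConsumerName string
}

// parseStreamingFlags (hypothetical) binds the struct fields to flags and
// parses args. Flag names and defaults are illustrative only.
func parseStreamingFlags(args []string) (StreamingOptions, error) {
	var opts StreamingOptions
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	fs.StringVar(&opts.Stream, "stream", "events", "name of the stream to consume")
	fs.StringVar(&opts.ConsumerName, "consumer-name", "", "name of this consumer")
	err := fs.Parse(args)
	return opts, err
}

func main() {
	opts, err := parseStreamingFlags([]string{"-stream", "health", "-consumer-name", "analyzer-1"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", opts)
}
```

Because every field is a built-in type, each one can be handed to a flag binding by pointer without any conversion layer.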