Documentation ¶
Index ¶
- Variables
- type Condition
- type ConditionType
- type Core
- func (c *Core) GetPastEvents(manifestID string, from, to *time.Time) ([]data.Event, error)
- func (c *Core) GetStatus(manifestID string) (*Status, error)
- func (c *Core) HandleMessage(msg event.StreamMessage)
- func (c *Core) IsHealthy() bool
- func (c *Core) Start(ctx context.Context) error
- func (c *Core) SubscribeEvents(ctx context.Context, manifestID string, lastEvtID *uuid.UUID, from *time.Time) ([]data.Event, <-chan data.Event, error)
- type CoreOptions
- type Measure
- type Metric
- type MetricName
- type MetricsMap
- type MultistreamStatus
- type Record
- type RecordStorage
- type Reducer
- type ReducerFunc
- type Status
- type StreamingOptions
Constants ¶
This section is empty.
Variables ¶
var ErrEventNotFound = errors.New("event not found")
var ErrStreamNotFound = errors.New("stream not found")
Functions ¶
This section is empty.
Types ¶
type Condition ¶
type Condition struct {
	Type               ConditionType        `json:"type,omitempty"`
	Status             *bool                `json:"status"`
	Frequency          stats.ByWindow       `json:"frequency,omitempty"`
	LastProbeTime      *data.UnixMillisTime `json:"lastProbeTime"`
	LastTransitionTime *data.UnixMillisTime `json:"lastTransitionsTime"`
}
func NewCondition ¶
type ConditionType ¶
type ConditionType string
type Core ¶
type Core struct {
// contains filtered or unexported fields
}
func NewCore ¶
func NewCore(opts CoreOptions, consumer event.StreamConsumer, reducer Reducer) *Core
func (*Core) GetPastEvents ¶ added in v0.1.0
func (c *Core) GetPastEvents(manifestID string, from, to *time.Time) ([]data.Event, error)
func (*Core) HandleMessage ¶
func (c *Core) HandleMessage(msg event.StreamMessage)
type CoreOptions ¶
type CoreOptions struct {
	Streaming        StreamingOptions
	StartTimeOffset  time.Duration
	MemoryRecordsTtl time.Duration
}
type Measure ¶ added in v0.3.0
func (Measure) MarshalJSON ¶ added in v0.3.0
func (*Measure) UnmarshalJSON ¶ added in v0.3.0
type Metric ¶ added in v0.3.0
type Metric struct {
	Name       MetricName        `json:"name"`
	Dimensions map[string]string `json:"dimensions,omitempty"`
	Last       Measure           `json:"last"`
}
type MetricName ¶ added in v0.3.0
type MetricName string
type MetricsMap ¶ added in v0.3.0
type MetricsMap map[MetricName][]*Metric
func (MetricsMap) Add ¶ added in v0.3.0
func (m MetricsMap) Add(metric *Metric) MetricsMap
func (MetricsMap) GetMetric ¶ added in v0.3.0
func (m MetricsMap) GetMetric(name MetricName, dimensions map[string]string) *Metric
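The signatures above suggest that metrics are grouped by name and told apart by their dimensions. The following is a minimal, self-contained sketch of that idea, not the package's implementation: the types are local stand-ins (Last is a plain float64 instead of Measure), the replace-or-append behavior of Add and the exact-match lookup in GetMetric are assumptions, and sameDims and the "bitrate" metric name are hypothetical.

```go
package main

import "fmt"

// Local stand-ins so the sketch compiles on its own.
type MetricName string

type Metric struct {
	Name       MetricName
	Dimensions map[string]string
	Last       float64 // stand-in for the package's Measure type
}

type MetricsMap map[MetricName][]*Metric

// sameDims (hypothetical helper) reports whether two dimension maps hold
// exactly the same key/value pairs.
func sameDims(a, b map[string]string) bool {
	if len(a) != len(b) {
		return false
	}
	for k, v := range a {
		if bv, ok := b[k]; !ok || bv != v {
			return false
		}
	}
	return true
}

// Add (assumed behavior) replaces the metric with the same name and
// dimensions if one exists, otherwise appends it. It returns the map so
// that calls can be chained.
func (m MetricsMap) Add(metric *Metric) MetricsMap {
	list := m[metric.Name]
	for i, old := range list {
		if sameDims(old.Dimensions, metric.Dimensions) {
			list[i] = metric
			return m
		}
	}
	m[metric.Name] = append(list, metric)
	return m
}

// GetMetric (assumed behavior) returns the metric with the given name whose
// dimensions match exactly, or nil when there is none.
func (m MetricsMap) GetMetric(name MetricName, dimensions map[string]string) *Metric {
	for _, metric := range m[name] {
		if sameDims(metric.Dimensions, dimensions) {
			return metric
		}
	}
	return nil
}

func main() {
	m := MetricsMap{}
	m.Add(&Metric{Name: "bitrate", Dimensions: map[string]string{"region": "eu"}, Last: 1200}).
		Add(&Metric{Name: "bitrate", Dimensions: map[string]string{"region": "us"}, Last: 900})
	if got := m.GetMetric("bitrate", map[string]string{"region": "us"}); got != nil {
		fmt.Println(got.Last)
	}
}
```

Note that this sketch mutates the map in place; when the map belongs to a Status, take a copy first (see MetricsCopy under type Status).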
type MultistreamStatus ¶ added in v0.2.0
type MultistreamStatus struct {
	Target    data.MultistreamTargetInfo `json:"target"`
	Connected *Condition                 `json:"connected"`
}
type Record ¶
type Record struct {
	ID         string
	Conditions []ConditionType

	sync.RWMutex
	PastEvents   []data.Event
	EventsByID   map[uuid.UUID]data.Event
	EventSubs    []chan<- data.Event
	ReducerState interface{}
	LastStatus   *Status
	// contains filtered or unexported fields
}
func NewRecord ¶
func NewRecord(id string, conditions []ConditionType) *Record
type RecordStorage ¶
type RecordStorage struct {
// contains filtered or unexported fields
}
func (*RecordStorage) GetOrCreate ¶
func (s *RecordStorage) GetOrCreate(id string, conditions []ConditionType) *Record
func (*RecordStorage) StartCleanupLoop ¶ added in v0.1.0
func (s *RecordStorage) StartCleanupLoop(ctx context.Context, ttl time.Duration)
type Reducer ¶
type Reducer interface {
	Bindings() []event.BindingArgs
	Conditions() []ConditionType
	Reduce(current *Status, state interface{}, evt data.Event) (*Status, interface{})
}
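The shape of Reduce (current status and opaque state in, new status and new state out) can be illustrated with a minimal sketch. Everything below is an assumption made for illustration: Status and Event are simplified local stand-ins for the package's Status and data.Event, failureReducer is a hypothetical reducer, and only the Reduce method is shown (Bindings and Conditions are omitted).

```go
package main

import "fmt"

// Simplified local stand-ins; the real types carry more fields.
type Status struct {
	ID      string
	Healthy bool
}

type Event struct {
	StreamID string
	Failed   bool
}

// failureReducer is a hypothetical reducer that reports a stream as
// unhealthy once it has seen maxFailures failed events.
type failureReducer struct{ maxFailures int }

// Reduce folds one event into the status. The state value is opaque to the
// caller; here it is a running count of failed events.
func (r failureReducer) Reduce(current *Status, state interface{}, evt Event) (*Status, interface{}) {
	failures, _ := state.(int) // a nil state starts the count at zero
	if evt.Failed {
		failures++
	}
	// Build a new Status rather than mutating current in place.
	next := Status{ID: evt.StreamID}
	if current != nil {
		next = *current
	}
	next.Healthy = failures < r.maxFailures
	return &next, failures
}

func main() {
	r := failureReducer{maxFailures: 2}
	var status *Status
	var state interface{}
	events := []Event{{"stream-1", true}, {"stream-1", false}, {"stream-1", true}}
	for _, evt := range events {
		// The caller threads both return values into the next call.
		status, state = r.Reduce(status, state, evt)
		fmt.Println(status.Healthy, state)
	}
}
```

Returning the state alongside the status lets a reducer keep bookkeeping that does not belong in the public Status.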
type ReducerFunc ¶
func (ReducerFunc) Bindings ¶
func (f ReducerFunc) Bindings() []event.BindingArgs
func (ReducerFunc) Conditions ¶
func (f ReducerFunc) Conditions() []ConditionType
type Status ¶
type Status struct {
	ID         string       `json:"id"`
	Healthy    *Condition   `json:"healthy"`
	Conditions []*Condition `json:"conditions"`
	Metrics    MetricsMap   `json:"metrics,omitempty"`
	// TODO: Move this `multistream` field somewhere else to make this struct more
	// generic. Maybe condition dimensions/extraArgs?
	Multistream []*MultistreamStatus `json:"multistream,omitempty"`
}
Status is a soft-immutable struct: nothing enforces immutability, but it must never be modified in place, or other code holding the same instance could see inconsistent data. To make any change, create a new instance or copy, and beware of the internal slices and pointers, which a shallow copy still shares with the original.
Use NewMergedStatus below to create new status objects with mutated fields. Note that you still need to clone the internal slices before mutating them.
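The copy-before-mutate rule described above can be sketched as follows. This is a self-contained illustration, not the package's code: Condition and Status are trimmed local stand-ins, and withCondition and boolPtr are hypothetical helpers that do not exist in this package.

```go
package main

import "fmt"

// Trimmed local stand-ins for the package types.
type Condition struct {
	Type   string
	Status *bool
}

type Status struct {
	ID         string
	Conditions []*Condition
}

// withCondition (hypothetical) returns a new Status in which the condition
// of the same type is replaced, or appended if absent. The original Status
// and its Conditions slice are left untouched.
func withCondition(s *Status, c *Condition) *Status {
	// Allocate a fresh backing array so the writes below cannot reach
	// s.Conditions.
	conds := make([]*Condition, len(s.Conditions), len(s.Conditions)+1)
	copy(conds, s.Conditions)
	replaced := false
	for i, old := range conds {
		if old.Type == c.Type {
			conds[i] = c
			replaced = true
		}
	}
	if !replaced {
		conds = append(conds, c)
	}
	out := *s // shallow copy of the remaining fields
	out.Conditions = conds
	return &out
}

func boolPtr(b bool) *bool { return &b }

func main() {
	orig := &Status{
		ID:         "stream-1",
		Conditions: []*Condition{{Type: "Active", Status: boolPtr(false)}},
	}
	next := withCondition(orig, &Condition{Type: "Active", Status: boolPtr(true)})
	fmt.Println(*orig.Conditions[0].Status, *next.Conditions[0].Status)
}
```

The unchanged *Condition values are still shared between the two statuses, which is safe only as long as they are themselves treated as immutable.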
func NewMergedStatus ¶ added in v0.2.0
func (Status) ConditionsCopy ¶ added in v0.2.0
func (Status) MetricsCopy ¶ added in v0.3.0
func (s Status) MetricsCopy() MetricsMap
func (Status) MultistreamCopy ¶ added in v0.2.0
func (s Status) MultistreamCopy() []*MultistreamStatus
type StreamingOptions ¶
type StreamingOptions struct {
Stream, ConsumerName string
event.RawStreamOptions
}
Purposely made of built-in types only, so that it can be bound directly to CLI flags.