Documentation ¶
Index ¶
- Variables
- type Core
- func (c *Core) GetPastEvents(manifestID string, from, to *time.Time) ([]data.Event, error)
- func (c *Core) GetStatus(manifestID string) (*data.HealthStatus, error)
- func (c *Core) HandleMessage(msg event.StreamMessage)
- func (c *Core) IsHealthy() bool
- func (c *Core) Start(ctx context.Context) error
- func (c *Core) SubscribeEvents(ctx context.Context, manifestID string, lastEvtID *uuid.UUID, from *time.Time) ([]data.Event, <-chan data.Event, error)
- type CoreOptions
- type Record
- type RecordStorage
- type Reducer
- type ReducerFunc
- type StreamingOptions
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrStreamNotFound = errors.New("stream not found") ErrEventNotFound = errors.New("event not found") )
Functions ¶
This section is empty.
Types ¶
type Core ¶
type Core struct {
// contains filtered or unexported fields
}
func NewCore ¶
func NewCore(opts CoreOptions, consumer event.StreamConsumer, reducer Reducer) *Core
func (*Core) GetPastEvents ¶ added in v0.1.0
func (*Core) GetStatus ¶ added in v0.1.0
func (c *Core) GetStatus(manifestID string) (*data.HealthStatus, error)
func (*Core) HandleMessage ¶
func (c *Core) HandleMessage(msg event.StreamMessage)
type CoreOptions ¶
type CoreOptions struct { Streaming StreamingOptions StartTimeOffset time.Duration MemoryRecordsTtl time.Duration }
type Record ¶
type RecordStorage ¶
type RecordStorage struct { SizeGauge prometheus.Gauge // contains filtered or unexported fields }
func (*RecordStorage) GetOrCreate ¶
func (s *RecordStorage) GetOrCreate(id string, conditions []data.ConditionType) *Record
func (*RecordStorage) StartCleanupLoop ¶ added in v0.1.0
func (s *RecordStorage) StartCleanupLoop(ctx context.Context, ttl time.Duration)
type Reducer ¶
type Reducer interface { Bindings() []event.BindingArgs Conditions() []data.ConditionType Reduce(current *data.HealthStatus, state interface{}, evt data.Event) (*data.HealthStatus, interface{}) }
type ReducerFunc ¶
type ReducerFunc func(*data.HealthStatus, interface{}, data.Event) (*data.HealthStatus, interface{})
func (ReducerFunc) Bindings ¶
func (f ReducerFunc) Bindings() []event.BindingArgs
func (ReducerFunc) Conditions ¶
func (f ReducerFunc) Conditions() []data.ConditionType
func (ReducerFunc) Reduce ¶
func (f ReducerFunc) Reduce(current *data.HealthStatus, state interface{}, evt data.Event) (*data.HealthStatus, interface{})
type StreamingOptions ¶
type StreamingOptions struct {
Stream, ConsumerName string
event.RawStreamOptions
// EventFlowSilenceTolerance determines the amount of time to tolerate zero
// messages in the stream before giving an error on the service healthcheck.
// This is a workaround for a bug in the RabbitMQ streams client, which
// freezes when the stream has leader-election issues in a clustered setup.
EventFlowSilenceTolerance time.Duration
}
StreamingOptions is purposely made of built-in types only, so that it can bind directly to CLI flags.
Click to show internal directories.
Click to hide internal directories.