health

package
v0.5.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 17, 2022 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrStreamNotFound = errors.New("stream not found")
	ErrEventNotFound  = errors.New("event not found")
)

Functions

This section is empty.

Types

type Core

type Core struct {
	// contains filtered or unexported fields
}

func NewCore

func NewCore(opts CoreOptions, consumer event.StreamConsumer, reducer Reducer) *Core

func (*Core) GetPastEvents added in v0.1.0

func (c *Core) GetPastEvents(manifestID string, from, to *time.Time) ([]data.Event, error)

func (*Core) GetStatus added in v0.1.0

func (c *Core) GetStatus(manifestID string) (*data.HealthStatus, error)

func (*Core) HandleMessage

func (c *Core) HandleMessage(msg event.StreamMessage)

func (*Core) IsHealthy added in v0.1.0

func (c *Core) IsHealthy() bool

func (*Core) Start

func (c *Core) Start(ctx context.Context) error

func (*Core) SubscribeEvents added in v0.1.0

func (c *Core) SubscribeEvents(ctx context.Context, manifestID string, lastEvtID *uuid.UUID, from *time.Time) ([]data.Event, <-chan data.Event, error)

type CoreOptions

type CoreOptions struct {
	Streaming        StreamingOptions
	StartTimeOffset  time.Duration
	MemoryRecordsTtl time.Duration
}

type Record

type Record struct {
	ID         string
	Conditions []data.ConditionType

	sync.RWMutex

	PastEvents []data.Event
	EventsByID map[uuid.UUID]data.Event
	EventSubs  []chan<- data.Event

	ReducerState interface{}
	LastStatus   *data.HealthStatus
	// contains filtered or unexported fields
}

func NewRecord

func NewRecord(id string, conditionTypes []data.ConditionType) *Record

func (*Record) SubscribeLocked added in v0.1.0

func (r *Record) SubscribeLocked(ctx context.Context, subs chan data.Event) chan data.Event

type RecordStorage

type RecordStorage struct {
	SizeGauge prometheus.Gauge
	// contains filtered or unexported fields
}

func (*RecordStorage) Get

func (s *RecordStorage) Get(id string) (*Record, bool)

func (*RecordStorage) GetOrCreate

func (s *RecordStorage) GetOrCreate(id string, conditions []data.ConditionType) *Record

func (*RecordStorage) StartCleanupLoop added in v0.1.0

func (s *RecordStorage) StartCleanupLoop(ctx context.Context, ttl time.Duration)

type Reducer

type Reducer interface {
	Bindings() []event.BindingArgs
	Conditions() []data.ConditionType
	Reduce(current *data.HealthStatus, state interface{}, evt data.Event) (*data.HealthStatus, interface{})
}

type ReducerFunc

type ReducerFunc func(*data.HealthStatus, interface{}, data.Event) (*data.HealthStatus, interface{})

func (ReducerFunc) Bindings

func (f ReducerFunc) Bindings() []event.BindingArgs

func (ReducerFunc) Conditions

func (f ReducerFunc) Conditions() []data.ConditionType

func (ReducerFunc) Reduce

func (f ReducerFunc) Reduce(current *data.HealthStatus, state interface{}, evt data.Event) (*data.HealthStatus, interface{})

type StreamingOptions

type StreamingOptions struct {
	Stream, ConsumerName string

	event.RawStreamOptions

	// EventFlowSilenceTolerance determines the amount of time to tolerate zero
	// messages in the stream before giving an error on the service healthcheck.
	// This is a workaround for a bug in the rabbitmq streams client in which it
	// freezes when the stream has leader election issues in a clustered setup.
	EventFlowSilenceTolerance time.Duration
}

Purposedly made of built-in types only to bind directly to cli flags.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL