esstate

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 18, 2021 License: Apache-2.0 Imports: 10 Imported by: 1

Documentation

Index

Constants

View Source
const AggregateType = "eventsource.event_state"

AggregateType is the type of the EventState aggregate.

View Source
const EventHandlingFailedTopic = "eventsource.event_state.handling_failed"

EventHandlingFailedTopic is the topic used by the Event aggregate on the HandlingFailed event.

View Source
const EventHandlingFailedType = "event_state:handling_failed"

EventHandlingFailedType is the type used by the Event aggregate on the HandlingFailed event.

View Source
const EventHandlingFinishedTopic = "eventsource.event_state.handling_finished"

EventHandlingFinishedTopic is the topic used by the Event aggregate on the HandlingFinished event.

View Source
const EventHandlingFinishedType = "event_state:handling_finished"

EventHandlingFinishedType is the type used by the Event aggregate on the HandlingFinished event.

View Source
const EventHandlingStartedTopic = "eventsource.event_state.handling_started"

EventHandlingStartedTopic is the topic used by the Event aggregate on the HandlingStarted event.

View Source
const EventHandlingStartedType = "event_state:handling_started"

EventHandlingStartedType is the type used by the Event aggregate on the HandlingStarted event.

View Source
const EventUnhandledTopic = "eventsource.event_state.unhandled"

EventUnhandledTopic is the topic used by the Event aggregate on the Unhandled event.

View Source
const EventUnhandledType = "event_state:unhandled"

EventUnhandledType is the type used by the Event aggregate on the Unhandled event.

View Source
const FailureCountResetTopic = "eventsource.event_state.failure_count_reset"

FailureCountResetTopic is the topic used by the aggregate on the FailureCountReset event.

View Source
const FailureCountResetType = "event_state:failure_count_reset"

FailureCountResetType is the type used by the aggregate on the FailureCountReset event.

Variables

View Source
var File_eventstate_eventstate_proto protoreflect.FileDescriptor

Functions

This section is empty.

Types

type EventHandlingFailed

type EventHandlingFailed struct {
	HandlerName string `protobuf:"bytes,1,opt,name=handler_name,json=handlerName,proto3" json:"handler_name,omitempty"`
	Err         string `protobuf:"bytes,2,opt,name=err,proto3" json:"err,omitempty"`
	ErrCode     int32  `protobuf:"varint,3,opt,name=err_code,json=errCode,proto3" json:"err_code,omitempty"`
	// contains filtered or unexported fields
}

EventHandlingFailed is an event message occurred on a failure when handling given event.

func (*EventHandlingFailed) Descriptor deprecated

func (*EventHandlingFailed) Descriptor() ([]byte, []int)

Deprecated: Use EventHandlingFailed.ProtoReflect.Descriptor instead.

func (*EventHandlingFailed) GetErr

func (x *EventHandlingFailed) GetErr() string

func (*EventHandlingFailed) GetErrCode

func (x *EventHandlingFailed) GetErrCode() int32

func (*EventHandlingFailed) GetHandlerName

func (x *EventHandlingFailed) GetHandlerName() string

func (*EventHandlingFailed) MessageTopic

func (x *EventHandlingFailed) MessageTopic() string

MessageTopic returns messages.Topic from given message. Implements messages.Message interface.

func (*EventHandlingFailed) MessageType

func (x *EventHandlingFailed) MessageType() string

MessageType gets the type of the event. Implements messages.Message interface.

func (*EventHandlingFailed) ProtoMessage

func (*EventHandlingFailed) ProtoMessage()

func (*EventHandlingFailed) ProtoReflect

func (x *EventHandlingFailed) ProtoReflect() protoreflect.Message

func (*EventHandlingFailed) Reset

func (x *EventHandlingFailed) Reset()

func (*EventHandlingFailed) String

func (x *EventHandlingFailed) String() string

func (*EventHandlingFailed) Validate

func (x *EventHandlingFailed) Validate() error

Validate implements validator.Validator interface.

type EventHandlingFinished

type EventHandlingFinished struct {
	HandlerName string `protobuf:"bytes,1,opt,name=handler_name,json=handlerName,proto3" json:"handler_name,omitempty"`
	// contains filtered or unexported fields
}

EventHandlingFinished is an event message occurred when given handler just finished successfully handling an event.

func (*EventHandlingFinished) Descriptor deprecated

func (*EventHandlingFinished) Descriptor() ([]byte, []int)

Deprecated: Use EventHandlingFinished.ProtoReflect.Descriptor instead.

func (*EventHandlingFinished) GetHandlerName

func (x *EventHandlingFinished) GetHandlerName() string

func (*EventHandlingFinished) MessageTopic

func (x *EventHandlingFinished) MessageTopic() string

MessageTopic returns messages.Topic from given message. Implements messages.Message interface.

func (*EventHandlingFinished) MessageType

func (x *EventHandlingFinished) MessageType() string

MessageType gets the type of the event. Implements messages.Message interface.

func (*EventHandlingFinished) ProtoMessage

func (*EventHandlingFinished) ProtoMessage()

func (*EventHandlingFinished) ProtoReflect

func (x *EventHandlingFinished) ProtoReflect() protoreflect.Message

func (*EventHandlingFinished) Reset

func (x *EventHandlingFinished) Reset()

func (*EventHandlingFinished) String

func (x *EventHandlingFinished) String() string

func (*EventHandlingFinished) Validate

func (x *EventHandlingFinished) Validate() error

Validate implements validator.Validator interface.

type EventHandlingStarted

type EventHandlingStarted struct {
	HandlerName string `protobuf:"bytes,1,opt,name=handler_name,json=handlerName,proto3" json:"handler_name,omitempty"`
	// contains filtered or unexported fields
}

EventHandlingStarted is an event message occurred when given handler just started handling an event.

func (*EventHandlingStarted) Descriptor deprecated

func (*EventHandlingStarted) Descriptor() ([]byte, []int)

Deprecated: Use EventHandlingStarted.ProtoReflect.Descriptor instead.

func (*EventHandlingStarted) GetHandlerName

func (x *EventHandlingStarted) GetHandlerName() string

func (*EventHandlingStarted) MessageTopic

func (x *EventHandlingStarted) MessageTopic() string

MessageTopic returns messages.Topic from given message. Implements messages.Message interface.

func (*EventHandlingStarted) MessageType

func (x *EventHandlingStarted) MessageType() string

MessageType gets the type of the event. Implements messages.Message interface.

func (*EventHandlingStarted) ProtoMessage

func (*EventHandlingStarted) ProtoMessage()

func (*EventHandlingStarted) ProtoReflect

func (x *EventHandlingStarted) ProtoReflect() protoreflect.Message

func (*EventHandlingStarted) Reset

func (x *EventHandlingStarted) Reset()

func (*EventHandlingStarted) String

func (x *EventHandlingStarted) String() string

func (*EventHandlingStarted) Validate

func (x *EventHandlingStarted) Validate() error

Validate implements validator.Validator interface.

type EventState

type EventState struct {
	// contains filtered or unexported fields
}

EventState is an aggregate that stores the event handler changes.

func InitializeUnhandledEventState

func InitializeUnhandledEventState(eventID, eventType string, timestamp time.Time, bs *es.AggregateBaseSetter, o *Options) (*EventState, error)

InitializeUnhandledEventState creates and initializes a new EventState model with an EventUnhandled message.

func NewEventState

func NewEventState(eventID string, bs *es.AggregateBaseSetter) *EventState

NewEventState creates a new EventState aggregate model.

func (*EventState) AggBase

func (s *EventState) AggBase() *es.AggregateBase

AggBase implements es.Aggregate interface.

func (*EventState) Apply

func (s *EventState) Apply(e *es.Event) (err error)

Apply implements es.Aggregate interface.

func (*EventState) FinishHandling

func (s *EventState) FinishHandling(handlerName string) error

FinishHandling finishes handling given event state by the handlerName.

func (*EventState) HandlingFailed

func (s *EventState) HandlingFailed(handlerName string, handlingErr error) error

HandlingFailed marks the event state that it's handling had failed with given error.

func (*EventState) Reset

func (s *EventState) Reset()

Reset implements es.Aggregate interface.

func (*EventState) ResetFailures

func (s *EventState) ResetFailures(handlerName string) error

ResetFailures resets handling state failures.

func (*EventState) SetBase

func (s *EventState) SetBase(base *es.AggregateBase)

SetBase implements es.Aggregate interface.

func (*EventState) StartHandling

func (s *EventState) StartHandling(handlerName string) error

StartHandling starts handling given event by the handlerName.

type EventStore

type EventStore interface {
	es.EventStore

	// ChangeTypeOptions changes event type handling state options.
	ChangeTypeOptions(eventType string, options *Options)
}

EventStore is an interface that allows to operate on top of the standard es.EventStore, but also allows to control the state of the event handling.

type EventUnhandled

type EventUnhandled struct {
	EventType       string `protobuf:"bytes,1,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"`
	Timestamp       int64  `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	MaxFailures     int32  `protobuf:"varint,3,opt,name=max_failures,json=maxFailures,proto3" json:"max_failures,omitempty"`
	MinFailInterval int64  `protobuf:"varint,4,opt,name=min_fail_interval,json=minFailInterval,proto3" json:"min_fail_interval,omitempty"`
	MaxHandlingTime int64  `protobuf:"varint,5,opt,name=max_handling_time,json=maxHandlingTime,proto3" json:"max_handling_time,omitempty"`
	// contains filtered or unexported fields
}

EventUnhandled is an event message which states that an event is marked as unhandled.

func (*EventUnhandled) Descriptor deprecated

func (*EventUnhandled) Descriptor() ([]byte, []int)

Deprecated: Use EventUnhandled.ProtoReflect.Descriptor instead.

func (*EventUnhandled) GetEventType

func (x *EventUnhandled) GetEventType() string

func (*EventUnhandled) GetMaxFailures

func (x *EventUnhandled) GetMaxFailures() int32

func (*EventUnhandled) GetMaxHandlingTime

func (x *EventUnhandled) GetMaxHandlingTime() int64

func (*EventUnhandled) GetMinFailInterval

func (x *EventUnhandled) GetMinFailInterval() int64

func (*EventUnhandled) GetTimestamp

func (x *EventUnhandled) GetTimestamp() int64

func (*EventUnhandled) MessageTopic

func (x *EventUnhandled) MessageTopic() string

MessageTopic returns messages.Topic from given message. Implements messages.Message interface.

func (*EventUnhandled) MessageType

func (x *EventUnhandled) MessageType() string

MessageType gets the type of the event. Implements messages.Message interface.

func (*EventUnhandled) ProtoMessage

func (*EventUnhandled) ProtoMessage()

func (*EventUnhandled) ProtoReflect

func (x *EventUnhandled) ProtoReflect() protoreflect.Message

func (*EventUnhandled) Reset

func (x *EventUnhandled) Reset()

func (*EventUnhandled) String

func (x *EventUnhandled) String() string

func (*EventUnhandled) Validate

func (x *EventUnhandled) Validate() error

Validate implements validator.Validator interface.

type FailureCountReset

type FailureCountReset struct {
	HandlerName string `protobuf:"bytes,1,opt,name=handlerName,proto3" json:"handlerName,omitempty"`
	// contains filtered or unexported fields
}

FailureCountReset resets failure count for given event.

func (*FailureCountReset) Descriptor deprecated

func (*FailureCountReset) Descriptor() ([]byte, []int)

Deprecated: Use FailureCountReset.ProtoReflect.Descriptor instead.

func (*FailureCountReset) GetHandlerName

func (x *FailureCountReset) GetHandlerName() string

func (*FailureCountReset) MessageTopic

func (x *FailureCountReset) MessageTopic() string

MessageTopic returns messages.Topic from given message. Implements messages.Message interface.

func (*FailureCountReset) MessageType

func (x *FailureCountReset) MessageType() string

MessageType gets the type of the event. Implements messages.Message interface.

func (*FailureCountReset) ProtoMessage

func (*FailureCountReset) ProtoMessage()

func (*FailureCountReset) ProtoReflect

func (x *FailureCountReset) ProtoReflect() protoreflect.Message

func (*FailureCountReset) Reset

func (x *FailureCountReset) Reset()

func (*FailureCountReset) String

func (x *FailureCountReset) String() string

func (*FailureCountReset) Validate

func (x *FailureCountReset) Validate() error

Validate implements validator.Validator interface.

type Options

type Options struct {
	// MaxFailures is the maximum number of failures for which the event would not allow to, start until it is reset.
	MaxFailures int
	// MinFailInterval is a minimum duration interval to wait after failure.
	// Next failure duration would be increased exponentially.
	MinFailInterval time.Duration
	// MaxHandlingTime is a handling time after which the message is treated as lost.
	MaxHandlingTime time.Duration
}

func DefaultOptions

func DefaultOptions() *Options

DefaultOptions creates default event state options.

func (*Options) Validate

func (o *Options) Validate() error

type State

type State int

State is a state value for the event handling.

const (
	// StateUndefined is an undefined event handle state.
	StateUndefined State = 0
	// StateUnhandled is a state for unhandled event.
	StateUnhandled State = 1
	// StateStarted is a state that handling of an event was already started.
	StateStarted State = 2
	// StateFinished is a state that handling an event is done.
	StateFinished State = 3
	// StateFailed is a state of an event that states handling an event had failed.
	StateFailed State = 4
)

type Storage

type Storage interface {
	BeginTx(ctx context.Context) (TxStorage, error)
	StorageBase
}

Storage is an interface used for changing the state of given event with an ability of doing it in transaction.

type StorageBase

type StorageBase interface {
	es.StorageBase
	// MarkUnhandled marks the events as unhandled.
	MarkUnhandled(ctx context.Context, eventID, eventType string, timestamp int64) error
	// StartHandling marks given event that it is being handled.
	StartHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error
	// FinishHandling marks an event that it is already handled.
	FinishHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error
	// HandlingFailed marks given handling as failure.
	HandlingFailed(ctx context.Context, failure *eventstate2.HandleFailure) error
	// RegisterHandlers registers the information about event handler.
	// This function should be done during migration of the event handler.
	RegisterHandlers(ctx context.Context, eventHandler ...eventstate2.Handler) error
	// ListHandlers list the handlers for the
	ListHandlers(ctx context.Context) ([]eventstate2.Handler, error)
	// FindUnhandled finds all unhandled events for given handler.
	FindUnhandled(ctx context.Context, query eventstate2.FindUnhandledQuery) ([]eventstate2.Unhandled, error)
	// FindFailures finds the handle failures for given handler name.
	FindFailures(ctx context.Context, query eventstate2.FindFailureQuery) ([]eventstate2.HandleFailure, error)
}

StorageBase is an interface that contains base functionality of handling event state.

type Store

type Store struct {
	*es.Store
	// contains filtered or unexported fields
}

Store is an implementation of the event store that is also handling the event state on each commit.

func NewStore

func NewStore(cfg *es.Config, eventCodec es.EventCodec, snapCodec es.SnapshotCodec, storage Storage) (*Store, error)

NewStore creates a new store that works in the same way as the es.Store with enhanced feature of tracking event state on each commit.

func (*Store) ChangeTypeOptions

func (s *Store) ChangeTypeOptions(eventType string, options *Options)

ChangeTypeOptions changes the default options for given event type.

func (*Store) Commit

func (s *Store) Commit(ctx context.Context, aggregate es.Aggregate) error

Commit overwrites the default method of the es.Store and atomically commits given aggregate events, but also creates a new EventState per each committed event. This way no event is lost in handling, and the handlers now are able to control its status.

func (*Store) FindEventHandleFailures

func (s *Store) FindEventHandleFailures(ctx context.Context, query eventstate2.FindFailureQuery) ([]eventstate2.HandleFailure, error)

FindEventHandleFailures find all handling failures that matches given query.

func (*Store) FindUnhandledEvents

func (s *Store) FindUnhandledEvents(ctx context.Context, query eventstate2.FindUnhandledQuery) ([]eventstate2.Unhandled, error)

FindUnhandledEvents finds all unhandled events that matches given query.

func (*Store) FinishHandling

func (s *Store) FinishHandling(ctx context.Context, eventID, handlerName string) error

FinishHandling finishes handling given event by the handlerName.

func (*Store) HandlingFailed

func (s *Store) HandlingFailed(ctx context.Context, eventID, handlerName string, handleErr error) error

HandlingFailed finishes handling given event by the handlerName.

func (*Store) ListHandlers

func (s *Store) ListHandlers(ctx context.Context) ([]eventstate2.Handler, error)

ListHandlers list the handlers that matches given event type.

func (*Store) RegisterHandlers

func (s *Store) RegisterHandlers(ctx context.Context, eventHandlers ...eventstate2.Handler) error

RegisterHandlers registers unique event handlers. This function should be used on the event migration only once per service.

func (*Store) StartHandling

func (s *Store) StartHandling(ctx context.Context, eventID, handlerName string) error

StartHandling starts handling given event by the handler with a name = handlerName.

type TxStorage

type TxStorage interface {
	StorageBase
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
}

TxStorage is an interface used for changing the state of

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL