Documentation ¶
Index ¶
- Constants
- Variables
- type EventHandlingFailed
- func (*EventHandlingFailed) Descriptor() ([]byte, []int)deprecated
- func (x *EventHandlingFailed) GetErr() string
- func (x *EventHandlingFailed) GetErrCode() int32
- func (x *EventHandlingFailed) GetHandlerName() string
- func (x *EventHandlingFailed) MessageTopic() string
- func (x *EventHandlingFailed) MessageType() string
- func (*EventHandlingFailed) ProtoMessage()
- func (x *EventHandlingFailed) ProtoReflect() protoreflect.Message
- func (x *EventHandlingFailed) Reset()
- func (x *EventHandlingFailed) String() string
- func (x *EventHandlingFailed) Validate() error
- type EventHandlingFinished
- func (*EventHandlingFinished) Descriptor() ([]byte, []int)deprecated
- func (x *EventHandlingFinished) GetHandlerName() string
- func (x *EventHandlingFinished) MessageTopic() string
- func (x *EventHandlingFinished) MessageType() string
- func (*EventHandlingFinished) ProtoMessage()
- func (x *EventHandlingFinished) ProtoReflect() protoreflect.Message
- func (x *EventHandlingFinished) Reset()
- func (x *EventHandlingFinished) String() string
- func (x *EventHandlingFinished) Validate() error
- type EventHandlingStarted
- func (*EventHandlingStarted) Descriptor() ([]byte, []int)deprecated
- func (x *EventHandlingStarted) GetHandlerName() string
- func (x *EventHandlingStarted) MessageTopic() string
- func (x *EventHandlingStarted) MessageType() string
- func (*EventHandlingStarted) ProtoMessage()
- func (x *EventHandlingStarted) ProtoReflect() protoreflect.Message
- func (x *EventHandlingStarted) Reset()
- func (x *EventHandlingStarted) String() string
- func (x *EventHandlingStarted) Validate() error
- type EventState
- func (s *EventState) AggBase() *es.AggregateBase
- func (s *EventState) Apply(e *es.Event) (err error)
- func (s *EventState) FinishHandling(handlerName string) error
- func (s *EventState) HandlingFailed(handlerName string, handlingErr error) error
- func (s *EventState) Reset()
- func (s *EventState) ResetFailures(handlerName string) error
- func (s *EventState) SetBase(base *es.AggregateBase)
- func (s *EventState) StartHandling(handlerName string) error
- type EventStore
- type EventUnhandled
- func (*EventUnhandled) Descriptor() ([]byte, []int)deprecated
- func (x *EventUnhandled) GetEventType() string
- func (x *EventUnhandled) GetMaxFailures() int32
- func (x *EventUnhandled) GetMaxHandlingTime() int64
- func (x *EventUnhandled) GetMinFailInterval() int64
- func (x *EventUnhandled) GetTimestamp() int64
- func (x *EventUnhandled) MessageTopic() string
- func (x *EventUnhandled) MessageType() string
- func (*EventUnhandled) ProtoMessage()
- func (x *EventUnhandled) ProtoReflect() protoreflect.Message
- func (x *EventUnhandled) Reset()
- func (x *EventUnhandled) String() string
- func (x *EventUnhandled) Validate() error
- type FailureCountReset
- func (*FailureCountReset) Descriptor() ([]byte, []int)deprecated
- func (x *FailureCountReset) GetHandlerName() string
- func (x *FailureCountReset) MessageTopic() string
- func (x *FailureCountReset) MessageType() string
- func (*FailureCountReset) ProtoMessage()
- func (x *FailureCountReset) ProtoReflect() protoreflect.Message
- func (x *FailureCountReset) Reset()
- func (x *FailureCountReset) String() string
- func (x *FailureCountReset) Validate() error
- type Options
- type State
- type Storage
- type StorageBase
- type Store
- func (s *Store) ChangeTypeOptions(eventType string, options *Options)
- func (s *Store) Commit(ctx context.Context, aggregate es.Aggregate) error
- func (s *Store) FindEventHandleFailures(ctx context.Context, query eventstate2.FindFailureQuery) ([]eventstate2.HandleFailure, error)
- func (s *Store) FindUnhandledEvents(ctx context.Context, query eventstate2.FindUnhandledQuery) ([]eventstate2.Unhandled, error)
- func (s *Store) FinishHandling(ctx context.Context, eventID, handlerName string) error
- func (s *Store) HandlingFailed(ctx context.Context, eventID, handlerName string, handleErr error) error
- func (s *Store) ListHandlers(ctx context.Context) ([]eventstate2.Handler, error)
- func (s *Store) RegisterHandlers(ctx context.Context, eventHandlers ...eventstate2.Handler) error
- func (s *Store) StartHandling(ctx context.Context, eventID, handlerName string) error
- type TxStorage
Constants ¶
const AggregateType = "eventsource.event_state"
AggregateType is the type of the EventState aggregate.
const EventHandlingFailedTopic = "eventsource.event_state.handling_failed"
EventHandlingFailedTopic is the topic used by the Event aggregate on the HandlingFailed event.
const EventHandlingFailedType = "event_state:handling_failed"
EventHandlingFailedType is the type used by the Event aggregate on the HandlingFailed event.
const EventHandlingFinishedTopic = "eventsource.event_state.handling_finished"
EventHandlingFinishedTopic is the topic used by the Event aggregate on the HandlingFinished event.
const EventHandlingFinishedType = "event_state:handling_finished"
EventHandlingFinishedType is the type used by the Event aggregate on the HandlingFinished event.
const EventHandlingStartedTopic = "eventsource.event_state.handling_started"
EventHandlingStartedTopic is the topic used by the Event aggregate on the HandlingStarted event.
const EventHandlingStartedType = "event_state:handling_started"
EventHandlingStartedType is the type used by the Event aggregate on the HandlingStarted event.
const EventUnhandledTopic = "eventsource.event_state.unhandled"
EventUnhandledTopic is the topic used by the Event aggregate on the Unhandled event.
const EventUnhandledType = "event_state:unhandled"
EventUnhandledType is the type used by the Event aggregate on the Unhandled event.
const FailureCountResetTopic = "eventsource.event_state.failure_count_reset"
FailureCountResetTopic is the topic used by the aggregate on the FailureCountReset event.
const FailureCountResetType = "event_state:failure_count_reset"
FailureCountResetType is the type used by the aggregate on the FailureCountReset event.
Variables ¶
var File_eventstate_eventstate_proto protoreflect.FileDescriptor
Functions ¶
This section is empty.
Types ¶
type EventHandlingFailed ¶
type EventHandlingFailed struct { HandlerName string `protobuf:"bytes,1,opt,name=handler_name,json=handlerName,proto3" json:"handler_name,omitempty"` Err string `protobuf:"bytes,2,opt,name=err,proto3" json:"err,omitempty"` ErrCode int32 `protobuf:"varint,3,opt,name=err_code,json=errCode,proto3" json:"err_code,omitempty"` // contains filtered or unexported fields }
EventHandlingFailed is an event message occurred on a failure when handling given event.
func (*EventHandlingFailed) Descriptor
deprecated
func (*EventHandlingFailed) Descriptor() ([]byte, []int)
Deprecated: Use EventHandlingFailed.ProtoReflect.Descriptor instead.
func (*EventHandlingFailed) GetErr ¶
func (x *EventHandlingFailed) GetErr() string
func (*EventHandlingFailed) GetErrCode ¶
func (x *EventHandlingFailed) GetErrCode() int32
func (*EventHandlingFailed) GetHandlerName ¶
func (x *EventHandlingFailed) GetHandlerName() string
func (*EventHandlingFailed) MessageTopic ¶
func (x *EventHandlingFailed) MessageTopic() string
MessageTopic returns messages.Topic from given message. Implements messages.Message interface.
func (*EventHandlingFailed) MessageType ¶
func (x *EventHandlingFailed) MessageType() string
MessageType gets the type of the event. Implements messages.Message interface.
func (*EventHandlingFailed) ProtoMessage ¶
func (*EventHandlingFailed) ProtoMessage()
func (*EventHandlingFailed) ProtoReflect ¶
func (x *EventHandlingFailed) ProtoReflect() protoreflect.Message
func (*EventHandlingFailed) Reset ¶
func (x *EventHandlingFailed) Reset()
func (*EventHandlingFailed) String ¶
func (x *EventHandlingFailed) String() string
func (*EventHandlingFailed) Validate ¶
func (x *EventHandlingFailed) Validate() error
Validate implements validator.Validator interface.
type EventHandlingFinished ¶
type EventHandlingFinished struct { HandlerName string `protobuf:"bytes,1,opt,name=handler_name,json=handlerName,proto3" json:"handler_name,omitempty"` // contains filtered or unexported fields }
EventHandlingFinished is an event message occurred when given handler just finished successfully handling an event.
func (*EventHandlingFinished) Descriptor
deprecated
func (*EventHandlingFinished) Descriptor() ([]byte, []int)
Deprecated: Use EventHandlingFinished.ProtoReflect.Descriptor instead.
func (*EventHandlingFinished) GetHandlerName ¶
func (x *EventHandlingFinished) GetHandlerName() string
func (*EventHandlingFinished) MessageTopic ¶
func (x *EventHandlingFinished) MessageTopic() string
MessageTopic returns messages.Topic from given message. Implements messages.Message interface.
func (*EventHandlingFinished) MessageType ¶
func (x *EventHandlingFinished) MessageType() string
MessageType gets the type of the event. Implements messages.Message interface.
func (*EventHandlingFinished) ProtoMessage ¶
func (*EventHandlingFinished) ProtoMessage()
func (*EventHandlingFinished) ProtoReflect ¶
func (x *EventHandlingFinished) ProtoReflect() protoreflect.Message
func (*EventHandlingFinished) Reset ¶
func (x *EventHandlingFinished) Reset()
func (*EventHandlingFinished) String ¶
func (x *EventHandlingFinished) String() string
func (*EventHandlingFinished) Validate ¶
func (x *EventHandlingFinished) Validate() error
Validate implements validator.Validator interface.
type EventHandlingStarted ¶
type EventHandlingStarted struct { HandlerName string `protobuf:"bytes,1,opt,name=handler_name,json=handlerName,proto3" json:"handler_name,omitempty"` // contains filtered or unexported fields }
EventHandlingStarted is an event message occurred when given handler just started handling an event.
func (*EventHandlingStarted) Descriptor
deprecated
func (*EventHandlingStarted) Descriptor() ([]byte, []int)
Deprecated: Use EventHandlingStarted.ProtoReflect.Descriptor instead.
func (*EventHandlingStarted) GetHandlerName ¶
func (x *EventHandlingStarted) GetHandlerName() string
func (*EventHandlingStarted) MessageTopic ¶
func (x *EventHandlingStarted) MessageTopic() string
MessageTopic returns messages.Topic from given message. Implements messages.Message interface.
func (*EventHandlingStarted) MessageType ¶
func (x *EventHandlingStarted) MessageType() string
MessageType gets the type of the event. Implements messages.Message interface.
func (*EventHandlingStarted) ProtoMessage ¶
func (*EventHandlingStarted) ProtoMessage()
func (*EventHandlingStarted) ProtoReflect ¶
func (x *EventHandlingStarted) ProtoReflect() protoreflect.Message
func (*EventHandlingStarted) Reset ¶
func (x *EventHandlingStarted) Reset()
func (*EventHandlingStarted) String ¶
func (x *EventHandlingStarted) String() string
func (*EventHandlingStarted) Validate ¶
func (x *EventHandlingStarted) Validate() error
Validate implements validator.Validator interface.
type EventState ¶
type EventState struct {
// contains filtered or unexported fields
}
EventState is an aggregate that stores the event handler changes.
func InitializeUnhandledEventState ¶
func InitializeUnhandledEventState(eventID, eventType string, timestamp time.Time, bs *es.AggregateBaseSetter, o *Options) (*EventState, error)
InitializeUnhandledEventState creates and initializes a new EventState model with an EventUnhandled message.
func NewEventState ¶
func NewEventState(eventID string, bs *es.AggregateBaseSetter) *EventState
NewEventState creates a new EventState aggregate model.
func (*EventState) AggBase ¶
func (s *EventState) AggBase() *es.AggregateBase
AggBase implements es.Aggregate interface.
func (*EventState) Apply ¶
func (s *EventState) Apply(e *es.Event) (err error)
Apply implements es.Aggregate interface.
func (*EventState) FinishHandling ¶
func (s *EventState) FinishHandling(handlerName string) error
FinishHandling finishes handling given event state by the handlerName.
func (*EventState) HandlingFailed ¶
func (s *EventState) HandlingFailed(handlerName string, handlingErr error) error
HandlingFailed marks the event state that it's handling had failed with given error.
func (*EventState) ResetFailures ¶
func (s *EventState) ResetFailures(handlerName string) error
ResetFailures resets handling state failures.
func (*EventState) SetBase ¶
func (s *EventState) SetBase(base *es.AggregateBase)
SetBase implements es.Aggregate interface.
func (*EventState) StartHandling ¶
func (s *EventState) StartHandling(handlerName string) error
StartHandling starts handling given event by the handlerName.
type EventStore ¶
type EventStore interface { es.EventStore // ChangeTypeOptions changes event type handling state options. ChangeTypeOptions(eventType string, options *Options) }
EventStore is an interface that allows to operate on top of the standard es.EventStore, but also allows to control the state of the event handling.
type EventUnhandled ¶
type EventUnhandled struct { EventType string `protobuf:"bytes,1,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"` Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` MaxFailures int32 `protobuf:"varint,3,opt,name=max_failures,json=maxFailures,proto3" json:"max_failures,omitempty"` MinFailInterval int64 `protobuf:"varint,4,opt,name=min_fail_interval,json=minFailInterval,proto3" json:"min_fail_interval,omitempty"` MaxHandlingTime int64 `protobuf:"varint,5,opt,name=max_handling_time,json=maxHandlingTime,proto3" json:"max_handling_time,omitempty"` // contains filtered or unexported fields }
EventUnhandled is an event message which states that an event is marked as unhandled.
func (*EventUnhandled) Descriptor
deprecated
func (*EventUnhandled) Descriptor() ([]byte, []int)
Deprecated: Use EventUnhandled.ProtoReflect.Descriptor instead.
func (*EventUnhandled) GetEventType ¶
func (x *EventUnhandled) GetEventType() string
func (*EventUnhandled) GetMaxFailures ¶
func (x *EventUnhandled) GetMaxFailures() int32
func (*EventUnhandled) GetMaxHandlingTime ¶
func (x *EventUnhandled) GetMaxHandlingTime() int64
func (*EventUnhandled) GetMinFailInterval ¶
func (x *EventUnhandled) GetMinFailInterval() int64
func (*EventUnhandled) GetTimestamp ¶
func (x *EventUnhandled) GetTimestamp() int64
func (*EventUnhandled) MessageTopic ¶
func (x *EventUnhandled) MessageTopic() string
MessageTopic returns messages.Topic from given message. Implements messages.Message interface.
func (*EventUnhandled) MessageType ¶
func (x *EventUnhandled) MessageType() string
MessageType gets the type of the event. Implements messages.Message interface.
func (*EventUnhandled) ProtoMessage ¶
func (*EventUnhandled) ProtoMessage()
func (*EventUnhandled) ProtoReflect ¶
func (x *EventUnhandled) ProtoReflect() protoreflect.Message
func (*EventUnhandled) Reset ¶
func (x *EventUnhandled) Reset()
func (*EventUnhandled) String ¶
func (x *EventUnhandled) String() string
func (*EventUnhandled) Validate ¶
func (x *EventUnhandled) Validate() error
Validate implements validator.Validator interface.
type FailureCountReset ¶
type FailureCountReset struct { HandlerName string `protobuf:"bytes,1,opt,name=handlerName,proto3" json:"handlerName,omitempty"` // contains filtered or unexported fields }
FailureCountReset resets failure count for given event.
func (*FailureCountReset) Descriptor
deprecated
func (*FailureCountReset) Descriptor() ([]byte, []int)
Deprecated: Use FailureCountReset.ProtoReflect.Descriptor instead.
func (*FailureCountReset) GetHandlerName ¶
func (x *FailureCountReset) GetHandlerName() string
func (*FailureCountReset) MessageTopic ¶
func (x *FailureCountReset) MessageTopic() string
MessageTopic returns messages.Topic from given message. Implements messages.Message interface.
func (*FailureCountReset) MessageType ¶
func (x *FailureCountReset) MessageType() string
MessageType gets the type of the event. Implements messages.Message interface.
func (*FailureCountReset) ProtoMessage ¶
func (*FailureCountReset) ProtoMessage()
func (*FailureCountReset) ProtoReflect ¶
func (x *FailureCountReset) ProtoReflect() protoreflect.Message
func (*FailureCountReset) Reset ¶
func (x *FailureCountReset) Reset()
func (*FailureCountReset) String ¶
func (x *FailureCountReset) String() string
func (*FailureCountReset) Validate ¶
func (x *FailureCountReset) Validate() error
Validate implements validator.Validator interface.
type Options ¶
type Options struct { // MaxFailures is the maximum number of failures for which the event would not allow to, start until it is reset. MaxFailures int // MinFailInterval is a minimum duration interval to wait after failure. // Next failure duration would be increased exponentially. MinFailInterval time.Duration // MaxHandlingTime is a handling time after which the message is treated as lost. MaxHandlingTime time.Duration }
func DefaultOptions ¶
func DefaultOptions() *Options
DefaultOptions creates default event state options.
type State ¶
type State int
State is a state value for the event handling.
const ( // StateUndefined is an undefined event handle state. StateUndefined State = 0 // StateUnhandled is a state for unhandled event. StateUnhandled State = 1 // StateStarted is a state that handling of an event was already started. StateStarted State = 2 // StateFinished is a state that handling an event is done. StateFinished State = 3 // StateFailed is a state of an event that states handling an event had failed. StateFailed State = 4 )
type Storage ¶
type Storage interface { BeginTx(ctx context.Context) (TxStorage, error) StorageBase }
Storage is an interface used for changing the state of given event with an ability of doing it in transaction.
type StorageBase ¶
type StorageBase interface { es.StorageBase // MarkUnhandled marks the events as unhandled. MarkUnhandled(ctx context.Context, eventID, eventType string, timestamp int64) error // StartHandling marks given event that it is being handled. StartHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error // FinishHandling marks an event that it is already handled. FinishHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error // HandlingFailed marks given handling as failure. HandlingFailed(ctx context.Context, failure *eventstate2.HandleFailure) error // RegisterHandlers registers the information about event handler. // This function should be done during migration of the event handler. RegisterHandlers(ctx context.Context, eventHandler ...eventstate2.Handler) error // ListHandlers list the handlers for the ListHandlers(ctx context.Context) ([]eventstate2.Handler, error) // FindUnhandled finds all unhandled events for given handler. FindUnhandled(ctx context.Context, query eventstate2.FindUnhandledQuery) ([]eventstate2.Unhandled, error) // FindFailures finds the handle failures for given handler name. FindFailures(ctx context.Context, query eventstate2.FindFailureQuery) ([]eventstate2.HandleFailure, error) }
StorageBase is an interface that contains base functionality of handling event state.
type Store ¶
Store is an implementation of the event store that is also handling the event state on each commit.
func NewStore ¶
func NewStore(cfg *es.Config, eventCodec es.EventCodec, snapCodec es.SnapshotCodec, storage Storage) (*Store, error)
NewStore creates a new store that works in the same way as the es.Store with enhanced feature of tracking event state on each commit.
func (*Store) ChangeTypeOptions ¶
ChangeTypeOptions changes the default options for given event type.
func (*Store) Commit ¶
Commit overwrites the default method of the es.Store and atomically commits given aggregate events, but also creates a new EventState per each committed event. This way no event is lost in handling, and the handlers now are able to control its status.
func (*Store) FindEventHandleFailures ¶
func (s *Store) FindEventHandleFailures(ctx context.Context, query eventstate2.FindFailureQuery) ([]eventstate2.HandleFailure, error)
FindEventHandleFailures find all handling failures that matches given query.
func (*Store) FindUnhandledEvents ¶
func (s *Store) FindUnhandledEvents(ctx context.Context, query eventstate2.FindUnhandledQuery) ([]eventstate2.Unhandled, error)
FindUnhandledEvents finds all unhandled events that matches given query.
func (*Store) FinishHandling ¶
FinishHandling finishes handling given event by the handlerName.
func (*Store) HandlingFailed ¶
func (s *Store) HandlingFailed(ctx context.Context, eventID, handlerName string, handleErr error) error
HandlingFailed finishes handling given event by the handlerName.
func (*Store) ListHandlers ¶
ListHandlers list the handlers that matches given event type.
func (*Store) RegisterHandlers ¶
RegisterHandlers registers unique event handlers. This function should be used on the event migration only once per service.