Documentation ¶
Index ¶
- Variables
- func CreatePoller(ms MessageStore, worker SubscriptionWorker, config *SubscriberConfig) (*poller, error)
- func NewID() uuid.UUID
- func Pack(source interface{}) (map[string]interface{}, error)
- func Unpack(source map[string]interface{}, dest interface{}) error
- type AtPositionMatcher
- type Command
- type Event
- type GetOption
- func BatchSize(batchsize int) GetOption
- func Category(category string) GetOption
- func CommandStream(category string) GetOption
- func Converter(converter MessageConverter) GetOption
- func EventStream(category string, entityID uuid.UUID) GetOption
- func GenericStream(stream string) GetOption
- func Last() GetOption
- func PositionStream(subscriberID string) GetOption
- func SincePosition(position int64) GetOption
- func SinceVersion(version int64) GetOption
- type Message
- type MessageConverter
- type MessageHandler
- type MessageReducer
- type MessageReducerConfig
- type MessageReducerFunc
- type MessageStore
- type Poller
- type Projector
- type ProjectorOption
- type Subscriber
- type SubscriberConfig
- type SubscriberOption
- func OnError(errorFunc func(error)) SubscriberOption
- func PollErrorDelay(pollErrorDelay time.Duration) SubscriberOption
- func PollTime(pollTime time.Duration) SubscriberOption
- func SubscribeBatchSize(batchSize int) SubscriberOption
- func SubscribeLogger(logger logrus.FieldLogger) SubscriberOption
- func SubscribeToCategory(category string) SubscriberOption
- func SubscribeToCommandStream(category string) SubscriberOption
- func SubscribeToEntityStream(category string, entityID uuid.UUID) SubscriberOption
- func UpdatePositionEvery(msgInterval int) SubscriberOption
- func WithConverter(converter MessageConverter) SubscriberOption
- type SubscriptionWorker
- type WriteOption
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidOptionCombination = errors.New("Cannot have the current combination of options for Get()") ErrSubscriberCannotUseBothStreamAndCategory = errors.New("Subscriber function cannot use both Stream and Category") ErrInvalidPollTime = errors.New("Invalid Subscriber poll time provided, can not be negative or zero") ErrInvalidPollErrorDelay = errors.New("Invalid Subscriber poll error delay provided, can not be negative or zero") ErrInvalidBatchSize = errors.New("Invalid Subscriber batch size provided, can not be negative or zero") ErrInvalidMsgInterval = errors.New("MsgInterval cannot be less than 2") ErrSubscriberNeedsCategoryOrStream = errors.New("Subscriber needs at least one of category or stream to be set upon creation") ErrSubscriberIDCannotBeEmpty = errors.New("Subscriber ID cannot be nil") ErrSubscriberIDCannotContainPlus = errors.New("Subscriber ID cannot contain a plus") ErrSubscriberIDCannotContainHyphen = errors.New("Subscriber ID cannot contain a hyphen") ErrPositionVersionMissing = errors.New("Subscriber cannot use non-versioned positionMsg object") ErrSubscriberNeedsAtLeastOneMessageHandler = errors.New("Subscriber needs at least one handler upon creation") ErrSubscriberCannotSubscribeToMultipleStreams = errors.New("Subscribers can only subscribe to one stream") ErrSubscriberCannotSubscribeToMultipleCategories = errors.New("Subscribers can only subscribe to one category") ErrProjectorNeedsAtLeastOneReducer = errors.New("Projector needs at least one reducer upon creation") ErrSubscriberMessageHandlerEqualToNil = errors.New("Subscriber Message Handler cannot be equal to nil") ErrSubscriberMessageHandlersEqualToNil = errors.New("Subscriber Message Handler array cannot be equal to nil") ErrSubscriberNilOption = errors.New("Options cannot include an option whose value is equal to nil") ErrDefaultStateNotSet = errors.New("Default state not set while trying to create a new projector") ErrDefaultStateCannotBePointer = errors.New("Default state cannot be a pointer when creating a projector") ErrGetMessagesCannotUseBothStreamAndCategory = errors.New("Get messages function cannot use both Stream and Category") ErrMessageNoID = errors.New("Message cannot be written without a new UUID") ErrGetMessagesRequiresEitherStreamOrCategory = errors.New("Get messages function must have either Stream or Category") ErrGetLastRequiresStream = errors.New("Get Last option requires a stream") ErrIncorrectNumberOfPositionsFound = errors.New("Exactly one position should be found per subscriber") ErrInvalidHandler = errors.New("Handler cannot be nil") ErrIncorrectMessageInPositionStream = errors.New("Position streams can only have position messages") ErrHandlerError = errors.New("Handler failed to handle message") ErrMissingMessageType = errors.New("All messages require a type") ErrMissingMessageCategory = errors.New("All messages require a category") ErrInvalidMessageCategory = errors.New("Hyphens are not allowed in category names") ErrInvalidCommandStream = errors.New("Hyphens are not allowed in command stream name") ErrInvalidEventStream = errors.New("Hyphens are not allowed in event stream name") ErrInvalidSubscriberID = errors.New("Hyphens and plusses are not allowed in subscriber ID") ErrInvalidPositionStream = errors.New("Position stream expects to have a single plus diving the subscriber ID and the word 'position'") ErrMissingMessageCategoryID = errors.New("All messages require a category ID") ErrMissingMessageData = errors.New("Messages payload must not be nil") ErrUnserializableData = errors.New("Message data could not be encoded as json") ErrDataIsNilPointer = errors.New("Message data is a nil pointer") ErrMissingGetOptions = errors.New("Options are required for the Get command") ErrExpectedVersionFailed = errors.New("Provided version does not match the expected version") )
The following are the different error messages that can be potentially returned.
The following table shows which files the specific errors are used for easier troubleshooting:
Error | File Path ------------------------------------------------------------------------------------------------------ ErrInvalidOptionCombination | ./get.go ErrSubscriberCannotUseBothStreamAndCategory | ./subscriber_options.go ErrInvalidPollTime | ./subscriber_options.go ErrInvalidPollErrorDelay | ./subscriber_options.go ErrInvalidBatchSize | ./subscriber_options.go ErrInvalidMsgInterval | ./subscriber_options.go ErrSubscriberNeedsCategoryOrStream | ./subscriber_options.go ErrSubscriberIDCannotBeEmpty | ./subscriber.go | ./worker_getpostion.go ErrSubscriberIDCannotContainPlus | no uses ErrSubscriberIDCannotContainHyphen | no uses ErrPositionVersionMissing | ./worker_getposition.go ErrSubscriberNeedsAtLeastOneMessageHandler | ./subscriber.go ErrSubscriberCannotSubscribeToMultipleStreams | ./subscriber_options.go ErrSubscriberCannotSubscribeToMultipleCategories| ./subscriber_options.go ErrProjectorNeedsAtLeastOneReducer | ./projector.go ErrSubscriberMessageHandlerEqualToNil | ./subscriber.go ErrSubscriberMessageHandlersEqualToNil | ./subscriber.go ErrSubscriberNilOption | ./subscriber_options.go ErrDefaultStateNotSet | ./projector.go ErrDefaultStateCannotBePointer | ./projector.go ErrGetMessagesCannotUseBothStreamAndCategory | ./get.go ErrMessageNoID | ./models.go | ./worker_getposition.go ErrGetMessagesRequiresEitherStreamOrCategory | ./get.go ErrGetLastRequiresStream | ./get.go ErrIncorrectNumberOfPositionsFound | no uses ErrInvalidHandler | no uses ErrIncorrectMessageInPositionStream | ./worker_getposition ErrHandlerError | no uses ErrMissingMessageType | ./models.go | ./worker_getposition.go ErrMissingMessageCategory | ./models.go ErrInvalidMessageCategory | ./get.go | ./models.go ErrInvalidCommandStream | ./get.go ErrInvalidEventStream | ./get.go ErrInvalidSubscriberID | ./subscriber.go ErrInvalidPositionStream | ./get.go | ./worker_getposition.go ErrMissingMessageCategoryID | ./models.go ErrMissingMessageData | ./models.go ErrUnserializableData | ./models.go | ./worker_getposition.go ErrDataIsNilPointer | no uses ErrMissingGetOptions | ./get.go
var NilUUID = uuid.Nil
NilUUID is a helper for tests
Functions ¶
func CreatePoller ¶
func CreatePoller(ms MessageStore, worker SubscriptionWorker, config *SubscriberConfig) (*poller, error)
CreatePoller returns a new instance of a Poller
Types ¶
type AtPositionMatcher ¶ added in v0.4.7
type AtPositionMatcher struct {
Position int64
}
AtPositionMatcher is a gomock.Matcher interface that matches an AtPosition function
func (AtPositionMatcher) Matches ¶ added in v0.4.7
func (a AtPositionMatcher) Matches(unknown interface{}) bool
Matches checks agains a AtPosition/gms.WriteOption function
func (AtPositionMatcher) String ¶ added in v0.4.7
func (a AtPositionMatcher) String() string
String gives us a representation of our AtPositionMatcher
type Command ¶
type Command struct { ID uuid.UUID // ID for the command StreamCategory string // Name of the stream category MessageType string // Name of the message type MessageVersion int64 // version number of the message GlobalPosition int64 // global position of the command Data map[string]interface{} Metadata map[string]interface{} Time time.Time }
Command implements the Message interface; returned by get function
func (*Command) ToEnvelope ¶
func (cmd *Command) ToEnvelope() (*repository.MessageEnvelope, error)
ToEnvelope converts the command to a Message Envelope that is returned
type Event ¶
type Event struct { ID uuid.UUID // ID of the event EntityID uuid.UUID // ID of the entity the event is associated with StreamCategory string // the name of the category of the stream MessageType string // the message type of the event MessageVersion int64 // the version number of the message GlobalPosition int64 // the global position of the event Data map[string]interface{} Metadata map[string]interface{} Time time.Time }
Event implements the Message interface; returned by get function
func (*Event) ToEnvelope ¶
func (event *Event) ToEnvelope() (*repository.MessageEnvelope, error)
ToEnvelope converts the event to a MessageEnvelope which is then returned
type GetOption ¶
type GetOption func(g *getOpts) error
GetOption provide optional arguments to the Get function Invalid combinations: EventStream() and/or CommandStream() are called more than once EventStream()/CommandStream() and Category() are both called EventStream()/CommandStream() and Category() are both not called Last() is called and EventStream()/CommandStream is not called Last() and SincePosition()/SinceVersion() are both called SincePosition() and eventStream()/CommandStream() are both called SinceVersion() and eventStream()/CommandStream() are both called
func CommandStream ¶
CommandStream allows for writing messages using an expected position
func Converter ¶
func Converter(converter MessageConverter) GetOption
Converter allows for automatic converting of non-Command/Event type messages
func EventStream ¶
EventStream allows for getting events in a specific stream
func GenericStream ¶ added in v0.7.0
GenericStream allows for getting events in a specific stream
func Last ¶
func Last() GetOption
Last allows for getting only the most recent message (still returns an array)
func PositionStream ¶
PositionStream allows for getting messages by position subscriber
func SincePosition ¶
SincePosition allows for getting only more recent messages
func SinceVersion ¶
SinceVersion allows for getting only more recent messages
type Message ¶
type Message interface { ToEnvelope() (*repository.MessageEnvelope, error) // used to convert a message to a message envelope Type() string // returns the message type Version() int64 // returns the version of the message Position() int64 // returns the position of the message }
Message is an interface for all types of messages (commands, events, etc) that are handled through a message store.
func MsgEnvelopesToMessages ¶
func MsgEnvelopesToMessages(msgEnvelopes []*repository.MessageEnvelope, converters ...MessageConverter) []Message
MsgEnvelopesToMessages converts envelopes to any number of different structs that impliment the Message interface
type MessageConverter ¶
type MessageConverter func(*repository.MessageEnvelope) (Message, error)
MessageConverter is a function that takes in a MessageEnvelope and returns a Message; can be used to create custom messages
type MessageHandler ¶
type MessageHandler interface { Type() string // returns the message type Process(ctx context.Context, msg Message) error // called for each message being handled }
MessageHandler is used to process messages; a handler should exist for each message type
type MessageReducer ¶
type MessageReducer interface { Reduce(msg Message, previousState interface{}) interface{} Type() string }
MessageReducer Defines the expected behaviours of a reducer that ultimately is used by the projectors.
type MessageReducerConfig ¶
type MessageReducerConfig struct { Reducer MessageReducer Type string }
MessageReducerConfig Contains all of the information needed to use a given reducer.
type MessageReducerFunc ¶ added in v0.4.2
type MessageReducerFunc func(msg Message, previousState interface{}) interface{}
MessageReducerFunc is a functional way to create a reducer (for use with WithReducerFunc)
type MessageStore ¶
type MessageStore interface { Write(ctx context.Context, message Message, opts ...WriteOption) error // writes a message to the message store Get(ctx context.Context, opts ...GetOption) ([]Message, error) // retrieves messages from the message store CreateProjector(opts ...ProjectorOption) (Projector, error) // creates a new projector CreateSubscriber(subscriberID string, handlers []MessageHandler, opts ...SubscriberOption) (Subscriber, error) // creates a new subscriber GetLogger() (logger logrus.FieldLogger) // gets the logger }
MessageStore establishes the interface for Eventide
func NewMessageStore ¶
func NewMessageStore(injectedDB *sql.DB, logger logrus.FieldLogger) MessageStore
NewMessageStore creates a new MessageStore instance using an injected DB.
func NewMessageStoreFromRepository ¶
func NewMessageStoreFromRepository(injectedRepo repository.Repository, logger logrus.FieldLogger) MessageStore
NewMessageStoreFromRepository creates a new MessageStore instance using an injected repository. FOR TESTING ONLY
func NewMockMessageStoreWithMessages ¶ added in v0.2.2
func NewMockMessageStoreWithMessages(msgs []Message) MessageStore
NewMockMessageStoreWithMessages is used for testing purposes
type Poller ¶
type Poller interface {
Poll(context.Context) error // should handle a cycle of polling the message store
}
Poller interface requires a Poll function
type Projector ¶
type Projector interface { Run(ctx context.Context, category string, entityID uuid.UUID) (interface{}, error) RunOnStream(ctx context.Context, stream string) (interface{}, error) Step(msg Message, previousState interface{}) (interface{}, bool) }
Projector A base level interface that defines the projection functionality of gomessagestore.
type ProjectorOption ¶
type ProjectorOption func(proj *projector)
ProjectorOption is used for creating projectors with reducers
func DefaultState ¶
func DefaultState(defaultState interface{}) ProjectorOption
DefaultState registers a default state for use with a projector
func WithReducer ¶
func WithReducer(reducer MessageReducer) ProjectorOption
WithReducer registers a ruducer with the new projector
func WithReducerFunc ¶ added in v0.4.2
func WithReducerFunc(msgType string, reducerFunc MessageReducerFunc) ProjectorOption
WithReducerFunc registers a message type and a ruducer function with the new projector
type Subscriber ¶
Subscriber allows for reaching out to the message service on a continual basis
func CreateSubscriberWithPoller ¶
func CreateSubscriberWithPoller(ms MessageStore, subscriberID string, handlers []MessageHandler, poller Poller, opts ...SubscriberOption) (Subscriber, error)
CreateSubscriberWithPoller is used for testing with dependency injection FOR TESTING ONLY
type SubscriberConfig ¶
type SubscriberConfig struct {
// contains filtered or unexported fields
}
SubscriberConfig contains configuration information for a subscriber
func GetSubscriberConfig ¶
func GetSubscriberConfig(opts ...SubscriberOption) (*SubscriberConfig, error)
GetSubscriberConfig changes SubscriberOptions into a valid SubscriberConfig object, or returns an error
type SubscriberOption ¶
type SubscriberOption func(config *SubscriberConfig) error
SubscriberOption allows for various options when creating a subscriber
func OnError ¶ added in v0.5.0
func OnError(errorFunc func(error)) SubscriberOption
OnError when the subscriber reaches an error, it will call this func instead of panicking
func PollErrorDelay ¶
func PollErrorDelay(pollErrorDelay time.Duration) SubscriberOption
PollErrorDelay sets the interval between handling operations when Poll() errors
func PollTime ¶
func PollTime(pollTime time.Duration) SubscriberOption
PollTime sets the interval between handling operations
func SubscribeBatchSize ¶
func SubscribeBatchSize(batchSize int) SubscriberOption
SubscribeBatchSize sets the amount of messages to retrieve in a single handling operation
func SubscribeLogger ¶ added in v0.6.0
func SubscribeLogger(logger logrus.FieldLogger) SubscriberOption
SubscribeLogger allows to configure the logger used inside the Subscriber
func SubscribeToCategory ¶
func SubscribeToCategory(category string) SubscriberOption
SubscribeToCategory subscribes to a category of streams and ensures that it is not also subscribed to a stream
func SubscribeToCommandStream ¶
func SubscribeToCommandStream(category string) SubscriberOption
SubscribeToCommandStream subscribes to a specific command stream and ensures that multiple streams are not subscribed to
func SubscribeToEntityStream ¶
func SubscribeToEntityStream(category string, entityID uuid.UUID) SubscriberOption
SubscribeToEntityStream subscribes to a specific entity stream and ensures that multiple streams are not subscribed to
func UpdatePositionEvery ¶
func UpdatePositionEvery(msgInterval int) SubscriberOption
UpdatePositionEvery determines how often a positionMessage is written to the message store to save the position of the worker; must be >= 2
func WithConverter ¶ added in v0.6.1
func WithConverter(converter MessageConverter) SubscriberOption
WithConverter allows for automatic converting of non-Command/Event type messages
type SubscriptionWorker ¶
type SubscriptionWorker interface { GetMessages(ctx context.Context, position int64) ([]Message, error) ProcessMessages(ctx context.Context, msgs []Message) (messagesHandled int, positionOfLastHandled int64, err error) GetPosition(ctx context.Context) (int64, error) SetPosition(ctx context.Context, position int64) error }
SubscriptionWorker handles the processes for retrieving and processing messages from the message store and updating positions
func CreateWorker ¶
func CreateWorker(ms MessageStore, subscriberID string, handlers []MessageHandler, config *SubscriberConfig) (SubscriptionWorker, error)
CreateWorker returns a new subscriptionWorker
type WriteOption ¶
type WriteOption func(w *writer)
WriteOption provides optional arguments to the Write function
func AtPosition ¶
func AtPosition(position int64) WriteOption
AtPosition allows for writing messages using an expected position
Source Files ¶
- errors.go
- get.go
- message.go
- message_handler.go
- message_reducer.go
- messageconverter.go
- messagestore.go
- models.go
- packing.go
- poller.go
- projector.go
- subscriber.go
- subscriber_options.go
- subscriber_start.go
- subscription_worker.go
- worker_getmessages.go
- worker_getposition.go
- worker_processmessages.go
- worker_setposition.go
- write.go