gomessagestore

package module
v0.7.0
Published: Mar 20, 2020 License: Apache-2.0 Imports: 13 Imported by: 0

README

Go Message Store Connector

A Postgres Eventide interface for Go. It is designed for use directly with the Postgres database, not through Eventide.

For GoDoc documentation, click here

Writing to a message store

Writer description

A writer is a function on a message store instance that can write messages as commands or events to the message store database.

Creating a writer
Example
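import (
    "context"

    gms "github.com/blackhatbrigade/gomessagestore"
)

// Process packs a payload into an event and writes it to the message store.
func Process(ctx context.Context, ms gms.MessageStore, payload interface{}) error {
    // Pack converts a Go value into the map form used by Event.Data.
    packedData, err := gms.Pack(payload)
    if err != nil {
        return err
    }

    // The category and message type below are placeholders.
    newEvent := &gms.Event{
        ID:             gms.NewID(),
        EntityID:       gms.NewID(),
        StreamCategory: "someCategory",
        MessageType:    "SomethingHappened",
        Data:           packedData,
    }

    // attempt to write the message to the message store. If an error occurs, return the error.
    return ms.Write(ctx, newEvent, gms.AtPosition(-1))
}

// logger is any logrus.FieldLogger, for example logrus.New().
messageStore := gms.NewMessageStore(postgresDBInstance, logger)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := Process(ctx, messageStore, somePayload)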
Tips and tricks

Subscribing to streams and categories

Subscriber description

A subscriber is used to retrieve new messages from a specified category or stream. It should only subscribe to a single category or stream. If the specified stream/category has new messages that have not yet been sent to the subscriber, they will be sent in the next poll iteration.

Creating a subscriber

Use the CreateSubscriber() function on a messageStore instance.

Some things to note:

Subscriber options are set by passing any of the following functions as arguments to the CreateSubscriber function: SubscribeToEntityStream, SubscribeToCommandStream, SubscribeToCategory, PollTime, PollErrorDelay, UpdatePositionEvery, SubscribeBatchSize.

See subscriber_options.go for more details on these functions.

In the example below, we set the category being subscribed to, as well as our batch size using the subscriber options functions.

Example
import (
    "context"

    gms "github.com/blackhatbrigade/gomessagestore"
)

// Create a new messageStore instance; logger is any logrus.FieldLogger
messageStore := gms.NewMessageStore(postgresDB, logger)

// Set up context for handling our routines
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create our subscriber; at least one handler is required.
// someHandler is a placeholder for a value that implements gms.MessageHandler.
subscriber, err := messageStore.CreateSubscriber(
    "subscriberID",
    []gms.MessageHandler{someHandler},
    gms.SubscribeToCategory("categoryID"),
    gms.SubscribeBatchSize(500),
)

if err != nil {
    return err
}

// Run our subscriber; our handlers will take care of processing the incoming messages.
go subscriber.Start(ctx)
Tips and tricks

Projecting from streams

Projector description

A projector allows you to take an initial state and update it by processing all of the messages in a specific stream in order to derive the current state of the stream.

Creating a projector

Use the CreateProjector() function on a messageStore instance.

Some things to note:

Projector options are set by passing the following functions as arguments to the CreateProjector function: WithReducer, DefaultState.

See projector.go for more details on these functions.

Example
import (
    gms "github.com/blackhatbrigade/gomessagestore"
)

// Create a new messageStore instance; logger is any logrus.FieldLogger
messageStore := gms.NewMessageStore(postgresDB, logger)

// reducer1 and reducer2 are placeholders for values that implement gms.MessageReducer
projector, err := messageStore.CreateProjector(
    gms.DefaultState(someStruct{}),
    gms.WithReducer(reducer1),
    gms.WithReducer(reducer2),
)
Tips and tricks

Projectors are typically passed into handlers. Here is an example of an aggregator handler that takes a projector as one of its parameters:

func genericHandler(ctx context.Context, repo ReadModelDatabase, projector gms.Projector, msg gms.Message, expectedType string) error {
	// only events of the expected type are handled here
	event, ok := msg.(*gms.Event)
	if !ok || event.Type() != expectedType {
		return ErrInvalidEventTypeInHandler
	}

	// replay the entity's stream to derive its current state
	projection, err := projector.Run(ctx, event.StreamCategory, event.EntityID)
	if err != nil {
		return err
	}

	entity, ok := projection.(EntityDetail)
	if !ok {
		return ErrInvalidTypeFromProjection
	}

	// the projector already set everything up for us, so just store it
	if err := repo.store(entity); err != nil {
		return databaseWriteError
	}

	return nil
}

Reducers

A reducer should take in a message and the previous state, and update the previous state based on the information contained in the message to derive the current state.

Example
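import gms "github.com/blackhatbrigade/gomessagestore"
// counterState is an illustrative state type, not part of gomessagestore.
type counterState struct{ value float64 }

func AddMessageReduce(msg gms.Message, previousState interface{}) interface{} {
    state, _ := previousState.(counterState)
    if event, ok := msg.(*gms.Event); ok {
        // assumes the event payload carries a numeric "addValue" field
        addValue, _ := event.Data["addValue"].(float64)
        state.value += addValue
    }
    return state
}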
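
To make the relationship between reducers and a projector concrete, here is a minimal, self-contained sketch of the fold that a projection performs: start from a default state and apply the matching reducer to each message in order. The message, reducerFunc, counter, project and addReducer names are hypothetical stand-ins written for this illustration. They are not gomessagestore types, and the library's own projector may differ in detail.

```go
package main

// message is a hypothetical stand-in for gms.Message, reduced to the two
// things this sketch needs: a type name and a payload.
type message struct {
	msgType string
	data    map[string]interface{}
}

// reducerFunc has the same shape as gms.MessageReducerFunc, but uses the
// local message type.
type reducerFunc func(msg message, previousState interface{}) interface{}

// project folds the messages, in order, into a state value. Messages whose
// type has no registered reducer leave the state unchanged.
func project(defaultState interface{}, reducers map[string]reducerFunc, msgs []message) interface{} {
	state := defaultState
	for _, msg := range msgs {
		if reduce, ok := reducers[msg.msgType]; ok {
			state = reduce(msg, state)
		}
	}
	return state
}

// counter is an example state type.
type counter struct {
	Value int
}

// addReducer adds the message's "amount" field to the counter. If the
// previous state is not a counter, it is returned untouched.
func addReducer(msg message, previousState interface{}) interface{} {
	state, ok := previousState.(counter)
	if !ok {
		return previousState
	}
	if amount, ok := msg.data["amount"].(int); ok {
		state.Value += amount
	}
	return state
}
```

Calling project(counter{}, map[string]reducerFunc{"Added": addReducer}, msgs) walks the slice once and returns the final counter. Because the state is passed by value, each reducer returns a fresh copy rather than mutating shared data.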

UUID package

Go Message Store includes a built-in package for generating UUIDs that you can use for message IDs.

Example
import "github.com/blackhatbrigade/gomessagestore/uuid"

// returns a random V4 UUID; the variable is named id so that it does not shadow the uuid package
id := uuid.NewRandom()

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrInvalidOptionCombination                      = errors.New("Cannot have the current combination of options for Get()")
	ErrSubscriberCannotUseBothStreamAndCategory      = errors.New("Subscriber function cannot use both Stream and Category")
	ErrInvalidPollTime                               = errors.New("Invalid Subscriber poll time provided, can not be negative or zero")
	ErrInvalidPollErrorDelay                         = errors.New("Invalid Subscriber poll error delay provided, can not be negative or zero")
	ErrInvalidBatchSize                              = errors.New("Invalid Subscriber batch size provided, can not be negative or zero")
	ErrInvalidMsgInterval                            = errors.New("MsgInterval cannot be less than 2")
	ErrSubscriberNeedsCategoryOrStream               = errors.New("Subscriber needs at least one of category or stream to be set upon creation")
	ErrSubscriberIDCannotBeEmpty                     = errors.New("Subscriber ID cannot be nil")
	ErrSubscriberIDCannotContainPlus                 = errors.New("Subscriber ID cannot contain a plus")
	ErrSubscriberIDCannotContainHyphen               = errors.New("Subscriber ID cannot contain a hyphen")
	ErrPositionVersionMissing                        = errors.New("Subscriber cannot use non-versioned positionMsg object")
	ErrSubscriberNeedsAtLeastOneMessageHandler       = errors.New("Subscriber needs at least one handler upon creation")
	ErrSubscriberCannotSubscribeToMultipleStreams    = errors.New("Subscribers can only subscribe to one stream")
	ErrSubscriberCannotSubscribeToMultipleCategories = errors.New("Subscribers can only subscribe to one category")
	ErrProjectorNeedsAtLeastOneReducer               = errors.New("Projector needs at least one reducer upon creation")
	ErrSubscriberMessageHandlerEqualToNil            = errors.New("Subscriber Message Handler cannot be equal to nil")
	ErrSubscriberMessageHandlersEqualToNil           = errors.New("Subscriber Message Handler array cannot be equal to nil")
	ErrSubscriberNilOption                           = errors.New("Options cannot include an option whose value is equal to nil")
	ErrDefaultStateNotSet                            = errors.New("Default state not set while trying to create a new projector")
	ErrDefaultStateCannotBePointer                   = errors.New("Default state cannot be a pointer when creating a projector")
	ErrGetMessagesCannotUseBothStreamAndCategory     = errors.New("Get messages function cannot use both Stream and Category")
	ErrMessageNoID                                   = errors.New("Message cannot be written without a new UUID")
	ErrGetMessagesRequiresEitherStreamOrCategory     = errors.New("Get messages function must have either Stream or Category")
	ErrGetLastRequiresStream                         = errors.New("Get Last option requires a stream")
	ErrIncorrectNumberOfPositionsFound               = errors.New("Exactly one position should be found per subscriber")
	ErrInvalidHandler                                = errors.New("Handler cannot be nil")
	ErrIncorrectMessageInPositionStream              = errors.New("Position streams can only have position messages")
	ErrHandlerError                                  = errors.New("Handler failed to handle message")
	ErrMissingMessageType                            = errors.New("All messages require a type")
	ErrMissingMessageCategory                        = errors.New("All messages require a category")
	ErrInvalidMessageCategory                        = errors.New("Hyphens are not allowed in category names")
	ErrInvalidCommandStream                          = errors.New("Hyphens are not allowed in command stream name")
	ErrInvalidEventStream                            = errors.New("Hyphens are not allowed in event stream name")
	ErrInvalidSubscriberID                           = errors.New("Hyphens and plusses are not allowed in subscriber ID")
	ErrInvalidPositionStream                         = errors.New("Position stream expects to have a single plus diving the subscriber ID and the word 'position'")
	ErrMissingMessageCategoryID                      = errors.New("All messages require a category ID")
	ErrMissingMessageData                            = errors.New("Messages payload must not be nil")
	ErrUnserializableData                            = errors.New("Message data could not be encoded as json")
	ErrDataIsNilPointer                              = errors.New("Message data is a nil pointer")
	ErrMissingGetOptions                             = errors.New("Options are required for the Get command")
	ErrExpectedVersionFailed                         = errors.New("Provided version does not match the expected version")
)

These are the error values that the package can return.

The following table shows which files use each error, for easier troubleshooting:

Error                                           |	File Path
------------------------------------------------------------------------------------------------------
ErrInvalidOptionCombination                     |	./get.go
ErrSubscriberCannotUseBothStreamAndCategory     |	./subscriber_options.go
ErrInvalidPollTime                              |	./subscriber_options.go
ErrInvalidPollErrorDelay                        |	./subscriber_options.go
ErrInvalidBatchSize                             |	./subscriber_options.go
ErrInvalidMsgInterval                           |	./subscriber_options.go
ErrSubscriberNeedsCategoryOrStream              |	./subscriber_options.go
ErrSubscriberIDCannotBeEmpty                    |	./subscriber.go | ./worker_getposition.go
ErrSubscriberIDCannotContainPlus                |	no uses
ErrSubscriberIDCannotContainHyphen              |	no uses
ErrPositionVersionMissing                       |	./worker_getposition.go
ErrSubscriberNeedsAtLeastOneMessageHandler      |	./subscriber.go
ErrSubscriberCannotSubscribeToMultipleStreams   |	./subscriber_options.go
ErrSubscriberCannotSubscribeToMultipleCategories|	./subscriber_options.go
ErrProjectorNeedsAtLeastOneReducer              |	./projector.go
ErrSubscriberMessageHandlerEqualToNil           |	./subscriber.go
ErrSubscriberMessageHandlersEqualToNil          |	./subscriber.go
ErrSubscriberNilOption                          |	./subscriber_options.go
ErrDefaultStateNotSet                           |	./projector.go
ErrDefaultStateCannotBePointer                  |	./projector.go
ErrGetMessagesCannotUseBothStreamAndCategory    |	./get.go
ErrMessageNoID                                  |	./models.go | ./worker_getposition.go
ErrGetMessagesRequiresEitherStreamOrCategory    |	./get.go
ErrGetLastRequiresStream                        |	./get.go
ErrIncorrectNumberOfPositionsFound              |	no uses
ErrInvalidHandler                               |	no uses
ErrIncorrectMessageInPositionStream             |	./worker_getposition.go
ErrHandlerError                                 |	no uses
ErrMissingMessageType                           |	./models.go | ./worker_getposition.go
ErrMissingMessageCategory                       |	./models.go
ErrInvalidMessageCategory                       |	./get.go | ./models.go
ErrInvalidCommandStream                         |	./get.go
ErrInvalidEventStream                           |	./get.go
ErrInvalidSubscriberID                          |	./subscriber.go
ErrInvalidPositionStream                        |	./get.go | ./worker_getposition.go
ErrMissingMessageCategoryID                     |	./models.go
ErrMissingMessageData                           |	./models.go
ErrUnserializableData                           |	./models.go | ./worker_getposition.go
ErrDataIsNilPointer                             |	no uses
ErrMissingGetOptions                            |	./get.go
var NilUUID = uuid.Nil

NilUUID is a helper for tests

Functions

func CreatePoller

func CreatePoller(ms MessageStore, worker SubscriptionWorker, config *SubscriberConfig) (*poller, error)

CreatePoller returns a new instance of a Poller

func NewID

func NewID() uuid.UUID

NewID creates a new UUID.

func Pack

func Pack(source interface{}) (map[string]interface{}, error)

Pack packs a Go object into the JSON-esque objects used in the Command and Event objects

func Unpack

func Unpack(source map[string]interface{}, dest interface{}) error

Unpack unpacks the JSON-esque objects used in the Command and Event objects into Go objects
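
As an illustration of what packing into a "JSON-esque" map can look like, the sketch below round-trips a struct through encoding/json. This is an assumption about the general technique, not the library's actual implementation; packViaJSON, unpackViaJSON and accountOpened are hypothetical names used only here.

```go
package main

import "encoding/json"

// packViaJSON converts any value that marshals to a JSON object into a
// map[string]interface{}. Values that marshal to something other than an
// object (a number, a list) produce an error.
func packViaJSON(source interface{}) (map[string]interface{}, error) {
	raw, err := json.Marshal(source)
	if err != nil {
		return nil, err
	}
	dest := make(map[string]interface{})
	if err := json.Unmarshal(raw, &dest); err != nil {
		return nil, err
	}
	return dest, nil
}

// unpackViaJSON is the inverse: it copies a map into dest, which must be a
// pointer to the target value.
func unpackViaJSON(source map[string]interface{}, dest interface{}) error {
	raw, err := json.Marshal(source)
	if err != nil {
		return err
	}
	return json.Unmarshal(raw, dest)
}

// accountOpened is an example payload type.
type accountOpened struct {
	Owner   string `json:"owner"`
	Balance int    `json:"balance"`
}
```

One consequence of going through encoding/json is that every number in the resulting map is a float64, so code that reads Data fields directly should assert float64 rather than int.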

Types

type AtPositionMatcher added in v0.4.7

type AtPositionMatcher struct {
	Position int64
}

AtPositionMatcher is a gomock.Matcher interface that matches an AtPosition function

func (AtPositionMatcher) Matches added in v0.4.7

func (a AtPositionMatcher) Matches(unknown interface{}) bool

Matches checks against an AtPosition/gms.WriteOption function

func (AtPositionMatcher) String added in v0.4.7

func (a AtPositionMatcher) String() string

String gives us a representation of our AtPositionMatcher

type Command

type Command struct {
	ID             uuid.UUID // ID for the command
	StreamCategory string    // Name of the stream category
	MessageType    string    // Name of the message type
	MessageVersion int64     // version number of the message
	GlobalPosition int64     // global position of the command
	Data           map[string]interface{}
	Metadata       map[string]interface{}
	Time           time.Time
}

Command implements the Message interface; returned by get function

func (*Command) Position

func (cmd *Command) Position() int64

Position returns the global position of the command

func (*Command) ToEnvelope

func (cmd *Command) ToEnvelope() (*repository.MessageEnvelope, error)

ToEnvelope converts the command to a Message Envelope that is returned

func (*Command) Type

func (cmd *Command) Type() string

Type returns the type of the command

func (*Command) Version

func (cmd *Command) Version() int64

Version returns the version of the command

type Event

type Event struct {
	ID             uuid.UUID // ID of the event
	EntityID       uuid.UUID // ID of the entity the event is associated with
	StreamCategory string    // the name of the category of the stream
	MessageType    string    // the message type of the event
	MessageVersion int64     // the version number of the message
	GlobalPosition int64     // the global position of the event
	Data           map[string]interface{}
	Metadata       map[string]interface{}
	Time           time.Time
}

Event implements the Message interface; returned by get function

func (*Event) Position

func (event *Event) Position() int64

Position returns the global position of the event

func (*Event) ToEnvelope

func (event *Event) ToEnvelope() (*repository.MessageEnvelope, error)

ToEnvelope converts the event to a MessageEnvelope which is then returned

func (*Event) Type

func (event *Event) Type() string

Type returns the type of the event

func (*Event) Version

func (event *Event) Version() int64

Version returns the version of the event

type GetOption

type GetOption func(g *getOpts) error

GetOption provides optional arguments to the Get function.

Invalid combinations:

    EventStream() and/or CommandStream() are called more than once
    EventStream()/CommandStream() and Category() are both called
    EventStream()/CommandStream() and Category() are both not called
    Last() is called and EventStream()/CommandStream() is not called
    Last() and SincePosition()/SinceVersion() are both called
    SincePosition() and EventStream()/CommandStream() are both called
    SinceVersion() and EventStream()/CommandStream() are both called

func BatchSize

func BatchSize(batchsize int) GetOption

BatchSize changes how many messages are returned (default 1000)

func Category

func Category(category string) GetOption

Category allows for getting messages by category

func CommandStream

func CommandStream(category string) GetOption

CommandStream allows for getting commands in a specific command stream

func Converter

func Converter(converter MessageConverter) GetOption

Converter allows for automatic converting of non-Command/Event type messages

func EventStream

func EventStream(category string, entityID uuid.UUID) GetOption

EventStream allows for getting events in a specific stream

func GenericStream added in v0.7.0

func GenericStream(stream string) GetOption

GenericStream allows for getting events in a specific stream

func Last

func Last() GetOption

Last allows for getting only the most recent message (still returns an array)

func PositionStream

func PositionStream(subscriberID string) GetOption

PositionStream allows for getting the position messages of a given subscriber

func SincePosition

func SincePosition(position int64) GetOption

SincePosition allows for getting only more recent messages

func SinceVersion

func SinceVersion(version int64) GetOption

SinceVersion allows for getting only more recent messages

type Message

type Message interface {
	ToEnvelope() (*repository.MessageEnvelope, error) // used to convert a message to a message envelope
	Type() string                                     // returns the message type
	Version() int64                                   // returns the version of the message
	Position() int64                                  // returns the position of the message
}

Message is an interface for all types of messages (commands, events, etc) that are handled through a message store.

func MsgEnvelopesToMessages

func MsgEnvelopesToMessages(msgEnvelopes []*repository.MessageEnvelope, converters ...MessageConverter) []Message

MsgEnvelopesToMessages converts envelopes to any number of different structs that implement the Message interface

type MessageConverter

type MessageConverter func(*repository.MessageEnvelope) (Message, error)

MessageConverter is a function that takes in a MessageEnvelope and returns a Message; can be used to create custom messages
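
The sketch below shows one way a list of converters can be applied to an envelope: try each in order and take the first that succeeds. The first-match-wins rule is an assumption made for this illustration, not confirmed behaviour of MsgEnvelopesToMessages. The envelope type with its MessageType and StreamName fields, the heartbeat and rawMessage types, and the convert function are all hypothetical stand-ins.

```go
package main

import "errors"

// envelope is a hypothetical, simplified stand-in for repository.MessageEnvelope.
type envelope struct {
	MessageType string
	StreamName  string
}

// message is a stand-in for the Type() part of gms.Message.
type message interface {
	Type() string
}

// converter has the same shape as gms.MessageConverter, using local types.
type converter func(*envelope) (message, error)

var errNoConverter = errors.New("no converter accepted the envelope")

// convert returns the result of the first converter that does not fail.
func convert(env *envelope, converters ...converter) (message, error) {
	for _, c := range converters {
		if msg, err := c(env); err == nil {
			return msg, nil
		}
	}
	return nil, errNoConverter
}

// heartbeat is a custom message type produced by heartbeatConverter.
type heartbeat struct {
	source string
}

func (h heartbeat) Type() string { return "Heartbeat" }

// heartbeatConverter only accepts envelopes whose type is "Heartbeat".
func heartbeatConverter(env *envelope) (message, error) {
	if env.MessageType != "Heartbeat" {
		return nil, errors.New("not a heartbeat envelope")
	}
	return heartbeat{source: env.StreamName}, nil
}

// rawMessage is a catch-all message that only remembers its type.
type rawMessage struct {
	msgType string
}

func (r rawMessage) Type() string { return r.msgType }

// fallbackConverter accepts every envelope.
func fallbackConverter(env *envelope) (message, error) {
	return rawMessage{msgType: env.MessageType}, nil
}
```

Ordering matters in this design: specific converters go first and a catch-all, if any, goes last.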

type MessageHandler

type MessageHandler interface {
	Type() string                                   // returns the message type
	Process(ctx context.Context, msg Message) error // called for each message being handled
}

MessageHandler is used to process messages; a handler should exist for each message type
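
A rough sketch of type-based dispatch, where each message is routed to the handlers whose Type() matches it, may help when writing handlers. The message and handler interfaces below mirror the shape of the ones documented here but are local stand-ins, and dispatch, simpleMessage, countingHandler and exampleDispatch are hypothetical names. How the library's subscriber actually routes messages is not shown on this page, so treat the routing rule as an assumption.

```go
package main

import "context"

// message is a stand-in for the Type() part of gms.Message.
type message interface {
	Type() string
}

// handler mirrors the shape of gms.MessageHandler, using the local message type.
type handler interface {
	Type() string
	Process(ctx context.Context, msg message) error
}

// dispatch routes each message to every handler registered for its type and
// returns how many messages had at least one matching handler. It stops at
// the first handler error or when the context is cancelled.
func dispatch(ctx context.Context, handlers []handler, msgs []message) (int, error) {
	byType := make(map[string][]handler)
	for _, h := range handlers {
		byType[h.Type()] = append(byType[h.Type()], h)
	}
	handled := 0
	for _, msg := range msgs {
		if err := ctx.Err(); err != nil {
			return handled, err
		}
		matching := byType[msg.Type()]
		for _, h := range matching {
			if err := h.Process(ctx, msg); err != nil {
				return handled, err
			}
		}
		if len(matching) > 0 {
			handled++
		}
	}
	return handled, nil
}

// simpleMessage is a minimal message carrying only its type.
type simpleMessage struct {
	msgType string
}

func (m simpleMessage) Type() string { return m.msgType }

// countingHandler counts the messages it has processed.
type countingHandler struct {
	msgType string
	seen    int
}

func (h *countingHandler) Type() string { return h.msgType }

func (h *countingHandler) Process(ctx context.Context, msg message) error {
	h.seen++
	return nil
}

// exampleDispatch shows typical usage: one handler, three messages.
func exampleDispatch() (handled int, seen int, err error) {
	deposits := &countingHandler{msgType: "Deposited"}
	msgs := []message{
		simpleMessage{"Deposited"},
		simpleMessage{"Withdrawn"},
		simpleMessage{"Deposited"},
	}
	handled, err = dispatch(context.Background(), []handler{deposits}, msgs)
	return handled, deposits.seen, err
}
```

In exampleDispatch the "Withdrawn" message has no handler and is skipped, which is why a handler should exist for each message type you care about.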

type MessageReducer

type MessageReducer interface {
	Reduce(msg Message, previousState interface{}) interface{}
	Type() string
}

MessageReducer defines the expected behaviours of a reducer that is ultimately used by the projectors.

type MessageReducerConfig

type MessageReducerConfig struct {
	Reducer MessageReducer
	Type    string
}

MessageReducerConfig contains all of the information needed to use a given reducer.

type MessageReducerFunc added in v0.4.2

type MessageReducerFunc func(msg Message, previousState interface{}) interface{}

MessageReducerFunc is a functional way to create a reducer (for use with WithReducerFunc)

type MessageStore

type MessageStore interface {
	Write(ctx context.Context, message Message, opts ...WriteOption) error                                         // writes a message to the message store
	Get(ctx context.Context, opts ...GetOption) ([]Message, error)                                                 // retrieves messages from the message store
	CreateProjector(opts ...ProjectorOption) (Projector, error)                                                    // creates a new projector
	CreateSubscriber(subscriberID string, handlers []MessageHandler, opts ...SubscriberOption) (Subscriber, error) // creates a new subscriber
	GetLogger() (logger logrus.FieldLogger)                                                                        // gets the logger
}

MessageStore establishes the interface for Eventide

func NewMessageStore

func NewMessageStore(injectedDB *sql.DB, logger logrus.FieldLogger) MessageStore

NewMessageStore creates a new MessageStore instance using an injected DB.

func NewMessageStoreFromRepository

func NewMessageStoreFromRepository(injectedRepo repository.Repository, logger logrus.FieldLogger) MessageStore

NewMessageStoreFromRepository creates a new MessageStore instance using an injected repository. FOR TESTING ONLY

func NewMockMessageStoreWithMessages added in v0.2.2

func NewMockMessageStoreWithMessages(msgs []Message) MessageStore

NewMockMessageStoreWithMessages is used for testing purposes

type Poller

type Poller interface {
	Poll(context.Context) error // should handle a cycle of polling the message store
}

Poller interface requires a Poll function

type Projector

type Projector interface {
	Run(ctx context.Context, category string, entityID uuid.UUID) (interface{}, error)
	RunOnStream(ctx context.Context, stream string) (interface{}, error)
	Step(msg Message, previousState interface{}) (interface{}, bool)
}

Projector is a base-level interface that defines the projection functionality of gomessagestore.

type ProjectorOption

type ProjectorOption func(proj *projector)

ProjectorOption is used for creating projectors with reducers

func DefaultState

func DefaultState(defaultState interface{}) ProjectorOption

DefaultState registers a default state for use with a projector

func WithReducer

func WithReducer(reducer MessageReducer) ProjectorOption

WithReducer registers a reducer with the new projector

func WithReducerFunc added in v0.4.2

func WithReducerFunc(msgType string, reducerFunc MessageReducerFunc) ProjectorOption

WithReducerFunc registers a message type and a reducer function with the new projector

type Subscriber

type Subscriber interface {
	Start(context.Context) error
}

Subscriber allows for reaching out to the message service on a continual basis

func CreateSubscriberWithPoller

func CreateSubscriberWithPoller(ms MessageStore, subscriberID string, handlers []MessageHandler, poller Poller, opts ...SubscriberOption) (Subscriber, error)

CreateSubscriberWithPoller is used for testing with dependency injection. FOR TESTING ONLY

type SubscriberConfig

type SubscriberConfig struct {
	// contains filtered or unexported fields
}

SubscriberConfig contains configuration information for a subscriber

func GetSubscriberConfig

func GetSubscriberConfig(opts ...SubscriberOption) (*SubscriberConfig, error)

GetSubscriberConfig changes SubscriberOptions into a valid SubscriberConfig object, or returns an error

type SubscriberOption

type SubscriberOption func(config *SubscriberConfig) error

SubscriberOption allows for various options when creating a subscriber
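
SubscriberOption follows the functional options pattern, with each option able to reject an invalid value. The sketch below shows that pattern in isolation. All names (subscriberConfig, pollTime, batchSize, newSubscriberConfig and the error variables) are hypothetical lower-case stand-ins, and the default values are placeholders chosen for the example, not the library's defaults.

```go
package main

import (
	"errors"
	"time"
)

var (
	errInvalidPollTime  = errors.New("poll time can not be negative or zero")
	errInvalidBatchSize = errors.New("batch size can not be negative or zero")
	errNilOption        = errors.New("options cannot include a nil option")
)

// subscriberConfig collects the settings that options may change.
type subscriberConfig struct {
	pollTime  time.Duration
	batchSize int
}

// subscriberOption has the same shape as gms.SubscriberOption: it edits the
// config and may return a validation error.
type subscriberOption func(config *subscriberConfig) error

// pollTime sets the polling interval and rejects non-positive durations.
func pollTime(d time.Duration) subscriberOption {
	return func(config *subscriberConfig) error {
		if d <= 0 {
			return errInvalidPollTime
		}
		config.pollTime = d
		return nil
	}
}

// batchSize sets the batch size and rejects non-positive sizes.
func batchSize(n int) subscriberOption {
	return func(config *subscriberConfig) error {
		if n <= 0 {
			return errInvalidBatchSize
		}
		config.batchSize = n
		return nil
	}
}

// newSubscriberConfig starts from placeholder defaults and applies each
// option in order, stopping at the first error.
func newSubscriberConfig(opts ...subscriberOption) (*subscriberConfig, error) {
	config := &subscriberConfig{pollTime: time.Second, batchSize: 100}
	for _, opt := range opts {
		if opt == nil {
			return nil, errNilOption
		}
		if err := opt(config); err != nil {
			return nil, err
		}
	}
	return config, nil
}
```

Validating inside each option keeps the constructor signature stable as options are added, and callers learn about a bad value when the config is built rather than when the subscriber is already running.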

func OnError added in v0.5.0

func OnError(errorFunc func(error)) SubscriberOption

OnError sets a function that the subscriber calls when it encounters an error, instead of panicking

func PollErrorDelay

func PollErrorDelay(pollErrorDelay time.Duration) SubscriberOption

PollErrorDelay sets the interval between handling operations when Poll() errors

func PollTime

func PollTime(pollTime time.Duration) SubscriberOption

PollTime sets the interval between handling operations

func SubscribeBatchSize

func SubscribeBatchSize(batchSize int) SubscriberOption

SubscribeBatchSize sets the number of messages to retrieve in a single handling operation

func SubscribeLogger added in v0.6.0

func SubscribeLogger(logger logrus.FieldLogger) SubscriberOption

SubscribeLogger configures the logger used inside the Subscriber

func SubscribeToCategory

func SubscribeToCategory(category string) SubscriberOption

SubscribeToCategory subscribes to a category of streams and ensures that it is not also subscribed to a stream

func SubscribeToCommandStream

func SubscribeToCommandStream(category string) SubscriberOption

SubscribeToCommandStream subscribes to a specific command stream and ensures that multiple streams are not subscribed to

func SubscribeToEntityStream

func SubscribeToEntityStream(category string, entityID uuid.UUID) SubscriberOption

SubscribeToEntityStream subscribes to a specific entity stream and ensures that multiple streams are not subscribed to

func UpdatePositionEvery

func UpdatePositionEvery(msgInterval int) SubscriberOption

UpdatePositionEvery determines how often a positionMessage is written to the message store to save the position of the worker; must be >= 2

func WithConverter added in v0.6.1

func WithConverter(converter MessageConverter) SubscriberOption

WithConverter allows for automatic converting of non-Command/Event type messages

type SubscriptionWorker

type SubscriptionWorker interface {
	GetMessages(ctx context.Context, position int64) ([]Message, error)
	ProcessMessages(ctx context.Context, msgs []Message) (messagesHandled int, positionOfLastHandled int64, err error)
	GetPosition(ctx context.Context) (int64, error)
	SetPosition(ctx context.Context, position int64) error
}

SubscriptionWorker handles the processes for retrieving and processing messages from the message store and updating positions
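
The four methods suggest a simple poll cycle: read the saved position, fetch a batch from there, process it, and save the new position. The sketch below wires that cycle to an in-memory worker. It is an illustration under stated assumptions, not the library's poller: the worker interface uses a local message type, memoryWorker, pollOnce and drain are hypothetical names, and saving "last handled position + 1" is a choice made for this example.

```go
package main

import "context"

// message is a stand-in that only carries its position.
type message struct {
	position int64
}

// worker mirrors the method set of SubscriptionWorker with the local message type.
type worker interface {
	GetMessages(ctx context.Context, position int64) ([]message, error)
	ProcessMessages(ctx context.Context, msgs []message) (messagesHandled int, positionOfLastHandled int64, err error)
	GetPosition(ctx context.Context) (int64, error)
	SetPosition(ctx context.Context, position int64) error
}

// pollOnce runs a single cycle and returns how many messages were handled.
func pollOnce(ctx context.Context, w worker) (int, error) {
	position, err := w.GetPosition(ctx)
	if err != nil {
		return 0, err
	}
	msgs, err := w.GetMessages(ctx, position)
	if err != nil || len(msgs) == 0 {
		return 0, err
	}
	handled, last, err := w.ProcessMessages(ctx, msgs)
	if handled > 0 {
		// Save progress even if a later message in the batch failed.
		if setErr := w.SetPosition(ctx, last+1); setErr != nil && err == nil {
			err = setErr
		}
	}
	return handled, err
}

// memoryWorker keeps its log and position in memory.
type memoryWorker struct {
	log      []message
	position int64
	batch    int
}

func (w *memoryWorker) GetMessages(ctx context.Context, position int64) ([]message, error) {
	var out []message
	for _, m := range w.log {
		if m.position >= position && len(out) < w.batch {
			out = append(out, m)
		}
	}
	return out, nil
}

func (w *memoryWorker) ProcessMessages(ctx context.Context, msgs []message) (int, int64, error) {
	var last int64
	for _, m := range msgs {
		last = m.position
	}
	return len(msgs), last, nil
}

func (w *memoryWorker) GetPosition(ctx context.Context) (int64, error) {
	return w.position, nil
}

func (w *memoryWorker) SetPosition(ctx context.Context, position int64) error {
	w.position = position
	return nil
}

// drain polls until a cycle handles nothing and returns the total handled.
func drain(w worker) (int, error) {
	ctx := context.Background()
	total := 0
	for {
		n, err := pollOnce(ctx, w)
		if err != nil || n == 0 {
			return total, err
		}
		total += n
	}
}
```

A real subscriber would sleep for the poll time between cycles rather than looping tightly, and UpdatePositionEvery suggests the library writes positions less often than once per batch.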

func CreateWorker

func CreateWorker(ms MessageStore, subscriberID string, handlers []MessageHandler, config *SubscriberConfig) (SubscriptionWorker, error)

CreateWorker returns a new subscriptionWorker

type WriteOption

type WriteOption func(w *writer)

WriteOption provides optional arguments to the Write function

func AtPosition

func AtPosition(position int64) WriteOption

AtPosition allows for writing messages using an expected position
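
Writing with an expected position is a form of optimistic concurrency control: the write succeeds only if the stream is where the writer believes it is. The in-memory sketch below illustrates the idea. It assumes, as in Eventide's Message DB, that positions start at 0 and that -1 denotes an empty stream; the stream, writeSettings, atPosition and write names are hypothetical, and the library's own check happens in the database rather than in Go.

```go
package main

import "errors"

var errExpectedVersionFailed = errors.New("provided version does not match the expected version")

// stream is an in-memory list of messages for one entity.
type stream struct {
	messages []string
}

// version returns the position of the last message, or -1 for an empty stream.
func (s *stream) version() int64 {
	return int64(len(s.messages)) - 1
}

// writeSettings holds the optional expected position; nil means "do not check".
type writeSettings struct {
	expected *int64
}

// writeOption has the same functional-option shape as gms.WriteOption.
type writeOption func(w *writeSettings)

// atPosition requests that the write only succeed at the given position.
func atPosition(position int64) writeOption {
	return func(w *writeSettings) {
		w.expected = &position
	}
}

// write appends msg unless an expected position was given and does not match.
func (s *stream) write(msg string, opts ...writeOption) error {
	var settings writeSettings
	for _, opt := range opts {
		opt(&settings)
	}
	if settings.expected != nil && *settings.expected != s.version() {
		return errExpectedVersionFailed
	}
	s.messages = append(s.messages, msg)
	return nil
}
```

Under these assumptions, two writers that both pass atPosition(-1) to create the same stream cannot both succeed: the second one receives the version error and can re-read the stream before retrying.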

Directories

Path Synopsis
Package mock_gomessagestore is a generated GoMock package.
mocks
Package mock_repository is a generated GoMock package.
