gomdb

Version: v0.1.1
Published: May 15, 2021 | License: Apache-2.0 | Imports: 8 | Imported by: 1

README

Go Message DB Client

This module implements a thin Go wrapper around the Message DB message store. Message DB is an event store implemented on top of Postgres, ideal for event sourcing applications.

The client supports all Message DB read and write procedures. Each call defaults to its simplest form, and further behaviour is configured through option functions.

Getting started

// Open database connection
db, err := sql.Open("postgres", "dbname=message_store sslmode=disable user=message_store")
if err != nil {
    log.Fatalf("unexpected error opening db: %s", err)
}
defer db.Close()

// Set search path for schema
if _, err := db.Exec("SET search_path TO message_store,public;"); err != nil {
    log.Fatalf("setting search path: %s", err)
}

// create client
client := gomdb.NewClient(db)

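// read from stream (the category and ID here are illustrative)
stream := gomdb.StreamIdentifier{Category: "user", ID: "123"}
msgs, err := client.GetStreamMessages(context.Background(), stream)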
if err != nil {
    log.Fatalf("reading from stream: %s", err)
}

log.Println(msgs)

See the example directory for a more complete example.

Running tests

The unit tests require a running instance of Message DB to test against.

# start Message DB
docker build -t message-db .
docker run -d --rm \
    -p 5432:5432 \
    -e POSTGRES_HOST_AUTH_METHOD=trust \
    message-db \
    -c message_store.sql_condition=on

# run tests
go test -condition-on

Documentation

Constants

const (
	// NoStreamVersion is expected version for a stream that doesn't exist.
	NoStreamVersion = int64(-1)
	// AnyVersion allows writing of a message regardless of the stream version.
	AnyVersion = int64(-2)
)
const (
	// CorrelationKey attribute allows a component to tag an outbound message
	// with its origin
	CorrelationKey = "correlationStreamName"

	// WriteMessageSQL with (
	//   id,
	//   stream_name,
	//   type,
	//   data,
	//   metadata,
	//   expected_version
	// )
	WriteMessageSQL = "SELECT write_message($1, $2, $3, $4, $5, $6)"
	// GetStreamMessagesSQL with (
	//   stream_name,
	//   position,
	//   batch_size,
	//   condition
	// )
	GetStreamMessagesSQL = "SELECT * FROM get_stream_messages($1, $2, $3, $4)"
	// GetCategoryMessagesSQL with (
	//   category_name,
	//   position,
	//   batch_size,
	//   correlation,
	//   consumer_group_member,
	//   consumer_group_size,
	//   condition
	// )
	GetCategoryMessagesSQL = "SELECT * FROM get_category_messages($1, $2, $3, $4, $5, $6, $7)"
	// GetLastStreamMessageSQL with (stream_name)
	GetLastStreamMessageSQL = "SELECT * FROM get_last_stream_message($1)"
	// GetStreamVersionSQL with (stream_name)
	GetStreamVersionSQL = "SELECT * FROM stream_version($1)"
)
const StreamNameSeparator = "-"

StreamNameSeparator is the character used to separate the stream category from the stream ID in a stream name.

Variables

var (
	// ErrInvalidMessageID is returned when the proposed message ID is not a
	// valid UUID.
	ErrInvalidMessageID = errors.New("proposed message ID must be a valid UUID")
	// ErrMissingType is returned when the proposed message is missing the
	// message type.
	ErrMissingType = errors.New("proposed message must include Type")
	// ErrMissingData is returned when the proposed message is missing any
	// data.
	ErrMissingData = errors.New("proposed message must include Data")
	// ErrMissingCategory is returned when the stream identifier category is
	// missing.
	ErrMissingCategory = errors.New("category cannot be blank")
	// ErrInvalidCategory is returned when the stream identifier category
	// contains the reserved stream name separator character.
	ErrInvalidCategory = fmt.Errorf("category cannot contain separator (%s)", StreamNameSeparator)
	// ErrMissingStreamID is returned when the stream identifier ID is missing.
	ErrMissingStreamID = errors.New("ID cannot be blank")
	// ErrInvalidStreamID is returned when the stream identifier ID contains the
	// reserved stream name separator character.
	ErrInvalidStreamID = fmt.Errorf("ID cannot contain separator (%s)", StreamNameSeparator)
)
var ErrUnexpectedStreamVersion = errors.New("unexpected stream version when writing message")

ErrUnexpectedStreamVersion is returned when a stream is not at the expected version when writing a message.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client exposes the message-db interface.

func NewClient

func NewClient(db *sql.DB) *Client

NewClient returns a new message-db client for the provided database.

func (*Client) GetCategoryMessages

func (c *Client) GetCategoryMessages(ctx context.Context, category string, opts ...GetCategoryOption) ([]*Message, error)

GetCategoryMessages reads messages from a category. By default the category is read from the beginning of the message store with a batch size of 1000. Pass GetCategoryOption values to adjust this behaviour and to configure consumer groups and filtering.

func (*Client) GetLastStreamMessage

func (c *Client) GetLastStreamMessage(ctx context.Context, stream StreamIdentifier) (*Message, error)

GetLastStreamMessage returns the last message for the specified stream, or nil if the stream is empty.

func (*Client) GetStreamMessages

func (c *Client) GetStreamMessages(ctx context.Context, stream StreamIdentifier, opts ...GetStreamOption) ([]*Message, error)

GetStreamMessages reads messages from an individual stream. By default the stream is read from the beginning with a batch size of 1000. Pass GetStreamOption values to adjust this behaviour.

func (*Client) GetStreamVersion

func (c *Client) GetStreamVersion(ctx context.Context, stream StreamIdentifier) (int64, error)

GetStreamVersion returns the version of the specified stream. Always check the error value before using the returned version.

func (*Client) WriteMessage

func (c *Client) WriteMessage(ctx context.Context, stream StreamIdentifier, message ProposedMessage, expectedVersion int64) (int64, error)

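WriteMessage attempts to write the proposed message to the specified stream. If the stream is not at expectedVersion, ErrUnexpectedStreamVersion is returned. Pass AnyVersion to write regardless of the stream version, or NoStreamVersion to require that the stream does not exist yet.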
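The expected-version rule can be modelled in a few lines. The following is only an in-memory sketch: in gomdb the check is performed by the database's write_message function, and checkExpectedVersion, the lower-case constants and the error variable are hypothetical names that mirror the documented NoStreamVersion, AnyVersion and ErrUnexpectedStreamVersion. It assumes a stream's version is the position of its last message, with -1 meaning the stream does not exist.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented sentinel values (hypothetical names).
const (
	noStreamVersion = int64(-1) // the stream does not exist yet
	anyVersion      = int64(-2) // skip the version check entirely
)

var errUnexpectedVersion = errors.New("unexpected stream version when writing message")

// checkExpectedVersion is a hypothetical model of the optimistic
// concurrency check: the write is allowed when the caller does not care
// about the version, or when the stream is exactly where the caller
// expects it to be.
func checkExpectedVersion(current, expected int64) error {
	if expected == anyVersion {
		return nil
	}
	if current != expected {
		return fmt.Errorf("%w: expected %d, stream is at %d", errUnexpectedVersion, expected, current)
	}
	return nil
}

func main() {
	// First write to a brand-new stream.
	fmt.Println(checkExpectedVersion(noStreamVersion, noStreamVersion)) // <nil>

	// Writer does not care about the current version.
	fmt.Println(checkExpectedVersion(3, anyVersion)) // <nil>

	// Another writer got there first: the stream moved from 2 to 3.
	err := checkExpectedVersion(3, 2)
	fmt.Println(errors.Is(err, errUnexpectedVersion)) // true
}
```

A caller that receives the version error would typically re-read the stream, rebuild its state and retry the write.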

type GetCategoryOption

type GetCategoryOption func(*categoryConfig)

GetCategoryOption is an option for modifying how to read from a category.

func AsConsumerGroup

func AsConsumerGroup(member, size int64) GetCategoryOption

AsConsumerGroup specifies the consumer group options for this read. Size specifies the number of consumers in the group, and member specifies which consumer is currently reading. Message-db uses consistent hashing on stream names within a category to distribute the streams amongst the consumer group members.
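The partitioning idea can be illustrated with a small sketch. It is not Message DB's actual algorithm: FNV-1a stands in for the database's own hash function, memberFor is a hypothetical helper, and member numbers are assumed to be zero-based. What it shows is the property that matters to consumers: every stream name maps to exactly one member, and the mapping is stable between reads.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// memberFor is a hypothetical helper that assigns a stream name to one
// member of a consumer group of the given size (size must be positive).
// FNV-1a is a stand-in hash chosen for this sketch only.
func memberFor(streamName string, size int64) int64 {
	h := fnv.New64a()
	h.Write([]byte(streamName)) // Write on a hash.Hash never returns an error
	return int64(h.Sum64() % uint64(size))
}

func main() {
	const size = 3
	for _, s := range []string{"account-1", "account-2", "account-3", "account-4"} {
		fmt.Printf("%s -> member %d of %d\n", s, memberFor(s, size), size)
	}
}
```

Because all messages of a stream land on the same member, each consumer sees every stream it owns in order.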

func FromPosition

func FromPosition(position int64) GetCategoryOption

FromPosition specifies the inclusive global position from which to read messages.

func WithCategoryBatchSize

func WithCategoryBatchSize(batchSize int64) GetCategoryOption

WithCategoryBatchSize specifies the batch size to use when reading messages.

func WithCategoryCondition

func WithCategoryCondition(condition string) GetCategoryOption

WithCategoryCondition specifies an SQL condition to apply to the read request. For example: "messages.time::time >= current_time"

func WithCorrelation

func WithCorrelation(correlation string) GetCategoryOption

WithCorrelation sets the correlation value that messages will be filtered by. The value is compared against the correlationStreamName field of each message's metadata.

type GetStreamOption

type GetStreamOption func(*streamConfig)

GetStreamOption is an option for modifying how to read from a stream.

func FromVersion

func FromVersion(version int64) GetStreamOption

FromVersion specifies the inclusive version from which to read messages.

func WithStreamBatchSize

func WithStreamBatchSize(batchSize int64) GetStreamOption

WithStreamBatchSize specifies the batch size to use when reading messages.
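Because FromVersion is inclusive, a whole stream can be read batch by batch by restarting each read at the last seen version plus one. The sketch below shows that loop against an in-memory fake so that it runs without a database. The message type, readFunc, readAll and fakeStream are all hypothetical names; readFunc stands in for a call such as GetStreamMessages with FromVersion and WithStreamBatchSize applied.

```go
package main

import "fmt"

// message carries only the field this sketch needs.
type message struct {
	Version int64
}

// readFunc is a hypothetical stand-in for one batched stream read that
// starts at fromVersion (inclusive) and returns at most batchSize messages.
type readFunc func(fromVersion, batchSize int64) ([]message, error)

// readAll keeps reading batches until a batch comes back empty or short,
// which signals that the end of the stream was reached.
func readAll(read readFunc, batchSize int64) ([]message, error) {
	var all []message
	from := int64(0)
	for {
		batch, err := read(from, batchSize)
		if err != nil {
			return nil, err
		}
		all = append(all, batch...)
		if len(batch) == 0 || int64(len(batch)) < batchSize {
			return all, nil
		}
		// The start version is inclusive, so resume one past the last message.
		from = batch[len(batch)-1].Version + 1
	}
}

// fakeStream returns a readFunc over n in-memory messages versioned 0..n-1.
func fakeStream(n int64) readFunc {
	return func(from, size int64) ([]message, error) {
		var out []message
		for v := from; v < n && int64(len(out)) < size; v++ {
			out = append(out, message{Version: v})
		}
		return out, nil
	}
}

func main() {
	msgs, err := readAll(fakeStream(25), 10)
	fmt.Println(len(msgs), err) // 25 <nil>
}
```

The same loop shape applies to category reads, with the global position taking the place of the stream version.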

func WithStreamCondition

func WithStreamCondition(condition string) GetStreamOption

WithStreamCondition specifies an SQL condition to apply to the read request. For example: "messages.time::time >= current_time"

type Message

type Message struct {
	ID             uuid.UUID
	Stream         StreamIdentifier
	Type           string
	Version        int64
	GlobalPosition int64
	Timestamp      time.Time
	// contains filtered or unexported fields
}

Message represents a message that was stored in message-db.

func (*Message) UnmarshalData

func (m *Message) UnmarshalData(i interface{}) error

UnmarshalData attempts to unmarshal the Message's data into the provided object.

func (*Message) UnmarshalMetadata

func (m *Message) UnmarshalMetadata(i interface{}) error

UnmarshalMetadata attempts to unmarshal the Message's metadata into the provided object.
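Message DB stores message data and metadata as JSON, so both unmarshal methods amount to decoding a JSON document into a caller-supplied value. The sketch below shows that pattern with encoding/json only. It assumes, without confirmation from the source, that gomdb decodes in a comparable way; the deposited type and the decode helper are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// deposited is a hypothetical event payload used only for this sketch.
type deposited struct {
	AccountID string `json:"accountId"`
	Amount    int64  `json:"amount"`
}

// decode is a hypothetical helper that mirrors the shape of UnmarshalData:
// it takes raw JSON and a pointer to the value to fill in.
func decode(raw []byte, v interface{}) error {
	return json.Unmarshal(raw, v)
}

func main() {
	raw := []byte(`{"accountId":"123","amount":50}`)

	var d deposited
	if err := decode(raw, &d); err != nil {
		fmt.Println("decoding:", err)
		return
	}
	fmt.Printf("%+v\n", d) // {AccountID:123 Amount:50}
}
```

As with json.Unmarshal, the target must be passed as a pointer, and the message Type is a natural switch key for choosing which struct to decode into.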

type ProposedMessage

type ProposedMessage struct {
	ID       string
	Type     string
	Data     interface{}
	Metadata interface{}
}

ProposedMessage is a message proposed for writing to message-db.

type StreamIdentifier

type StreamIdentifier struct {
	Category string
	ID       string
}

StreamIdentifier captures the two components of a message-db stream name.

func (StreamIdentifier) String

func (si StreamIdentifier) String() string

String returns the string representation of a StreamIdentifier.
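A stream name joins the category and the ID with StreamNameSeparator, which is why neither part may be blank or contain the separator itself. The following sketch restates those rules in a standalone helper. streamName is a hypothetical function, not gomdb's implementation, and its error messages only approximate the documented ErrMissingCategory, ErrInvalidCategory, ErrMissingStreamID and ErrInvalidStreamID values.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// separator mirrors the documented StreamNameSeparator value.
const separator = "-"

// streamName is a hypothetical helper that validates both components and
// joins them into a stream name such as "account-123".
func streamName(category, id string) (string, error) {
	switch {
	case category == "":
		return "", errors.New("category cannot be blank")
	case strings.Contains(category, separator):
		return "", errors.New("category cannot contain separator")
	case id == "":
		return "", errors.New("ID cannot be blank")
	case strings.Contains(id, separator):
		return "", errors.New("ID cannot contain separator")
	}
	return category + separator + id, nil
}

func main() {
	name, err := streamName("account", "123")
	fmt.Println(name, err) // account-123 <nil>

	_, err = streamName("account", "12-3")
	fmt.Println(err) // ID cannot contain separator
}
```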

Directories

Path Synopsis
tests module
