Documentation
¶
Index ¶
- Constants
- Variables
- type Client
- func (c *Client) GetCategoryMessages(ctx context.Context, category string, opts ...GetCategoryOption) ([]*Message, error)
- func (c *Client) GetLastStreamMessage(ctx context.Context, stream StreamIdentifier) (*Message, error)
- func (c *Client) GetStreamMessages(ctx context.Context, stream StreamIdentifier, opts ...GetStreamOption) ([]*Message, error)
- func (c *Client) GetStreamVersion(ctx context.Context, stream StreamIdentifier) (int64, error)
- func (c *Client) WriteMessage(ctx context.Context, stream StreamIdentifier, message ProposedMessage, ...) (int64, error)
- type GetCategoryOption
- type GetStreamOption
- type Message
- type ProposedMessage
- type StreamIdentifier
Constants ¶
const ( // NoStreamVersion is expected version for a stream that doesn't exist. NoStreamVersion = int64(-1) // AnyVersion allows writing of a message regardless of the stream version. AnyVersion = int64(-2) )
const ( // CorrelationKey attribute allows a component to tag an outbound message // with its origin CorrelationKey = "correlationStreamName" // WriteMessageSQL with ( // id, // stream_name, // type, // data, // metadata, // expected_version // ) WriteMessageSQL = "SELECT write_message($1, $2, $3, $4, $5, $6)" // GetStreamMessagesSQL with ( // stream_name, // position, // batch_size, // condition // ) GetStreamMessagesSQL = "SELECT * FROM get_stream_messages($1, $2, $3, $4)" // GetCategoryMessagesSQL with ( // category_name, // position, // batch_size, // correlation, // consumer_group_member, // consumer_group_size, // condition // ) GetCategoryMessagesSQL = "SELECT * FROM get_category_messages($1, $2, $3, $4, $5, $6, $7)" // GetLastStreamMessageSQL with (stream_name) GetLastStreamMessageSQL = "SELECT * FROM get_last_stream_message($1)" // StreamVersionSQL with (stream_name) GetStreamVersionSQL = "SELECT * FROM stream_version($1)" )
const StreamNameSeparator = "-"
StreamNameSeparator is the character used to separate the stream category from the stream ID in a stream name.
Variables ¶
var ( // ErrInvalidMessageID is returned when the proposed message ID is not a // valid UUID. ErrInvalidMessageID = errors.New("proposed message ID must be a valid UUID") // ErrMissingType is returned when the proposed message is missing the // message type. ErrMissingType = errors.New("proposed message must include Type") // ErrMissingData is returned when the proposed message is missing any // data. ErrMissingData = errors.New("proposed message must include Data") // ErrMissingCategory is returned when the stream identifier category is // missing. ErrMissingCategory = errors.New("category cannot be blank") // ErrInvalidCategory is returned when the stream identifier category // contains the reserved stream name seperator character. ErrInvalidCategory = fmt.Errorf("category cannot contain separator (%s)", StreamNameSeparator) // ErrMissingStreamID is returned when the stream identifier ID is missing. ErrMissingStreamID = errors.New("ID cannot be blank") // ErrInvalidStreamID is returned whenthe stream identifier ID contains the // reserved stream name seperator character. ErrInvalidStreamID = fmt.Errorf("ID cannot contain separator (%s)", StreamNameSeparator) )
var ErrUnexpectedStreamVersion = errors.New("unexpected stream version when writing message")
ErrUnexpectedStreamVersion is returned when a stream is not at the expected version when writing a message.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client exposes the message-db interface.
func (*Client) GetCategoryMessages ¶
func (c *Client) GetCategoryMessages(ctx context.Context, category string, opts ...GetCategoryOption) ([]*Message, error)
GetCategoryMessages reads messages from a category. By default the category is read from the beginning of the message store with a batch size of 1000. Use GetCategoryOptions to adjust this behaviour and to configure consumer groups and filtering.
func (*Client) GetLastStreamMessage ¶
func (c *Client) GetLastStreamMessage(ctx context.Context, stream StreamIdentifier) (*Message, error)
GetLastStreamMessage returns the last message for the specified stream, or nil if the stream is empty.
func (*Client) GetStreamMessages ¶
func (c *Client) GetStreamMessages(ctx context.Context, stream StreamIdentifier, opts ...GetStreamOption) ([]*Message, error)
GetStreamMessages reads messages from an individual stream. By default the stream is read from the beginning with a batch size of 1000. Use GetStreamOptions to adjust this behaviour.
func (*Client) GetStreamVersion ¶
GetStreamVersion returns the version of the specified stream. Always check the error value before using the returned version.
func (*Client) WriteMessage ¶
func (c *Client) WriteMessage(ctx context.Context, stream StreamIdentifier, message ProposedMessage, expectedVersion int64) (int64, error)
WriteMessage attempted to write the proposed message to the specifed stream.
type GetCategoryOption ¶
type GetCategoryOption func(*categoryConfig)
GetCategoryOption is an option for modifiying how to read from a category.
func AsConsumerGroup ¶
func AsConsumerGroup(member, size int64) GetCategoryOption
AsConsumerGroup specifies the consumer group options for this read. Size is used to specify the number of consumers, and member specifies which consumer is currently reading. Message-db used consistent hashing on stream names within a category and then distributes the streams amoungst the consumer group members.
func FromPosition ¶
func FromPosition(position int64) GetCategoryOption
FromPosition specifies the inclusive global position from which to read messages.
func WithCategoryBatchSize ¶
func WithCategoryBatchSize(batchSize int64) GetCategoryOption
WithCategoryBatchSize specifies the batch size to read messages.
func WithCategoryCondition ¶
func WithCategoryCondition(condition string) GetCategoryOption
WithCategoryCondition specifies an SQL condition to apply to the read request. For example: "messages.time::time >= current_time"
func WithCorrelation ¶
func WithCorrelation(correlation string) GetCategoryOption
WithCorrelation sets the correlation value that messages will be filtered by. correlation is compared against each messages medatadata correlationStreamName field.
type GetStreamOption ¶
type GetStreamOption func(*streamConfig)
GetStreamOption is an option for modifiying how to read from a stream.
func FromVersion ¶
func FromVersion(version int64) GetStreamOption
FromVersion specifies the inclusive version from which to read messages.
func WithStreamBatchSize ¶
func WithStreamBatchSize(batchSize int64) GetStreamOption
WithStreamBatchSize specifies the batch size to read messages.
func WithStreamCondition ¶
func WithStreamCondition(condition string) GetStreamOption
WithStreamCondition specifies an SQL condition to apply to the read request. For example: "messages.time::time >= current_time"
type Message ¶
type Message struct { ID uuid.UUID Stream StreamIdentifier Type string Version int64 GlobalPosition int64 Timestamp time.Time // contains filtered or unexported fields }
Message represents a message that was stored in message-db.
func (*Message) UnmarshalData ¶
UnmarshalData attempts to unmarshall the Message's data into the provided object.
func (*Message) UnmarshalMetadata ¶
UnmarshalMetadata attempts to unmarshall the Message's metadata into the provided object.
type ProposedMessage ¶
ProposedMessage proposes a messages to be written to message-db.
type StreamIdentifier ¶
StreamIdentifier captures the two components of a message-db stream name.
func (StreamIdentifier) String ¶
func (si StreamIdentifier) String() string
String returns the string respresentation of a StreamIdentifier.