Documentation
¶
Overview ¶
Package gomdb provides a Client for calling Message DB procedures.
Index ¶
- Constants
- Variables
- type Client
- func (c *Client) GetCategoryMessages(ctx context.Context, category string, opts ...GetCategoryOption) ([]*Message, error)
- func (c *Client) GetLastStreamMessage(ctx context.Context, stream StreamIdentifier) (*Message, error)
- func (c *Client) GetStreamMessages(ctx context.Context, stream StreamIdentifier, opts ...GetStreamOption) ([]*Message, error)
- func (c *Client) GetStreamVersion(ctx context.Context, stream StreamIdentifier) (int64, error)
- func (c *Client) SubscribeToCategory(ctx context.Context, category string, handleMessage MessageHandler, ...) error
- func (c *Client) SubscribeToStream(ctx context.Context, stream StreamIdentifier, handleMessage MessageHandler, ...) error
- func (c *Client) WriteMessage(ctx context.Context, stream StreamIdentifier, message ProposedMessage, ...) (int64, error)
- type ClientOption
- type GetCategoryOption
- type GetStreamOption
- type LivenessHandler
- type Message
- type MessageHandler
- type PollingStrategy
- type ProposedMessage
- type StreamIdentifier
- type SubDroppedHandler
Constants ¶
const ( // NoStreamVersion is expected version for a stream that doesn't exist. NoStreamVersion = int64(-1) // AnyVersion allows writing of a message regardless of the stream version. AnyVersion = int64(-2) // DefaultPollingInterval defines the default polling duration for // subscriptions. DefaultPollingInterval = 100 * time.Millisecond )
const ( // CorrelationKey attribute allows a component to tag an outbound message // with its origin CorrelationKey = "correlationStreamName" // WriteMessageSQL with ( // id, // stream_name, // type, // data, // metadata, // expected_version // ) WriteMessageSQL = "SELECT write_message($1, $2, $3, $4, $5, $6)" // GetStreamMessagesSQL with ( // stream_name, // position, // batch_size, // condition // ) GetStreamMessagesSQL = "SELECT * FROM get_stream_messages($1, $2, $3, $4)" // GetCategoryMessagesSQL with ( // category_name, // position, // batch_size, // correlation, // consumer_group_member, // consumer_group_size, // condition // ) GetCategoryMessagesSQL = "SELECT * FROM get_category_messages($1, $2, $3, $4, $5, $6, $7)" // GetLastStreamMessageSQL with (stream_name) GetLastStreamMessageSQL = "SELECT * FROM get_last_stream_message($1)" // StreamVersionSQL with (stream_name) GetStreamVersionSQL = "SELECT * FROM stream_version($1)" )
const StreamNameSeparator = "-"
StreamNameSeparator is the character used to separate the stream category from the stream ID in a stream name.
Variables ¶
var ( // ErrInvalidReadStreamVersion is returned when the stream version inside a // read call is less than zero. ErrInvalidReadStreamVersion = errors.New("stream version cannot be less than 0") // ErrInvalidReadBatchSize is returned when the batch size inside a read // call is less than one. ErrInvalidReadBatchSize = errors.New("batch size must be greater than 0") // ErrInvalidReadPosition is returned when the stream position inside a // read call is less than zero. ErrInvalidReadPosition = errors.New("stream position cannot be less than 0") // ErrInvalidConsumerGroupMember is returned when the consumer group ID // index is either less than zero or greater than or equal to the consumer // group size. ErrInvalidConsumerGroupMember = errors.New("consumer group member must be >= 0 < group size") // ErrInvalidConsumerGroupSize is returned when the consumer group size is // less that zero. ErrInvalidConsumerGroupSize = errors.New("consumer group size must be 0 or greater (0 to disbale consumer groups)") )
var ( // ErrInvalidMessageID is returned when the proposed message ID is not a // valid UUID. ErrInvalidMessageID = errors.New("proposed message ID must be a valid UUID") // ErrMissingType is returned when the proposed message is missing the // message type. ErrMissingType = errors.New("proposed message must include Type") // ErrMissingData is returned when the proposed message is missing any // data. ErrMissingData = errors.New("proposed message must include Data") // ErrMissingCategory is returned when the stream identifier category is // missing. ErrMissingCategory = errors.New("category cannot be blank") // ErrInvalidCategory is returned when the stream identifier category // contains the reserved stream name seperator character. ErrInvalidCategory = fmt.Errorf("category cannot contain separator (%s)", StreamNameSeparator) // ErrMissingStreamID is returned when the stream identifier ID is missing. ErrMissingStreamID = errors.New("ID cannot be blank") // ErrInvalidStreamID is returned whenthe stream identifier ID contains the // reserved stream name seperator character. ErrInvalidStreamID = fmt.Errorf("ID cannot contain separator (%s)", StreamNameSeparator) )
var ErrUnexpectedStreamVersion = errors.New("unexpected stream version when writing message")
ErrUnexpectedStreamVersion is returned when a stream is not at the expected version when writing a message.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client exposes the message-db interface.
func NewClient ¶
func NewClient(db *sql.DB, opts ...ClientOption) *Client
NewClient returns a new message-db client for the provided database.
func (*Client) GetCategoryMessages ¶
func (c *Client) GetCategoryMessages(ctx context.Context, category string, opts ...GetCategoryOption) ([]*Message, error)
GetCategoryMessages reads messages from a category. By default the category is read from the beginning of the message store with a batch size of 1000. Use GetCategoryOptions to adjust this behaviour and to configure consumer groups and filtering.
func (*Client) GetLastStreamMessage ¶
func (c *Client) GetLastStreamMessage(ctx context.Context, stream StreamIdentifier) (*Message, error)
GetLastStreamMessage returns the last message for the specified stream, or nil if the stream is empty.
func (*Client) GetStreamMessages ¶
func (c *Client) GetStreamMessages(ctx context.Context, stream StreamIdentifier, opts ...GetStreamOption) ([]*Message, error)
GetStreamMessages reads messages from an individual stream. By default the stream is read from the beginning with a batch size of 1000. Use GetStreamOptions to adjust this behaviour.
func (*Client) GetStreamVersion ¶
GetStreamVersion returns the version of the specified stream. Always check the error value before using the returned version.
func (*Client) SubscribeToCategory ¶ added in v0.2.0
func (c *Client) SubscribeToCategory( ctx context.Context, category string, handleMessage MessageHandler, handleLiveness LivenessHandler, handleDropped SubDroppedHandler, opts ...GetCategoryOption, ) error
SubscribeToCategory subscribes to a category and asynchronously passes messages to the message handler in batches. Once a subscription has caught up it will poll the database periodically for new messages. To stop a subscription cancel the provided context. When a subscription catches up it will call the LivenessHandler with true. If the subscription falls behind again it will called the LivenessHandler with false. If there is an error while reading messages then the subscription will be stopped and the SubDroppedHandler will be called with the stopping error. If the subscription is cancelled then the SubDroppedHandler will be called with nil.
func (*Client) SubscribeToStream ¶ added in v0.2.0
func (c *Client) SubscribeToStream( ctx context.Context, stream StreamIdentifier, handleMessage MessageHandler, handleLiveness LivenessHandler, handleDropped SubDroppedHandler, opts ...GetStreamOption, ) error
SubscribeToStream subscribes to a stream and asynchronously passes messages to the message handler in batches. Once a subscription has caught up it will poll the database periodically for new messages. To stop a subscription cancel the provided context. When a subscription catches up it will call the LivenessHandler with true. If the subscription falls behind again it will called the LivenessHandler with false. If there is an error while reading messages then the subscription will be stopped and the SubDroppedHandler will be called with the stopping error. If the subscription is cancelled then the SubDroppedHandler will be called with nil.
func (*Client) WriteMessage ¶
func (c *Client) WriteMessage(ctx context.Context, stream StreamIdentifier, message ProposedMessage, expectedVersion int64) (int64, error)
WriteMessage attempted to write the proposed message to the specifed stream.
type ClientOption ¶ added in v0.2.0
type ClientOption func(*Client)
ClientOption is an option for modifiying how the Message DB client operates.
func WithSubPollingStrategy ¶ added in v0.2.0
func WithSubPollingStrategy(strat PollingStrategy) ClientOption
WithSubPollingStrategy configures the client with the specified PollingStrategy.
type GetCategoryOption ¶
type GetCategoryOption func(*categoryConfig)
GetCategoryOption is an option for modifiying how to read from a category.
func AsConsumerGroup ¶
func AsConsumerGroup(member, size int64) GetCategoryOption
AsConsumerGroup specifies the consumer group options for this read. Size is used to specify the number of consumers, and member specifies which consumer is currently reading. Message-db used consistent hashing on stream names within a category and then distributes the streams amoungst the consumer group members.
func FromPosition ¶
func FromPosition(position int64) GetCategoryOption
FromPosition specifies the inclusive global position from which to read messages.
func WithCategoryBatchSize ¶
func WithCategoryBatchSize(batchSize int64) GetCategoryOption
WithCategoryBatchSize specifies the batch size to read messages.
func WithCategoryCondition ¶
func WithCategoryCondition(condition string) GetCategoryOption
WithCategoryCondition specifies an SQL condition to apply to the read request. For example: "messages.time::time >= current_time"
func WithCorrelation ¶
func WithCorrelation(correlation string) GetCategoryOption
WithCorrelation sets the correlation value that messages will be filtered by. correlation is compared against each messages medatadata correlationStreamName field.
type GetStreamOption ¶
type GetStreamOption func(*streamConfig)
GetStreamOption is an option for modifiying how to read from a stream.
func FromVersion ¶
func FromVersion(version int64) GetStreamOption
FromVersion specifies the inclusive version from which to read messages.
func WithStreamBatchSize ¶
func WithStreamBatchSize(batchSize int64) GetStreamOption
WithStreamBatchSize specifies the batch size to read messages.
func WithStreamCondition ¶
func WithStreamCondition(condition string) GetStreamOption
WithStreamCondition specifies an SQL condition to apply to the read request. For example: "messages.time::time >= current_time"
type LivenessHandler ¶ added in v0.2.0
type LivenessHandler func(bool)
LivenessHandler handles whether the subscription is in a "live" state or whether it is catching up.
type Message ¶
type Message struct { ID uuid.UUID Stream StreamIdentifier Type string Version int64 GlobalPosition int64 Timestamp time.Time // contains filtered or unexported fields }
Message represents a message that was stored in message-db.
func (*Message) UnmarshalData ¶
UnmarshalData attempts to unmarshall the Message's data into the provided object.
func (*Message) UnmarshalMetadata ¶
UnmarshalMetadata attempts to unmarshall the Message's metadata into the provided object.
type MessageHandler ¶ added in v0.2.0
type MessageHandler func(*Message)
MessageHandler handles messages as they appear after being written.
type PollingStrategy ¶ added in v0.2.0
PollingStrategy returns the delay duration before the next polling attempt based on how many messages were returned from the previous poll vs how many were expected.
func ConstantPolling ¶ added in v0.2.0
func ConstantPolling(interval time.Duration) PollingStrategy
ConstantPolling returns a constant interval polling strategy
func ExpBackoffPolling ¶ added in v0.2.0
func ExpBackoffPolling(min, max time.Duration, multiplier float64) PollingStrategy
ExpBackoffPolling returns an exponential polling backoff strategy that starts at the min duration but is multipled for every read that did not return any messages up to the max duration. The backoff duration is reset to min everytime a message is read.
type ProposedMessage ¶
ProposedMessage proposes a messages to be written to message-db.
type StreamIdentifier ¶
StreamIdentifier captures the two components of a message-db stream name.
func (StreamIdentifier) String ¶
func (si StreamIdentifier) String() string
String returns the string respresentation of a StreamIdentifier.
type SubDroppedHandler ¶ added in v0.2.0
type SubDroppedHandler func(error)
SubDroppedHandler handles errors that appear and stop the subscription.