Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( DefaultProcessInterval = 10 * time.Second DefaultClaimDuration = 2 * time.Second DefaultBatchSize = 20 )
Functions ¶
func NamespaceFromContext ¶
NamespaceFromContext identifies what namespace to record published messages to in the outbox
Types ¶
type ClaimedEntry ¶
type ClaimedEntry struct { // Namespace is an identifier used to group outbox entries, e.g. for choosing what topics to route entries to Namespace string // ID is a unique identifier for any given Outbox ClaimedEntry, typically a database primary key ID string // Key to be included in the published Message Key []byte // Payload to be included in the published Message Payload []byte }
ClaimedEntry is an entry in the Outbox
type Config ¶
type Config struct { // Clock abstracts interactions with the time package, defaults to a real clock implementation Clock Clock // Storage allows the processing task to claim, retrieve and delete ClaimedEntry objects Storage ProcessorStorage // Publisher is used to publish Message objects, made from ClaimedEntry objects, pulled from ProcessorStorage Publisher Publisher // ProcessInterval specifies how long the processor should spend idle without checking for work, this // is reset if Outbox.WakeProcessor is called ProcessInterval time.Duration // ClaimDuration specifies how long the processor will claim ClaimedEntry objects in ProcessorStorage ClaimDuration time.Duration // ProcessorID is a unique identifier for any instance of the outbox, so a horizontally scaled app // can run many Outbox instances, each claiming ClaimedEntry objects and publishing them ProcessorID string // BatchSize indicates how many ClaimedEntry objects to attempt to retrieve & publish in one go BatchSize int // Logger can be provided to receive logging output Logger logr.Logger }
Config configures the behaviour of the Outbox
func (*Config) DefaultAndValidate ¶
DefaultAndValidate ensures the configuration is valid and, where possible, provides reasonable default values where no value is provided
type ContextSettings ¶
type ContextSettings struct {
Namespace string
}
ContextSettings are settings that can configure outbox behaviour through context
func (ContextSettings) Clone ¶
func (c ContextSettings) Clone() *ContextSettings
Clone clones context settings
type Message ¶
type Message struct { // Key is an optional value primarily used in streaming systems that partition // published messages by keys to facilitate in-order delivery and load balancing Key []byte // Payload is the actual message contents that should be published Payload []byte }
Message is what will be published over some pubsub/streaming system
type Outbox ¶
type Outbox struct {
// contains filtered or unexported fields
}
Outbox is the primary object in the package that implements the transactional outbox pattern.
func (*Outbox) Publish ¶
Publish publishes the provided messages to the outbox, and will be forwarded to the configured Publisher during one of the subsequent PumpOutbox calls
func (*Outbox) PumpOutbox ¶
PumpOutbox causes the Outbox to process entries immediately. This is typically not called directly, instead called from StartProcessing. However, this is exposed partially for ease of testing, but also to facilitate customising the processing logic if the provided StartProcessing function isn't suitable for your application.
func (*Outbox) StartProcessing ¶
StartProcessing blocks, processing the outbox until its context is cancelled. It wakes up to process regularly based on the Config.ProcessInterval and can be woken manually using WakeProcessor.
func (*Outbox) WakeProcessor ¶
func (o *Outbox) WakeProcessor()
WakeProcessor is used to notify the outbox processor that new data has been written to the outbox and it should wake up and process them, rather than wait for the Config.ProcessInterval. For batch write operations, try to only call this once so the processor is likely to wake up fewer times and process them as a batch. This function does not block.
type ProcessorStorage ¶
type ProcessorStorage interface { // ClaimEntries attempts to update all claimable entries as belonging to the calling processor ClaimEntries(ctx context.Context, processorID string, claimDeadline time.Time) error // GetClaimedEntries returns a batch of entries currently belonging to the calling processor GetClaimedEntries(ctx context.Context, processorID string, batchSize int) ([]ClaimedEntry, error) // DeleteEntries deletes the entries as specified by their ClaimedEntry.ID DeleteEntries(ctx context.Context, entryIDs ...string) error // Publish creates new outbox entries containing the provided messages, to be published as soon as possible // Note: implementations should consult the context for additional ContextSettings, e.g. namespace Publish(ctx context.Context, txn interface{}, messages ...Message) error }
ProcessorStorage is the Outbox's interaction with persistence, typically a database
type PublishError ¶
type PublishError struct { // Errors correlates one-to-one with the Message values passed to Publisher.Publish - if a message // was sent successfully it will have a nil entry, otherwise it will be an error value Errors []error }
PublishError allows callers to understand which Message objects, if any, were sent successfully
func (*PublishError) Error ¶
func (p *PublishError) Error() string
Error provides a brief string summary to implement the Error interface
func (*PublishError) ErrorCount ¶
func (p *PublishError) ErrorCount() (count int)
ErrorCount counts how many messages failed to publish
type Publisher ¶
type Publisher interface { // Publish attempts to write the given messages to a destination. It may return a PublishError // to indicate which messages were published successfully. // Note: implementations should consult the context for additional ContextSettings, e.g. namespace Publish(ctx context.Context, messages ...Message) error }
Publisher is something that can take a batch of Message objects and attempt to publish them. Note that this interface is useful both as:
- The destination that the Outbox will write Message objects to, e.g. some external pubsub/stream
- A promise from your application's persistence layer which - as part of some ongoing transaction - will write the given Message objects to the underlying ProcessorStorage for later publishing