Documentation ¶
Index ¶
- type Dispatcher
- type DispatcherSettings
- type Message
- type MessageBroker
- type MessageHeader
- type MockBroker
- type MockStore
- func (m *MockStore) AddRecordTx(record Record, tx *sql.Tx) error
- func (m *MockStore) ClearLocksByLockID(lockID string) error
- func (m *MockStore) ClearLocksWithDurationBeforeDate(time time.Time) error
- func (m *MockStore) GetRecordsByLockID(lockID string) ([]Record, error)
- func (m *MockStore) RemoveRecordsBeforeDatetime(expiryTime time.Time) error
- func (m *MockStore) UpdateRecordByID(message Record) error
- func (m *MockStore) UpdateRecordLockByState(lockID string, lockedOn time.Time, state RecordState) error
- type Publisher
- type Record
- type RecordState
- type RetrialPolicy
- type Store
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher initializes and runs the outbox dispatcher
func NewDispatcher ¶
func NewDispatcher(store Store, broker MessageBroker, settings DispatcherSettings, machineID string) *Dispatcher
NewDispatcher constructor
func (Dispatcher) Run ¶
func (d Dispatcher) Run(errChan chan<- error, doneChan <-chan struct{})
Run periodically checks for new outbox messages from the Store, sends the messages through the MessageBroker and updates the message status accordingly
type DispatcherSettings ¶
type DispatcherSettings struct { ProcessInterval time.Duration LockCheckerInterval time.Duration MaxLockTimeDuration time.Duration CleanupWorkerInterval time.Duration RetrialPolicy RetrialPolicy MessagesRetentionDuration time.Duration }
DispatcherSettings defines the set of configurations for the dispatcher
type Message ¶
type Message struct { Key string Headers []MessageHeader Body []byte Topic string }
Message encapsulates the contents of the message to be sent
type MessageBroker ¶
MessageBroker provides an interface for message brokers to send Message objects
type MessageHeader ¶
MessageHeader is the MessageHeader of the Message to be sent. It is used by Brokers
type MockBroker ¶
MockBroker mocks the Broker interface
type MockStore ¶
MockStore mocks the Store
func (*MockStore) AddRecordTx ¶
AddRecordTx method mock
func (*MockStore) ClearLocksByLockID ¶
ClearLocksByLockID method mock
func (*MockStore) ClearLocksWithDurationBeforeDate ¶
ClearLocksWithDurationBeforeDate method mock
func (*MockStore) GetRecordsByLockID ¶
GetRecordsByLockID method mock
func (*MockStore) RemoveRecordsBeforeDatetime ¶
RemoveRecordsBeforeDatetime method mock
func (*MockStore) UpdateRecordByID ¶
UpdateRecordByID method mock
func (*MockStore) UpdateRecordLockByState ¶
func (m *MockStore) UpdateRecordLockByState(lockID string, lockedOn time.Time, state RecordState) error
UpdateRecordLockByState method mock
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher encapsulates the save functionality of the outbox pattern
func NewPublisher ¶
NewPublisher is the Publisher constructor
type Record ¶
type Record struct { ID uuid.UUID Message Message State RecordState CreatedOn time.Time LockID *string LockedOn *time.Time ProcessedOn *time.Time NumberOfAttempts int LastAttemptOn *time.Time Error *string }
Record represents the record that is stored and retrieved from the database
type RecordState ¶
type RecordState int
RecordState is the State of the Record
const ( //PendingDelivery is the initial state of all records PendingDelivery RecordState = iota //Delivered indicates that the Records is already Delivered Delivered //MaxAttemptsReached indicates that the message is not Delivered but the max attempts are reached so it shouldn't be delivered MaxAttemptsReached )
type RetrialPolicy ¶
RetrialPolicy contains the retrial settings
type Store ¶
type Store interface { //AddRecordTx stores the message within the provided database transaction AddRecordTx(record Record, tx *sql.Tx) error //GetRecordsByLockID returns the records by lockID GetRecordsByLockID(lockID string) ([]Record, error) //UpdateRecordLockByState updates the lock of all records with the provided state UpdateRecordLockByState(lockID string, lockedOn time.Time, state RecordState) error //UpdateRecordByID updates the provided the record UpdateRecordByID(message Record) error //ClearLocksWithDurationBeforeDate clears the locks of records with a lock time before the provided time ClearLocksWithDurationBeforeDate(time time.Time) error //ClearLocksByLockID clears all records locked by the provided lockID ClearLocksByLockID(lockID string) error //RemoveRecordsBeforeDatetime removes all records before the provided time RemoveRecordsBeforeDatetime(expiryTime time.Time) error }
Store is the interface that should be implemented by SQL-like database drivers to support the outbox functionality