Documentation ¶
Overview ¶
Package repos exposes the types and interfaces used to deal with persistent data.
In this app, it provides access to "Messages" via the MessageRepo interface.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var (
	ErrAlreadyProcessed = errors.New("message already processed")
)
Errors returned by this repo
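Callers would normally detect this sentinel with errors.Is rather than by comparing strings. The following is a minimal sketch, not the package's own code: the update and isAlreadyProcessed helpers are hypothetical, and the source does not say which repository methods return ErrAlreadyProcessed.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrAlreadyProcessed mirrors the sentinel exported by package repos.
var ErrAlreadyProcessed = errors.New("message already processed")

// update is a hypothetical stand-in for a repository call that refuses
// to modify a message whose processing has already completed.
func update(alreadyDone bool) error {
	if alreadyDone {
		// Wrapping with %w keeps the sentinel discoverable by errors.Is.
		return fmt.Errorf("update message: %w", ErrAlreadyProcessed)
	}
	return nil
}

// isAlreadyProcessed reports whether err is, or wraps, ErrAlreadyProcessed.
func isAlreadyProcessed(err error) bool {
	return errors.Is(err, ErrAlreadyProcessed)
}

func main() {
	if err := update(true); isAlreadyProcessed(err) {
		fmt.Println("skipping:", err)
	}
}
```

Matching with errors.Is keeps working even if an implementation adds context to the error before returning it.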
Functions ¶
This section is empty.
Types ¶
type Decimal ¶
type Decimal struct {
	*apd.NullDecimal
}
Decimal is an exact decimal type that knows how to marshal/unmarshal from JSON and DB.
It supports arbitrary decimal precision. Implemented on top of cockroachdb's Decimal, with additions to support gob encode/decode.
func NewDecimal ¶
func (Decimal) MarshalJSON ¶
func (*Decimal) UnmarshalJSON ¶
func (*Decimal) UnmarshalText ¶
type Iface ¶
type Iface interface {
	Messages() MessageRepo
}
Iface serves interfaces to persistent repositories.
type InputPayload ¶
type InputPayload struct {
	CorrespondantBank string        `json:"correspondant_bank"`
	OperationType     OperationType `json:"operation_type"`
	CreditorAccount   string        `json:"creditor_account"`
	DebtorAccount     string        `json:"debtor_account"`
	Amount            Decimal       `json:"amount"`
	Currency          string        `json:"currency"`
	Comment           *string       `json:"comment,omitempty"`
}
InputPayload represents the accepted user input for a message.
func (InputPayload) AsMessage ¶
func (p InputPayload) AsMessage() Message
type Message ¶
type Message struct {
	ID               string           `json:"id" db:"id"`
	ProducerID       string           `json:"producer_id" db:"producer_id"`
	ConsumerID       string           `json:"consumer_id" db:"consumer_id"`
	MessageStatus    MessageStatus    `json:"message_status" db:"message_status"`
	ProcessingStatus ProcessingStatus `json:"processing_status" db:"processing_status"`
	InceptionTime    time.Time        `json:"inception_time" db:"inception_time"`
	LastTime         time.Time        `json:"last_time" db:"last_time"`
	ProducerReplays  uint             `json:"producer_replays" db:"producer_replays"`
	ConsumerReplays  uint             `json:"consumer_replays" db:"consumer_replays"`
	Payload          `json:"payload"`
}
Message contains a Payload for processing, as well as various headers used to ensure lossless transmission.
type MessageIterator ¶
type MessageIterator = iterators.StructIterator[Message]
type MessagePredicate ¶
type MessagePredicate struct {
	UpdatedSince         *time.Time
	NotUpdatedSince      *time.Time
	WithMessageStatus    *MessageStatus
	WithProcessingStatus *ProcessingStatus
	MaxMessageStatus     *MessageStatus
	MaxProcessingStatus  *ProcessingStatus
	FromProducer         *string
	FromConsumer         *string
	Limit                uint64
	Unconfirmed          bool
	// contains filtered or unexported fields
}
MessagePredicate is used to specify filters when querying Messages
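The pointer-typed fields suggest that a nil field means "no constraint". The sketch below illustrates that reading with an in-memory matcher. It is an assumption-laden illustration, not the package's query logic: the message and predicate types are trimmed-down copies, the ptr helper is hypothetical, and whether the real bounds are inclusive is a guess.

```go
package main

import (
	"fmt"
	"time"
)

// MessageStatus is redeclared so the sketch compiles on its own.
type MessageStatus uint8

const (
	MessageStatusNacked MessageStatus = iota
	MessageStatusPosted
	MessageStatusReceived
	MessageStatusConfirmed
)

// message and predicate are hypothetical, reduced copies of
// repos.Message and repos.MessagePredicate.
type message struct {
	ID            string
	ProducerID    string
	MessageStatus MessageStatus
	LastTime      time.Time
}

type predicate struct {
	UpdatedSince      *time.Time
	WithMessageStatus *MessageStatus
	MaxMessageStatus  *MessageStatus
	FromProducer      *string
}

// matches treats every nil field as "no constraint" (an assumption).
func (p predicate) matches(m message) bool {
	if p.UpdatedSince != nil && m.LastTime.Before(*p.UpdatedSince) {
		return false
	}
	if p.WithMessageStatus != nil && m.MessageStatus != *p.WithMessageStatus {
		return false
	}
	// Assumed inclusive upper bound on the ordered status.
	if p.MaxMessageStatus != nil && m.MessageStatus > *p.MaxMessageStatus {
		return false
	}
	if p.FromProducer != nil && m.ProducerID != *p.FromProducer {
		return false
	}
	return true
}

// ptr returns a pointer to v, which is handy for optional fields.
func ptr[T any](v T) *T { return &v }

func main() {
	now := time.Now()
	msgs := []message{
		{ID: "a", ProducerID: "p1", MessageStatus: MessageStatusPosted, LastTime: now},
		{ID: "b", ProducerID: "p1", MessageStatus: MessageStatusConfirmed, LastTime: now},
		{ID: "c", ProducerID: "p2", MessageStatus: MessageStatusPosted, LastTime: now},
	}
	p := predicate{FromProducer: ptr("p1"), MaxMessageStatus: ptr(MessageStatusReceived)}
	for _, m := range msgs {
		if p.matches(m) {
			fmt.Println(m.ID) // only "a" satisfies both constraints
		}
	}
}
```

A SQL-backed implementation would more likely translate each non-nil field into a WHERE clause, but the nil-means-unset convention would stay the same.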
type MessageRepo ¶
type MessageRepo interface {
	// Create a new Message in the DB
	Create(context.Context, Message) error
	// Update a Message in the DB.
	Update(context.Context, Message, ...UpdateOption) error
	// Get retrieves a Message by its unique ID
	Get(context.Context, string) (Message, error)
	// List Messages using a MessagePredicate filter. The result is an iterator to the
	// fetched rows.
	List(context.Context, MessagePredicate) (MessageIterator, error)
	// UpdateConfirmed is an entry point for consumers to store their own view of the message status
	UpdateConfirmed(context.Context, string, MessageStatus) error
	// UpdateReplay is an entry point for producers to keep track of how many times messages have been replayed
	UpdateReplay(context.Context, Message, ...UpdateOption) error
}
MessageRepo exposes the persistent repository for messages.
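Because callers depend on an interface, the postgres implementation can be swapped for a fake in tests. The following sketch shows the idea with an in-memory store. Everything in it is illustrative: messageStore is a hypothetical two-method subset of MessageRepo, Message is reduced to two fields, and errNotFound and errDuplicate are invented for the example (the package documents only ErrAlreadyProcessed).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Message is a trimmed-down copy of repos.Message, kept small for the sketch.
type Message struct {
	ID         string
	ProducerID string
}

// messageStore is a hypothetical subset of repos.MessageRepo.
type messageStore interface {
	Create(context.Context, Message) error
	Get(context.Context, string) (Message, error)
}

// Illustrative errors, not part of package repos.
var (
	errNotFound  = errors.New("message not found")
	errDuplicate = errors.New("message already exists")
)

// memStore keeps messages in a map guarded by a mutex.
type memStore struct {
	mu   sync.Mutex
	rows map[string]Message
}

func newMemStore() *memStore {
	return &memStore{rows: make(map[string]Message)}
}

// Create stores m and rejects a second message with the same ID.
func (s *memStore) Create(_ context.Context, m Message) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.rows[m.ID]; ok {
		return fmt.Errorf("create %q: %w", m.ID, errDuplicate)
	}
	s.rows[m.ID] = m
	return nil
}

// Get returns the message stored under id, or errNotFound.
func (s *memStore) Get(_ context.Context, id string) (Message, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	m, ok := s.rows[id]
	if !ok {
		return Message{}, fmt.Errorf("get %q: %w", id, errNotFound)
	}
	return m, nil
}

// roundTrip works against any messageStore: it creates m, then reads it back.
func roundTrip(store messageStore, m Message) (Message, error) {
	ctx := context.Background()
	if err := store.Create(ctx, m); err != nil {
		return Message{}, err
	}
	return store.Get(ctx, m.ID)
}

func main() {
	got, err := roundTrip(newMemStore(), Message{ID: "m1", ProducerID: "p1"})
	fmt.Println(got.ID, err)
}
```

Code written against the narrow interface, like roundTrip here, does not need to know whether rows live in a map or in a database.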
type MessageStatus ¶
type MessageStatus uint8
MessageStatus represents the current status of a message in the end-to-end protocol.
const (
	MessageStatusNacked    MessageStatus = iota // initial posting by producer
	MessageStatusPosted                         // message ACK-ed by consumer
	MessageStatusReceived                       // response ACK-ed by producer
	MessageStatusConfirmed                      // confirmation ACK-ed by consumer
)
Message acknowledgement statuses.
NOTE: statuses are ordered
func NewMessageStatus ¶
func NewMessageStatus(s MessageStatus) *MessageStatus
func (MessageStatus) IsValid ¶
func (s MessageStatus) IsValid() bool
func (MessageStatus) Less ¶
func (s MessageStatus) Less(m MessageStatus) bool
func (MessageStatus) MarshalText ¶
func (s MessageStatus) MarshalText() ([]byte, error)
func (MessageStatus) String ¶
func (s MessageStatus) String() string
func (*MessageStatus) UnmarshalText ¶
func (s *MessageStatus) UnmarshalText(data []byte) error
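Since the statuses are ordered, Less can be a plain numeric comparison, and the text methods let the status travel as a string in JSON or a DB column. The sketch below reimplements the method set under stated assumptions: the text names in statusNames are hypothetical, and the real method bodies may differ.

```go
package main

import "fmt"

type MessageStatus uint8

const (
	MessageStatusNacked MessageStatus = iota
	MessageStatusPosted
	MessageStatusReceived
	MessageStatusConfirmed
)

// statusNames holds hypothetical text forms, indexed by status value.
// The real package may use different strings.
var statusNames = [...]string{"nacked", "posted", "received", "confirmed"}

// IsValid reports whether s is one of the declared constants.
func (s MessageStatus) IsValid() bool { return s <= MessageStatusConfirmed }

// Less relies on the declaration order of the constants.
func (s MessageStatus) Less(m MessageStatus) bool { return s < m }

func (s MessageStatus) String() string {
	if !s.IsValid() {
		return fmt.Sprintf("MessageStatus(%d)", uint8(s))
	}
	return statusNames[s]
}

// MarshalText implements encoding.TextMarshaler.
func (s MessageStatus) MarshalText() ([]byte, error) {
	if !s.IsValid() {
		return nil, fmt.Errorf("invalid message status: %d", uint8(s))
	}
	return []byte(statusNames[s]), nil
}

// UnmarshalText implements encoding.TextUnmarshaler.
func (s *MessageStatus) UnmarshalText(data []byte) error {
	for i, name := range statusNames {
		if name == string(data) {
			*s = MessageStatus(i)
			return nil
		}
	}
	return fmt.Errorf("unknown message status: %q", data)
}

func main() {
	fmt.Println(MessageStatusPosted.Less(MessageStatusReceived))

	var s MessageStatus
	if err := s.UnmarshalText([]byte("confirmed")); err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```

The ordering is what makes filters such as MaxMessageStatus meaningful: a status can be compared against a ceiling instead of being matched exactly.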
type OperationType ¶
type OperationType uint8
const (
	OperationTypeDebit OperationType = iota + 1
	OperationTypeCredit
	OperationTypeBalance
	OperationTypeCancel
)
Operation types
func (OperationType) IsValid ¶
func (s OperationType) IsValid() bool
func (OperationType) MarshalText ¶
func (s OperationType) MarshalText() ([]byte, error)
func (OperationType) String ¶
func (s OperationType) String() string
func (*OperationType) UnmarshalText ¶
func (s *OperationType) UnmarshalText(data []byte) error
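Starting the constants at iota + 1 leaves the zero value unassigned, so a payload that omits the operation type can be told apart from a valid one. The sketch below shows one plausible IsValid; the range check is an assumption about the real method body.

```go
package main

import "fmt"

type OperationType uint8

const (
	OperationTypeDebit OperationType = iota + 1
	OperationTypeCredit
	OperationTypeBalance
	OperationTypeCancel
)

// IsValid accepts only the declared constants (assumed behavior).
// The zero value falls outside the range and is rejected.
func (s OperationType) IsValid() bool {
	return s >= OperationTypeDebit && s <= OperationTypeCancel
}

func main() {
	var unset OperationType // zero value: no operation chosen
	fmt.Println(unset.IsValid())
	fmt.Println(OperationTypeCredit.IsValid())
}
```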
type Payload ¶
type Payload struct {
	OperationType   OperationType `json:"operation_type" db:"operation_type"`
	CreditorAccount string        `json:"creditor_account" db:"creditor_account"`
	DebtorAccount   string        `json:"debtor_account" db:"debtor_account"`
	Amount          Decimal       `json:"amount" db:"amount"`
	Currency        string        `json:"currency" db:"currency"`
	BalanceBefore   *Decimal      `json:"balance_before,omitempty" db:"balance_before"`
	BalanceAfter    *Decimal      `json:"balance_after,omitempty" db:"balance_after"`
	Comment         *string       `json:"comment,omitempty" db:"comment"`
	RejectionCause  *string       `json:"rejection_cause,omitempty" db:"rejection_cause"`
}
Payload represents the functional payload of a message.
In this example, the message payload is a typical bank transfer.
type ProcessingStatus ¶
type ProcessingStatus uint8
ProcessingStatus represents the outcome of the operation requested in a message.
const (
	ProcessingStatusPending  ProcessingStatus = iota // message being processed
	ProcessingStatusRejected                         // processing outcome decided: rejected
	ProcessingStatusOK                               // processing outcome decided: OK
)
Message result status.
NOTE: statuses are ordered
func NewProcessingStatus ¶
func NewProcessingStatus(s ProcessingStatus) *ProcessingStatus
func (ProcessingStatus) IsValid ¶
func (s ProcessingStatus) IsValid() bool
func (ProcessingStatus) Less ¶
func (s ProcessingStatus) Less(m ProcessingStatus) bool
func (ProcessingStatus) MarshalText ¶
func (s ProcessingStatus) MarshalText() ([]byte, error)
func (ProcessingStatus) String ¶
func (s ProcessingStatus) String() string
func (*ProcessingStatus) UnmarshalText ¶
func (s *ProcessingStatus) UnmarshalText(data []byte) error
type UpdateOption ¶
type UpdateOption func(*UpdateOptions)
UpdateOption provides a bit of flexibility to Updates.
func WithForceUpdate ¶
func WithForceUpdate(enabled bool) UpdateOption
type UpdateOptions ¶
type UpdateOptions struct {
	Force bool
}
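UpdateOption and UpdateOptions follow the functional options pattern: each option is a function that mutates an options struct, and the callee folds the variadic list into a final configuration. The sketch below shows how such options are typically applied. The applyOptions helper is hypothetical, and the body of WithForceUpdate is inferred from its signature rather than taken from the source.

```go
package main

import "fmt"

// UpdateOptions and UpdateOption mirror the declarations in package repos.
type UpdateOptions struct {
	Force bool
}

type UpdateOption func(*UpdateOptions)

// WithForceUpdate sets the Force flag (assumed implementation).
func WithForceUpdate(enabled bool) UpdateOption {
	return func(o *UpdateOptions) {
		o.Force = enabled
	}
}

// applyOptions is a hypothetical helper: it starts from the zero-value
// defaults and applies each option in order, so later options win.
func applyOptions(opts ...UpdateOption) UpdateOptions {
	var o UpdateOptions
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	fmt.Println(applyOptions().Force)                      // default
	fmt.Println(applyOptions(WithForceUpdate(true)).Force) // forced
}
```

The pattern lets Update and UpdateReplay grow new knobs later without changing their signatures.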
Directories ¶
| Path | Synopsis |
|---|---|
| | Package messages provides a postgres implementation of the repos.MessageRepo interface. |
| | Package pgrepo provides a postgres implementation of the repos.Iface interface. |