repos

package
v0.0.0-...-132c30d
Warning

This package is not in the latest version of its module.
Published: Nov 24, 2023 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package repos exposes all types and interfaces to deal with persistent things.

In this app, it provides access to "Messages" via the MessageRepo interface.

Constants

This section is empty.

Variables

var (
	ErrAlreadyProcessed = errors.New("message already processed")
)

Errors returned by this repo
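
A sentinel error like this is usually detected with errors.Is, which also matches when the error has been wrapped. The sketch below uses a local stand-in for ErrAlreadyProcessed and a hypothetical update function; which repository methods actually return this error is not stated on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrAlreadyProcessed is a local stand-in for repos.ErrAlreadyProcessed.
var ErrAlreadyProcessed = errors.New("message already processed")

// update is a hypothetical repository call that wraps the sentinel error
// when the message was already handled.
func update(id string, processed bool) error {
	if processed {
		return fmt.Errorf("update %q: %w", id, ErrAlreadyProcessed)
	}
	return nil
}

// isDuplicate reports whether err means the message was already processed,
// even when the sentinel is wrapped with extra context.
func isDuplicate(err error) bool {
	return errors.Is(err, ErrAlreadyProcessed)
}

func main() {
	if err := update("msg-1", true); isDuplicate(err) {
		fmt.Println("skipping duplicate:", err)
	}
}
```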

Functions

This section is empty.

Types

type Decimal

type Decimal struct {
	*apd.NullDecimal
}

Decimal is an exact decimal type that knows how to marshal/unmarshal from JSON and DB.

It supports arbitrary decimal precision. Implemented on top of cockroachdb's Decimal, with additions to support gob encode/decode.

func NewDecimal

func NewDecimal(coef int64, exponent int32) *Decimal
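
NewDecimal is undocumented here. Assuming it mirrors the underlying apd constructor, the value is coef × 10^exponent. The sketch below illustrates that representation with the standard library's math/big instead of the real type; decimalString is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"math/big"
)

// decimalString renders coef * 10^exponent exactly, without going through
// floating point. It is an illustration only (assumed semantics).
func decimalString(coef int64, exponent int32) string {
	value := new(big.Rat).SetInt64(coef)
	n := int64(exponent)
	if n < 0 {
		n = -n
	}
	// scale = 10^|exponent|
	scale := new(big.Rat).SetInt(new(big.Int).Exp(big.NewInt(10), big.NewInt(n), nil))
	if exponent >= 0 {
		return value.Mul(value, scale).FloatString(0)
	}
	// A negative exponent yields exactly |exponent| fractional digits.
	return value.Quo(value, scale).FloatString(int(n))
}

func main() {
	fmt.Println(decimalString(12345, -2)) // an amount with two decimals
	fmt.Println(decimalString(5, 3))      // a positive exponent scales up
}
```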

func (*Decimal) GobDecode

func (d *Decimal) GobDecode(data []byte) error

func (Decimal) GobEncode

func (d Decimal) GobEncode() ([]byte, error)

func (Decimal) MarshalJSON

func (d Decimal) MarshalJSON() ([]byte, error)

func (*Decimal) Scan

func (d *Decimal) Scan(value interface{}) error

func (*Decimal) UnmarshalJSON

func (d *Decimal) UnmarshalJSON(data []byte) error

func (*Decimal) UnmarshalText

func (d *Decimal) UnmarshalText(data []byte) error

type Iface

type Iface interface {
	Messages() MessageRepo
}

Iface serves interfaces to persistent repositories.

type InputPayload

type InputPayload struct {
	CorrespondantBank string        `json:"correspondant_bank"`
	OperationType     OperationType `json:"operation_type"`
	CreditorAccount   string        `json:"creditor_account"`
	DebtorAccount     string        `json:"debtor_account"`
	Amount            Decimal       `json:"amount"`
	Currency          string        `json:"currency"`
	Comment           *string       `json:"comment,omitempty"`
}

InputPayload represents the accepted user input for a message.

func (InputPayload) AsMessage

func (p InputPayload) AsMessage() Message

type Message

type Message struct {
	ID         string `json:"id" db:"id"`
	ProducerID string `json:"producer_id" db:"producer_id"`
	ConsumerID string `json:"consumer_id" db:"consumer_id"`

	MessageStatus    MessageStatus    `json:"message_status" db:"message_status"`
	ProcessingStatus ProcessingStatus `json:"processing_status" db:"processing_status"`

	InceptionTime time.Time `json:"inception_time" db:"inception_time"`
	LastTime      time.Time `json:"last_time" db:"last_time"`

	ProducerReplays uint `json:"producer_replays" db:"producer_replays"`
	ConsumerReplays uint `json:"consumer_replays" db:"consumer_replays"`

	Payload `json:"payload"`
}

Message contains a Payload for processing, as well as various headers used to ensure lossless transmission.
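
Because the embedded Payload carries its own JSON tag, encoding/json treats it as a named field and nests it under "payload" rather than flattening its fields. The sketch below shows this with trimmed-down stand-in types; the real structs have more fields.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Payload and Message are reduced stand-ins for the package types.
type Payload struct {
	Currency string `json:"currency"`
}

type Message struct {
	ID      string `json:"id"`
	Payload `json:"payload"` // tagged embedded struct: encoded as a nested object
}

// encode returns the JSON form of a message.
func encode(m Message) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

func main() {
	out, err := encode(Message{ID: "m1", Payload: Payload{Currency: "EUR"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // {"id":"m1","payload":{"currency":"EUR"}}
}
```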

func (Message) Bytes

func (p Message) Bytes() ([]byte, error)

func (Message) Validate

func (p Message) Validate() error

type MessageIterator

type MessageIterator = iterators.StructIterator[Message]

type MessagePredicate

type MessagePredicate struct {
	UpdatedSince         *time.Time
	NotUpdatedSince      *time.Time
	WithMessageStatus    *MessageStatus
	WithProcessingStatus *ProcessingStatus
	MaxMessageStatus     *MessageStatus
	MaxProcessingStatus  *ProcessingStatus
	FromProducer         *string
	FromConsumer         *string
	Limit                uint64
	Unconfirmed          bool
	// contains filtered or unexported fields
}

MessagePredicate is used to specify filters when querying Messages

type MessageRepo

type MessageRepo interface {
	// Create a new Message in the DB
	Create(context.Context, Message) error

	// Update a Message in the DB.
	Update(context.Context, Message, ...UpdateOption) error

	// Get retrieves a Message by its unique ID
	Get(context.Context, string) (Message, error)

	// List Messages using a MessagePredicate filter. The result is an iterator to the
	// fetched rows.
	List(context.Context, MessagePredicate) (MessageIterator, error)

	// UpdateConfirmed is an entry point for consumers to store their own view of the message status
	UpdateConfirmed(context.Context, string, MessageStatus) error

	// UpdateReplay is an entry point for producers to keep track of how many times messages have been replayed
	UpdateReplay(context.Context, Message, ...UpdateOption) error
}

MessageRepo exposes the persistent repository for messages.

type MessageStatus

type MessageStatus uint8

MessageStatus represents the current status of a message in the end-to-end protocol.

const (
	MessageStatusNacked    MessageStatus = iota // initial posting by producer
	MessageStatusPosted                         // message ACK-ed by consumer
	MessageStatusReceived                       // response ACK-ed by producer
	MessageStatusConfirmed                      // confirmation ACK-ed by consumer
)

Message acknowledgement statuses.

NOTE: statuses are ordered
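
Since the constants are declared with iota, their numeric order follows the protocol order, and a status can be compared to decide whether an update moves a message forward. The sketch below assumes Less is a plain numeric comparison; advance is a hypothetical helper showing how a late or replayed acknowledgement could be ignored.

```go
package main

import "fmt"

type MessageStatus uint8

const (
	MessageStatusNacked MessageStatus = iota
	MessageStatusPosted
	MessageStatusReceived
	MessageStatusConfirmed
)

// Less reports whether s comes strictly before m (assumed implementation).
func (s MessageStatus) Less(m MessageStatus) bool { return s < m }

// IsValid reports whether s is one of the declared statuses (assumed implementation).
func (s MessageStatus) IsValid() bool { return s <= MessageStatusConfirmed }

// advance (hypothetical) moves to next only when it is a valid step forward,
// so a replayed, older acknowledgement never regresses the status.
func advance(current, next MessageStatus) MessageStatus {
	if next.IsValid() && current.Less(next) {
		return next
	}
	return current
}

func main() {
	s := MessageStatusReceived
	fmt.Println(advance(s, MessageStatusPosted) == MessageStatusReceived)    // stale ACK ignored
	fmt.Println(advance(s, MessageStatusConfirmed) == MessageStatusConfirmed) // forward step applied
}
```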

func NewMessageStatus

func NewMessageStatus(s MessageStatus) *MessageStatus

func (MessageStatus) IsValid

func (s MessageStatus) IsValid() bool

func (MessageStatus) Less

func (s MessageStatus) Less(m MessageStatus) bool

func (MessageStatus) MarshalText

func (s MessageStatus) MarshalText() ([]byte, error)

func (MessageStatus) String

func (s MessageStatus) String() string

func (*MessageStatus) UnmarshalText

func (s *MessageStatus) UnmarshalText(data []byte) error

type OperationType

type OperationType uint8

const (
	OperationTypeDebit OperationType = iota + 1
	OperationTypeCredit
	OperationTypeBalance
	OperationTypeCancel
)

Operation types

func (OperationType) IsValid

func (s OperationType) IsValid() bool

func (OperationType) MarshalText

func (s OperationType) MarshalText() ([]byte, error)

func (OperationType) String

func (s OperationType) String() string

func (*OperationType) UnmarshalText

func (s *OperationType) UnmarshalText(data []byte) error

type Payload

type Payload struct {
	OperationType   OperationType `json:"operation_type" db:"operation_type"`
	CreditorAccount string        `json:"creditor_account" db:"creditor_account"`
	DebtorAccount   string        `json:"debtor_account" db:"debtor_account"`
	Amount          Decimal       `json:"amount" db:"amount"`
	Currency        string        `json:"currency" db:"currency"`
	BalanceBefore   *Decimal      `json:"balance_before,omitempty" db:"balance_before"`
	BalanceAfter    *Decimal      `json:"balance_after,omitempty" db:"balance_after"`
	Comment         *string       `json:"comment,omitempty" db:"comment"`
	RejectionCause  *string       `json:"rejection_cause,omitempty" db:"rejection_cause"`
}

Payload represents the functional payload of a message.

In this example, the message payload is a typical bank transfer.

func (Payload) Validate

func (p Payload) Validate() error

Validate checks that required fields are present and that account identifiers are valid IBANs.
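
How the package checks IBANs is not shown on this page. A common approach is the standard mod-97 checksum, sketched below; validIBAN is a hypothetical helper and it does not enforce country-specific lengths or formats.

```go
package main

import (
	"fmt"
	"strings"
)

// validIBAN applies the mod-97 checksum: move the first four characters to
// the end, map letters to 10..35, and require the resulting number mod 97 == 1.
func validIBAN(iban string) bool {
	s := strings.ToUpper(strings.ReplaceAll(iban, " ", ""))
	if len(s) < 15 || len(s) > 34 {
		return false
	}
	rearranged := s[4:] + s[:4]
	rem := 0
	for _, c := range rearranged {
		switch {
		case c >= '0' && c <= '9':
			rem = (rem*10 + int(c-'0')) % 97
		case c >= 'A' && c <= 'Z':
			// A letter stands for a two-digit number, hence the factor 100.
			rem = (rem*100 + int(c-'A') + 10) % 97
		default:
			return false
		}
	}
	return rem == 1
}

func main() {
	fmt.Println(validIBAN("GB82 WEST 1234 5698 7654 32")) // widely used sample IBAN
	fmt.Println(validIBAN("GB83 WEST 1234 5698 7654 32")) // wrong check digits
}
```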

type ProcessingStatus

type ProcessingStatus uint8

ProcessingStatus represents the outcome of the operation requested in a message.

const (
	ProcessingStatusPending  ProcessingStatus = iota // message being processed
	ProcessingStatusRejected                         // processing outcome decided: rejected
	ProcessingStatusOK                               // processing outcome decided: OK
)

Message result status.

NOTE: statuses are ordered

func NewProcessingStatus

func NewProcessingStatus(s ProcessingStatus) *ProcessingStatus

func (ProcessingStatus) IsValid

func (s ProcessingStatus) IsValid() bool

func (ProcessingStatus) Less

func (ProcessingStatus) MarshalText

func (s ProcessingStatus) MarshalText() ([]byte, error)

func (ProcessingStatus) String

func (s ProcessingStatus) String() string

func (*ProcessingStatus) UnmarshalText

func (s *ProcessingStatus) UnmarshalText(data []byte) error

type UpdateOption

type UpdateOption func(*UpdateOptions)

UpdateOption provides a bit of flexibility to Updates.

func WithForceUpdate

func WithForceUpdate(enabled bool) UpdateOption

type UpdateOptions

type UpdateOptions struct {
	Force bool
}
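
UpdateOption follows the functional options pattern: each option is a function that mutates an UpdateOptions value, and the callee applies them in order over the defaults. The sketch below re-declares the types locally; the body of WithForceUpdate and the applyOptions helper are assumptions about how such options are typically implemented.

```go
package main

import "fmt"

type UpdateOptions struct {
	Force bool
}

type UpdateOption func(*UpdateOptions)

// WithForceUpdate sets the Force flag (assumed implementation).
func WithForceUpdate(enabled bool) UpdateOption {
	return func(o *UpdateOptions) {
		o.Force = enabled
	}
}

// applyOptions (hypothetical) starts from the zero-value defaults and applies
// each option in order, so later options override earlier ones.
func applyOptions(opts ...UpdateOption) UpdateOptions {
	var o UpdateOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Println(applyOptions().Force)                      // false by default
	fmt.Println(applyOptions(WithForceUpdate(true)).Force) // true when requested
}
```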

Directories

Path Synopsis
Package messages provides a postgres implementation of the repos.MessageRepo interface
Package pgrepo provides a postgres implementation of the repos.Iface interface.