Documentation
¶
Index ¶
- Variables
- func GetConnectionPool(dbConfig config.RelationalDatabaseConfig, migrationConf *MigrationConfig, ...) (*sql.DB, error)
- type AppDBRepository
- type AppRepository
- type ChannelDBRepository
- type ChannelRepository
- type ConsumerDBRepository
- func (consumerRepo *ConsumerDBRepository) Delete(consumer *data.Consumer) error
- func (consumerRepo *ConsumerDBRepository) Get(channelID string, consumerID string) (consumer *data.Consumer, err error)
- func (consumerRepo *ConsumerDBRepository) GetByID(id string) (consumer *data.Consumer, err error)
- func (consumerRepo *ConsumerDBRepository) GetList(channelID string, page *data.Pagination) ([]*data.Consumer, *data.Pagination, error)
- func (consumerRepo *ConsumerDBRepository) Store(consumer *data.Consumer) (*data.Consumer, error)
- type ConsumerRepository
- type ContextKey
- type DataAccessor
- type DeliveryJobDBRepository
- func (djRepo *DeliveryJobDBRepository) DispatchMessage(message *data.Message, deliveryJobs ...*data.DeliveryJob) (err error)
- func (djRepo *DeliveryJobDBRepository) GetByID(id string) (job *data.DeliveryJob, err error)
- func (djRepo *DeliveryJobDBRepository) GetJobsForConsumer(consumer *data.Consumer, jobStatus data.JobStatus, page *data.Pagination) ([]*data.DeliveryJob, *data.Pagination, error)
- func (djRepo *DeliveryJobDBRepository) GetJobsForMessage(message *data.Message, page *data.Pagination) ([]*data.DeliveryJob, *data.Pagination, error)
- func (djRepo *DeliveryJobDBRepository) GetJobsInflightSince(delta time.Duration) []*data.DeliveryJob
- func (djRepo *DeliveryJobDBRepository) GetJobsReadyForInflightSince(delta time.Duration) []*data.DeliveryJob
- func (djRepo *DeliveryJobDBRepository) MarkJobDead(deliveryJob *data.DeliveryJob) error
- func (djRepo *DeliveryJobDBRepository) MarkJobDelivered(deliveryJob *data.DeliveryJob) error
- func (djRepo *DeliveryJobDBRepository) MarkJobInflight(deliveryJob *data.DeliveryJob) error
- func (djRepo *DeliveryJobDBRepository) MarkJobRetry(deliveryJob *data.DeliveryJob, earliestDelta time.Duration) (err error)
- func (djRepo *DeliveryJobDBRepository) RequeueDeadJobsForConsumer(consumer *data.Consumer) (err error)
- type DeliveryJobRepository
- type LockDBRepository
- type LockRepository
- type MessageDBRepository
- func (msgRepo *MessageDBRepository) Create(message *data.Message) (err error)
- func (msgRepo *MessageDBRepository) Get(channelID string, messageID string) (*data.Message, error)
- func (msgRepo *MessageDBRepository) GetByID(id string) (*data.Message, error)
- func (msgRepo *MessageDBRepository) GetMessagesForChannel(channelID string, page *data.Pagination) ([]*data.Message, *data.Pagination, error)
- func (msgRepo *MessageDBRepository) GetMessagesNotDispatchedForCertainPeriod(delta time.Duration) []*data.Message
- func (msgRepo *MessageDBRepository) SetDispatched(txContext context.Context, message *data.Message) error
- type MessageRepository
- type MigrationConfig
- type ProducerDBRepository
- type ProducerRepository
- type RelationalDBDataAccessor
- func (rdbmsDataAccessor *RelationalDBDataAccessor) Close()
- func (rdbmsDataAccessor *RelationalDBDataAccessor) GetAppRepository() AppRepository
- func (rdbmsDataAccessor *RelationalDBDataAccessor) GetChannelRepository() ChannelRepository
- func (rdbmsDataAccessor *RelationalDBDataAccessor) GetConsumerRepository() ConsumerRepository
- func (rdbmsDataAccessor *RelationalDBDataAccessor) GetDeliveryJobRepository() DeliveryJobRepository
- func (rdbmsDataAccessor *RelationalDBDataAccessor) GetLockRepository() LockRepository
- func (rdbmsDataAccessor *RelationalDBDataAccessor) GetMessageRepository() MessageRepository
- func (rdbmsDataAccessor *RelationalDBDataAccessor) GetProducerRepository() ProducerRepository
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoLock is returned when no lock is passed to try or release function ErrNoLock = errors.New("no lock provided") // ErrAlreadyLocked is returned when lock already exists in repo ErrAlreadyLocked = errors.New("lock already attained by someone else") )
var ( // ErrDuplicateMessageIDForChannel represents the case where a message with the same ID already exists ErrDuplicateMessageIDForChannel = errors.New("duplicate message id for channel") // ErrNoTxInContext represents the case where transaction is not passed in the context ErrNoTxInContext = errors.New("no tx value in content") )
var ( // ErrOptimisticAppInit represents the Error when optimistically update fails to start app init ErrOptimisticAppInit = errors.New(optimisticLockInitAppErrMsg) // ErrOptimisticAppComplete represents the Error when app complete attempted from not initializing state ErrOptimisticAppComplete = errors.New(optimisticLockCompleteAppErrMsg) // ErrAppInitializing is returned when app is being initialized by another thread. ErrAppInitializing = errors.New("App is in initializing") // ErrNoDataChangeFromInitialized is returned when initialization is attempted without any seed data change while app has been initialized ErrNoDataChangeFromInitialized = errors.New("No data change on initialized App") // ErrCompleteWhileNotBeingInitialized is returned when complete is called without being initialized ErrCompleteWhileNotBeingInitialized = errors.New("App not initializing to complete initializing") // ErrNoRowsUpdated is returned when a UPDATE query does not change any row which is unexpected ErrNoRowsUpdated = errors.New("No rows updated on UPDATE query") // ErrInvalidStateToSave is returned when a data is not in a state we can send it to the repo as ErrInvalidStateToSave = errors.New("Data model in invalid state to be stored") // ErrPaginationDeadlock is returned if both after and before is provided in pagination ErrPaginationDeadlock = errors.New("Can not decide on pagination direction! Both after and before provided or pagination is nil") )
var ( // ErrDBConnectionNeverInitialized is returned when NewDataAccessor failed to connect to DB the first time it was called; in all subsequent calls the accessor will remain nil ErrDBConnectionNeverInitialized = errors.New("DB Connection never initialized") // RDBMSStorageInternalInjector injector for data storage related implementation RDBMSStorageInternalInjector = wire.NewSet(GetConnectionPool, NewLockRepository, NewAppRepository, NewProducerRepository, NewChannelRepository, NewConsumerRepository, NewMessageRepository, NewDeliveryJobRepository, wire.Struct(new(RelationalDBDataAccessor), "db", "appRepository", "producerRepository", "channelRepository", "consumerRepository", "messageRepository", "deliveryJobRepository", "lockRepository"), wire.Bind(new(DataAccessor), new(*RelationalDBDataAccessor))) )
Functions ¶
func GetConnectionPool ¶
func GetConnectionPool(dbConfig config.RelationalDatabaseConfig, migrationConf *MigrationConfig, seedDataConfig config.SeedDataConfig) (*sql.DB, error)
GetConnectionPool gets the DB connection pool for the App
Types ¶
type AppDBRepository ¶
type AppDBRepository struct {
// contains filtered or unexported fields
}
AppDBRepository is the repository to access App data
func (*AppDBRepository) CompleteAppInit ¶
func (appRep *AppDBRepository) CompleteAppInit() error
CompleteAppInit stores that App initialization completed; it will return error if app is not in initializing state before the update is made
func (*AppDBRepository) GetApp ¶
func (appRep *AppDBRepository) GetApp() (*data.App, error)
GetApp retrieves the App from storage, it will never return nil
func (*AppDBRepository) InitAppData ¶
func (appRep *AppDBRepository) InitAppData(seedData *config.SeedData) error
InitAppData initializes if and only if none is present in DB with status NotInitialized. Returns an error if insertion fails.
func (*AppDBRepository) StartAppInit ¶
func (appRep *AppDBRepository) StartAppInit(seedData *config.SeedData) error
StartAppInit stores state that App initialization started. It will return error if App is in Initializing state or if data hash is equal and app in initialized state
type AppRepository ¶
type AppRepository interface { GetApp() (*data.App, error) StartAppInit(data *config.SeedData) error CompleteAppInit() error }
AppRepository allows storage operation interaction for App
func NewAppRepository ¶
func NewAppRepository(db *sql.DB) AppRepository
NewAppRepository retrieves App Repository
type ChannelDBRepository ¶
type ChannelDBRepository struct {
// contains filtered or unexported fields
}
ChannelDBRepository channel repository implementation for RDBMS
func (*ChannelDBRepository) Get ¶
func (repo *ChannelDBRepository) Get(channelID string) (*data.Channel, error)
Get retrieves the channel with matching channel id
func (*ChannelDBRepository) GetList ¶
func (repo *ChannelDBRepository) GetList(page *data.Pagination) ([]*data.Channel, *data.Pagination, error)
GetList retrieves the list of channels based on the pagination params supplied. It will return an error if both after and before are present at the same time
type ChannelRepository ¶
type ChannelRepository interface { Store(channel *data.Channel) (*data.Channel, error) Get(channelID string) (*data.Channel, error) GetList(page *data.Pagination) ([]*data.Channel, *data.Pagination, error) }
ChannelRepository allows storage operation interaction for Channel
func NewChannelRepository ¶
func NewChannelRepository(db *sql.DB) ChannelRepository
NewChannelRepository retrieves new instance of channel repository
type ConsumerDBRepository ¶
type ConsumerDBRepository struct {
// contains filtered or unexported fields
}
ConsumerDBRepository is the RDBMS implementation for ConsumerRepository
func (*ConsumerDBRepository) Delete ¶
func (consumerRepo *ConsumerDBRepository) Delete(consumer *data.Consumer) error
Delete deletes consumer from DB
func (*ConsumerDBRepository) Get ¶
func (consumerRepo *ConsumerDBRepository) Get(channelID string, consumerID string) (consumer *data.Consumer, err error)
Get retrieves the consumer for a specific channel; returns an error if either the consumer or the channel does not exist
func (*ConsumerDBRepository) GetByID ¶
func (consumerRepo *ConsumerDBRepository) GetByID(id string) (consumer *data.Consumer, err error)
GetByID retrieves a consumer by its ID
func (*ConsumerDBRepository) GetList ¶
func (consumerRepo *ConsumerDBRepository) GetList(channelID string, page *data.Pagination) ([]*data.Consumer, *data.Pagination, error)
GetList retrieves consumers for a specific channel; returns an error if the channel does not exist
type ConsumerRepository ¶
type ConsumerRepository interface { Store(consumer *data.Consumer) (*data.Consumer, error) Delete(consumer *data.Consumer) error Get(channelID string, consumerID string) (*data.Consumer, error) GetList(channelID string, page *data.Pagination) ([]*data.Consumer, *data.Pagination, error) GetByID(id string) (*data.Consumer, error) }
ConsumerRepository allows storage operation interaction for Consumer
func NewConsumerRepository ¶
func NewConsumerRepository(db *sql.DB, channelRepo ChannelRepository) ConsumerRepository
NewConsumerRepository initializes new consumer repository
type DataAccessor ¶
type DataAccessor interface { GetAppRepository() AppRepository GetProducerRepository() ProducerRepository GetChannelRepository() ChannelRepository GetConsumerRepository() ConsumerRepository GetMessageRepository() MessageRepository GetDeliveryJobRepository() DeliveryJobRepository GetLockRepository() LockRepository Close() }
DataAccessor is the facade to all the data repository
func GetNewDataAccessor ¶
func GetNewDataAccessor(dbConfig config.RelationalDatabaseConfig, migrationConf *MigrationConfig, seedDataConfig config.SeedDataConfig) (DataAccessor, error)
type DeliveryJobDBRepository ¶
type DeliveryJobDBRepository struct {
// contains filtered or unexported fields
}
DeliveryJobDBRepository is the DeliveryJobRepository's RDBMS implementation
func (*DeliveryJobDBRepository) DispatchMessage ¶
func (djRepo *DeliveryJobDBRepository) DispatchMessage(message *data.Message, deliveryJobs ...*data.DeliveryJob) (err error)
DispatchMessage saves the delivery jobs and updates the message status in one atomic operation
func (*DeliveryJobDBRepository) GetByID ¶
func (djRepo *DeliveryJobDBRepository) GetByID(id string) (job *data.DeliveryJob, err error)
GetByID loads the delivery job with specified id if it exists, else returns an error
func (*DeliveryJobDBRepository) GetJobsForConsumer ¶
func (djRepo *DeliveryJobDBRepository) GetJobsForConsumer(consumer *data.Consumer, jobStatus data.JobStatus, page *data.Pagination) ([]*data.DeliveryJob, *data.Pagination, error)
GetJobsForConsumer retrieves the DeliveryJobs created for delivery to a consumer, filtered by a specific status
func (*DeliveryJobDBRepository) GetJobsForMessage ¶
func (djRepo *DeliveryJobDBRepository) GetJobsForMessage(message *data.Message, page *data.Pagination) ([]*data.DeliveryJob, *data.Pagination, error)
GetJobsForMessage retrieves jobs created for a specific message
func (*DeliveryJobDBRepository) GetJobsInflightSince ¶
func (djRepo *DeliveryJobDBRepository) GetJobsInflightSince(delta time.Duration) []*data.DeliveryJob
GetJobsInflightSince retrieves jobs in inflight status since the delta duration
func (*DeliveryJobDBRepository) GetJobsReadyForInflightSince ¶
func (djRepo *DeliveryJobDBRepository) GetJobsReadyForInflightSince(delta time.Duration) []*data.DeliveryJob
GetJobsReadyForInflightSince retrieves jobs in queued status and earliestNextAttemptAt < `now`-delta
func (*DeliveryJobDBRepository) MarkJobDead ¶
func (djRepo *DeliveryJobDBRepository) MarkJobDead(deliveryJob *data.DeliveryJob) error
MarkJobDead sets the status of the job to Dead if the job's current status is Inflight in the object and DB; else returns error
func (*DeliveryJobDBRepository) MarkJobDelivered ¶
func (djRepo *DeliveryJobDBRepository) MarkJobDelivered(deliveryJob *data.DeliveryJob) error
MarkJobDelivered sets the status of the job to Delivered if the job's current status is Inflight in the object and DB; else returns error
func (*DeliveryJobDBRepository) MarkJobInflight ¶
func (djRepo *DeliveryJobDBRepository) MarkJobInflight(deliveryJob *data.DeliveryJob) error
MarkJobInflight sets the status of the job to Inflight if job's current state in the object and DB is Queued; else returns error
func (*DeliveryJobDBRepository) MarkJobRetry ¶
func (djRepo *DeliveryJobDBRepository) MarkJobRetry(deliveryJob *data.DeliveryJob, earliestDelta time.Duration) (err error)
MarkJobRetry increases the retry attempt count and sets the status of the job to Queued if the job's current status is Inflight in the object and DB; else returns error
func (*DeliveryJobDBRepository) RequeueDeadJobsForConsumer ¶
func (djRepo *DeliveryJobDBRepository) RequeueDeadJobsForConsumer(consumer *data.Consumer) (err error)
RequeueDeadJobsForConsumer queues up dead jobs for a specific consumer
type DeliveryJobRepository ¶
type DeliveryJobRepository interface { DispatchMessage(message *data.Message, deliveryJobs ...*data.DeliveryJob) error MarkJobInflight(deliveryJob *data.DeliveryJob) error MarkJobDelivered(deliveryJob *data.DeliveryJob) error MarkJobDead(deliveryJob *data.DeliveryJob) error MarkJobRetry(deliveryJob *data.DeliveryJob, earliestDelta time.Duration) error RequeueDeadJobsForConsumer(consumer *data.Consumer) error GetJobsForMessage(message *data.Message, page *data.Pagination) ([]*data.DeliveryJob, *data.Pagination, error) GetJobsForConsumer(consumer *data.Consumer, jobStatus data.JobStatus, page *data.Pagination) ([]*data.DeliveryJob, *data.Pagination, error) GetByID(id string) (*data.DeliveryJob, error) GetJobsInflightSince(delta time.Duration) []*data.DeliveryJob GetJobsReadyForInflightSince(delta time.Duration) []*data.DeliveryJob }
DeliveryJobRepository allows storage operations over DeliveryJob
func NewDeliveryJobRepository ¶
func NewDeliveryJobRepository(db *sql.DB, msgRepo MessageRepository, consumerRepo ConsumerRepository) DeliveryJobRepository
NewDeliveryJobRepository creates a new instance of DeliveryJobRepository
type LockDBRepository ¶
type LockDBRepository struct {
// contains filtered or unexported fields
}
LockDBRepository represents the RDBMS implementation of LockRepository
func (*LockDBRepository) ReleaseLock ¶
func (lockRepo *LockDBRepository) ReleaseLock(lock *data.Lock) (err error)
ReleaseLock tries to release the lock, will return error if no such lock or any error in releasing
func (*LockDBRepository) TimeoutLocks ¶
func (lockRepo *LockDBRepository) TimeoutLocks(threshold time.Duration) (err error)
TimeoutLocks will force release locks that are older than the duration specified from now. Returns an error if the DB call failed
type LockRepository ¶
type LockRepository interface { TryLock(lock *data.Lock) error ReleaseLock(lock *data.Lock) error TimeoutLocks(threshold time.Duration) error }
LockRepository allows storage operations over Lock
func NewLockRepository ¶
func NewLockRepository(db *sql.DB) LockRepository
NewLockRepository creates a new instance of LockRepository
type MessageDBRepository ¶
type MessageDBRepository struct {
// contains filtered or unexported fields
}
MessageDBRepository is the MessageRepository implementation
func (*MessageDBRepository) Create ¶
func (msgRepo *MessageDBRepository) Create(message *data.Message) (err error)
Create creates a new message if message.MessageID does not already exist; please ensure QuickFix is called before repo is called
func (*MessageDBRepository) GetByID ¶
func (msgRepo *MessageDBRepository) GetByID(id string) (*data.Message, error)
GetByID retrieves a message by its ID
func (*MessageDBRepository) GetMessagesForChannel ¶
func (msgRepo *MessageDBRepository) GetMessagesForChannel(channelID string, page *data.Pagination) ([]*data.Message, *data.Pagination, error)
GetMessagesForChannel retrieves messages broadcasted to a specific channel
func (*MessageDBRepository) GetMessagesNotDispatchedForCertainPeriod ¶
func (msgRepo *MessageDBRepository) GetMessagesNotDispatchedForCertainPeriod(delta time.Duration) []*data.Message
GetMessagesNotDispatchedForCertainPeriod retrieves messages that are still in acknowledged state even though the `delta` duration has passed.
func (*MessageDBRepository) SetDispatched ¶
func (msgRepo *MessageDBRepository) SetDispatched(txContext context.Context, message *data.Message) error
SetDispatched sets the status of the message to dispatched within the transaction passed via txContext
type MessageRepository ¶
type MessageRepository interface { Create(message *data.Message) error Get(channelID string, messageID string) (*data.Message, error) GetByID(id string) (*data.Message, error) SetDispatched(txContext context.Context, message *data.Message) error GetMessagesNotDispatchedForCertainPeriod(delta time.Duration) []*data.Message GetMessagesForChannel(channelID string, page *data.Pagination) ([]*data.Message, *data.Pagination, error) }
MessageRepository allows storage operations over Message. SetDispatched does not accept TX directly to keep the API storage class independent
func NewMessageRepository ¶
func NewMessageRepository(db *sql.DB, channelRepo ChannelRepository, producerRepo ProducerRepository) MessageRepository
NewMessageRepository creates a new instance of MessageRepository
type MigrationConfig ¶
MigrationConfig represents the DB migration config
type ProducerDBRepository ¶
type ProducerDBRepository struct {
// contains filtered or unexported fields
}
ProducerDBRepository is the producer repository implementation for RDBMS
func (*ProducerDBRepository) Get ¶
func (repo *ProducerDBRepository) Get(producerID string) (*data.Producer, error)
Get retrieves the producer with matching producer id
func (*ProducerDBRepository) GetList ¶
func (repo *ProducerDBRepository) GetList(page *data.Pagination) ([]*data.Producer, *data.Pagination, error)
GetList retrieves the list of producers based on the pagination params supplied. It will return an error if both after and before are present at the same time
type ProducerRepository ¶
type ProducerRepository interface { Store(producer *data.Producer) (*data.Producer, error) Get(producerID string) (*data.Producer, error) GetList(page *data.Pagination) ([]*data.Producer, *data.Pagination, error) }
ProducerRepository allows storage operation interaction for Producer
func NewProducerRepository ¶
func NewProducerRepository(db *sql.DB) ProducerRepository
NewProducerRepository returns a new producer repository
type RelationalDBDataAccessor ¶
type RelationalDBDataAccessor struct {
// contains filtered or unexported fields
}
RelationalDBDataAccessor represents the DataAccessor implementation for RDBMS
func (*RelationalDBDataAccessor) Close ¶
func (rdbmsDataAccessor *RelationalDBDataAccessor) Close()
Close closes the connection to DB
func (*RelationalDBDataAccessor) GetAppRepository ¶
func (rdbmsDataAccessor *RelationalDBDataAccessor) GetAppRepository() AppRepository
GetAppRepository returns the AppRepository to be used for App ops
func (*RelationalDBDataAccessor) GetChannelRepository ¶
func (rdbmsDataAccessor *RelationalDBDataAccessor) GetChannelRepository() ChannelRepository
GetChannelRepository returns the ChannelRepository to be used for Channel ops
func (*RelationalDBDataAccessor) GetConsumerRepository ¶
func (rdbmsDataAccessor *RelationalDBDataAccessor) GetConsumerRepository() ConsumerRepository
GetConsumerRepository returns the ConsumerRepository to be used for Consumer ops
func (*RelationalDBDataAccessor) GetDeliveryJobRepository ¶
func (rdbmsDataAccessor *RelationalDBDataAccessor) GetDeliveryJobRepository() DeliveryJobRepository
GetDeliveryJobRepository retrieves the DeliveryJobRepository to be used for DeliveryJob ops
func (*RelationalDBDataAccessor) GetLockRepository ¶
func (rdbmsDataAccessor *RelationalDBDataAccessor) GetLockRepository() LockRepository
GetLockRepository retrieves the LockRepository to be used for Lock ops
func (*RelationalDBDataAccessor) GetMessageRepository ¶
func (rdbmsDataAccessor *RelationalDBDataAccessor) GetMessageRepository() MessageRepository
GetMessageRepository retrieves the MessageRepository to be used for Message ops
func (*RelationalDBDataAccessor) GetProducerRepository ¶
func (rdbmsDataAccessor *RelationalDBDataAccessor) GetProducerRepository() ProducerRepository
GetProducerRepository returns the ProducerRepository to be used for Producer ops