Documentation ¶
Index ¶
- type Builder
- type DelayConsumer
- type IRetrier
- type MessageHandler
- type PushToPrimaryRetryTopic
- type Retrier
- func (retrier *Retrier) Build() IRetrier
- func (r *Retrier) Handle(ctx context.Context, msg messagebroker.ReceivedMessage) error
- func (r *Retrier) Start(ctx context.Context) error
- func (r *Retrier) Stop(ctx context.Context)
- func (retrier *Retrier) WithBackoff(backoff subscription.Backoff) Builder
- func (retrier *Retrier) WithBrokerStore(store brokerstore.IBrokerStore) Builder
- func (retrier *Retrier) WithCache(ch cache.ICache) Builder
- func (retrier *Retrier) WithErrChan(errChan chan error) Builder
- func (retrier *Retrier) WithIntervalFinder(finder subscription.IntervalFinder) Builder
- func (retrier *Retrier) WithMessageHandler(handler MessageHandler) Builder
- func (retrier *Retrier) WithSubscriberID(subscriberID string) Builder
- func (retrier *Retrier) WithSubscription(subs *subscription.Model) Builder
- func (retrier *Retrier) WithTopicCore(topicCore topic.ICore) Builder
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Builder ¶
type Builder interface { WithBackoff(subscription.Backoff) Builder WithCache(ch cache.ICache) Builder WithIntervalFinder(finder subscription.IntervalFinder) Builder WithBrokerStore(store brokerstore.IBrokerStore) Builder WithSubscription(subs *subscription.Model) Builder WithMessageHandler(handler MessageHandler) Builder WithSubscriberID(subscriberID string) Builder WithErrChan(chan error) Builder WithTopicCore(topicCore topic.ICore) Builder Build() IRetrier }
Builder ...
type DelayConsumer ¶
type DelayConsumer struct {
// contains filtered or unexported fields
}
DelayConsumer ...
func NewDelayConsumer ¶
func NewDelayConsumer(ctx context.Context, subscriberID, topicName string, subs *subscription.Model, bs brokerstore.IBrokerStore, handler MessageHandler, ch cache.ICache, errChan chan error) (*DelayConsumer, error)
NewDelayConsumer inits a new delay-consumer with the pre-defined message handler
func (*DelayConsumer) LogFields ¶
func (dc *DelayConsumer) LogFields(kv ...interface{}) []interface{}
LogFields ...
func (*DelayConsumer) Run ¶
func (dc *DelayConsumer) Run(ctx context.Context)
Run spawns the delay-consumer
type IRetrier ¶
type IRetrier interface { Handle(context.Context, messagebroker.ReceivedMessage) error Start(context.Context) error Stop(context.Context) }
IRetrier interface over retrier core functionalities.
type MessageHandler ¶
type MessageHandler interface {
Do(ctx context.Context, msg messagebroker.ReceivedMessage) error
}
MessageHandler defines the contract to process a retry-able broker message
func NewPushToPrimaryRetryTopicHandler ¶
func NewPushToPrimaryRetryTopicHandler(bs brokerstore.IBrokerStore) MessageHandler
NewPushToPrimaryRetryTopicHandler inits a new retry handler
type PushToPrimaryRetryTopic ¶
type PushToPrimaryRetryTopic struct {
// contains filtered or unexported fields
}
PushToPrimaryRetryTopic holds the needed instances to handle retry
func (*PushToPrimaryRetryTopic) Do ¶
func (s *PushToPrimaryRetryTopic) Do(ctx context.Context, msg messagebroker.ReceivedMessage) error
Do defines the retry action. In this case it will push the message back on to the primary retry topic for re-processing by subscriber
type Retrier ¶
type Retrier struct {
// contains filtered or unexported fields
}
Retrier implements all business logic for IRetrier
func (*Retrier) Handle ¶
func (r *Retrier) Handle(ctx context.Context, msg messagebroker.ReceivedMessage) error
Handle takes care of processing a given retry message.
func (*Retrier) Start ¶
Start starts a new retrier which internally takes care of spawning the needed delay-consumers.
func (*Retrier) WithBackoff ¶
func (retrier *Retrier) WithBackoff(backoff subscription.Backoff) Builder
WithBackoff ...
func (*Retrier) WithBrokerStore ¶
func (retrier *Retrier) WithBrokerStore(store brokerstore.IBrokerStore) Builder
WithBrokerStore ...
func (*Retrier) WithErrChan ¶
WithErrChan ...
func (*Retrier) WithIntervalFinder ¶
func (retrier *Retrier) WithIntervalFinder(finder subscription.IntervalFinder) Builder
WithIntervalFinder ...
func (*Retrier) WithMessageHandler ¶
func (retrier *Retrier) WithMessageHandler(handler MessageHandler) Builder
WithMessageHandler ...
func (*Retrier) WithSubscriberID ¶
WithSubscriberID ...
func (*Retrier) WithSubscription ¶
func (retrier *Retrier) WithSubscription(subs *subscription.Model) Builder
WithSubscription ...