Documentation ¶
Index ¶
- Variables
- type ConsumerChain
- type ConsumerDecorator
- type DefaultSQSProducer
- func (producer *DefaultSQSProducer) BatchProduceMessage(ctx context.Context, messageInput *sqs.SendMessageBatchInput) (*sqs.SendMessageBatchOutput, error)
- func (producer *DefaultSQSProducer) ProduceMessage(ctx context.Context, messageInput *sqs.SendMessageInput) error
- func (producer *DefaultSQSProducer) QueueURL() string
- type DefaultSQSProducerComponent
- type DefaultSQSProducerConfig
- type DefaultSQSQueueConsumer
- type DefaultSQSQueueConsumerComponent
- type DefaultSQSQueueConsumerConfig
- type LogFn
- type Logger
- type ProducerChain
- type ProducerDecorator
- type RetryableConsumerError
- type SQSConsumer
- type SQSMessageConsumer
- type SQSMessageConsumerError
- type SQSProducer
- type SmartSQSConsumer
- type SmartSQSQueueConsumerComponent
- type SmartSQSQueueConsumerConfig
- type Stat
- type StatFn
Constants ¶
This section is empty.
Variables ¶
var LoggerFromContext = logevent.FromContext
LoggerFromContext is the concrete implementation of LogFn that should be used at runtime.
var StatFromContext = xstats.FromContext
StatFromContext is the concrete implementation of StatFn that should be used at runtime.
Functions ¶
This section is empty.
Types ¶
type ConsumerChain ¶
type ConsumerChain []ConsumerDecorator
ConsumerChain is an ordered collection of ConsumerDecorator.
func (ConsumerChain) Apply ¶
func (c ConsumerChain) Apply(base SQSMessageConsumer) SQSMessageConsumer
Apply wraps the given SQSMessageConsumer with the Decorator chain.
type ConsumerDecorator ¶
type ConsumerDecorator func(SQSMessageConsumer) SQSMessageConsumer
ConsumerDecorator is a named type for any function that takes a SQSMessageConsumer and returns a SQSMessageConsumer.
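The decorator and chain types above can be illustrated with a minimal sketch. It uses stand-in types (Message, MessageConsumer, ConsumerFunc, Decorator, Chain, trace) so that it runs without the AWS SDK; none of these names come from the package. The wrapping order, with the first decorator outermost, is an assumption of the sketch and not a documented guarantee of ConsumerChain.Apply.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a stand-in for *sqs.Message so the sketch needs no AWS SDK.
type Message struct{ Body string }

// MessageConsumer is a reduced stand-in for SQSMessageConsumer.
type MessageConsumer interface {
	ConsumeMessage(ctx context.Context, m *Message) error
}

// ConsumerFunc adapts a plain function to MessageConsumer.
type ConsumerFunc func(ctx context.Context, m *Message) error

func (f ConsumerFunc) ConsumeMessage(ctx context.Context, m *Message) error { return f(ctx, m) }

// Decorator and Chain mirror the shape of ConsumerDecorator and ConsumerChain.
type Decorator func(MessageConsumer) MessageConsumer

type Chain []Decorator

// Apply wraps base so that the first decorator in the chain is outermost.
// This ordering is an assumption of the sketch.
func (c Chain) Apply(base MessageConsumer) MessageConsumer {
	for i := len(c) - 1; i >= 0; i-- {
		base = c[i](base)
	}
	return base
}

// trace returns a decorator that records its name before delegating.
func trace(name string, log *[]string) Decorator {
	return func(next MessageConsumer) MessageConsumer {
		return ConsumerFunc(func(ctx context.Context, m *Message) error {
			*log = append(*log, name)
			return next.ConsumeMessage(ctx, m)
		})
	}
}

// runChain sends one message through a two-decorator chain and returns the call order.
func runChain() []string {
	var log []string
	base := ConsumerFunc(func(_ context.Context, m *Message) error {
		log = append(log, "base:"+m.Body)
		return nil
	})
	consumer := Chain{trace("outer", &log), trace("inner", &log)}.Apply(base)
	_ = consumer.ConsumeMessage(context.Background(), &Message{Body: "hello"})
	return log
}

func main() {
	fmt.Println(runChain()) // prints "[outer inner base:hello]"
}
```

Keeping each concern (logging, metrics, and so on) in its own decorator leaves the base consumer free of cross-cutting code.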
type DefaultSQSProducer ¶
DefaultSQSProducer is a basic sqs producer
func NewDefaultSQSProducer ¶
func NewDefaultSQSProducer(queue sqsiface.SQSAPI, url string) *DefaultSQSProducer
NewDefaultSQSProducer initializes a new DefaultSQSProducer
func (*DefaultSQSProducer) BatchProduceMessage ¶
func (producer *DefaultSQSProducer) BatchProduceMessage(ctx context.Context, messageInput *sqs.SendMessageBatchInput) (*sqs.SendMessageBatchOutput, error)
BatchProduceMessage sends a batch of messages to the configured SQS queue and sets the queue URL to use on the input.
func (*DefaultSQSProducer) ProduceMessage ¶
func (producer *DefaultSQSProducer) ProduceMessage(ctx context.Context, messageInput *sqs.SendMessageInput) error
ProduceMessage sends a single message to the configured SQS queue and sets the queue URL to use on the input.
func (*DefaultSQSProducer) QueueURL ¶
func (producer *DefaultSQSProducer) QueueURL() string
QueueURL retrieves the queue URL used by the DefaultSQSProducer
type DefaultSQSProducerComponent ¶
type DefaultSQSProducerComponent struct { }
DefaultSQSProducerComponent creates configured DefaultSQSProducer instances.
func NewDefaultSQSProducerComponent ¶
func NewDefaultSQSProducerComponent() *DefaultSQSProducerComponent
NewDefaultSQSProducerComponent generates a new DefaultSQSProducerComponent
func (*DefaultSQSProducerComponent) New ¶
func (c *DefaultSQSProducerComponent) New(ctx context.Context, config *DefaultSQSProducerConfig) (DefaultSQSProducer, error)
New creates a configured DefaultSQSProducer
func (*DefaultSQSProducerComponent) Settings ¶
func (c *DefaultSQSProducerComponent) Settings() *DefaultSQSProducerConfig
Settings generates the default configuration for DefaultSQSProducerComponent
type DefaultSQSProducerConfig ¶
DefaultSQSProducerConfig represents the configuration to configure DefaultSQSProducer
func (*DefaultSQSProducerConfig) Name ¶
func (*DefaultSQSProducerConfig) Name() string
Name of the configuration
type DefaultSQSQueueConsumer ¶
type DefaultSQSQueueConsumer struct {
    Queue           sqsiface.SQSAPI
    LogFn           LogFn
    QueueURL        string
    MessageConsumer SQSMessageConsumer
    // PollInterval defaults to 1 second
    PollInterval time.Duration
    // contains filtered or unexported fields
}
DefaultSQSQueueConsumer is a naive implementation of an SQSConsumer. It does not retry nonpermanent failures: every message is deleted after it is consumed, whatever the result. It also does not process messages concurrently: messages are handled one at a time, in sequence.
func (*DefaultSQSQueueConsumer) GetSQSMessageConsumer ¶
func (m *DefaultSQSQueueConsumer) GetSQSMessageConsumer() SQSMessageConsumer
GetSQSMessageConsumer returns the MessageConsumer field. This function implies that DefaultSQSQueueConsumer MUST have a MessageConsumer defined.
func (*DefaultSQSQueueConsumer) StartConsuming ¶
func (m *DefaultSQSQueueConsumer) StartConsuming(ctx context.Context) error
StartConsuming starts consuming from the configured SQS queue
func (*DefaultSQSQueueConsumer) StopConsuming ¶
func (m *DefaultSQSQueueConsumer) StopConsuming(ctx context.Context) error
StopConsuming stops this DefaultSQSQueueConsumer consuming from the SQS queue
type DefaultSQSQueueConsumerComponent ¶
type DefaultSQSQueueConsumerComponent struct { }
DefaultSQSQueueConsumerComponent creates configured DefaultSQSQueueConsumer instances.
func NewDefaultSQSQueueConsumerComponent ¶
func NewDefaultSQSQueueConsumerComponent() *DefaultSQSQueueConsumerComponent
NewDefaultSQSQueueConsumerComponent generates a new DefaultSQSQueueConsumerComponent
func (*DefaultSQSQueueConsumerComponent) New ¶
func (c *DefaultSQSQueueConsumerComponent) New(ctx context.Context, config *DefaultSQSQueueConsumerConfig) (DefaultSQSQueueConsumer, error)
New creates a configured DefaultSQSQueueConsumer
func (*DefaultSQSQueueConsumerComponent) Settings ¶
func (c *DefaultSQSQueueConsumerComponent) Settings() *DefaultSQSQueueConsumerConfig
Settings generates the default configuration for DefaultSQSQueueConsumerComponent
type DefaultSQSQueueConsumerConfig ¶
type DefaultSQSQueueConsumerConfig struct {
    AWSEndpoint  string
    QueueURL     string
    QueueRegion  string
    PollInterval time.Duration
}
DefaultSQSQueueConsumerConfig represents the configuration to configure DefaultSQSQueueConsumer
func (*DefaultSQSQueueConsumerConfig) Name ¶
func (*DefaultSQSQueueConsumerConfig) Name() string
Name of the configuration
type LogFn ¶
LogFn is the type that should be accepted by components that intend to log content using the context logger.
type Logger ¶
type Logger = logevent.Logger
Logger is the project logging client interface. It is currently an alias to the logevent project.
type ProducerChain ¶
type ProducerChain []ProducerDecorator
ProducerChain is an ordered collection of ProducerDecorator.
func (ProducerChain) Apply ¶
func (c ProducerChain) Apply(base SQSProducer) SQSProducer
Apply wraps the given SQSProducer with the Decorator chain.
type ProducerDecorator ¶
type ProducerDecorator func(SQSProducer) SQSProducer
ProducerDecorator is a named type for any function that takes a SQSProducer and returns a SQSProducer.
type RetryableConsumerError ¶
RetryableConsumerError represents a possible error type an SQSMessageConsumer could return on a call to ConsumeMessage. Users can set VisibilityTimeout, and implementors of SQSConsumer can leverage VisibilityTimeout to change the visibility of an sqs message for retry purposes.
func (RetryableConsumerError) Error ¶
func (e RetryableConsumerError) Error() string
Error implements type error
type SQSConsumer ¶
type SQSConsumer interface {
    StartConsuming(ctx context.Context) error
    StopConsuming(ctx context.Context) error
    GetSQSMessageConsumer() SQSMessageConsumer
}
SQSConsumer is an interface that represents an AWS SQS queue worker. Implementers of SQSConsumer are responsible for:
- SQS connectivity
- starting and stopping consumption
- error handling
type SQSMessageConsumer ¶
type SQSMessageConsumer interface {
    ConsumeMessage(ctx context.Context, message *sqs.Message) SQSMessageConsumerError
    // DeadLetter will be called when MaxRetries is exhausted, only in the SmartSQSConsumer
    DeadLetter(ctx context.Context, message *sqs.Message)
}
SQSMessageConsumer is an interface that defines how a message should be consumed. Users are responsible for unmarshalling messages themselves and for returning errors.
type SQSMessageConsumerError ¶
SQSMessageConsumerError represents an error that can be used to indicate to the consumer that an error should be retried. Note: RetryAfter should be expressed in seconds
type SQSProducer ¶
type SQSProducer interface {
    QueueURL() string
    BatchProduceMessage(ctx context.Context, messageBatchInput *sqs.SendMessageBatchInput) (*sqs.SendMessageBatchOutput, error)
    ProduceMessage(ctx context.Context, messageInput *sqs.SendMessageInput) error
}
SQSProducer is an interface for producing messages to an AWS SQS queue. Implementers are responsible for placing messages on the queue, and also for:
- SQS connectivity
- error handling
- constructing the input *sqs.SendMessageInput
type SmartSQSConsumer ¶
type SmartSQSConsumer struct {
    Queue               sqsiface.SQSAPI
    LogFn               LogFn
    QueueURL            string
    MessageConsumer     SQSMessageConsumer
    NumWorkers          uint64
    MessagePoolSize     uint64
    MaxNumberOfMessages uint64
    MaxRetries          uint64
    PollInterval        time.Duration
    // contains filtered or unexported fields
}
SmartSQSConsumer is an implementation of an SQSConsumer. This implementation supports:
- retryable and non-retryable errors
- a maximum number of retries to be placed on a retryable SQS message
- concurrent workers
func (*SmartSQSConsumer) GetSQSMessageConsumer ¶
func (m *SmartSQSConsumer) GetSQSMessageConsumer() SQSMessageConsumer
GetSQSMessageConsumer returns the MessageConsumer field. This function implies that SmartSQSConsumer MUST have a MessageConsumer defined.
func (*SmartSQSConsumer) StartConsuming ¶
func (m *SmartSQSConsumer) StartConsuming(ctx context.Context) error
StartConsuming starts consuming from the configured SQS queue
func (*SmartSQSConsumer) StopConsuming ¶
func (m *SmartSQSConsumer) StopConsuming(ctx context.Context) error
StopConsuming stops this SmartSQSConsumer consuming from the SQS queue
type SmartSQSQueueConsumerComponent ¶
type SmartSQSQueueConsumerComponent struct { }
SmartSQSQueueConsumerComponent creates configured SmartSQSConsumer instances.
func NewSmartSQSQueueConsumerComponent ¶
func NewSmartSQSQueueConsumerComponent() *SmartSQSQueueConsumerComponent
NewSmartSQSQueueConsumerComponent generates a new SmartSQSQueueConsumerComponent
func (*SmartSQSQueueConsumerComponent) New ¶
func (c *SmartSQSQueueConsumerComponent) New(ctx context.Context, config *SmartSQSQueueConsumerConfig) (SmartSQSConsumer, error)
New creates a configured SmartSQSConsumer
func (*SmartSQSQueueConsumerComponent) Settings ¶
func (c *SmartSQSQueueConsumerComponent) Settings() *SmartSQSQueueConsumerConfig
Settings generates the default configuration for SmartSQSQueueConsumerComponent
type SmartSQSQueueConsumerConfig ¶
type SmartSQSQueueConsumerConfig struct {
    AWSEndpoint         string
    QueueURL            string
    QueueRegion         string
    NumWorkers          uint64
    MessagePoolSize     uint64
    MaxNumberOfMessages uint64
    MaxRetries          uint64
    PollInterval        time.Duration
}
SmartSQSQueueConsumerConfig represents the configuration to configure SmartSQSConsumer
func (*SmartSQSQueueConsumerConfig) Name ¶
func (*SmartSQSQueueConsumerConfig) Name() string
Name of the configuration