Documentation ¶
Index ¶
- type ConsumerClient
- type IConsumer
- type MessageProcessorFunc
- type Option
- func WithBackoffDuration(duration time.Duration) Option
- func WithBatchSize(size int64) Option
- func WithConcurrencyFactor(factor int) Option
- func WithDLQURL(dlqURL *string) Option
- func WithInstrumentationClient(ic *instrumentation.Client) Option
- func WithLogger(logger *zap.Logger) Option
- func WithMessageHandler(handler MessageProcessorFunc) Option
- func WithMessageProcessTimeout(timeout time.Duration) Option
- func WithQueuePollingDuration(duration time.Duration) Option
- func WithQueueURL(url *string) Option
- func WithSQSClient(client *sqs.SQS) Option
- func WithWaitTimeSecond(waitTime int64) Option
- type SQSAPI
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumerClient ¶
type ConsumerClient struct {
// contains filtered or unexported fields
}
ConsumerClient encapsulates fields related to a client that consumes messages from a queue. It uses a logger for debugging, an instrumentation client for performance monitoring, and an SQS client to interact with the message queue.
func New ¶
func New(options ...Option) (*ConsumerClient, error)
New creates a ConsumerClient configured by the given options and returns an error if the client cannot be constructed.
Example:
    consumer, err := New(
        WithLogger(yourLogger),
        WithInstrumentationClient(yourInstrumentationClient),
        WithSQSClient(yourSQSClient),
        WithQueueURL(&yourQueueURL),
        WithDLQURL(&yourDLQURL),
        WithConcurrencyFactor(yourConcurrencyFactor),
        WithQueuePollingDuration(yourPollingDuration),
        WithMessageProcessTimeout(yourMessageTimeout),
        WithMessageHandler(yourMessageHandlerFunc),
        WithBackoffDuration(yourBackoffDuration),
        WithBatchSize(yourBatchSize),
        WithWaitTimeSecond(yourWaitTimeSeconds),
    )
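The functional-options pattern behind New can be sketched as follows. This is a minimal, self-contained illustration and not the package's implementation: the `consumer` struct, its fields, the lower-case option helpers, and the checks inside `newConsumer` are all hypothetical stand-ins, since the real ConsumerClient fields are unexported.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// consumer is a hypothetical stand-in for ConsumerClient; the real struct's
// fields are unexported and not shown in this documentation.
type consumer struct {
	queueURL    *string
	concurrency int
	pollEvery   time.Duration
}

// option mirrors the documented shape `type Option func(*ConsumerClient)`.
type option func(*consumer)

func withQueueURL(url *string) option {
	return func(c *consumer) { c.queueURL = url }
}

func withConcurrencyFactor(n int) option {
	return func(c *consumer) { c.concurrency = n }
}

func withQueuePollingDuration(d time.Duration) option {
	return func(c *consumer) { c.pollEvery = d }
}

// newConsumer applies each option in order and then rejects an incomplete
// configuration. Validating inside the constructor is an assumption of this
// sketch.
func newConsumer(opts ...option) (*consumer, error) {
	c := &consumer{}
	for _, opt := range opts {
		opt(c)
	}
	if c.queueURL == nil || *c.queueURL == "" {
		return nil, errors.New("queue URL is required")
	}
	if c.concurrency <= 0 {
		return nil, errors.New("concurrency factor must be positive")
	}
	return c, nil
}

func main() {
	url := "https://sqs.us-west-2.amazonaws.com/1234567890/myqueue"
	c, err := newConsumer(
		withQueueURL(&url),
		withConcurrencyFactor(4),
		withQueuePollingDuration(time.Second),
	)
	if err != nil {
		fmt.Println("invalid configuration:", err)
		return
	}
	fmt.Println("concurrency:", c.concurrency, "poll interval:", c.pollEvery)
}
```

Because each option is just a function that mutates the client, new settings can be added later without changing the constructor's signature.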
func (*ConsumerClient) Start ¶
func (c *ConsumerClient) Start()
Start initiates the polling of the SQS queue in a separate goroutine.
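Example:
    queueURL := "https://sqs.us-west-2.amazonaws.com/1234567890/myqueue"
    dlqURL := "https://sqs.us-west-2.amazonaws.com/1234567890/mydlq"
    consumer, err := New(
        WithQueueURL(&queueURL),
        WithDLQURL(&dlqURL),
        WithMessageHandler(exampleHandler),
        // plus the remaining required options
    )
    if err != nil {
        // handle the error
    }
    consumer.Start()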
func (*ConsumerClient) Stop ¶
func (c *ConsumerClient) Stop()
Stop sends a signal to the poller to stop polling.
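Example:
    queueURL := "https://sqs.us-west-2.amazonaws.com/1234567890/myqueue"
    dlqURL := "https://sqs.us-west-2.amazonaws.com/1234567890/mydlq"
    consumer, err := New(
        WithQueueURL(&queueURL),
        WithDLQURL(&dlqURL),
        WithMessageHandler(exampleHandler),
        // plus the remaining required options
    )
    if err != nil {
        // handle the error
    }
    consumer.Start()
    time.Sleep(10 * time.Second) // Let it poll for 10 seconds
    consumer.Stop()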
func (*ConsumerClient) Validate ¶
func (c *ConsumerClient) Validate() error
Validate checks that all required parameters for the consumer client have been set. It returns an error indicating that the consumer client is invalid if any of the `SqsClient`, `Logger`, `NewRelicClient`, `QueueUrl`, `ConcurrencyFactor`, `MessageProcessTimeout`, or `QueuePollingDuration` fields is nil or zero.
type IConsumer ¶
type IConsumer interface {
	Start()
	Stop()
}
IConsumer provides an interface for consuming messages either concurrently or in a naive, sequential manner.
Start: Begins processing messages concurrently.
Stop: Halts the processing of messages.
type MessageProcessorFunc ¶
MessageProcessorFunc defines the function type that processes messages from SQS. It should return an error if processing fails, which causes the message to be moved to the DLQ.
type Option ¶
type Option func(*ConsumerClient)
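func WithBackoffDuration ¶
func WithBackoffDuration(duration time.Duration) Option
func WithBatchSize ¶
func WithBatchSize(size int64) Option
func WithConcurrencyFactor ¶
func WithConcurrencyFactor(factor int) Option
func WithDLQURL ¶
func WithDLQURL(dlqURL *string) Option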
func WithInstrumentationClient ¶
func WithInstrumentationClient(ic *instrumentation.Client) Option
func WithLogger ¶
func WithLogger(logger *zap.Logger) Option
func WithMessageHandler ¶
func WithMessageHandler(handler MessageProcessorFunc) Option
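func WithMessageProcessTimeout ¶
func WithMessageProcessTimeout(timeout time.Duration) Option
func WithQueuePollingDuration ¶
func WithQueuePollingDuration(duration time.Duration) Option
func WithQueueURL ¶
func WithQueueURL(url *string) Option
func WithSQSClient ¶
func WithSQSClient(client *sqs.SQS) Option
func WithWaitTimeSecond ¶
func WithWaitTimeSecond(waitTime int64) Option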
type SQSAPI ¶
type SQSAPI interface {
	SendMessageWithContext(ctx aws.Context, input *sqs.SendMessageInput, opts ...request.Option) (*sqs.SendMessageOutput, error)
	ReceiveMessageWithContext(ctx aws.Context, input *sqs.ReceiveMessageInput, opts ...request.Option) (*sqs.ReceiveMessageOutput, error)
	DeleteMessageWithContext(ctx aws.Context, input *sqs.DeleteMessageInput, opts ...request.Option) (*sqs.DeleteMessageOutput, error)
}