Documentation
¶
Overview ¶
package sqspoller is a simple queue polling framework, designed specifically to work with AWS SQS.
Index ¶
- Constants
- Variables
- type ErrorHandler
- type Message
- type MessageHandler
- type MessageOutput
- type Middleware
- type Poller
- func (p *Poller) OnError(handler ErrorHandler)
- func (p *Poller) OnMessage(handler MessageHandler, middleware ...Middleware)
- func (p *Poller) ReceiveMessageParams(input *sqs.ReceiveMessageInput, opts ...request.Option)
- func (p *Poller) Run() error
- func (p *Poller) SetHandlerTimeout(t time.Duration)
- func (p *Poller) SetIdlePollInterval(t time.Duration)
- func (p *Poller) SetRequestTimeout(t time.Duration)
- func (p *Poller) ShutdownAfter(t time.Duration) error
- func (p *Poller) ShutdownGracefully() error
- func (p *Poller) ShutdownNow() error
- func (p *Poller) Use(middleware ...Middleware)
- type TrackingValue
Constants ¶
const TrackingKey ctxKey = 1
TrackingKey should be used to access the values on the context object of type *TrackingValue.
Variables ¶
var ( ErrNoMessageHandler = errors.New("ErrNoMessageHandler: no message handler set on poller instance") ErrNoErrorHandler = errors.New("ErrNoErrorHandler: no error handler set on poller instance") ErrNoReceiveMessageParams = errors.New("ErrNoReceiveMessageParams: no ReceiveMessage parameters have been set") ErrHandlerTimeout = errors.New("ErrHandlerTimeout: messageHandler took to long to process message") ErrRequestTimeout = errors.New("ErrRequestTimeout: requesting message from queue timed out") ErrShutdownNow = errors.New("ErrShutdownNow: poller was suddenly shutdown") ErrShutdownGraceful = errors.New("ErrShutdownGraceful: poller could not shutdown gracefully in time") ErrAlreadyShuttingDown = errors.New("ErrAlreadyShuttingDown: poller is already in the process of shutting down") ErrAlreadyRunning = errors.New("ErrAlreadyRunning: poller is already running") ErrIntegrityIssue = errors.New("ErrIntegrityIssue: unknown integrity issue") )
Functions ¶
This section is empty.
Types ¶
type ErrorHandler ¶ added in v0.4.0
ErrorHandler is a function which handlers errors returned from sqs.ReceiveMessageWithContext, it will only be invoked if the error is not nil. Returning nil from the ErrorHandler will allow the poller to continue, returning an error will cause the poller to exit.
Errors should be of type awserr.Error, if the sqs.ReceiveMessageWithContext function returns the errors as expected.
type Message ¶
Message is an individual message, contained within a MessageOutput, it provides methods to remove itself from the SQS queue.
type MessageHandler ¶ added in v0.4.0
MessageHandler is a function which handles the incoming SQS message.
The sqs Client used to instantiate the poller will also be made available to allow the user to perform standard sqs operations.
type MessageOutput ¶
type MessageOutput struct { *sqs.ReceiveMessageOutput Messages []*Message // contains filtered or unexported fields }
MessageOutput is contains the SQS ReceiveMessageOutput and is passed down to the MessageHandler when the Poller is running.
type Middleware ¶
type Middleware func(MessageHandler) MessageHandler
Middleware is a function which that wraps a MessageHandler to add functionality before or after the MessageHandler code.
func IgnoreEmptyResponses ¶
func IgnoreEmptyResponses() Middleware
IgnoreEmptyResponses stops the data from being passed down to the inner message handler, if there is no message to be handled.
type Poller ¶
type Poller struct { // Holds the time of the last poll request that was made. This can be checked // periodically, to confirm the Poller is running as expected. LastPollTime time.Time // Maximum time interval between each poll when poll requests are returning // empty responses. IdlePollInterval time.Duration // Current poll interval, this interval will reach the IdlePollInterval // upon enough consecutive empty poll requests. Once a successful message // response is received, the CurrentInterval will drop back down to 0. CurrentInterval time.Duration // Timeout on requesting for a new message from SQS. By default, this will // be 30 seconds, if it has not been set manually. RequestTimeout time.Duration // contains filtered or unexported fields }
Poller is an instance of the polling framework, it contains the SQS client and provides a simple API for polling an SQS queue.
func Default ¶
Default creates a new instance of the SQS Poller from an instance of sqs.SQS. It also comes set up with the recommend outerMiddleware plugged in.
func (*Poller) OnError ¶ added in v0.4.0
func (p *Poller) OnError(handler ErrorHandler)
OnError attaches an ErrorHandler to the Poller instance. It is the first line of defence against message request errors from SQS.
func (*Poller) OnMessage ¶ added in v0.4.0
func (p *Poller) OnMessage(handler MessageHandler, middleware ...Middleware)
OnMessage attaches a MessageHandler to the Poller instance, if a MessageHandler already exists on the Poller instance, it will be replaced. The Middleware supplied to OnMessage will be applied first before any global middleware set by Use().
func (*Poller) ReceiveMessageParams ¶ added in v0.3.0
func (p *Poller) ReceiveMessageParams(input *sqs.ReceiveMessageInput, opts ...request.Option)
ReceiveMessageParams accepts the same parameters as the SQS ReceiveMessage method. It configures how the poller receives new messages, the parameters must be set before the Poller is run.
func (*Poller) Run ¶
Run starts the poller, the poller will continuously poll SQS until an error is returned, or explicitly told to shutdown.
func (*Poller) SetHandlerTimeout ¶ added in v0.2.0
SetHandlerTimeout lets the user set the timeout for handling a message, if the messageHandler function cannot finish execution within this time frame, the MessageHandler will return ErrHandlerTimeout. The error can be caught and handled by custom middleware, so the user can choose to move onto the next poll request if they so wish.
func (*Poller) SetIdlePollInterval ¶ added in v0.4.0
SetIdlePollInterval sets the polling interval for when the queue is empty and poll requests are returning empty responses, leaving the handler idle.
This interval will be reached through an exponential back off starting at 1 second from when the first empty response is received after a non-empty response, consecutive empty responses will cause the interval to double each time until the set interval is reached. Once a successful response is returned, the interval drops back down to 0.
func (*Poller) SetRequestTimeout ¶ added in v0.4.2
SetRequestTimeout lets the user set the timeout on requesting for a new message from the SQS queue. If the timeout occurs, ErrRequestTimeout will be passed to the OnError handler. If the caller wishes to continue polling after a the timeout, the ErrRequestTimeout error must be whitelisted in the error handler.
func (*Poller) ShutdownAfter ¶
ShutdownAfter will attempt to shutdown gracefully, if graceful shutdown cannot be achieved within the given time frame, the Poller will exit, potentially leaking unhandled resources.
func (*Poller) ShutdownGracefully ¶
ShutdownGracefully gracefully shuts down the poller.
func (*Poller) ShutdownNow ¶
ShutdownNow shuts down the Poller instantly, potentially leaking unhandled resources.
func (*Poller) Use ¶
func (p *Poller) Use(middleware ...Middleware)
Use attaches global outerMiddleware to the Poller instance which will wrap any MessageHandler and MessageHandler specific outerMiddleware.
type TrackingValue ¶ added in v0.4.2
TrackingValue represents the values stored on the context object, for each poll the context object will store the time of message received and a trace ID.
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
cmd
|
|
playground
The Playground is where you can run the poller locally against a containerized SQS service.
|
The Playground is where you can run the poller locally against a containerized SQS service. |
examples
|
|
internal
|
|