Documentation ¶
Overview ¶
Package sqsprocessor implements an SQS message processor, similar in design to the Pub/Sub client provided in the Google Cloud Go SDK.
The main type is Processor, which spawns, manages and feeds a pool of workers that execute a user-provided ProcessFunc on each message they receive.
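As a rough illustration of the worker-pool idea described above, the following is a minimal sketch using only the standard library. It is not the package's implementation: `runPool` and `countProcessed` are hypothetical names, and plain strings stand in for SQS messages.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// runPool starts numWorkers goroutines that each call process on every
// message read from msgs. It returns once msgs is closed and drained.
// (Hypothetical helper, not part of sqsprocessor.)
func runPool(ctx context.Context, numWorkers int, msgs <-chan string, process func(context.Context, string)) {
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range msgs {
				process(ctx, m)
			}
		}()
	}
	wg.Wait()
}

// countProcessed feeds n messages through a pool of numWorkers workers
// and reports how many were handled.
func countProcessed(n, numWorkers int) int64 {
	msgs := make(chan string)
	var handled int64

	// Feeder: plays the role of the polling loop that hands messages to workers.
	go func() {
		defer close(msgs)
		for i := 0; i < n; i++ {
			msgs <- fmt.Sprintf("msg-%d", i)
		}
	}()

	runPool(context.Background(), numWorkers, msgs, func(ctx context.Context, m string) {
		atomic.AddInt64(&handled, 1)
	})
	return atomic.LoadInt64(&handled)
}

func main() {
	fmt.Println(countProcessed(25, 4)) // prints 25
}
```

An unbuffered channel is used here so the feeder blocks whenever all workers are busy, which gives a simple form of back-pressure.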
Basic Usage ¶
	c := sqs.NewFromConfig(cfg)
	p := New(c, ProcessorConfig{
		NumWorkers: 10,
		Backoff:    time.Second,
		Receive: sqs.ReceiveMessageInput{
			QueueUrl:            sqsQueueURL, // *string in the AWS SDK for Go v2
			MaxNumberOfMessages: 10,
			VisibilityTimeout:   2,
			WaitTimeSeconds:     1,
		},
	})

	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup

	// Run the processor until ctx is cancelled.
	wg.Add(1)
	go func() {
		defer wg.Done()
		p.Process(ctx, func(ctx context.Context, msg types.Message) ProcessResult {
			if msg.Body == nil {
				// Delete bad messages from queue
				return ProcessResultAck
			}
			if *msg.Body == "good" {
				// Happy path
				return ProcessResultAck
			}
			// Sad path
			return ProcessResultNack
		})
	}()

	// Log any errors reported by the processor.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for err := range p.Errors() {
			log.Printf("received error from processor, %v\n", err)
		}
	}()

	// Block until SIGTERM or SIGINT, then shut down.
	sigC := make(chan os.Signal, 1)
	signal.Notify(sigC, syscall.SIGTERM, syscall.SIGINT)
	<-sigC

	log.Print("Received signal to quit, stopping processor")
	cancel()
	wg.Wait()
	log.Print("Processor stopped, exiting")
Types ¶
type ErrMessageExpired ¶
type ErrMessageExpired struct {
// contains filtered or unexported fields
}
ErrMessageExpired is returned when a message whose deadline has expired is encountered and processing is abandoned.
func (ErrMessageExpired) Error ¶
func (e ErrMessageExpired) Error() string
type Middleware ¶ added in v0.2.0
type Middleware func(ProcessFunc) ProcessFunc
type ProcessFunc ¶
type ProcessFunc func(ctx context.Context, msg types.Message) ProcessResult
ProcessFunc is the signature of the function the user provides to process each message received from the queue.
type ProcessResult ¶
type ProcessResult uint8
ProcessResult is an enum used to signal success or failure when processing a message in a ProcessFunc.
	const (
		// ProcessResultNack indicates that the ProcessFunc either does not
		// want to process a message or has failed to. Upon receiving this,
		// the Processor expedites re-processing of the message by making it
		// visible in the queue.
		ProcessResultNack ProcessResult = iota

		// ProcessResultAck indicates that the ProcessFunc was successful in
		// processing the message. Upon receiving this, the Processor deletes
		// the message from the queue to prevent re-delivery.
		ProcessResultAck
	)
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor is the struct that orchestrates polling for messages, as well as starting and feeding a configured number of workers in a pool.
func New ¶ added in v0.1.4
func New(c SQSClienter, config ProcessorConfig) *Processor
New returns a pointer to a new Processor, given an SQS client and a config.
type ProcessorConfig ¶
	type ProcessorConfig struct {
		// NumWorkers is the number of worker
		// goroutines spawned, managed and
		// used by the Processor to process any
		// received messages
		NumWorkers int

		// Backoff is the amount of time the
		// Processor will block before polling
		// for new messages if none were received
		// in the previous call
		Backoff time.Duration

		// TODO abstract these?
		Receive        sqs.ReceiveMessageInput
		ReceiveOptions []func(*sqs.Options)
	}
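The Backoff field describes a pause that applies only after an empty receive. A minimal sketch of that polling behaviour, using only the standard library, might look like the following. This is an illustration of the idea, not the Processor's actual loop: `poll` and `drain` are hypothetical names, and string slices stand in for batches of SQS messages.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// poll calls receive repeatedly until ctx is cancelled. After an empty
// batch it waits for backoff (or cancellation) before polling again;
// after a non-empty batch it polls again immediately.
func poll(ctx context.Context, backoff time.Duration, receive func() []string, handle func(string)) {
	for {
		if ctx.Err() != nil {
			return
		}
		batch := receive()
		for _, m := range batch {
			handle(m)
		}
		if len(batch) == 0 {
			select {
			case <-ctx.Done():
				return
			case <-time.After(backoff):
			}
		}
	}
}

// drain runs poll over a fixed list of batches and returns every
// message handled, cancelling the loop once the list is exhausted.
func drain(batches [][]string) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var got []string
	next := 0
	receive := func() []string {
		if next >= len(batches) {
			cancel() // nothing left to deliver: stop the loop
			return nil
		}
		b := batches[next]
		next++
		return b
	}
	poll(ctx, time.Millisecond, receive, func(m string) { got = append(got, m) })
	return got
}

func main() {
	fmt.Println(drain([][]string{{"a", "b"}, {}, {"c"}})) // prints [a b c]
}
```

Waiting in a `select` on both the timer and `ctx.Done()` lets cancellation interrupt the backoff instead of sleeping through it.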
type SQSClienter ¶
	type SQSClienter interface {
		ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
		DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
		ChangeMessageVisibility(ctx context.Context, params *sqs.ChangeMessageVisibilityInput, optFns ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityOutput, error)
	}
SQSClienter encapsulates all SQS methods a Processor will use.