Documentation ¶
Overview ¶
Package mq provides primitives for consuming SQS queues.
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/sqs"
	mq "github.com/remind101/mq-go"
)

func main() {
	queueURL := "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"

	h := mq.HandlerFunc(func(m *mq.Message) error {
		fmt.Printf("Received message: %s", aws.StringValue(m.SQSMessage.Body))

		// Returning no error signifies the message was processed successfully.
		// The Server will queue the message for deletion.
		return nil
	})

	// Configure mq.Server
	s := mq.NewServer(queueURL, h)

	// Start a loop to receive SQS messages and pass them to the Handler.
	s.Start()
	defer s.Shutdown(context.Background())

	// Start a publisher
	p := mq.NewPublisher(queueURL)
	p.Start()
	defer p.Shutdown(context.Background())

	// Publish messages (will be batched).
	p.Publish(&sqs.SendMessageBatchRequestEntry{
		MessageBody: aws.String("Hello"),
	})
	p.Publish(&sqs.SendMessageBatchRequestEntry{
		MessageBody: aws.String("World!"),
	})
}
Example (GracefulShutdown) ¶
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	mq "github.com/remind101/mq-go"
)

func main() {
	queueURL := "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"

	h := mq.HandlerFunc(func(m *mq.Message) error {
		fmt.Printf("Received message: %s", aws.StringValue(m.SQSMessage.Body))

		// Returning no error signifies the message was processed successfully.
		// The Server will queue the message for deletion.
		return nil
	})

	// Configure mq.Server
	s := mq.NewServer(queueURL, h)

	// Handle SIGINT and SIGTERM gracefully.
	go func() {
		sigCh := make(chan os.Signal, 1)
		signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
		<-sigCh

		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		// We received an interrupt signal, shut down gracefully.
		if err := s.Shutdown(ctx); err != nil {
			fmt.Printf("SQS server shutdown: %v\n", err)
		}
	}()

	// Start a loop to receive SQS messages and pass them to the Handler.
	s.Start()
}
Example (Router) ¶
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/aws/aws-sdk-go/service/sqs/sqsiface"

	"github.com/aws/aws-sdk-go/aws"
	mq "github.com/remind101/mq-go"
)

func main() {
	queueURL := "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"

	r := mq.NewRouter()
	r.Handle("foo-jobs", mq.HandlerFunc(func(m *mq.Message) error {
		fmt.Printf("Received foo message: %s", aws.StringValue(m.SQSMessage.Body))
		return nil
	}))

	r.Handle("bar-jobs", mq.HandlerFunc(func(m *mq.Message) error {
		fmt.Printf("Received bar message: %s", aws.StringValue(m.SQSMessage.Body))
		return nil
	}))

	// Configure mq.Server
	s := mq.NewServer(queueURL, r)

	// Start a loop to receive SQS messages and pass them to the Handler.
	go s.Start()

	// Publish a foo message
	publish(s.Client, queueURL, "foo-jobs", "this will route to foo-jobs handler func")

	// Publish a bar message
	publish(s.Client, queueURL, "bar-jobs", "this will route to bar-jobs handler func")
}

func publish(client sqsiface.SQSAPI, queueURL, route, message string) error {
	input := &sqs.SendMessageInput{
		QueueUrl: aws.String(queueURL),
		MessageAttributes: map[string]*sqs.MessageAttributeValue{
			mq.MessageAttributeNameRoute: &sqs.MessageAttributeValue{
				DataType:    aws.String("String"),
				StringValue: aws.String(route),
			},
		},
		MessageBody: aws.String(message),
	}

	_, err := client.SendMessage(input)
	return err
}
Index ¶
- Constants
- Variables
- func ChangeVisibilityWithRetryPolicy(m *Message) error
- func ServerDefaults(s *Server)
- func WithClient(c sqsiface.SQSAPI) func(s *Server)
- func WithConcurrency(c int) func(s *Server)
- type BoundedProcessor
- type Handler
- type HandlerFunc
- type Logger
- type Message
- type PartitionedProcessor
- type Processor
- type Publisher
- type PublisherOpt
- type RetryPolicy
- type Router
- type Server
- type ServerGroup
- type UnBoundedProcessor
Constants ¶
const (
	// DefaultConcurrency is the default concurrency for the Server.
	DefaultConcurrency = 1

	// DefaultMaxNumberOfMessages defaults to the maximum number of messages the
	// Server can request when receiving messages.
	DefaultMaxNumberOfMessages = 10

	// DefaultWaitTimeSeconds is the default WaitTimeSeconds used when receiving
	// messages.
	DefaultWaitTimeSeconds = 1

	// DefaultVisibilityTimeout is the default VisibilityTimeout used when
	// receiving messages in seconds.
	DefaultVisibilityTimeout = 30

	// DefaultDeletionInterval is the default interval at which messages pending
	// deletion are batch deleted (if number of pending has not reached
	// BatchDeleteMaxMessages).
	DefaultDeletionInterval = 10 * time.Second

	// DefaultBatchDeleteMaxMessages defaults to the maximum allowed number
	// of messages in a batch delete request.
	DefaultBatchDeleteMaxMessages = 10
)
const DefaultPublishInterval = 1 * time.Second
DefaultPublishInterval is the default interval at which the Publisher sends buffered messages when the batch is not full.
const MaxVisibilityTimeout = 43200 // 12 hours
MaxVisibilityTimeout is the maximum allowed VisibilityTimeout by SQS.
const MessageAttributeNamePartitionKey = "partition_key"
MessageAttributeNamePartitionKey is the message attribute used to determine the partition in which the message is processed.
const MessageAttributeNameRoute = "route"
MessageAttributeNameRoute is a MessageAttribute name used as a routing key by the Router.
Variables ¶
var DefaultRetryPolicy = &defaultRetryPolicy{}
DefaultRetryPolicy increases the Message VisibilityTimeout exponentially based on the received count, up to MaxVisibilityTimeout and MaxReceives.
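As a rough illustration of an exponential policy like the one described above, the sketch below doubles the delay on each receive and caps it at the 12-hour limit. The base of 2 seconds and the `expDelay` name are assumptions made for illustration; the package's actual formula and its MaxReceives handling are not shown in this documentation.

```go
package main

import "fmt"

// maxVisibilityTimeout mirrors MaxVisibilityTimeout (12 hours, in seconds).
const maxVisibilityTimeout int64 = 43200

// expDelay is a hypothetical backoff: 2^receiveCount seconds, capped at
// maxVisibilityTimeout. It is not the package's actual implementation.
func expDelay(receiveCount int) int64 {
	if receiveCount < 1 {
		receiveCount = 1
	}
	// 2^16 seconds already exceeds the cap, so stop shifting there.
	if receiveCount >= 16 {
		return maxVisibilityTimeout
	}
	return int64(1) << uint(receiveCount)
}

func main() {
	for _, n := range []int{1, 2, 5, 15, 16, 40} {
		fmt.Printf("receive %d -> %ds\n", n, expDelay(n))
	}
}
```

Capping before the shift keeps the arithmetic safe for arbitrarily large receive counts.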
var PublisherDefaults = func(p *Publisher) {
	p.Client = sqs.New(session.New())
	p.PublishInterval = DefaultPublishInterval
	p.BatchMaxMessages = DefaultMaxNumberOfMessages
	p.OutputHandler = func(out *sqs.SendMessageBatchOutput, err error) {
		if err != nil {
			fmt.Println(err.Error())
		}
		if len(out.Failed) > 0 {
			for _, entry := range out.Failed {
				fmt.Printf("Failed message send: %+v\n", entry)
			}
		}
	}
	p.Logger = &discardLogger{}
}
PublisherDefaults contains the default configuration for a new Publisher.
var WithPartitionedProcessor = func(s *Server) {
	s.Processor = &PartitionedProcessor{s}
}
WithPartitionedProcessor configures a Server with a partitioned Processor.
Functions ¶
func ChangeVisibilityWithRetryPolicy ¶
func ChangeVisibilityWithRetryPolicy(m *Message) error
ChangeVisibilityWithRetryPolicy changes the visibility of a message based on the message's retry policy.
If the delay is zero, this is a no-op.
func ServerDefaults ¶
func ServerDefaults(s *Server)
ServerDefaults is used by NewServer to initialize a Server with defaults.
func WithClient ¶
func WithClient(c sqsiface.SQSAPI) func(s *Server)
WithClient configures a Server with a custom SQS client.
func WithConcurrency ¶
func WithConcurrency(c int) func(s *Server)
WithConcurrency configures a Server with a concurrency of c.
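ServerDefaults, WithClient and WithConcurrency all have the shape of a function that mutates a *Server. The sketch below shows how such option functions are typically applied, using local stand-in types (`server`, `option`, `newServer`, `withConcurrency`) rather than the package's own. The order of application (defaults first, then options) is an assumption.

```go
package main

import "fmt"

// server is a local stand-in for mq.Server with only the fields needed here.
type server struct {
	QueueURL    string
	Concurrency int
}

// option mirrors the func(s *Server) shape returned by WithConcurrency.
type option func(*server)

// withConcurrency returns an option that sets the concurrency to c.
func withConcurrency(c int) option {
	return func(s *server) { s.Concurrency = c }
}

// newServer sets defaults first, then applies each option in order, so
// later options override earlier ones. This ordering is an assumption.
func newServer(queueURL string, opts ...option) *server {
	s := &server{QueueURL: queueURL, Concurrency: 1} // 1 mirrors DefaultConcurrency
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	fmt.Println(newServer("q").Concurrency)
	fmt.Println(newServer("q", withConcurrency(8)).Concurrency)
}
```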
Types ¶
type BoundedProcessor ¶
type BoundedProcessor struct {
	Server *Server
}
BoundedProcessor is the default message processor. It creates Server.Concurrency goroutines that all consume from the messages channel.
type Handler ¶
A Handler processes a Message.
func RootHandler ¶
RootHandler is a root handler responsible for adding a delay to messages that have errored.
Queues MUST have a dead letter queue or else messages that cannot succeed will never be removed from the queue.
type HandlerFunc ¶
HandlerFunc is an adaptor to allow the use of ordinary functions as message Handlers.
func (HandlerFunc) HandleMessage ¶
func (h HandlerFunc) HandleMessage(m *Message) error
HandleMessage satisfies the Handler interface.
type Logger ¶
type Logger interface {
	Println(...interface{})
}
Logger defines a simple interface to support debug logging in the Server.
type Message ¶
type Message struct {
	QueueURL    string
	SQSMessage  *sqs.Message
	RetryPolicy RetryPolicy
	// contains filtered or unexported fields
}
Message wraps an sqs.Message.
func NewMessage ¶
NewMessage returns a fully initialized Message.
func (*Message) ChangeVisibility ¶
ChangeVisibility changes the VisibilityTimeout to timeout seconds.
type PartitionedProcessor ¶
type PartitionedProcessor struct {
	Server *Server
}
PartitionedProcessor is a processor that creates Server.Concurrency goroutines to process messages, except that each message is assigned to a goroutine based on a consistent hash of the message's partition key. Messages with the same partition key are guaranteed to be processed by the same goroutine.
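To illustrate why equal partition keys land on the same goroutine, here is a minimal sketch that maps a key to a worker index. It uses a plain FNV-1a hash modulo the worker count, which is a simpler technique than a consistent hash ring and is not necessarily what the package does. The `partitionFor` name is hypothetical, and `workers` is assumed to be greater than zero.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a partition key to a worker index in [0, workers).
// The same key always yields the same index for a fixed worker count.
func partitionFor(key string, workers int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(workers))
}

func main() {
	const workers = 4
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("%s -> worker %d\n", key, partitionFor(key, workers))
	}
}
```

Each worker would then read from its own channel, so messages sharing a key are handled in order by a single goroutine.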
type Processor ¶
type Processor interface {
	// Process processes messages. A well behaved processor will:
	// * Receive messages from the messagesCh in a loop until that channel is
	//   closed.
	// * Send messages to the deletionsCh if message was successfully processed.
	// * Send errors to the errorsCh.
	// * Close the done channel when finished processing.
	Process(messagesCh <-chan *Message, deletionsCh chan<- *Message, errorsCh chan<- error, done chan struct{})
}
Processor defines an interface for processing messages.
type Publisher ¶
type Publisher struct {
	QueueURL         string
	Client           sqsiface.SQSAPI
	PublishInterval  time.Duration
	BatchMaxMessages int
	OutputHandler    func(*sqs.SendMessageBatchOutput, error)
	Logger           Logger
	// contains filtered or unexported fields
}
Publisher is a publisher that efficiently sends messages to a single SQS Queue. It maintains a buffer of messages that is sent to SQS when it is full or when the publish interval is reached.
func NewPublisher ¶
func NewPublisher(queueURL string, opts ...PublisherOpt) *Publisher
NewPublisher returns a new Publisher with sensible defaults.
func (*Publisher) Publish ¶
func (p *Publisher) Publish(entry *sqs.SendMessageBatchRequestEntry)
Publish adds entry to the internal messages buffer.
type PublisherOpt ¶
type PublisherOpt func(*Publisher)
PublisherOpt defines a function that configures a Publisher.
type RetryPolicy ¶
type RetryPolicy interface {
	// Amount to delay the message visibility in seconds from the time the
	// message was first received, based on the number of times it has been
	// received so far.
	Delay(receiveCount int) *int64 // Seconds
}
RetryPolicy defines an interface to determine when to retry a Message.
type Router ¶
type Router struct {
	sync.Mutex

	// Resolver maps a Message to a string identifier used to match to a registered Handler. The
	// default implementation returns a MessageAttribute named "route".
	Resolver func(*Message) (string, bool)
	// contains filtered or unexported fields
}
Router will route a message based on MessageAttributes to other registered Handlers.
func (*Router) HandleMessage ¶
HandleMessage satisfies the Handler interface.
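The routing idea can be sketched with a resolver function and a map of handlers. All types below (`message`, `handler`, `router`) are local stand-ins, and returning an error for a missing key or an unregistered route is an assumption, since this documentation does not say how the Router treats those cases.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// message is a local stand-in for mq.Message; attrs plays the role of the
// SQS message attributes.
type message struct {
	attrs map[string]string
	body  string
}

type handler func(*message) error

// router dispatches a message to the handler registered for its route key.
type router struct {
	mu       sync.Mutex
	resolver func(*message) (string, bool)
	handlers map[string]handler
}

func newRouter() *router {
	return &router{
		// Default resolver: read the "route" attribute.
		resolver: func(m *message) (string, bool) {
			key, ok := m.attrs["route"]
			return key, ok
		},
		handlers: map[string]handler{},
	}
}

func (r *router) handle(route string, h handler) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.handlers[route] = h
}

// handleMessage returns an error when no route key or handler is found.
// That error behavior is an assumption, not documented above.
func (r *router) handleMessage(m *message) error {
	key, ok := r.resolver(m)
	if !ok {
		return errors.New("no routing key found")
	}
	r.mu.Lock()
	h, ok := r.handlers[key]
	r.mu.Unlock()
	if !ok {
		return fmt.Errorf("no handler registered for route %q", key)
	}
	return h(m)
}

func main() {
	r := newRouter()
	r.handle("foo-jobs", func(m *message) error {
		fmt.Println("foo handler got:", m.body)
		return nil
	})
	_ = r.handleMessage(&message{attrs: map[string]string{"route": "foo-jobs"}, body: "hi"})
	err := r.handleMessage(&message{attrs: map[string]string{"route": "bar-jobs"}, body: "hi"})
	fmt.Println("unregistered route:", err)
}
```

Swapping the `resolver` field is what lets callers route on something other than the "route" attribute, for example a field parsed from the message body.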
type Server ¶
type Server struct {
	QueueURL               string
	Client                 sqsiface.SQSAPI
	Handler                Handler
	ErrorHandler           func(error)
	Concurrency            int
	AttributeNames         []*string
	MessageAttributeNames  []*string
	MaxNumberOfMessages    *int64
	WaitTimeSeconds        *int64
	VisibilityTimeout      *int64
	BatchDeleteMaxMessages int
	DeletionInterval       time.Duration
	Logger                 Logger
	Processor              Processor
	// contains filtered or unexported fields
}
Server is responsible for running the request loop to receive messages from a single SQS Queue. It manages the message processing pipeline.
There are three sections of the processing pipeline:
1. Receiving messages. The Server starts a single goroutine to batch request messages from QueueURL. The frequency of this call is controlled with WaitTimeSeconds. Messages are sent to an unbuffered channel. This ensures that the Server does not continue requesting messages if the processing goroutines are unable to keep up.
2. Processing messages. The Server starts one or more goroutines for processing messages from the messages channel. Concurrency is controlled by Server.Processor and Server.Concurrency. These goroutines simply pass messages to the Handler. If the Handler returns no error, the message will be sent to the deletions channel.
3. Deleting messages. The Server starts a single goroutine to batch delete processed messages. It will delete messages when the batch size is reached or if no deletions have occurred within an interval. This interval must be smaller than the VisibilityTimeout or messages could be received again before deletion occurs.
On shutdown, the receiving loop ends, and the messages channel used by processing loops is closed.
Once processing loops have drained the messages channel and finished processing, they will signal to the deletion goroutine to finish.
When the deletion goroutine receives the signal, the deletion loop will drain the deletions channel and, after finishing, close the done channel, signaling that the Server has shut down gracefully.
type ServerGroup ¶
type ServerGroup struct {
	Servers []*Server
}
ServerGroup represents a list of Servers.
type UnBoundedProcessor ¶
type UnBoundedProcessor struct {
	Server *Server
}
UnBoundedProcessor is a message processor that creates a new goroutine to process each message. It ignores the Server.Concurrency value.