Documentation ¶
Index ¶
- Variables
- func NewBatchedTopic(queueURL string, timeout ...time.Duration) (batching.Batcher, error)
- func NewServer(queueURL string, cl int, retryTimeout int64, opts ...Option) (msg.Server, error)
- func NewTopic(queueURL string) (msg.Topic, error)
- type ErrThrottleServer
- type MessageWriter
- type Option
- type Server
- type Topic
Constants ¶
This section is empty.
Variables ¶
var NewSQSReceiverFunc = func() (awsinterfaces.SQSReceiver, *session.Session, error) {
	sess, err := session.NewSession()
	if err != nil {
		return nil, nil, err
	}
	conf := &aws.Config{
		Credentials: credentials.NewCredentials(&credentials.EnvProvider{}),
		Region:      aws.String("us-west-2"),
		Retryer: retryer.DefaultRetryer{
			Retryer: client.DefaultRetryer{NumMaxRetries: 7},
			Delay:   2 * time.Second,
		},
	}
	if r := os.Getenv("AWS_REGION"); r != "" {
		conf.Region = aws.String(r)
	}
	if url := os.Getenv("SQS_ENDPOINT"); url != "" {
		conf.Endpoint = aws.String(url)
	}
	return sqs.New(sess, conf), sess, nil
}
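NewSQSReceiverFunc is a package-level variable so that tests can replace it with a function that returns a mock receiver (dependency injection).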
var NewSQSSenderFunc = func() (awsinterfaces.SQSSender, error) {
	sess, err := session.NewSession()
	if err != nil {
		return nil, err
	}
	conf := &aws.Config{
		Credentials: credentials.NewCredentials(&credentials.EnvProvider{}),
	}
	if r := os.Getenv("AWS_REGION"); r != "" {
		conf.Region = aws.String(r)
	}
	if url := os.Getenv("SQS_ENDPOINT"); url != "" {
		conf.Endpoint = aws.String(url)
	}
	return sqs.New(sess, conf), nil
}
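NewSQSSenderFunc is a package-level variable so that tests can replace it with a function that returns a mock sender (dependency injection).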
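The constructor-variable pattern used by both variables can be sketched in isolation. The following is a minimal, standard-library-only illustration, not this package's code: `Sender`, `newSenderFunc`, `mockSender`, and `publish` are hypothetical stand-ins for `awsinterfaces.SQSSender`, `NewSQSSenderFunc`, and the code that consumes them.

```go
package main

import (
	"errors"
	"fmt"
)

// Sender is a hypothetical stand-in for awsinterfaces.SQSSender.
type Sender interface {
	Send(body string) error
}

// newSenderFunc plays the role of NewSQSSenderFunc: a package-level
// variable holding the constructor, so tests can swap it out.
var newSenderFunc = func() (Sender, error) {
	return nil, errors.New("real client not available in this sketch")
}

// mockSender records every body it is asked to send.
type mockSender struct{ sent []string }

func (m *mockSender) Send(body string) error {
	m.sent = append(m.sent, body)
	return nil
}

// publish builds a sender through the injectable constructor and uses it.
func publish(body string) error {
	s, err := newSenderFunc()
	if err != nil {
		return err
	}
	return s.Send(body)
}

func main() {
	mock := &mockSender{}
	orig := newSenderFunc
	newSenderFunc = func() (Sender, error) { return mock, nil }
	defer func() { newSenderFunc = orig }() // restore the original constructor

	if err := publish("hello"); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Println("recorded:", mock.sent)
}
```

Restoring the original value after the test matters because the variable is shared by the whole package; a test that leaves the mock in place can affect tests that run later.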
Functions ¶
func NewBatchedTopic ¶
func NewBatchedTopic(queueURL string, timeout ...time.Duration) (batching.Batcher, error)
func NewServer ¶
func NewServer(queueURL string, cl int, retryTimeout int64, opts ...Option) (msg.Server, error)
NewServer creates and initializes a new Server for the SQS queue at queueURL. `cl` is the number of concurrent message receives (10 messages each).
AWS credentials (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) are assumed to be set as environment variables.
Set the SQS_ENDPOINT environment variable to override the endpoint configured on the SQS client.
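The precedence between the built-in default and the environment can be sketched as a small pure function. This is an illustration that mirrors the logic visible in NewSQSReceiverFunc, not code from the package: `settings` and `resolveSettings` are hypothetical names, and the lookup function is injected so the logic can be exercised without changing the process environment.

```go
package main

import (
	"fmt"
	"os"
)

// settings is a hypothetical holder for the two values the environment controls.
type settings struct {
	Region   string
	Endpoint string // empty means "use the SDK's default endpoint"
}

// resolveSettings starts from the default region shown in NewSQSReceiverFunc,
// then lets AWS_REGION and SQS_ENDPOINT override it when they are non-empty.
func resolveSettings(getenv func(string) string) settings {
	s := settings{Region: "us-west-2"}
	if r := getenv("AWS_REGION"); r != "" {
		s.Region = r
	}
	if u := getenv("SQS_ENDPOINT"); u != "" {
		s.Endpoint = u
	}
	return s
}

func main() {
	fmt.Printf("%+v\n", resolveSettings(os.Getenv))
}
```

Overriding the endpoint is typically useful for pointing the client at a local SQS-compatible service during development or testing.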
Types ¶
type ErrThrottleServer ¶
ErrThrottleServer signals that the server should sleep for the given duration before resuming work.
func (ErrThrottleServer) Error ¶
func (e ErrThrottleServer) Error() string
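One way a receive loop might react to such an error is sketched below. The fields of ErrThrottleServer are not shown in this documentation, so `throttleError` and its `Duration` field are assumptions made for illustration, as is the helper `pauseFor`.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// throttleError is a hypothetical stand-in for ErrThrottleServer.
// The Duration field name is an assumption.
type throttleError struct {
	Duration time.Duration
}

func (e throttleError) Error() string {
	return fmt.Sprintf("throttled: pause for %s", e.Duration)
}

// pauseFor reports how long the caller should sleep after err.
// It returns 0 when err is nil or does not carry a throttle signal.
func pauseFor(err error) time.Duration {
	var te throttleError
	if errors.As(err, &te) {
		return te.Duration
	}
	return 0
}

func main() {
	// errors.As also finds the throttle error when it has been wrapped.
	err := fmt.Errorf("receive failed: %w", throttleError{Duration: 50 * time.Millisecond})
	if d := pauseFor(err); d > 0 {
		fmt.Println("sleeping for", d)
		time.Sleep(d)
	}
}
```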
type MessageWriter ¶
type MessageWriter struct {
	msg.MessageWriter
	// contains filtered or unexported fields
}
MessageWriter writes data to an SQS queue.
func (*MessageWriter) Attributes ¶
func (w *MessageWriter) Attributes() *msg.Attributes
Attributes returns the msg.Attributes associated with the MessageWriter.
func (*MessageWriter) Close ¶
func (w *MessageWriter) Close() error
Close converts its buffered data and attributes to an SQS message and publishes it to a queue.
Once a MessageWriter is closed, it cannot be used again.
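The buffer-then-publish behaviour can be sketched with the standard library alone. `bufferedWriter`, `errClosed`, and the `publish` callback are hypothetical; in particular, returning an error on reuse is one possible way to enforce the single-use rule and is not confirmed to be what MessageWriter does.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// errClosed is a hypothetical error for use after Close.
var errClosed = errors.New("writer is closed")

// bufferedWriter is a hypothetical stand-in for MessageWriter: Write only
// buffers, and Close hands the whole buffer to publish exactly once.
type bufferedWriter struct {
	buf     bytes.Buffer
	closed  bool
	publish func(body string) error // stands in for the SQS send call
}

func (w *bufferedWriter) Write(p []byte) (int, error) {
	if w.closed {
		return 0, errClosed
	}
	return w.buf.Write(p)
}

func (w *bufferedWriter) Close() error {
	if w.closed {
		return errClosed
	}
	w.closed = true
	return w.publish(w.buf.String())
}

func main() {
	w := &bufferedWriter{publish: func(body string) error {
		fmt.Println("published:", body)
		return nil
	}}
	w.Write([]byte("hello "))
	w.Write([]byte("world"))
	fmt.Println("first close:", w.Close())
	fmt.Println("second close:", w.Close())
}
```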
func (*MessageWriter) SetDelay ¶
func (w *MessageWriter) SetDelay(delay time.Duration)
SetDelay sets a delay on the Message. The delay must be between 0 and 900 seconds, the range SQS accepts for a per-message delay.
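A caller may want to check the bound before handing a duration to SetDelay. The helper below is a sketch only: `delaySeconds` and `maxDelay` are hypothetical names, and whether SetDelay itself validates, truncates, or rounds the value is not stated here.

```go
package main

import (
	"fmt"
	"time"
)

// maxDelay is the upper bound quoted above: 900 seconds (15 minutes).
const maxDelay = 900 * time.Second

// delaySeconds validates d and converts it to whole seconds, the unit SQS
// uses for a per-message delay. Fractions of a second are truncated.
func delaySeconds(d time.Duration) (int64, error) {
	if d < 0 || d > maxDelay {
		return 0, fmt.Errorf("delay %s is outside [0s, %s]", d, maxDelay)
	}
	return int64(d / time.Second), nil
}

func main() {
	for _, d := range []time.Duration{0, 90 * time.Second, 16 * time.Minute} {
		secs, err := delaySeconds(d)
		fmt.Println(d, secs, err)
	}
}
```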
type Option ¶
Option is a function that modifies a `Server` to apply a configuration setting.
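The functional-options pattern that Option follows can be sketched as below. The underlying type of Option is not shown in this documentation, so the `option` type (including its error return), the `server` struct, and the parameter order of `withRetries` are all assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// server is a hypothetical, trimmed-down stand-in for Server.
type server struct {
	maxRetries int
	retryDelay time.Duration
}

// option mirrors the idea of Option: a function that adjusts a server.
// Returning an error is an assumption made for this sketch.
type option func(*server) error

// withRetries is a hypothetical option in the spirit of WithRetries.
func withRetries(delay time.Duration, max int) option {
	return func(s *server) error {
		if max < 0 {
			return errors.New("max must not be negative")
		}
		s.maxRetries = max
		s.retryDelay = delay
		return nil
	}
}

// newServer applies each option in order and stops at the first failure.
func newServer(opts ...option) (*server, error) {
	s := &server{}
	for _, opt := range opts {
		if err := opt(s); err != nil {
			return nil, err
		}
	}
	return s, nil
}

func main() {
	s, err := newServer(withRetries(2*time.Second, 5))
	if err != nil {
		fmt.Println("configuration failed:", err)
		return
	}
	fmt.Println(s.maxRetries, s.retryDelay)
}
```

The appeal of this design is that a constructor such as NewServer can gain new settings without changing its required parameters.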
func WithCustomRetryer ¶
WithCustomRetryer sets a custom `Retryer` to use on the SQS client.
func WithRetries ¶
WithRetries makes the `Server` retry on credential errors, up to `max` attempts with `delay` seconds between requests. This is needed when credentials are generated automatically and the program starts before AWS has finished propagating them.
func WithRetryJitter ¶
WithRetryJitter sets a jitter value for the VisibilityTimeout. With jitter applied, every message that needs to be retried gets a visibility timeout in the interval [visibilityTimeout - jitter, visibilityTimeout + jitter].
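The jittered interval can be made concrete with a small helper. This is a sketch under stated assumptions, not the package's implementation: `jitteredTimeout` is a hypothetical name, values are whole seconds, the interval is treated as closed at both ends, the draw is uniform, and negative results are clamped to zero.

```go
package main

import (
	"fmt"
	"math/rand"
)

// jitteredTimeout returns a value from [timeout-jitter, timeout+jitter],
// clamped at zero. randn must return a value in [0, n), like rand.Int63n;
// it is a parameter so the function can be tested deterministically.
func jitteredTimeout(timeout, jitter int64, randn func(n int64) int64) int64 {
	if jitter <= 0 {
		return timeout
	}
	// randn(2*jitter+1) yields 0..2*jitter, so both endpoints are reachable.
	v := timeout - jitter + randn(2*jitter+1)
	if v < 0 {
		return 0
	}
	return v
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(jitteredTimeout(30, 5, rand.Int63n))
	}
}
```

Spreading retries across an interval keeps a batch of failed messages from all becoming visible again at the same instant.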
type Server ¶
type Server struct {
	// AWS QueueURL
	QueueURL string
	// Concrete instance of SQSAPI
	Svc awsinterfaces.SQSReceiver
	// contains filtered or unexported fields
}
Server represents a msg.Server for receiving messages from an AWS SQS Queue.
func (*Server) Serve ¶
Serve continuously receives messages from an SQS queue, creates a message, and calls Receive on `r`. Serve is blocking and will not return until Shutdown is called on the Server.
NewServer should be used prior to running Serve.
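The blocking contract between Serve and Shutdown can be sketched with channels. `loopServer` is a hypothetical stand-in that reads from an in-memory channel instead of polling SQS; the callback signature, the parameterless Shutdown, and the nil return from Serve are simplifications assumed for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// loopServer is a hypothetical stand-in for Server.
type loopServer struct {
	inbox chan string
	done  chan struct{}
	once  sync.Once
}

func newLoopServer() *loopServer {
	return &loopServer{inbox: make(chan string), done: make(chan struct{})}
}

// Serve blocks, handing each message to receive, until Shutdown is called.
func (s *loopServer) Serve(receive func(string)) error {
	for {
		select {
		case <-s.done:
			return nil
		case m := <-s.inbox:
			receive(m)
		}
	}
}

// Shutdown unblocks Serve. It is safe to call more than once.
func (s *loopServer) Shutdown() {
	s.once.Do(func() { close(s.done) })
}

func main() {
	s := newLoopServer()
	finished := make(chan error)
	// Serve blocks, so it runs in its own goroutine.
	go func() { finished <- s.Serve(func(m string) { fmt.Println("got", m) }) }()

	s.inbox <- "a" // unbuffered: returns once Serve has taken the message
	s.Shutdown()
	fmt.Println("Serve returned:", <-finished)
}
```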