sqs

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 11, 2025 License: BSD-3-Clause-Clear Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var NewSQSReceiverFunc = func() (awsinterfaces.SQSReceiver, *session.Session, error) {
	sess, err := session.NewSession()
	if err != nil {
		return nil, nil, err
	}

	conf := &aws.Config{
		Credentials: credentials.NewCredentials(&credentials.EnvProvider{}),
		Region:      aws.String("us-west-2"),
		Retryer: retryer.DefaultRetryer{
			Retryer: client.DefaultRetryer{NumMaxRetries: 7},
			Delay:   2 * time.Second,
		},
	}

	if r := os.Getenv("AWS_REGION"); r != "" {
		conf.Region = aws.String(r)
	}

	if url := os.Getenv("SQS_ENDPOINT"); url != "" {
		conf.Endpoint = aws.String(url)
	}

	return sqs.New(sess, conf), sess, nil
}

DI to support mocking

View Source
var NewSQSSenderFunc = func() (awsinterfaces.SQSSender, error) {
	sess, err := session.NewSession()
	if err != nil {
		return nil, err
	}

	conf := &aws.Config{
		Credentials: credentials.NewCredentials(&credentials.EnvProvider{}),
	}

	if r := os.Getenv("AWS_REGION"); r != "" {
		conf.Region = aws.String(r)
	}
	if url := os.Getenv("SQS_ENDPOINT"); url != "" {
		conf.Endpoint = aws.String(url)
	}

	return sqs.New(sess, conf), nil
}

DI to support mocking

Functions

func NewBatchedTopic

func NewBatchedTopic(queueURL string, timeout ...time.Duration) (batching.Batcher, error)

func NewServer

func NewServer(queueURL string, cl int, retryTimeout int64, opts ...Option) (msg.Server, error)

NewServer creates and initializes a new Server using queueURL to a SQS queue `cl` represents the number of concurrent message receives (10 msgs each).

AWS credentials (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) are assumed to be set as environment variables.

SQS_ENDPOINT can be set as an environment variable in order to override the awsinterfaces.Client's Configured Endpoint

func NewTopic

func NewTopic(queueURL string) (msg.Topic, error)

NewTopic returns an sqs.Topic with fully configured SQSAPI

Types

type ErrThrottleServer

type ErrThrottleServer struct {
	Message  string
	Duration time.Duration
}

ErrThrottleServer signals that the server should sleep for the duration time before resuming work.

func (ErrThrottleServer) Error

func (e ErrThrottleServer) Error() string

type MessageWriter

type MessageWriter struct {
	msg.MessageWriter
	// contains filtered or unexported fields
}

MessageWriter writes data to a SQS Queue.

func (*MessageWriter) Attributes

func (w *MessageWriter) Attributes() *msg.Attributes

Attributes returns the msg.Attributes associated with the MessageWriter

func (*MessageWriter) Close

func (w *MessageWriter) Close() error

Close converts it's buffered data and attributes to an SQS message and publishes it to a queue.

Once a MessageWriter is closed, it cannot be used again.

func (*MessageWriter) SetDelay

func (w *MessageWriter) SetDelay(delay time.Duration)

SetDelay sets a delay on the Message. The delay must be between 0 and 900 seconds, according to the awsinterfaces sdk.

func (*MessageWriter) Write

func (w *MessageWriter) Write(p []byte) (int, error)

Write writes data to the MessageWriter's internal buffer.

Once a MessageWriter is closed, it cannot be used again.

type Option

type Option func(*Server) error

Option is the signature that modifies a `Server` to set some configuration

func WithCustomRetryer

func WithCustomRetryer(r request.Retryer) Option

WithCustomRetryer sets a custom `Retryer` to use on the SQS client.

func WithRetries

func WithRetries(delay time.Duration, max int) Option

WithRetries makes the `Server` retry on credential errors until `max` attempts with `delay` seconds between requests. This is needed in scenarios where credentials are automatically generated and the program starts before AWS finishes propagating them

func WithRetryJitter

func WithRetryJitter(retryJitter int64) Option

WithRetryJitter sets a value for Jitter on the VisibilityTimeout. With jitter applied every message that needs to be retried will have a visibility timeout in the interval: [(visibilityTimeout - jitter), visibilityTimeout + jitter)]

type Server

type Server struct {
	// AWS QueueURL
	QueueURL string
	// Concrete instance of SQSAPI
	Svc awsinterfaces.SQSReceiver
	// contains filtered or unexported fields
}

Server represents a msg.Server for receiving messages from an AWS SQS Queue.

func (*Server) Serve

func (s *Server) Serve(r msg.Receiver) error

Serve continuously receives messages from an SQS queue, creates a message, and calls Receive on `r`. Serve is blocking and will not return until Shutdown is called on the Server.

NewServer should be used prior to running Serve.

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

Shutdown stops the receipt of new messages and waits for routines to complete or the passed in ctx to be canceled. msg.ErrServerClosed will be returned upon a clean shutdown. Otherwise, the passed ctx's Error will be returned.

type Topic

type Topic struct {
	QueueURL string
	Svc      awsinterfaces.SQSSender
	Batcher  batching.Batcher
}

Topic configures and manages SQSAPI for sqs.MessageWriter

func (*Topic) NewWriter

func (t *Topic) NewWriter(ctx context.Context) msg.MessageWriter

NewWriter returns a new sqs.MessageWriter

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL