runsqs

package module
v3.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 30, 2024 License: Apache-2.0 Imports: 13 Imported by: 1

README

runsqs - Prepackaged Runtime Helper For AWS SQS

GoDoc

Overview

This project is a tool bundle for running a service that interacts with AWS SQS written in go. It comes with an opinionated choice of logger, metrics client, and configuration parsing. The benefits are a simple abstraction around the aws-sdk api for consuming from an SQS or producing to an SQS.

Quick Start

package main

import (
    "net/http"
    "fmt"

    "github.com/asecurityteam/runsqs"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/sqs"
)

type BasicConsumer string

func (m BasicConsumer) ConsumeMessage(ctx context.Context, message []byte) error {
    fmt.Println(string(message))
    fmt.Println(m)
    return nil
}

func main() {
    // create a new aws session, and establish a SQS instance to connect to.
    // aws new sessions by default reads AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY as environment variables to use
	var sesh = session.Must(session.NewSession())
	queue := sqs.New(sesh, &aws.Config{
		Region:     aws.String("us-west-2"),
		HTTPClient: http.DefaultClient,
		Endpoint:   aws.String("www.aws.com"),
    })



    consumer := runsqs.DefaultSQSQueueConsumer{
        Queue: queue,
        QueueURL: "www.aws.com/url/to/queue",
        MessageConsumer: BasicConsumer{"consooooom"},
        LogFn: runsqs.LoggerFromContext,
    }

    producer := runsqs.DefaultSQSProducer{
        Queue: queue,
        QueueURL: "www.aws.com/url/to/queue",
    }

    go producer.ProduceMessage([]byte("incoming sqs message"))

    // Run the SQS consumer.
    if err := consumer.StartConsuming(); err != nil {
		panic(err.Error())
    }

    // expected output:
    // "incoming sqs message"
    // "consooooom"
}

Status

This project is in incubation which the interfaces and implementations are subject to change.

Contributing

Building And Testing

We publish a docker image called SDCLI that bundles all of our build dependencies. It is used by the included Makefile to help make building and testing a bit easier. The following actions are available through the Makefile:

  • make dep

    Install the project dependencies into a vendor directory

  • make lint

    Run our static analysis suite

  • make test

    Run unit tests and generate a coverage artifact

  • make integration

    Run integration tests and generate a coverage artifact

  • make coverage

    Report the combined coverage for unit and integration tests

License

This project is licensed under Apache 2.0. See LICENSE.txt for details.

Contributing Agreement

Atlassian requires signing a contributor's agreement before we can accept a patch. If you are an individual you can fill out the individual CLA. If you are contributing on behalf of your company then please fill out the corporate CLA.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var LoggerFromContext = logevent.FromContext

LoggerFromContext is the concrete implementation of LogFn that should be used at runtime.

View Source
var StatFromContext = xstats.FromContext

StatFromContext is the concrete implementation of StatFn that should be used at runtime.

Functions

This section is empty.

Types

type ConsumerChain

type ConsumerChain []ConsumerDecorator

ConsumerChain is an ordered collection of ConsumerDecorator.

func (ConsumerChain) Apply

Apply wraps the given SQSMessageConsumer with the Decorator chain.

type ConsumerDecorator

type ConsumerDecorator func(SQSMessageConsumer) SQSMessageConsumer

ConsumerDecorator is a named type for any function that takes a SQSMessageConsumer and returns a SQSMessageConsumer.

type DefaultSQSProducer

type DefaultSQSProducer struct {
	Queue sqsiface.SQSAPI
	// contains filtered or unexported fields
}

DefaultSQSProducer is a basic sqs producer

func NewDefaultSQSProducer

func NewDefaultSQSProducer(queue sqsiface.SQSAPI, url string) *DefaultSQSProducer

NewDefaultSQSProducer initializes a new DefaultSQSProducer

func (*DefaultSQSProducer) BatchProduceMessage

func (producer *DefaultSQSProducer) BatchProduceMessage(ctx context.Context, messageInput *sqs.SendMessageBatchInput) (*sqs.SendMessageBatchOutput, error)

BatchProduceMessage produces a batch of messages to the configured sqs queue, along with setting the queueURL to use

func (*DefaultSQSProducer) ProduceMessage

func (producer *DefaultSQSProducer) ProduceMessage(ctx context.Context, messageInput *sqs.SendMessageInput) error

ProduceMessage produces a message to the configured sqs queue, along with setting the queueURL to use

func (*DefaultSQSProducer) QueueURL

func (producer *DefaultSQSProducer) QueueURL() string

QueueURL retrieves the queue URL used by the DefaultSQSProducer

type DefaultSQSProducerComponent

type DefaultSQSProducerComponent struct {
}

DefaultSQSProducerComponent enables creating configured Component

func NewDefaultSQSProducerComponent

func NewDefaultSQSProducerComponent() *DefaultSQSProducerComponent

NewDefaultSQSProducerComponent generates a new DefaultSQSQueueConsumerComponent

func (*DefaultSQSProducerComponent) New

New creates a configured DefaultSQSQueueConsumer

func (*DefaultSQSProducerComponent) Settings

Settings generates the default configuration for DefaultSQSProducerComponent

type DefaultSQSProducerConfig

type DefaultSQSProducerConfig struct {
	AWSEndpoint string
	QueueURL    string
	QueueRegion string
}

DefaultSQSProducerConfig represents the configuration to configure DefaultSQSProducer

func (*DefaultSQSProducerConfig) Name

Name of the configuration

type DefaultSQSQueueConsumer

type DefaultSQSQueueConsumer struct {
	Queue    sqsiface.SQSAPI
	LogFn    LogFn
	QueueURL string

	MessageConsumer SQSMessageConsumer
	// PollInterval defaults to 1 second
	PollInterval time.Duration
	// contains filtered or unexported fields
}

DefaultSQSQueueConsumer is a naive implementation of an SQSConsumer. This implementation has no support for retries on nonpermanent failures; the result of every message consumption is followed by a deletion of the message. Furthermore, this implementation does not support concurrent processing of messages; messages are processed sequentially.

func (*DefaultSQSQueueConsumer) GetSQSMessageConsumer

func (m *DefaultSQSQueueConsumer) GetSQSMessageConsumer() SQSMessageConsumer

GetSQSMessageConsumer returns the MessageConsumer field. This function implies that DefaultSQSQueueConsumer MUST have a MessageConsumer defined.

func (*DefaultSQSQueueConsumer) StartConsuming

func (m *DefaultSQSQueueConsumer) StartConsuming(ctx context.Context) error

StartConsuming starts consuming from the configured SQS queue

func (*DefaultSQSQueueConsumer) StopConsuming

func (m *DefaultSQSQueueConsumer) StopConsuming(ctx context.Context) error

StopConsuming stops this DefaultSQSQueueConsumer consuming from the SQS queue

type DefaultSQSQueueConsumerComponent

type DefaultSQSQueueConsumerComponent struct {
}

DefaultSQSQueueConsumerComponent enables creating configured Component

func NewDefaultSQSQueueConsumerComponent

func NewDefaultSQSQueueConsumerComponent() *DefaultSQSQueueConsumerComponent

NewDefaultSQSQueueConsumerComponent generates a new DefaultSQSQueueConsumerComponent

func (*DefaultSQSQueueConsumerComponent) New

New creates a configured DefaultSQSQueueConsumer

func (*DefaultSQSQueueConsumerComponent) Settings

Settings generates the default configuration for DefaultSQSQueueConsumerComponent

type DefaultSQSQueueConsumerConfig

type DefaultSQSQueueConsumerConfig struct {
	AWSEndpoint  string
	QueueURL     string
	QueueRegion  string
	PollInterval time.Duration
}

DefaultSQSQueueConsumerConfig represents the configuration to configure DefaultSQSQueueConsumer

func (*DefaultSQSQueueConsumerConfig) Name

Name of the configuration

type LogFn

type LogFn func(context.Context) Logger

LogFn is the type that should be accepted by components that intend to log content using the context logger.

type Logger

type Logger = logevent.Logger

Logger is the project logging client interface. It is currently an alias to the logevent project.

type ProducerChain

type ProducerChain []ProducerDecorator

ProducerChain is an ordered collection of Decorators.

func (ProducerChain) Apply

func (c ProducerChain) Apply(base SQSProducer) SQSProducer

Apply wraps the given SQSProducer with the Decorator chain.

type ProducerDecorator

type ProducerDecorator func(SQSProducer) SQSProducer

ProducerDecorator is a named type for any function that takes a SQSProducer and returns a SQSProducer.

type RetryableConsumerError

type RetryableConsumerError struct {
	WrappedError      error
	VisibilityTimeout int64
}

RetryableConsumerError represents a possible error type an SQSMessageConsumer could return on a call to ConsumeMessage. Users can set VisibilityTimeout, and implementors of SQSConsumer can leverage VisibilityTimeout to change the visibility of an sqs message for retry purposes.

func (RetryableConsumerError) Error

func (e RetryableConsumerError) Error() string

Error implements type error

type SQSConsumer

type SQSConsumer interface {
	StartConsuming(ctx context.Context) error
	StopConsuming(ctx context.Context) error
	GetSQSMessageConsumer() SQSMessageConsumer
}

SQSConsumer is an interface that represents an aws sqs queue worker. Implementers of SQSConsumer are responsible for: - SQS connectivity - Start and Stop consumption - error handling

type SQSMessageConsumer

type SQSMessageConsumer interface {
	ConsumeMessage(ctx context.Context, message *sqs.Message) SQSMessageConsumerError
	// DeadLetter will be called when MaxRetries is exhausted, only in the SmartSQSConsumer
	DeadLetter(ctx context.Context, message *sqs.Message)
}

SQSMessageConsumer is an interface that defines how a message should be consumer. Users are responsible for unmarshalling messages themselves, and returning errors.

type SQSMessageConsumerError

type SQSMessageConsumerError interface {
	IsRetryable() bool
	Error() string
	RetryAfter() int64
}

SQSMessageConsumerError represents an error that can be used to indicate to the consumer that an error should be retried. Note: RetryAfter should be expressed in seconds

type SQSProducer

type SQSProducer interface {
	QueueURL() string
	BatchProduceMessage(ctx context.Context, messageBatchInput *sqs.SendMessageBatchInput) (*sqs.SendMessageBatchOutput, error)
	ProduceMessage(ctx context.Context, messageInput *sqs.SendMessageInput) error
}

SQSProducer is an interface for producing messages to an aws sqs instance. Implementors are responsible for placing messages on an sqs, and also: - SQS connectivity - error handling - constructing the input *sqs.SendMessageInput

type SmartSQSConsumer

type SmartSQSConsumer struct {
	Queue    sqsiface.SQSAPI
	LogFn    LogFn
	QueueURL string

	MessageConsumer     SQSMessageConsumer
	NumWorkers          uint64
	MessagePoolSize     uint64
	MaxNumberOfMessages uint64
	MaxRetries          uint64
	PollInterval        time.Duration
	// contains filtered or unexported fields
}

SmartSQSConsumer is an implementation of an SQSConsumer. This implementation supports... - retryable and non-retryable errors. - a maximum number of retries to be placed on a retryable sqs message - concurrent workers

func (*SmartSQSConsumer) GetSQSMessageConsumer

func (m *SmartSQSConsumer) GetSQSMessageConsumer() SQSMessageConsumer

GetSQSMessageConsumer returns the MessageConsumer field. This function implies that DefaultSQSQueueConsumer MUST have a MessageConsumer defined.

func (*SmartSQSConsumer) StartConsuming

func (m *SmartSQSConsumer) StartConsuming(ctx context.Context) error

StartConsuming starts consuming from the configured SQS queue

func (*SmartSQSConsumer) StopConsuming

func (m *SmartSQSConsumer) StopConsuming(ctx context.Context) error

StopConsuming stops this DefaultSQSQueueConsumer consuming from the SQS queue

type SmartSQSQueueConsumerComponent

type SmartSQSQueueConsumerComponent struct {
}

SmartSQSQueueConsumerComponent enables creating configured Component

func NewSmartSQSQueueConsumerComponent

func NewSmartSQSQueueConsumerComponent() *SmartSQSQueueConsumerComponent

NewSmartSQSQueueConsumerComponent generates a new SmartSQSQueueConsumerComponent

func (*SmartSQSQueueConsumerComponent) New

New creates a configured SmartSQSConsumer

func (*SmartSQSQueueConsumerComponent) Settings

Settings generates the default configuration for DefaultSQSQueueConsumerComponent

type SmartSQSQueueConsumerConfig

type SmartSQSQueueConsumerConfig struct {
	AWSEndpoint         string
	QueueURL            string
	QueueRegion         string
	NumWorkers          uint64
	MessagePoolSize     uint64
	MaxNumberOfMessages uint64
	MaxRetries          uint64
	PollInterval        time.Duration
}

SmartSQSQueueConsumerConfig represents the configuration to configure SmartSQSQueueConsumer

func (*SmartSQSQueueConsumerConfig) Name

Name of the configuration

type Stat

type Stat = xstats.XStater

Stat is the project metrics client interface. it is currently an alias for xstats.XStater.

type StatFn

type StatFn func(context.Context) Stat

StatFn is the type that should be accepted by components that intend to emit custom metrics using the context stat client.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL