consumer

package
v0.0.5 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 25, 2024 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
// Default values for the SQS consumer settings.
//
// The first three constants share their names with the MaxNumberOfMessages,
// VisibilityTimeout and WaitTimeSeconds fields of SQSClientOptions, and
// DefaultRegion with its Region field.
// NOTE(review): presumably these are applied when the corresponding option is
// left at its zero value — confirm against New.
// NOTE(review): 10 messages and 20 seconds appear to be the SQS ReceiveMessage
// maximums for MaxNumberOfMessages and WaitTimeSeconds — confirm before
// raising either value.
const (
	DefaultMaxNumberOfMessages = 10
	DefaultVisibilityTimeout   = 30
	DefaultWaitTimeSeconds     = 20
	DefaultRegion              = "us-east-1"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Logger

// Logger is the logging hook accepted by SQSClient, both as its Logger field
// and as the argument of SetLogger.
// NOTE(review): the (message, v ...) shape suggests a printf-style format
// string followed by its arguments — confirm against the call sites.
type Logger interface {
	Log(message string, v ...interface{})
}

type SQSClient

// SQSClient groups the three things the consumer methods have access to: an
// SQSService implementation (Client), the consumer configuration
// (ClientOptions) and a Logger.
// NOTE(review): presumably built by New from its sqsService and options
// arguments; New takes the options by value while this struct stores a
// pointer — confirm whether callers may mutate ClientOptions afterwards.
type SQSClient struct {
	Client        SQSService
	ClientOptions *SQSClientOptions
	Logger        Logger
}

func New

func New(sqsService SQSService, options SQSClientOptions) *SQSClient

func (*SQSClient) GetQueueUrl

func (s *SQSClient) GetQueueUrl(ctx context.Context) *string

GetQueueUrl returns the URL of the queue based on the queue name

func (*SQSClient) GetQueues

func (s *SQSClient) GetQueues(ctx context.Context, prefix string) []string

GetQueues returns a list of queues based on the prefix

func (*SQSClient) Poll

func (s *SQSClient) Poll(ctx context.Context)

Poll starts polling messages from the queue

func (*SQSClient) ProcessMessage

func (s *SQSClient) ProcessMessage(ctx context.Context, sqsMessage types.Message, queueUrl string)

ProcessMessage deletes the message or changes its visibility, depending on the value returned by the Handle function.

func (*SQSClient) ReceiveMessages

func (s *SQSClient) ReceiveMessages(ctx context.Context, queueUrl string, ch chan types.Message) error

ReceiveMessages polls messages from the queue

func (*SQSClient) SetLogger

func (s *SQSClient) SetLogger(logger Logger)

func (*SQSClient) Start

func (s *SQSClient) Start(ctx context.Context)

type SQSClientInterface

// SQSClientInterface declares the consumer operations that *SQSClient
// provides: queue lookup (GetQueueUrl, GetQueues), receiving and processing
// messages (ReceiveMessages, ProcessMessage) and the Poll and Start entry
// points. SetLogger is not part of this interface.
type SQSClientInterface interface {
	GetQueueUrl(context.Context) *string
	ReceiveMessages(ctx context.Context, queueUrl string, ch chan types.Message) error
	ProcessMessage(ctx context.Context, message types.Message, queueUrl string)
	Poll(context.Context)
	GetQueues(ctx context.Context, prefix string) []string
	Start(context.Context)
}

type SQSClientOptions

// SQSClientOptions configures an SQSClient; it is the type of the options
// argument of New and of the SQSClient.ClientOptions field.
//
// MaxNumberOfMessages, VisibilityTimeout and WaitTimeSeconds share their names
// with the Default* constants of this package.
// NOTE(review): presumably a zero value for those fields (and an empty Region)
// falls back to the matching default, and both timeouts are in seconds —
// confirm against New.
// NOTE(review): the accepted LogLevel values and how Endpoint is consumed are
// not visible here — document them once confirmed.
type SQSClientOptions struct {
	QueueName string
	// Handle is the function that will be called when a message is received.
	// Return true if you want to delete the message from the queue, otherwise, return false
	Handle   func(message *message.Message) bool
	Region   string
	Endpoint string
	// PrefixBased is a flag that indicates if the queue name is a prefix
	PrefixBased         bool
	MaxNumberOfMessages int32
	VisibilityTimeout   int32
	WaitTimeSeconds     int32
	LogLevel            string
	// BackoffMultiplier is the multiplier used to calculate the backoff time (visibility timeout)
	BackoffMultiplier float64
}

type SQSService

// SQSService is the subset of SQS API operations this package depends on:
// resolving and listing queues, receiving messages, changing a message's
// visibility and deleting a message. It is the type of SQSClient.Client and of
// the first argument of New, so any implementation (including a test double)
// can be supplied.
// NOTE(review): the method signatures appear to match those of the AWS SDK for
// Go v2 *sqs.Client, which could then be passed directly — confirm.
type SQSService interface {
	GetQueueUrl(ctx context.Context, input *sqs.GetQueueUrlInput, opts ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error)
	ReceiveMessage(ctx context.Context, input *sqs.ReceiveMessageInput, opts ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
	ChangeMessageVisibility(ctx context.Context, input *sqs.ChangeMessageVisibilityInput, opts ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityOutput, error)
	DeleteMessage(ctx context.Context, input *sqs.DeleteMessageInput, opts ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
	ListQueues(ctx context.Context, input *sqs.ListQueuesInput, opts ...func(*sqs.Options)) (*sqs.ListQueuesOutput, error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL