sqs

package
v0.9.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 14, 2023 License: MIT Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
// Attribute keys and size limits used for SQS-specific message options.
//
//   - AttributeSqsDelaySeconds, AttributeSqsMessageGroupId and
//     AttributeSqsMessageDeduplicationId are string keys.
//     NOTE(review): presumably the keys under which the delay, group id and
//     deduplication id travel in a message's attribute map — confirm.
//   - MaxDelaySeconds is 900, i.e. 15 minutes when read as seconds.
//   - DeduplicationIdMaxLen and GroupIdMaxLen are both 128.
//     NOTE(review): where these limits are enforced, and whether they count
//     bytes or characters, is not visible here — verify against the callers.
const (
	AttributeSqsDelaySeconds           = "sqsDelaySeconds"
	AttributeSqsMessageGroupId         = "sqsMessageGroupId"
	AttributeSqsMessageDeduplicationId = "sqsMessageDeduplicationId"
	MaxDelaySeconds                    = 900
	DeduplicationIdMaxLen              = 128
	GroupIdMaxLen                      = 128
)
View Source
// Queue defaults and name suffixes.
//
//   - DefaultVisibilityTimeout is the string "30". Note that it is a string,
//     whereas the Settings.VisibilityTimeout field is an int.
//     NOTE(review): the unit is presumably seconds — confirm.
//   - DeadletterFifoSuffix ("-dead.fifo") and FifoSuffix (".fifo") are name
//     suffixes.
//     NOTE(review): presumably appended to the names of FIFO dead-letter
//     queues and FIFO queues respectively — verify against the naming code.
const (
	DefaultVisibilityTimeout = "30"
	DeadletterFifoSuffix     = "-dead.fifo"
	FifoSuffix               = ".fifo"
)
View Source
// MetadataKeyQueues is the key "cloud.aws.sqs.queues".
// NOTE(review): presumably the application-metadata key under which
// QueueMetadata entries are recorded — confirm against the code that uses it.
const (
	MetadataKeyQueues = "cloud.aws.sqs.queues"
)

Variables

View Source
// Predefined Attribute values, named after the message delay, the message
// deduplication id and the message group id. Their fields are unexported and
// therefore hidden in this listing.
// NOTE(review): presumably intended as the attribute argument of
// NewAttributeEncodeHandler, which takes an Attribute — confirm.
var (
	MessageDelaySeconds = Attribute{
		// contains filtered or unexported fields
	}
	MessageDeduplicationId = Attribute{
		// contains filtered or unexported fields
	}
	MessageGroupId = Attribute{
		// contains filtered or unexported fields
	}
)

Functions

func GetQueueName

func GetQueueName(config cfg.Config, queueSettings QueueNameSettingsAware) (string, error)

func NewClient

func NewClient(ctx context.Context, config cfg.Config, logger log.Logger, name string, optFns ...ClientOption) (*sqs.Client, error)

func NewService

func NewService(ctx context.Context, config cfg.Config, logger log.Logger, clientName string, optFns ...ClientOption) (*service, error)

func NewServiceWithInterfaces

func NewServiceWithInterfaces(logger log.Logger, client Client, settings *ServiceSettings) *service

func ProvideClient

func ProvideClient(ctx context.Context, config cfg.Config, logger log.Logger, name string, optFns ...ClientOption) (*sqs.Client, error)

Types

type Attribute

// Attribute is an opaque value: all of its fields are unexported. The package
// declares three instances of it (MessageDelaySeconds, MessageDeduplicationId
// and MessageGroupId), and NewAttributeEncodeHandler accepts one as its first
// argument.
type Attribute struct {
	// contains filtered or unexported fields
}

type AttributeEncodeHandler

// AttributeEncodeHandler is built by NewAttributeEncodeHandler from an
// Attribute and an AttributeProvider and exposes Encode and Decode methods,
// both of which take and return a context and an attribute map.
// NOTE(review): what Encode writes into the attribute map, and whether Decode
// does anything beyond passing its inputs through, is not visible here.
type AttributeEncodeHandler struct {
	// contains filtered or unexported fields
}

func NewAttributeEncodeHandler

func NewAttributeEncodeHandler(attribute Attribute, attributeProvider AttributeProvider) *AttributeEncodeHandler

func (*AttributeEncodeHandler) Decode

func (g *AttributeEncodeHandler) Decode(ctx context.Context, _ interface{}, attributes map[string]interface{}) (context.Context, map[string]interface{}, error)

func (*AttributeEncodeHandler) Encode

func (g *AttributeEncodeHandler) Encode(ctx context.Context, data interface{}, attributes map[string]interface{}) (context.Context, map[string]interface{}, error)

type AttributeProvider

// AttributeProvider is a callback that receives an arbitrary value and returns
// an arbitrary value or an error.
// NOTE(review): presumably it derives the attribute's value from the data
// handed to AttributeEncodeHandler.Encode — confirm.
type AttributeProvider func(data interface{}) (interface{}, error)

type Client

// Client lists the SQS operations this package calls. Every method takes a
// context, a pointer to the operation's sqs input struct and optional
// per-call functions that modify *sqs.Options, and returns the operation's
// output struct or an error.
//
// NewServiceWithInterfaces and NewQueueWithInterfaces accept a Client, so a
// substitute implementation can be supplied there.
// NOTE(review): the signatures look like they mirror the AWS SDK *sqs.Client
// returned by NewClient — confirm that it satisfies this interface.
type Client interface {
	CreateQueue(ctx context.Context, params *sqs.CreateQueueInput, optFns ...func(*sqs.Options)) (*sqs.CreateQueueOutput, error)
	DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
	DeleteMessageBatch(ctx context.Context, params *sqs.DeleteMessageBatchInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageBatchOutput, error)
	GetQueueAttributes(ctx context.Context, params *sqs.GetQueueAttributesInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueAttributesOutput, error)
	GetQueueUrl(ctx context.Context, params *sqs.GetQueueUrlInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error)
	ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
	SendMessage(ctx context.Context, params *sqs.SendMessageInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageOutput, error)
	SendMessageBatch(ctx context.Context, params *sqs.SendMessageBatchInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageBatchOutput, error)
	SetQueueAttributes(ctx context.Context, params *sqs.SetQueueAttributesInput, optFns ...func(*sqs.Options)) (*sqs.SetQueueAttributesOutput, error)
	PurgeQueue(ctx context.Context, params *sqs.PurgeQueueInput, optFns ...func(*sqs.Options)) (*sqs.PurgeQueueOutput, error)
}

type ClientConfig

// ClientConfig is the configuration value that ClientOption functions receive
// a pointer to.
//
// Settings holds the ClientSettings; LoadOptions is a list of functions that
// each receive *awsCfg.LoadOptions and may return an error. The type also has
// GetSettings, GetLoadOptions and GetRetryOptions accessor methods.
type ClientConfig struct {
	Settings    ClientSettings
	LoadOptions []func(options *awsCfg.LoadOptions) error
}

func (ClientConfig) GetLoadOptions

func (c ClientConfig) GetLoadOptions() []func(options *awsCfg.LoadOptions) error

func (ClientConfig) GetRetryOptions

func (c ClientConfig) GetRetryOptions() []func(*retry.StandardOptions)

func (ClientConfig) GetSettings

func (c ClientConfig) GetSettings() gosoAws.ClientSettings

type ClientOption

// ClientOption is a function that receives a *ClientConfig. NewClient,
// ProvideClient, NewService, NewQueue and ProvideQueue accept any number of
// them through their variadic optFns parameter.
// NOTE(review): presumably each option mutates the config before the client
// is built — confirm against the constructor implementation.
type ClientOption func(cfg *ClientConfig)

type ClientSettings

// ClientSettings embeds gosoAws.ClientSettings and adds no fields of its own.
type ClientSettings struct {
	gosoAws.ClientSettings
}

type FifoSettings

// FifoSettings holds two boolean switches, each with a cfg tag and a default
// of "false":
//
//   - Enabled (cfg key "enabled")
//   - ContentBasedDeduplication (cfg key "content_based_deduplication")
//
// It is the type of the Settings.Fifo field.
// NOTE(review): presumably these select a FIFO queue and SQS content-based
// deduplication when the queue is created — confirm.
type FifoSettings struct {
	Enabled                   bool `cfg:"enabled" default:"false"`
	ContentBasedDeduplication bool `cfg:"content_based_deduplication" default:"false"`
}

type Message

// Message is the value accepted by Queue.Send and, as a slice of pointers, by
// Queue.SendBatch.
//
// DelaySeconds is an int32; MessageGroupId, MessageDeduplicationId and Body
// are string pointers.
// NOTE(review): a nil pointer presumably means "not set" — confirm how the
// send path treats nil fields.
type Message struct {
	DelaySeconds           int32
	MessageGroupId         *string
	MessageDeduplicationId *string
	Body                   *string
}

type Properties

// Properties groups a queue's Name, Url and Arn, all plain strings. It is
// returned by Service.CreateQueue, Service.GetPropertiesByName and
// Service.GetPropertiesByArn, and accepted by NewQueueWithInterfaces.
type Properties struct {
	Name string
	Url  string
	Arn  string
}

type Queue

// Queue is the interface returned by NewQueue, ProvideQueue and
// NewQueueWithInterfaces. It exposes three string getters plus operations for
// deleting, receiving and sending messages.
//
// The per-method notes are derived from the signatures only; the
// implementation is not visible in this listing.
type Queue interface {
	// GetName, GetUrl and GetArn each return a string.
	GetName() string
	GetUrl() string
	GetArn() string

	// DeleteMessage takes a single receipt handle.
	DeleteMessage(ctx context.Context, receiptHandle string) error
	// DeleteMessageBatch takes a slice of receipt handles.
	DeleteMessageBatch(ctx context.Context, receiptHandles []string) error
	// Receive returns a slice of types.Message.
	// NOTE(review): waitTime is presumably in seconds — confirm.
	Receive(ctx context.Context, maxNumberOfMessages int32, waitTime int32) ([]types.Message, error)
	// Send takes a single *Message.
	Send(ctx context.Context, msg *Message) error
	// SendBatch takes a slice of *Message.
	SendBatch(ctx context.Context, messages []*Message) error
}

func NewQueue

func NewQueue(ctx context.Context, config cfg.Config, logger log.Logger, settings *Settings, optFns ...ClientOption) (Queue, error)

func NewQueueWithInterfaces

func NewQueueWithInterfaces(logger log.Logger, client Client, props *Properties) Queue

func ProvideQueue

func ProvideQueue(ctx context.Context, config cfg.Config, logger log.Logger, settings *Settings, optFns ...ClientOption) (Queue, error)

type QueueMetadata

// QueueMetadata is a JSON-serialisable record of five strings describing a
// queue. The JSON keys are aws_client_name, queue_arn, queue_name,
// queue_name_full and queue_url.
// NOTE(review): how QueueName differs from QueueNameFull, and where these
// records are published, is not visible here — confirm.
type QueueMetadata struct {
	AwsClientName string `json:"aws_client_name"`
	QueueArn      string `json:"queue_arn"`
	QueueName     string `json:"queue_name"`
	QueueNameFull string `json:"queue_name_full"`
	QueueUrl      string `json:"queue_url"`
}

type QueueNameSettings

// QueueNameSettings carries an application id, a client name, a FIFO flag and
// a queue id.
//
// Its value-receiver methods GetAppId, GetClientName, GetQueueId and
// IsFifoEnabled match the method set of QueueNameSettingsAware, so a
// QueueNameSettings value can be passed to GetQueueName.
// NOTE(review): the getters presumably return the same-named fields — the
// method bodies are not visible here.
type QueueNameSettings struct {
	AppId       cfg.AppId
	ClientName  string
	FifoEnabled bool
	QueueId     string
}

func (QueueNameSettings) GetAppId

func (s QueueNameSettings) GetAppId() cfg.AppId

func (QueueNameSettings) GetClientName

func (s QueueNameSettings) GetClientName() string

func (QueueNameSettings) GetQueueId

func (s QueueNameSettings) GetQueueId() string

func (QueueNameSettings) IsFifoEnabled

func (s QueueNameSettings) IsFifoEnabled() bool

type QueueNameSettingsAware

// QueueNameSettingsAware is the parameter type of GetQueueName: anything that
// can report an application id, a client name, a queue id and whether FIFO is
// enabled.
type QueueNameSettingsAware interface {
	GetAppId() cfg.AppId
	GetClientName() string
	GetQueueId() string
	IsFifoEnabled() bool
}

type QueueNamingSettings

// QueueNamingSettings holds the queue naming pattern. Pattern is read from
// the cfg key "pattern" (tagged nodecode) and defaults to
// "{project}-{env}-{family}-{group}-{queueId}".
// NOTE(review): the {placeholders} are presumably filled from the app id and
// queue id when GetQueueName builds a name — confirm.
type QueueNamingSettings struct {
	Pattern string `cfg:"pattern,nodecode" default:"{project}-{env}-{family}-{group}-{queueId}"`
}

type RedrivePolicy

// RedrivePolicy is the type of the Settings.RedrivePolicy field.
//
//   - Enabled (cfg key "enabled") defaults to "true".
//   - MaxReceiveCount (cfg key "max_receive_count") defaults to "3".
//
// NOTE(review): presumably controls whether a dead-letter queue is attached
// and after how many receives a message is moved to it — confirm.
type RedrivePolicy struct {
	Enabled         bool `cfg:"enabled" default:"true"`
	MaxReceiveCount int  `cfg:"max_receive_count" default:"3"`
}

type Service

// Service groups queue-level operations: creating a queue from Settings,
// checking for a queue by name, looking up Properties by name or ARN,
// fetching a URL by name or an ARN by URL, and purging by URL.
//
// This summary is derived from the method and parameter names only; the
// implementation (the unexported service type returned by NewService and
// NewServiceWithInterfaces) is not visible in this listing.
type Service interface {
	CreateQueue(ctx context.Context, settings *Settings) (*Properties, error)
	QueueExists(ctx context.Context, name string) (bool, error)
	GetPropertiesByName(ctx context.Context, name string) (*Properties, error)
	GetPropertiesByArn(ctx context.Context, arn string) (*Properties, error)
	GetUrl(ctx context.Context, name string) (string, error)
	GetArn(ctx context.Context, url string) (string, error)
	Purge(ctx context.Context, url string) error
}

type ServiceSettings

// ServiceSettings is passed to NewServiceWithInterfaces and holds a single
// boolean, AutoCreate.
// NOTE(review): presumably decides whether missing queues are created
// automatically — confirm against the service implementation.
type ServiceSettings struct {
	AutoCreate bool
}

type Settings

// Settings describes a queue and is accepted by NewQueue, ProvideQueue and
// Service.CreateQueue. It combines a queue name, a visibility timeout, the
// FifoSettings, the RedrivePolicy and a client name.
// NOTE(review): VisibilityTimeout is an int, presumably in seconds (the
// package constant DefaultVisibilityTimeout is the string "30") — confirm the
// unit and how the two relate.
type Settings struct {
	QueueName         string
	VisibilityTimeout int
	Fifo              FifoSettings
	RedrivePolicy     RedrivePolicy
	ClientName        string
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL