queue

package
v4.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 25, 2024 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package queue provides SQS queue manipulations on top of the AWS SDK for Go v2 (aws-sdk-go-v2).

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildBatchRequestEntry

func BuildBatchRequestEntry(messages ...BatchMessage) ([]types.SendMessageBatchRequestEntry, map[string]int)

BuildBatchRequestEntry builds batch entries and id2index map.

func GetQueueURL

func GetQueueURL(ctx context.Context, s sqsiface.SQSAPI, name string) (*string, error)

GetQueueURL returns a URL for the given queue name.

func NewBatchError

func NewBatchError(id2index map[string]int, errors []types.BatchResultErrorEntry) error

NewBatchError composes an error from errors if available.

Types

type BatchChangeMessageVisibility

// BatchChangeMessageVisibility is a single entry of a batch request that
// changes message visibility timeouts.
type BatchChangeMessageVisibility struct {
	// ReceiptHandle is the receipt handle of the message whose visibility
	// timeout is changed.
	ReceiptHandle     *string
	// VisibilityTimeout is the new visibility timeout for that message.
	// NOTE(review): presumably in seconds, as in the SQS API; confirm.
	VisibilityTimeout int32
}

A BatchChangeMessageVisibility represents each request to change a visibility timeout.

type BatchError

// BatchError describes the failure of one entry in a batch operation.
type BatchError struct {
	// Index identifies the message that caused the error; it can be used to
	// index into the messages passed to the batch call.
	Index       int
	// Code is the error code for the failed entry.
	// NOTE(review): presumably copied from the SQS batch result entry; confirm.
	Code        string
	// Message is the error description for the failed entry.
	// NOTE(review): presumably copied from the SQS batch result entry; confirm.
	Message     string
	// SenderFault is true when the failure is on the client side; false
	// indicates a server-side failure that may be worth retrying.
	SenderFault bool
}

A BatchError represents an error for batch operations such as SendMessageBatch and ChangeMessageVisibilityBatch. Index can be used to identify the message that caused the error. See the SendMessageBatch example for how to handle errors in a batch operation.

func IsBatchError

func IsBatchError(err error) (errors []*BatchError, ok bool)

IsBatchError reports whether err contains BatchError. If it does, IsBatchError returns the []*BatchError and true. Otherwise, it returns nil and false.

func (*BatchError) Error

func (e *BatchError) Error() string

type BatchMessage

// BatchMessage is a single message to send as part of a batch request.
type BatchMessage struct {
	// Body is the message body.
	Body    string
	// Options change the request parameters for this message only.
	Options []option.SendMessageInput
}

A BatchMessage represents each request to send a message. Options are used to change parameters for the message.

type Queue

// Queue bundles an SQS API client with the URL of one queue.
type Queue struct {
	// SQS is the SQS API implementation the queue is built on.
	SQS sqsiface.SQSAPI
	// URL holds the queue URL, so callers do not have to supply it on
	// every call.
	URL *string
}

A Queue is an SQS queue that holds its queue URL in URL. Queue allows you to call actions without passing the queue URL on every call.

func MustNew

func MustNew(ctx context.Context, s sqsiface.SQSAPI, name string) *Queue

MustNew initializes Queue with name. It will panic when it fails to initialize a queue.

func New

func New(ctx context.Context, s sqsiface.SQSAPI, name string) (*Queue, error)

New initializes Queue with name.

func (*Queue) ChangeMessageVisibility

func (q *Queue) ChangeMessageVisibility(ctx context.Context, receiptHandle *string, visibilityTimeout int32) error

ChangeMessageVisibility changes a message's visibility timeout.

func (*Queue) ChangeMessageVisibilityBatch

func (q *Queue) ChangeMessageVisibilityBatch(ctx context.Context, opts ...BatchChangeMessageVisibility) error

ChangeMessageVisibilityBatch changes a visibility timeout for each message in opts.

func (*Queue) DeleteMessage

func (q *Queue) DeleteMessage(ctx context.Context, receiptHandle *string) error

DeleteMessage deletes a message from SQS queue.

func (*Queue) DeleteMessageBatch

func (q *Queue) DeleteMessageBatch(ctx context.Context, receiptHandles ...*string) error

DeleteMessageBatch deletes messages from SQS queue.

func (*Queue) DeleteQueue

func (q *Queue) DeleteQueue(ctx context.Context) error

DeleteQueue deletes a queue in SQS.

func (*Queue) PurgeQueue

func (q *Queue) PurgeQueue(ctx context.Context) error

PurgeQueue purges the SQS queue. It deletes all messages in the queue.

func (*Queue) ReceiveMessage

func (q *Queue) ReceiveMessage(ctx context.Context, opts ...option.ReceiveMessageInput) ([]types.Message, error)

ReceiveMessage receives messages from SQS queue. opts are used to change parameters for a request.

func (*Queue) SendMessage

func (q *Queue) SendMessage(ctx context.Context, body string, opts ...option.SendMessageInput) (*sqs.SendMessageOutput, error)

SendMessage sends a message to an SQS queue. opts are used to change parameters for a message.

Example
package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/nabeken/aws-go-sqs/v4/queue"
	"github.com/nabeken/aws-go-sqs/v4/queue/option"
)

// main sends a single message with two message attributes to the queue
// named "example-queue-name" and logs on completion.
func main() {
	// One background context is shared by every call below.
	ctx := context.Background()

	// SQS client handed to the queue.
	client := sqs.New(sqs.Options{})

	// Initialize the queue by name; abort if that fails.
	q, err := queue.New(ctx, client, "example-queue-name")
	if err != nil {
		log.Fatal(err)
	}

	// Message attributes to attach to the outgoing message.
	attrs := map[string]interface{}{
		"ATTR1": "STRING!!",
		"ATTR2": 12345,
	}

	// The send output is not needed here; only the error is checked.
	_, err = q.SendMessage(ctx, "MESSAGE BODY", option.MessageAttributes(attrs))
	if err != nil {
		log.Fatal(err)
	}

	log.Print("successed!")
}
Output:

func (*Queue) SendMessageBatch

func (q *Queue) SendMessageBatch(ctx context.Context, messages ...BatchMessage) error

SendMessageBatch sends messages to SQS queue.

Example
package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/nabeken/aws-go-sqs/v4/queue"
	"github.com/nabeken/aws-go-sqs/v4/queue/option"
)

// main sends two messages in one batch to the queue named
// "example-queue-name" and shows how to inspect per-message batch errors.
func main() {
	// One background context is shared by every call below.
	ctx := context.Background()

	// SQS client handed to the queue.
	client := sqs.New(sqs.Options{})

	// Initialize the queue by name; abort if that fails.
	q, err := queue.New(ctx, client, "example-queue-name")
	if err != nil {
		log.Fatal(err)
	}

	// Message attributes used by the second message only.
	attrs := map[string]interface{}{
		"ATTR1": "STRING!!",
	}

	// Messages for the batch operation.
	batchMessages := []queue.BatchMessage{
		{Body: "success"},
		{
			Body:    "failed",
			Options: []option.SendMessageInput{option.MessageAttributes(attrs)},
		},
	}

	err = q.SendMessageBatch(ctx, batchMessages...)
	if err == nil {
		return
	}

	// Anything that is not a batch error is fatal.
	batchErrors, ok := queue.IsBatchError(err)
	if !ok {
		log.Fatal(err)
	}

	for _, e := range batchErrors {
		if !e.SenderFault {
			// Retry if the failure is on the server side
			// You can use e.Index to identify the message
			// failedMessage := batchMessages[e.Index]
			continue
		}
		// The failure is on the client side: log it and move on.
		log.Print(e)
	}
}
Output:

Directories

Path Synopsis
option: Package option provides adapters to change a parameter in an SQS request.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL