Documentation ¶
Index ¶
- Constants
- Variables
- func ChunkSlice(slice []string, chunkSize int) [][]string
- func GetSecretByAkSk(ak, sk, secretName string) (string, error)
- func GetSecretWithDefault(secretName string) (string, error)
- func MustGetSecret(ak, sk, secretName string) string
- func MustGetSecretWithDefault(secretName string) string
- func NewAwsConfig(ak, sk, region string) (aws.Config, error)
- func NewConfigFromSecretWithAkSk(ak, sk, secretName string) (aws.Config, error)
- func NewConfigWithSecret(secretName string) (aws.Config, error)
- func NewMinioS3Client(endpoint, accessKeyID, secretAccessKey, region string) *s3.Client
- type Auth
- type DynamodbWrapper
- func (w *DynamodbWrapper) AddItemBatch(data []types.WriteRequest) (int, error)
- func (w *DynamodbWrapper) BuildAttrValueMap(keys []string, values []interface{}) (map[string]types.AttributeValue, error)
- func (w *DynamodbWrapper) BuildQueryBeginsWith(name string, key string) (expression.Expression, error)
- func (w *DynamodbWrapper) BuildQueryExpr(name string, key interface{}) (expression.Expression, error)
- func (w *DynamodbWrapper) BuildScanExpr()
- func (w *DynamodbWrapper) BuildTableInput(primaryKey string, sortKey string, skType types.ScalarAttributeType) *dynamodb.CreateTableInput
- func (w *DynamodbWrapper) CreateTable(tableInput *dynamodb.CreateTableInput) (*types.TableDescription, error)
- func (w *DynamodbWrapper) DeleteRow(key map[string]types.AttributeValue) error
- func (w *DynamodbWrapper) DeleteTable() error
- func (w *DynamodbWrapper) GetItem(key map[string]types.AttributeValue, out interface{}) error
- func (w *DynamodbWrapper) ListTables() ([]string, error)
- func (w *DynamodbWrapper) PutItem(data interface{}) error
- func (w *DynamodbWrapper) Query(expr expression.Expression, out interface{}) error
- func (w *DynamodbWrapper) Scan(expr expression.Expression, out interface{}) error
- func (w *DynamodbWrapper) TableExists() (bool, error)
- type EventWrapper
- type FunctionWrapper
- func (w *FunctionWrapper) Create(functionName string, handlerName string, iamRoleArn *string, data []byte) types.State
- func (w *FunctionWrapper) GetConfig() (*types.FunctionConfiguration, error)
- func (w *FunctionWrapper) Invoke(payload []byte, getLog bool, asyncMode bool) (*lambda.InvokeOutput, error)
- func (w *FunctionWrapper) InvokeAsync(payload []byte, getLog bool) (*lambda.InvokeOutput, error)
- func (w *FunctionWrapper) InvokeSync(payload []byte, getLog bool) (*lambda.InvokeOutput, error)
- func (w *FunctionWrapper) List(maxItems int) ([]types.FunctionConfiguration, error)
- func (w *FunctionWrapper) PrintInvokeOutput(output *lambda.InvokeOutput)
- type S3Client
- func MustNewS3WrapperWithDefaultConfig(bucket string, opts ...S3OptionFunc) *S3Client
- func NewS3Wrapper(bucket string, cfg aws.Config, opts ...S3OptionFunc) *S3Client
- func NewS3WrapperWithClient(bucket string, client *s3.Client, opts ...S3OptionFunc) *S3Client
- func NewS3WrapperWithDefaultConfig(bucket string, opts ...S3OptionFunc) (*S3Client, error)
- func (w *S3Client) DeleteObject(objectKey string) error
- func (w *S3Client) Download(objectKey string, opts ...S3OptionFunc) (string, error)
- func (w *S3Client) DownloadFile(objectKey string, fileName string) errordeprecated
- func (w *S3Client) GetObject(objectKey string, opts ...S3OptionFunc) ([]byte, error)
- func (w *S3Client) GetObjectContent(objectKey string) ([]byte, error)
- func (w *S3Client) HasObject(objectKey string) (bool, error)
- func (w *S3Client) ListBuckets() (*s3.ListBucketsOutput, error)
- func (w *S3Client) ListObjects(prefix string, opts ...S3OptionFunc) ([]string, error)
- func (w *S3Client) MustUploadRawData(objectKey string, raw []byte, opts ...S3OptionFunc)
- func (w *S3Client) MustUploadWithAutoGzipped(localFile, s3path string)
- func (w *S3Client) PutObject(objectKey string, raw []byte, opts ...S3OptionFunc) error
- func (w *S3Client) UploadLargeObject(bucketName string, objectKey string, largeObject []byte) error
- func (w *S3Client) UploadRawData(objectKey string, raw []byte, opts ...S3OptionFunc) error
- func (w *S3Client) UploadRawDataToGz(raw string, objectKey string) error
- func (w *S3Client) UploadToBucketWithAutoGzipped(localFile, s3path, bucket string) (*manager.UploadOutput, error)
- func (w *S3Client) UploadWithAutoGzipped(localFile, s3path string) (*manager.UploadOutput, error)
- type S3OptionFunc
- func WithAutoUnGzip(autoUnGzip bool) S3OptionFunc
- func WithBucket(s string) S3OptionFunc
- func WithEmptyFile(b bool) S3OptionFunc
- func WithFolderLevel(n int) S3OptionFunc
- func WithGz(b bool) S3OptionFunc
- func WithMaxKeys(n int) S3OptionFunc
- func WithSaveTo(s string) S3OptionFuncdeprecated
- func WithSavedName(s string) S3OptionFunc
- func WithTimeout(n int) S3OptionFunc
- type S3Options
- type SchedulerWrapper
- func (w *SchedulerWrapper) Create(name, schedule, targetArn, roleArn, jsonStr string) error
- func (w *SchedulerWrapper) DeleteSchedule(name string) (*scheduler.DeleteScheduleOutput, error)
- func (w *SchedulerWrapper) Disable(name string, schedule, targetArn, roleArn, jsonStr string) error
- func (w *SchedulerWrapper) ListSchedulers(name string) (*scheduler.ListSchedulesOutput, error)
- func (w *SchedulerWrapper) Update(name string, schedule, targetArn, roleArn, jsonStr string) error
- func (w *SchedulerWrapper) Upsert(name string, schedule, targetArn, roleArn, jsonStr string) error
- type SqsClient
- func (w *SqsClient) ClearQueue() (int, error)
- func (w *SqsClient) CreateQueue(name string) (string, error)
- func (w *SqsClient) DeleteMsg(handle *string) (*sqs.DeleteMessageOutput, error)
- func (w *SqsClient) DeleteQueue(name string) error
- func (w *SqsClient) GetMsg() (*sqs.ReceiveMessageOutput, error)
- func (w *SqsClient) GetMsgs(opts ...SqsOptFunc) (*sqs.ReceiveMessageOutput, error)
- func (w *SqsClient) GetQueueURL(name string) (string, error)
- func (w *SqsClient) GetQueues() (*sqs.ListQueuesOutput, error)
- func (w *SqsClient) GetRemainedItems(opts ...SqsOptFunc) (int64, error)
- func (w *SqsClient) GoReadMessages(ch chan *SqsResp, opts ...SqsOptFunc)
- func (w *SqsClient) MustDeleteMsg(handle *string) *sqs.DeleteMessageOutput
- func (w *SqsClient) MustGetMsg() *types.Message
- func (w *SqsClient) MustGetMsgs(opts ...SqsOptFunc) []*string
- func (w *SqsClient) MustGetQueueURL(name string) string
- func (w *SqsClient) MustSendMsg(message string) *sqs.SendMessageOutput
- func (w *SqsClient) MustSendMsgWithRetry(message string, retries uint) *sqs.SendMessageOutput
- func (w *SqsClient) PurgeQueue() error
- func (w *SqsClient) ReadMessages(chanResp chan *SqsResp, opts ...SqsOptFunc)
- func (w *SqsClient) SendManyMessages(messages []string) (int, error)
- func (w *SqsClient) SendMsg(message string) (*sqs.SendMessageOutput, error)
- func (w *SqsClient) SendMsgBatch(messages []string) (*sqs.SendMessageBatchOutput, error)
- func (w *SqsClient) SendMsgWithRetry(message string, retries uint) (*sqs.SendMessageOutput, error)
- func (w *SqsClient) SetQueueURL(name string)
- type SqsOptFunc
- type SqsOpts
- type SqsReadStatus
- type SqsResp
Constants ¶
const ( TypeN = types.ScalarAttributeTypeN TypeS = types.ScalarAttributeTypeS TypeB = types.ScalarAttributeTypeB )
const (
MaxBatchSize = 10
)
Variables ¶
var ( ErrEmptyMessageBody = errors.New("message body cannot be empty") ErrInvalidUTF8 = errors.New("message contains invalid UTF-8 characters") ErrMessageTooLong = errors.New("message exceeds maximum allowed length") ErrQueueNameMismatch = errors.New("queue name does not match the one set during initialization") ErrMessageEmpty = errors.New("message is empty") ErrSendBatchFailed = errors.New("failed to send some messages in batch") )
var ErrGzSuffixRequired = errors.New("non gz format: .gz is required")
Functions ¶
func ChunkSlice ¶ added in v0.1.5
func GetSecretByAkSk ¶
func GetSecretWithDefault ¶
func MustGetSecret ¶
func NewConfigFromSecretWithAkSk ¶
NewConfigFromSecretWithAkSk, ak, sk is used to get secret from scrects manager.
usage:
- require `ak/sk` from Account which only has access to aws:SecretsManager
func NewConfigWithSecret ¶
NewConfigWithSecret will use ~/.aws/credentials to load secret from secrets manager, then use the loaded secret to work on aws resources
func NewMinioS3Client ¶ added in v0.1.5
Types ¶
type Auth ¶
type Auth struct { AwsAccessKeyID string `json:"aws_access_key_id"` AwsSecretAccessKey string `json:"aws_secret_access_key"` }
func SecretToAuth ¶
type DynamodbWrapper ¶
type DynamodbWrapper struct { Config aws.Config Client *dynamodb.Client DdbCtx context.Context TableName string Timeout int // contains filtered or unexported fields }
func NewDynamodbWrapper ¶
func NewDynamodbWrapper(table string, config aws.Config, readCapacity, writeCapacity int) *DynamodbWrapper
func NewDynamodbWrapperWithDefault ¶
func NewDynamodbWrapperWithDefault(table string) (*DynamodbWrapper, error)
func (*DynamodbWrapper) AddItemBatch ¶
func (w *DynamodbWrapper) AddItemBatch(data []types.WriteRequest) (int, error)
func (*DynamodbWrapper) BuildAttrValueMap ¶
func (w *DynamodbWrapper) BuildAttrValueMap(keys []string, values []interface{}) (map[string]types.AttributeValue, error)
func (*DynamodbWrapper) BuildQueryBeginsWith ¶
func (w *DynamodbWrapper) BuildQueryBeginsWith(name string, key string) (expression.Expression, error)
func (*DynamodbWrapper) BuildQueryExpr ¶
func (w *DynamodbWrapper) BuildQueryExpr(name string, key interface{}) (expression.Expression, error)
func (*DynamodbWrapper) BuildScanExpr ¶
func (w *DynamodbWrapper) BuildScanExpr()
func (*DynamodbWrapper) BuildTableInput ¶
func (w *DynamodbWrapper) BuildTableInput(primaryKey string, sortKey string, skType types.ScalarAttributeType) *dynamodb.CreateTableInput
func (*DynamodbWrapper) CreateTable ¶
func (w *DynamodbWrapper) CreateTable(tableInput *dynamodb.CreateTableInput) (*types.TableDescription, error)
func (*DynamodbWrapper) DeleteRow ¶
func (w *DynamodbWrapper) DeleteRow(key map[string]types.AttributeValue) error
func (*DynamodbWrapper) DeleteTable ¶
func (w *DynamodbWrapper) DeleteTable() error
func (*DynamodbWrapper) GetItem ¶
func (w *DynamodbWrapper) GetItem(key map[string]types.AttributeValue, out interface{}) error
func (*DynamodbWrapper) ListTables ¶
func (w *DynamodbWrapper) ListTables() ([]string, error)
ListTables lists the DynamoDB table names for the current account.
func (*DynamodbWrapper) PutItem ¶
func (w *DynamodbWrapper) PutItem(data interface{}) error
func (*DynamodbWrapper) Query ¶
func (w *DynamodbWrapper) Query(expr expression.Expression, out interface{}) error
func (*DynamodbWrapper) Scan ¶
func (w *DynamodbWrapper) Scan(expr expression.Expression, out interface{}) error
func (*DynamodbWrapper) TableExists ¶
func (w *DynamodbWrapper) TableExists() (bool, error)
TableExists determines whether a DynamoDB table exists.
type EventWrapper ¶
type EventWrapper struct {
// contains filtered or unexported fields
}
func NewEventWrapper ¶
func NewEventWrapper(cfg aws.Config) (*EventWrapper, error)
func NewEventWrapperWithDefaultConfig ¶
func NewEventWrapperWithDefaultConfig() (*EventWrapper, error)
func (*EventWrapper) DeleteRule ¶
func (w *EventWrapper) DeleteRule(name string) error
DeleteRule delete a rule with name.
func (*EventWrapper) ListRules ¶
func (w *EventWrapper) ListRules() []types.Rule
ListRules lists all rules available.
func (*EventWrapper) ListTargets ¶
func (w *EventWrapper) ListTargets(name string)
ListTargets list targets of a rule.
func (*EventWrapper) PutRule ¶
func (w *EventWrapper) PutRule(name string, schedule string) error
PutRule put a rule.
func (*EventWrapper) PutTarget ¶
func (w *EventWrapper) PutTarget(name string, targetArn, targetID, jsonStr string)
PutTarget put target to a rule.
type FunctionWrapper ¶
type FunctionWrapper struct {
// contains filtered or unexported fields
}
func NewFunctionWrapper ¶
func NewFunctionWrapperWithDefaultConfig ¶
func NewFunctionWrapperWithDefaultConfig(funcName string, dryRun bool) (*FunctionWrapper, error)
func (*FunctionWrapper) GetConfig ¶
func (w *FunctionWrapper) GetConfig() (*types.FunctionConfiguration, error)
GetConfig gets data about function.
func (*FunctionWrapper) Invoke ¶
func (w *FunctionWrapper) Invoke(payload []byte, getLog bool, asyncMode bool) (*lambda.InvokeOutput, error)
func (*FunctionWrapper) InvokeAsync ¶
func (w *FunctionWrapper) InvokeAsync(payload []byte, getLog bool) (*lambda.InvokeOutput, error)
InvokeAsync invokes the function asynchronously.
func (*FunctionWrapper) InvokeSync ¶
func (w *FunctionWrapper) InvokeSync(payload []byte, getLog bool) (*lambda.InvokeOutput, error)
InvokeSync invokes the lambda function specified by name.
func (*FunctionWrapper) List ¶
func (w *FunctionWrapper) List(maxItems int) ([]types.FunctionConfiguration, error)
List lists up to maxItems for account.
func (*FunctionWrapper) PrintInvokeOutput ¶
func (w *FunctionWrapper) PrintInvokeOutput(output *lambda.InvokeOutput)
type S3Client ¶
type S3Client struct { Config aws.Config Client *s3.Client Bucket string // upload timeout Timeout int SaveTo string }
func MustNewS3WrapperWithDefaultConfig ¶
func MustNewS3WrapperWithDefaultConfig(bucket string, opts ...S3OptionFunc) *S3Client
func NewS3Wrapper ¶
func NewS3Wrapper(bucket string, cfg aws.Config, opts ...S3OptionFunc) *S3Client
func NewS3WrapperWithClient ¶ added in v0.1.5
func NewS3WrapperWithClient(bucket string, client *s3.Client, opts ...S3OptionFunc) *S3Client
func NewS3WrapperWithDefaultConfig ¶
func NewS3WrapperWithDefaultConfig(bucket string, opts ...S3OptionFunc) (*S3Client, error)
func (*S3Client) DeleteObject ¶
DeleteObject deletes a single object from the S3 bucket. It automatically removes the bucket prefix from the objectKey if present.
func (*S3Client) Download ¶
func (w *S3Client) Download(objectKey string, opts ...S3OptionFunc) (string, error)
Download retrieves an object from the S3 bucket and saves it to a local file system.
Parameters:
objectKey: string The full key of the object in the S3 bucket to be downloaded.
opts: ...S3OptionFunc Optional functional options to customize the download behavior.
Available Options:
WithFolderLevel(level int): Controls how much of the S3 object's path structure is preserved locally.
-1: Uses the entire objectKey as the local file path, preserving all folders.
0: Uses only the filename (last part of the objectKey), ignoring all folders.
>0: Includes the specified number of parent folders from the end of the objectKey. Default is 1 if not specified.
WithSavedName(name string): Specifies a custom name for the downloaded file. This overrides the original filename.
Returns:
string: The full path of the downloaded (or existing) file. Returns an empty string if the download fails.
Usage Example:
s3Wrapper := NewS3Wrapper("my-bucket", awsConfig) filePath := s3Wrapper.Download("path/to/myfile.txt", WithFolderLevel(1)) if filePath != "" { fmt.Printf("File downloaded to: %s\n", filePath) } else { fmt.Println("Download failed") }
func (*S3Client) DownloadFile
deprecated
func (*S3Client) GetObject ¶
func (w *S3Client) GetObject(objectKey string, opts ...S3OptionFunc) ([]byte, error)
func (*S3Client) GetObjectContent ¶
func (*S3Client) ListBuckets ¶
func (w *S3Client) ListBuckets() (*s3.ListBucketsOutput, error)
func (*S3Client) ListObjects ¶
func (w *S3Client) ListObjects(prefix string, opts ...S3OptionFunc) ([]string, error)
ListObjects list all available objects in bucket with prefix.
@param prefix @param opts @return []string: list s3 files found @return error
func (*S3Client) MustUploadRawData ¶
func (w *S3Client) MustUploadRawData(objectKey string, raw []byte, opts ...S3OptionFunc)
UploadRawData uploads and save raw data to s3 object key(no encoding:gzip supported).
func (*S3Client) MustUploadWithAutoGzipped ¶
func (*S3Client) PutObject ¶
func (w *S3Client) PutObject(objectKey string, raw []byte, opts ...S3OptionFunc) error
func (*S3Client) UploadLargeObject ¶
UploadLargeObject uses an upload manager to upload data to an object in a bucket. The upload manager breaks large data into parts and uploads the parts concurrently.
func (*S3Client) UploadRawData ¶
func (w *S3Client) UploadRawData(objectKey string, raw []byte, opts ...S3OptionFunc) error
func (*S3Client) UploadRawDataToGz ¶
func (*S3Client) UploadToBucketWithAutoGzipped ¶
func (w *S3Client) UploadToBucketWithAutoGzipped(localFile, s3path, bucket string) (*manager.UploadOutput, error)
func (*S3Client) UploadWithAutoGzipped ¶
func (w *S3Client) UploadWithAutoGzipped(localFile, s3path string) (*manager.UploadOutput, error)
type S3OptionFunc ¶
type S3OptionFunc func(o *S3Options)
func WithAutoUnGzip ¶
func WithAutoUnGzip(autoUnGzip bool) S3OptionFunc
func WithBucket ¶
func WithBucket(s string) S3OptionFunc
func WithEmptyFile ¶
func WithEmptyFile(b bool) S3OptionFunc
func WithFolderLevel ¶
func WithFolderLevel(n int) S3OptionFunc
WithFolderLevel is how many level of folder kept from s3uri
func WithGz ¶
func WithGz(b bool) S3OptionFunc
func WithMaxKeys ¶
func WithMaxKeys(n int) S3OptionFunc
func WithSaveTo
deprecated
func WithSaveTo(s string) S3OptionFunc
Deprecated: WithSaveTo
saveTo is disabled in xaws, so no need to pass it.
func WithSavedName ¶
func WithSavedName(s string) S3OptionFunc
func WithTimeout ¶
func WithTimeout(n int) S3OptionFunc
type SchedulerWrapper ¶
type SchedulerWrapper struct { GroupName string // contains filtered or unexported fields }
https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-run-lambda-schedule.html https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/scheduler/client/create_schedule.html
func NewSchedulerWrapper ¶
func NewSchedulerWrapper(groupName string) (*SchedulerWrapper, error)
func (*SchedulerWrapper) Create ¶
func (w *SchedulerWrapper) Create(name, schedule, targetArn, roleArn, jsonStr string) error
Create create a scheduler.
func (*SchedulerWrapper) DeleteSchedule ¶
func (w *SchedulerWrapper) DeleteSchedule(name string) (*scheduler.DeleteScheduleOutput, error)
DeleteSchedule delete a schedule.
func (*SchedulerWrapper) Disable ¶
func (w *SchedulerWrapper) Disable(name string, schedule, targetArn, roleArn, jsonStr string) error
func (*SchedulerWrapper) ListSchedulers ¶
func (w *SchedulerWrapper) ListSchedulers(name string) (*scheduler.ListSchedulesOutput, error)
ListSchedulers list.
type SqsClient ¶ added in v0.1.5
type SqsClient struct { Config aws.Config Client *sqs.Client // upload timeout Timeout int QueueName string QueueURL string SendCache []string // contains filtered or unexported fields }
func NewSqsClient ¶ added in v0.1.5
func NewSqsClientWithDefaultConfig ¶ added in v0.1.5
func (*SqsClient) ClearQueue ¶ added in v0.1.5
ClearQueue removes all messages from the queue without deleting the queue itself. It returns the number of messages cleared and any error encountered.
func (*SqsClient) CreateQueue ¶ added in v0.1.5
func (*SqsClient) DeleteMsg ¶ added in v0.1.5
func (w *SqsClient) DeleteMsg(handle *string) (*sqs.DeleteMessageOutput, error)
func (*SqsClient) DeleteQueue ¶ added in v0.1.5
func (*SqsClient) GetMsg ¶ added in v0.1.5
func (w *SqsClient) GetMsg() (*sqs.ReceiveMessageOutput, error)
GetMsg retrieves a single message from the SQS queue.
This method is a convenience wrapper around GetMsgs, setting the batch size to 1. It uses the same underlying SQS ReceiveMessage API call but is optimized for retrieving a single message at a time.
Returns:
- *sqs.ReceiveMessageOutput: The response from the SQS service, containing at most one message and associated metadata.
- error: An error object which will be non-nil if the retrieval operation failed.
The method will return an error if:
- There's a network issue preventing communication with SQS.
- The SQS service returns an error (e.g., invalid queue URL, permissions issues).
Example usage:
message, err := client.GetMsg() if err != nil { log.Printf("Failed to retrieve message: %v", err) return } if len(message.Messages) > 0 { log.Printf("Received message: %s", *message.Messages[0].Body) } else { log.Println("No messages available") }
Note:
- This method uses the default wait time as configured in the SqsClient.
- If the queue is empty, the returned ReceiveMessageOutput will contain no messages, but the error will still be nil.
- For more control over the retrieval process (e.g., wait time), use GetMsgs directly.
func (*SqsClient) GetMsgs ¶ added in v0.1.5
func (w *SqsClient) GetMsgs(opts ...SqsOptFunc) (*sqs.ReceiveMessageOutput, error)
GetMsgs retrieves multiple messages from the SQS queue.
This method allows for flexible configuration of the message retrieval process through the use of functional options.
Parameters:
- opts: Variadic SqsOptFunc that allow customization of the retrieval process. Available options include:
- WaitTimeSeconds(int): Sets the duration (in seconds) for which the call waits for a message to arrive.
- BatchSize(int): Sets the maximum number of messages to return (1-10).
Returns:
- *sqs.ReceiveMessageOutput: The response from the SQS service, containing the retrieved messages and metadata.
- error: An error object which will be non-nil if the retrieval operation failed.
The method will return an error if:
- There's a network issue preventing communication with SQS.
- The SQS service returns an error (e.g., invalid queue URL, permissions issues).
Example usage:
messages, err := client.GetMsgs(WaitTimeSeconds(20), BatchSize(5)) if err != nil { log.Printf("Failed to retrieve messages: %v", err) return } for _, msg := range messages.Messages { log.Printf("Received message: %s", *msg.Body) }
Note: - If no options are provided, default values will be used (waitTimeSeconds: 3, batchSize: client's default batch size). - The actual number of messages returned might be fewer than requested, depending on the queue's contents. - Long polling is used by default, which can help reduce empty responses and API calls.
func (*SqsClient) GetQueueURL ¶ added in v0.1.5
GetQueueURL gets the URL of an Amazon SQS queue Inputs:
queueName is the name of the queue
Output:
If success, the URL of the queue and nil Otherwise, an empty string and an error from the call to
func (*SqsClient) GetQueues ¶ added in v0.1.5
func (w *SqsClient) GetQueues() (*sqs.ListQueuesOutput, error)
GetQueues returns a list of queue names
func (*SqsClient) GetRemainedItems ¶ added in v0.1.5
func (w *SqsClient) GetRemainedItems(opts ...SqsOptFunc) (int64, error)
func (*SqsClient) GoReadMessages ¶ added in v0.1.5
func (w *SqsClient) GoReadMessages(ch chan *SqsResp, opts ...SqsOptFunc)
func (*SqsClient) MustDeleteMsg ¶ added in v0.1.5
func (w *SqsClient) MustDeleteMsg(handle *string) *sqs.DeleteMessageOutput
func (*SqsClient) MustGetMsg ¶ added in v0.1.5
func (*SqsClient) MustGetMsgs ¶ added in v0.1.5
func (w *SqsClient) MustGetMsgs(opts ...SqsOptFunc) []*string
func (*SqsClient) MustGetQueueURL ¶ added in v0.1.5
func (*SqsClient) MustSendMsg ¶ added in v0.1.5
func (w *SqsClient) MustSendMsg(message string) *sqs.SendMessageOutput
func (*SqsClient) MustSendMsgWithRetry ¶ added in v0.1.5
func (w *SqsClient) MustSendMsgWithRetry(message string, retries uint) *sqs.SendMessageOutput
MustSendMsgWithRetry is a wrapper around SendMsgWithRetry that panics on error.
This method should be used with caution, as it will panic if the message cannot be sent.
func (*SqsClient) PurgeQueue ¶ added in v0.1.5
PurgeQueue removes all messages from the queue
func (*SqsClient) ReadMessages ¶ added in v0.1.5
func (w *SqsClient) ReadMessages(chanResp chan *SqsResp, opts ...SqsOptFunc)
func (*SqsClient) SendManyMessages ¶ added in v0.1.5
SendManyMessages sends any number of messages to the SQS queue using batch operations. It automatically splits the messages into batches of up to 10 (the SQS maximum).
func (*SqsClient) SendMsg ¶ added in v0.1.5
func (w *SqsClient) SendMsg(message string) (*sqs.SendMessageOutput, error)
SendMsg sends a single message to the SQS queue.
This method uses a context with a timeout for the operation. The timeout duration is set by the Timeout field of the SqsClient.
Parameters:
- message: A string containing the body of the message to be sent.
Returns:
- *sqs.SendMessageOutput: The response from the SQS service, containing details about the sent message. This includes fields like MessageId, which uniquely identifies the message within the queue.
- error: An error object which will be non-nil if the send operation failed.
The method will return an error if:
- The context times out before the message is sent.
- There's a network issue preventing communication with SQS.
- The SQS service returns an error (e.g., invalid queue URL, permissions issues).
- The message body exceeds the maximum size limit for SQS messages (256 KB).
Example usage:
client := NewSqsClient(queueName, awsConfig, 10, 60) response, err := client.SendMsg("Hello, SQS!") if err != nil { log.Printf("Failed to send message: %v", err) return } log.Printf("Message sent successfully, ID: %s", *response.MessageId)
Note: This method is not thread-safe. If you need to send messages concurrently, consider using separate SqsClient instances or implement your own synchronization.
func (*SqsClient) SendMsgBatch ¶ added in v0.1.5
func (w *SqsClient) SendMsgBatch(messages []string) (*sqs.SendMessageBatchOutput, error)
SendMsgBatch sends multiple messages to the SQS queue in a single batch operation.
This method allows sending up to 10 messages in a single API call, which can improve throughput and reduce costs when sending large numbers of messages.
Parameters:
- messages: A slice of strings, where each string is the body of a message to be sent.
Returns:
- *sqs.SendMessageBatchOutput: The response from the SQS service, containing details about the sent messages. This includes information about successful and failed message sends.
- error: An error object which will be non-nil if the batch send operation failed entirely.
The method will return an error if:
- The messages slice is empty (ErrMessageEmpty).
- There's a network issue preventing communication with SQS.
- The SQS service returns an error (e.g., invalid queue URL, permissions issues).
Note:
- The maximum number of messages in a batch is 10. If more than 10 messages are provided, only the first 10 will be sent.
- Each message in the batch can be up to 256 KB in size.
- The total size of all messages in the batch cannot exceed the SQS maximum batch size (256 KB).
- If some messages in the batch fail to send, the method will not return an error. Check the Failed field of the output to identify any messages that were not sent successfully.
Example usage:
messages := []string{"Message 1", "Message 2", "Message 3"} response, err := client.SendMsgBatch(messages) if err != nil { log.Printf("Failed to send batch: %v", err) return } log.Printf("Successfully sent %d messages", len(response.Successful)) if len(response.Failed) > 0 { log.Printf("Failed to send %d messages", len(response.Failed)) }
This method is not thread-safe. If you need to send batches concurrently, consider using separate SqsClient instances or implement your own synchronization.
func (*SqsClient) SendMsgWithRetry ¶ added in v0.1.5
SendMsgWithRetry sends a message to SQS with retry logic.
Parameters:
- message: The message to be sent.
- retries: The number of retry attempts.
Returns:
- *sqs.SendMessageOutput: The response from SQS if successful.
- error: An error if all retry attempts fail, nil otherwise.
func (*SqsClient) SetQueueURL ¶ added in v0.1.5
type SqsOptFunc ¶
type SqsOptFunc func(o *SqsOpts)
func BatchSize ¶ added in v0.1.5
func BatchSize(i int) SqsOptFunc
func WaitTimeSeconds ¶ added in v0.1.5
func WaitTimeSeconds(i int) SqsOptFunc
func WithMax ¶
func WithMax(i int) SqsOptFunc
func WithQueueName ¶
func WithQueueName(s string) SqsOptFunc
type SqsReadStatus ¶ added in v0.1.5
type SqsReadStatus int
SqsReadStatus represents the status of SQS read operations
const ( // SqsReadError indicates an error occurred during the read operation SqsReadError SqsReadStatus = iota // SqsReadSuccess indicates a successful read of a message SqsReadSuccess // SqsReadAllConsumed indicates all messages in the queue have been consumed SqsReadAllConsumed // SqsReadMaximumReached indicates the maximum number of requested messages has been reached SqsReadMaximumReached )
func (SqsReadStatus) String ¶ added in v0.1.5
func (s SqsReadStatus) String() string
String returns the string representation of SqsReadStatus
type SqsResp ¶
type SqsResp struct { Status SqsReadStatus Msg *string }
func NewSqsResp ¶
func NewSqsResp(msg *string, status SqsReadStatus) *SqsResp