sqsjobs

package
v4.2.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 11, 2023 License: MIT Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
// Queue attribute names, declared in pairs: the plain constant holds the
// all-lowercase spelling of an attribute name and the constant with the AWS
// suffix holds the mixed-case spelling of the same name.
//
// Note that KmsMasterKeyIDAWS is spelled "KmsMasterKeyId" (lowercase "d"),
// unlike the Go identifier.
//
// NOTE(review): presumably the lowercase form is matched against
// user-supplied configuration keys and the AWS form is what gets sent to the
// SQS API; confirm against the call sites, which are not visible here.
const (
	Policy                                   string = "policy"
	PolicyAWS                                string = "Policy"
	VisibilityTimeout                        string = "visibilitytimeout"
	VisibilityTimeoutAWS                     string = "VisibilityTimeout"
	MaximumMessageSize                       string = "maximummessagesize"
	MaximumMessageSizeAWS                    string = "MaximumMessageSize"
	MessageRetentionPeriod                   string = "messageretentionperiod"
	MessageRetentionPeriodAWS                string = "MessageRetentionPeriod"
	ApproximateNumberOfMessages              string = "approximatenumberofmessages"
	ApproximateNumberOfMessagesAWS           string = "ApproximateNumberOfMessages"
	ApproximateNumberOfMessagesNotVisible    string = "approximatenumberofmessagesnotvisible"
	ApproximateNumberOfMessagesNotVisibleAWS string = "ApproximateNumberOfMessagesNotVisible"
	CreatedTimestamp                         string = "createdtimestamp"
	CreatedTimestampAWS                      string = "CreatedTimestamp"
	LastModifiedTimestamp                    string = "lastmodifiedtimestamp"
	LastModifiedTimestampAWS                 string = "LastModifiedTimestamp"
	QueueArn                                 string = "queuearn"
	QueueArnAWS                              string = "QueueArn"
	ApproximateNumberOfMessagesDelayed       string = "approximatenumberofmessagesdelayed"
	ApproximateNumberOfMessagesDelayedAWS    string = "ApproximateNumberOfMessagesDelayed"
	DelaySeconds                             string = "delayseconds"
	DelaySecondsAWS                          string = "DelaySeconds"
	RedrivePolicy                            string = "redrivepolicy"
	RedrivePolicyAWS                         string = "RedrivePolicy"
	FifoQueue                                string = "fifoqueue"
	FifoQueueAWS                             string = "FifoQueue"
	ContentBasedDeduplication                string = "contentbaseddeduplication"
	ContentBasedDeduplicationAWS             string = "ContentBasedDeduplication"
	KmsMasterKeyID                           string = "kmsmasterkeyid"
	KmsMasterKeyIDAWS                        string = "KmsMasterKeyId"
	KmsDataKeyReusePeriodSeconds             string = "kmsdatakeyreuseperiodseconds"
	KmsDataKeyReusePeriodSecondsAWS          string = "KmsDataKeyReusePeriodSeconds"
	DeduplicationScope                       string = "deduplicationscope"
	DeduplicationScopeAWS                    string = "DeduplicationScope"
	FifoThroughputLimit                      string = "fifothroughputlimit"
	FifoThroughputLimitAWS                   string = "FifoThroughputLimit"
	RedriveAllowPolicy                       string = "redriveallowpolicy"
	RedriveAllowPolicyAWS                    string = "RedriveAllowPolicy"
	SqsManagedSseEnabled                     string = "sqsmanagedsseenabled"
	SqsManagedSseEnabledAWS                  string = "SqsManagedSseEnabled"
	ReceiveMessageWaitTimeSeconds            string = "receivemessagewaittimeseconds"
	ReceiveMessageWaitTimeSecondsAWS         string = "ReceiveMessageWaitTimeSeconds"
)
View Source
// String constants used when working with SQS messages.
//
// NOTE(review): StringType, NumberType and BinaryType look like the SQS
// message attribute data type names, and ApproximateReceiveCount looks like
// the name of an SQS message system attribute; confirm against the AWS SQS
// API reference and the call sites, which are not visible here.
const (
	StringType              string = "String"
	NumberType              string = "Number"
	BinaryType              string = "Binary"
	ApproximateReceiveCount string = "ApproximateReceiveCount"
)
View Source
const (
	// All is the value used to request all message attribute names.
	All string = "All"

	// NonExistentQueue is the AWS error code reported for a queue that does
	// not exist.
	NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

// Config is used to parse the pipeline configuration. Every field carries a
// mapstructure tag naming its configuration key.
type Config struct {
	// Global settings.
	// NOTE(review): presumably the AWS credentials, region and an optional
	// custom endpoint used to build the SQS client; confirm against the code
	// that consumes them.
	Key          string `mapstructure:"key"`
	Secret       string `mapstructure:"secret"`
	Region       string `mapstructure:"region"`
	SessionToken string `mapstructure:"session_token"`
	Endpoint     string `mapstructure:"endpoint"`

	// ConsumeAll enables consuming all jobs.
	ConsumeAll bool `mapstructure:"consume_all"`

	// SkipQueueDeclaration means: only get the queue URL, do not declare the
	// queue.
	SkipQueueDeclaration bool `mapstructure:"skip_queue_declaration"`

	// VisibilityTimeout is the duration (in seconds) that the received messages
	// are hidden from subsequent retrieve requests after being retrieved by a
	// ReceiveMessage request.
	VisibilityTimeout int32 `mapstructure:"visibility_timeout"`
	// WaitTimeSeconds is the duration (in seconds) for which the call waits for
	// a message to arrive in the queue before returning. If a message is
	// available, the call returns sooner than WaitTimeSeconds. If no messages
	// are available and the wait time expires, the call returns successfully
	// with an empty list of messages.
	WaitTimeSeconds int32 `mapstructure:"wait_time_seconds"`
	// Prefetch is the maximum number of messages to return. Amazon SQS never
	// returns more messages than this value (however, fewer messages might be
	// returned). Valid values: 1 to 10. Default: 1.
	Prefetch int32 `mapstructure:"prefetch"`
	// Queue is the name of the new queue. The following limits apply to this
	// name:
	//
	//   - A queue name can have up to 80 characters.
	//   - Valid values: alphanumeric characters, hyphens (-), and
	//     underscores (_).
	//   - A FIFO queue name must end with the .fifo suffix.
	//
	// Queue URLs and names are case-sensitive.
	//
	// This member is required.
	Queue *string `mapstructure:"queue"`

	/*
		MessageGroupID is described in the AWS documentation as follows.
		link: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
		This parameter applies only to FIFO (first-in-first-out) queues.
		The tag that specifies that a message belongs to a specific message group. Messages that belong to the same message group are processed in a FIFO manner
		(however, messages in different message groups might be processed out of order).
		To interleave multiple ordered streams within a single queue, use MessageGroupId values (for example, session data for multiple users).
		In this scenario, multiple consumers can process the queue, but the session data of each user is processed in a FIFO fashion.
		You must associate a non-empty MessageGroupId with a message. If you don't provide a MessageGroupId, the action fails.
		ReceiveMessage might return messages with multiple MessageGroupId values. For each MessageGroupId, the messages are sorted by time sent. The caller can't specify a MessageGroupId.
		The length of MessageGroupId is 128 characters. Valid values: alphanumeric characters and punctuation (!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~).
	*/
	MessageGroupID string `mapstructure:"message_group_id"`

	// Attributes is a map of queue attributes with their corresponding values.
	// For the names, descriptions, and values of the special request parameters
	// that the CreateQueue action uses, see:
	// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html
	Attributes map[string]string `mapstructure:"attributes"`

	// Tags holds the queue tags. From the Amazon docs:
	// Add cost allocation tags to the specified Amazon SQS queue. For an overview, see
	// Tagging Your Amazon SQS Queues
	// (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-tags.html)
	// in the Amazon SQS Developer Guide. When you use queue tags, keep the following
	// guidelines in mind:
	//
	//   - Adding more than 50 tags to a queue isn't recommended.
	//   - Tags don't have any semantic meaning. Amazon SQS interprets tags as
	//     character strings.
	//   - Tags are case-sensitive.
	//   - A new tag with a key identical to that of an existing tag overwrites
	//     the existing tag.
	//
	// For a full list of tag restrictions, see Quotas related to queues
	// (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-limits.html#limits-queues)
	// in the Amazon SQS Developer Guide. To be able to tag a queue on creation, you
	// must have the sqs:CreateQueue and sqs:TagQueue permissions. Cross-account
	// permissions don't apply to this action. For more information, see Grant
	// cross-account permissions to a role and a user name
	// (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-customer-managed-policy-examples.html#grant-cross-account-permissions-to-role-and-user-name)
	// in the Amazon SQS Developer Guide.
	Tags map[string]string `mapstructure:"tags"`
}

Config is used to parse the pipeline configuration.

func (*Config) InitDefault

func (c *Config) InitDefault()

type Configurer

// Configurer is the configuration source accepted by FromConfig and
// FromPipeline.
type Configurer interface {
	// UnmarshalKey takes a single key and unmarshals it into a struct.
	UnmarshalKey(name string, out any) error
	// Has checks whether the config section exists.
	Has(name string) bool
}

type Driver added in v4.1.0

// Driver is the type returned by FromConfig and FromPipeline. It exposes the
// Push, Run, State, Pause, Resume and Stop methods.
type Driver struct {
	// contains filtered or unexported fields
}

func FromConfig added in v4.1.0

func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq pq.Queue) (*Driver, error)

func FromPipeline

func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq pq.Queue) (*Driver, error)

func (*Driver) Pause added in v4.1.0

func (c *Driver) Pause(ctx context.Context, p string) error

func (*Driver) Push added in v4.1.0

func (c *Driver) Push(ctx context.Context, jb jobs.Job) error

func (*Driver) Resume added in v4.1.0

func (c *Driver) Resume(ctx context.Context, p string) error

func (*Driver) Run added in v4.1.0

func (c *Driver) Run(ctx context.Context, p jobs.Pipeline) error

func (*Driver) State added in v4.1.0

func (c *Driver) State(ctx context.Context) (*jobs.State, error)

func (*Driver) Stop added in v4.1.0

func (c *Driver) Stop(ctx context.Context) error

type Item

// Item describes a single job: its name, identifier, payload, headers and
// optional execution options. Field names in JSON are given by the json tags.
type Item struct {
	// Job contains the pluginName of the job broker (usually a PHP class).
	Job string `json:"job"`
	// Ident is the unique identifier of the job; it should be provided from
	// outside.
	Ident string `json:"id"`
	// Payload is string data (usually JSON) passed to the job broker.
	Payload string `json:"payload"`
	// Headers holds key-value pairs; each key may carry several values.
	Headers map[string][]string `json:"headers"`
	// Options contains the set of PipelineOptions specific to job execution.
	// Can be empty.
	Options *Options `json:"options,omitempty"`
}

func (*Item) Ack

func (i *Item) Ack() error

func (*Item) Body

func (i *Item) Body() []byte

Body packs job payload into binary payload.

func (*Item) Context

func (i *Item) Context() ([]byte, error)

Context packs the job context (job, id) into a binary payload. It is not used by the SQS driver; MessageAttributes are used instead.

func (*Item) ID

func (i *Item) ID() string

func (*Item) Metadata added in v4.2.0

func (i *Item) Metadata() map[string][]string

func (*Item) Nack

func (i *Item) Nack() error

func (*Item) Priority

func (i *Item) Priority() int64

func (*Item) Requeue

func (i *Item) Requeue(headers map[string][]string, delay int64) error

func (*Item) Respond

func (i *Item) Respond(_ []byte, _ string) error

type Options

// Options carry information about how to handle a given job.
type Options struct {
	// Priority is the job priority; the original comment states a default of 10.
	// NOTE(review): an earlier comment described this field as a pointer used to
	// tell priority 0 apart from "not set", but it is a plain int64, so the two
	// cases cannot be distinguished here.
	Priority int64 `json:"priority"`
	// Pipeline is the manually specified pipeline.
	Pipeline string `json:"pipeline,omitempty"`
	// Delay defines the time duration to delay execution for. Defaults to none.
	// NOTE(review): the unit is not stated here; see DelayDuration for the
	// conversion to time.Duration.
	Delay int64 `json:"delay,omitempty"`
	// AutoAck acknowledges jobs automatically after they are received from the
	// queue.
	AutoAck bool `json:"auto_ack"`
	// Queue is the SQS queue name.
	Queue string `json:"queue,omitempty"`
	// contains filtered or unexported fields
}

Options carry information about how to handle given job.

func (*Options) DelayDuration

func (o *Options) DelayDuration() time.Duration

DelayDuration returns delay duration in a form of time.Duration.

type RequeueFn

// RequeueFn is the function used to requeue an item. It receives a context
// and the item, and returns an error.
type RequeueFn = func(context.Context, *Item) error

RequeueFn is the function used to requeue an item.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL