Documentation ¶
Index ¶
- Constants
- type Config
- type Consumer
- func (c *Consumer) Pause(_ context.Context, p string)
- func (c *Consumer) Push(ctx context.Context, jb *jobs.Job) error
- func (c *Consumer) Register(_ context.Context, p *pipeline.Pipeline) error
- func (c *Consumer) Resume(_ context.Context, p string)
- func (c *Consumer) Run(_ context.Context, p *pipeline.Pipeline) error
- func (c *Consumer) State(ctx context.Context) (*jobs.State, error)
- func (c *Consumer) Stop(context.Context) error
- type Item
- func (i *Item) Ack() error
- func (i *Item) Body() []byte
- func (i *Item) Context() ([]byte, error)
- func (i *Item) ID() string
- func (i *Item) Nack() error
- func (i *Item) Priority() int64
- func (i *Item) Requeue(headers map[string][]string, delay int64) error
- func (i *Item) Respond(data []byte, queue string) error
- type Options
Constants ¶
const (
	Policy string = "policy"
	PolicyAWS string = "Policy"
	VisibilityTimeout string = "visibilitytimeout"
	VisibilityTimeoutAWS string = "VisibilityTimeout"
	MaximumMessageSize string = "maximummessagesize"
	MaximumMessageSizeAWS string = "MaximumMessageSize"
	MessageRetentionPeriod string = "messageretentionperiod"
	MessageRetentionPeriodAWS string = "MessageRetentionPeriod"
	ApproximateNumberOfMessages string = "approximatenumberofmessages"
	ApproximateNumberOfMessagesAWS string = "ApproximateNumberOfMessages"
	ApproximateNumberOfMessagesNotVisible string = "approximatenumberofmessagesnotvisible"
	ApproximateNumberOfMessagesNotVisibleAWS string = "ApproximateNumberOfMessagesNotVisible"
	CreatedTimestamp string = "createdtimestamp"
	CreatedTimestampAWS string = "CreatedTimestamp"
	LastModifiedTimestamp string = "lastmodifiedtimestamp"
	LastModifiedTimestampAWS string = "LastModifiedTimestamp"
	QueueArn string = "queuearn"
	QueueArnAWS string = "QueueArn"
	ApproximateNumberOfMessagesDelayed string = "approximatenumberofmessagesdelayed"
	ApproximateNumberOfMessagesDelayedAWS string = "ApproximateNumberOfMessagesDelayed"
	DelaySeconds string = "delayseconds"
	DelaySecondsAWS string = "DelaySeconds"
	RedrivePolicy string = "redrivepolicy"
	RedrivePolicyAWS string = "RedrivePolicy"
	FifoQueue string = "fifoqueue"
	FifoQueueAWS string = "FifoQueue"
	ContentBasedDeduplication string = "contentbaseddeduplication"
	ContentBasedDeduplicationAWS string = "ContentBasedDeduplication"
	KmsMasterKeyID string = "kmsmasterkeyid"
	KmsMasterKeyIDAWS string = "KmsMasterKeyId"
	KmsDataKeyReusePeriodSeconds string = "kmsdatakeyreuseperiodseconds"
	KmsDataKeyReusePeriodSecondsAWS string = "KmsDataKeyReusePeriodSeconds"
	DeduplicationScope string = "deduplicationscope"
	DeduplicationScopeAWS string = "DeduplicationScope"
	FifoThroughputLimit string = "fifothroughputlimit"
	FifoThroughputLimitAWS string = "FifoThroughputLimit"
	RedriveAllowPolicy string = "redriveallowpolicy"
	RedriveAllowPolicyAWS string = "RedriveAllowPolicy"
	SqsManagedSseEnabled string = "sqsmanagedsseenabled"
	SqsManagedSseEnabledAWS string = "SqsManagedSseEnabled"
	ReceiveMessageWaitTimeSeconds string = "receivemessagewaittimeseconds"
	ReceiveMessageWaitTimeSecondsAWS string = "ReceiveMessageWaitTimeSeconds"
)
const (
	StringType string = "String"
	NumberType string = "Number"
	BinaryType string = "Binary"
	ApproximateReceiveCount string = "ApproximateReceiveCount"
)
const (
	// All - get all message attribute names
	All string = "All"
	// NonExistentQueue AWS error code
	NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue"
)
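The queue attribute constants above come in pairs: a lowercase key and the spelling AWS uses. The following is a minimal sketch of how such pairs could be used to normalize a user-supplied attribute map. The `awsNames` table and the `canonicalAttributes` helper are hypothetical and are not part of this package; whether the driver performs this normalization internally is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// A few of the documented constant pairs, copied locally so the sketch
// compiles on its own.
const (
	VisibilityTimeout    string = "visibilitytimeout"
	VisibilityTimeoutAWS string = "VisibilityTimeout"
	DelaySeconds         string = "delayseconds"
	DelaySecondsAWS      string = "DelaySeconds"
	KmsMasterKeyID       string = "kmsmasterkeyid"
	KmsMasterKeyIDAWS    string = "KmsMasterKeyId"
)

// awsNames is a hypothetical lookup table from the lowercase key to the
// AWS spelling of the same attribute.
var awsNames = map[string]string{
	VisibilityTimeout: VisibilityTimeoutAWS,
	DelaySeconds:      DelaySecondsAWS,
	KmsMasterKeyID:    KmsMasterKeyIDAWS,
}

// canonicalAttributes returns a copy of attrs in which every known key is
// rewritten to its AWS spelling. Unknown keys pass through unchanged.
// If two input keys differ only by case, which value wins is unspecified.
func canonicalAttributes(attrs map[string]string) map[string]string {
	out := make(map[string]string, len(attrs))
	for k, v := range attrs {
		if name, ok := awsNames[strings.ToLower(k)]; ok {
			out[name] = v
			continue
		}
		out[k] = v
	}
	return out
}

func main() {
	got := canonicalAttributes(map[string]string{
		"visibilitytimeout": "30",
		"KmsMasterKeyId":    "alias/aws/sqs",
	})
	fmt.Println(got)
}
```

A lookup table keeps the mapping explicit, which matters for names such as `KmsMasterKeyId` whose AWS spelling cannot be derived from the Go identifier by a simple casing rule.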
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	// global
	Key string `mapstructure:"key"`
	Secret string `mapstructure:"secret"`
	Region string `mapstructure:"region"`
	SessionToken string `mapstructure:"session_token"`
	Endpoint string `mapstructure:"endpoint"`

	// The duration (in seconds) that the received messages are hidden from subsequent
	// retrieve requests after being retrieved by a ReceiveMessage request.
	VisibilityTimeout int32 `mapstructure:"visibility_timeout"`

	// The duration (in seconds) for which the call waits for a message to arrive
	// in the queue before returning. If a message is available, the call returns
	// sooner than WaitTimeSeconds. If no messages are available and the wait time
	// expires, the call returns successfully with an empty list of messages.
	WaitTimeSeconds int32 `mapstructure:"wait_time_seconds"`

	// Prefetch is the maximum number of messages to return. Amazon SQS never returns more messages
	// than this value (however, fewer messages might be returned). Valid values: 1 to
	// 10. Default: 1.
	Prefetch int32 `mapstructure:"prefetch"`

	// The name of the new queue. The following limits apply to this name:
	//
	// * A queue name can have up to 80 characters.
	//
	// * Valid values: alphanumeric characters, hyphens (-), and underscores (_).
	//
	// * A FIFO queue name must end with the .fifo suffix.
	//
	// Queue URLs and names are case-sensitive.
	//
	// This member is required.
	Queue *string `mapstructure:"queue"`

	/*
		link: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
		This parameter applies only to FIFO (first-in-first-out) queues. The tag that specifies that a
		message belongs to a specific message group. Messages that belong to the same message group are
		processed in a FIFO manner (however, messages in different message groups might be processed
		out of order). To interleave multiple ordered streams within a single queue, use MessageGroupId
		values (for example, session data for multiple users). In this scenario, multiple consumers can
		process the queue, but the session data of each user is processed in a FIFO fashion.
		You must associate a non-empty MessageGroupId with a message. If you don't provide a
		MessageGroupId, the action fails.
		ReceiveMessage might return messages with multiple MessageGroupId values. For each
		MessageGroupId, the messages are sorted by time sent. The caller can't specify a MessageGroupId.
		The length of MessageGroupId is 128 characters. Valid values: alphanumeric characters and
		punctuation (!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~).
	*/
	MessageGroupID string `mapstructure:"message_group_id"`

	// A map of attributes with their corresponding values. The following lists the
	// names, descriptions, and values of the special request parameters that the
	// CreateQueue action uses.
	// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html
	Attributes map[string]string `mapstructure:"attributes"`

	// From amazon docs:
	// Add cost allocation tags to the specified Amazon SQS queue. For an overview, see
	// Tagging Your Amazon SQS Queues
	// (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-tags.html)
	// in the Amazon SQS Developer Guide. When you use queue tags, keep the following
	// guidelines in mind:
	//
	// * Adding more than 50 tags to a queue isn't recommended.
	//
	// * Tags don't have any semantic meaning. Amazon SQS interprets tags as character
	// strings.
	//
	// * Tags are case-sensitive.
	//
	// * A new tag with a key identical to that of an existing tag overwrites the
	// existing tag.
	//
	// For a full list of tag restrictions, see Quotas related to queues
	// (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-limits.html#limits-queues)
	// in the Amazon SQS Developer Guide. To be able to tag a queue on creation, you
	// must have the sqs:CreateQueue and sqs:TagQueue permissions. Cross-account
	// permissions don't apply to this action. For more information, see Grant
	// cross-account permissions to a role and a user name
	// (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-customer-managed-policy-examples.html#grant-cross-account-permissions-to-role-and-user-name)
	// in the Amazon SQS Developer Guide.
	Tags map[string]string `mapstructure:"tags"`
}
Config is used to parse pipeline configuration
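The comment on the Queue field lists three naming limits: at most 80 characters, only alphanumeric characters, hyphens, and underscores, and a mandatory .fifo suffix for FIFO queues. A minimal sketch of a pre-flight check for those limits might look like the following. The `validateQueueName` helper is hypothetical and not part of this package; it is not known whether the driver validates names before calling AWS.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
	"strings"
)

// queueNameChars matches the characters the Queue field comment allows:
// alphanumeric characters, hyphens, and underscores.
var queueNameChars = regexp.MustCompile(`^[A-Za-z0-9_-]+$`)

// validateQueueName is a hypothetical helper that applies the documented
// limits. The fifo flag states whether the queue is meant to be FIFO.
func validateQueueName(name string, fifo bool) error {
	if name == "" {
		return errors.New("queue name is required")
	}
	if len(name) > 80 {
		return errors.New("queue name can have up to 80 characters")
	}
	base := name
	if fifo {
		if !strings.HasSuffix(name, ".fifo") {
			return errors.New("FIFO queue name must end with the .fifo suffix")
		}
		// The suffix is the only place where a dot is allowed.
		base = strings.TrimSuffix(name, ".fifo")
	}
	if !queueNameChars.MatchString(base) {
		return errors.New("queue name may contain only alphanumeric characters, hyphens, and underscores")
	}
	return nil
}

func main() {
	for _, name := range []string{"jobs-high_1", "bad name"} {
		fmt.Printf("%q: %v\n", name, validateQueueName(name, false))
	}
	fmt.Printf("%q: %v\n", "orders.fifo", validateQueueName("orders.fifo", true))
}
```

Checking the name locally gives a clearer error than waiting for the CreateQueue call to reject it, at the cost of duplicating rules that AWS may change.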
func (*Config) InitDefault ¶
func (c *Config) InitDefault()
type Consumer ¶
func FromPipeline ¶
func FromPipeline(pipe *pipeline.Pipeline, log *zap.Logger, cfg cfgPlugin.Configurer, pq priorityqueue.Queue) (*Consumer, error)
func NewSQSConsumer ¶
func NewSQSConsumer(configKey string, log *zap.Logger, cfg cfgPlugin.Configurer, pq priorityqueue.Queue) (*Consumer, error)
type Item ¶
type Item struct {
	// Job contains pluginName of job broker (usually PHP class).
	Job string `json:"job"`

	// Ident is unique identifier of the job, should be provided from outside
	Ident string `json:"id"`

	// Payload is string data (usually JSON) passed to Job broker.
	Payload string `json:"payload"`

	// Headers with key-values pairs
	Headers map[string][]string `json:"headers"`

	// Options contains set of PipelineOptions specific to job execution. Can be empty.
	Options *Options `json:"options,omitempty"`
}
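The struct tags on Item determine its JSON shape. The sketch below copies the exported fields of Item and Options into local types and marshals one value with `encoding/json`, to show which keys appear and which are omitted. The `encodeItem` helper and the sample values are illustrative assumptions; this page does not state whether the driver itself sends Item as JSON.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Options mirrors the exported fields documented for this package's
// Options type. The unexported fields are not reproduced.
type Options struct {
	Priority int64  `json:"priority"`
	Pipeline string `json:"pipeline,omitempty"`
	Delay    int64  `json:"delay,omitempty"`
}

// Item mirrors the exported fields documented for this package's Item type.
type Item struct {
	Job     string              `json:"job"`
	Ident   string              `json:"id"`
	Payload string              `json:"payload"`
	Headers map[string][]string `json:"headers"`
	Options *Options            `json:"options,omitempty"`
}

// encodeItem is a hypothetical helper that returns the JSON form of an Item.
func encodeItem(it Item) (string, error) {
	b, err := json.Marshal(it)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := encodeItem(Item{
		Job:     "app.SendEmail",
		Ident:   "job-1",
		Payload: "hello",
		Headers: map[string][]string{"attempt": {"1"}},
		Options: &Options{Priority: 10, Pipeline: "sqs-emails"},
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s)
}
```

Note that `options` and the `pipeline` and `delay` keys disappear when empty because of `omitempty`, while `headers` is always present and encodes as `null` for a nil map.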
type Options ¶
type Options struct {
	// Priority is job priority, default - 10
	// pointer to distinguish 0 as a priority and nil as priority not set
	Priority int64 `json:"priority"`

	// Pipeline manually specified pipeline.
	Pipeline string `json:"pipeline,omitempty"`

	// Delay defines time duration to delay execution for. Defaults to none.
	Delay int64 `json:"delay,omitempty"`
	// contains filtered or unexported fields
}
Options carry information about how to handle given job.
func (*Options) DelayDuration ¶
DelayDuration returns the delay as a time.Duration.
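This page does not say which unit the Delay field uses. The sketch below assumes that Delay is a whole number of seconds and shows how a conversion to time.Duration could look under that assumption. The local Options type and the method body are illustrative and are not taken from the package source.

```go
package main

import (
	"fmt"
	"time"
)

// Options is a local stand-in that keeps only the Delay field.
type Options struct {
	Delay int64
}

// DelayDuration converts Delay to a time.Duration.
// ASSUMPTION: Delay is expressed in seconds; the documentation above
// does not state the unit.
func (o *Options) DelayDuration() time.Duration {
	return time.Second * time.Duration(o.Delay)
}

func main() {
	o := &Options{Delay: 90}
	fmt.Println(o.DelayDuration()) // prints 1m30s
}
```

Returning a time.Duration lets callers pass the value straight to timers and contexts without repeating the unit conversion.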