base

package
v1.1.16 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2024 License: Apache-2.0, MIT Imports: 13 Imported by: 1

Documentation

Overview

Package base defines foundational types and constants used in asynq package.

Index

Constants

View Source
// Global Redis keys.
//
// The trailing comment on each entry notes the kind of Redis structure
// the key refers to (sorted set, set, or pub/sub channel).
const (
	AllServers    = "asynq:servers"    // ZSET
	AllWorkers    = "asynq:workers"    // ZSET
	AllSchedulers = "asynq:schedulers" // ZSET
	AllQueues     = "asynq:queues"     // SET
	CancelChannel = "asynq:cancel"     // PubSub channel
)

Global Redis keys.

View Source
// DefaultQueueName is the queue name used if none is specified by the user.
const DefaultQueueName = "default"

DefaultQueueName is the queue name used if none is specified by the user.

View Source
// Version of asynq library and CLI.
const Version = "0.24.1"

Version of asynq library and CLI.

Variables

DefaultQueue is the redis key for the default queue.

Functions

func ActiveKey

func ActiveKey(qname string) string

ActiveKey returns a redis key for the active tasks.

func AggregationSetKey

func AggregationSetKey(qname, gname, setID string) string

AggregationSetKey returns a redis key used for an aggregation set.

func AllAggregationSets

func AllAggregationSets(qname string) string

AllAggregationSets returns a redis key used to store all aggregation sets (set of tasks staged to be aggregated) in a given queue.

func AllGroups

func AllGroups(qname string) string

AllGroups returns a redis key used to store all group keys used in a given queue.

func ArchivedKey

func ArchivedKey(qname string) string

ArchivedKey returns a redis key for the archived tasks.

func CompletedKey

func CompletedKey(qname string) string

func EncodeMessage

func EncodeMessage(msg *TaskMessage) ([]byte, error)

EncodeMessage marshals the given task message and returns the encoded bytes.

func EncodeSchedulerEnqueueEvent

func EncodeSchedulerEnqueueEvent(event *SchedulerEnqueueEvent) ([]byte, error)

EncodeSchedulerEnqueueEvent marshals the given event and returns the encoded bytes.

func EncodeSchedulerEntry

func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error)

EncodeSchedulerEntry marshals the given entry and returns the encoded bytes.

func EncodeServerInfo

func EncodeServerInfo(info *ServerInfo) ([]byte, error)

EncodeServerInfo marshals the given ServerInfo and returns the encoded bytes.

func EncodeWorkerInfo

func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error)

EncodeWorkerInfo marshals the given WorkerInfo and returns the encoded bytes.

func FailedKey

func FailedKey(qname string, t time.Time) string

FailedKey returns a redis key for failure count for the given day for the queue.

func FailedTotalKey

func FailedTotalKey(qname string) string

FailedTotalKey returns a redis key for total failure count for the given queue.

func GroupKey

func GroupKey(qname, gkey string) string

GroupKey returns a redis key used to group tasks that belong to the same group.

func GroupKeyPrefix

func GroupKeyPrefix(qname string) string

GroupKeyPrefix returns a prefix for group key.

func LeaseKey

func LeaseKey(qname string) string

LeaseKey returns a redis key for the lease.

func PausedKey

func PausedKey(qname string) string

PausedKey returns a redis key to indicate that the given queue is paused.

func PendingKey

func PendingKey(qname string) string

PendingKey returns a redis key for the given queue name.

func ProcessedKey

func ProcessedKey(qname string, t time.Time) string

ProcessedKey returns a redis key for processed count for the given day for the queue.

func ProcessedTotalKey

func ProcessedTotalKey(qname string) string

ProcessedTotalKey returns a redis key for total processed count for the given queue.

func QueueKeyPrefix

func QueueKeyPrefix(qname string) string

QueueKeyPrefix returns a prefix for all keys in the given queue.

func RetryKey

func RetryKey(qname string) string

RetryKey returns a redis key for the retry tasks.

func ScheduledKey

func ScheduledKey(qname string) string

ScheduledKey returns a redis key for the scheduled tasks.

func SchedulerEntriesKey

func SchedulerEntriesKey(schedulerID string) string

SchedulerEntriesKey returns a redis key for the scheduler entries given a scheduler ID.

func SchedulerHistoryKey

func SchedulerHistoryKey(entryID string) string

SchedulerHistoryKey returns a redis key for the scheduler's history for the given entry.

func ServerInfoKey

func ServerInfoKey(hostname string, pid int, serverID string) string

ServerInfoKey returns a redis key for process info.

func TaskKey

func TaskKey(qname, id string) string

TaskKey returns a redis key for the given task message.

func TaskKeyPrefix

func TaskKeyPrefix(qname string) string

TaskKeyPrefix returns a prefix for task key.

func UniqueKey

func UniqueKey(qname, tasktype string, payload []byte) string

UniqueKey returns a redis key with the given type, payload, and queue name.

func ValidateQueueName

func ValidateQueueName(qname string) error

ValidateQueueName validates a given qname to be used as a queue name. Returns nil if valid, otherwise returns non-nil error.

func WorkersKey

func WorkersKey(hostname string, pid int, serverID string) string

WorkersKey returns a redis key for the workers given hostname, pid, and server ID.

Types

type Broker

// Broker is a message broker that supports operations to manage task queues.
//
// See rdb.RDB as a reference implementation.
type Broker interface {
	// Connection and core task lifecycle methods
	Ping() error
	Close() error
	Enqueue(ctx context.Context, msg *TaskMessage) error
	EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error
	Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
	Done(ctx context.Context, msg *TaskMessage) error
	MarkAsComplete(ctx context.Context, msg *TaskMessage) error
	Requeue(ctx context.Context, msg *TaskMessage) error
	Schedule(ctx context.Context, msg *TaskMessage, processAt time.Time) error
	ScheduleUnique(ctx context.Context, msg *TaskMessage, processAt time.Time, ttl time.Duration) error
	Retry(ctx context.Context, msg *TaskMessage, processAt time.Time, errMsg string, isFailure bool) error
	Archive(ctx context.Context, msg *TaskMessage, errMsg string) error
	ForwardIfReady(qnames ...string) error

	// Group aggregation related methods
	AddToGroup(ctx context.Context, msg *TaskMessage, gname string) error
	AddToGroupUnique(ctx context.Context, msg *TaskMessage, uniqueKey, groupKey string, ttl time.Duration) error
	ListGroups(qname string) ([]string, error)
	AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) (aggregationSetID string, err error)
	ReadAggregationSet(qname, gname, aggregationSetID string) ([]*TaskMessage, time.Time, error)
	DeleteAggregationSet(ctx context.Context, qname, gname, aggregationSetID string) error
	ReclaimStaleAggregationSets(qname string) error

	// Task retention related method
	DeleteExpiredCompletedTasks(qname string) error

	// Lease related methods
	ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error)
	ExtendLease(qname string, ids ...string) (time.Time, error)

	// State snapshot related methods
	WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
	ClearServerState(host string, pid int, serverID string) error

	// Cancelation related methods
	CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
	PublishCancelation(id string) error

	// Task result related method
	// NOTE(review): n is presumably the number of bytes written — confirm against the implementation.
	WriteResult(qname, id string, data []byte) (n int, err error)
}

Broker is a message broker that supports operations to manage task queues.

See rdb.RDB as a reference implementation. (Lint directive from the source comment: `nolint: revive`, added for the "interface too long" issue.)

type Cancelations

// Cancelations is a collection that holds cancel functions for all active tasks.
//
// Cancelations are safe for concurrent use by multiple goroutines.
type Cancelations struct {
	// contains filtered or unexported fields
}

Cancelations is a collection that holds cancel functions for all active tasks.

Cancelations are safe for concurrent use by multiple goroutines.

func NewCancelations

func NewCancelations() *Cancelations

NewCancelations returns a Cancelations instance.

func (*Cancelations) Add

func (c *Cancelations) Add(id string, fn context.CancelFunc)

Add adds a new cancel func to the collection.

func (*Cancelations) Delete

func (c *Cancelations) Delete(id string)

Delete deletes a cancel func from the collection given an id.

func (*Cancelations) Get

func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool)

Get returns a cancel func given an id.

type Lease

// Lease is a time-bound lease for a worker to process a task.
// It provides a communication channel between the lessor and the lessee
// about lease expiration.
type Lease struct {
	// NOTE(review): presumably the time source used when checking lease
	// validity — confirm against the implementation.
	Clock timeutil.Clock
	// contains filtered or unexported fields
}

Lease is a time-bound lease for a worker to process a task. It provides a communication channel between the lessor and the lessee about lease expiration.

func NewLease

func NewLease(expirationTime time.Time) *Lease

func (*Lease) Deadline

func (l *Lease) Deadline() time.Time

Deadline returns the expiration time of the lease.

func (*Lease) Done

func (l *Lease) Done() <-chan struct{}

Done returns a communication channel from which the lessee can read to get notified when the lessor signals lease expiration.

func (*Lease) IsValid

func (l *Lease) IsValid() bool

IsValid returns true if the lease's expiration time is in the future or is equal to the current time; it returns false otherwise.

func (*Lease) NotifyExpiration

func (l *Lease) NotifyExpiration() bool

NotifyExpiration sends a notification to the lessee about the expired lease. It returns true if the notification was sent, and false if the lease is still valid and no notification was sent.

func (*Lease) Reset

func (l *Lease) Reset(expirationTime time.Time) bool

Reset changes the lease to expire at the given time. It returns true if the lease is still valid and the reset operation was successful, and false if the lease has already expired.

type SchedulerEnqueueEvent

// SchedulerEnqueueEvent holds information about an enqueue event by a scheduler.
type SchedulerEnqueueEvent struct {
	// ID of the task that was enqueued.
	TaskID string

	// Time the task was enqueued.
	EnqueuedAt time.Time
}

SchedulerEnqueueEvent holds information about an enqueue event by a scheduler.

func DecodeSchedulerEnqueueEvent

func DecodeSchedulerEnqueueEvent(b []byte) (*SchedulerEnqueueEvent, error)

DecodeSchedulerEnqueueEvent unmarshals the given bytes and returns a decoded SchedulerEnqueueEvent.

type SchedulerEntry

// SchedulerEntry holds information about a periodic task registered with a scheduler.
type SchedulerEntry struct {
	// Identifier of this entry.
	ID string

	// Spec describes the schedule of this entry.
	Spec string

	// Type is the task type of the periodic task.
	Type string

	// Payload is the payload of the periodic task.
	Payload []byte

	// Opts is the options for the periodic task.
	Opts []string

	// Next shows the next time the task will be enqueued.
	Next time.Time

	// Prev shows the last time the task was enqueued.
	// Zero time if task was never enqueued.
	Prev time.Time
}

SchedulerEntry holds information about a periodic task registered with a scheduler.

func DecodeSchedulerEntry

func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error)

DecodeSchedulerEntry unmarshals the given bytes and returns a decoded SchedulerEntry.

type ServerInfo

// ServerInfo holds information about a running server.
//
// NOTE(review): Queues presumably maps a queue name to its priority —
// confirm against the server configuration code.
type ServerInfo struct {
	Host              string
	PID               int
	ServerID          string
	Concurrency       int
	Queues            map[string]int
	StrictPriority    bool
	Status            string
	Started           time.Time
	ActiveWorkerCount int
}

ServerInfo holds information about a running server.

func DecodeServerInfo

func DecodeServerInfo(b []byte) (*ServerInfo, error)

DecodeServerInfo decodes the given bytes into ServerInfo.

type TaskInfo

// TaskInfo describes a task message and its metadata.
type TaskInfo struct {
	Message       *TaskMessage
	State         TaskState
	NextProcessAt time.Time
	Result        []byte
}

TaskInfo describes a task message and its metadata.

type TaskMessage

// TaskMessage is the internal representation of a task with additional
// metadata fields. Serialized data of this type gets written to redis.
type TaskMessage struct {
	// Type indicates the kind of the task to be performed.
	Type string

	// Payload holds data needed to process the task.
	Payload []byte

	// ID is a unique identifier for each task.
	ID string

	// Queue is the name of the queue this message should be enqueued to.
	Queue string

	// Retry is the maximum number of retries for this task.
	Retry int

	// Retried is the number of times we've retried this task so far.
	Retried int

	// ErrorMsg holds the error message from the last failure.
	ErrorMsg string

	// Time of last failure in Unix time,
	// the number of seconds elapsed since January 1, 1970 UTC.
	//
	// Use zero to indicate no last failure.
	LastFailedAt int64

	// Timeout specifies timeout in seconds.
	// If task processing doesn't complete within the timeout, the task will be retried
	// if retry count is remaining. Otherwise it will be moved to the archive.
	//
	// Use zero to indicate no timeout.
	Timeout int64

	// Deadline specifies the deadline for the task in Unix time,
	// the number of seconds elapsed since January 1, 1970 UTC.
	// If task processing doesn't complete before the deadline, the task will be retried
	// if retry count is remaining. Otherwise it will be moved to the archive.
	//
	// Use zero to indicate no deadline.
	Deadline int64

	// UniqueKey holds the redis key used for uniqueness lock for this task.
	//
	// Empty string indicates that no uniqueness lock was used.
	UniqueKey string

	// GroupKey holds the group key used for task aggregation.
	//
	// Empty string indicates no aggregation is used for this task.
	GroupKey string

	// Retention specifies the number of seconds the task should be retained after completion.
	Retention int64

	// CompletedAt is the time the task was processed successfully in Unix time,
	// the number of seconds elapsed since January 1, 1970 UTC.
	//
	// Use zero to indicate no value.
	CompletedAt int64
}

TaskMessage is the internal representation of a task with additional metadata fields. Serialized data of this type gets written to redis.

func DecodeMessage

func DecodeMessage(data []byte) (*TaskMessage, error)

DecodeMessage unmarshals the given bytes and returns a decoded task message.

type TaskState

// TaskState denotes the state of a task.
type TaskState int

TaskState denotes the state of a task.

// Named task states.
//
// Numbering starts at 1 (iota + 1), so the zero value of TaskState does not
// match any of the named states below.
const (
	TaskStateActive TaskState = iota + 1
	TaskStatePending
	TaskStateScheduled
	TaskStateRetry
	TaskStateArchived
	TaskStateCompleted
	TaskStateAggregating // describes a state where task is waiting in a group to be aggregated
)

func TaskStateFromString

func TaskStateFromString(s string) (TaskState, error)

func (TaskState) String

func (s TaskState) String() string

type WorkerInfo

// WorkerInfo holds information about a running worker.
//
// NOTE(review): Host, PID and ServerID share names and types with the
// corresponding ServerInfo fields; presumably they identify the server the
// worker belongs to — confirm against the code that populates this struct.
type WorkerInfo struct {
	Host     string
	PID      int
	ServerID string
	ID       string
	Type     string
	Payload  []byte
	Queue    string
	Started  time.Time
	Deadline time.Time
}

WorkerInfo holds information about a running worker.

func DecodeWorkerInfo

func DecodeWorkerInfo(b []byte) (*WorkerInfo, error)

DecodeWorkerInfo decodes the given bytes into WorkerInfo.

type Z

// Z represents a sorted set member: a task message paired with its score.
type Z struct {
	Message *TaskMessage
	Score   int64
}

Z represents a sorted set member.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL