base

package
v0.17.8
Warning

This package is not in the latest version of its module.

Published: Aug 6, 2021 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package base defines foundational types and constants used in the asynq package.


Constants

const (
	AllServers    = "asynq:servers"    // ZSET
	AllWorkers    = "asynq:workers"    // ZSET
	AllSchedulers = "asynq:schedulers" // ZSET
	AllQueues     = "asynq:queues"     // SET
	CancelChannel = "asynq:cancel"     // PubSub channel
)

Global Redis keys.

const DefaultQueueName = "default"

DefaultQueueName is the queue name used if none is specified by the user.

const Version = "0.17.7"

Version of asynq library and CLI.

Variables

var DefaultQueue = QueueKey(DefaultQueueName)

DefaultQueue is the redis key for the default queue.

Functions

func ActiveKey

func ActiveKey(qname string) string

ActiveKey returns a redis key for the active tasks.

func ArchivedKey

func ArchivedKey(qname string) string

ArchivedKey returns a redis key for the archived tasks.

func DeadlinesKey

func DeadlinesKey(qname string) string

DeadlinesKey returns a redis key for the deadlines.

func EncodeMessage

func EncodeMessage(msg *TaskMessage) (string, error)

EncodeMessage marshals the given task message in JSON and returns an encoded string.

func FailedKey

func FailedKey(qname string, t time.Time) string

FailedKey returns a redis key for failure count for the given day for the queue.

func PausedKey

func PausedKey(qname string) string

PausedKey returns a redis key to indicate that the given queue is paused.

func ProcessedKey

func ProcessedKey(qname string, t time.Time) string

ProcessedKey returns a redis key for processed count for the given day for the queue.
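FailedKey and ProcessedKey both take a time.Time and return a per-day counter key. The page does not show the key layout, so the following is only a sketch: the helper name dailyKey, the "asynq:{<qname>}:<kind>:<date>" format, and the choice to normalize to UTC are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// sample is a fixed instant used for illustration: late evening in UTC-5,
// which is already the following calendar day in UTC.
var sample = time.Date(2021, time.August, 6, 22, 30, 0, 0, time.FixedZone("UTC-5", -5*60*60))

// dailyKey is a hypothetical helper, not part of package base.
// Assumption: the key embeds the queue name, a counter kind, and the UTC date.
func dailyKey(qname, kind string, t time.Time) string {
	return fmt.Sprintf("asynq:{%s}:%s:%s", qname, kind, t.UTC().Format("2006-01-02"))
}

func main() {
	fmt.Println(dailyKey("default", "processed", sample))
	fmt.Println(dailyKey("default", "failed", sample))
}
```

Formatting the date in UTC means that servers running in different time zones would agree on which daily counter an event belongs to.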

func QueueKey

func QueueKey(qname string) string

QueueKey returns a redis key for the given queue name.

func RetryKey

func RetryKey(qname string) string

RetryKey returns a redis key for the retry tasks.

func ScheduledKey

func ScheduledKey(qname string) string

ScheduledKey returns a redis key for the scheduled tasks.

func SchedulerEntriesKey

func SchedulerEntriesKey(schedulerID string) string

SchedulerEntriesKey returns a redis key for the scheduler entries given a scheduler ID.

func SchedulerHistoryKey

func SchedulerHistoryKey(entryID string) string

SchedulerHistoryKey returns a redis key for the scheduler's history for the given entry.

func ServerInfoKey

func ServerInfoKey(hostname string, pid int, serverID string) string

ServerInfoKey returns a redis key for process info.

func UniqueKey

func UniqueKey(qname, tasktype string, payload map[string]interface{}) string

UniqueKey returns a redis key with the given type, payload, and queue name.

func ValidateQueueName

func ValidateQueueName(qname string) error

ValidateQueueName checks whether qname can be used as a queue name. It returns nil if the name is valid and a non-nil error otherwise.
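The validation rules themselves are not listed on this page. As a minimal sketch of the nil-or-error contract, the stand-in below assumes a single rule (the name must contain at least one non-whitespace character); both the function validateQueueName and that rule are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// validateQueueName is a hypothetical stand-in for base.ValidateQueueName.
// Assumption: a name is rejected only when it is empty or all whitespace.
func validateQueueName(qname string) error {
	if strings.TrimSpace(qname) == "" {
		return errors.New("queue name must contain one or more characters")
	}
	return nil
}

func main() {
	for _, q := range []string{"default", "critical", "   ", ""} {
		if err := validateQueueName(q); err != nil {
			fmt.Printf("%q: invalid (%v)\n", q, err)
			continue
		}
		fmt.Printf("%q: ok\n", q)
	}
}
```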

func WorkersKey

func WorkersKey(hostname string, pid int, serverID string) string

WorkersKey returns a redis key for the workers given hostname, pid, and server ID.

Types

type Broker

type Broker interface {
	Ping() error
	Enqueue(msg *TaskMessage) error
	EnqueueUnique(msg *TaskMessage, ttl time.Duration) error
	Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
	Done(msg *TaskMessage) error
	Requeue(msg *TaskMessage) error
	Schedule(msg *TaskMessage, processAt time.Time) error
	ScheduleUnique(msg *TaskMessage, processAt time.Time, ttl time.Duration) error
	Retry(msg *TaskMessage, processAt time.Time, errMsg string) error
	Archive(msg *TaskMessage, errMsg string) error
	CheckAndEnqueue(qnames ...string) error
	ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*TaskMessage, error)
	WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
	ClearServerState(host string, pid int, serverID string) error
	CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
	PublishCancelation(id string) error
	Close() error
}

Broker is a message broker that supports operations to manage task queues.

See rdb.RDB as a reference implementation.

type Cancelations

type Cancelations struct {
	// contains filtered or unexported fields
}

Cancelations is a collection that holds cancel functions for all active tasks.

Cancelations are safe for concurrent use by multiple goroutines.

func NewCancelations

func NewCancelations() *Cancelations

NewCancelations returns a Cancelations instance.

func (*Cancelations) Add

func (c *Cancelations) Add(id string, fn context.CancelFunc)

Add adds a new cancel func to the collection.

func (*Cancelations) Delete

func (c *Cancelations) Delete(id string)

Delete deletes a cancel func from the collection given an id.

func (*Cancelations) Get

func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool)

Get returns a cancel func given an id.
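The unexported fields of Cancelations are not shown here. One common way to build a collection with this Add/Delete/Get surface is a map guarded by a mutex; the sketch below takes that approach, and the type cancelations and its fields are assumptions rather than the package's actual layout.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// cancelations is a hypothetical re-creation of the documented API:
// a mutex-guarded map from task ID to cancel function.
type cancelations struct {
	mu      sync.Mutex
	cancels map[string]context.CancelFunc
}

func newCancelations() *cancelations {
	return &cancelations{cancels: make(map[string]context.CancelFunc)}
}

// Add registers fn under id, replacing any previous entry.
func (c *cancelations) Add(id string, fn context.CancelFunc) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.cancels[id] = fn
}

// Delete removes the entry for id, if any.
func (c *cancelations) Delete(id string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.cancels, id)
}

// Get looks up the cancel function registered under id.
func (c *cancelations) Get(id string) (context.CancelFunc, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	fn, ok := c.cancels[id]
	return fn, ok
}

func main() {
	c := newCancelations()
	ctx, cancel := context.WithCancel(context.Background())
	c.Add("task-1", cancel)

	// A cancelation request for "task-1" arrives: look up and invoke.
	if fn, ok := c.Get("task-1"); ok {
		fn()
	}
	<-ctx.Done()
	fmt.Println(ctx.Err())

	c.Delete("task-1")
	_, ok := c.Get("task-1")
	fmt.Println(ok)
}
```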

type SchedulerEnqueueEvent

type SchedulerEnqueueEvent struct {
	// ID of the task that was enqueued.
	TaskID string

	// Time the task was enqueued.
	EnqueuedAt time.Time
}

SchedulerEnqueueEvent holds information about an enqueue event by a scheduler.

type SchedulerEntry

type SchedulerEntry struct {
	// Identifier of this entry.
	ID string

	// Spec describes the schedule of this entry.
	Spec string

	// Type is the task type of the periodic task.
	Type string

	// Payload is the payload of the periodic task.
	Payload map[string]interface{}

	// Opts is the options for the periodic task.
	Opts []string

	// Next shows the next time the task will be enqueued.
	Next time.Time

	// Prev shows the last time the task was enqueued.
	// Zero time if task was never enqueued.
	Prev time.Time
}

SchedulerEntry holds information about a periodic task registered with a scheduler.

type ServerInfo

type ServerInfo struct {
	Host              string
	PID               int
	ServerID          string
	Concurrency       int
	Queues            map[string]int
	StrictPriority    bool
	Status            string
	Started           time.Time
	ActiveWorkerCount int
}

ServerInfo holds information about a running server.

type ServerStatus

type ServerStatus struct {
	// contains filtered or unexported fields
}

ServerStatus represents the status of a server. ServerStatus methods are safe for concurrent use.

func NewServerStatus

func NewServerStatus(v ServerStatusValue) *ServerStatus

NewServerStatus returns a new status instance given an initial value.

func (*ServerStatus) Get

func (s *ServerStatus) Get() ServerStatusValue

Get returns the status value.

func (*ServerStatus) Set

func (s *ServerStatus) Set(v ServerStatusValue)

Set sets the status value.

func (*ServerStatus) String

func (s *ServerStatus) String() string

type ServerStatusValue

type ServerStatusValue int

const (
	// StatusIdle indicates the server is in idle state.
	StatusIdle ServerStatusValue = iota

	// StatusRunning indicates the server is up and active.
	StatusRunning

	// StatusQuiet indicates the server is up but not active.
	StatusQuiet

	// StatusStopped indicates the server has been stopped.
	StatusStopped
)
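To illustrate how a concurrency-safe status holder over these four values could look, here is a minimal sketch. The lowercase types, the mutex-based locking, and the strings returned by String ("idle", "running", "quiet", "stopped") are assumptions; the page documents none of them.

```go
package main

import (
	"fmt"
	"sync"
)

// statusValue mirrors ServerStatusValue for this sketch.
type statusValue int

const (
	statusIdle statusValue = iota
	statusRunning
	statusQuiet
	statusStopped
)

// statusNames holds assumed display names, indexed by statusValue.
var statusNames = [...]string{"idle", "running", "quiet", "stopped"}

// serverStatus is a hypothetical mutex-guarded holder for a statusValue.
type serverStatus struct {
	mu  sync.Mutex
	val statusValue
}

func newServerStatus(v statusValue) *serverStatus {
	return &serverStatus{val: v}
}

func (s *serverStatus) Get() statusValue {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.val
}

func (s *serverStatus) Set(v statusValue) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.val = v
}

// String returns the assumed display name, or a fallback for unknown values.
func (s *serverStatus) String() string {
	v := s.Get()
	if v >= 0 && int(v) < len(statusNames) {
		return statusNames[v]
	}
	return "unknown status"
}

func main() {
	st := newServerStatus(statusIdle)
	fmt.Println(st)
	st.Set(statusRunning)
	fmt.Println(st)
}
```

Because String is defined on the pointer receiver, passing the pointer to fmt.Println prints the name rather than the struct contents.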

type TaskMessage

type TaskMessage struct {
	// Type indicates the kind of the task to be performed.
	Type string

	// Payload holds data needed to process the task.
	Payload map[string]interface{}

	// ID is a unique identifier for each task.
	ID uuid.UUID

	// Queue is a name this message should be enqueued to.
	Queue string

	// Retry is the maximum number of retries for this task.
	Retry int

	// Retried is the number of times we've retried this task so far.
	Retried int

	// ErrorMsg holds the error message from the last failure.
	ErrorMsg string

	// Timeout specifies timeout in seconds.
	// If task processing doesn't complete within the timeout, the task will be retried
	// if retry count is remaining. Otherwise it will be moved to the archive.
	//
	// Use zero to indicate no timeout.
	Timeout int64

	// Deadline specifies the deadline for the task in Unix time,
	// the number of seconds elapsed since January 1, 1970 UTC.
	// If task processing doesn't complete before the deadline, the task will be retried
	// if retry count is remaining. Otherwise it will be moved to the archive.
	//
	// Use zero to indicate no deadline.
	Deadline int64

	// UniqueKey holds the redis key used for uniqueness lock for this task.
	//
	// Empty string indicates that no uniqueness lock was used.
	UniqueKey string
}

TaskMessage is the internal representation of a task with additional metadata fields. Serialized data of this type gets written to redis.
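The Timeout and Deadline fields both bound how long a task may run, and zero means "not set" for each. The page does not say how the two are combined when both are present; the sketch below assumes the earlier of the two wins. The function effectiveDeadline and the variable sampleNow are hypothetical names used only for this illustration.

```go
package main

import (
	"fmt"
	"time"
)

// sampleNow is a fixed "current time" so the example is deterministic.
var sampleNow = time.Unix(1628208000, 0)

// effectiveDeadline combines a timeout in seconds and a Unix-time deadline.
// Zero means "not set" for either input. The bool result is false when
// neither limit is set. Assumption: when both are set, the earlier one applies.
func effectiveDeadline(now time.Time, timeoutSec, deadlineUnix int64) (time.Time, bool) {
	switch {
	case timeoutSec == 0 && deadlineUnix == 0:
		return time.Time{}, false
	case timeoutSec == 0:
		return time.Unix(deadlineUnix, 0), true
	case deadlineUnix == 0:
		return now.Add(time.Duration(timeoutSec) * time.Second), true
	}
	byTimeout := now.Add(time.Duration(timeoutSec) * time.Second)
	byDeadline := time.Unix(deadlineUnix, 0)
	if byDeadline.Before(byTimeout) {
		return byDeadline, true
	}
	return byTimeout, true
}

func main() {
	// Timeout of 30s, deadline 10s from now: the deadline is earlier.
	d, ok := effectiveDeadline(sampleNow, 30, sampleNow.Unix()+10)
	fmt.Println(d.Unix()-sampleNow.Unix(), ok)

	// Neither limit set.
	_, ok = effectiveDeadline(sampleNow, 0, 0)
	fmt.Println(ok)
}
```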

func DecodeMessage

func DecodeMessage(s string) (*TaskMessage, error)

DecodeMessage unmarshals the given encoded string and returns a decoded task message.
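EncodeMessage and DecodeMessage form a JSON round trip. Since package base is internal to asynq and cannot be imported from outside the module, the sketch below re-creates the idea with the standard library only. The type taskMessage is a trimmed, hypothetical copy of TaskMessage (its ID is a plain string rather than uuid.UUID), and the use of json.Marshal and json.Unmarshal is an assumption about one reasonable implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// taskMessage is a simplified, hypothetical stand-in for base.TaskMessage.
type taskMessage struct {
	Type    string
	Payload map[string]interface{}
	ID      string // the real field is a uuid.UUID
	Queue   string
	Retry   int
	Timeout int64
}

// encodeMessage marshals msg to JSON and returns it as a string.
func encodeMessage(msg *taskMessage) (string, error) {
	b, err := json.Marshal(msg)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// decodeMessage parses a string produced by encodeMessage.
func decodeMessage(s string) (*taskMessage, error) {
	var msg taskMessage
	if err := json.Unmarshal([]byte(s), &msg); err != nil {
		return nil, err
	}
	return &msg, nil
}

func main() {
	in := &taskMessage{
		Type:    "email:send",
		Payload: map[string]interface{}{"user_id": 42},
		ID:      "task-1",
		Queue:   "default",
		Retry:   3,
		Timeout: 30,
	}
	s, err := encodeMessage(in)
	if err != nil {
		panic(err)
	}
	out, err := decodeMessage(s)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.Type, out.Queue, out.Retry)
	fmt.Printf("%T\n", out.Payload["user_id"])
}
```

With plain json.Unmarshal, numbers inside a map[string]interface{} payload decode as float64, so an int stored in the payload does not come back as an int after the round trip.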

type WorkerInfo

type WorkerInfo struct {
	Host     string
	PID      int
	ServerID string
	ID       string
	Type     string
	Queue    string
	Payload  map[string]interface{}
	Started  time.Time
	Deadline time.Time
}

WorkerInfo holds information about a running worker.

type Z

type Z struct {
	Message *TaskMessage
	Score   int64
}

Z represents a sorted set member.
