Documentation ¶
Overview ¶
Package base defines foundational types and constants used in asynq package.
Index ¶
- Constants
- Variables
- func ActiveKey(qname string) string
- func ArchivedKey(qname string) string
- func DeadlinesKey(qname string) string
- func EncodeMessage(msg *TaskMessage) (string, error)
- func FailedKey(qname string, t time.Time) string
- func PausedKey(qname string) string
- func ProcessedKey(qname string, t time.Time) string
- func QueueKey(qname string) string
- func RetryKey(qname string) string
- func ScheduledKey(qname string) string
- func SchedulerEntriesKey(schedulerID string) string
- func SchedulerHistoryKey(entryID string) string
- func ServerInfoKey(hostname string, pid int, serverID string) string
- func UniqueKey(qname, tasktype string, payload map[string]interface{}) string
- func ValidateQueueName(qname string) error
- func WorkersKey(hostname string, pid int, serverID string) string
- type Broker
- type Cancelations
- type SchedulerEnqueueEvent
- type SchedulerEntry
- type ServerInfo
- type ServerStatus
- type ServerStatusValue
- type TaskMessage
- type WorkerInfo
- type Z
Constants ¶
const ( AllServers = "asynq:servers" // ZSET AllWorkers = "asynq:workers" // ZSET AllSchedulers = "asynq:schedulers" // ZSET AllQueues = "asynq:queues" // SET CancelChannel = "asynq:cancel" // PubSub channel )
Global Redis keys.
const DefaultQueueName = "default"
DefaultQueueName is the queue name used if none are specified by user.
const Version = "0.17.7"
Version of asynq library and CLI.
Variables ¶
var DefaultQueue = QueueKey(DefaultQueueName)
DefaultQueue is the redis key for the default queue.
Functions ¶
func ArchivedKey ¶
ArchivedKey returns a redis key for the archived tasks.
func DeadlinesKey ¶
DeadlinesKey returns a redis key for the deadlines.
func EncodeMessage ¶
func EncodeMessage(msg *TaskMessage) (string, error)
EncodeMessage marshals the given task message in JSON and returns an encoded string.
func ProcessedKey ¶
ProcessedKey returns a redis key for processed count for the given day for the queue.
func ScheduledKey ¶
ScheduledKey returns a redis key for the scheduled tasks.
func SchedulerEntriesKey ¶
SchedulerEntriesKey returns a redis key for the scheduler entries given scheduler ID.
func SchedulerHistoryKey ¶
SchedulerHistoryKey returns a redis key for the scheduler's history for the given entry.
func ServerInfoKey ¶
ServerInfoKey returns a redis key for process info.
func ValidateQueueName ¶
ValidateQueueName validates a given qname to be used as a queue name. Returns nil if valid, otherwise returns non-nil error.
Types ¶
type Broker ¶
type Broker interface { Ping() error Enqueue(msg *TaskMessage) error EnqueueUnique(msg *TaskMessage, ttl time.Duration) error Dequeue(qnames ...string) (*TaskMessage, time.Time, error) Done(msg *TaskMessage) error Requeue(msg *TaskMessage) error Schedule(msg *TaskMessage, processAt time.Time) error ScheduleUnique(msg *TaskMessage, processAt time.Time, ttl time.Duration) error Retry(msg *TaskMessage, processAt time.Time, errMsg string) error Archive(msg *TaskMessage, errMsg string) error CheckAndEnqueue(qnames ...string) error ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*TaskMessage, error) WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error ClearServerState(host string, pid int, serverID string) error CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers PublishCancelation(id string) error Close() error }
Broker is a message broker that supports operations to manage task queues.
See rdb.RDB as a reference implementation.
type Cancelations ¶
type Cancelations struct {
// contains filtered or unexported fields
}
Cancelations is a collection that holds cancel functions for all active tasks.
Cancelations are safe for concurrent use by multipel goroutines.
func NewCancelations ¶
func NewCancelations() *Cancelations
NewCancelations returns a Cancelations instance.
func (*Cancelations) Add ¶
func (c *Cancelations) Add(id string, fn context.CancelFunc)
Add adds a new cancel func to the collection.
func (*Cancelations) Delete ¶
func (c *Cancelations) Delete(id string)
Delete deletes a cancel func from the collection given an id.
func (*Cancelations) Get ¶
func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool)
Get returns a cancel func given an id.
type SchedulerEnqueueEvent ¶
type SchedulerEnqueueEvent struct { // ID of the task that was enqueued. TaskID string // Time the task was enqueued. EnqueuedAt time.Time }
SchedulerEnqueueEvent holds information about an enqueue event by a scheduler.
type SchedulerEntry ¶
type SchedulerEntry struct { // Identifier of this entry. ID string // Spec describes the schedule of this entry. Spec string // Type is the task type of the periodic task. Type string // Payload is the payload of the periodic task. Payload map[string]interface{} // Opts is the options for the periodic task. Opts []string // Next shows the next time the task will be enqueued. Next time.Time // Prev shows the last time the task was enqueued. // Zero time if task was never enqueued. Prev time.Time }
SchedulerEntry holds information about a periodic task registered with a scheduler.
type ServerInfo ¶
type ServerInfo struct { Host string PID int ServerID string Concurrency int Queues map[string]int StrictPriority bool Status string Started time.Time ActiveWorkerCount int }
ServerInfo holds information about a running server.
type ServerStatus ¶
type ServerStatus struct {
// contains filtered or unexported fields
}
ServerStatus represents status of a server. ServerStatus methods are concurrency safe.
func NewServerStatus ¶
func NewServerStatus(v ServerStatusValue) *ServerStatus
NewServerStatus returns a new status instance given an initial value.
func (*ServerStatus) Get ¶
func (s *ServerStatus) Get() ServerStatusValue
Get returns the status value.
func (*ServerStatus) Set ¶
func (s *ServerStatus) Set(v ServerStatusValue)
Set sets the status value.
func (*ServerStatus) String ¶
func (s *ServerStatus) String() string
type ServerStatusValue ¶
type ServerStatusValue int
const ( // StatusIdle indicates the server is in idle state. StatusIdle ServerStatusValue = iota // StatusRunning indicates the server is up and active. StatusRunning // StatusQuiet indicates the server is up but not active. StatusQuiet // StatusStopped indicates the server server has been stopped. StatusStopped )
type TaskMessage ¶
type TaskMessage struct { // Type indicates the kind of the task to be performed. Type string // Payload holds data needed to process the task. Payload map[string]interface{} // ID is a unique identifier for each task. ID uuid.UUID // Queue is a name this message should be enqueued to. Queue string // Retry is the max number of retry for this task. Retry int // Retried is the number of times we've retried this task so far. Retried int // ErrorMsg holds the error message from the last failure. ErrorMsg string // Timeout specifies timeout in seconds. // If task processing doesn't complete within the timeout, the task will be retried // if retry count is remaining. Otherwise it will be moved to the archive. // // Use zero to indicate no timeout. Timeout int64 // Deadline specifies the deadline for the task in Unix time, // the number of seconds elapsed since January 1, 1970 UTC. // If task processing doesn't complete before the deadline, the task will be retried // if retry count is remaining. Otherwise it will be moved to the archive. // // Use zero to indicate no deadline. Deadline int64 // UniqueKey holds the redis key used for uniqueness lock for this task. // // Empty string indicates that no uniqueness lock was used. UniqueKey string }
TaskMessage is the internal representation of a task with additional metadata fields. Serialized data of this type gets written to redis.
func DecodeMessage ¶
func DecodeMessage(s string) (*TaskMessage, error)
DecodeMessage unmarshals the given encoded string and returns a decoded task message.