base

package
v0.11.2
Published: Nov 6, 2020 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package base defines foundational types and constants used in the asynq package.


Constants

const DefaultQueueName = "default"

DefaultQueueName is the queue name used if none is specified by the user.

const Version = "0.10.0"

Version of asynq library and CLI.

Variables

var (
	AllServers = "asynq:servers" // ZSET

	AllWorkers = "asynq:workers" // ZSET

	QueuePrefix     = "asynq:queues:"                // LIST   - asynq:queues:<qname>
	AllQueues       = "asynq:queues"                 // SET
	DefaultQueue    = QueuePrefix + DefaultQueueName // LIST
	ScheduledQueue  = "asynq:scheduled"              // ZSET
	RetryQueue      = "asynq:retry"                  // ZSET
	DeadQueue       = "asynq:dead"                   // ZSET
	InProgressQueue = "asynq:in_progress"            // LIST
	KeyDeadlines    = "asynq:deadlines"              // ZSET
	PausedQueues    = "asynq:paused"                 // SET
	CancelChannel   = "asynq:cancel"                 // PubSub channel

)

Redis keys

Functions

func EncodeMessage

func EncodeMessage(msg *TaskMessage) (string, error)

EncodeMessage marshals the given task message into JSON and returns the encoded string.

func FailureKey

func FailureKey(t time.Time) string

FailureKey returns a redis key for failure count for the given day.

func ProcessedKey

func ProcessedKey(t time.Time) string

ProcessedKey returns a redis key for processed count for the given day.

func QueueKey

func QueueKey(qname string) string

QueueKey returns a redis key for the given queue name.
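The key layout noted next to QueuePrefix (`asynq:queues:<qname>`) suggests that a queue key is the prefix followed by the queue name. A minimal sketch under that assumption follows; `queuePrefix` and `queueKey` are hypothetical local stand-ins, not the package's own code, and any base prefix set through SetBasePrefix is ignored here.

```go
package main

import "fmt"

// queuePrefix mirrors the documented QueuePrefix value. The real package may
// prepend a base prefix as well; this sketch does not model that.
const queuePrefix = "asynq:queues:"

// queueKey is a hypothetical stand-in for QueueKey: prefix + queue name.
func queueKey(qname string) string {
	return queuePrefix + qname
}

func main() {
	fmt.Println(queueKey("default"))  // asynq:queues:default
	fmt.Println(queueKey("critical")) // asynq:queues:critical
}
```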

func ServerInfoKey

func ServerInfoKey(hostname string, pid int, sid string) string

ServerInfoKey returns a redis key for process info.

func SetBasePrefix

func SetBasePrefix(prefix string)

SetBasePrefix sets the base prefix. It can be set only once, before the client or server is launched.

func WorkersKey

func WorkersKey(hostname string, pid int, sid string) string

WorkersKey returns a redis key for the workers given hostname, pid, and server ID.

Types

type Broker

type Broker interface {
	Ping() error
	Enqueue(msg *TaskMessage) error
	EnqueueUnique(msg *TaskMessage, ttl time.Duration) error
	Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
	Done(msg *TaskMessage) error
	Requeue(msg *TaskMessage) error
	Schedule(msg *TaskMessage, processAt time.Time) error
	ScheduleUnique(msg *TaskMessage, processAt time.Time, ttl time.Duration) error
	Retry(msg *TaskMessage, processAt time.Time, errMsg string) error
	Kill(msg *TaskMessage, errMsg string) error
	CheckAndEnqueue() error
	ListDeadlineExceeded(deadline time.Time) ([]*TaskMessage, error)
	WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
	ClearServerState(host string, pid int, serverID string) error
	CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
	PublishCancelation(id string) error
	Close() error
}

Broker is a message broker that supports operations to manage task queues.

See rdb.RDB as a reference implementation.
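As an illustration of how a caller might use the Retry and Kill methods, here is a rough sketch of failure handling against a narrowed interface. Everything in it is hypothetical: `taskMessage` is a trimmed mirror of TaskMessage, `failureHandler` is a two-method subset of Broker, `recordingBroker` is an in-memory fake, and the rule "retry while Retried < Retry, otherwise kill" is an assumption based on the TaskMessage field comments rather than the library's actual logic.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// taskMessage is a trimmed, hypothetical mirror of TaskMessage.
type taskMessage struct {
	Type    string
	Retry   int // max number of retries
	Retried int // retries used so far
}

// failureHandler is a hypothetical subset of Broker with only the two
// methods this sketch needs.
type failureHandler interface {
	Retry(msg *taskMessage, processAt time.Time, errMsg string) error
	Kill(msg *taskMessage, errMsg string) error
}

// handleFailure retries the task while retries remain and kills it otherwise.
// The fixed delay is an illustrative choice, not the library's backoff policy.
func handleFailure(b failureHandler, msg *taskMessage, cause error, delay time.Duration) error {
	if msg.Retried < msg.Retry {
		return b.Retry(msg, time.Now().Add(delay), cause.Error())
	}
	return b.Kill(msg, cause.Error())
}

// recordingBroker is an in-memory fake that records which method was called.
type recordingBroker struct{ calls []string }

func (r *recordingBroker) Retry(msg *taskMessage, processAt time.Time, errMsg string) error {
	r.calls = append(r.calls, "retry")
	return nil
}

func (r *recordingBroker) Kill(msg *taskMessage, errMsg string) error {
	r.calls = append(r.calls, "kill")
	return nil
}

// runDemo handles one failure with retries left and one with none left.
func runDemo() []string {
	b := &recordingBroker{}
	cause := errors.New("boom")
	_ = handleFailure(b, &taskMessage{Type: "email", Retry: 3, Retried: 1}, cause, time.Minute)
	_ = handleFailure(b, &taskMessage{Type: "email", Retry: 3, Retried: 3}, cause, time.Minute)
	return b.calls
}

func main() {
	fmt.Println(runDemo()) // [retry kill]
}
```

Depending on a small interface rather than the full Broker keeps the sketch free of the redis dependency that CancelationPubSub brings in.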

type Cancelations

type Cancelations struct {
	// contains filtered or unexported fields
}

Cancelations is a collection that holds cancel functions for all in-progress tasks.

Cancelations are safe for concurrent use by multiple goroutines.

func NewCancelations

func NewCancelations() *Cancelations

NewCancelations returns a Cancelations instance.

func (*Cancelations) Add

func (c *Cancelations) Add(id string, fn context.CancelFunc)

Add adds a new cancel func to the collection.

func (*Cancelations) Delete

func (c *Cancelations) Delete(id string)

Delete deletes a cancel func from the collection given an id.

func (*Cancelations) Get

func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool)

Get returns a cancel func given an id.
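The Add, Delete, and Get methods describe a concurrency-safe map from task ID to cancel function. One common way to build such a collection is a map guarded by a mutex, sketched below. The type `cancelations` and its fields are hypothetical; the real type's fields are unexported and may be organized differently.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// cancelations is a hypothetical re-creation of the documented type:
// a mutex-guarded map from task ID to cancel function.
type cancelations struct {
	mu    sync.Mutex
	funcs map[string]context.CancelFunc
}

func newCancelations() *cancelations {
	return &cancelations{funcs: make(map[string]context.CancelFunc)}
}

// Add stores the cancel func under the given id.
func (c *cancelations) Add(id string, fn context.CancelFunc) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.funcs[id] = fn
}

// Delete removes the cancel func for the given id, if any.
func (c *cancelations) Delete(id string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.funcs, id)
}

// Get looks up the cancel func for the given id.
func (c *cancelations) Get(id string) (context.CancelFunc, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	fn, ok := c.funcs[id]
	return fn, ok
}

func main() {
	c := newCancelations()
	ctx, cancel := context.WithCancel(context.Background())
	c.Add("task-1", cancel)

	// Cancel the in-progress task by looking up its cancel func.
	if fn, ok := c.Get("task-1"); ok {
		fn()
	}
	fmt.Println(ctx.Err()) // context canceled

	c.Delete("task-1")
	_, ok := c.Get("task-1")
	fmt.Println(ok) // false
}
```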

type ServerInfo

type ServerInfo struct {
	Host              string
	PID               int
	ServerID          string
	Concurrency       int
	Queues            map[string]int
	StrictPriority    bool
	Status            string
	Started           time.Time
	ActiveWorkerCount int
}

ServerInfo holds information about a running server.

type ServerStatus

type ServerStatus struct {
	// contains filtered or unexported fields
}

ServerStatus represents status of a server. ServerStatus methods are concurrency safe.

func NewServerStatus

func NewServerStatus(v ServerStatusValue) *ServerStatus

NewServerStatus returns a new status instance given an initial value.

func (*ServerStatus) Get

func (s *ServerStatus) Get() ServerStatusValue

Get returns the status value.

func (*ServerStatus) Set

func (s *ServerStatus) Set(v ServerStatusValue)

Set sets the status value.

func (*ServerStatus) String

func (s *ServerStatus) String() string

type ServerStatusValue

type ServerStatusValue int

const (
	// StatusIdle indicates the server is in idle state.
	StatusIdle ServerStatusValue = iota

	// StatusRunning indicates the server is up and processing tasks.
	StatusRunning

	// StatusQuiet indicates the server is up but not processing new tasks.
	StatusQuiet

	// StatusStopped indicates the server has been stopped.
	StatusStopped
)
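To show how a concurrency-safe status holder with Get, Set, and String could fit together with these constants, here is a small sketch. All names are hypothetical lower-case mirrors, the mutex-based design is one possible choice, and the string labels ("idle", "running", "quiet", "stopped") are assumptions, since the page does not list what String returns.

```go
package main

import (
	"fmt"
	"sync"
)

// statusValue mirrors ServerStatusValue.
type statusValue int

const (
	statusIdle statusValue = iota
	statusRunning
	statusQuiet
	statusStopped
)

// statusNames holds assumed labels; the real String output is not documented here.
var statusNames = []string{"idle", "running", "quiet", "stopped"}

// serverStatus is a hypothetical mutex-guarded holder for a status value.
type serverStatus struct {
	mu  sync.Mutex
	val statusValue
}

func newServerStatus(v statusValue) *serverStatus {
	return &serverStatus{val: v}
}

// Get returns the current status value.
func (s *serverStatus) Get() statusValue {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.val
}

// Set replaces the current status value.
func (s *serverStatus) Set(v statusValue) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.val = v
}

// String returns the label for the current value, or "unknown" if out of range.
func (s *serverStatus) String() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	if 0 <= s.val && int(s.val) < len(statusNames) {
		return statusNames[s.val]
	}
	return "unknown"
}

func main() {
	st := newServerStatus(statusIdle)
	st.Set(statusRunning)
	fmt.Println(st.Get() == statusRunning, st) // true running
}
```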

type TaskMessage

type TaskMessage struct {
	// Type indicates the kind of the task to be performed.
	Type string

	// Payload holds data needed to process the task.
	Payload map[string]interface{}

	// ID is a unique identifier for each task.
	ID uuid.UUID

	// Queue is a name this message should be enqueued to.
	Queue string

	// Retry is the max number of retries for this task.
	Retry int

	// Retried is the number of times we've retried this task so far.
	Retried int

	// ErrorMsg holds the error message from the last failure.
	ErrorMsg string

	// Timeout specifies timeout in seconds.
	// If task processing doesn't complete within the timeout, the task will be retried
	// if retry count is remaining. Otherwise it will be moved to the dead queue.
	//
	// Use zero to indicate no timeout.
	Timeout int64

	// Deadline specifies the deadline for the task in Unix time,
	// the number of seconds elapsed since January 1, 1970 UTC.
	// If task processing doesn't complete before the deadline, the task will be retried
	// if retry count is remaining. Otherwise it will be moved to the dead queue.
	//
	// Use zero to indicate no deadline.
	Deadline int64

	// UniqueKey holds the redis key used for uniqueness lock for this task.
	//
	// Empty string indicates that no uniqueness lock was used.
	UniqueKey string
}

TaskMessage is the internal representation of a task with additional metadata fields. Serialized data of this type gets written to redis.
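The Timeout and Deadline fields both use zero to mean "unset", with Timeout in seconds and Deadline in Unix seconds. The sketch below shows one plausible way to combine them into a single cutoff by taking whichever bound comes first. The function `effectiveDeadline` is hypothetical, and the rule for combining the two fields is an assumption; the page does not say how the library resolves them.

```go
package main

import (
	"fmt"
	"time"
)

// effectiveDeadline returns the earliest applicable cutoff in Unix seconds.
// timeout is in seconds, deadline is Unix time, and zero means "not set".
// The second return value is false when neither bound is set.
func effectiveDeadline(nowUnix, timeout, deadline int64) (int64, bool) {
	switch {
	case timeout != 0 && deadline != 0:
		// Both set: use whichever comes first (assumed behavior).
		if t := nowUnix + timeout; t < deadline {
			return t, true
		}
		return deadline, true
	case timeout != 0:
		return nowUnix + timeout, true
	case deadline != 0:
		return deadline, true
	default:
		return 0, false
	}
}

func main() {
	now := time.Now().Unix()
	if d, ok := effectiveDeadline(now, 30, 0); ok {
		fmt.Println("must finish by", time.Unix(d, 0).UTC())
	}
	_, ok := effectiveDeadline(now, 0, 0)
	fmt.Println("bounded:", ok) // bounded: false
}
```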

func DecodeMessage

func DecodeMessage(s string) (*TaskMessage, error)

DecodeMessage unmarshals the given encoded string and returns a decoded task message.
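EncodeMessage and DecodeMessage are described as a JSON round trip through a string. A minimal stdlib-only sketch of that pattern follows. The names `taskMessage`, `encodeMessage`, and `decodeMessage` are hypothetical stand-ins, the struct carries only a subset of the documented fields, and the ID is a plain string here (the real field is a uuid.UUID) to avoid an external dependency.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// taskMessage is a trimmed, hypothetical mirror of TaskMessage.
type taskMessage struct {
	Type    string
	Payload map[string]interface{}
	ID      string // assumption: the real field is a uuid.UUID
	Queue   string
	Retry   int
	Timeout int64
}

// encodeMessage marshals the message into JSON and returns it as a string.
func encodeMessage(msg *taskMessage) (string, error) {
	b, err := json.Marshal(msg)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// decodeMessage unmarshals a JSON string back into a message.
func decodeMessage(s string) (*taskMessage, error) {
	var msg taskMessage
	if err := json.Unmarshal([]byte(s), &msg); err != nil {
		return nil, err
	}
	return &msg, nil
}

func main() {
	in := &taskMessage{
		Type:    "email:send",
		Payload: map[string]interface{}{"user_id": 42},
		ID:      "task-1",
		Queue:   "default",
		Retry:   25,
	}
	s, err := encodeMessage(in)
	if err != nil {
		panic(err)
	}
	out, err := decodeMessage(s)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.Type, out.Queue, out.Payload["user_id"]) // email:send default 42
}
```

With plain `json.Unmarshal`, numbers inside a `map[string]interface{}` payload come back as `float64`, so the `user_id` above is a `float64` after the round trip even though it went in as an `int`.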

type WorkerInfo

type WorkerInfo struct {
	Host    string
	PID     int
	ID      string
	Type    string
	Queue   string
	Payload map[string]interface{}
	Started time.Time
}

WorkerInfo holds information about a running worker.

type Z added in v0.11.1

type Z struct {
	Message *TaskMessage
	Score   int64
}

Z represents a sorted set member.
