Versions in this module Expand all Collapse all v1 v1.0.1 Apr 8, 2024 v1.0.0 Apr 8, 2024 Changes in this version + const AllQueues + const AllSchedulers + const AllServers + const AllWorkers + const CancelChannel + const DefaultQueueName + const Version + var DefaultQueue = PendingKey(DefaultQueueName) + func ActiveKey(qname string) string + func AggregationSetKey(qname, gname, setID string) string + func AllAggregationSets(qname string) string + func AllGroups(qname string) string + func ArchivedKey(qname string) string + func CompletedKey(qname string) string + func EncodeMessage(msg *TaskMessage) ([]byte, error) + func EncodeSchedulerEnqueueEvent(event *SchedulerEnqueueEvent) ([]byte, error) + func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) + func EncodeServerInfo(info *ServerInfo) ([]byte, error) + func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) + func FailedKey(qname string, t time.Time) string + func FailedTotalKey(qname string) string + func GroupKey(qname, gkey string) string + func GroupKeyPrefix(qname string) string + func LeaseKey(qname string) string + func PausedKey(qname string) string + func PendingKey(qname string) string + func ProcessedKey(qname string, t time.Time) string + func ProcessedTotalKey(qname string) string + func QueueKeyPrefix(qname string) string + func RetryKey(qname string) string + func ScheduledKey(qname string) string + func SchedulerEntriesKey(schedulerID string) string + func SchedulerHistoryKey(entryID string) string + func ServerInfoKey(hostname string, pid int, serverID string) string + func TaskKey(qname, id string) string + func TaskKeyPrefix(qname string) string + func UniqueKey(qname, tasktype string, payload []byte) string + func ValidateQueueName(qname string) error + func WorkersKey(hostname string, pid int, serverID string) string + type Broker interface + AddToGroup func(ctx context.Context, msg *TaskMessage, gname string) error + AddToGroupUnique func(ctx context.Context, msg *TaskMessage, groupKey string, ttl time.Duration) error + AggregationCheck func(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, ...) (aggregationSetID string, err error) + Archive func(ctx context.Context, msg *TaskMessage, errMsg string) error + CancelationPubSub func() (*redis.PubSub, error) + ClearServerState func(host string, pid int, serverID string) error + Close func() error + DeleteAggregationSet func(ctx context.Context, qname, gname, aggregationSetID string) error + DeleteExpiredCompletedTasks func(qname string) error + Dequeue func(qnames ...string) (*TaskMessage, time.Time, error) + Done func(ctx context.Context, msg *TaskMessage) error + Enqueue func(ctx context.Context, msg *TaskMessage) error + EnqueueUnique func(ctx context.Context, msg *TaskMessage, ttl time.Duration) error + ExtendLease func(qname string, ids ...string) (time.Time, error) + ForwardIfReady func(qnames ...string) error + ListGroups func(qname string) ([]string, error) + ListLeaseExpired func(cutoff time.Time, qnames ...string) ([]*TaskMessage, error) + MarkAsComplete func(ctx context.Context, msg *TaskMessage) error + Ping func() error + PublishCancelation func(id string) error + ReadAggregationSet func(qname, gname, aggregationSetID string) ([]*TaskMessage, time.Time, error) + ReclaimStaleAggregationSets func(qname string) error + Requeue func(ctx context.Context, msg *TaskMessage) error + Retry func(ctx context.Context, msg *TaskMessage, processAt time.Time, errMsg string, ...) error + Schedule func(ctx context.Context, msg *TaskMessage, processAt time.Time) error + ScheduleUnique func(ctx context.Context, msg *TaskMessage, processAt time.Time, ttl time.Duration) error + WriteResult func(qname, id string, data []byte) (n int, err error) + WriteServerState func(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error + type Cancelations struct + func NewCancelations() *Cancelations + func (c *Cancelations) Add(id string, fn context.CancelFunc) + func (c *Cancelations) Delete(id string) + func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool) + type Lease struct + Clock timeutil.Clock + func NewLease(expirationTime time.Time) *Lease + func (l *Lease) Deadline() time.Time + func (l *Lease) Done() <-chan struct{} + func (l *Lease) IsValid() bool + func (l *Lease) NotifyExpiration() bool + func (l *Lease) Reset(expirationTime time.Time) bool + type SchedulerEnqueueEvent struct + EnqueuedAt time.Time + TaskID string + func DecodeSchedulerEnqueueEvent(b []byte) (*SchedulerEnqueueEvent, error) + type SchedulerEntry struct + ID string + Next time.Time + Opts []string + Payload []byte + Prev time.Time + Spec string + Type string + func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) + type ServerInfo struct + ActiveWorkerCount int + Concurrency int + Host string + PID int + Queues map[string]int + ServerID string + Started time.Time + Status string + StrictPriority bool + func DecodeServerInfo(b []byte) (*ServerInfo, error) + type TaskInfo struct + Message *TaskMessage + NextProcessAt time.Time + Result []byte + State TaskState + type TaskMessage struct + CompletedAt int64 + Deadline int64 + ErrorMsg string + GroupKey string + ID string + LastFailedAt int64 + Payload []byte + Queue string + Retention int64 + Retried int + Retry int + Timeout int64 + Type string + UniqueKey string + func DecodeMessage(data []byte) (*TaskMessage, error) + type TaskState int + const TaskStateActive + const TaskStateAggregating + const TaskStateArchived + const TaskStateCompleted + const TaskStatePending + const TaskStateRetry + const TaskStateScheduled + func TaskStateFromString(s string) (TaskState, error) + func (s TaskState) String() string + type WorkerInfo struct + Deadline time.Time + Host string + ID string + PID int + Payload []byte + Queue string + ServerID string + Started time.Time + Type string + func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) + type Z struct + Message *TaskMessage + Score int64