Versions in this module Expand all Collapse all v0 v0.23.14 Sep 22, 2022 v0.23.13 Sep 22, 2022 v0.23.12 Sep 17, 2022 v0.23.11 Sep 17, 2022 v0.23.10 Sep 16, 2022 v0.23.3 Sep 16, 2022 v0.23.2 Sep 16, 2022 Changes in this version + const AllQueues + const AllSchedulers + const AllServers + const AllWorkers + const CancelChannel + const DefaultQueueName + const Version + var DefaultQueue = PendingKey(DefaultQueueName) + func ActiveKey(qname string) string + func AggregationSetKey(qname, gname, setID string) string + func AllAggregationSets(qname string) string + func AllGroups(qname string) string + func ArchivedKey(qname string) string + func CompletedKey(qname string) string + func EncodeMessage(msg *TaskMessage) ([]byte, error) + func EncodeSchedulerEnqueueEvent(event *SchedulerEnqueueEvent) ([]byte, error) + func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) + func EncodeServerInfo(info *ServerInfo) ([]byte, error) + func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) + func FailedKey(qname string, t time.Time) string + func FailedTotalKey(qname string) string + func GroupKey(qname, gkey string) string + func GroupKeyPrefix(qname string) string + func LeaseKey(qname string) string + func PausedKey(qname string) string + func PendingKey(qname string) string + func ProcessedKey(qname string, t time.Time) string + func ProcessedTotalKey(qname string) string + func QueueKeyPrefix(qname string) string + func RetryKey(qname string) string + func ScheduledKey(qname string) string + func SchedulerEntriesKey(schedulerID string) string + func SchedulerHistoryKey(entryID string) string + func ServerInfoKey(hostname string, pid int, serverID string) string + func TaskKey(qname, id string) string + func TaskKeyPrefix(qname string) string + func UniqueKey(qname, tasktype string, payload []byte) string + func ValidateQueueName(qname string) error + func WorkersKey(hostname string, pid int, serverID string) string + type Broker interface + AddToGroup func(ctx context.Context, msg *TaskMessage, gname string) error + AddToGroupUnique func(ctx context.Context, msg *TaskMessage, groupKey string, ttl time.Duration) error + AggregationCheck func(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, ...) (aggregationSetID string, err error) + Archive func(ctx context.Context, msg *TaskMessage, errMsg string) error + CancelationPubSub func() (*redis.PubSub, error) + ClearServerState func(host string, pid int, serverID string) error + Close func() error + DeleteAggregationSet func(ctx context.Context, qname, gname, aggregationSetID string) error + DeleteExpiredCompletedTasks func(qname string) error + Dequeue func(qnames ...string) (*TaskMessage, time.Time, error) + Done func(ctx context.Context, msg *TaskMessage) error + Enqueue func(ctx context.Context, msg *TaskMessage) error + EnqueueUnique func(ctx context.Context, msg *TaskMessage, ttl time.Duration) error + ExtendLease func(qname string, ids ...string) (time.Time, error) + ForwardIfReady func(qnames ...string) error + ListGroups func(qname string) ([]string, error) + ListLeaseExpired func(cutoff time.Time, qnames ...string) ([]*TaskMessage, error) + MarkAsComplete func(ctx context.Context, msg *TaskMessage) error + Ping func() error + PublishCancelation func(id string) error + ReadAggregationSet func(qname, gname, aggregationSetID string) ([]*TaskMessage, time.Time, error) + ReclaimStaleAggregationSets func(qname string) error + Requeue func(ctx context.Context, msg *TaskMessage) error + Retry func(ctx context.Context, msg *TaskMessage, processAt time.Time, errMsg string, ...) error + Schedule func(ctx context.Context, msg *TaskMessage, processAt time.Time) error + ScheduleUnique func(ctx context.Context, msg *TaskMessage, processAt time.Time, ttl time.Duration) error + WriteResult func(qname, id string, data []byte) (n int, err error) + WriteServerState func(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error + type Cancelations struct + func NewCancelations() *Cancelations + func (c *Cancelations) Add(id string, fn context.CancelFunc) + func (c *Cancelations) Delete(id string) + func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool) + type Lease struct + Clock timeutil.Clock + func NewLease(expirationTime time.Time) *Lease + func (l *Lease) Deadline() time.Time + func (l *Lease) Done() <-chan struct{} + func (l *Lease) IsValid() bool + func (l *Lease) NotifyExpiration() bool + func (l *Lease) Reset(expirationTime time.Time) bool + type SchedulerEnqueueEvent struct + EnqueuedAt time.Time + TaskID string + func DecodeSchedulerEnqueueEvent(b []byte) (*SchedulerEnqueueEvent, error) + type SchedulerEntry struct + ID string + Next time.Time + Opts []string + Payload []byte + Prev time.Time + Spec string + Type string + func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) + type ServerInfo struct + ActiveWorkerCount int + Concurrency int + Host string + PID int + Queues map[string]int + ServerID string + Started time.Time + Status string + StrictPriority bool + func DecodeServerInfo(b []byte) (*ServerInfo, error) + type TaskInfo struct + Message *TaskMessage + NextProcessAt time.Time + Result []byte + State TaskState + type TaskMessage struct + CompletedAt int64 + Deadline int64 + ErrorMsg string + GroupKey string + ID string + LastFailedAt int64 + Payload []byte + Queue string + Retention int64 + Retried int + Retry int + Timeout int64 + Type string + UniqueKey string + func DecodeMessage(data []byte) (*TaskMessage, error) + type TaskState int + const TaskStateActive + const TaskStateAggregating + const TaskStateArchived + const TaskStateCompleted + const TaskStatePending + const TaskStateRetry + const TaskStateScheduled + func TaskStateFromString(s string) (TaskState, error) + func (s TaskState) String() string + type WorkerInfo struct + Deadline time.Time + Host string + ID string + PID int + Payload []byte + Queue string + ServerID string + Started time.Time + Type string + func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) + type Z struct + Message *TaskMessage + Score int64