Versions in this module Expand all Collapse all v0 v0.0.3 Apr 15, 2024 v0.0.2 Aug 29, 2023 v0.0.1 Apr 21, 2023 Changes in this version + const LeaseDuration + type DailyStats struct + Failed int + Processed int + Queue string + Time time.Time + type GroupStat struct + Group string + Size int + type Pagination struct + Page int + Size int + type RDB struct + func NewRDB(client redis.UniversalClient) *RDB + func (r *RDB) AddToGroup(ctx context.Context, msg *base.TaskMessage, groupKey string) error + func (r *RDB) AddToGroupUnique(ctx context.Context, msg *base.TaskMessage, groupKey string, ttl time.Duration) error + func (r *RDB) AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, ...) (string, error) + func (r *RDB) AllQueues() ([]string, error) + func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) error + func (r *RDB) ArchiveAllAggregatingTasks(qname, gname string) (int64, error) + func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) + func (r *RDB) ArchiveAllRetryTasks(qname string) (int64, error) + func (r *RDB) ArchiveAllScheduledTasks(qname string) (int64, error) + func (r *RDB) ArchiveTask(qname, id string) error + func (r *RDB) CancelationPubSub() (*redis.PubSub, error) + func (r *RDB) ClearSchedulerEntries(scheduelrID string) error + func (r *RDB) ClearSchedulerHistory(entryID string) error + func (r *RDB) ClearServerState(host string, pid int, serverID string) error + func (r *RDB) Client() redis.UniversalClient + func (r *RDB) Close() error + func (r *RDB) ClusterKeySlot(qname string) (int64, error) + func (r *RDB) ClusterNodes(qname string) ([]redis.ClusterNode, error) + func (r *RDB) CurrentStats(qname string) (*Stats, error) + func (r *RDB) DeleteAggregationSet(ctx context.Context, qname, gname, setID string) error + func (r *RDB) DeleteAllAggregatingTasks(qname, gname string) (int64, error) + func (r *RDB) DeleteAllArchivedTasks(qname string) (int64, error) + func (r *RDB) DeleteAllCompletedTasks(qname string) (int64, error) + func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error) + func (r *RDB) DeleteAllRetryTasks(qname string) (int64, error) + func (r *RDB) DeleteAllScheduledTasks(qname string) (int64, error) + func (r *RDB) DeleteExpiredCompletedTasks(qname string) error + func (r *RDB) DeleteTask(qname, id string) error + func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, leaseExpirationTime time.Time, err error) + func (r *RDB) Done(ctx context.Context, msg *base.TaskMessage) error + func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error + func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time.Duration) error + func (r *RDB) ExtendLease(qname string, ids ...string) (expirationTime time.Time, err error) + func (r *RDB) ForwardIfReady(qnames ...string) error + func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error) + func (r *RDB) GroupStats(qname string) ([]*GroupStat, error) + func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error) + func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskInfo, error) + func (r *RDB) ListAggregating(qname, gname string, pgn Pagination) ([]*base.TaskInfo, error) + func (r *RDB) ListArchived(qname string, pgn Pagination) ([]*base.TaskInfo, error) + func (r *RDB) ListCompleted(qname string, pgn Pagination) ([]*base.TaskInfo, error) + func (r *RDB) ListGroups(qname string) ([]string, error) + func (r *RDB) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) + func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskInfo, error) + func (r *RDB) ListRetry(qname string, pgn Pagination) ([]*base.TaskInfo, error) + func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]*base.TaskInfo, error) + func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*base.SchedulerEnqueueEvent, error) + func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) + func (r *RDB) ListServers() ([]*base.ServerInfo, error) + func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) + func (r *RDB) MarkAsComplete(ctx context.Context, msg *base.TaskMessage) error + func (r *RDB) Pause(qname string) error + func (r *RDB) Ping() error + func (r *RDB) PublishCancelation(id string) error + func (r *RDB) ReadAggregationSet(qname, gname, setID string) ([]*base.TaskMessage, time.Time, error) + func (r *RDB) ReclaimStaleAggregationSets(qname string) error + func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error + func (r *RDB) RedisClusterInfo() (map[string]string, error) + func (r *RDB) RedisInfo() (map[string]string, error) + func (r *RDB) RemoveQueue(qname string, force bool) error + func (r *RDB) Requeue(ctx context.Context, msg *base.TaskMessage) error + func (r *RDB) Retry(ctx context.Context, msg *base.TaskMessage, processAt time.Time, errMsg string, ...) error + func (r *RDB) RunAllAggregatingTasks(qname, gname string) (int64, error) + func (r *RDB) RunAllArchivedTasks(qname string) (int64, error) + func (r *RDB) RunAllRetryTasks(qname string) (int64, error) + func (r *RDB) RunAllScheduledTasks(qname string) (int64, error) + func (r *RDB) RunTask(qname, id string) error + func (r *RDB) Schedule(ctx context.Context, msg *base.TaskMessage, processAt time.Time) error + func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, processAt time.Time, ...) error + func (r *RDB) SetClock(c timeutil.Clock) + func (r *RDB) Unpause(qname string) error + func (r *RDB) WriteResult(qname, taskID string, data []byte) (int, error) + func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.SchedulerEntry, ttl time.Duration) error + func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error + type Stats struct + Active int + Aggregating int + Archived int + Completed int + Failed int + FailedTotal int + Groups int + Latency time.Duration + MemoryUsage int64 + Paused bool + Pending int + Processed int + ProcessedTotal int + Queue string + Retry int + Scheduled int + Size int + Timestamp time.Time