Documentation ¶
Index ¶
- type ErrQueueAlreadyExists
- type ErrQueueNotFound
- type EventRepository
- type JobRepository
- type QueueRepository
- type RedisEventRepository
- func (repo *RedisEventRepository) CheckStreamExists(queue string, jobSetId string) (bool, error)
- func (repo *RedisEventRepository) GetLastMessageId(queue, jobSetId string) (string, error)
- func (repo *RedisEventRepository) ReadEvents(queue string, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, *sequence.ExternalSeqNo, error)
- type RedisHealth
- type RedisJobRepository
- func (repo *RedisJobRepository) ExpirePulsarSchedulerJobDetails(jobIds []string) error
- func (repo *RedisJobRepository) GetPulsarSchedulerJobDetails(jobId string) (*schedulerobjects.PulsarSchedulerJobDetails, error)
- func (repo *RedisJobRepository) StorePulsarSchedulerJobDetails(jobDetails []*schedulerobjects.PulsarSchedulerJobDetails) error
- type RedisQueueRepository
- func (r *RedisQueueRepository) CreateQueue(queue queue.Queue) error
- func (r *RedisQueueRepository) DeleteQueue(name string) error
- func (r *RedisQueueRepository) GetAllQueues() ([]queue.Queue, error)
- func (r *RedisQueueRepository) GetQueue(name string) (queue.Queue, error)
- func (r *RedisQueueRepository) UpdateQueue(queue queue.Queue) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ErrQueueAlreadyExists ¶
type ErrQueueAlreadyExists struct {
QueueName string
}
func (*ErrQueueAlreadyExists) Error ¶
func (err *ErrQueueAlreadyExists) Error() string
type ErrQueueNotFound ¶
type ErrQueueNotFound struct {
QueueName string
}
func (*ErrQueueNotFound) Error ¶
func (err *ErrQueueNotFound) Error() string
type EventRepository ¶
type JobRepository ¶
type JobRepository interface {
	StorePulsarSchedulerJobDetails(jobDetails []*schedulerobjects.PulsarSchedulerJobDetails) error
	GetPulsarSchedulerJobDetails(jobIds string) (*schedulerobjects.PulsarSchedulerJobDetails, error)
	ExpirePulsarSchedulerJobDetails(jobId []string) error
}
type QueueRepository ¶
type RedisEventRepository ¶
type RedisEventRepository struct {
// contains filtered or unexported fields
}
func NewEventRepository ¶
func NewEventRepository(db redis.UniversalClient) *RedisEventRepository
func (*RedisEventRepository) CheckStreamExists ¶
func (repo *RedisEventRepository) CheckStreamExists(queue string, jobSetId string) (bool, error)
func (*RedisEventRepository) GetLastMessageId ¶
func (repo *RedisEventRepository) GetLastMessageId(queue, jobSetId string) (string, error)
func (*RedisEventRepository) ReadEvents ¶
func (repo *RedisEventRepository) ReadEvents(queue string, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, *sequence.ExternalSeqNo, error)
type RedisHealth ¶
type RedisHealth struct {
// contains filtered or unexported fields
}
func NewRedisHealth ¶
func NewRedisHealth(db redis.UniversalClient) *RedisHealth
func (*RedisHealth) Check ¶
func (r *RedisHealth) Check() error
type RedisJobRepository ¶
type RedisJobRepository struct {
// contains filtered or unexported fields
}
func NewRedisJobRepository ¶
func NewRedisJobRepository( db redis.UniversalClient, ) *RedisJobRepository
func (*RedisJobRepository) ExpirePulsarSchedulerJobDetails ¶ added in v0.4.5
func (repo *RedisJobRepository) ExpirePulsarSchedulerJobDetails(jobIds []string) error
func (*RedisJobRepository) GetPulsarSchedulerJobDetails ¶ added in v0.3.49
func (repo *RedisJobRepository) GetPulsarSchedulerJobDetails(jobId string) (*schedulerobjects.PulsarSchedulerJobDetails, error)
func (*RedisJobRepository) StorePulsarSchedulerJobDetails ¶ added in v0.3.49
func (repo *RedisJobRepository) StorePulsarSchedulerJobDetails(jobDetails []*schedulerobjects.PulsarSchedulerJobDetails) error
type RedisQueueRepository ¶
type RedisQueueRepository struct {
// contains filtered or unexported fields
}
func NewRedisQueueRepository ¶
func NewRedisQueueRepository(db redis.UniversalClient) *RedisQueueRepository
func (*RedisQueueRepository) CreateQueue ¶
func (r *RedisQueueRepository) CreateQueue(queue queue.Queue) error
func (*RedisQueueRepository) DeleteQueue ¶
func (r *RedisQueueRepository) DeleteQueue(name string) error
func (*RedisQueueRepository) GetAllQueues ¶
func (r *RedisQueueRepository) GetAllQueues() ([]queue.Queue, error)
func (*RedisQueueRepository) GetQueue ¶
func (r *RedisQueueRepository) GetQueue(name string) (queue.Queue, error)
func (*RedisQueueRepository) UpdateQueue ¶
func (r *RedisQueueRepository) UpdateQueue(queue queue.Queue) error
TODO: If the queue to be updated is deleted after this method checks that the queue exists but before it makes the update, the deleted queue is re-added to Redis. Redis has no built-in "update if exists" operation, so the check and the update need to be performed atomically with a script or a transaction (e.g. a Lua script, or WATCH with MULTI/EXEC).
Click to show internal directories.
Click to hide internal directories.