Documentation ¶
Index ¶
- type CachedQueueRepository
- type DualQueueRepository
- func (r *DualQueueRepository) CreateQueue(ctx *armadacontext.Context, queue queue.Queue) error
- func (r *DualQueueRepository) DeleteQueue(ctx *armadacontext.Context, name string) error
- func (r *DualQueueRepository) GetAllQueues(ctx *armadacontext.Context) ([]queue.Queue, error)
- func (r *DualQueueRepository) GetQueue(ctx *armadacontext.Context, name string) (queue.Queue, error)
- func (r *DualQueueRepository) UpdateQueue(ctx *armadacontext.Context, queue queue.Queue) error
- type ErrQueueAlreadyExists
- type ErrQueueNotFound
- type EventRepository
- type JobRepository
- type PostgresQueueRepository
- func (r *PostgresQueueRepository) CreateQueue(ctx *armadacontext.Context, queue queue.Queue) error
- func (r *PostgresQueueRepository) DeleteQueue(ctx *armadacontext.Context, name string) error
- func (r *PostgresQueueRepository) GetAllQueues(ctx *armadacontext.Context) ([]queue.Queue, error)
- func (r *PostgresQueueRepository) GetQueue(ctx *armadacontext.Context, name string) (queue.Queue, error)
- func (r *PostgresQueueRepository) UpdateQueue(ctx *armadacontext.Context, queue queue.Queue) error
- type QueueRepository
- type ReadOnlyQueueRepository
- type RedisEventRepository
- func (repo *RedisEventRepository) CheckStreamExists(ctx *armadacontext.Context, queue string, jobSetId string) (bool, error)
- func (repo *RedisEventRepository) GetLastMessageId(ctx *armadacontext.Context, queue, jobSetId string) (string, error)
- func (repo *RedisEventRepository) ReadEvents(ctx *armadacontext.Context, queue string, jobSetId string, lastId string, ...) ([]*api.EventStreamMessage, *sequence.ExternalSeqNo, error)
- type RedisHealth
- type RedisJobRepository
- func (repo *RedisJobRepository) ExpirePulsarSchedulerJobDetails(ctx *armadacontext.Context, jobIds []string) error
- func (repo *RedisJobRepository) GetPulsarSchedulerJobDetails(ctx *armadacontext.Context, jobId string) (*schedulerobjects.PulsarSchedulerJobDetails, error)
- func (repo *RedisJobRepository) StorePulsarSchedulerJobDetails(ctx *armadacontext.Context, ...) error
- type RedisQueueRepository
- func (r *RedisQueueRepository) CreateQueue(ctx *armadacontext.Context, queue queue.Queue) error
- func (r *RedisQueueRepository) DeleteQueue(ctx *armadacontext.Context, name string) error
- func (r *RedisQueueRepository) GetAllQueues(ctx *armadacontext.Context) ([]queue.Queue, error)
- func (r *RedisQueueRepository) GetQueue(ctx *armadacontext.Context, name string) (queue.Queue, error)
- func (r *RedisQueueRepository) UpdateQueue(ctx *armadacontext.Context, queue queue.Queue) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CachedQueueRepository ¶ added in v0.4.48
type CachedQueueRepository struct {
// contains filtered or unexported fields
}
CachedQueueRepository is an implementation of ReadOnlyQueueRepository that fetches queues periodically and caches them. This means the queue information may be slightly out of date, but it allows us to continue API operations even if the underlying queue repository is unavailable.
func NewCachedQueueRepository ¶ added in v0.4.48
func NewCachedQueueRepository(underlyingRepo QueueRepository, updateFrequency time.Duration) *CachedQueueRepository
func (*CachedQueueRepository) GetAllQueues ¶ added in v0.4.48
func (c *CachedQueueRepository) GetAllQueues(_ *armadacontext.Context) ([]queue.Queue, error)
func (*CachedQueueRepository) GetQueue ¶ added in v0.4.48
func (c *CachedQueueRepository) GetQueue(_ *armadacontext.Context, name string) (queue.Queue, error)
func (*CachedQueueRepository) Run ¶ added in v0.4.48
func (c *CachedQueueRepository) Run(ctx *armadacontext.Context) error
type DualQueueRepository ¶ added in v0.4.48
type DualQueueRepository struct {
// contains filtered or unexported fields
}
func NewDualQueueRepository ¶ added in v0.4.48
func NewDualQueueRepository(redis redis.UniversalClient, postgres *pgxpool.Pool, usePostgresForPrimary bool) *DualQueueRepository
func (*DualQueueRepository) CreateQueue ¶ added in v0.4.48
func (r *DualQueueRepository) CreateQueue(ctx *armadacontext.Context, queue queue.Queue) error
func (*DualQueueRepository) DeleteQueue ¶ added in v0.4.48
func (r *DualQueueRepository) DeleteQueue(ctx *armadacontext.Context, name string) error
func (*DualQueueRepository) GetAllQueues ¶ added in v0.4.48
func (r *DualQueueRepository) GetAllQueues(ctx *armadacontext.Context) ([]queue.Queue, error)
func (*DualQueueRepository) GetQueue ¶ added in v0.4.48
func (r *DualQueueRepository) GetQueue(ctx *armadacontext.Context, name string) (queue.Queue, error)
func (*DualQueueRepository) UpdateQueue ¶ added in v0.4.48
func (r *DualQueueRepository) UpdateQueue(ctx *armadacontext.Context, queue queue.Queue) error
type ErrQueueAlreadyExists ¶
type ErrQueueAlreadyExists struct {
QueueName string
}
func (*ErrQueueAlreadyExists) Error ¶
func (err *ErrQueueAlreadyExists) Error() string
type ErrQueueNotFound ¶
type ErrQueueNotFound struct {
QueueName string
}
func (*ErrQueueNotFound) Error ¶
func (err *ErrQueueNotFound) Error() string
type EventRepository ¶
type EventRepository interface { CheckStreamExists(ctx *armadacontext.Context, queue string, jobSetId string) (bool, error) ReadEvents(ctx *armadacontext.Context, queue, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, *sequence.ExternalSeqNo, error) GetLastMessageId(ctx *armadacontext.Context, queue, jobSetId string) (string, error) }
type JobRepository ¶
type JobRepository interface { StorePulsarSchedulerJobDetails(ctx *armadacontext.Context, jobDetails []*schedulerobjects.PulsarSchedulerJobDetails) error GetPulsarSchedulerJobDetails(ctx *armadacontext.Context, jobIds string) (*schedulerobjects.PulsarSchedulerJobDetails, error) ExpirePulsarSchedulerJobDetails(ctx *armadacontext.Context, jobId []string) error }
type PostgresQueueRepository ¶ added in v0.4.48
type PostgresQueueRepository struct {
// contains filtered or unexported fields
}
func NewPostgresQueueRepository ¶ added in v0.4.48
func NewPostgresQueueRepository(db *pgxpool.Pool) *PostgresQueueRepository
func (*PostgresQueueRepository) CreateQueue ¶ added in v0.4.48
func (r *PostgresQueueRepository) CreateQueue(ctx *armadacontext.Context, queue queue.Queue) error
func (*PostgresQueueRepository) DeleteQueue ¶ added in v0.4.48
func (r *PostgresQueueRepository) DeleteQueue(ctx *armadacontext.Context, name string) error
func (*PostgresQueueRepository) GetAllQueues ¶ added in v0.4.48
func (r *PostgresQueueRepository) GetAllQueues(ctx *armadacontext.Context) ([]queue.Queue, error)
func (*PostgresQueueRepository) GetQueue ¶ added in v0.4.48
func (r *PostgresQueueRepository) GetQueue(ctx *armadacontext.Context, name string) (queue.Queue, error)
func (*PostgresQueueRepository) UpdateQueue ¶ added in v0.4.48
func (r *PostgresQueueRepository) UpdateQueue(ctx *armadacontext.Context, queue queue.Queue) error
type QueueRepository ¶
type QueueRepository interface { GetAllQueues(ctx *armadacontext.Context) ([]queue.Queue, error) GetQueue(ctx *armadacontext.Context, name string) (queue.Queue, error) CreateQueue(*armadacontext.Context, queue.Queue) error UpdateQueue(*armadacontext.Context, queue.Queue) error DeleteQueue(ctx *armadacontext.Context, name string) error }
type ReadOnlyQueueRepository ¶ added in v0.4.48
type RedisEventRepository ¶
type RedisEventRepository struct {
// contains filtered or unexported fields
}
func NewEventRepository ¶
func NewEventRepository(db redis.UniversalClient) *RedisEventRepository
func (*RedisEventRepository) CheckStreamExists ¶
func (repo *RedisEventRepository) CheckStreamExists(ctx *armadacontext.Context, queue string, jobSetId string) (bool, error)
func (*RedisEventRepository) GetLastMessageId ¶
func (repo *RedisEventRepository) GetLastMessageId(ctx *armadacontext.Context, queue, jobSetId string) (string, error)
func (*RedisEventRepository) ReadEvents ¶
func (repo *RedisEventRepository) ReadEvents(ctx *armadacontext.Context, queue string, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, *sequence.ExternalSeqNo, error)
type RedisHealth ¶
type RedisHealth struct {
// contains filtered or unexported fields
}
func NewRedisHealth ¶
func NewRedisHealth(db redis.UniversalClient) *RedisHealth
func (*RedisHealth) Check ¶
func (r *RedisHealth) Check() error
type RedisJobRepository ¶
type RedisJobRepository struct {
// contains filtered or unexported fields
}
func NewRedisJobRepository ¶
func NewRedisJobRepository( db redis.UniversalClient, ) *RedisJobRepository
func (*RedisJobRepository) ExpirePulsarSchedulerJobDetails ¶ added in v0.4.5
func (repo *RedisJobRepository) ExpirePulsarSchedulerJobDetails(ctx *armadacontext.Context, jobIds []string) error
func (*RedisJobRepository) GetPulsarSchedulerJobDetails ¶ added in v0.3.49
func (repo *RedisJobRepository) GetPulsarSchedulerJobDetails(ctx *armadacontext.Context, jobId string) (*schedulerobjects.PulsarSchedulerJobDetails, error)
func (*RedisJobRepository) StorePulsarSchedulerJobDetails ¶ added in v0.3.49
func (repo *RedisJobRepository) StorePulsarSchedulerJobDetails(ctx *armadacontext.Context, jobDetails []*schedulerobjects.PulsarSchedulerJobDetails) error
type RedisQueueRepository ¶
type RedisQueueRepository struct {
// contains filtered or unexported fields
}
func NewRedisQueueRepository ¶
func NewRedisQueueRepository(db redis.UniversalClient) *RedisQueueRepository
func (*RedisQueueRepository) CreateQueue ¶
func (r *RedisQueueRepository) CreateQueue(ctx *armadacontext.Context, queue queue.Queue) error
func (*RedisQueueRepository) DeleteQueue ¶
func (r *RedisQueueRepository) DeleteQueue(ctx *armadacontext.Context, name string) error
func (*RedisQueueRepository) GetAllQueues ¶
func (r *RedisQueueRepository) GetAllQueues(ctx *armadacontext.Context) ([]queue.Queue, error)
func (*RedisQueueRepository) GetQueue ¶
func (r *RedisQueueRepository) GetQueue(ctx *armadacontext.Context, name string) (queue.Queue, error)
func (*RedisQueueRepository) UpdateQueue ¶
func (r *RedisQueueRepository) UpdateQueue(ctx *armadacontext.Context, queue queue.Queue) error
TODO If the queue to be updated is deleted between this method checking if the queue exists and making the update, the deleted queue is re-added to Redis. There's no "update if exists" operation in Redis, so we need to do this with a script or transaction.
Click to show internal directories.
Click to hide internal directories.