repository

package
v0.4.51 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 14, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CachedQueueRepository added in v0.4.48

type CachedQueueRepository struct {
	// contains filtered or unexported fields
}

CachedQueueRepository is an implementation of ReadOnlyQueueRepository that fetches queues periodically and caches them. This means the queue information may be slightly out of date, but it allows us to continue API operations even if the underlying queue repository is unavailable.

func NewCachedQueueRepository added in v0.4.48

func NewCachedQueueRepository(underlyingRepo QueueRepository, updateFrequency time.Duration) *CachedQueueRepository

func (*CachedQueueRepository) GetAllQueues added in v0.4.48

func (c *CachedQueueRepository) GetAllQueues(_ *armadacontext.Context) ([]queue.Queue, error)

func (*CachedQueueRepository) GetQueue added in v0.4.48

func (*CachedQueueRepository) Run added in v0.4.48

type DualQueueRepository added in v0.4.48

type DualQueueRepository struct {
	// contains filtered or unexported fields
}

func NewDualQueueRepository added in v0.4.48

func NewDualQueueRepository(redis redis.UniversalClient, postgres *pgxpool.Pool, usePostgresForPrimary bool) *DualQueueRepository

func (*DualQueueRepository) CreateQueue added in v0.4.48

func (r *DualQueueRepository) CreateQueue(ctx *armadacontext.Context, queue queue.Queue) error

func (*DualQueueRepository) DeleteQueue added in v0.4.48

func (r *DualQueueRepository) DeleteQueue(ctx *armadacontext.Context, name string) error

func (*DualQueueRepository) GetAllQueues added in v0.4.48

func (r *DualQueueRepository) GetAllQueues(ctx *armadacontext.Context) ([]queue.Queue, error)

func (*DualQueueRepository) GetQueue added in v0.4.48

func (r *DualQueueRepository) GetQueue(ctx *armadacontext.Context, name string) (queue.Queue, error)

func (*DualQueueRepository) UpdateQueue added in v0.4.48

func (r *DualQueueRepository) UpdateQueue(ctx *armadacontext.Context, queue queue.Queue) error

type ErrQueueAlreadyExists

type ErrQueueAlreadyExists struct {
	QueueName string
}

func (*ErrQueueAlreadyExists) Error

func (err *ErrQueueAlreadyExists) Error() string

type ErrQueueNotFound

type ErrQueueNotFound struct {
	QueueName string
}

func (*ErrQueueNotFound) Error

func (err *ErrQueueNotFound) Error() string

type EventRepository

type EventRepository interface {
	CheckStreamExists(ctx *armadacontext.Context, queue string, jobSetId string) (bool, error)
	ReadEvents(ctx *armadacontext.Context, queue, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, *sequence.ExternalSeqNo, error)
	GetLastMessageId(ctx *armadacontext.Context, queue, jobSetId string) (string, error)
}

type JobRepository

type JobRepository interface {
	StorePulsarSchedulerJobDetails(ctx *armadacontext.Context, jobDetails []*schedulerobjects.PulsarSchedulerJobDetails) error
	GetPulsarSchedulerJobDetails(ctx *armadacontext.Context, jobIds string) (*schedulerobjects.PulsarSchedulerJobDetails, error)
	ExpirePulsarSchedulerJobDetails(ctx *armadacontext.Context, jobId []string) error
}

type PostgresQueueRepository added in v0.4.48

type PostgresQueueRepository struct {
	// contains filtered or unexported fields
}

func NewPostgresQueueRepository added in v0.4.48

func NewPostgresQueueRepository(db *pgxpool.Pool) *PostgresQueueRepository

func (*PostgresQueueRepository) CreateQueue added in v0.4.48

func (r *PostgresQueueRepository) CreateQueue(ctx *armadacontext.Context, queue queue.Queue) error

func (*PostgresQueueRepository) DeleteQueue added in v0.4.48

func (r *PostgresQueueRepository) DeleteQueue(ctx *armadacontext.Context, name string) error

func (*PostgresQueueRepository) GetAllQueues added in v0.4.48

func (r *PostgresQueueRepository) GetAllQueues(ctx *armadacontext.Context) ([]queue.Queue, error)

func (*PostgresQueueRepository) GetQueue added in v0.4.48

func (*PostgresQueueRepository) UpdateQueue added in v0.4.48

func (r *PostgresQueueRepository) UpdateQueue(ctx *armadacontext.Context, queue queue.Queue) error

type QueueRepository

type QueueRepository interface {
	GetAllQueues(ctx *armadacontext.Context) ([]queue.Queue, error)
	GetQueue(ctx *armadacontext.Context, name string) (queue.Queue, error)
	CreateQueue(*armadacontext.Context, queue.Queue) error
	UpdateQueue(*armadacontext.Context, queue.Queue) error
	DeleteQueue(ctx *armadacontext.Context, name string) error
}

type ReadOnlyQueueRepository added in v0.4.48

type ReadOnlyQueueRepository interface {
	GetAllQueues(ctx *armadacontext.Context) ([]queue.Queue, error)
	GetQueue(ctx *armadacontext.Context, name string) (queue.Queue, error)
}

type RedisEventRepository

type RedisEventRepository struct {
	// contains filtered or unexported fields
}

func NewEventRepository

func NewEventRepository(db redis.UniversalClient) *RedisEventRepository

func (*RedisEventRepository) CheckStreamExists

func (repo *RedisEventRepository) CheckStreamExists(ctx *armadacontext.Context, queue string, jobSetId string) (bool, error)

func (*RedisEventRepository) GetLastMessageId

func (repo *RedisEventRepository) GetLastMessageId(ctx *armadacontext.Context, queue, jobSetId string) (string, error)

func (*RedisEventRepository) ReadEvents

func (repo *RedisEventRepository) ReadEvents(ctx *armadacontext.Context, queue string, jobSetId string, lastId string, limit int64, block time.Duration) ([]*api.EventStreamMessage, *sequence.ExternalSeqNo, error)

type RedisHealth

type RedisHealth struct {
	// contains filtered or unexported fields
}

func NewRedisHealth

func NewRedisHealth(db redis.UniversalClient) *RedisHealth

func (*RedisHealth) Check

func (r *RedisHealth) Check() error

type RedisJobRepository

type RedisJobRepository struct {
	// contains filtered or unexported fields
}

func NewRedisJobRepository

func NewRedisJobRepository(
	db redis.UniversalClient,
) *RedisJobRepository

func (*RedisJobRepository) ExpirePulsarSchedulerJobDetails added in v0.4.5

func (repo *RedisJobRepository) ExpirePulsarSchedulerJobDetails(ctx *armadacontext.Context, jobIds []string) error

func (*RedisJobRepository) GetPulsarSchedulerJobDetails added in v0.3.49

func (repo *RedisJobRepository) GetPulsarSchedulerJobDetails(ctx *armadacontext.Context, jobId string) (*schedulerobjects.PulsarSchedulerJobDetails, error)

func (*RedisJobRepository) StorePulsarSchedulerJobDetails added in v0.3.49

func (repo *RedisJobRepository) StorePulsarSchedulerJobDetails(ctx *armadacontext.Context, jobDetails []*schedulerobjects.PulsarSchedulerJobDetails) error

type RedisQueueRepository

type RedisQueueRepository struct {
	// contains filtered or unexported fields
}

func NewRedisQueueRepository

func NewRedisQueueRepository(db redis.UniversalClient) *RedisQueueRepository

func (*RedisQueueRepository) CreateQueue

func (r *RedisQueueRepository) CreateQueue(ctx *armadacontext.Context, queue queue.Queue) error

func (*RedisQueueRepository) DeleteQueue

func (r *RedisQueueRepository) DeleteQueue(ctx *armadacontext.Context, name string) error

func (*RedisQueueRepository) GetAllQueues

func (r *RedisQueueRepository) GetAllQueues(ctx *armadacontext.Context) ([]queue.Queue, error)

func (*RedisQueueRepository) GetQueue

func (r *RedisQueueRepository) GetQueue(ctx *armadacontext.Context, name string) (queue.Queue, error)

func (*RedisQueueRepository) UpdateQueue

func (r *RedisQueueRepository) UpdateQueue(ctx *armadacontext.Context, queue queue.Queue) error

TODO: If the queue to be updated is deleted between this method checking that the queue exists and performing the update, the deleted queue is re-added to Redis. Redis has no "update if exists" operation, so this needs to be done with a script or a transaction.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL