queue

package
v0.0.0-...-c3e479f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 20, 2024 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const MaxTaskRetries = 3 // Maximum number of times a task can be retried before failing

Variables

View Source
var (
	ErrNotExist           = fmt.Errorf("task does not exist")
	ErrNotRunning         = fmt.Errorf("task is not running")
	ErrTaskCanceled       = fmt.Errorf("task was canceled")
	ErrContextCanceled    = fmt.Errorf("dequeue context timed out or was canceled")
	ErrRowsNotAffected    = fmt.Errorf("no rows were affected")
	ErrMaxRetriesExceeded = fmt.Errorf("task has exceeded the maximum amount of retries")
	ErrNotCancellable     = fmt.Errorf("task not cancellable")
)

Functions

func NewPgxPool

func NewPgxPool(ctx context.Context, url string) (*pgxpool.Pool, error)

Types

type Connection

type Connection interface {
	Transaction
	Conn() *pgx.Conn
	Release()
}

type FakePgxPoolWrapper

type FakePgxPoolWrapper struct {
	// contains filtered or unexported fields
}

FakePgxPoolWrapper is used for testing, to provide a pool interface implementation that doesn't actually use the pool in order to wrap all work in a transaction. This shouldn't be used in production, as it is based off a single transaction

func (*FakePgxPoolWrapper) Acquire

func (*FakePgxPoolWrapper) Begin

func (p *FakePgxPoolWrapper) Begin(ctx context.Context) (pgx.Tx, error)

func (*FakePgxPoolWrapper) Close

func (p *FakePgxPoolWrapper) Close()

func (*FakePgxPoolWrapper) Conn

func (p *FakePgxPoolWrapper) Conn() *pgx.Conn

func (*FakePgxPoolWrapper) Exec

func (p *FakePgxPoolWrapper) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)

func (*FakePgxPoolWrapper) Query

func (p *FakePgxPoolWrapper) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

func (*FakePgxPoolWrapper) QueryRow

func (p *FakePgxPoolWrapper) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

func (*FakePgxPoolWrapper) Release

func (p *FakePgxPoolWrapper) Release()

type MockQueue

type MockQueue struct {
	mock.Mock
}

MockQueue is an autogenerated mock type for the Queue type

func NewMockQueue

func NewMockQueue(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockQueue

NewMockQueue creates a new instance of MockQueue. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockQueue) Cancel

func (_m *MockQueue) Cancel(ctx context.Context, taskId uuid.UUID) error

Cancel provides a mock function with given fields: ctx, taskId

func (*MockQueue) Dequeue

func (_m *MockQueue) Dequeue(ctx context.Context, taskTypes []string) (*models.TaskInfo, error)

Dequeue provides a mock function with given fields: ctx, taskTypes

func (*MockQueue) Enqueue

func (_m *MockQueue) Enqueue(task *Task) (uuid.UUID, error)

Enqueue provides a mock function with given fields: task

func (*MockQueue) Finish

func (_m *MockQueue) Finish(taskId uuid.UUID, taskError error) error

Finish provides a mock function with given fields: taskId, taskError

func (*MockQueue) Heartbeats

func (_m *MockQueue) Heartbeats(olderThan time.Duration) []uuid.UUID

Heartbeats provides a mock function with given fields: olderThan

func (*MockQueue) IdFromToken

func (_m *MockQueue) IdFromToken(token uuid.UUID) (uuid.UUID, bool, error)

IdFromToken provides a mock function with given fields: token

func (*MockQueue) ListenForCancel

func (_m *MockQueue) ListenForCancel(ctx context.Context, taskID uuid.UUID, cancelFunc context.CancelCauseFunc)

ListenForCancel provides a mock function with given fields: ctx, taskID, cancelFunc

func (*MockQueue) RefreshHeartbeat

func (_m *MockQueue) RefreshHeartbeat(token uuid.UUID) error

RefreshHeartbeat provides a mock function with given fields: token

func (*MockQueue) Requeue

func (_m *MockQueue) Requeue(taskId uuid.UUID) error

Requeue provides a mock function with given fields: taskId

func (*MockQueue) RequeueFailedTasks

func (_m *MockQueue) RequeueFailedTasks(taskTypes []string) error

RequeueFailedTasks provides a mock function with given fields: taskTypes

func (*MockQueue) Status

func (_m *MockQueue) Status(taskId uuid.UUID) (*models.TaskInfo, error)

Status provides a mock function with given fields: taskId

func (*MockQueue) UpdatePayload

func (_m *MockQueue) UpdatePayload(task *models.TaskInfo, payload interface{}) (*models.TaskInfo, error)

UpdatePayload provides a mock function with given fields: task, payload

type PgQueue

type PgQueue struct {
	Pool Pool
	// contains filtered or unexported fields
}

PgQueue a task queue backed by postgres, using pgxpool.Pool using a wrapper (PgxPoolWrapper) that implements a Pool interface

func NewPgQueue

func NewPgQueue(ctx context.Context, url string) (PgQueue, error)

func (*PgQueue) Cancel

func (p *PgQueue) Cancel(ctx context.Context, taskId uuid.UUID) error

func (*PgQueue) Close

func (p *PgQueue) Close()

func (*PgQueue) Dequeue

func (p *PgQueue) Dequeue(ctx context.Context, taskTypes []string) (*models.TaskInfo, error)

func (*PgQueue) Enqueue

func (p *PgQueue) Enqueue(task *Task) (uuid.UUID, error)

func (*PgQueue) Finish

func (p *PgQueue) Finish(taskId uuid.UUID, taskError error) error

func (*PgQueue) Heartbeats

func (p *PgQueue) Heartbeats(olderThan time.Duration) []uuid.UUID

func (*PgQueue) IdFromToken

func (p *PgQueue) IdFromToken(token uuid.UUID) (id uuid.UUID, isRunning bool, err error)

func (*PgQueue) ListenForCancel

func (p *PgQueue) ListenForCancel(ctx context.Context, taskID uuid.UUID, cancelFunc context.CancelCauseFunc)

func (*PgQueue) RefreshHeartbeat

func (p *PgQueue) RefreshHeartbeat(token uuid.UUID) error

Reset the last heartbeat time to time.Now()

func (*PgQueue) RemoveAllTasks

func (p *PgQueue) RemoveAllTasks() error

RemoveAllTasks used for tests, along with testTx, to clear tables before running tests

func (*PgQueue) Requeue

func (p *PgQueue) Requeue(taskId uuid.UUID) error

func (*PgQueue) RequeueFailedTasks

func (p *PgQueue) RequeueFailedTasks(taskTypes []string) error

func (*PgQueue) Status

func (p *PgQueue) Status(taskId uuid.UUID) (*models.TaskInfo, error)

func (*PgQueue) UpdatePayload

func (p *PgQueue) UpdatePayload(task *models.TaskInfo, payload interface{}) (*models.TaskInfo, error)

type PgxConnWrapper

type PgxConnWrapper struct {
	// contains filtered or unexported fields
}

PgxConnWrapper wraps a pgxpool Conn in a generic interface to allow for alternative implementations, such as the FakePgxPoolWrapper

func (*PgxConnWrapper) Begin

func (p *PgxConnWrapper) Begin(ctx context.Context) (pgx.Tx, error)

func (*PgxConnWrapper) Conn

func (p *PgxConnWrapper) Conn() *pgx.Conn

func (*PgxConnWrapper) Exec

func (p *PgxConnWrapper) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)

func (*PgxConnWrapper) Query

func (p *PgxConnWrapper) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

func (*PgxConnWrapper) QueryRow

func (p *PgxConnWrapper) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

func (*PgxConnWrapper) Release

func (p *PgxConnWrapper) Release()

type PgxPoolWrapper

type PgxPoolWrapper struct {
	// contains filtered or unexported fields
}

PgxPoolWrapper wraps a pgx Pool in a generic interface to allow for alternative implementations, such as the FakePgxPoolWrapper

func (*PgxPoolWrapper) Acquire

func (p *PgxPoolWrapper) Acquire(ctx context.Context) (Connection, error)

func (*PgxPoolWrapper) Begin

func (p *PgxPoolWrapper) Begin(ctx context.Context) (pgx.Tx, error)

func (*PgxPoolWrapper) Close

func (p *PgxPoolWrapper) Close()

func (*PgxPoolWrapper) Exec

func (p *PgxPoolWrapper) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)

func (*PgxPoolWrapper) Query

func (p *PgxPoolWrapper) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

func (*PgxPoolWrapper) QueryRow

func (p *PgxPoolWrapper) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

type Pool

type Pool interface {
	Transaction
	Acquire(ctx context.Context) (Connection, error)
	Close()
}

Pool matches the pgxpool.Pool struct

type Queue

type Queue interface {
	// Enqueue Enqueues a job
	Enqueue(task *Task) (uuid.UUID, error)
	// Dequeue Dequeues a job of a type in taskTypes, blocking until one is available.
	Dequeue(ctx context.Context, taskTypes []string) (*models.TaskInfo, error)
	// Status returns Status of the given task
	Status(taskId uuid.UUID) (*models.TaskInfo, error)
	// Finish finishes given task, setting status to completed or failed if taskError is not nil
	Finish(taskId uuid.UUID, taskError error) error
	// Requeue requeues the given task
	Requeue(taskId uuid.UUID) error
	// Heartbeats returns the tokens of all tasks older than given duration
	Heartbeats(olderThan time.Duration) []uuid.UUID
	// IdFromToken returns a task's ID given its token
	IdFromToken(token uuid.UUID) (id uuid.UUID, isRunning bool, err error)
	// RefreshHeartbeat refresh heartbeat of task given its token
	RefreshHeartbeat(token uuid.UUID) error
	// UpdatePayload update the payload on a task
	UpdatePayload(task *models.TaskInfo, payload interface{}) (*models.TaskInfo, error)
	// ListenForCancel registers a channel and listens for notification for given task, then calls cancelFunc on receive. Should run as goroutine.
	ListenForCancel(ctx context.Context, taskID uuid.UUID, cancelFunc context.CancelCauseFunc)
	// Cancel sends notification to cancel given task and sets task state to canceled
	Cancel(ctx context.Context, taskId uuid.UUID) error
	// RequeueFailedTasks requeues all failed tasks of taskTypes to the queue
	RequeueFailedTasks(taskTypes []string) error
}

type Task

type Task struct {
	Typename     string
	Payload      interface{}
	Dependencies []uuid.UUID
	OrgId        string
	AccountId    string
	ObjectUUID   *string
	ObjectType   *string
	RequestID    string
	Priority     int
}

type Transaction

type Transaction interface {
	Begin(ctx context.Context) (pgx.Tx, error)
	Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
	Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
}

Transaction mimics the pgx.Tx struct

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL