queue

package
v3.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 4, 2025 License: Apache-2.0, BSD-3-Clause Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrCancel indicates the task was canceled.
	ErrCancel = errors.New("queue: task canceled")

	// ErrNotFound indicates the task was not found in the queue.
	ErrNotFound = errors.New("queue: task not found")

	// ErrAgentMissMatch indicates a task is assigned to a different agent.
	ErrAgentMissMatch = errors.New("task assigned to different agent")
)
View Source
var ErrWorkerKicked = fmt.Errorf("worker was kicked")

Functions

This section is empty.

Types

type Config

type Config struct {
	Backend Type
	Store   store.Store
}

Config holds the configuration for the queue.

type FilterFn

type FilterFn func(*model.Task) (bool, int)

Filter filters tasks in the queue. If the Filter returns false, the Task is skipped and not returned to the subscriber. The int return value represents the matching score (higher is better).

type InfoT

type InfoT struct {
	Pending       []*model.Task `json:"pending"`
	WaitingOnDeps []*model.Task `json:"waiting_on_deps"`
	Running       []*model.Task `json:"running"`
	Stats         struct {
		Workers       int `json:"worker_count"`
		Pending       int `json:"pending_count"`
		WaitingOnDeps int `json:"waiting_on_deps_count"`
		Running       int `json:"running_count"`
	} `json:"stats"`
	Paused bool `json:"paused"`

} //	@name InfoT

InfoT provides runtime information.

func (*InfoT) String

func (t *InfoT) String() string

type Queue

type Queue interface {
	// Push pushes a task to the tail of this queue.
	Push(c context.Context, task *model.Task) error

	// PushAtOnce pushes multiple tasks to the tail of this queue.
	PushAtOnce(c context.Context, tasks []*model.Task) error

	// Poll retrieves and removes a task head of this queue.
	Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error)

	// Extend extends the deadline for a task.
	Extend(c context.Context, agentID int64, workflowID string) error

	// Done signals the task is complete.
	Done(c context.Context, id string, exitStatus model.StatusValue) error

	// Error signals the task is done with an error.
	Error(c context.Context, id string, err error) error

	// ErrorAtOnce signals multiple done are complete with an error.
	ErrorAtOnce(c context.Context, ids []string, err error) error

	// Evict removes a pending task from the queue.
	Evict(c context.Context, id string) error

	// EvictAtOnce removes multiple pending tasks from the queue.
	EvictAtOnce(c context.Context, ids []string) error

	// Wait waits until the task is complete.
	Wait(c context.Context, id string) error

	// Info returns internal queue information.
	Info(c context.Context) InfoT

	// Pause stops the queue from handing out new work items in Poll
	Pause()

	// Resume starts the queue again.
	Resume()

	// KickAgentWorkers kicks all workers for a given agent.
	KickAgentWorkers(agentID int64)
}

Queue defines a task queue for scheduling tasks among a pool of workers.

func New

func New(ctx context.Context, config Config) (Queue, error)

New creates a new queue based on the provided configuration.

func NewMemoryQueue

func NewMemoryQueue(ctx context.Context) Queue

NewMemoryQueue returns a new fifo queue.

func WithTaskStore

func WithTaskStore(ctx context.Context, q Queue, s store.Store) Queue

WithTaskStore returns a queue that is backed by the TaskStore. This ensures the task Queue can be restored when the system starts.

type Type

type Type string

Queue type.

const (
	TypeMemory Type = "memory"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL