tqtesting

package
v0.0.0-...-16534be Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2025 License: Apache-2.0 Imports: 23 Imported by: 1

Documentation

Overview

Package tqtesting contains helpers for running server/tq in tests and on localhost.

Index

Constants

View Source
const ClockTag = "tq-scheduler-sleep"

ClockTag tags the clock used in scheduler's sleep.

Variables

This section is empty.

Functions

func TasksCollector

func TasksCollector(tl *TaskList) func(context.Context, *Task)

TasksCollector returns a callback that adds tasks to the given list.

Can be passed as TaskSucceeded or TaskFailed callback to the Scheduler.

Synchronizes access to the list internally, but the list should be read from only when the Scheduler is paused.

Types

type Executor

type Executor interface {
	// Execute is called from Run to execute the task.
	//
	// The executor may execute the task right away in a blocking way or dispatch
	// it to some other goroutine. Either way it must call `done` callback when it
	// is done executing the task, indicating whether the task should be
	// reenqueued for a retry.
	//
	// It is safe to call Scheduler's Submit from inside Execute.
	//
	// Receives the exact same context as Run(...), in particular this context
	// is canceled when Run is done.
	Execute(ctx context.Context, t *Task, done func(retry bool))
}

Executor knows how to execute tasks when their ETA arrives.

type LoopbackHTTPExecutor

type LoopbackHTTPExecutor struct {
	// ServerAddr is where the server is listening for requests.
	ServerAddr net.Addr
}

LoopbackHTTPExecutor is an Executor that executes tasks by sending HTTP requests to the server with TQ module serving at the given (usually loopback) address.

Used exclusively when running TQ locally.

func (*LoopbackHTTPExecutor) Execute

func (e *LoopbackHTTPExecutor) Execute(ctx context.Context, t *Task, done func(retry bool))

Execute dispatches the task to the HTTP handler in a dedicated goroutine.

Marks the task as failed if the response status code is outside of range [200-299].

type RunOption

type RunOption interface {
	// contains filtered or unexported methods
}

RunOption influences behavior of Run call.

func ParallelExecute

func ParallelExecute() RunOption

ParallelExecute instructs the scheduler to call executor's Execute method in a separate goroutine instead of serially in Run.

This more closely resembles real-life behavior but may introduce more unpredictability into tests due to races.

func StopAfter

func StopAfter(f func(t *Task) bool) RunOption

StopAfter will stop the scheduler if the given function returns true, given the just finished task.

func StopAfterTask

func StopAfterTask(taskClassID string) RunOption

StopAfterTask will stop the scheduler after it finishes executing a task of the given task class ID.

func StopBefore

func StopBefore(f func(t *Task) bool) RunOption

StopBefore will stop the scheduler if the given function returns true, given the next task to be executed.

If such next task has specified ETA, StopBeforeTask does NOT provide any guarantee about what `clock.Now` returns by the time Run stops.

It is naturally racy if there are other goroutines that submit tasks concurrently. In this situation there may be a different next task (by ETA) when Run stops.

func StopBeforeTask

func StopBeforeTask(taskClassID string) RunOption

StopBeforeTask will stop the scheduler if the next task to be executed has the given task class ID.

The same caveats of StopBefore also apply for StopBeforeTask.

func StopWhenDrained

func StopWhenDrained() RunOption

StopWhenDrained will stop the scheduler after it finishes executing the last task and there are no more tasks scheduled.

It is naturally racy if there are other goroutines that submit tasks concurrently. In this situation there may be a pending queue of tasks even if Run stops.

type Scheduler

type Scheduler struct {
	// Executor knows how to execute tasks when their ETA arrives.
	Executor Executor

	// MaxAttempts is the maximum number of attempts for a task, including the
	// first attempt.
	//
	// If negative the number of attempts is unlimited.
	//
	// Default is 20.
	MaxAttempts int

	// MinBackoff is an initial retry delay for failed tasks.
	//
	// It is doubled after each failed attempt until it reaches MaxBackoff after
	// which it stays constant.
	//
	// Default is 1 sec.
	MinBackoff time.Duration

	// MaxBackoff is an upper limit on a retry delay.
	//
	// Default is 5 min.
	MaxBackoff time.Duration

	// TaskSucceeded is called from within the executor's `done` callback whenever
	// a task finishes successfully, perhaps after a bunch of retries.
	//
	// Receives the same context as passed to Run.
	TaskSucceeded func(ctx context.Context, task *Task)

	// TaskFailed is called from within the executor's `done` callback whenever
	// a task fails after being attempted MaxAttempts times.
	//
	// Receives the same context as passed to Run.
	TaskFailed func(ctx context.Context, task *Task)
	// contains filtered or unexported fields
}

Scheduler knows how to execute submitted tasks when they are due.

This is a very primitive in-memory unholy hybrid of Cloud Tasks and PubSub services that can be used in tests and on localhost.

Must be configured before the first Run call.Can be reconfigured between Run calls, but changing the configuration while Run is running is not allowed.

Scheduler implements tq.Submitter interface.

func (*Scheduler) Run

func (s *Scheduler) Run(ctx context.Context, opts ...RunOption)

Run executes the scheduler's loop until the context is canceled or one of the stop conditions are hit.

By default executes tasks serially. Pass ParallelExecute() option to execute them asynchronously.

Upon exit all executing tasks has finished, there still may be pending tasks.

Panics if Run is already running (perhaps in another goroutine).

func (*Scheduler) Submit

func (s *Scheduler) Submit(ctx context.Context, p *reminder.Payload) error

Submit schedules a task for later execution.

func (*Scheduler) Tasks

func (s *Scheduler) Tasks() TaskList

Tasks returns a snapshot of the scheduler state.

Recalculates it from scratch, so it is a pretty expensive call.

Tasks are ordered by ETA: currently executing tasks first, then scheduled tasks.

type Task

type Task struct {
	Payload proto.Message // a clone of the original AddTask payload, if available

	Task    *taskspb.Task           // a clone of the Cloud Tasks task as passed to Submit
	Message *pubsubpb.PubsubMessage // a clone of the PubSub message as passed to Submit

	Name  string    // full task name (perhaps generated)
	Class string    // TaskClass.ID passed in RegisterTaskClass.
	ETA   time.Time // when the task is due, always set at now or in future

	Finished  time.Time // when the task finished last execution attempt
	Attempts  int       // 0 initially, incremented before each execution attempt
	Executing bool      // true if executing right now
	// contains filtered or unexported fields
}

Task represents an enqueued or executing task.

func (*Task) Copy

func (t *Task) Copy() *Task

Copy makes a shallow copy of the task.

type TaskList

type TaskList []*Task

TaskList is a collection of tasks.

func (TaskList) Executing

func (tl TaskList) Executing() TaskList

Executing returns a list of tasks executing right now.

func (TaskList) Filter

func (tl TaskList) Filter(cb func(*Task) bool) TaskList

Filter returns a new task list with tasks matching the filter.

func (TaskList) Payloads

func (tl TaskList) Payloads() []proto.Message

Payloads returns a list with individual task payloads.

func (TaskList) Pending

func (tl TaskList) Pending() TaskList

Pending returns a list of tasks waiting execution.

func (TaskList) SortByETA

func (tl TaskList) SortByETA() TaskList

SortByETA sorts the list in-place by ETA.

The full sorting key is (!task.Executing, task.ETA, task.Class, task.Name)

Returns it to allow chaining calls.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL