Documentation ¶
Overview ¶
Package tqtesting contains helpers for running server/tq in tests and on localhost.
Constants ¶
const ClockTag = "tq-scheduler-sleep"
ClockTag tags the clock used in scheduler's sleep.
Functions ¶
func TasksCollector ¶
TasksCollector returns a callback that adds tasks to the given list.
Can be passed as the TaskSucceeded or TaskFailed callback to the Scheduler.
Synchronizes access to the list internally, but the list should be read from only when the Scheduler is paused.
Types ¶
type Executor ¶
type Executor interface {
	// Execute is called from Run to execute the task.
	//
	// The executor may execute the task right away in a blocking way or dispatch
	// it to some other goroutine. Either way it must call `done` callback when it
	// is done executing the task, indicating whether the task should be
	// reenqueued for a retry.
	//
	// It is safe to call Scheduler's Submit from inside Execute.
	//
	// Receives the exact same context as Run(...), in particular this context
	// is canceled when Run is done.
	Execute(ctx context.Context, t *Task, done func(retry bool))
}
Executor knows how to execute tasks when their ETA arrives.
type LoopbackHTTPExecutor ¶
type LoopbackHTTPExecutor struct {
	// ServerAddr is where the server is listening for requests.
	ServerAddr net.Addr
}
LoopbackHTTPExecutor is an Executor that executes tasks by sending HTTP requests to the server with TQ module serving at the given (usually loopback) address.
Used exclusively when running TQ locally.
type RunOption ¶
type RunOption interface {
// contains filtered or unexported methods
}
RunOption influences behavior of Run call.
func ParallelExecute ¶
func ParallelExecute() RunOption
ParallelExecute instructs the scheduler to call executor's Execute method in a separate goroutine instead of serially in Run.
This more closely resembles real-life behavior but may introduce more unpredictability into tests due to races.
func StopAfter ¶
StopAfter will stop the scheduler if the given function returns true, given the just finished task.
func StopAfterTask ¶
StopAfterTask will stop the scheduler after it finishes executing a task of the given task class ID.
func StopBefore ¶
StopBefore will stop the scheduler if the given function returns true, given the next task to be executed.
If the next task has an ETA specified, StopBefore does NOT provide any guarantee about what `clock.Now` returns by the time Run stops.
It is naturally racy if there are other goroutines that submit tasks concurrently. In this situation there may be a different next task (by ETA) when Run stops.
func StopBeforeTask ¶
StopBeforeTask will stop the scheduler if the next task to be executed has the given task class ID.
The same caveats as for StopBefore apply to StopBeforeTask.
func StopWhenDrained ¶
func StopWhenDrained() RunOption
StopWhenDrained will stop the scheduler after it finishes executing the last task and there are no more tasks scheduled.
It is naturally racy if there are other goroutines that submit tasks concurrently. In this situation there may be a pending queue of tasks even if Run stops.
type Scheduler ¶
type Scheduler struct {
	// Executor knows how to execute tasks when their ETA arrives.
	Executor Executor

	// MaxAttempts is the maximum number of attempts for a task, including the
	// first attempt.
	//
	// If negative the number of attempts is unlimited.
	//
	// Default is 20.
	MaxAttempts int

	// MinBackoff is an initial retry delay for failed tasks.
	//
	// It is doubled after each failed attempt until it reaches MaxBackoff after
	// which it stays constant.
	//
	// Default is 1 sec.
	MinBackoff time.Duration

	// MaxBackoff is an upper limit on a retry delay.
	//
	// Default is 5 min.
	MaxBackoff time.Duration

	// TaskSucceeded is called from within the executor's `done` callback whenever
	// a task finishes successfully, perhaps after a bunch of retries.
	//
	// Receives the same context as passed to Run.
	TaskSucceeded func(ctx context.Context, task *Task)

	// TaskFailed is called from within the executor's `done` callback whenever
	// a task fails after being attempted MaxAttempts times.
	//
	// Receives the same context as passed to Run.
	TaskFailed func(ctx context.Context, task *Task)
	// contains filtered or unexported fields
}
Scheduler knows how to execute submitted tasks when they are due.
This is a very primitive in-memory unholy hybrid of Cloud Tasks and PubSub services that can be used in tests and on localhost.
Must be configured before the first Run call. Can be reconfigured between Run calls, but changing the configuration while Run is running is not allowed.
Scheduler implements tq.Submitter interface.
func (*Scheduler) Run ¶
Run executes the scheduler's loop until the context is canceled or one of the stop conditions is hit.
By default executes tasks serially. Pass ParallelExecute() option to execute them asynchronously.
Upon exit, all executing tasks have finished, but there may still be pending tasks.
Panics if Run is already running (perhaps in another goroutine).
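One common way to enforce a "panics if already running" rule is a flag guarded by a mutex. The sketch below shows that general technique using a hypothetical `scheduler` type and `run` method. It is not taken from the package's source.

```go
package main

import (
	"fmt"
	"sync"
)

// scheduler is a hypothetical type that tracks whether run is in progress.
type scheduler struct {
	mu      sync.Mutex
	running bool
}

// run executes body and panics if another run call is still in progress.
func (s *scheduler) run(body func()) {
	s.mu.Lock()
	if s.running {
		s.mu.Unlock()
		panic("Run is already running")
	}
	s.running = true
	s.mu.Unlock()

	// Clear the flag on exit so that run can be called again later.
	defer func() {
		s.mu.Lock()
		s.running = false
		s.mu.Unlock()
	}()

	body()
}

// panics reports whether calling f panics.
func panics(f func()) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	f()
	return false
}

func main() {
	s := &scheduler{}
	s.run(func() {
		// A second run call while the first is still active is rejected.
		fmt.Println("nested run panics:", panics(func() { s.run(func() {}) }))
	})
	// Sequential calls are fine.
	fmt.Println("sequential run panics:", panics(func() { s.run(func() {}) }))
}
```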
type Task ¶
type Task struct {
	Payload proto.Message           // a clone of the original AddTask payload, if available
	Task    *taskspb.Task           // a clone of the Cloud Tasks task as passed to Submit
	Message *pubsubpb.PubsubMessage // a clone of the PubSub message as passed to Submit
	Name    string                  // full task name (perhaps generated)
	Class   string                  // TaskClass.ID passed in RegisterTaskClass.
	ETA       time.Time // when the task is due, always set at now or in future
	Finished  time.Time // when the task finished last execution attempt
	Attempts  int       // 0 initially, incremented before each execution attempt
	Executing bool      // true if executing right now
	// contains filtered or unexported fields
}
Task represents an enqueued or executing task.
type TaskList ¶
type TaskList []*Task
TaskList is a collection of tasks.