Documentation ¶
Overview ¶
Package tasks implements asynchronous invocation processing.
Index ¶
- Variables
- func Delete(ctx context.Context, typ Type, id string) error
- func Enqueue(typ Type, taskID string, invID invocations.ID, payload interface{}, ...) *spanner.Mutation
- func EnqueueBQExport(invID invocations.ID, payload *pb.BigQueryExport, processAfter time.Time) *spanner.Mutation
- func Lease(ctx context.Context, typ Type, id string, duration time.Duration) (invID invocations.ID, payload []byte, err error)
- func Peek(ctx context.Context, typ Type, f func(id string) error) error
- func StartInvocationFinalization(ctx context.Context, id invocations.ID)
- type Dispatcher
- type TaskFunc
- type Type
Constants ¶
This section is empty.
Variables ¶
var AllTypes = []Type{BQExport, TryFinalizeInvocation}
AllTypes is a slice of all known types of tasks.
var ErrConflict = fmt.Errorf("the task is already leased")
ErrConflict is returned by Lease if the task does not exist or is already leased.
var FinalizationTasks = tq.RegisterTaskClass(tq.TaskClass{
	ID:                  "try-finalize-inv",
	Prototype:           &taskspb.TryFinalizeInvocation{},
	Kind:                tq.Transactional,
	InheritTraceContext: true,
	Queue:               "finalizer",
	RoutingPrefix:       "/internal/tasks/finalizer",
})
FinalizationTasks describes how to route finalization tasks.
The handler is implemented in internal/services/finalizer.
var PermanentFailure = errors.BoolTag{
	Key: errors.NewTagKey("permanent failure to process invocation task"),
}
PermanentFailure, when set on an error, indicates that the error cannot be resolved by a retry. Such a task is doomed.
var UseFinalizationTQ = experiments.Register("rdb-use-tq-finalization")
UseFinalizationTQ experiment enables using server/tq for finalization tasks.
Functions ¶
func Enqueue ¶
func Enqueue(typ Type, taskID string, invID invocations.ID, payload interface{}, processAfter time.Time) *spanner.Mutation
Enqueue returns a mutation that inserts one row into InvocationTasks.
func EnqueueBQExport ¶
func EnqueueBQExport(invID invocations.ID, payload *pb.BigQueryExport, processAfter time.Time) *spanner.Mutation
EnqueueBQExport returns a mutation that inserts one row into InvocationTasks for a BigQuery export task.
func Lease ¶
func Lease(ctx context.Context, typ Type, id string, duration time.Duration) (invID invocations.ID, payload []byte, err error)
Lease leases an invocation task. If the task does not exist or is already leased, it returns ErrConflict.
func StartInvocationFinalization ¶
func StartInvocationFinalization(ctx context.Context, id invocations.ID)
StartInvocationFinalization changes invocation state to FINALIZING and enqueues a TryFinalizeInvocation task.
The caller is responsible for ensuring that the invocation is active.
TODO(nodir): this package is not a great place for this function, but there is no better package at the moment. Keep it here for now, but consider a new package as the code base grows.
Types ¶
type Dispatcher ¶
type Dispatcher struct {
	// How often to query for tasks. Defaults to 1m.
	QueryInterval time.Duration

	// How long to lease a task for. Defaults to 1m.
	LeaseDuration time.Duration

	// Number of tasks to process concurrently. Defaults to GOMAXPROCS.
	Workers int
}
Dispatcher queries for available tasks and dispatches them to goroutines.
type TaskFunc ¶
TaskFunc can execute a task. If the returned error is tagged with PermanentFailure, then the failed task is deleted.
type Type ¶
type Type string
Type is a value for InvocationTasks.TaskType column. It defines what a task does.
const (
	// BQExport is a type of task that exports an invocation to BigQuery.
	// The task payload is binary-encoded BigQueryExport message.
	BQExport Type = "bq_export"

	// TryFinalizeInvocation is a type of task that tries to finalize an
	// invocation. No payload.
	TryFinalizeInvocation Type = "finalize"
)
Types of invocation tasks. Used as InvocationTasks.TaskType column value.