tq

package
v0.0.0-...-dbbe8cb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 25, 2025 License: Apache-2.0 Imports: 57 Imported by: 7

Documentation

Overview

Package tq provides a task queue implementation on top of Cloud Tasks.

It exposes a high-level API that operates with proto messages and hides gory details such as serialization, routing, authentication, etc.

Transactional tasks

Tasks can be submitted as part of a database transaction. This is controlled by Kind field of TaskClass. A transactional task is guaranteed to be delivered (at least once) if and only if the database transaction was committed successfully. The are some prerequisites for using this mechanism.

First, the sweeper must be running somewhere. The sweeper is responsible for discovering tasks that were successfully committed into the database, but were failed to be dispatched to Cloud Tasks (for example if the client that was submitting the task crashed right after committing the transaction). The sweeper can run either as a standalone service (the most convenient option for Kubernetes deployments) or as a cron job (the most convenient option for Appengine deployments).

Second, the core server/tq library needs to "know" how to talk to the database that implements transactions. This is achieved by blank-importing a corresponding package.

For Datastore:

import _ "go.chromium.org/luci/server/tq/txn/datastore"

For Spanner:

import _ "go.chromium.org/luci/server/tq/txn/spanner"

The exact location of the import doesn't matter as long as the package is present in the import tree of the binary. If your tests use transactional tasks, they'll need to import the corresponding packages as well.

Enabling the sweeper on Appengine

In cron.yaml (required):

  • url: /internal/tasks/c/sweep schedule: every 1 minutes

In queue.yaml (required when using the default distributed sweep mode):

  • name: tq-sweep rate: 500/s

Using the sweeper with Spanner

You need to Create below tables in your database:

CREATE TABLE TQReminders (
  ID STRING(MAX) NOT NULL,
  FreshUntil TIMESTAMP NOT NULL,
  Payload BYTES(102400) NOT NULL,
) PRIMARY KEY (ID ASC);

CREATE TABLE TQLeases (
  SectionID STRING(MAX) NOT NULL,
  LeaseID INT64 NOT NULL,
  SerializedParts ARRAY<STRING(MAX)>,
  ExpiresAt TIMESTAMP NOT NULL,
) PRIMARY KEY (SectionID ASC, LeaseID ASC);

Index

Constants

View Source
const (
	// TraceContextHeader is name of a header that contains the trace context of
	// a span that produced the task.
	//
	// This header is read only by Dispatcher itself and exists mostly for FYI
	// purposes to help in debugging issues.
	TraceContextHeader = "X-Luci-Tq-Trace-Context"

	// ExpectedETAHeader is the name of a header that indicates when the task was
	// originally expected to run.
	//
	// One use of this header is for measuring latency of task completion.
	ExpectedETAHeader = "X-Luci-Tq-Expected-ETA"
)

Variables

View Source
var (
	// Fatal is an error tag used to indicate that the handler wants the task to
	// be dropped due to unrecoverable failure.
	//
	// See Handler doc for more details.
	Fatal = errors.BoolTag{Key: errors.NewTagKey("the task should be dropped due to fatal failure")}

	// Ignore is an error tag used to indicate that the handler wants the task to
	// be dropped as no longer needed.
	//
	// See Handler doc for more details.
	Ignore = errors.BoolTag{Key: errors.NewTagKey("the task should be dropped as no longer needed")}
)
View Source
var ModuleName = module.RegisterName("go.chromium.org/luci/server/tq")

ModuleName can be used to refer to this module when declaring dependencies.

Functions

func AddTask

func AddTask(ctx context.Context, task *Task) error

AddTask is a shortcut for Default.AddTask.

func MustAddTask

func MustAddTask(ctx context.Context, task *Task)

MustAddTask is like AddTask, but panics on errors.

Mostly useful for AddTask calls inside a Spanner transaction, where they essentially just call span.BufferWrite (i.e. make no RPCs) and thus can fail only if arguments are bad (in which case it is OK to panic).

func NewModule

func NewModule(opts *ModuleOptions) module.Module

NewModule returns a server module that sets up a TQ dispatcher.

func NewModuleFromFlags

func NewModuleFromFlags() module.Module

NewModuleFromFlags is a variant of NewModule that initializes options through command line flags.

Calling this function registers flags in flag.CommandLine. They are usually parsed in server.Main(...).

func Sweep

func Sweep(ctx context.Context) error

Sweep is a shortcut for Default.Sweep.

func TestingContext

func TestingContext(ctx context.Context, d *Dispatcher) (context.Context, *tqtesting.Scheduler)

TestingContext creates a scheduler that executes tasks through the given dispatcher (or Default one if nil) and puts it into the context as Submitter, so AddTask calls eventually submit tasks into this scheduler.

The end result is that tasks submitted using such context end up in the returned Scheduler (allowing them to be examined), and when the Scheduler delivers them, they result in calls to corresponding handlers registered in the Dispatcher.

func UseSubmitter

func UseSubmitter(ctx context.Context, s Submitter) context.Context

UseSubmitter puts an arbitrary submitter in the context.

It will be used by Dispatcher's AddTask to submit Cloud Tasks.

func ValidateNamespace

func ValidateNamespace(n string) error

ValidateNamespace returns an error if `n` is not a valid namespace name.

An empty string is a valid namespace (denoting the default namespace). Other valid namespaces must start with an ASCII letter or '_', contain only ASCII letters, digits or '_', and be less than 50 chars in length.

Types

type CloudSubmitter

type CloudSubmitter struct {
	// contains filtered or unexported fields
}

CloudSubmitter implements Submitter on top of Google Cloud APIs.

func NewCloudSubmitter

func NewCloudSubmitter(ctx context.Context, creds credentials.PerRPCCredentials) (*CloudSubmitter, error)

NewCloudSubmitter creates a new submitter.

func (*CloudSubmitter) Close

func (s *CloudSubmitter) Close()

Close closes the submitter.

func (*CloudSubmitter) Submit

func (s *CloudSubmitter) Submit(ctx context.Context, p *reminder.Payload) (err error)

Submit creates a task, returning a gRPC status.

type CustomPayload

type CustomPayload struct {
	Method      string            // e.g. "GET" or "POST", Cloud Tasks only
	RelativeURI string            // an URI relative to the task's target host, Cloud Tasks only
	Meta        map[string]string // HTTP headers or message attributes to attach
	Body        []byte            // serialized body of the request
}

CustomPayload is returned by TaskClass's Custom, see its doc.

type Dispatcher

type Dispatcher struct {
	// Sweeper knows how to sweep transactional tasks reminders.
	//
	// If not set, Sweep calls will fail.
	Sweeper Sweeper

	// Namespace is a namespace for tasks that use DeduplicationKey.
	//
	// This is needed if two otherwise independent deployments share a single
	// Cloud Tasks instance.
	//
	// Used only for Cloud Tasks tasks. Doesn't affect PubSub tasks.
	//
	// Must be valid per ValidateNamespace. Default is "".
	Namespace string

	// GAE is true when running on Appengine.
	//
	// It alters how tasks are submitted and how incoming HTTP requests are
	// authenticated.
	GAE bool

	// DisableAuth can be used to disable authentication on HTTP endpoints.
	//
	// This is useful when running in development mode on localhost or in tests.
	DisableAuth bool

	// CloudProject is ID of a project to use to construct full resource names.
	//
	// If not set, "default" will be used, which is pretty useless outside of
	// tests.
	CloudProject string

	// CloudRegion is a ID of a region to use to construct full resource names.
	//
	// If not set, "default" will be used, which is pretty useless outside of
	// tests.
	CloudRegion string

	// DefaultRoutingPrefix is a URL prefix for produced Cloud Tasks.
	//
	// Used only for Cloud Tasks tasks whose TaskClass doesn't provide some custom
	// RoutingPrefix. Doesn't affect PubSub tasks.
	//
	// Default is "/internal/tasks/t/". It means generated Cloud Tasks by will
	// have target URL "/internal/tasks/t/<generated-per-task-suffix>".
	//
	// A non-default value may be valuable if you host multiple dispatchers in
	// a single process. This is a niche use case.
	DefaultRoutingPrefix string

	// DefaultTargetHost is a hostname to dispatch Cloud Tasks to by default.
	//
	// Individual Cloud Tasks task classes may override it with their own specific
	// host. Doesn't affect PubSub tasks.
	//
	// On GAE defaults to the GAE application itself. Elsewhere defaults to
	// "127.0.0.1", which is pretty useless outside of tests.
	DefaultTargetHost string

	// PushAs is a service account email to be used for generating OIDC tokens.
	//
	// Used only for Cloud Tasks tasks. Doesn't affect PubSub tasks.
	//
	// The service account must be within the same project. The server account
	// must have "iam.serviceAccounts.actAs" permission for PushAs account.
	//
	// Optional on GAE when submitting tasks targeting GAE. Elsewhere defaults to
	// "default@example.com", which is pretty useless outside of tests.
	PushAs string

	// AuthorizedPushers is a list of service account emails to accept pushes from
	// in addition to PushAs.
	//
	// This is handy when migrating from one PushAs account to another, or when
	// submitting tasks from one service, but handing them in another.
	//
	// Optional.
	AuthorizedPushers []string

	// SweepInitiationLaunchers is a list of service account emails authorized to
	// launch sweeps via the exposed HTTP endpoint.
	SweepInitiationLaunchers []string
	// contains filtered or unexported fields
}

Dispatcher is a registry of task classes that knows how serialize and route them.

There's rarely a need to manually create instances of Dispatcher outside of Dispatcher's own tests. You should generally use the global Default dispatcher which is configured by the tq server module. Methods of the default dispatcher (such as RegisterTaskClass and AddTask) are also available as lop-level functions, prefer to use them.

The dispatcher needs a way to submit tasks to Cloud Tasks or Cloud PubSub. This is the job of Submitter. It lives in the context, so that it can be mocked in tests. In production contexts (setup when using the tq server module), the submitter is initialized to be CloudSubmitter. Tests will need to provide their own submitter (usually via TestingContext).

TODO(vadimsh): Support consuming PubSub tasks, not just producing them.

var Default Dispatcher

Default is a dispatcher installed into the server when using NewModule or NewModuleFromFlags.

The module takes care of configuring this dispatcher based on the server environment and module's options.

You still need to register your tasks in it using RegisterTaskClass.

func (*Dispatcher) AddTask

func (d *Dispatcher) AddTask(ctx context.Context, task *Task) (err error)

AddTask submits a task for later execution.

The task payload type should match some registered TaskClass. Its ID will be used to identify the task class in the serialized Cloud Tasks task body.

At some later time, in some other process, the dispatcher will invoke a handler attached to the corresponding TaskClass, based on its ID extracted from the task body.

If the given context is transactional, inherits the transaction if allowed according to the TaskClass's Kind. A transactional task will eventually be submitted to Cloud Tasks if and only if the transaction successfully commits. This requires a sweeper instance to be running somewhere, see ModuleOptions. Note that a failure to submit the task to Cloud Tasks will not abort the transaction.

If the task has a DeduplicationKey and there already was a recent task with the same TaskClass ID and DeduplicationKey, silently ignores the added task. This works only outside of transactions. Using DeduplicationKey with transactional tasks results in an error.

Annotates retriable errors with transient.Tag.

func (*Dispatcher) InstallSweepRoute

func (d *Dispatcher) InstallSweepRoute(r *router.Router, path string)

InstallSweepRoute installs a route that initiates a sweep.

It may be called periodically (e.g. by Cloud Scheduler) to launch sweeps.

func (*Dispatcher) InstallTasksRoutes

func (d *Dispatcher) InstallTasksRoutes(r *router.Router, prefix string)

InstallTasksRoutes installs tasks HTTP routes under the given prefix.

The exposed HTTP endpoints are called by Cloud Tasks service when it is time to execute a task.

func (*Dispatcher) RegisterTaskClass

func (d *Dispatcher) RegisterTaskClass(cls TaskClass) TaskClassRef

RegisterTaskClass tells the dispatcher how to route and handle tasks of some particular type.

Intended to be called during process startup. Panics if there's already a registered task class with the same ID or Prototype.

func (*Dispatcher) ReportMetrics

func (d *Dispatcher) ReportMetrics(ctx context.Context)

ReportMetrics writes gauge metrics to tsmon.

This should be called before tsmon flush. By reporting them only here, we can avoid hitting tsmon state every time some gauge value changes (which can happen very often).

func (*Dispatcher) Sweep

func (d *Dispatcher) Sweep(ctx context.Context) error

Sweep initiates a sweep of transactional tasks reminders.

It must be called periodically (e.g. once per minute) somewhere in the fleet.

func (*Dispatcher) TaskClassRef

func (d *Dispatcher) TaskClassRef(id string) TaskClassRef

TaskClassRef returns a task class reference given its ID or nil if no such task class is registered.

type DistributedSweeperOptions

type DistributedSweeperOptions struct {
	// SweepShards defines how many jobs to produce in each Sweep.
	//
	// Default is 16.
	SweepShards int

	// TasksPerScan caps maximum number of tasks that a sweep job will process.
	//
	// Defaults to 2048.
	TasksPerScan int

	// SecondaryScanShards caps the sharding of additional sweep scans to be
	// performed if the initial scan didn't cover the whole assigned partition.
	// In practice, this matters only when database is slow or there is a huge
	// backlog.
	//
	// Defaults to 16.
	SecondaryScanShards int

	// LessorID identifies an implementation of a system that manages leases on
	// subranges of the database.
	//
	// Default is the same ID as the database implementation ID.
	LessorID string

	// SweepTaskQueue is a Cloud Tasks queue name to use for sweep jobs.
	//
	// Can be in short or full form. See Queue in TaskClass for details. The queue
	// should be configured to allow at least 10 QPS.
	//
	// Default is "tq-sweep".
	TaskQueue string

	// SweepTaskPrefix is a URL prefix to use for sweep jobs.
	//
	// There should be a Dispatcher instance somewhere that is configured to
	// receive such tasks (via non-default ServingPrefix). This is useful if
	// you want to limit what processes process the sweeps.
	//
	// Default is "/internal/tasks".
	TaskPrefix string

	// TaskHost is a hostname to dispatch sweep jobs to.
	//
	// Default is "", meaning to use whatever is configured as default in
	// the Dispatcher.
	TaskHost string
}

DistributedSweeperOptions is configuration for the process of "sweeping" of transactional tasks reminders performed in a distributed manner using Cloud Tasks service itself to distribute work.

The sweeping process ensures all transactionally committed tasks will have a corresponding Cloud Tasks task created. It periodically scans the database for "reminder" records created whenever a task is created as part of a transaction. A reminder older than a certain age likely indicates that the corresponding AddTask call crashed right after the transaction before it had a chance to create Cloud Tasks task. For each such old reminder, the sweeping will idempotently create a Cloud Task and delete the record in the database.

DistributedSweeperOptions tune some of parameters of this process. Roughly:

  1. Sweep() call in Dispatcher creates SweepShards jobs that each scan a portion of the database for old reminders.
  2. Each such job is allowed to process no more than TasksPerScan reminder records. This caps its runtime and memory usage. TasksPerScan should be small enough so that all sweeping jobs finish before the next Sweep() call, but large enough so that it makes meaningful progress.
  3. If a sweeping job detects there's more than TasksPerScan items it needs to cover, it launches SecondaryScanShards follow-up jobs that cover the remaining items. This should be happening in rare circumstances, only if the database is slow or has a large backlog.

type ExecutionInfo

type ExecutionInfo struct {
	// ExecutionCount is 0 on a first delivery attempt and increased by 1 for each
	// failed attempt.
	ExecutionCount int

	// TaskID is the ID of the task in the underlying backend service.
	//
	// For Cloud Task, it is `X-CloudTasks-TaskName`.
	// For PubSub, it is `messageID`.
	TaskID string

	// Queue is the name of the Cloud Tasks queue that delivered the task.
	//
	// Empty for PubSub tasks.
	Queue string
	// contains filtered or unexported fields
}

ExecutionInfo is parsed from incoming task's metadata.

It is accessible from within task handlers via TaskExecutionInfo(ctx).

func TaskExecutionInfo

func TaskExecutionInfo(ctx context.Context) *ExecutionInfo

TaskExecutionInfo returns information about the currently executing task.

Returns nil if called not from a task handler.

type Handler

type Handler func(ctx context.Context, payload proto.Message) error

Handler is called to handle one enqueued task.

If Handler returns an error tagged with Ignore tag, the task will be dropped with HTTP 204 reply to Cloud Tasks. This is useful when task is no longer needed yet it's desirable to distinguish such a case from the normal case for monitoring purposes (e.g. in emitted logs or tsmon metrics).

If Handler returns an error tagged with Fatal tag, the task will be dropped with HTTP 202 reply to Cloud Tasks. This should be rarely used.

Otherwise, the task will be retried later (per the queue configuration) with HTTP 429 reply.

Errors tagged with transient.Tag result in HTTP 500 replies. They also trigger a retry.

type InProcSweeperOptions

type InProcSweeperOptions struct {
	// SweepShards defines how many concurrent sweeping jobs to run.
	//
	// Default is 16.
	SweepShards int

	// TasksPerScan caps maximum number of tasks that a sweep job will process.
	//
	// Defaults to 2048.
	TasksPerScan int

	// SecondaryScanShards caps the sharding of additional sweep scans to be
	// performed if the initial scan didn't cover the whole assigned partition.
	// In practice, this matters only when database is slow or there is a huge
	// backlog.
	//
	// Defaults to 16.
	SecondaryScanShards int

	// SubmitBatchSize limits a single of a single processed batch.
	//
	// When processing a batch, the sweeper loads bodies of all tasks in
	// the batch, thus this setting directly affects memory usage. There will
	// be at most SubmitBatchSize*SubmitConcurrentBatches task bodies worked-on at
	// any moment in time.
	//
	// Default is 512.
	SubmitBatchSize int

	// SubmitConcurrentBatches limits how many submit batches can be worked on
	// concurrently.
	//
	// Default is 8.
	SubmitConcurrentBatches int
}

InProcSweeperOptions is configuration for the process of "sweeping" of transactional tasks reminders performed centrally in the current process.

type ModuleOptions

type ModuleOptions struct {
	// Dispatcher is a dispatcher to use.
	//
	// Default is the global Default instance.
	Dispatcher *Dispatcher

	// CloudProject is ID of a project to use to construct full queue names.
	//
	// Default is the project the server is running in.
	CloudProject string

	// CloudRegion is a ID of a region to use to construct full queue names.
	//
	// Default is the region the server is running in.
	CloudRegion string

	// Namespace is a namespace for tasks that use DeduplicationKey.
	//
	// This is needed if two otherwise independent deployments share a single
	// Cloud Tasks instance.
	//
	// Default is "".
	Namespace string

	// DefaultTargetHost is a hostname to dispatch Cloud Tasks to by default.
	//
	// Individual task classes may override it with their own specific host.
	//
	// On GAE defaults to the GAE application itself. Elsewhere has no default:
	// if the dispatcher can't figure out where to send the task, the task
	// submission fails.
	DefaultTargetHost string

	// PushAs is a service account email to be used for generating OIDC tokens.
	//
	// The service account must be within the same project. The server account
	// must have "iam.serviceAccounts.actAs" permission for `PushAs` account.
	//
	// Default is the server's own account.
	PushAs string

	// AuthorizedPushers is a list of service account emails to accept pushes from
	// in addition to PushAs.
	//
	// This is handy when migrating from one PushAs account to another, or when
	// submitting tasks from one service, but handing them in another.
	//
	// Optional.
	AuthorizedPushers []string

	// ServingPrefix is a URL path prefix to serve registered task handlers from.
	//
	// POSTs to a URL under this prefix (regardless which one) will be treated
	// as Cloud Tasks pushes.
	//
	// Must start with "/internal/". Default is "/internal/tasks". If set to
	// literal "-", no routes will be registered at all.
	ServingPrefix string

	// SweepMode defines how to perform sweeps of the transaction tasks reminders.
	//
	// This process is necessary to make sure all transactionally submitted tasks
	// eventually execute, even if Cloud Tasks RPCs fail. When enqueueing a task
	// the client transactionally commits a special "reminder" record, which
	// indicates an intent to submit a Cloud Task. If the subsequent Cloud Tasks
	// RPC fails (or the process crashes before attempting it), the reminder
	// record is discovered by the sweep process and used to ensure the task is
	// eventually submitted.
	//
	// There are two stages: the sweep initiation and the actual processing.
	//
	// The initiation should happen periodically and centrally: no mater how many
	// replicas of the process are running, there needs to be only one sweep
	// initiator. But it doesn't have to be the same process each time. Also
	// multiple concurrent initiations are not catastrophic, though they impose
	// huge overhead and should be avoided.
	//
	// Two ways to do sweep initiations are:
	//   * Based on a periodic external signal such as a Cloud Scheduler job or
	//     GAE cron handler. See SweepInitiationEndpoint and
	//     SweepInitiationLaunchers.
	//   * Based on a timer inside some *single* primary process. For example
	//     on Kubernetes this may be a single pod Deployment, or a zero-indexed
	//     replica in a StatefulSet. See Sweep().
	//
	// Once the initiation happens, there are two ways to process the sweep (and
	// this is what SweepMode defines):
	//   * "inproc" - do all the processing right inside the replica that
	//     performed the initiation. This has scalability and reliability limits,
	//     but it doesn't require any additional infrastructure setup and has
	//     somewhat better observability.
	//   * "distributed" - use Cloud Tasks itself to distribute the work across
	//     many replicas. This requires some configuration. See SweepTaskQueue,
	//     SweepTaskPrefix and SweepTargetHost.
	//
	// Default is "distributed" mode.
	SweepMode string

	// SweepInitiationEndpoint is a URL path that can be hit to initiate a sweep.
	//
	// GET requests to this endpoint (if they have proper authentication headers)
	// will initiate sweeps. If SweepMode is "inproc" the sweep will happen in
	// the same process that handled the request.
	//
	// On GAE default is "/internal/tasks/c/sweep". On non-GAE it is "-", meaning
	// the endpoint is not exposed. When not using the endpoint there should be
	// some single process somewhere that calls Sweep() to periodically initiate
	// sweeps.
	SweepInitiationEndpoint string

	// SweepInitiationLaunchers is a list of service account emails authorized to
	// launch sweeps via SweepInitiationEndpoint.
	//
	// Additionally on GAE the Appengine service itself is always authorized to
	// launch sweeps via cron or task queues.
	//
	// Default is the server's own account.
	SweepInitiationLaunchers []string

	// SweepTaskQueue is a Cloud Tasks queue name to use to distribute sweep
	// subtasks when running in "distributed" SweepMode.
	//
	// Can be in short or full form. See Queue in TaskClass for details. The queue
	// should be configured to allow at least 10 QPS.
	//
	// Default is "tq-sweep".
	SweepTaskQueue string

	// SweepTaskPrefix is a URL prefix to use for sweep subtasks when running
	// in "distributed" SweepMode.
	//
	// There should be a Dispatcher instance somewhere that is configured to
	// receive such tasks (via non-default ServingPrefix). This is useful if
	// you want to limit what processes process the sweeps.
	//
	// Must start with "/internal/". If unset defaults to the value of
	// ServingPrefix.
	SweepTaskPrefix string

	// SweepTargetHost is a hostname to dispatch sweep subtasks to when running
	// in "distributed" SweepMode.
	//
	// This usually should be DefaultTargetHost, but it may be different if you
	// want to route sweep subtasks somewhere else.
	//
	// If unset defaults to the value of DefaultTargetHost.
	SweepTargetHost string

	// SweepShards defines how many subtasks are submitted when initiating
	// a sweep.
	//
	// It is safe to change it any time. Default is 16.
	SweepShards int
}

ModuleOptions contain configuration of the TQ server module.

It will be used to initialize Default dispatcher.

func (*ModuleOptions) Register

func (o *ModuleOptions) Register(f *flag.FlagSet)

Register registers the command line flags.

Mutates `o` by populating defaults.

type Submitter

type Submitter interface {
	// Submit submits a task, returning a gRPC status.
	//
	// AlreadyExists status indicates the task with request name already exists.
	// Other statuses are handled using their usual semantics.
	//
	// Will be called from multiple goroutines at once.
	Submit(ctx context.Context, p *reminder.Payload) error
}

Submitter is used by Dispatcher to submit tasks.

It lives in the context, so that it can be mocked in tests. In production contexts (setup when using the tq server module), the submitter is initialized to be CloudSubmitter. Tests will need to provide their own submitter (usually via TestingContext).

Note that currently Submitter can only be implemented by structs in server/tq package, since its signature depends on an internal reminder.Payload type.

type Sweeper

type Sweeper interface {
	// contains filtered or unexported methods
}

Sweeper knows how sweep transaction tasks reminders.

func NewDistributedSweeper

func NewDistributedSweeper(disp *Dispatcher, opts DistributedSweeperOptions) Sweeper

NewDistributedSweeper creates a sweeper that distributes and executes sweeping tasks through the given dispatcher.

func NewInProcSweeper

func NewInProcSweeper(opts InProcSweeperOptions) Sweeper

NewInProcSweeper creates a sweeper that performs sweeping in the current process whenever Sweep is called.

type Task

type Task struct {
	// Payload is task's payload as well as indicator of its class.
	//
	// Its type will be used to find a matching registered TaskClass which defines
	// how to route and handle the task.
	Payload proto.Message

	// DeduplicationKey is optional unique key used to derive name of the task.
	//
	// If a task of a given class with a given key has already been enqueued
	// recently (within ~1h), this task will be silently ignored.
	//
	// Because there is an extra lookup cost to identify duplicate task names,
	// enqueues of named tasks have significantly increased latency.
	//
	// Can be used only with Cloud Tasks tasks, since PubSub doesn't support
	// deduplication during enqueuing.
	//
	// Named tasks can only be used outside of transactions.
	DeduplicationKey string

	// Title is optional string that identifies the task in server logs.
	//
	// For Cloud Tasks it will also show up as a suffix in task handler URL. It
	// exists exclusively to simplify reading server logs. It serves no other
	// purpose! In particular, it is *not* a task name.
	//
	// Handlers won't ever see it. Pass all information through the payload.
	Title string

	// Delay specifies the duration the Cloud Tasks service must wait before
	// attempting to execute the task.
	//
	// Can be used only with Cloud Tasks tasks. Either Delay or ETA may be set,
	// but not both.
	Delay time.Duration

	// ETA specifies the earliest time a task may be executed.
	//
	// Can be used only with Cloud Tasks tasks. Either Delay or ETA may be set,
	// but not both.
	ETA time.Time
}

Task contains task body and metadata.

type TaskClass

type TaskClass struct {
	// ID is unique identifier of this class of tasks.
	//
	// Must match `[a-zA-Z0-9_\-.]{1,100}`.
	//
	// It is used to decide how to deserialize and route the task. Changing IDs of
	// existing task classes is a disruptive operation, make sure the queue is
	// drained first. The dispatcher will reject Cloud Tasks with unrecognized
	// class IDs with HTTP 404 error (causing Cloud Tasks to retry them later).
	//
	// Required.
	ID string

	// Prototype identifies a proto message type of a task payload.
	//
	// Used for its type information only. In particular it is used by AddTask
	// to discover what TaskClass matches the added task. There should be
	// one-to-one correspondence between proto message types and task classes.
	//
	// It is safe to arbitrarily change this type as long as JSONPB encoding of
	// the previous type can be decoded using the new type. The dispatcher will
	// reject Cloud Tasks with bodies it can't deserialize with HTTP 400 error
	// (causing Cloud Tasks to retry them later).
	//
	// Required.
	Prototype proto.Message

	// Kind indicates whether the task requires a transaction to be enqueued.
	//
	// Note that using transactional tasks requires setting up a sweeper first
	// and importing a module that implements transactions support for the
	// database you are using. See "Transactional tasks" section above.
	//
	// Required. Pick one of NonTransactional, Transactional or FollowsContext.
	Kind TaskKind

	// Queue is a name of Cloud Tasks queue to use for the tasks.
	//
	// If set, indicates the task should be submitted through Cloud Tasks API.
	// The queue must exist already. It can either be a short name like "default"
	// or a full name like "projects/<project>/locations/<region>/queues/<name>".
	// If it is a full name, it must have the above format or RegisterTaskClass
	// would panic. If it is a short queue name, the full queue name will be
	// constructed using dispatcher's CloudProject and CloudRegion if they are
	// set.
	//
	// Can't be set together with QueuePicker or Topic.
	Queue string

	// QueuePicker is a callback that picks a queue for each individual task.
	//
	// It is an alternative to specifying a single queue via Queue. It can be used
	// to distribute tasks across multiple queues, for example to spread the load
	// or to use different queues for tasks with different priorities.
	//
	// Receives Task with Payload proto.Message having the same underlying type as
	// Prototype. It can be type cast to a concrete type and examined, if
	// necessary.
	//
	// Must be lightweight, will be called from within AddTask implementation for
	// each added task, receiving the context passed to AddTask. The returned
	// queue name should be in the same format as Queue, i.e. either be a short
	// queue name or a full queue name. See Queue for details.
	//
	// Can't be set together with Queue or Topic.
	QueuePicker func(context.Context, *Task) (string, error)

	// Topic is a name of PubSub topic to use for the tasks.
	//
	// If set, indicates the task should be submitted through Cloud PubSub API.
	// The topic must exist already. It can either be a short name like "tasks" or
	// a full name like "projects/<project>/topics/<name>". If it is a full name,
	// it must have the above format or RegisterTaskClass would panic.
	//
	// Can't be set together with Queue or QueuePicker.
	Topic string

	// RoutingPrefix is a URL prefix for produced Cloud Tasks.
	//
	// Can only be used for Cloud Tasks task (i.e. only if Queue is also set).
	//
	// Default is dispatcher's DefaultRoutingPrefix which itself defaults to
	// "/internal/tasks/t/". It means generated Cloud Tasks by default will have
	// target URL "/internal/tasks/t/<generated-per-task-suffix>".
	//
	// A non-default value can be used to route Cloud Tasks tasks of a particular
	// class to particular processes, assuming the load balancer is configured
	// accordingly.
	RoutingPrefix string

	// TargetHost is a hostname to dispatch Cloud Tasks to.
	//
	// Can only be used for Cloud Tasks task (i.e. only if Queue is also set).
	//
	// If unset, will use dispatcher's DefaultTargetHost.
	TargetHost string

	// Quiet, if set, instructs the dispatcher not to log bodies of tasks.
	Quiet bool

	// QuietOnError, if set, instructs the dispatcher not to log errors returned
	// by the task handler.
	//
	// This is useful if task handler wants to do its own custom error logging.
	QuietOnError bool

	// Custom, if given, will be called to generate a custom payload from the
	// task's proto payload.
	//
	// Useful for interoperability with existing code that doesn't use dispatcher
	// or if the tasks are meant to be consumed in some custom way. You'll need to
	// setup the consumer manually, the Dispatcher doesn't know how to handle
	// tasks with custom payload.
	//
	// For Cloud Tasks tasks it is possible to customize HTTP method, relative
	// URI, headers and the request body this way. Other properties of the task
	// (such as the target host, the queue, the task name, authentication headers)
	// are not customizable.
	//
	// For PubSub tasks it is possible to customize only task's body and
	// attributes (via CustomPayload.Meta). Other fields in CustomPayload are
	// ignored.
	//
	// Receives the exact same context as passed to AddTask. If returns nil
	// result, the task will be submitted as usual.
	Custom func(ctx context.Context, m proto.Message) (*CustomPayload, error)

	// Handler will be called by the dispatcher to execute the tasks.
	//
	// The handler will receive the task's payload as a proto message of the exact
	// same type as the type of Prototype. See Handler doc for more info.
	//
	// Populating this field is equivalent to calling AttachHandler after
	// registering the class. It may be left nil if the current process just wants
	// to submit tasks, but not handle them. Some other process would need to
	// attach the handler then to be able to process tasks.
	//
	// The dispatcher will permanently fail tasks if it can't find a handler for
	// them.
	Handler Handler
}

TaskClass defines how to treat tasks of a specific proto message type.

It assigns some stable ID to a proto message kind and also defines how tasks of this kind should be submitted and routed.

The are two backends for tasks: Cloud Tasks and Cloud PubSub. Which one to use for a particular task class is defined via mutually exclusive Queue and Topic fields.

Refer to Google Cloud documentation for all semantic differences between Cloud Tasks and Cloud PubSub. One important difference is that Cloud PubSub tasks can't be deduplicated and thus the handler must expect to receive duplicates.

type TaskClassRef

type TaskClassRef interface {
	// AttachHandler sets a handler which will be called by the dispatcher to
	// execute the tasks.
	//
	// The handler will receive the task's payload as a proto message of the exact
	// same type as the type of TaskClass's Prototype. See Handler doc for more
	// info.
	//
	// Panics if the class has already a handler attached.
	AttachHandler(h Handler)

	// Definition returns the original task class definition.
	Definition() TaskClass
}

TaskClassRef represents a TaskClass registered in a Dispatcher.

func RegisterTaskClass

func RegisterTaskClass(t TaskClass) TaskClassRef

RegisterTaskClass is a shortcut for Default.RegisterTaskClass.

type TaskKind

type TaskKind int

TaskKind describes how a task class interoperates with transactions.

const (
	// NonTransactional is a task kind for tasks that must be enqueued outside
	// of a transaction.
	NonTransactional TaskKind = 1

	// Transactional is a task kind for tasks that must be enqueued only from
	// a transaction.
	Transactional TaskKind = 2

	// FollowsContext is a task kind for tasks that are enqueue transactionally
	// if the context is transactional or non-transactionally otherwise.
	FollowsContext TaskKind = 3
)

Directories

Path Synopsis
db
Package db defines common database interface.
Package db defines common database interface.
lessor
Package lessor defines common lessor interface.
Package lessor defines common lessor interface.
loopbacktest
Package loopbacktest is an integration test for TQ loopback dispatcher.
Package loopbacktest is an integration test for TQ loopback dispatcher.
metrics
Package metrics contains definition of metrics exposed by server/tq.
Package metrics contains definition of metrics exposed by server/tq.
partition
Package partition encapsulates partitioning and querying large keyspace which can't be expressed even as uint64.
Package partition encapsulates partitioning and querying large keyspace which can't be expressed even as uint64.
reminder
Package reminder holds Reminder to avoid circular dependencies.
Package reminder holds Reminder to avoid circular dependencies.
testutil
Package testutil provides fakes for testing TQ guts.
Package testutil provides fakes for testing TQ guts.
workset
Package workset contains a synchronized work queue implementation used by inproc sweeper.
Package workset contains a synchronized work queue implementation used by inproc sweeper.
Package tqtesting contains helpers for running server/tq in tests and on localhost.
Package tqtesting contains helpers for running server/tq in tests and on localhost.
txn
datastore
Datastore contains Transactional Enqueue support for Cloud Datastore.
Datastore contains Transactional Enqueue support for Cloud Datastore.
spanner
Spanner contains Transactional Enqueue support for Cloud Spanner.
Spanner contains Transactional Enqueue support for Cloud Spanner.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL