sweep

package
v0.0.0-...-053b097 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 2, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Scan

Scan scans the given partition of the Reminders' keyspace.

Returns a list of stale reminders which likely match crashed AddTask calls. The caller is expected to eventually execute corresponding Cloud Tasks calls and delete these reminders, lest they'll be rediscovered during the next scan.

If unable to complete the scan of the given part of the keyspace and Level is less than 2, it intelligently partitions the not-yet-scanned keyspace into several partitions for the follow up and returns them as well.

Logs errors inside, but doesn't return them.

Types

type BatchProcessor

type BatchProcessor struct {
	Context   context.Context    // the context to use for processing
	DB        db.DB              // DB to use to fetch reminders from
	Submitter internal.Submitter // knows how to submit tasks

	BatchSize         int // max size of a single reminder batch
	ConcurrentBatches int // how many concurrent batches to process
	// contains filtered or unexported fields
}

BatchProcessor handles reminders in batches.

func (*BatchProcessor) Enqueue

func (p *BatchProcessor) Enqueue(ctx context.Context, r []*reminder.Reminder)

Enqueue adds reminder to the to-be-processed queue.

Must be called only between Start and Stop. Drops reminders on the floor if the context is canceled.

func (*BatchProcessor) Start

func (p *BatchProcessor) Start() error

Start launches background processor goroutines.

func (*BatchProcessor) Stop

func (p *BatchProcessor) Stop() int

Stop waits until all enqueues reminders are processed and then stops the processor.

Returns the total number of successfully processed reminders.

type Distributed

type Distributed struct {
	// EnqueueSweepTask submits the task for execution somewhere in the fleet.
	EnqueueSweepTask func(ctx context.Context, task *tqpb.SweepTask) error
	// Submitter is used to submit Cloud Tasks requests.
	Submitter internal.Submitter
}

Distributed implements distributed sweeping.

Requires its EnqueueSweepTask callback to be configured in a way that enqueued tasks eventually result in ExecSweepTask call (perhaps in a different process).

func (*Distributed) ExecSweepTask

func (d *Distributed) ExecSweepTask(ctx context.Context, task *tqpb.SweepTask) error

ExecSweepTask executes a previously enqueued sweep task.

Note: we never want to retry failed ExecSweepTask. These tasks fork. If we retry on transient errors that are not really transient we may accidentally blow up with exponential number of tasks. Better just to wait for the next fresh sweep. For that reason the implementation is careful not to return errors marked with transient.Tag.

type ScanParams

type ScanParams struct {
	DB            db.DB                // DB to use to fetch reminders
	Partition     *partition.Partition // the keyspace partition to scan
	KeySpaceBytes int                  // length of the reminder keys (usually 16)

	TasksPerScan        int // caps maximum number of reminders to process
	SecondaryScanShards int // caps the number of follow-up scans

	Level int // recursion level (0 == the root task)
}

ScanParams contains parameters for the Scan call.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL