dlq

package
v1.25.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 10, 2024 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package dlq contains the workflow for deleting and re-enqueueing DLQ tasks. Both of these operations are performed by the same workflow to avoid concurrent deletion and re-enqueueing of the same task.

Index

Constants

View Source
const (
	// WorkflowName is the name of the DLQ workflow.
	WorkflowName = "temporal-sys-dlq-workflow"
	// WorkflowTypeDelete is what the value of WorkflowParams.WorkflowType should be to delete DLQ tasks. When this is
	// specified, the workflow will simply delete all tasks up to the specified max message ID.
	WorkflowTypeDelete = "delete"
	// WorkflowTypeMerge is for re-enqueuing DLQ tasks. When this is specified, the workflow will operate in batches.
	// For each batch, it will read up to MergeParams.BatchSize tasks from the DLQ, re-enqueue them, and then delete
	// them from the DLQ. It will repeat this process until it reaches the specified max message ID.
	WorkflowTypeMerge = "merge"
	// MaxMergeBatchSize is the maximum value for MergeParams.BatchSize.
	MaxMergeBatchSize = 1000
	// DefaultMergeBatchSize is the default value for MergeParams.BatchSize.
	DefaultMergeBatchSize = 100
	// QueryTypeProgress is the query to get the progress of the DLQ workflow.
	QueryTypeProgress = "dlq-job-progress-query"
)

Variables

View Source
var (
	// Module provides a [workercommon.WorkerComponent] annotated with [workercommon.WorkerComponentTag] to the graph,
	// given a [HistoryClient], a [TaskClientDialer], and a value for [CurrentClusterName].
	Module = workercommon.AnnotateWorkerComponentProvider(newComponent)

	ErrNegativeBatchSize      = errors.New("BatchSize must be positive or 0 to use the default")
	ErrMergeBatchSizeTooLarge = errors.New("BatchSize too large")
)

Functions

This section is empty.

Types

type AddTasksFn

type AddTasksFn func(
	ctx context.Context,
	req *adminservice.AddTasksRequest,
) (*adminservice.AddTasksResponse, error)

AddTasksFn provides a convenient method for implementing the TaskClient interface.

func (AddTasksFn) AddTasks

func (f AddTasksFn) AddTasks(
	ctx context.Context,
	in *adminservice.AddTasksRequest,
) (*adminservice.AddTasksResponse, error)

AddTasks implements TaskClient by calling the AddTasksFn with the request.

type CurrentClusterName

type CurrentClusterName string

CurrentClusterName is its own type just to make fx injection easier. It's similar to the same type that exists in the persistence package, but I thought that re-using that would look weird here because it has nothing to do with persistence.

type DeleteParams

type DeleteParams struct {
	Key
	// MaxMessageID is inclusive.
	MaxMessageID int64
}

DeleteParams contain the target DLQ and the max message ID to delete up to.

type HistoryClient

type HistoryClient interface {
	DeleteDLQTasks(
		ctx context.Context,
		in *historyservice.DeleteDLQTasksRequest,
		opts ...grpc.CallOption,
	) (*historyservice.DeleteDLQTasksResponse, error)
	GetDLQTasks(
		ctx context.Context,
		in *historyservice.GetDLQTasksRequest,
		opts ...grpc.CallOption,
	) (*historyservice.GetDLQTasksResponse, error)
}

HistoryClient contains the subset of methods from historyservice.HistoryServiceClient that we need, to make it easier to implement in tests.

type Key

type Key struct {
	// TaskCategoryID is the id used by [go.temporal.io/server/service/history/tasks.TaskCategoryRegistry].
	TaskCategoryID int
	// SourceCluster is the cluster that the replication tasks are coming from if the task category is replication.
	// Otherwise, it is equal to TargetCluster, which is the cluster that both the DLQ workflow is running in, and
	// the cluster that contains the DLQ itself.
	SourceCluster string
	// TargetCluster is always the cluster that the DLQ workflow is running in currently. However, that may change
	// if we add cross-cluster tasks in the future.
	TargetCluster string
}

Key uniquely identifies a DLQ.

type MergeParams

type MergeParams struct {
	Key
	// MaxMessageID is inclusive.
	MaxMessageID int64
	// BatchSize controls the number of tasks to both read and re-enqueue at a time.
	// The maximum is MaxMergeBatchSize. The default is DefaultMergeBatchSize.
	BatchSize int
}

MergeParams contain the target DLQ and the max message ID to merge up to.

type ProgressQueryResponse

type ProgressQueryResponse struct {
	MaxMessageIDToProcess     int64
	LastProcessedMessageID    int64
	NumberOfMessagesProcessed int64
	WorkflowType              string
	DlqKey                    Key
}

ProgressQueryResponse is the response to progress query.

type TaskClient

type TaskClient interface {
	AddTasks(
		ctx context.Context,
		in *adminservice.AddTasksRequest,
	) (*adminservice.AddTasksResponse, error)
}

TaskClient contains the subset of methods from adminservice.AdminServiceClient that we need, to make it easier to implement in tests.

type TaskClientDialer

type TaskClientDialer interface {
	// Dial returns a [TaskClient] given a cluster name. You don't need to close this client. Note that some
	// implementations will cache clients.
	Dial(ctx context.Context, cluster string) (TaskClient, error)
}

TaskClientDialer is a function that returns a TaskClient given a cluster name.

type TaskClientDialerFn

type TaskClientDialerFn func(ctx context.Context, address string) (TaskClient, error)

TaskClientDialerFn is a function that returns a TaskClient given an address.

func (TaskClientDialerFn) Dial

func (f TaskClientDialerFn) Dial(ctx context.Context, cluster string) (TaskClient, error)

Dial implements TaskClientDialer by calling the TaskClientDialerFn with the cluster name.

type WorkflowParams

type WorkflowParams struct {
	// WorkflowType options are available via the WorkflowType* constants.
	WorkflowType string
	// DeleteParams is only used for WorkflowTypeDelete.
	DeleteParams DeleteParams
	// MergeParams is only used for WorkflowTypeMerge.
	MergeParams MergeParams
}

WorkflowParams is the single argument to the DLQ workflow.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL