workerdeployment

package
v1.27.0-128.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 10, 2025 License: MIT Imports: 44 Imported by: 0

Documentation

Index

Constants

View Source
// Workflow type names of the system workflows defined by this package.
const (
	WorkerDeploymentVersionWorkflowType  = "temporal-sys-worker-deployment-version-workflow"
	WorkerDeploymentWorkflowType         = "temporal-sys-worker-deployment-workflow"
	WorkerDeploymentDrainageWorkflowType = "temporal-sys-worker-deployment-version-drainage-workflow"
)

// WorkerDeploymentNamespaceDivision is the namespace division value for
// worker deployments.
const WorkerDeploymentNamespaceDivision = "TemporalWorkerDeployment"

// Update names.
const (
	// Updates for Worker Deployment Version workflows.
	RegisterWorkerInDeployment = "register-task-queue-worker"
	SyncVersionState           = "sync-version-state"
	UpdateVersionMetadata      = "update-version-metadata"

	// Updates for Worker Deployment workflows.
	SetCurrentVersion            = "set-current-version"
	SetRampingVersion            = "set-ramping-version"
	AddVersionToWorkerDeployment = "add-version-to-worker-deployment"
	DeleteVersion                = "delete-version"
	DeleteDeployment             = "delete-deployment"
)

// Signal names.
const (
	// ForceCANSignalName is for both Worker Deployment Version and Worker
	// Deployment workflows.
	ForceCANSignalName = "force-continue-as-new"

	SyncDrainageSignalName   = "sync-drainage-status"
	TerminateDrainageSignal  = "terminate-drainage"
	SyncVersionSummarySignal = "sync-version-summary"
)

// Query names.
const (
	QueryDescribeVersion    = "describe-version"    // Worker Deployment Version workflow
	QueryDescribeDeployment = "describe-deployment" // Worker Deployment workflow
)

// WorkerDeploymentMemoField is the memo field name for the Worker Deployment
// workflow.
const WorkerDeploymentMemoField = "WorkerDeploymentMemo"

// Workflow ID prefix and delimiter for Worker Deployment Version workflows.
// The "Delimeter" spelling in the identifier is part of the exported API and
// is therefore kept as is.
const (
	WorkerDeploymentVersionWorkflowIDPrefix    = "temporal-sys-worker-deployment-version"
	WorkerDeploymentVersionWorkflowIDDelimeter = ":"

	// WorkerDeploymentVersionWorkflowIDInitialSize is the combined length of
	// the prefix and the delimiter.
	WorkerDeploymentVersionWorkflowIDInitialSize = len(WorkerDeploymentVersionWorkflowIDPrefix) + len(WorkerDeploymentVersionWorkflowIDDelimeter)
)

// Field-name keys.
const (
	WorkerDeploymentNameFieldName    = "WorkerDeploymentName"
	WorkerDeploymentBuildIDFieldName = "BuildID"
)

Variables

Functions

func DrainageWorkflow

func DrainageWorkflow(
	ctx workflow.Context,
	unsafeRefreshIntervalGetter func() any,
	unsafeVisibilityGracePeriodGetter func() any,
	args *deploymentspb.DrainageWorkflowArgs,
) error

func NewResult

func NewResult(
	dc *dynamicconfig.Collection,
	params activityDeps,
) fxResult

func VersionWorkflow

func VersionWorkflow(ctx workflow.Context, versionWorkflowArgs *deploymentspb.WorkerDeploymentVersionWorkflowArgs) error

func Workflow

func Workflow(ctx workflow.Context, unsafeMaxVersion func() int, args *deploymentspb.WorkerDeploymentWorkflowArgs) error

Types

type Activities

type Activities struct {
	// contains filtered or unexported fields
}

func (*Activities) CheckUnversionedRampUserDataPropagation

func (a *Activities) CheckUnversionedRampUserDataPropagation(ctx context.Context, input *deploymentspb.CheckWorkerDeploymentUserDataPropagationRequest) error

func (*Activities) DeleteWorkerDeploymentVersion

func (a *Activities) DeleteWorkerDeploymentVersion(ctx context.Context, args *deploymentspb.DeleteVersionActivityArgs) error

type Client

// Client is the interface for operating on worker deployments and worker
// deployment versions. Every method takes a context and the namespace entry
// the operation applies to.
type Client interface {
	// RegisterTaskQueueWorker takes a deployment name and build ID together
	// with the name and type of a task queue.
	// NOTE(review): presumably this registers the task queue's worker with the
	// matching Worker Deployment Version workflow — confirm against the
	// implementation.
	RegisterTaskQueueWorker(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		deploymentName, buildId string,
		taskQueueName string,
		taskQueueType enumspb.TaskQueueType,
		identity string,
		requestID string,
	) error

	// DescribeVersion returns the WorkerDeploymentVersionInfo for the given
	// version.
	DescribeVersion(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		version string,
	) (*deploymentpb.WorkerDeploymentVersionInfo, error)

	// DescribeWorkerDeployment returns the WorkerDeploymentInfo for the named
	// deployment together with a []byte value.
	// NOTE(review): the []byte result is presumably a conflict token to pass
	// to SetCurrentVersion / SetRampingVersion — confirm.
	DescribeWorkerDeployment(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		deploymentName string,
	) (*deploymentpb.WorkerDeploymentInfo, []byte, error)

	// SetCurrentVersion takes the deployment name and the version to set, plus
	// an ignoreMissingTaskQueues flag and a conflictToken.
	// NOTE(review): exact semantics of ignoreMissingTaskQueues and
	// conflictToken are not visible here — confirm against the implementation.
	SetCurrentVersion(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		deploymentName string,
		version string,
		identity string,
		ignoreMissingTaskQueues bool,
		conflictToken []byte,
	) (*deploymentspb.SetCurrentVersionResponse, error)

	// ListWorkerDeployments returns worker deployment summaries; pageSize and
	// nextPageToken are the paging inputs.
	// NOTE(review): the []byte result is presumably the token for the next
	// page — confirm.
	ListWorkerDeployments(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		pageSize int,
		nextPageToken []byte,
	) ([]*deploymentspb.WorkerDeploymentSummary, []byte, error)

	// DeleteWorkerDeploymentVersion takes the version to delete and a
	// skipDrainage flag.
	// NOTE(review): what skipDrainage bypasses is not visible here — confirm.
	DeleteWorkerDeploymentVersion(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		version string,
		skipDrainage bool,
		identity string,
	) error

	// DeleteWorkerDeployment takes the name of the deployment to delete.
	DeleteWorkerDeployment(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		deploymentName string,
		identity string,
	) error

	// SetRampingVersion takes the deployment name, the version to ramp and a
	// ramp percentage, plus an ignoreMissingTaskQueues flag and a
	// conflictToken.
	// NOTE(review): the valid range of percentage is not visible here —
	// confirm.
	SetRampingVersion(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		deploymentName string,
		version string,
		percentage float32,
		identity string,
		ignoreMissingTaskQueues bool,
		conflictToken []byte,
	) (*deploymentspb.SetRampingVersionResponse, error)

	// UpdateVersionMetadata takes metadata entries to upsert and entry keys to
	// remove for the given version, and returns a VersionMetadata.
	UpdateVersionMetadata(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		version string,
		upsertEntries map[string]*commonpb.Payload,
		removeEntries []string,
		identity string,
	) (*deploymentpb.VersionMetadata, error)

	// Used internally by the Worker Deployment workflow in its StartWorkerDeployment Activity
	StartWorkerDeployment(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		deploymentName string,
		identity string,
		requestID string,
	) error

	// Used internally by the Worker Deployment workflow in its SyncWorkerDeploymentVersion Activity
	SyncVersionWorkflowFromWorkerDeployment(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		deploymentName, version string,
		args *deploymentspb.SyncVersionStateUpdateArgs,
		identity string,
		requestID string,
	) (*deploymentspb.SyncVersionStateResponse, error)

	// Used internally by the Worker Deployment workflow in its DeleteVersion Activity
	DeleteVersionFromWorkerDeployment(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		deploymentName, version string,
		identity string,
		requestID string,
		skipDrainage bool,
	) error

	// Used internally by the Worker Deployment Version workflow in its AddVersionToWorkerDeployment Activity
	AddVersionToWorkerDeployment(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		deploymentName string,
		args *deploymentspb.AddVersionUpdateArgs,
		identity string,
		requestID string,
	) (*deploymentspb.AddVersionToWorkerDeploymentResponse, error)

	// Used internally by the Drainage workflow (child of Worker Deployment Version workflow)
	// in its GetVersionDrainageStatus Activity
	GetVersionDrainageStatus(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		version string) (enumspb.VersionDrainageStatus, error)

	// Used internally by the Worker Deployment workflow in its IsVersionMissingTaskQueues Activity
	// to verify if there are missing task queues in the new current/ramping version.
	IsVersionMissingTaskQueues(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		prevCurrentVersion, newVersion string,
	) (bool, error)
}

func ClientProvider

func ClientProvider(
	logger log.Logger,
	historyClient resource.HistoryClient,
	matchingClient resource.MatchingClient,
	visibilityManager manager.VisibilityManager,
	dc *dynamicconfig.Collection,
) Client

type ClientImpl

type ClientImpl struct {
	// contains filtered or unexported fields
}

ClientImpl implements Client

func (*ClientImpl) AddVersionToWorkerDeployment

func (d *ClientImpl) AddVersionToWorkerDeployment(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	deploymentName string,
	args *deploymentspb.AddVersionUpdateArgs,
	identity string,
	requestID string,
) (*deploymentspb.AddVersionToWorkerDeploymentResponse, error)

func (*ClientImpl) DeleteVersionFromWorkerDeployment

func (d *ClientImpl) DeleteVersionFromWorkerDeployment(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	deploymentName, version string,
	identity string,
	requestID string,
	skipDrainage bool,
) (retErr error)

func (*ClientImpl) DeleteWorkerDeployment

func (d *ClientImpl) DeleteWorkerDeployment(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	deploymentName string,
	identity string,
) (retErr error)

func (*ClientImpl) DeleteWorkerDeploymentVersion

func (d *ClientImpl) DeleteWorkerDeploymentVersion(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	version string,
	skipDrainage bool,
	identity string,
) (retErr error)

func (*ClientImpl) DescribeVersion

func (d *ClientImpl) DescribeVersion(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	version string,
) (_ *deploymentpb.WorkerDeploymentVersionInfo, retErr error)

func (*ClientImpl) DescribeWorkerDeployment

func (d *ClientImpl) DescribeWorkerDeployment(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	deploymentName string,
) (_ *deploymentpb.WorkerDeploymentInfo, conflictToken []byte, retErr error)

func (*ClientImpl) GetVersionDrainageStatus

func (d *ClientImpl) GetVersionDrainageStatus(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	version string) (enumspb.VersionDrainageStatus, error)

func (*ClientImpl) IsVersionMissingTaskQueues

func (d *ClientImpl) IsVersionMissingTaskQueues(ctx context.Context, namespaceEntry *namespace.Namespace, prevCurrentVersion, newVersion string) (bool, error)

func (*ClientImpl) ListWorkerDeployments

func (d *ClientImpl) ListWorkerDeployments(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	pageSize int,
	nextPageToken []byte,
) (_ []*deploymentspb.WorkerDeploymentSummary, _ []byte, retError error)

func (*ClientImpl) RegisterTaskQueueWorker

func (d *ClientImpl) RegisterTaskQueueWorker(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	deploymentName, buildId string,
	taskQueueName string,
	taskQueueType enumspb.TaskQueueType,
	identity string,
	requestID string,
) (retErr error)

func (*ClientImpl) SetCurrentVersion

func (d *ClientImpl) SetCurrentVersion(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	deploymentName string,
	version string,
	identity string,
	ignoreMissingTaskQueues bool,
	conflictToken []byte,
) (_ *deploymentspb.SetCurrentVersionResponse, retErr error)

func (*ClientImpl) SetRampingVersion

func (d *ClientImpl) SetRampingVersion(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	deploymentName string,
	version string,
	percentage float32,
	identity string,
	ignoreMissingTaskQueues bool,
	conflictToken []byte,
) (_ *deploymentspb.SetRampingVersionResponse, retErr error)

func (*ClientImpl) StartWorkerDeployment

func (d *ClientImpl) StartWorkerDeployment(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	deploymentName string,
	identity string,
	requestID string,
) (retErr error)

func (*ClientImpl) SyncVersionWorkflowFromWorkerDeployment

func (d *ClientImpl) SyncVersionWorkflowFromWorkerDeployment(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	deploymentName, version string,
	args *deploymentspb.SyncVersionStateUpdateArgs,
	identity string,
	requestID string,
) (_ *deploymentspb.SyncVersionStateResponse, retErr error)

func (*ClientImpl) UpdateVersionMetadata

func (d *ClientImpl) UpdateVersionMetadata(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	version string,
	upsertEntries map[string]*commonpb.Payload,
	removeEntries []string,
	identity string,
) (_ *deploymentpb.VersionMetadata, retErr error)

type DrainageActivities

type DrainageActivities struct {
	// contains filtered or unexported fields
}

func (*DrainageActivities) GetVersionDrainageStatus

type ErrMaxTaskQueuesInDeployment

type ErrMaxTaskQueuesInDeployment struct {
	// contains filtered or unexported fields
}

type ErrRegister

type ErrRegister struct {
	// contains filtered or unexported fields
}

type VersionActivities

type VersionActivities struct {
	// contains filtered or unexported fields
}

func (*VersionActivities) CheckIfTaskQueuesHavePollers

func (a *VersionActivities) CheckIfTaskQueuesHavePollers(ctx context.Context, args *deploymentspb.CheckTaskQueuesHavePollersActivityArgs) (bool, error)

CheckIfTaskQueuesHavePollers returns true if any of the given task queues has any pollers.

func (*VersionActivities) CheckWorkerDeploymentUserDataPropagation

func (a *VersionActivities) CheckWorkerDeploymentUserDataPropagation(ctx context.Context, input *deploymentspb.CheckWorkerDeploymentUserDataPropagationRequest) error

func (*VersionActivities) StartWorkerDeploymentWorkflow

func (a *VersionActivities) StartWorkerDeploymentWorkflow(
	ctx context.Context,
	input *deploymentspb.StartWorkerDeploymentRequest,
) error

type VersionWorkflowRunner

type VersionWorkflowRunner struct {
	*deploymentspb.WorkerDeploymentVersionWorkflowArgs
	// contains filtered or unexported fields
}

VersionWorkflowRunner holds the local state for a worker deployment version workflow.

type WorkflowRunner

type WorkflowRunner struct {
	*deploymentspb.WorkerDeploymentWorkflowArgs
	// contains filtered or unexported fields
}

WorkflowRunner holds the local state while running a worker deployment workflow.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL