Documentation
¶
Index ¶
- Constants
- Variables
- func DecodeWorkerDeploymentMemo(memo *commonpb.Memo) *deploymentspb.WorkerDeploymentWorkflowMemo
- func DrainageWorkflow(ctx workflow.Context, unsafeRefreshIntervalGetter func() any, ...) error
- func NewResult(dc *dynamicconfig.Collection, params activityDeps) fxResult
- func VersionWorkflow(ctx workflow.Context, ...) error
- func Workflow(ctx workflow.Context, unsafeMaxVersion func() int, ...) error
- type Activities
- func (a *Activities) CheckUnversionedRampUserDataPropagation(ctx context.Context, ...) error
- func (a *Activities) DeleteWorkerDeploymentVersion(ctx context.Context, args *deploymentspb.DeleteVersionActivityArgs) error
- func (a *Activities) IsVersionMissingTaskQueues(ctx context.Context, args *deploymentspb.IsVersionMissingTaskQueuesArgs) (*deploymentspb.IsVersionMissingTaskQueuesResult, error)
- func (a *Activities) SyncUnversionedRamp(ctx context.Context, input *deploymentspb.SyncUnversionedRampActivityArgs) (*deploymentspb.SyncDeploymentVersionUserDataResponse, error)
- func (a *Activities) SyncWorkerDeploymentVersion(ctx context.Context, args *deploymentspb.SyncVersionStateActivityArgs) (*deploymentspb.SyncVersionStateActivityResult, error)
- type Client
- type ClientImpl
- func (d *ClientImpl) AddVersionToWorkerDeployment(ctx context.Context, namespaceEntry *namespace.Namespace, ...) (*deploymentspb.AddVersionToWorkerDeploymentResponse, error)
- func (d *ClientImpl) DeleteVersionFromWorkerDeployment(ctx context.Context, namespaceEntry *namespace.Namespace, ...) (retErr error)
- func (d *ClientImpl) DeleteWorkerDeployment(ctx context.Context, namespaceEntry *namespace.Namespace, ...) (retErr error)
- func (d *ClientImpl) DeleteWorkerDeploymentVersion(ctx context.Context, namespaceEntry *namespace.Namespace, version string, ...) (retErr error)
- func (d *ClientImpl) DescribeVersion(ctx context.Context, namespaceEntry *namespace.Namespace, version string) (_ *deploymentpb.WorkerDeploymentVersionInfo, retErr error)
- func (d *ClientImpl) DescribeWorkerDeployment(ctx context.Context, namespaceEntry *namespace.Namespace, ...) (_ *deploymentpb.WorkerDeploymentInfo, conflictToken []byte, retErr error)
- func (d *ClientImpl) GetVersionDrainageStatus(ctx context.Context, namespaceEntry *namespace.Namespace, version string) (enumspb.VersionDrainageStatus, error)
- func (d *ClientImpl) IsVersionMissingTaskQueues(ctx context.Context, namespaceEntry *namespace.Namespace, ...) (bool, error)
- func (d *ClientImpl) ListWorkerDeployments(ctx context.Context, namespaceEntry *namespace.Namespace, pageSize int, ...) (_ []*deploymentspb.WorkerDeploymentSummary, _ []byte, retError error)
- func (d *ClientImpl) RegisterTaskQueueWorker(ctx context.Context, namespaceEntry *namespace.Namespace, ...) (retErr error)
- func (d *ClientImpl) SetCurrentVersion(ctx context.Context, namespaceEntry *namespace.Namespace, ...) (_ *deploymentspb.SetCurrentVersionResponse, retErr error)
- func (d *ClientImpl) SetRampingVersion(ctx context.Context, namespaceEntry *namespace.Namespace, ...) (_ *deploymentspb.SetRampingVersionResponse, retErr error)
- func (d *ClientImpl) StartWorkerDeployment(ctx context.Context, namespaceEntry *namespace.Namespace, ...) (retErr error)
- func (d *ClientImpl) SyncVersionWorkflowFromWorkerDeployment(ctx context.Context, namespaceEntry *namespace.Namespace, ...) (_ *deploymentspb.SyncVersionStateResponse, retErr error)
- func (d *ClientImpl) UpdateVersionMetadata(ctx context.Context, namespaceEntry *namespace.Namespace, version string, ...) (_ *deploymentpb.VersionMetadata, retErr error)
- type DrainageActivities
- type ErrMaxTaskQueuesInDeployment
- type ErrRegister
- type VersionActivities
- func (a *VersionActivities) AddVersionToWorkerDeployment(ctx context.Context, input *deploymentspb.AddVersionToWorkerDeploymentRequest) (*deploymentspb.AddVersionToWorkerDeploymentResponse, error)
- func (a *VersionActivities) CheckIfTaskQueuesHavePollers(ctx context.Context, ...) (bool, error)
- func (a *VersionActivities) CheckWorkerDeploymentUserDataPropagation(ctx context.Context, ...) error
- func (a *VersionActivities) StartWorkerDeploymentWorkflow(ctx context.Context, input *deploymentspb.StartWorkerDeploymentRequest) error
- func (a *VersionActivities) SyncDeploymentVersionUserData(ctx context.Context, input *deploymentspb.SyncDeploymentVersionUserDataRequest) (*deploymentspb.SyncDeploymentVersionUserDataResponse, error)
- type VersionWorkflowRunner
- type WorkflowRunner
Constants ¶
View Source
const ( // Workflow types WorkerDeploymentVersionWorkflowType = "temporal-sys-worker-deployment-version-workflow" WorkerDeploymentWorkflowType = "temporal-sys-worker-deployment-workflow" WorkerDeploymentDrainageWorkflowType = "temporal-sys-worker-deployment-version-drainage-workflow" // Namespace division WorkerDeploymentNamespaceDivision = "TemporalWorkerDeployment" // Updates RegisterWorkerInDeployment = "register-task-queue-worker" // for Worker Deployment Version wf SyncVersionState = "sync-version-state" // for Worker Deployment Version wfs UpdateVersionMetadata = "update-version-metadata" // for Worker Deployment Version wfs SetCurrentVersion = "set-current-version" // for Worker Deployment wfs SetRampingVersion = "set-ramping-version" // for Worker Deployment wfs AddVersionToWorkerDeployment = "add-version-to-worker-deployment" // for Worker Deployment wfs DeleteVersion = "delete-version" // for WorkerDeployment wfs DeleteDeployment = "delete-deployment" // for WorkerDeployment wfs // Signals ForceCANSignalName = "force-continue-as-new" // for Worker Deployment Version _and_ Worker Deployment wfs SyncDrainageSignalName = "sync-drainage-status" TerminateDrainageSignal = "terminate-drainage" SyncVersionSummarySignal = "sync-version-summary" // Queries QueryDescribeVersion = "describe-version" // for Worker Deployment Version wf QueryDescribeDeployment = "describe-deployment" // for Worker Deployment wf // Memos WorkerDeploymentMemoField = "WorkerDeploymentMemo" // for Worker Deployment wf // Prefixes, Delimeters and Keys WorkerDeploymentVersionWorkflowIDPrefix = "temporal-sys-worker-deployment-version" WorkerDeploymentVersionWorkflowIDDelimeter = ":" WorkerDeploymentVersionWorkflowIDInitialSize = len(WorkerDeploymentVersionWorkflowIDDelimeter) + len(WorkerDeploymentVersionWorkflowIDPrefix) WorkerDeploymentNameFieldName = "WorkerDeploymentName" WorkerDeploymentBuildIDFieldName = "BuildID" )
Variables ¶
View Source
var Module = fx.Options( fx.Provide(NewResult), fx.Provide(ClientProvider), )
View Source
var ( WorkerDeploymentVisibilityBaseListQuery = fmt.Sprintf( "%s = '%s' AND %s = '%s' AND %s = '%s'", searchattribute.WorkflowType, WorkerDeploymentWorkflowType, searchattribute.TemporalNamespaceDivision, WorkerDeploymentNamespaceDivision, searchattribute.ExecutionStatus, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING.String(), ) )
Functions ¶
func DecodeWorkerDeploymentMemo ¶
func DecodeWorkerDeploymentMemo(memo *commonpb.Memo) *deploymentspb.WorkerDeploymentWorkflowMemo
func DrainageWorkflow ¶
func DrainageWorkflow( ctx workflow.Context, unsafeRefreshIntervalGetter func() any, unsafeVisibilityGracePeriodGetter func() any, args *deploymentspb.DrainageWorkflowArgs, ) error
func NewResult ¶
func NewResult( dc *dynamicconfig.Collection, params activityDeps, ) fxResult
func VersionWorkflow ¶
func VersionWorkflow(ctx workflow.Context, versionWorkflowArgs *deploymentspb.WorkerDeploymentVersionWorkflowArgs) error
func Workflow ¶
func Workflow(ctx workflow.Context, unsafeMaxVersion func() int, args *deploymentspb.WorkerDeploymentWorkflowArgs) error
Types ¶
type Activities ¶
type Activities struct {
// contains filtered or unexported fields
}
func (*Activities) CheckUnversionedRampUserDataPropagation ¶
func (a *Activities) CheckUnversionedRampUserDataPropagation(ctx context.Context, input *deploymentspb.CheckWorkerDeploymentUserDataPropagationRequest) error
func (*Activities) DeleteWorkerDeploymentVersion ¶
func (a *Activities) DeleteWorkerDeploymentVersion(ctx context.Context, args *deploymentspb.DeleteVersionActivityArgs) error
func (*Activities) IsVersionMissingTaskQueues ¶
func (a *Activities) IsVersionMissingTaskQueues(ctx context.Context, args *deploymentspb.IsVersionMissingTaskQueuesArgs) (*deploymentspb.IsVersionMissingTaskQueuesResult, error)
func (*Activities) SyncUnversionedRamp ¶
func (a *Activities) SyncUnversionedRamp( ctx context.Context, input *deploymentspb.SyncUnversionedRampActivityArgs, ) (*deploymentspb.SyncDeploymentVersionUserDataResponse, error)
func (*Activities) SyncWorkerDeploymentVersion ¶
func (a *Activities) SyncWorkerDeploymentVersion(ctx context.Context, args *deploymentspb.SyncVersionStateActivityArgs) (*deploymentspb.SyncVersionStateActivityResult, error)
type Client ¶
type Client interface { RegisterTaskQueueWorker( ctx context.Context, namespaceEntry *namespace.Namespace, deploymentName, buildId string, taskQueueName string, taskQueueType enumspb.TaskQueueType, identity string, requestID string, ) error DescribeVersion( ctx context.Context, namespaceEntry *namespace.Namespace, version string, ) (*deploymentpb.WorkerDeploymentVersionInfo, error) DescribeWorkerDeployment( ctx context.Context, namespaceEntry *namespace.Namespace, deploymentName string, ) (*deploymentpb.WorkerDeploymentInfo, []byte, error) SetCurrentVersion( ctx context.Context, namespaceEntry *namespace.Namespace, deploymentName string, version string, identity string, ignoreMissingTaskQueues bool, conflictToken []byte, ) (*deploymentspb.SetCurrentVersionResponse, error) ListWorkerDeployments( ctx context.Context, namespaceEntry *namespace.Namespace, pageSize int, nextPageToken []byte, ) ([]*deploymentspb.WorkerDeploymentSummary, []byte, error) DeleteWorkerDeploymentVersion( ctx context.Context, namespaceEntry *namespace.Namespace, version string, skipDrainage bool, identity string, ) error DeleteWorkerDeployment( ctx context.Context, namespaceEntry *namespace.Namespace, deploymentName string, identity string, ) error SetRampingVersion( ctx context.Context, namespaceEntry *namespace.Namespace, deploymentName string, version string, percentage float32, identity string, ignoreMissingTaskQueues bool, conflictToken []byte, ) (*deploymentspb.SetRampingVersionResponse, error) UpdateVersionMetadata( ctx context.Context, namespaceEntry *namespace.Namespace, version string, upsertEntries map[string]*commonpb.Payload, removeEntries []string, identity string, ) (*deploymentpb.VersionMetadata, error) // Used internally by the Worker Deployment workflow in its StartWorkerDeployment Activity StartWorkerDeployment( ctx context.Context, namespaceEntry *namespace.Namespace, deploymentName string, identity string, requestID string, ) error // Used internally by the Worker Deployment workflow in its SyncWorkerDeploymentVersion Activity SyncVersionWorkflowFromWorkerDeployment( ctx context.Context, namespaceEntry *namespace.Namespace, deploymentName, version string, args *deploymentspb.SyncVersionStateUpdateArgs, identity string, requestID string, ) (*deploymentspb.SyncVersionStateResponse, error) // Used internally by the Worker Deployment workflow in its DeleteVersion Activity DeleteVersionFromWorkerDeployment( ctx context.Context, namespaceEntry *namespace.Namespace, deploymentName, version string, identity string, requestID string, skipDrainage bool, ) error // Used internally by the Worker Deployment Version workflow in its AddVersionToWorkerDeployment Activity AddVersionToWorkerDeployment( ctx context.Context, namespaceEntry *namespace.Namespace, deploymentName string, args *deploymentspb.AddVersionUpdateArgs, identity string, requestID string, ) (*deploymentspb.AddVersionToWorkerDeploymentResponse, error) // Used internally by the Drainage workflow (child of Worker Deployment Version workflow) // in its GetVersionDrainageStatus Activity GetVersionDrainageStatus( ctx context.Context, namespaceEntry *namespace.Namespace, version string) (enumspb.VersionDrainageStatus, error) // Used internally by the Worker Deployment workflow in its IsVersionMissingTaskQueues Activity // to verify if there are missing task queues in the new current/ramping version. IsVersionMissingTaskQueues( ctx context.Context, namespaceEntry *namespace.Namespace, prevCurrentVersion, newVersion string, ) (bool, error) }
func ClientProvider ¶
func ClientProvider( logger log.Logger, historyClient resource.HistoryClient, matchingClient resource.MatchingClient, visibilityManager manager.VisibilityManager, dc *dynamicconfig.Collection, ) Client
type ClientImpl ¶
type ClientImpl struct {
// contains filtered or unexported fields
}
ClientImpl implements Client
func (*ClientImpl) AddVersionToWorkerDeployment ¶
func (d *ClientImpl) AddVersionToWorkerDeployment( ctx context.Context, namespaceEntry *namespace.Namespace, deploymentName string, args *deploymentspb.AddVersionUpdateArgs, identity string, requestID string, ) (*deploymentspb.AddVersionToWorkerDeploymentResponse, error)
func (*ClientImpl) DeleteVersionFromWorkerDeployment ¶
func (*ClientImpl) DeleteWorkerDeployment ¶
func (*ClientImpl) DeleteWorkerDeploymentVersion ¶
func (*ClientImpl) DescribeVersion ¶
func (d *ClientImpl) DescribeVersion( ctx context.Context, namespaceEntry *namespace.Namespace, version string, ) (_ *deploymentpb.WorkerDeploymentVersionInfo, retErr error)
func (*ClientImpl) DescribeWorkerDeployment ¶
func (d *ClientImpl) DescribeWorkerDeployment( ctx context.Context, namespaceEntry *namespace.Namespace, deploymentName string, ) (_ *deploymentpb.WorkerDeploymentInfo, conflictToken []byte, retErr error)
func (*ClientImpl) GetVersionDrainageStatus ¶
func (d *ClientImpl) GetVersionDrainageStatus( ctx context.Context, namespaceEntry *namespace.Namespace, version string) (enumspb.VersionDrainageStatus, error)
func (*ClientImpl) IsVersionMissingTaskQueues ¶
func (*ClientImpl) ListWorkerDeployments ¶
func (d *ClientImpl) ListWorkerDeployments( ctx context.Context, namespaceEntry *namespace.Namespace, pageSize int, nextPageToken []byte, ) (_ []*deploymentspb.WorkerDeploymentSummary, _ []byte, retError error)
func (*ClientImpl) RegisterTaskQueueWorker ¶
func (*ClientImpl) SetCurrentVersion ¶
func (d *ClientImpl) SetCurrentVersion( ctx context.Context, namespaceEntry *namespace.Namespace, deploymentName string, version string, identity string, ignoreMissingTaskQueues bool, conflictToken []byte, ) (_ *deploymentspb.SetCurrentVersionResponse, retErr error)
func (*ClientImpl) SetRampingVersion ¶
func (d *ClientImpl) SetRampingVersion( ctx context.Context, namespaceEntry *namespace.Namespace, deploymentName string, version string, percentage float32, identity string, ignoreMissingTaskQueues bool, conflictToken []byte, ) (_ *deploymentspb.SetRampingVersionResponse, retErr error)
func (*ClientImpl) StartWorkerDeployment ¶
func (*ClientImpl) SyncVersionWorkflowFromWorkerDeployment ¶
func (d *ClientImpl) SyncVersionWorkflowFromWorkerDeployment( ctx context.Context, namespaceEntry *namespace.Namespace, deploymentName, version string, args *deploymentspb.SyncVersionStateUpdateArgs, identity string, requestID string, ) (_ *deploymentspb.SyncVersionStateResponse, retErr error)
func (*ClientImpl) UpdateVersionMetadata ¶
func (d *ClientImpl) UpdateVersionMetadata( ctx context.Context, namespaceEntry *namespace.Namespace, version string, upsertEntries map[string]*commonpb.Payload, removeEntries []string, identity string, ) (_ *deploymentpb.VersionMetadata, retErr error)
type DrainageActivities ¶
type DrainageActivities struct {
// contains filtered or unexported fields
}
func (*DrainageActivities) GetVersionDrainageStatus ¶
func (a *DrainageActivities) GetVersionDrainageStatus(ctx context.Context, version *deploymentspb.WorkerDeploymentVersion) (*deploymentpb.VersionDrainageInfo, error)
type ErrMaxTaskQueuesInDeployment ¶
type ErrMaxTaskQueuesInDeployment struct {
// contains filtered or unexported fields
}
type ErrRegister ¶
type ErrRegister struct {
// contains filtered or unexported fields
}
type VersionActivities ¶
type VersionActivities struct {
// contains filtered or unexported fields
}
func (*VersionActivities) AddVersionToWorkerDeployment ¶
func (a *VersionActivities) AddVersionToWorkerDeployment(ctx context.Context, input *deploymentspb.AddVersionToWorkerDeploymentRequest) (*deploymentspb.AddVersionToWorkerDeploymentResponse, error)
func (*VersionActivities) CheckIfTaskQueuesHavePollers ¶
func (a *VersionActivities) CheckIfTaskQueuesHavePollers(ctx context.Context, args *deploymentspb.CheckTaskQueuesHavePollersActivityArgs) (bool, error)
CheckIfTaskQueuesHavePollers returns true if any of the given task queues has any pollers
func (*VersionActivities) CheckWorkerDeploymentUserDataPropagation ¶
func (a *VersionActivities) CheckWorkerDeploymentUserDataPropagation(ctx context.Context, input *deploymentspb.CheckWorkerDeploymentUserDataPropagationRequest) error
func (*VersionActivities) StartWorkerDeploymentWorkflow ¶
func (a *VersionActivities) StartWorkerDeploymentWorkflow( ctx context.Context, input *deploymentspb.StartWorkerDeploymentRequest, ) error
func (*VersionActivities) SyncDeploymentVersionUserData ¶
func (a *VersionActivities) SyncDeploymentVersionUserData( ctx context.Context, input *deploymentspb.SyncDeploymentVersionUserDataRequest, ) (*deploymentspb.SyncDeploymentVersionUserDataResponse, error)
type VersionWorkflowRunner ¶
type VersionWorkflowRunner struct { *deploymentspb.WorkerDeploymentVersionWorkflowArgs // contains filtered or unexported fields }
VersionWorkflowRunner holds the local state for a deployment workflow
type WorkflowRunner ¶
type WorkflowRunner struct { *deploymentspb.WorkerDeploymentWorkflowArgs // contains filtered or unexported fields }
WorkflowRunner holds the local state while running a deployment-series workflow
Click to show internal directories.
Click to hide internal directories.