api

package
v1.25.0-115.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 3, 2024 License: MIT Imports: 47 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	UpdateWorkflowWithNewWorkflowTask = &UpdateWorkflowAction{
		CreateWorkflowTask: true,
	}
	UpdateWorkflowWithoutWorkflowTask = &UpdateWorkflowAction{
		CreateWorkflowTask: false,
	}
	UpdateWorkflowTerminate = &UpdateWorkflowAction{
		CreateWorkflowTask: false,
		AbortUpdates:       true,
	}
)
View Source
var ErrUseCurrentExecution = errors.New("ErrUseCurrentExecution")

ErrUseCurrentExecution is a sentinel error to indicate to the caller to use the current workflow execution instead of creating a new one

Functions

func CreateMutableState

func CreateMutableState(
	shard shard.Context,
	namespaceEntry *namespace.Namespace,
	executionTimeout *durationpb.Duration,
	runTimeout *durationpb.Duration,
	workflowID string,
	runID string,
) (workflow.MutableState, error)

func DeserializeHistoryToken added in v1.23.0

func DeserializeHistoryToken(bytes []byte) (*tokenspb.HistoryContinuation, error)

NOTE: DO NOT MODIFY UNLESS ALSO APPLIED TO ./service/frontend/token_deprecated.go

func DeserializeRawHistoryToken added in v1.23.0

func DeserializeRawHistoryToken(bytes []byte) (*tokenspb.RawHistoryContinuation, error)

NOTE: DO NOT MODIFY UNLESS ALSO APPLIED TO ./service/frontend/token_deprecated.go

func GenerateFirstWorkflowTask

func GenerateFirstWorkflowTask(
	mutableState workflow.MutableState,
	parentInfo *workflowspb.ParentExecutionInfo,
	startEvent *historypb.HistoryEvent,
	bypassTaskGeneration bool,
) (int64, error)

func GeneratePaginationToken added in v1.23.0

func GeneratePaginationToken(
	request *historyservice.GetWorkflowExecutionRawHistoryRequest,
	versionHistories *historyspb.VersionHistories,
) *tokenspb.RawHistoryContinuation

func GeneratePaginationTokenV2Request added in v1.24.0

func GeneratePaginationTokenV2Request(
	request *historyservice.GetWorkflowExecutionRawHistoryV2Request,
	versionHistories *historyspb.VersionHistories,
) *tokenspb.RawHistoryContinuation

NOTE: DO NOT MODIFY UNLESS ALSO APPLIED TO ./service/frontend/token_deprecated.go

func GetActiveNamespace added in v1.18.0

func GetActiveNamespace(
	shard shard.Context,
	namespaceUUID namespace.ID,
) (*namespace.Namespace, error)

func GetActivityScheduledEventID added in v1.18.0

func GetActivityScheduledEventID(
	activityID string,
	mutableState workflow.MutableState,
) (int64, error)

func GetAndUpdateWorkflowWithConsistencyCheck added in v1.25.0

func GetAndUpdateWorkflowWithConsistencyCheck(
	ctx context.Context,
	reqClock *clockspb.VectorClock,
	consistencyCheckFn MutableStateConsistencyPredicate,
	workflowKey definition.WorkflowKey,
	action UpdateWorkflowActionFunc,
	newWorkflowFn func() (workflow.Context, workflow.MutableState, error),
	shardContext shard.Context,
	workflowConsistencyChecker WorkflowConsistencyChecker,
) (retError error)

func GetAndUpdateWorkflowWithNew added in v1.18.0

func GetAndUpdateWorkflowWithNew(
	ctx context.Context,
	reqClock *clockspb.VectorClock,
	workflowKey definition.WorkflowKey,
	action UpdateWorkflowActionFunc,
	newWorkflowFn func() (workflow.Context, workflow.MutableState, error),
	shard shard.Context,
	workflowConsistencyChecker WorkflowConsistencyChecker,
) (retError error)

func GetHistory added in v1.23.0

func GetHistory(
	ctx context.Context,
	shard shard.Context,
	namespaceID namespace.ID,
	execution *commonpb.WorkflowExecution,
	firstEventID int64,
	nextEventID int64,
	pageSize int32,
	nextPageToken []byte,
	transientWorkflowTaskInfo *historyspb.TransientWorkflowTaskInfo,
	branchToken []byte,
	persistenceVisibilityMgr manager.VisibilityManager,
) (*historypb.History, []byte, error)

func GetHistoryReverse added in v1.23.0

func GetHistoryReverse(
	ctx context.Context,
	shard shard.Context,
	namespaceID namespace.ID,
	execution *commonpb.WorkflowExecution,
	nextEventID int64,
	lastFirstTxnID int64,
	pageSize int32,
	nextPageToken []byte,
	branchToken []byte,
	persistenceVisibilityMgr manager.VisibilityManager,
) (*historypb.History, []byte, int64, error)

func GetMutableState added in v1.19.0

func GetMutableState(
	ctx context.Context,
	shardContext shard.Context,
	workflowKey definition.WorkflowKey,
	workflowConsistencyChecker WorkflowConsistencyChecker,
) (_ *historyservice.GetMutableStateResponse, retError error)

func GetOrPollMutableState added in v1.19.0

func GetOrPollMutableState(
	ctx context.Context,
	shardContext shard.Context,
	request *historyservice.GetMutableStateRequest,
	workflowConsistencyChecker WorkflowConsistencyChecker,
	eventNotifier events.Notifier,
) (*historyservice.GetMutableStateResponse, error)

func GetRawHistory added in v1.23.0

func GetRawHistory(
	ctx context.Context,
	shard shard.Context,
	namespaceID namespace.ID,
	execution *commonpb.WorkflowExecution,
	firstEventID int64,
	nextEventID int64,
	pageSize int32,
	nextPageToken []byte,
	transientWorkflowTaskInfo *historyspb.TransientWorkflowTaskInfo,
	branchToken []byte,
) ([]*commonpb.DataBlob, []byte, error)

func GetTaskCategory added in v1.23.0

func GetTaskCategory(categoryID int, registry tasks.TaskCategoryRegistry) (tasks.Category, error)

func IsHistoryEventOnCurrentBranch added in v1.19.0

func IsHistoryEventOnCurrentBranch(
	mutableState workflow.MutableState,
	eventID int64,
	eventVersion int64,
) (bool, error)

func IsRetryableError added in v1.17.3

func IsRetryableError(err error) bool

func MigrateWorkflowIdReusePolicyForRunningWorkflow added in v1.24.0

func MigrateWorkflowIdReusePolicyForRunningWorkflow(
	wfIDReusePolicy *enumspb.WorkflowIdReusePolicy,
	wfIDConflictPolicy *enumspb.WorkflowIdConflictPolicy,
)

func MutableStateToGetResponse added in v1.19.0

func MutableStateToGetResponse(
	mutableState workflow.MutableState,
) (*historyservice.GetMutableStateResponse, error)

func NewWorkflowVersionCheck

func NewWorkflowVersionCheck(
	shard shard.Context,
	prevLastWriteVersion int64,
	newMutableState workflow.MutableState,
) error

func OverrideStartWorkflowExecutionRequest added in v1.18.0

func OverrideStartWorkflowExecutionRequest(
	request *workflowservice.StartWorkflowExecutionRequest,
	operation string,
	shard shard.Context,
	metricsHandler metrics.Handler,
)

func ProcessOutgoingSearchAttributes added in v1.23.0

func ProcessOutgoingSearchAttributes(
	shard shard.Context,
	events []*historypb.HistoryEvent,
	namespaceId namespace.ID,
	persistenceVisibilityMgr manager.VisibilityManager,
) error

func SerializeHistoryToken added in v1.23.0

func SerializeHistoryToken(token *tokenspb.HistoryContinuation) ([]byte, error)

NOTE: DO NOT MODIFY UNLESS ALSO APPLIED TO ./service/frontend/token_deprecated.go

func SerializeRawHistoryToken added in v1.23.0

func SerializeRawHistoryToken(token *tokenspb.RawHistoryContinuation) ([]byte, error)

NOTE: DO NOT MODIFY UNLESS ALSO APPLIED TO ./service/frontend/token_deprecated.go

func SetActivityTaskRunID added in v1.18.0

func SetActivityTaskRunID(
	ctx context.Context,
	token *tokenspb.Task,
	workflowConsistencyChecker WorkflowConsistencyChecker,
) error

func TrimHistoryNode added in v1.23.0

func TrimHistoryNode(
	ctx context.Context,
	shardContext shard.Context,
	workflowConsistencyChecker WorkflowConsistencyChecker,
	eventNotifier events.Notifier,
	namespaceID string,
	workflowID string,
	runID string,
)

func UpdateWorkflowWithNew

func UpdateWorkflowWithNew(
	shardContext shard.Context,
	ctx context.Context,
	workflowLease WorkflowLease,
	action UpdateWorkflowActionFunc,
	newWorkflowFn func() (workflow.Context, workflow.MutableState, error),
) (retError error)

func ValidateNamespaceUUID added in v1.18.0

func ValidateNamespaceUUID(
	namespaceUUID namespace.ID,
) error

func ValidatePaginationToken added in v1.23.0

func ValidatePaginationToken(
	request *historyservice.GetWorkflowExecutionRawHistoryRequest,
	token *tokenspb.RawHistoryContinuation,
) error

func ValidatePaginationTokenV2Request added in v1.24.0

func ValidatePaginationTokenV2Request(
	request *historyservice.GetWorkflowExecutionRawHistoryV2Request,
	token *tokenspb.RawHistoryContinuation,
) error

NOTE: DO NOT MODIFY UNLESS ALSO APPLIED TO ./service/frontend/token_deprecated.go

func ValidateReplicationConfig added in v1.19.0

func ValidateReplicationConfig(
	clusterMetadata cluster.Metadata,
) error

func ValidateSignal

func ValidateSignal(
	ctx context.Context,
	shard shard.Context,
	mutableState workflow.MutableState,
	signalPayloadSize int,
	operation string,
) error

func ValidateStart

func ValidateStart(
	ctx context.Context,
	shard shard.Context,
	namespaceEntry *namespace.Namespace,
	workflowID string,
	workflowInputSize int,
	workflowMemoSize int,
	operation string,
) error

func ValidateStartWorkflowExecutionRequest added in v1.18.0

func ValidateStartWorkflowExecutionRequest(
	ctx context.Context,
	request *workflowservice.StartWorkflowExecutionRequest,
	shard shard.Context,
	namespaceEntry *namespace.Namespace,
	operation string,
) error

Types

type MutableStateConsistencyPredicate

type MutableStateConsistencyPredicate func(mutableState workflow.MutableState) bool

type UpdateWorkflowAction

type UpdateWorkflowAction struct {
	Noop               bool
	CreateWorkflowTask bool
	// Abort all "Workflow Updates" (not persistence updates) after persistence operation is succeeded but WF lock is not released.
	AbortUpdates bool
}

type UpdateWorkflowActionFunc

type UpdateWorkflowActionFunc func(WorkflowLease) (*UpdateWorkflowAction, error)

func ResolveDuplicateWorkflowID added in v1.24.0

func ResolveDuplicateWorkflowID(
	shardContext shard.Context,
	workflowKey definition.WorkflowKey,
	namespaceEntry *namespace.Namespace,
	newRunID string,
	currentState enumsspb.WorkflowExecutionState,
	currentStatus enumspb.WorkflowExecutionStatus,
	currentStartRequestID string,
	wfIDReusePolicy enumspb.WorkflowIdReusePolicy,
	wfIDConflictPolicy enumspb.WorkflowIdConflictPolicy,
	currentWorkflowStartTime time.Time,
) (UpdateWorkflowActionFunc, error)

ResolveDuplicateWorkflowID determines how to resolve a workflow ID duplication upon workflow start according to the WorkflowIdReusePolicy (for *completed* workflow) or WorkflowIdConflictPolicy (for *running* workflow).

NOTE: this function assumes the workflow id reuse policy Terminate-if-Running has been migrated to the workflow id conflict policy Terminate-Existing before it is invoked.

An action (ie "mitigate and allow"), an error (ie "deny") or neither (ie "allow") is returned.

type VersionedRunID added in v1.24.0

type VersionedRunID struct {
	RunID            string
	LastWriteVersion int64
}

type WorkflowConsistencyChecker

type WorkflowConsistencyChecker interface {
	GetWorkflowCache() wcache.Cache
	GetCurrentRunID(
		ctx context.Context,
		namespaceID string,
		workflowID string,
		lockPriority workflow.LockPriority,
	) (string, error)
	GetWorkflowLease(
		ctx context.Context,
		reqClock *clockspb.VectorClock,
		workflowKey definition.WorkflowKey,
		lockPriority workflow.LockPriority,
	) (WorkflowLease, error)

	GetWorkflowLeaseWithConsistencyCheck(
		ctx context.Context,
		reqClock *clockspb.VectorClock,
		consistencyPredicate MutableStateConsistencyPredicate,
		workflowKey definition.WorkflowKey,
		lockPriority workflow.LockPriority,
	) (WorkflowLease, error)
}

type WorkflowConsistencyCheckerImpl

type WorkflowConsistencyCheckerImpl struct {
	// contains filtered or unexported fields
}

func NewWorkflowConsistencyChecker

func NewWorkflowConsistencyChecker(
	shardContext shard.Context,
	workflowCache wcache.Cache,
) *WorkflowConsistencyCheckerImpl

func (*WorkflowConsistencyCheckerImpl) GetCurrentRunID

func (c *WorkflowConsistencyCheckerImpl) GetCurrentRunID(
	ctx context.Context,
	namespaceID string,
	workflowID string,
	lockPriority workflow.LockPriority,
) (runID string, retErr error)

func (*WorkflowConsistencyCheckerImpl) GetWorkflowCache added in v1.19.0

func (c *WorkflowConsistencyCheckerImpl) GetWorkflowCache() wcache.Cache

func (*WorkflowConsistencyCheckerImpl) GetWorkflowLease added in v1.24.0

func (c *WorkflowConsistencyCheckerImpl) GetWorkflowLease(
	ctx context.Context,
	reqClock *clockspb.VectorClock,
	workflowKey definition.WorkflowKey,
	lockPriority workflow.LockPriority,
) (WorkflowLease, error)

func (*WorkflowConsistencyCheckerImpl) GetWorkflowLeaseWithConsistencyCheck added in v1.25.0

func (c *WorkflowConsistencyCheckerImpl) GetWorkflowLeaseWithConsistencyCheck(
	ctx context.Context,
	reqClock *clockspb.VectorClock,
	consistencyPredicate MutableStateConsistencyPredicate,
	workflowKey definition.WorkflowKey,
	lockPriority workflow.LockPriority,
) (WorkflowLease, error)

The code below should be used when custom workflow state validation is required. If consistencyPredicate failed (thus detecting a stale workflow state) workflow state will be cleared, and mutable state will be reloaded.

type WorkflowLease added in v1.24.0

type WorkflowLease interface {
	GetContext() workflow.Context
	GetMutableState() workflow.MutableState
	GetReleaseFn() wcache.ReleaseCacheFunc
}

func NewWorkflowLease added in v1.24.0

func NewWorkflowLease(
	context workflow.Context,
	releaseFn wcache.ReleaseCacheFunc,
	mutableState workflow.MutableState,
) WorkflowLease

func NewWorkflowWithSignal

func NewWorkflowWithSignal(
	shard shard.Context,
	namespaceEntry *namespace.Namespace,
	workflowID string,
	runID string,
	startRequest *historyservice.StartWorkflowExecutionRequest,
	signalWithStartRequest *workflowservice.SignalWithStartWorkflowExecutionRequest,
) (WorkflowLease, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL