Documentation ¶
Index ¶
- Variables
- func CreateMutableState(shard shard.Context, namespaceEntry *namespace.Namespace, ...) (workflow.MutableState, error)
- func DeserializeHistoryToken(bytes []byte) (*tokenspb.HistoryContinuation, error)
- func DeserializeRawHistoryToken(bytes []byte) (*tokenspb.RawHistoryContinuation, error)
- func GenerateFirstWorkflowTask(mutableState workflow.MutableState, ...) (int64, error)
- func GeneratePaginationToken(request *historyservice.GetWorkflowExecutionRawHistoryRequest, ...) *tokenspb.RawHistoryContinuation
- func GeneratePaginationTokenV2Request(request *historyservice.GetWorkflowExecutionRawHistoryV2Request, ...) *tokenspb.RawHistoryContinuation
- func GetActiveNamespace(shard shard.Context, namespaceUUID namespace.ID) (*namespace.Namespace, error)
- func GetActivityScheduledEventID(activityID string, mutableState workflow.MutableState) (int64, error)
- func GetAndUpdateWorkflowWithConsistencyCheck(ctx context.Context, reqClock *clockspb.VectorClock, ...) (retError error)
- func GetAndUpdateWorkflowWithNew(ctx context.Context, reqClock *clockspb.VectorClock, ...) (retError error)
- func GetHistory(ctx context.Context, shard shard.Context, namespaceID namespace.ID, ...) (*historypb.History, []byte, error)
- func GetHistoryReverse(ctx context.Context, shard shard.Context, namespaceID namespace.ID, ...) (*historypb.History, []byte, int64, error)
- func GetMutableState(ctx context.Context, shardContext shard.Context, ...) (_ *historyservice.GetMutableStateResponse, retError error)
- func GetOrPollMutableState(ctx context.Context, shardContext shard.Context, ...) (*historyservice.GetMutableStateResponse, error)
- func GetRawHistory(ctx context.Context, shard shard.Context, namespaceID namespace.ID, ...) ([]*commonpb.DataBlob, []byte, error)
- func GetTaskCategory(categoryID int, registry tasks.TaskCategoryRegistry) (tasks.Category, error)
- func IsHistoryEventOnCurrentBranch(mutableState workflow.MutableState, eventID int64, eventVersion int64) (bool, error)
- func IsRetryableError(err error) bool
- func MigrateWorkflowIdReusePolicyForRunningWorkflow(wfIDReusePolicy *enumspb.WorkflowIdReusePolicy, ...)
- func MutableStateToGetResponse(mutableState workflow.MutableState) (*historyservice.GetMutableStateResponse, error)
- func NewWorkflowVersionCheck(shard shard.Context, prevLastWriteVersion int64, ...) error
- func OverrideStartWorkflowExecutionRequest(request *workflowservice.StartWorkflowExecutionRequest, operation string, ...)
- func ProcessOutgoingSearchAttributes(shard shard.Context, events []*historypb.HistoryEvent, ...) error
- func SerializeHistoryToken(token *tokenspb.HistoryContinuation) ([]byte, error)
- func SerializeRawHistoryToken(token *tokenspb.RawHistoryContinuation) ([]byte, error)
- func SetActivityTaskRunID(ctx context.Context, token *tokenspb.Task, ...) error
- func TrimHistoryNode(ctx context.Context, shardContext shard.Context, ...)
- func UpdateWorkflowWithNew(shardContext shard.Context, ctx context.Context, workflowLease WorkflowLease, ...) (retError error)
- func ValidateNamespaceUUID(namespaceUUID namespace.ID) error
- func ValidatePaginationToken(request *historyservice.GetWorkflowExecutionRawHistoryRequest, ...) error
- func ValidatePaginationTokenV2Request(request *historyservice.GetWorkflowExecutionRawHistoryV2Request, ...) error
- func ValidateReplicationConfig(clusterMetadata cluster.Metadata) error
- func ValidateSignal(ctx context.Context, shard shard.Context, mutableState workflow.MutableState, ...) error
- func ValidateStart(ctx context.Context, shard shard.Context, namespaceEntry *namespace.Namespace, ...) error
- func ValidateStartWorkflowExecutionRequest(ctx context.Context, request *workflowservice.StartWorkflowExecutionRequest, ...) error
- type MutableStateConsistencyPredicate
- type UpdateWorkflowAction
- type UpdateWorkflowActionFunc
- type VersionedRunID
- type WorkflowConsistencyChecker
- type WorkflowConsistencyCheckerImpl
- func (c *WorkflowConsistencyCheckerImpl) GetCurrentRunID(ctx context.Context, namespaceID string, workflowID string, ...) (runID string, retErr error)
- func (c *WorkflowConsistencyCheckerImpl) GetWorkflowCache() wcache.Cache
- func (c *WorkflowConsistencyCheckerImpl) GetWorkflowLease(ctx context.Context, reqClock *clockspb.VectorClock, ...) (WorkflowLease, error)
- func (c *WorkflowConsistencyCheckerImpl) GetWorkflowLeaseWithConsistencyCheck(ctx context.Context, reqClock *clockspb.VectorClock, ...) (WorkflowLease, error)
- type WorkflowLease
Constants ¶
This section is empty.
Variables ¶
var ( UpdateWorkflowWithNewWorkflowTask = &UpdateWorkflowAction{ CreateWorkflowTask: true, } UpdateWorkflowWithoutWorkflowTask = &UpdateWorkflowAction{ CreateWorkflowTask: false, } UpdateWorkflowTerminate = &UpdateWorkflowAction{ CreateWorkflowTask: false, AbortUpdates: true, } )
var ErrUseCurrentExecution = errors.New("ErrUseCurrentExecution")
ErrUseCurrentExecution is a sentinel error that tells the caller to use the current workflow execution instead of creating a new one.
Functions ¶
func CreateMutableState ¶
func CreateMutableState( shard shard.Context, namespaceEntry *namespace.Namespace, executionTimeout *durationpb.Duration, runTimeout *durationpb.Duration, workflowID string, runID string, ) (workflow.MutableState, error)
func DeserializeHistoryToken ¶ added in v1.23.0
func DeserializeHistoryToken(bytes []byte) (*tokenspb.HistoryContinuation, error)
NOTE: DO NOT MODIFY UNLESS ALSO APPLIED TO ./service/frontend/token_deprecated.go
func DeserializeRawHistoryToken ¶ added in v1.23.0
func DeserializeRawHistoryToken(bytes []byte) (*tokenspb.RawHistoryContinuation, error)
NOTE: DO NOT MODIFY UNLESS ALSO APPLIED TO ./service/frontend/token_deprecated.go
func GenerateFirstWorkflowTask ¶
func GenerateFirstWorkflowTask( mutableState workflow.MutableState, parentInfo *workflowspb.ParentExecutionInfo, startEvent *historypb.HistoryEvent, bypassTaskGeneration bool, ) (int64, error)
func GeneratePaginationToken ¶ added in v1.23.0
func GeneratePaginationToken( request *historyservice.GetWorkflowExecutionRawHistoryRequest, versionHistories *historyspb.VersionHistories, ) *tokenspb.RawHistoryContinuation
func GeneratePaginationTokenV2Request ¶ added in v1.24.0
func GeneratePaginationTokenV2Request( request *historyservice.GetWorkflowExecutionRawHistoryV2Request, versionHistories *historyspb.VersionHistories, ) *tokenspb.RawHistoryContinuation
NOTE: DO NOT MODIFY UNLESS ALSO APPLIED TO ./service/frontend/token_deprecated.go
func GetActiveNamespace ¶ added in v1.18.0
func GetActivityScheduledEventID ¶ added in v1.18.0
func GetActivityScheduledEventID( activityID string, mutableState workflow.MutableState, ) (int64, error)
func GetAndUpdateWorkflowWithConsistencyCheck ¶ added in v1.25.0
func GetAndUpdateWorkflowWithConsistencyCheck( ctx context.Context, reqClock *clockspb.VectorClock, consistencyCheckFn MutableStateConsistencyPredicate, workflowKey definition.WorkflowKey, action UpdateWorkflowActionFunc, newWorkflowFn func() (workflow.Context, workflow.MutableState, error), shardContext shard.Context, workflowConsistencyChecker WorkflowConsistencyChecker, ) (retError error)
func GetAndUpdateWorkflowWithNew ¶ added in v1.18.0
func GetAndUpdateWorkflowWithNew( ctx context.Context, reqClock *clockspb.VectorClock, workflowKey definition.WorkflowKey, action UpdateWorkflowActionFunc, newWorkflowFn func() (workflow.Context, workflow.MutableState, error), shard shard.Context, workflowConsistencyChecker WorkflowConsistencyChecker, ) (retError error)
func GetHistory ¶ added in v1.23.0
func GetHistory( ctx context.Context, shard shard.Context, namespaceID namespace.ID, execution *commonpb.WorkflowExecution, firstEventID int64, nextEventID int64, pageSize int32, nextPageToken []byte, transientWorkflowTaskInfo *historyspb.TransientWorkflowTaskInfo, branchToken []byte, persistenceVisibilityMgr manager.VisibilityManager, ) (*historypb.History, []byte, error)
func GetHistoryReverse ¶ added in v1.23.0
func GetHistoryReverse( ctx context.Context, shard shard.Context, namespaceID namespace.ID, execution *commonpb.WorkflowExecution, nextEventID int64, lastFirstTxnID int64, pageSize int32, nextPageToken []byte, branchToken []byte, persistenceVisibilityMgr manager.VisibilityManager, ) (*historypb.History, []byte, int64, error)
func GetMutableState ¶ added in v1.19.0
func GetMutableState( ctx context.Context, shardContext shard.Context, workflowKey definition.WorkflowKey, workflowConsistencyChecker WorkflowConsistencyChecker, ) (_ *historyservice.GetMutableStateResponse, retError error)
func GetOrPollMutableState ¶ added in v1.19.0
func GetRawHistory ¶ added in v1.23.0
func GetRawHistory( ctx context.Context, shard shard.Context, namespaceID namespace.ID, execution *commonpb.WorkflowExecution, firstEventID int64, nextEventID int64, pageSize int32, nextPageToken []byte, transientWorkflowTaskInfo *historyspb.TransientWorkflowTaskInfo, branchToken []byte, ) ([]*commonpb.DataBlob, []byte, error)
func GetTaskCategory ¶ added in v1.23.0
func IsHistoryEventOnCurrentBranch ¶ added in v1.19.0
func IsRetryableError ¶ added in v1.17.3
func MigrateWorkflowIdReusePolicyForRunningWorkflow ¶ added in v1.24.0
func MigrateWorkflowIdReusePolicyForRunningWorkflow( wfIDReusePolicy *enumspb.WorkflowIdReusePolicy, wfIDConflictPolicy *enumspb.WorkflowIdConflictPolicy, )
func MutableStateToGetResponse ¶ added in v1.19.0
func MutableStateToGetResponse( mutableState workflow.MutableState, ) (*historyservice.GetMutableStateResponse, error)
func NewWorkflowVersionCheck ¶
func OverrideStartWorkflowExecutionRequest ¶ added in v1.18.0
func ProcessOutgoingSearchAttributes ¶ added in v1.23.0
func ProcessOutgoingSearchAttributes( shard shard.Context, events []*historypb.HistoryEvent, namespaceId namespace.ID, persistenceVisibilityMgr manager.VisibilityManager, ) error
func SerializeHistoryToken ¶ added in v1.23.0
func SerializeHistoryToken(token *tokenspb.HistoryContinuation) ([]byte, error)
NOTE: DO NOT MODIFY UNLESS ALSO APPLIED TO ./service/frontend/token_deprecated.go
func SerializeRawHistoryToken ¶ added in v1.23.0
func SerializeRawHistoryToken(token *tokenspb.RawHistoryContinuation) ([]byte, error)
NOTE: DO NOT MODIFY UNLESS ALSO APPLIED TO ./service/frontend/token_deprecated.go
func SetActivityTaskRunID ¶ added in v1.18.0
func TrimHistoryNode ¶ added in v1.23.0
func UpdateWorkflowWithNew ¶
func UpdateWorkflowWithNew( shardContext shard.Context, ctx context.Context, workflowLease WorkflowLease, action UpdateWorkflowActionFunc, newWorkflowFn func() (workflow.Context, workflow.MutableState, error), ) (retError error)
func ValidateNamespaceUUID ¶ added in v1.18.0
func ValidatePaginationToken ¶ added in v1.23.0
func ValidatePaginationToken( request *historyservice.GetWorkflowExecutionRawHistoryRequest, token *tokenspb.RawHistoryContinuation, ) error
func ValidatePaginationTokenV2Request ¶ added in v1.24.0
func ValidatePaginationTokenV2Request( request *historyservice.GetWorkflowExecutionRawHistoryV2Request, token *tokenspb.RawHistoryContinuation, ) error
NOTE: DO NOT MODIFY UNLESS ALSO APPLIED TO ./service/frontend/token_deprecated.go
func ValidateReplicationConfig ¶ added in v1.19.0
func ValidateSignal ¶
func ValidateStart ¶
Types ¶
type MutableStateConsistencyPredicate ¶
type MutableStateConsistencyPredicate func(mutableState workflow.MutableState) bool
type UpdateWorkflowAction ¶
type UpdateWorkflowActionFunc ¶
type UpdateWorkflowActionFunc func(WorkflowLease) (*UpdateWorkflowAction, error)
func ResolveDuplicateWorkflowID ¶ added in v1.24.0
func ResolveDuplicateWorkflowID( shardContext shard.Context, workflowKey definition.WorkflowKey, namespaceEntry *namespace.Namespace, newRunID string, currentState enumsspb.WorkflowExecutionState, currentStatus enumspb.WorkflowExecutionStatus, currentStartRequestID string, wfIDReusePolicy enumspb.WorkflowIdReusePolicy, wfIDConflictPolicy enumspb.WorkflowIdConflictPolicy, currentWorkflowStartTime time.Time, ) (UpdateWorkflowActionFunc, error)
ResolveDuplicateWorkflowID determines how to resolve a workflow ID duplication upon workflow start according to the WorkflowIdReusePolicy (for a *completed* workflow) or the WorkflowIdConflictPolicy (for a *running* workflow).
NOTE: this function assumes the workflow id reuse policy Terminate-if-Running has been migrated to the workflow id conflict policy Terminate-Existing before it is invoked.
An action (i.e. "mitigate and allow"), an error (i.e. "deny"), or neither (i.e. "allow") is returned.
type VersionedRunID ¶ added in v1.24.0
type WorkflowConsistencyChecker ¶
type WorkflowConsistencyChecker interface { GetWorkflowCache() wcache.Cache GetCurrentRunID( ctx context.Context, namespaceID string, workflowID string, lockPriority locks.Priority, ) (string, error) GetWorkflowLease( ctx context.Context, reqClock *clockspb.VectorClock, workflowKey definition.WorkflowKey, lockPriority locks.Priority, ) (WorkflowLease, error) GetWorkflowLeaseWithConsistencyCheck( ctx context.Context, reqClock *clockspb.VectorClock, consistencyPredicate MutableStateConsistencyPredicate, workflowKey definition.WorkflowKey, lockPriority locks.Priority, ) (WorkflowLease, error) }
type WorkflowConsistencyCheckerImpl ¶
type WorkflowConsistencyCheckerImpl struct {
// contains filtered or unexported fields
}
func NewWorkflowConsistencyChecker ¶
func NewWorkflowConsistencyChecker( shardContext shard.Context, workflowCache wcache.Cache, ) *WorkflowConsistencyCheckerImpl
func (*WorkflowConsistencyCheckerImpl) GetCurrentRunID ¶
func (*WorkflowConsistencyCheckerImpl) GetWorkflowCache ¶ added in v1.19.0
func (c *WorkflowConsistencyCheckerImpl) GetWorkflowCache() wcache.Cache
func (*WorkflowConsistencyCheckerImpl) GetWorkflowLease ¶ added in v1.24.0
func (c *WorkflowConsistencyCheckerImpl) GetWorkflowLease( ctx context.Context, reqClock *clockspb.VectorClock, workflowKey definition.WorkflowKey, lockPriority locks.Priority, ) (WorkflowLease, error)
func (*WorkflowConsistencyCheckerImpl) GetWorkflowLeaseWithConsistencyCheck ¶ added in v1.25.0
func (c *WorkflowConsistencyCheckerImpl) GetWorkflowLeaseWithConsistencyCheck( ctx context.Context, reqClock *clockspb.VectorClock, consistencyPredicate MutableStateConsistencyPredicate, workflowKey definition.WorkflowKey, lockPriority locks.Priority, ) (WorkflowLease, error)
This method should be used when custom workflow state validation is required. If consistencyPredicate fails (thus detecting a stale workflow state), the workflow state will be cleared and the mutable state will be reloaded.
type WorkflowLease ¶ added in v1.24.0
type WorkflowLease interface { GetContext() workflow.Context GetMutableState() workflow.MutableState GetReleaseFn() wcache.ReleaseCacheFunc }
func NewWorkflowLease ¶ added in v1.24.0
func NewWorkflowLease( context workflow.Context, releaseFn wcache.ReleaseCacheFunc, mutableState workflow.MutableState, ) WorkflowLease
func NewWorkflowWithSignal ¶
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| getdlqtasks | Package getdlqtasks contains the logic to implement the [historyservice.HistoryServiceServer.GetDLQTasks] API. |