Documentation ¶
Index ¶
- Variables
- func ContextWithIdentity(ctx context.Context, identity string) context.Context
- func ContextWithIsolationGroup(ctx context.Context, isolationGroup string) context.Context
- func ContextWithPollerID(ctx context.Context, pollerID string) context.Context
- func IdentityFromContext(ctx context.Context) string
- func IsolationGroupFromContext(ctx context.Context) string
- func NewPerTaskListScope(domainName string, taskListName string, taskListKind types.TaskListKind, ...) metrics.Scope
- func PollerIDFromContext(ctx context.Context) string
- type AddTaskParams
- type Forwarder
- func (fwdr *Forwarder) AddReqTokenC() <-chan *ForwarderReqToken
- func (fwdr *Forwarder) ForwardPoll(ctx context.Context) (*InternalTask, error)
- func (fwdr *Forwarder) ForwardQueryTask(ctx context.Context, task *InternalTask) (*types.QueryWorkflowResponse, error)
- func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *InternalTask) error
- func (fwdr *Forwarder) PollReqTokenC(isolationGroup string) <-chan *ForwarderReqToken
- type ForwarderReqToken
- type Identifier
- type InternalTask
- func (task *InternalTask) Finish(err error)
- func (task *InternalTask) IsForwarded() bool
- func (task *InternalTask) IsQuery() bool
- func (task *InternalTask) IsStarted() bool
- func (task *InternalTask) IsSyncMatch() bool
- func (task *InternalTask) PollForActivityResponse() *types.PollForActivityTaskResponse
- func (task *InternalTask) PollForDecisionResponse() *types.MatchingPollForDecisionTaskResponse
- func (task *InternalTask) WorkflowExecution() *types.WorkflowExecution
- type Manager
- type TaskMatcher
- func (tm *TaskMatcher) MustOffer(ctx context.Context, task *InternalTask) error
- func (tm *TaskMatcher) Offer(ctx context.Context, task *InternalTask) (bool, error)
- func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *InternalTask) (*types.QueryWorkflowResponse, error)
- func (tm *TaskMatcher) Poll(ctx context.Context, isolationGroup string) (*InternalTask, error)
- func (tm *TaskMatcher) PollForQuery(ctx context.Context) (*InternalTask, error)
- func (tm *TaskMatcher) Rate() float64
- func (tm *TaskMatcher) UpdateRatelimit(rps *float64)
- type TestTaskManager
- func (m *TestTaskManager) Close()
- func (m *TestTaskManager) CompleteTask(_ context.Context, request *persistence.CompleteTaskRequest) error
- func (m *TestTaskManager) CompleteTasksLessThan(_ context.Context, request *persistence.CompleteTasksLessThanRequest) (*persistence.CompleteTasksLessThanResponse, error)
- func (m *TestTaskManager) CreateTasks(_ context.Context, request *persistence.CreateTasksRequest) (*persistence.CreateTasksResponse, error)
- func (m *TestTaskManager) DeleteTaskList(_ context.Context, request *persistence.DeleteTaskListRequest) error
- func (m *TestTaskManager) GetCreateTaskCount(taskList *Identifier) int
- func (m *TestTaskManager) GetName() string
- func (m *TestTaskManager) GetOrphanTasks(_ context.Context, request *persistence.GetOrphanTasksRequest) (*persistence.GetOrphanTasksResponse, error)
- func (m *TestTaskManager) GetRangeID(taskList *Identifier) int64
- func (m *TestTaskManager) GetTaskCount(taskList *Identifier) int
- func (m *TestTaskManager) GetTaskListSize(_ context.Context, request *persistence.GetTaskListSizeRequest) (*persistence.GetTaskListSizeResponse, error)
- func (m *TestTaskManager) GetTasks(_ context.Context, request *persistence.GetTasksRequest) (*persistence.GetTasksResponse, error)
- func (m *TestTaskManager) LeaseTaskList(_ context.Context, request *persistence.LeaseTaskListRequest) (*persistence.LeaseTaskListResponse, error)
- func (m *TestTaskManager) ListTaskList(_ context.Context, request *persistence.ListTaskListRequest) (*persistence.ListTaskListResponse, error)
- func (m *TestTaskManager) SetRangeID(taskList *Identifier, rangeID int64)
- func (m *TestTaskManager) String() string
- func (m *TestTaskManager) UpdateTaskList(_ context.Context, request *persistence.UpdateTaskListRequest) (*persistence.UpdateTaskListResponse, error)
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoTasks is exported temporarily for integration test ErrNoTasks = errors.New("no tasks") )
var ErrTasklistThrottled = errors.New("tasklist limit exceeded")
ErrTasklistThrottled implies a tasklist was throttled
Functions ¶
func ContextWithIdentity ¶
func ContextWithPollerID ¶
func IdentityFromContext ¶
func NewPerTaskListScope ¶
func PollerIDFromContext ¶
Types ¶
type AddTaskParams ¶
type AddTaskParams struct { Execution *types.WorkflowExecution TaskInfo *persistence.TaskInfo Source types.TaskSource ForwardedFrom string ActivityTaskDispatchInfo *types.ActivityTaskDispatchInfo }
type Forwarder ¶
type Forwarder struct {
// contains filtered or unexported fields
}
Forwarder is the type that contains state pertaining to the api call forwarder component
func (*Forwarder) AddReqTokenC ¶
func (fwdr *Forwarder) AddReqTokenC() <-chan *ForwarderReqToken
AddReqTokenC returns a channel that can be used to wait for a token that's necessary before making a ForwardTask or ForwardQueryTask API call. After the API call is invoked, token.release() must be invoked TODO: consider having separate token pools for different isolation groups
func (*Forwarder) ForwardPoll ¶
func (fwdr *Forwarder) ForwardPoll(ctx context.Context) (*InternalTask, error)
ForwardPoll forwards a poll request to parent task list partition if it exist
func (*Forwarder) ForwardQueryTask ¶
func (fwdr *Forwarder) ForwardQueryTask( ctx context.Context, task *InternalTask, ) (*types.QueryWorkflowResponse, error)
ForwardQueryTask forwards a query task to parent task list partition, if it exist
func (*Forwarder) ForwardTask ¶
func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *InternalTask) error
ForwardTask forwards an activity or decision task to the parent task list partition if it exist
func (*Forwarder) PollReqTokenC ¶
func (fwdr *Forwarder) PollReqTokenC(isolationGroup string) <-chan *ForwarderReqToken
PollReqTokenC returns a channel that can be used to wait for a token that's necessary before making a ForwardPoll API call. After the API call is invoked, token.release() must be invoked For tasklists with isolation enabled, we have separate token pools for different isolation groups
type ForwarderReqToken ¶
type ForwarderReqToken struct {
// contains filtered or unexported fields
}
ForwarderReqToken is the token that must be acquired before making forwarder API calls. This type contains the state for the token itself
type Identifier ¶
type Identifier struct {
// contains filtered or unexported fields
}
Identifier is the key that uniquely identifies a task list
func NewIdentifier ¶
func NewIdentifier( domainID string, taskListName string, taskType int, ) (*Identifier, error)
NewIdentifier returns identifier which uniquely identifies as task list
func NewTestTaskListID ¶
func (*Identifier) GetDomainID ¶
func (tid *Identifier) GetDomainID() string
GetDomainID returns the domain ID of the task list
func (*Identifier) GetName ¶
func (tn *Identifier) GetName() string
GetName returns the name of the task list
func (*Identifier) GetRoot ¶
func (tn *Identifier) GetRoot() string
GetRoot returns the root name for a task list
func (*Identifier) GetType ¶
func (tid *Identifier) GetType() int
GetType returns the task type of the task list
func (*Identifier) IsRoot ¶
func (tn *Identifier) IsRoot() bool
IsRoot returns true if this task list is a root partition
func (*Identifier) Parent ¶
Parent returns the name of the parent task list input:
degree: Number of children at each level of the tree
Returns empty string if this task list is the root
func (*Identifier) String ¶
func (tid *Identifier) String() string
type InternalTask ¶
type InternalTask struct { Event *genericTaskInfo // non-nil for activity or decision task that's locally generated Query *queryTaskInfo // non-nil for a query task that's locally sync matched ResponseC chan error // non-nil only where there is a caller waiting for response (sync-match) BacklogCountHint int64 ActivityTaskDispatchInfo *types.ActivityTaskDispatchInfo // contains filtered or unexported fields }
InternalTask represents an activity, decision, query or started (received from another host). this struct is more like a union and only one of [ query, event, forwarded ] is non-nil for any given task
func (*InternalTask) Finish ¶
func (task *InternalTask) Finish(err error)
finish marks a task as finished. Should be called after a poller picks up a task and marks it as started. If the task is unable to marked as started, then this method should be called with a non-nil error argument.
func (*InternalTask) IsForwarded ¶
func (task *InternalTask) IsForwarded() bool
isForwarded returns true if the underlying task is forwarded by a remote matching host forwarded tasks are already marked as started in history
func (*InternalTask) IsQuery ¶
func (task *InternalTask) IsQuery() bool
isQuery returns true if the underlying task is a query task
func (*InternalTask) IsStarted ¶
func (task *InternalTask) IsStarted() bool
isStarted is true when this task is already marked as started
func (*InternalTask) IsSyncMatch ¶
func (task *InternalTask) IsSyncMatch() bool
func (*InternalTask) PollForActivityResponse ¶
func (task *InternalTask) PollForActivityResponse() *types.PollForActivityTaskResponse
pollForActivityResponse returns the poll response for an activity task that is already marked as started. This method should only be called when isStarted() is true
func (*InternalTask) PollForDecisionResponse ¶
func (task *InternalTask) PollForDecisionResponse() *types.MatchingPollForDecisionTaskResponse
pollForDecisionResponse returns the poll response for a decision task that is already marked as started. This method should only be called when isStarted() is true
func (*InternalTask) WorkflowExecution ¶
func (task *InternalTask) WorkflowExecution() *types.WorkflowExecution
type Manager ¶
type Manager interface { Start() error Stop() // AddTask adds a task to the task list. This method will first attempt a synchronous // match with a poller. When that fails, task will be written to database and later // asynchronously matched with a poller AddTask(ctx context.Context, params AddTaskParams) (syncMatch bool, err error) // GetTask blocks waiting for a task Returns error when context deadline is exceeded // maxDispatchPerSecond is the max rate at which tasks are allowed to be dispatched // from this task list to pollers GetTask(ctx context.Context, maxDispatchPerSecond *float64) (*InternalTask, error) // DispatchTask dispatches a task to a poller. When there are no pollers to pick // up the task, this method will return error. Task will not be persisted to db DispatchTask(ctx context.Context, task *InternalTask) error // DispatchQueryTask will dispatch query to local or remote poller. If forwarded then result or error is returned, // if dispatched to local poller then nil and nil is returned. DispatchQueryTask(ctx context.Context, taskID string, request *types.MatchingQueryWorkflowRequest) (*types.QueryWorkflowResponse, error) CancelPoller(pollerID string) GetAllPollerInfo() []*types.PollerInfo HasPollerAfter(accessTime time.Time) bool // DescribeTaskList returns information about the target tasklist DescribeTaskList(includeTaskListStatus bool) *types.DescribeTaskListResponse String() string GetTaskListKind() types.TaskListKind TaskListID() *Identifier }
func NewManager ¶
func NewManager( domainCache cache.DomainCache, logger log.Logger, metricsClient metrics.Client, taskManager persistence.TaskManager, clusterMetadata cluster.Metadata, partitioner partition.Partitioner, matchingClient matching.Client, closeCallback func(Manager), taskList *Identifier, taskListKind *types.TaskListKind, config *config.Config, timeSource clock.TimeSource, createTime time.Time, ) (Manager, error)
type TaskMatcher ¶
type TaskMatcher struct {
// contains filtered or unexported fields
}
TaskMatcher matches a task producer with a task consumer Producers are usually rpc calls from history or taskReader that drains backlog from db. Consumers are the task list pollers
func (*TaskMatcher) MustOffer ¶
func (tm *TaskMatcher) MustOffer(ctx context.Context, task *InternalTask) error
MustOffer blocks until a consumer is found to handle this task Returns error only when context is canceled, expired or the ratelimit is set to zero (allow nothing)
func (*TaskMatcher) Offer ¶
func (tm *TaskMatcher) Offer(ctx context.Context, task *InternalTask) (bool, error)
Offer offers a task to a potential consumer (poller) If the task is successfully matched with a consumer, this method will return true and no error. If the task is matched but consumer returned error, then this method will return true and error message. This method should not be used for query task. This method should ONLY be used for sync match.
When a local poller is not available and forwarding to a parent task list partition is possible, this method will attempt forwarding to the parent partition.
Cases when this method will block:
Ratelimit: When a ratelimit token is not available, this method might block waiting for a token until the provided context timeout. Rate limits are not enforced for forwarded tasks from child partition.
Forwarded tasks that originated from db backlog: When this method is called with a task that is forwarded from a remote partition and if (1) this task list is root (2) task was from db backlog - this method will block until context timeout trying to match with a poller. The caller is expected to set the correct context timeout.
returns error when:
- ratelimit is exceeded (does not apply to query task)
- context deadline is exceeded
- task is matched and consumer returns error in response channel
func (*TaskMatcher) OfferQuery ¶
func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *InternalTask) (*types.QueryWorkflowResponse, error)
OfferQuery will either match task to local poller or will forward query task. Local match is always attempted before forwarding is attempted. If local match occurs response and error are both nil, if forwarding occurs then response or error is returned.
func (*TaskMatcher) Poll ¶
func (tm *TaskMatcher) Poll(ctx context.Context, isolationGroup string) (*InternalTask, error)
Poll blocks until a task is found or context deadline is exceeded On success, the returned task could be a query task or a regular task Returns ErrNoTasks when context deadline is exceeded
func (*TaskMatcher) PollForQuery ¶
func (tm *TaskMatcher) PollForQuery(ctx context.Context) (*InternalTask, error)
PollForQuery blocks until a *query* task is found or context deadline is exceeded Returns ErrNoTasks when context deadline is exceeded
func (*TaskMatcher) Rate ¶
func (tm *TaskMatcher) Rate() float64
Rate returns the current rate at which tasks are dispatched
func (*TaskMatcher) UpdateRatelimit ¶
func (tm *TaskMatcher) UpdateRatelimit(rps *float64)
UpdateRatelimit updates the task dispatch rate
type TestTaskManager ¶
func NewTestTaskManager ¶
func NewTestTaskManager(t *testing.T, logger log.Logger) *TestTaskManager
func (*TestTaskManager) Close ¶
func (m *TestTaskManager) Close()
func (*TestTaskManager) CompleteTask ¶
func (m *TestTaskManager) CompleteTask( _ context.Context, request *persistence.CompleteTaskRequest, ) error
CompleteTask provides a mock function with given fields: ctx, request
func (*TestTaskManager) CompleteTasksLessThan ¶
func (m *TestTaskManager) CompleteTasksLessThan( _ context.Context, request *persistence.CompleteTasksLessThanRequest, ) (*persistence.CompleteTasksLessThanResponse, error)
CompleteTasksLessThan provides a mock function with given fields: ctx, request
func (*TestTaskManager) CreateTasks ¶
func (m *TestTaskManager) CreateTasks( _ context.Context, request *persistence.CreateTasksRequest, ) (*persistence.CreateTasksResponse, error)
CreateTask provides a mock function with given fields: ctx, request
func (*TestTaskManager) DeleteTaskList ¶
func (m *TestTaskManager) DeleteTaskList( _ context.Context, request *persistence.DeleteTaskListRequest, ) error
DeleteTaskList provides a mock function with given fields: ctx, request
func (*TestTaskManager) GetCreateTaskCount ¶
func (m *TestTaskManager) GetCreateTaskCount(taskList *Identifier) int
getCreateTaskCount returns how many times CreateTask was called
func (*TestTaskManager) GetName ¶
func (m *TestTaskManager) GetName() string
func (*TestTaskManager) GetOrphanTasks ¶
func (m *TestTaskManager) GetOrphanTasks(_ context.Context, request *persistence.GetOrphanTasksRequest) (*persistence.GetOrphanTasksResponse, error)
func (*TestTaskManager) GetRangeID ¶
func (m *TestTaskManager) GetRangeID(taskList *Identifier) int64
func (*TestTaskManager) GetTaskCount ¶
func (m *TestTaskManager) GetTaskCount(taskList *Identifier) int
getTaskCount returns number of tasks in a task list
func (*TestTaskManager) GetTaskListSize ¶
func (m *TestTaskManager) GetTaskListSize(_ context.Context, request *persistence.GetTaskListSizeRequest) (*persistence.GetTaskListSizeResponse, error)
func (*TestTaskManager) GetTasks ¶
func (m *TestTaskManager) GetTasks( _ context.Context, request *persistence.GetTasksRequest, ) (*persistence.GetTasksResponse, error)
GetTasks provides a mock function with given fields: ctx, request
func (*TestTaskManager) LeaseTaskList ¶
func (m *TestTaskManager) LeaseTaskList( _ context.Context, request *persistence.LeaseTaskListRequest, ) (*persistence.LeaseTaskListResponse, error)
LeaseTaskList provides a mock function with given fields: ctx, request
func (*TestTaskManager) ListTaskList ¶
func (m *TestTaskManager) ListTaskList( _ context.Context, request *persistence.ListTaskListRequest, ) (*persistence.ListTaskListResponse, error)
ListTaskList provides a mock function with given fields: ctx, request
func (*TestTaskManager) SetRangeID ¶
func (m *TestTaskManager) SetRangeID(taskList *Identifier, rangeID int64)
func (*TestTaskManager) String ¶
func (m *TestTaskManager) String() string
func (*TestTaskManager) UpdateTaskList ¶
func (m *TestTaskManager) UpdateTaskList( _ context.Context, request *persistence.UpdateTaskListRequest, ) (*persistence.UpdateTaskListResponse, error)
UpdateTaskList provides a mock function with given fields: ctx, request