Documentation ¶
Index ¶
- Variables
- func NewService(params *service.BootstrapParams) common.Daemon
- type Config
- type Engine
- type Forwarder
- func (fwdr *Forwarder) AddReqTokenC() <-chan *ForwarderReqToken
- func (fwdr *Forwarder) ForwardPoll(ctx context.Context) (*internalTask, error)
- func (fwdr *Forwarder) ForwardQueryTask(ctx context.Context, task *internalTask) (*shared.QueryWorkflowResponse, error)
- func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *internalTask) error
- func (fwdr *Forwarder) PollReqTokenC() <-chan *ForwarderReqToken
- type ForwarderReqToken
- type Handler
- func (h *Handler) AddActivityTask(ctx context.Context, addRequest *m.AddActivityTaskRequest) (retError error)
- func (h *Handler) AddDecisionTask(ctx context.Context, addRequest *m.AddDecisionTaskRequest) (retError error)
- func (h *Handler) CancelOutstandingPoll(ctx context.Context, request *m.CancelOutstandingPollRequest) (retError error)
- func (h *Handler) DescribeTaskList(ctx context.Context, request *m.DescribeTaskListRequest) (resp *gen.DescribeTaskListResponse, retError error)
- func (h *Handler) Health(ctx context.Context) (*health.HealthStatus, error)
- func (h *Handler) PollForActivityTask(ctx context.Context, pollRequest *m.PollForActivityTaskRequest) (resp *gen.PollForActivityTaskResponse, retError error)
- func (h *Handler) PollForDecisionTask(ctx context.Context, pollRequest *m.PollForDecisionTaskRequest) (resp *m.PollForDecisionTaskResponse, retError error)
- func (h *Handler) QueryWorkflow(ctx context.Context, queryRequest *m.QueryWorkflowRequest) (resp *gen.QueryWorkflowResponse, retError error)
- func (h *Handler) RegisterHandler()
- func (h *Handler) RespondQueryTaskCompleted(ctx context.Context, request *m.RespondQueryTaskCompletedRequest) (retError error)
- func (h *Handler) Start() error
- func (h *Handler) Stop()
- type Service
- type TaskMatcher
- func (tm *TaskMatcher) MustOffer(ctx context.Context, task *internalTask) error
- func (tm *TaskMatcher) Offer(ctx context.Context, task *internalTask) (bool, error)
- func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *internalTask) ([]byte, error)
- func (tm *TaskMatcher) Poll(ctx context.Context) (*internalTask, error)
- func (tm *TaskMatcher) PollForQuery(ctx context.Context) (*internalTask, error)
- func (tm *TaskMatcher) Rate() float64
- func (tm *TaskMatcher) UpdateRatelimit(rps *float64)
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrNoTasks is exported temporarily for integration test
	ErrNoTasks = errors.New("No tasks")
)
Functions ¶
func NewService ¶
func NewService(params *service.BootstrapParams) common.Daemon
NewService builds a new cadence-matching service
Types ¶
type Config ¶ added in v0.3.0
type Config struct {
	PersistenceMaxQPS dynamicconfig.IntPropertyFn
	EnableSyncMatch dynamicconfig.BoolPropertyFnWithTaskListInfoFilters
	RPS dynamicconfig.IntPropertyFn
	// taskListManager configuration
	RangeSize int64
	GetTasksBatchSize dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	UpdateAckInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
	IdleTasklistCheckInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
	MaxTasklistIdleTime dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
	NumTasklistWritePartitions dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	NumTasklistReadPartitions dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	ForwarderMaxOutstandingPolls dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	ForwarderMaxOutstandingTasks dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	ForwarderMaxRatePerSecond dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	ForwarderMaxChildrenPerNode dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	// Time to hold a poll request before returning an empty response if there are no tasks
	LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
	MinTaskThrottlingBurstSize dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	MaxTaskDeleteBatchSize dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	// taskWriter configuration
	OutstandingTaskAppendsThreshold dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	MaxTaskBatchSize dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	ThrottledLogRPS dynamicconfig.IntPropertyFn
}
Config represents configuration for cadence-matching service
func NewConfig ¶ added in v0.3.0
func NewConfig(dc *dynamicconfig.Collection) *Config
NewConfig returns new service config with default values
type Engine ¶
type Engine interface {
	Stop()
	AddDecisionTask(ctx context.Context, addRequest *m.AddDecisionTaskRequest) (syncMatch bool, err error)
	AddActivityTask(ctx context.Context, addRequest *m.AddActivityTaskRequest) (syncMatch bool, err error)
	PollForDecisionTask(ctx context.Context, request *m.PollForDecisionTaskRequest) (*m.PollForDecisionTaskResponse, error)
	PollForActivityTask(ctx context.Context, request *m.PollForActivityTaskRequest) (*workflow.PollForActivityTaskResponse, error)
	QueryWorkflow(ctx context.Context, request *m.QueryWorkflowRequest) (*workflow.QueryWorkflowResponse, error)
	RespondQueryTaskCompleted(ctx context.Context, request *m.RespondQueryTaskCompletedRequest) error
	CancelOutstandingPoll(ctx context.Context, request *m.CancelOutstandingPollRequest) error
	DescribeTaskList(ctx context.Context, request *m.DescribeTaskListRequest) (*workflow.DescribeTaskListResponse, error)
}
Engine exposes interfaces for clients to poll for activity and decision tasks.
type Forwarder ¶ added in v0.7.0
type Forwarder struct {
// contains filtered or unexported fields
}
Forwarder is the type that contains state pertaining to the api call forwarder component
func (*Forwarder) AddReqTokenC ¶ added in v0.7.0
func (fwdr *Forwarder) AddReqTokenC() <-chan *ForwarderReqToken
AddReqTokenC returns a channel that can be used to wait for a token that's necessary before making a ForwardTask or ForwardQueryTask API call. After the API call is invoked, token.release() must be invoked.
func (*Forwarder) ForwardPoll ¶ added in v0.7.0
ForwardPoll forwards a poll request to the parent task list partition, if it exists.
func (*Forwarder) ForwardQueryTask ¶ added in v0.7.0
func (fwdr *Forwarder) ForwardQueryTask(ctx context.Context, task *internalTask) (*shared.QueryWorkflowResponse, error)
ForwardQueryTask forwards a query task to the parent task list partition, if it exists.
func (*Forwarder) ForwardTask ¶ added in v0.7.0
ForwardTask forwards an activity or decision task to the parent task list partition, if it exists.
func (*Forwarder) PollReqTokenC ¶ added in v0.7.0
func (fwdr *Forwarder) PollReqTokenC() <-chan *ForwarderReqToken
PollReqTokenC returns a channel that can be used to wait for a token that's necessary before making a ForwardPoll API call. After the API call is invoked, token.release() must be invoked.
type ForwarderReqToken ¶ added in v0.7.0
type ForwarderReqToken struct {
// contains filtered or unexported fields
}
ForwarderReqToken is the token that must be acquired before making forwarder API calls. This type contains the state for the token itself
type Handler ¶
Handler - Thrift handler interface for the matching service
func NewHandler ¶
func NewHandler(sVice service.Service, config *Config, taskPersistence persistence.TaskManager, metadataMgr persistence.MetadataManager) *Handler
NewHandler creates a thrift handler for the matching service
func (*Handler) AddActivityTask ¶
func (h *Handler) AddActivityTask(ctx context.Context, addRequest *m.AddActivityTaskRequest) (retError error)
AddActivityTask - adds an activity task.
func (*Handler) AddDecisionTask ¶
func (h *Handler) AddDecisionTask(ctx context.Context, addRequest *m.AddDecisionTaskRequest) (retError error)
AddDecisionTask - adds a decision task.
func (*Handler) CancelOutstandingPoll ¶ added in v0.3.2
func (h *Handler) CancelOutstandingPoll(ctx context.Context, request *m.CancelOutstandingPollRequest) (retError error)
CancelOutstandingPoll is used to cancel outstanding pollers
func (*Handler) DescribeTaskList ¶ added in v0.3.5
func (h *Handler) DescribeTaskList(ctx context.Context, request *m.DescribeTaskListRequest) (resp *gen.DescribeTaskListResponse, retError error)
DescribeTaskList returns information about the target tasklist. Right now this API returns the pollers which polled this tasklist in the last few minutes. If the includeTaskListStatus field is true, it will also return the status of the tasklist's ackManager (readLevel, ackLevel, backlogCountHint and taskIDBlock).
func (*Handler) PollForActivityTask ¶
func (h *Handler) PollForActivityTask(ctx context.Context, pollRequest *m.PollForActivityTaskRequest) (resp *gen.PollForActivityTaskResponse, retError error)
PollForActivityTask - long poll for an activity task.
func (*Handler) PollForDecisionTask ¶
func (h *Handler) PollForDecisionTask(ctx context.Context, pollRequest *m.PollForDecisionTaskRequest) (resp *m.PollForDecisionTaskResponse, retError error)
PollForDecisionTask - long poll for a decision task.
func (*Handler) QueryWorkflow ¶ added in v0.3.2
func (h *Handler) QueryWorkflow(ctx context.Context, queryRequest *m.QueryWorkflowRequest) (resp *gen.QueryWorkflowResponse, retError error)
QueryWorkflow queries a given workflow synchronously and returns the query result.
func (*Handler) RegisterHandler ¶ added in v0.5.8
func (h *Handler) RegisterHandler()
RegisterHandler registers this handler; it must be called before Start().
func (*Handler) RespondQueryTaskCompleted ¶ added in v0.3.2
func (h *Handler) RespondQueryTaskCompleted(ctx context.Context, request *m.RespondQueryTaskCompletedRequest) (retError error)
RespondQueryTaskCompleted reports the completion of a query task.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service represents the cadence-matching service
type TaskMatcher ¶ added in v0.6.0
type TaskMatcher struct {
// contains filtered or unexported fields
}
TaskMatcher matches a task producer with a task consumer. Producers are usually RPC calls from history or the taskReader that drains the backlog from the db. Consumers are the task list pollers.
func (*TaskMatcher) MustOffer ¶ added in v0.6.0
func (tm *TaskMatcher) MustOffer(ctx context.Context, task *internalTask) error
MustOffer blocks until a consumer is found to handle this task. Returns an error only when the context is canceled or the ratelimit is set to zero (allow nothing). The passed-in context MUST NOT have a deadline associated with it.
func (*TaskMatcher) Offer ¶ added in v0.6.0
func (tm *TaskMatcher) Offer(ctx context.Context, task *internalTask) (bool, error)
Offer offers a task to a potential consumer (poller). If the task is successfully matched with a consumer, this method will return true and no error. If the task is matched but the consumer returned an error, then this method will return true and the error. Both regular tasks and query tasks should use this method to match with a consumer. Likewise, sync matches and non-sync matches both should use this method. Returns an error when:
- ratelimit is exceeded (does not apply to query task)
- context deadline is exceeded
- task is matched and consumer returns error in response channel
func (*TaskMatcher) OfferQuery ¶ added in v0.7.0
func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *internalTask) ([]byte, error)
OfferQuery offers a query task to a potential consumer (poller). If the task is successfully matched, this method will return the query response. Otherwise it returns error
func (*TaskMatcher) Poll ¶ added in v0.6.0
func (tm *TaskMatcher) Poll(ctx context.Context) (*internalTask, error)
Poll blocks until a task is found or the context deadline is exceeded. On success, the returned task could be a query task or a regular task. Returns ErrNoTasks when the context deadline is exceeded.
func (*TaskMatcher) PollForQuery ¶ added in v0.6.0
func (tm *TaskMatcher) PollForQuery(ctx context.Context) (*internalTask, error)
PollForQuery blocks until a *query* task is found or the context deadline is exceeded. Returns ErrNoTasks when the context deadline is exceeded.
func (*TaskMatcher) Rate ¶ added in v0.6.0
func (tm *TaskMatcher) Rate() float64
Rate returns the current rate at which tasks are dispatched
func (*TaskMatcher) UpdateRatelimit ¶ added in v0.6.0
func (tm *TaskMatcher) UpdateRatelimit(rps *float64)
UpdateRatelimit updates the task dispatch rate