Documentation ¶
Index ¶
- Variables
- func ParamsExpandProvider(params *resource.BootstrapParams) (log.Logger, dynamicconfig.Client, config.Persistence, *esclient.Config, ...)
- func PersistenceMaxQpsProvider(serviceConfig *Config) persistenceClient.PersistenceMaxQps
- func RateLimitInterceptorProvider(serviceConfig *Config) *interceptor.RateLimitInterceptor
- func ServiceLifetimeHooks(lc fx.Lifecycle, svcStoppedCh chan struct{}, svc *Service)
- func TelemetryInterceptorProvider(logger log.Logger, resource resource.Resource) *interceptor.TelemetryInterceptor
- func ThrottledLoggerRpsFnProvider(serviceConfig *Config) resource.ThrottledLoggerRpsFn
- type Config
- type Engine
- type Forwarder
- func (fwdr *Forwarder) AddReqTokenC() <-chan *ForwarderReqToken
- func (fwdr *Forwarder) ForwardPoll(ctx context.Context) (*internalTask, error)
- func (fwdr *Forwarder) ForwardQueryTask(ctx context.Context, task *internalTask) (*matchingservice.QueryWorkflowResponse, error)
- func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *internalTask) error
- func (fwdr *Forwarder) PollReqTokenC() <-chan *ForwarderReqToken
- type ForwarderReqToken
- type Handler
- func (h *Handler) AddActivityTask(ctx context.Context, request *matchingservice.AddActivityTaskRequest) (_ *matchingservice.AddActivityTaskResponse, retError error)
- func (h *Handler) AddWorkflowTask(ctx context.Context, request *matchingservice.AddWorkflowTaskRequest) (_ *matchingservice.AddWorkflowTaskResponse, retError error)
- func (h *Handler) CancelOutstandingPoll(ctx context.Context, request *matchingservice.CancelOutstandingPollRequest) (_ *matchingservice.CancelOutstandingPollResponse, retError error)
- func (h *Handler) Check(_ context.Context, request *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error)
- func (h *Handler) DescribeTaskQueue(ctx context.Context, request *matchingservice.DescribeTaskQueueRequest) (_ *matchingservice.DescribeTaskQueueResponse, retError error)
- func (h *Handler) ListTaskQueuePartitions(ctx context.Context, request *matchingservice.ListTaskQueuePartitionsRequest) (_ *matchingservice.ListTaskQueuePartitionsResponse, retError error)
- func (h *Handler) PollActivityTaskQueue(ctx context.Context, request *matchingservice.PollActivityTaskQueueRequest) (_ *matchingservice.PollActivityTaskQueueResponse, retError error)
- func (h *Handler) PollWorkflowTaskQueue(ctx context.Context, request *matchingservice.PollWorkflowTaskQueueRequest) (_ *matchingservice.PollWorkflowTaskQueueResponse, retError error)
- func (h *Handler) QueryWorkflow(ctx context.Context, request *matchingservice.QueryWorkflowRequest) (_ *matchingservice.QueryWorkflowResponse, retError error)
- func (h *Handler) RespondQueryTaskCompleted(ctx context.Context, request *matchingservice.RespondQueryTaskCompletedRequest) (_ *matchingservice.RespondQueryTaskCompletedResponse, retError error)
- func (h *Handler) Start()
- func (h *Handler) Stop()
- func (h *Handler) Watch(*healthpb.HealthCheckRequest, healthpb.Health_WatchServer) error
- type Service
- type TaskMatcher
- func (tm *TaskMatcher) MustOffer(ctx context.Context, task *internalTask) error
- func (tm *TaskMatcher) Offer(ctx context.Context, task *internalTask) (bool, error)
- func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *internalTask) (*matchingservice.QueryWorkflowResponse, error)
- func (tm *TaskMatcher) Poll(ctx context.Context) (*internalTask, error)
- func (tm *TaskMatcher) PollForQuery(ctx context.Context) (*internalTask, error)
- func (tm *TaskMatcher) Rate() float64
- func (tm *TaskMatcher) UpdateRatelimit(rps *float64)
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoTasks is exported temporarily for integration test ErrNoTasks = errors.New("No tasks") )
var Module = fx.Options( fx.Provide(ParamsExpandProvider), fx.Provide(dynamicconfig.NewCollection), fx.Provide(NewConfig), fx.Provide(PersistenceMaxQpsProvider), fx.Provide(ThrottledLoggerRpsFnProvider), fx.Provide(TelemetryInterceptorProvider), fx.Provide(RateLimitInterceptorProvider), fx.Provide(service.GrpcServerOptionsProvider), resource.Module, fx.Provide(ServiceProvider), fx.Invoke(ServiceLifetimeHooks), )
Functions ¶
func ParamsExpandProvider ¶ added in v1.13.0
func ParamsExpandProvider(params *resource.BootstrapParams) ( log.Logger, dynamicconfig.Client, config.Persistence, *esclient.Config, common.RPCFactory, )
func PersistenceMaxQpsProvider ¶ added in v1.13.0
func PersistenceMaxQpsProvider( serviceConfig *Config, ) persistenceClient.PersistenceMaxQps
This function is the same between services but uses different config sources. The if-case comes from resourceImpl.New.
func RateLimitInterceptorProvider ¶ added in v1.13.0
func RateLimitInterceptorProvider( serviceConfig *Config, ) *interceptor.RateLimitInterceptor
func ServiceLifetimeHooks ¶ added in v1.13.0
func TelemetryInterceptorProvider ¶ added in v1.13.0
func TelemetryInterceptorProvider( logger log.Logger, resource resource.Resource, ) *interceptor.TelemetryInterceptor
func ThrottledLoggerRpsFnProvider ¶ added in v1.13.0
func ThrottledLoggerRpsFnProvider(serviceConfig *Config) resource.ThrottledLoggerRpsFn
Types ¶
type Config ¶ added in v0.3.0
type Config struct { PersistenceMaxQPS dynamicconfig.IntPropertyFn PersistenceGlobalMaxQPS dynamicconfig.IntPropertyFn SyncMatchWaitDuration dynamicconfig.DurationPropertyFnWithTaskQueueInfoFilters RPS dynamicconfig.IntPropertyFn ShutdownDrainDuration dynamicconfig.DurationPropertyFn RangeSize int64 GetTasksBatchSize dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters UpdateAckInterval dynamicconfig.DurationPropertyFnWithTaskQueueInfoFilters IdleTaskqueueCheckInterval dynamicconfig.DurationPropertyFnWithTaskQueueInfoFilters MaxTaskqueueIdleTime dynamicconfig.DurationPropertyFnWithTaskQueueInfoFilters NumTaskqueueWritePartitions dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters NumTaskqueueReadPartitions dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters ForwarderMaxOutstandingPolls dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters ForwarderMaxOutstandingTasks dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters ForwarderMaxRatePerSecond dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters ForwarderMaxChildrenPerNode dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters // Time to hold a poll request before returning an empty response if there are no tasks LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskQueueInfoFilters MinTaskThrottlingBurstSize dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters MaxTaskDeleteBatchSize dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters // taskWriter configuration OutstandingTaskAppendsThreshold dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters MaxTaskBatchSize dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters ThrottledLogRPS dynamicconfig.IntPropertyFn AdminNamespaceToPartitionDispatchRate dynamicconfig.FloatPropertyFnWithNamespaceFilter AdminNamespaceTaskqueueToPartitionDispatchRate dynamicconfig.FloatPropertyFnWithTaskQueueInfoFilters }
Config represents configuration for matching service
func NewConfig ¶ added in v0.3.0
func NewConfig(dc *dynamicconfig.Collection) *Config
NewConfig returns new service config with default values
type Engine ¶
type Engine interface { Stop() AddWorkflowTask(hCtx *handlerContext, addRequest *matchingservice.AddWorkflowTaskRequest) (syncMatch bool, err error) AddActivityTask(hCtx *handlerContext, addRequest *matchingservice.AddActivityTaskRequest) (syncMatch bool, err error) PollWorkflowTaskQueue(hCtx *handlerContext, request *matchingservice.PollWorkflowTaskQueueRequest) (*matchingservice.PollWorkflowTaskQueueResponse, error) PollActivityTaskQueue(hCtx *handlerContext, request *matchingservice.PollActivityTaskQueueRequest) (*matchingservice.PollActivityTaskQueueResponse, error) QueryWorkflow(hCtx *handlerContext, request *matchingservice.QueryWorkflowRequest) (*matchingservice.QueryWorkflowResponse, error) RespondQueryTaskCompleted(hCtx *handlerContext, request *matchingservice.RespondQueryTaskCompletedRequest) error CancelOutstandingPoll(hCtx *handlerContext, request *matchingservice.CancelOutstandingPollRequest) error DescribeTaskQueue(hCtx *handlerContext, request *matchingservice.DescribeTaskQueueRequest) (*matchingservice.DescribeTaskQueueResponse, error) ListTaskQueuePartitions(hCtx *handlerContext, request *matchingservice.ListTaskQueuePartitionsRequest) (*matchingservice.ListTaskQueuePartitionsResponse, error) }
Engine exposes interfaces for clients to poll for activity and workflow tasks.
func NewEngine ¶
func NewEngine(taskManager persistence.TaskManager, historyService historyservice.HistoryServiceClient, matchingClient matchingservice.MatchingServiceClient, config *Config, logger log.Logger, metricsClient metrics.Client, namespaceRegistry namespace.Registry, resolver membership.ServiceResolver, clusterMeta cluster.Metadata, ) Engine
NewEngine creates an instance of matching engine
type Forwarder ¶ added in v0.7.0
type Forwarder struct {
// contains filtered or unexported fields
}
Forwarder is the type that contains state pertaining to the api call forwarder component
func (*Forwarder) AddReqTokenC ¶ added in v0.7.0
func (fwdr *Forwarder) AddReqTokenC() <-chan *ForwarderReqToken
AddReqTokenC returns a channel that can be used to wait for a token that's necessary before making a ForwardTask or ForwardQueryTask API call. After the API call is invoked, token.release() must be invoked.
func (*Forwarder) ForwardPoll ¶ added in v0.7.0
ForwardPoll forwards a poll request to the parent task queue partition, if it exists.
func (*Forwarder) ForwardQueryTask ¶ added in v0.7.0
func (fwdr *Forwarder) ForwardQueryTask( ctx context.Context, task *internalTask, ) (*matchingservice.QueryWorkflowResponse, error)
ForwardQueryTask forwards a query task to the parent task queue partition, if it exists.
func (*Forwarder) ForwardTask ¶ added in v0.7.0
ForwardTask forwards an activity or workflow task to the parent task queue partition, if it exists.
func (*Forwarder) PollReqTokenC ¶ added in v0.7.0
func (fwdr *Forwarder) PollReqTokenC() <-chan *ForwarderReqToken
PollReqTokenC returns a channel that can be used to wait for a token that's necessary before making a ForwardPoll API call. After the API call is invoked, token.release() must be invoked.
type ForwarderReqToken ¶ added in v0.7.0
type ForwarderReqToken struct {
// contains filtered or unexported fields
}
ForwarderReqToken is the token that must be acquired before making forwarder API calls. This type contains the state for the token itself
type Handler ¶
Handler - gRPC handler interface for matchingservice
func NewHandler ¶
NewHandler creates a gRPC handler for the matchingservice
func (*Handler) AddActivityTask ¶
func (h *Handler) AddActivityTask( ctx context.Context, request *matchingservice.AddActivityTaskRequest, ) (_ *matchingservice.AddActivityTaskResponse, retError error)
AddActivityTask - adds an activity task.
func (*Handler) AddWorkflowTask ¶ added in v0.27.0
func (h *Handler) AddWorkflowTask( ctx context.Context, request *matchingservice.AddWorkflowTaskRequest, ) (_ *matchingservice.AddWorkflowTaskResponse, retError error)
AddWorkflowTask - adds a workflow task.
func (*Handler) CancelOutstandingPoll ¶ added in v0.3.2
func (h *Handler) CancelOutstandingPoll(ctx context.Context, request *matchingservice.CancelOutstandingPollRequest) (_ *matchingservice.CancelOutstandingPollResponse, retError error)
CancelOutstandingPoll is used to cancel outstanding pollers
func (*Handler) Check ¶ added in v0.27.0
func (h *Handler) Check(_ context.Context, request *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error)
https://github.com/grpc/grpc/blob/master/doc/health-checking.md
func (*Handler) DescribeTaskQueue ¶ added in v0.27.0
func (h *Handler) DescribeTaskQueue( ctx context.Context, request *matchingservice.DescribeTaskQueueRequest, ) (_ *matchingservice.DescribeTaskQueueResponse, retError error)
DescribeTaskQueue returns information about the target task queue. Right now this API returns the pollers which polled this task queue in the last few minutes. If the includeTaskQueueStatus field is true, it will also return the status of the task queue's ackManager (readLevel, ackLevel, backlogCountHint and taskIDBlock).
func (*Handler) ListTaskQueuePartitions ¶ added in v0.27.0
func (h *Handler) ListTaskQueuePartitions( ctx context.Context, request *matchingservice.ListTaskQueuePartitionsRequest, ) (_ *matchingservice.ListTaskQueuePartitionsResponse, retError error)
ListTaskQueuePartitions returns information about partitions for a taskQueue
func (*Handler) PollActivityTaskQueue ¶ added in v0.27.0
func (h *Handler) PollActivityTaskQueue( ctx context.Context, request *matchingservice.PollActivityTaskQueueRequest, ) (_ *matchingservice.PollActivityTaskQueueResponse, retError error)
PollActivityTaskQueue - long poll for an activity task.
func (*Handler) PollWorkflowTaskQueue ¶ added in v0.27.0
func (h *Handler) PollWorkflowTaskQueue( ctx context.Context, request *matchingservice.PollWorkflowTaskQueueRequest, ) (_ *matchingservice.PollWorkflowTaskQueueResponse, retError error)
PollWorkflowTaskQueue - long poll for a workflow task.
func (*Handler) QueryWorkflow ¶ added in v0.3.2
func (h *Handler) QueryWorkflow( ctx context.Context, request *matchingservice.QueryWorkflowRequest, ) (_ *matchingservice.QueryWorkflowResponse, retError error)
QueryWorkflow queries a given workflow synchronously and returns the query result.
func (*Handler) RespondQueryTaskCompleted ¶ added in v0.3.2
func (h *Handler) RespondQueryTaskCompleted( ctx context.Context, request *matchingservice.RespondQueryTaskCompletedRequest, ) (_ *matchingservice.RespondQueryTaskCompletedResponse, retError error)
RespondQueryTaskCompleted responds that a query task has completed.
func (*Handler) Watch ¶ added in v0.27.0
func (h *Handler) Watch(*healthpb.HealthCheckRequest, healthpb.Health_WatchServer) error
type Service ¶
Service represents the matching service
func NewService ¶
func NewService( params *resource.BootstrapParams, ) (*Service, error)
NewService builds a new matching service
func ServiceProvider ¶ added in v1.13.0
type TaskMatcher ¶ added in v0.6.0
type TaskMatcher struct {
// contains filtered or unexported fields
}
TaskMatcher matches a task producer with a task consumer. Producers are usually rpc calls from history or taskReader that drains backlog from db. Consumers are the task queue pollers.
func (*TaskMatcher) MustOffer ¶ added in v0.6.0
func (tm *TaskMatcher) MustOffer(ctx context.Context, task *internalTask) error
MustOffer blocks until a consumer is found to handle this task. Returns error only when context is canceled or the ratelimit is set to zero (allow nothing). The passed-in context MUST NOT have a deadline associated with it.
func (*TaskMatcher) Offer ¶ added in v0.6.0
func (tm *TaskMatcher) Offer(ctx context.Context, task *internalTask) (bool, error)
Offer offers a task to a potential consumer (poller). If the task is successfully matched with a consumer, this method will return true and no error. If the task is matched but the consumer returned an error, then this method will return true and the error. This method should not be used for query tasks. This method should ONLY be used for sync match.
When a local poller is not available and forwarding to a parent task queue partition is possible, this method will attempt forwarding to the parent partition.
Cases when this method will block:
Ratelimit: When a ratelimit token is not available, this method might block waiting for a token until the provided context timeout. Rate limits are not enforced for forwarded tasks from child partition.
Forwarded tasks that originated from db backlog: When this method is called with a task that is forwarded from a remote partition and if (1) this task queue is root (2) task was from db backlog - this method will block until context timeout trying to match with a poller. The caller is expected to set the correct context timeout.
returns error when:
- ratelimit is exceeded (does not apply to query task)
- context deadline is exceeded
- task is matched and consumer returns error in response channel
func (*TaskMatcher) OfferQuery ¶ added in v0.7.0
func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *internalTask) (*matchingservice.QueryWorkflowResponse, error)
OfferQuery will either match the task to a local poller or forward the query task. Local match is always attempted before forwarding is attempted. If a local match occurs, response and error are both nil; if forwarding occurs, then response or error is returned.
func (*TaskMatcher) Poll ¶ added in v0.6.0
func (tm *TaskMatcher) Poll(ctx context.Context) (*internalTask, error)
Poll blocks until a task is found or context deadline is exceeded. On success, the returned task could be a query task or a regular task. Returns ErrNoTasks when context deadline is exceeded.
func (*TaskMatcher) PollForQuery ¶ added in v0.6.0
func (tm *TaskMatcher) PollForQuery(ctx context.Context) (*internalTask, error)
PollForQuery blocks until a *query* task is found or context deadline is exceeded. Returns ErrNoTasks when context deadline is exceeded.
func (*TaskMatcher) Rate ¶ added in v0.6.0
func (tm *TaskMatcher) Rate() float64
Rate returns the current rate at which tasks are dispatched
func (*TaskMatcher) UpdateRatelimit ¶ added in v0.6.0
func (tm *TaskMatcher) UpdateRatelimit(rps *float64)
UpdateRatelimit updates the task dispatch rate