matching

package
v1.2.8-prerelease1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 30, 2024 License: MIT Imports: 46 Imported by: 7

Documentation

Overview

Package matching is a generated GoMock package.

Index

Constants

This section is empty.

Variables

View Source
var (

	// ErrNoTasks is exported temporarily so that integration tests can
	// reference it. TaskMatcher.Poll and TaskMatcher.PollForQuery are
	// documented to return it when the context deadline is exceeded.
	ErrNoTasks = errors.New("no tasks")
)
View Source
// ErrTasklistThrottled implies a tasklist was throttled.
var ErrTasklistThrottled = errors.New("tasklist limit exceeded")

ErrTasklistThrottled implies a tasklist was throttled

Functions

func NewService

func NewService(
	params *resource.Params,
) (resource.Resource, error)

NewService builds a new cadence-matching service

Types

type Config added in v0.3.0

// Config represents configuration for the cadence-matching service.
//
// Most fields are dynamicconfig property functions (types ending in
// PropertyFn, optionally suffixed with the filter they accept: task list
// info, domain, or domain ID). The remaining fields (RangeSize,
// EnableDebugMode, AllIsolationGroups, HostName) are plain values.
// NOTE(review): presumably the property functions are re-evaluated on each
// call while the plain values are fixed when the Config is built — confirm
// against NewConfig and the dynamicconfig package.
type Config struct {
	// service-wide limits and toggles
	// NOTE(review): grouping label inferred from field names — confirm.
	PersistenceMaxQPS       dynamicconfig.IntPropertyFn
	PersistenceGlobalMaxQPS dynamicconfig.IntPropertyFn
	EnableSyncMatch         dynamicconfig.BoolPropertyFnWithTaskListInfoFilters
	UserRPS                 dynamicconfig.IntPropertyFn
	WorkerRPS               dynamicconfig.IntPropertyFn
	DomainUserRPS           dynamicconfig.IntPropertyFnWithDomainFilter
	DomainWorkerRPS         dynamicconfig.IntPropertyFnWithDomainFilter
	ShutdownDrainDuration   dynamicconfig.DurationPropertyFn

	// taskListManager configuration
	RangeSize                    int64
	GetTasksBatchSize            dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	UpdateAckInterval            dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
	IdleTasklistCheckInterval    dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
	MaxTasklistIdleTime          dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
	NumTasklistWritePartitions   dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	NumTasklistReadPartitions    dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	ForwarderMaxOutstandingPolls dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	ForwarderMaxOutstandingTasks dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	ForwarderMaxRatePerSecond    dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	ForwarderMaxChildrenPerNode  dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	AsyncTaskDispatchTimeout     dynamicconfig.DurationPropertyFnWithTaskListInfoFilters

	// LongPollExpirationInterval is the time to hold a poll request before
	// returning an empty response if there are no tasks.
	LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
	MinTaskThrottlingBurstSize dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	MaxTaskDeleteBatchSize     dynamicconfig.IntPropertyFnWithTaskListInfoFilters

	// taskWriter configuration
	OutstandingTaskAppendsThreshold dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	MaxTaskBatchSize                dynamicconfig.IntPropertyFnWithTaskListInfoFilters

	ThrottledLogRPS dynamicconfig.IntPropertyFn

	// debugging configuration
	EnableDebugMode             bool // note that this value is initialized once on service start
	EnableTaskInfoLogByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter

	ActivityTaskSyncMatchWaitTime dynamicconfig.DurationPropertyFnWithDomainFilter

	// isolation configuration
	EnableTasklistIsolation dynamicconfig.BoolPropertyFnWithDomainFilter
	AllIsolationGroups      []string
	// hostname info
	HostName string
}

Config represents configuration for cadence-matching service

func NewConfig added in v0.3.0

func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config

NewConfig returns new service config with default values

type Engine

// Engine exposes interfaces for clients to poll for activity and decision
// tasks.
//
// Every request-handling method takes a *handlerContext as its first
// argument. That type is unexported, so these methods can only be invoked
// with a context built inside this package.
type Engine interface {
	Stop()
	// AddDecisionTask and AddActivityTask report, via syncMatch, whether the
	// task was sync matched.
	// NOTE(review): meaning taken from the result name only — confirm
	// against the implementation.
	AddDecisionTask(hCtx *handlerContext, request *types.AddDecisionTaskRequest) (syncMatch bool, err error)
	AddActivityTask(hCtx *handlerContext, request *types.AddActivityTaskRequest) (syncMatch bool, err error)
	PollForDecisionTask(hCtx *handlerContext, request *types.MatchingPollForDecisionTaskRequest) (*types.MatchingPollForDecisionTaskResponse, error)
	PollForActivityTask(hCtx *handlerContext, request *types.MatchingPollForActivityTaskRequest) (*types.PollForActivityTaskResponse, error)
	QueryWorkflow(hCtx *handlerContext, request *types.MatchingQueryWorkflowRequest) (*types.QueryWorkflowResponse, error)
	RespondQueryTaskCompleted(hCtx *handlerContext, request *types.MatchingRespondQueryTaskCompletedRequest) error
	CancelOutstandingPoll(hCtx *handlerContext, request *types.CancelOutstandingPollRequest) error
	DescribeTaskList(hCtx *handlerContext, request *types.MatchingDescribeTaskListRequest) (*types.DescribeTaskListResponse, error)
	ListTaskListPartitions(hCtx *handlerContext, request *types.MatchingListTaskListPartitionsRequest) (*types.ListTaskListPartitionsResponse, error)
	GetTaskListsByDomain(hCtx *handlerContext, request *types.GetTaskListsByDomainRequest) (*types.GetTaskListsByDomainResponse, error)
}

Engine exposes interfaces for clients to poll for activity and decision tasks.

func NewEngine

func NewEngine(taskManager persistence.TaskManager,
	clusterMetadata cluster.Metadata,
	historyService history.Client,
	matchingClient matching.Client,
	config *Config,
	logger log.Logger,
	metricsClient metrics.Client,
	domainCache cache.DomainCache,
	resolver membership.Resolver,
	partitioner partition.Partitioner,
) Engine

NewEngine creates an instance of matching engine

type Forwarder added in v0.7.0

type Forwarder struct {
	// contains filtered or unexported fields
}

Forwarder is the type that contains state pertaining to the api call forwarder component

func (*Forwarder) AddReqTokenC added in v0.7.0

func (fwdr *Forwarder) AddReqTokenC() <-chan *ForwarderReqToken

AddReqTokenC returns a channel that can be used to wait for a token that's necessary before making a ForwardTask or ForwardQueryTask API call. After the API call is invoked, token.release() must be invoked. TODO: consider having separate token pools for different isolation groups.

func (*Forwarder) ForwardPoll added in v0.7.0

func (fwdr *Forwarder) ForwardPoll(ctx context.Context) (*InternalTask, error)

ForwardPoll forwards a poll request to the parent task list partition, if it exists.

func (*Forwarder) ForwardQueryTask added in v0.7.0

func (fwdr *Forwarder) ForwardQueryTask(
	ctx context.Context,
	task *InternalTask,
) (*types.QueryWorkflowResponse, error)

ForwardQueryTask forwards a query task to the parent task list partition, if it exists.

func (*Forwarder) ForwardTask added in v0.7.0

func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *InternalTask) error

ForwardTask forwards an activity or decision task to the parent task list partition, if it exists.

func (*Forwarder) PollReqTokenC added in v0.7.0

func (fwdr *Forwarder) PollReqTokenC(isolationGroup string) <-chan *ForwarderReqToken

PollReqTokenC returns a channel that can be used to wait for a token that's necessary before making a ForwardPoll API call. After the API call is invoked, token.release() must be invoked. For tasklists with isolation enabled, we have separate token pools for different isolation groups.

type ForwarderReqToken added in v0.7.0

type ForwarderReqToken struct {
	// contains filtered or unexported fields
}

ForwarderReqToken is the token that must be acquired before making forwarder API calls. This type contains the state for the token itself

type GRPCHandler added in v1.2.7

type GRPCHandler struct {
	// contains filtered or unexported fields
}

func NewGRPCHandler added in v1.2.7

func NewGRPCHandler(h Handler) GRPCHandler

func (GRPCHandler) AddActivityTask added in v1.2.7

func (GRPCHandler) AddDecisionTask added in v1.2.7

func (GRPCHandler) CancelOutstandingPoll added in v1.2.7

func (GRPCHandler) DescribeTaskList added in v1.2.7

func (GRPCHandler) GetTaskListsByDomain added in v1.2.7

func (GRPCHandler) Health added in v1.2.7

func (GRPCHandler) ListTaskListPartitions added in v1.2.7

func (GRPCHandler) PollForActivityTask added in v1.2.7

func (GRPCHandler) PollForDecisionTask added in v1.2.7

func (GRPCHandler) QueryWorkflow added in v1.2.7

func (GRPCHandler) Register added in v1.2.7

func (g GRPCHandler) Register(dispatcher *yarpc.Dispatcher)

func (GRPCHandler) RespondQueryTaskCompleted added in v1.2.7

type HistoryUpdatedFunc added in v0.19.1

type HistoryUpdatedFunc func()

HistoryUpdatedFunc is a type for notifying applications when the poller history was updated

type InternalTask added in v0.23.1

type InternalTask struct {
	// contains filtered or unexported fields
}

InternalTask represents an activity, decision, query, or started (received from another host) task. This struct is more like a union, and only one of [ query, event, forwarded ] is non-nil for any given task.

type MockHandler added in v0.17.0

type MockHandler struct {
	// contains filtered or unexported fields
}

MockHandler is a mock of Handler interface.

func NewMockHandler added in v0.17.0

func NewMockHandler(ctrl *gomock.Controller) *MockHandler

NewMockHandler creates a new mock instance.

func (*MockHandler) AddActivityTask added in v0.17.0

func (m *MockHandler) AddActivityTask(arg0 context.Context, arg1 *types.AddActivityTaskRequest) error

AddActivityTask mocks base method.

func (*MockHandler) AddDecisionTask added in v0.17.0

func (m *MockHandler) AddDecisionTask(arg0 context.Context, arg1 *types.AddDecisionTaskRequest) error

AddDecisionTask mocks base method.

func (*MockHandler) CancelOutstandingPoll added in v0.17.0

func (m *MockHandler) CancelOutstandingPoll(arg0 context.Context, arg1 *types.CancelOutstandingPollRequest) error

CancelOutstandingPoll mocks base method.

func (*MockHandler) DescribeTaskList added in v0.17.0

DescribeTaskList mocks base method.

func (*MockHandler) EXPECT added in v0.17.0

func (m *MockHandler) EXPECT() *MockHandlerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockHandler) GetTaskListsByDomain added in v0.23.1

GetTaskListsByDomain mocks base method.

func (*MockHandler) Health added in v0.17.0

func (m *MockHandler) Health(arg0 context.Context) (*types.HealthStatus, error)

Health mocks base method.

func (*MockHandler) ListTaskListPartitions added in v0.17.0

ListTaskListPartitions mocks base method.

func (*MockHandler) PollForActivityTask added in v0.17.0

PollForActivityTask mocks base method.

func (*MockHandler) PollForDecisionTask added in v0.17.0

PollForDecisionTask mocks base method.

func (*MockHandler) QueryWorkflow added in v0.17.0

QueryWorkflow mocks base method.

func (*MockHandler) RespondQueryTaskCompleted added in v0.17.0

func (m *MockHandler) RespondQueryTaskCompleted(arg0 context.Context, arg1 *types.MatchingRespondQueryTaskCompletedRequest) error

RespondQueryTaskCompleted mocks base method.

func (*MockHandler) Start added in v0.23.1

func (m *MockHandler) Start()

Start mocks base method.

func (*MockHandler) Stop added in v0.23.1

func (m *MockHandler) Stop()

Stop mocks base method.

type MockHandlerMockRecorder added in v0.17.0

type MockHandlerMockRecorder struct {
	// contains filtered or unexported fields
}

MockHandlerMockRecorder is the mock recorder for MockHandler.

func (*MockHandlerMockRecorder) AddActivityTask added in v0.17.0

func (mr *MockHandlerMockRecorder) AddActivityTask(arg0, arg1 interface{}) *gomock.Call

AddActivityTask indicates an expected call of AddActivityTask.

func (*MockHandlerMockRecorder) AddDecisionTask added in v0.17.0

func (mr *MockHandlerMockRecorder) AddDecisionTask(arg0, arg1 interface{}) *gomock.Call

AddDecisionTask indicates an expected call of AddDecisionTask.

func (*MockHandlerMockRecorder) CancelOutstandingPoll added in v0.17.0

func (mr *MockHandlerMockRecorder) CancelOutstandingPoll(arg0, arg1 interface{}) *gomock.Call

CancelOutstandingPoll indicates an expected call of CancelOutstandingPoll.

func (*MockHandlerMockRecorder) DescribeTaskList added in v0.17.0

func (mr *MockHandlerMockRecorder) DescribeTaskList(arg0, arg1 interface{}) *gomock.Call

DescribeTaskList indicates an expected call of DescribeTaskList.

func (*MockHandlerMockRecorder) GetTaskListsByDomain added in v0.23.1

func (mr *MockHandlerMockRecorder) GetTaskListsByDomain(arg0, arg1 interface{}) *gomock.Call

GetTaskListsByDomain indicates an expected call of GetTaskListsByDomain.

func (*MockHandlerMockRecorder) Health added in v0.17.0

func (mr *MockHandlerMockRecorder) Health(arg0 interface{}) *gomock.Call

Health indicates an expected call of Health.

func (*MockHandlerMockRecorder) ListTaskListPartitions added in v0.17.0

func (mr *MockHandlerMockRecorder) ListTaskListPartitions(arg0, arg1 interface{}) *gomock.Call

ListTaskListPartitions indicates an expected call of ListTaskListPartitions.

func (*MockHandlerMockRecorder) PollForActivityTask added in v0.17.0

func (mr *MockHandlerMockRecorder) PollForActivityTask(arg0, arg1 interface{}) *gomock.Call

PollForActivityTask indicates an expected call of PollForActivityTask.

func (*MockHandlerMockRecorder) PollForDecisionTask added in v0.17.0

func (mr *MockHandlerMockRecorder) PollForDecisionTask(arg0, arg1 interface{}) *gomock.Call

PollForDecisionTask indicates an expected call of PollForDecisionTask.

func (*MockHandlerMockRecorder) QueryWorkflow added in v0.17.0

func (mr *MockHandlerMockRecorder) QueryWorkflow(arg0, arg1 interface{}) *gomock.Call

QueryWorkflow indicates an expected call of QueryWorkflow.

func (*MockHandlerMockRecorder) RespondQueryTaskCompleted added in v0.17.0

func (mr *MockHandlerMockRecorder) RespondQueryTaskCompleted(arg0, arg1 interface{}) *gomock.Call

RespondQueryTaskCompleted indicates an expected call of RespondQueryTaskCompleted.

func (*MockHandlerMockRecorder) Start added in v0.23.1

func (mr *MockHandlerMockRecorder) Start() *gomock.Call

Start indicates an expected call of Start.

func (*MockHandlerMockRecorder) Stop added in v0.23.1

func (mr *MockHandlerMockRecorder) Stop() *gomock.Call

Stop indicates an expected call of Stop.

type Service

type Service struct {
	resource.Resource
	// contains filtered or unexported fields
}

Service represents the cadence-matching service

func (*Service) Start

func (s *Service) Start()

Start starts the service

func (*Service) Stop

func (s *Service) Stop()

Stop stops the service

type TaskMatcher added in v0.6.0

type TaskMatcher struct {
	// contains filtered or unexported fields
}

TaskMatcher matches a task producer with a task consumer. Producers are usually rpc calls from history or the taskReader that drains backlog from db. Consumers are the task list pollers.

func (*TaskMatcher) MustOffer added in v0.6.0

func (tm *TaskMatcher) MustOffer(ctx context.Context, task *InternalTask) error

MustOffer blocks until a consumer is found to handle this task. Returns error only when context is canceled, expired or the ratelimit is set to zero (allow nothing).

func (*TaskMatcher) Offer added in v0.6.0

func (tm *TaskMatcher) Offer(ctx context.Context, task *InternalTask) (bool, error)

Offer offers a task to a potential consumer (poller). If the task is successfully matched with a consumer, this method will return true and no error. If the task is matched but consumer returned error, then this method will return true and error message. This method should not be used for query task. This method should ONLY be used for sync match.

When a local poller is not available and forwarding to a parent task list partition is possible, this method will attempt forwarding to the parent partition.

Cases when this method will block:

Ratelimit: When a ratelimit token is not available, this method might block waiting for a token until the provided context timeout. Rate limits are not enforced for forwarded tasks from child partition.

Forwarded tasks that originated from db backlog: When this method is called with a task that is forwarded from a remote partition, and if (1) this task list is root and (2) the task was from db backlog, this method will block until context timeout trying to match with a poller. The caller is expected to set the correct context timeout.

returns error when:

  • ratelimit is exceeded (does not apply to query task)
  • context deadline is exceeded
  • task is matched and consumer returns error in response channel

func (*TaskMatcher) OfferQuery added in v0.7.0

func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *InternalTask) (*types.QueryWorkflowResponse, error)

OfferQuery will either match task to local poller or will forward query task. Local match is always attempted before forwarding is attempted. If local match occurs, response and error are both nil; if forwarding occurs, then response or error is returned.

func (*TaskMatcher) Poll added in v0.6.0

func (tm *TaskMatcher) Poll(ctx context.Context, isolationGroup string) (*InternalTask, error)

Poll blocks until a task is found or context deadline is exceeded. On success, the returned task could be a query task or a regular task. Returns ErrNoTasks when context deadline is exceeded.

func (*TaskMatcher) PollForQuery added in v0.6.0

func (tm *TaskMatcher) PollForQuery(ctx context.Context) (*InternalTask, error)

PollForQuery blocks until a *query* task is found or context deadline is exceeded. Returns ErrNoTasks when context deadline is exceeded.

func (*TaskMatcher) Rate added in v0.6.0

func (tm *TaskMatcher) Rate() float64

Rate returns the current rate at which tasks are dispatched

func (*TaskMatcher) UpdateRatelimit added in v0.6.0

func (tm *TaskMatcher) UpdateRatelimit(rps *float64)

UpdateRatelimit updates the task dispatch rate

type ThriftHandler added in v0.17.0

type ThriftHandler struct {
	// contains filtered or unexported fields
}

func NewThriftHandler added in v0.17.0

func NewThriftHandler(h Handler) ThriftHandler

func (ThriftHandler) AddActivityTask added in v0.17.0

func (g ThriftHandler) AddActivityTask(ctx context.Context, AddRequest *matching.AddActivityTaskRequest) (err error)

func (ThriftHandler) AddDecisionTask added in v0.17.0

func (g ThriftHandler) AddDecisionTask(ctx context.Context, AddRequest *matching.AddDecisionTaskRequest) (err error)

func (ThriftHandler) CancelOutstandingPoll added in v0.17.0

func (g ThriftHandler) CancelOutstandingPoll(ctx context.Context, Request *matching.CancelOutstandingPollRequest) (err error)

func (ThriftHandler) DescribeTaskList added in v0.17.0

func (g ThriftHandler) DescribeTaskList(ctx context.Context, Request *matching.DescribeTaskListRequest) (dp1 *shared.DescribeTaskListResponse, err error)

func (ThriftHandler) GetTaskListsByDomain added in v0.23.1

func (g ThriftHandler) GetTaskListsByDomain(ctx context.Context, Request *shared.GetTaskListsByDomainRequest) (gp1 *shared.GetTaskListsByDomainResponse, err error)

func (ThriftHandler) Health added in v0.17.0

func (ThriftHandler) ListTaskListPartitions added in v0.17.0

func (ThriftHandler) PollForActivityTask added in v0.17.0

func (g ThriftHandler) PollForActivityTask(ctx context.Context, PollRequest *matching.PollForActivityTaskRequest) (pp1 *shared.PollForActivityTaskResponse, err error)

func (ThriftHandler) PollForDecisionTask added in v0.17.0

func (g ThriftHandler) PollForDecisionTask(ctx context.Context, PollRequest *matching.PollForDecisionTaskRequest) (pp1 *matching.PollForDecisionTaskResponse, err error)

func (ThriftHandler) QueryWorkflow added in v0.17.0

func (g ThriftHandler) QueryWorkflow(ctx context.Context, QueryRequest *matching.QueryWorkflowRequest) (qp1 *shared.QueryWorkflowResponse, err error)

func (ThriftHandler) Register added in v1.2.7

func (t ThriftHandler) Register(dispatcher *yarpc.Dispatcher)

func (ThriftHandler) RespondQueryTaskCompleted added in v0.17.0

func (g ThriftHandler) RespondQueryTaskCompleted(ctx context.Context, Request *matching.RespondQueryTaskCompletedRequest) (err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL