matching

package
v0.7.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 8, 2019 License: MIT Imports: 35 Imported by: 7

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (

	// ErrNoTasks is exported temporarily for integration test
	ErrNoTasks = errors.New("No tasks")
)

Functions

func NewService

func NewService(params *service.BootstrapParams) common.Daemon

NewService builds a new cadence-matching service

Types

type Config added in v0.3.0

type Config struct {
	PersistenceMaxQPS dynamicconfig.IntPropertyFn
	EnableSyncMatch   dynamicconfig.BoolPropertyFnWithTaskListInfoFilters
	RPS               dynamicconfig.IntPropertyFn

	// taskListManager configuration
	RangeSize                    int64
	GetTasksBatchSize            dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	UpdateAckInterval            dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
	IdleTasklistCheckInterval    dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
	MaxTasklistIdleTime          dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
	NumTasklistWritePartitions   dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	NumTasklistReadPartitions    dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	ForwarderMaxOutstandingPolls dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	ForwarderMaxOutstandingTasks dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	ForwarderMaxRatePerSecond    dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	ForwarderMaxChildrenPerNode  dynamicconfig.IntPropertyFnWithTaskListInfoFilters

	// Time to hold a poll request before returning an empty response if there are no tasks
	LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
	MinTaskThrottlingBurstSize dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	MaxTaskDeleteBatchSize     dynamicconfig.IntPropertyFnWithTaskListInfoFilters

	// taskWriter configuration
	OutstandingTaskAppendsThreshold dynamicconfig.IntPropertyFnWithTaskListInfoFilters
	MaxTaskBatchSize                dynamicconfig.IntPropertyFnWithTaskListInfoFilters

	ThrottledLogRPS dynamicconfig.IntPropertyFn
}

Config represents configuration for cadence-matching service

func NewConfig added in v0.3.0

func NewConfig(dc *dynamicconfig.Collection) *Config

NewConfig returns new service config with default values

type Engine

type Engine interface {
	Stop()
	AddDecisionTask(ctx context.Context, addRequest *m.AddDecisionTaskRequest) (syncMatch bool, err error)
	AddActivityTask(ctx context.Context, addRequest *m.AddActivityTaskRequest) (syncMatch bool, err error)
	PollForDecisionTask(ctx context.Context, request *m.PollForDecisionTaskRequest) (*m.PollForDecisionTaskResponse, error)
	PollForActivityTask(ctx context.Context, request *m.PollForActivityTaskRequest) (*workflow.PollForActivityTaskResponse, error)
	QueryWorkflow(ctx context.Context, request *m.QueryWorkflowRequest) (*workflow.QueryWorkflowResponse, error)
	RespondQueryTaskCompleted(ctx context.Context, request *m.RespondQueryTaskCompletedRequest) error
	CancelOutstandingPoll(ctx context.Context, request *m.CancelOutstandingPollRequest) error
	DescribeTaskList(ctx context.Context, request *m.DescribeTaskListRequest) (*workflow.DescribeTaskListResponse, error)
}

Engine exposes interfaces for clients to poll for activity and decision tasks.

func NewEngine

func NewEngine(taskManager persistence.TaskManager,
	historyService history.Client,
	matchingClient matching.Client,
	config *Config,
	logger log.Logger,
	metricsClient metrics.Client,
	domainCache cache.DomainCache,
) Engine

NewEngine creates an instance of matching engine

type Forwarder added in v0.7.0

type Forwarder struct {
	// contains filtered or unexported fields
}

Forwarder is the type that contains state pertaining to the api call forwarder component

func (*Forwarder) AddReqTokenC added in v0.7.0

func (fwdr *Forwarder) AddReqTokenC() <-chan *ForwarderReqToken

AddReqTokenC returns a channel that can be used to wait for a token that's necessary before making a ForwardTask or ForwardQueryTask API call. After the API call is invoked, token.release() must be invoked

func (*Forwarder) ForwardPoll added in v0.7.0

func (fwdr *Forwarder) ForwardPoll(ctx context.Context) (*internalTask, error)

ForwardPoll forwards a poll request to parent task list partition if it exist

func (*Forwarder) ForwardQueryTask added in v0.7.0

func (fwdr *Forwarder) ForwardQueryTask(
	ctx context.Context,
	task *internalTask,
) (*shared.QueryWorkflowResponse, error)

ForwardQueryTask forwards a query task to parent task list partition, if it exist

func (*Forwarder) ForwardTask added in v0.7.0

func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *internalTask) error

ForwardTask forwards an activity or decision task to the parent task list partition if it exist

func (*Forwarder) PollReqTokenC added in v0.7.0

func (fwdr *Forwarder) PollReqTokenC() <-chan *ForwarderReqToken

PollReqTokenC returns a channel that can be used to wait for a token that's necessary before making a ForwardPoll API call. After the API call is invoked, token.release() must be invoked

type ForwarderReqToken added in v0.7.0

type ForwarderReqToken struct {
	// contains filtered or unexported fields
}

ForwarderReqToken is the token that must be acquired before making forwarder API calls. This type contains the state for the token itself

type Handler

type Handler struct {
	service.Service
	// contains filtered or unexported fields
}

Handler - Thrift handler inteface for history service

func NewHandler

func NewHandler(sVice service.Service, config *Config, taskPersistence persistence.TaskManager, metadataMgr persistence.MetadataManager) *Handler

NewHandler creates a thrift handler for the history service

func (*Handler) AddActivityTask

func (h *Handler) AddActivityTask(ctx context.Context, addRequest *m.AddActivityTaskRequest) (retError error)

AddActivityTask - adds an activity task.

func (*Handler) AddDecisionTask

func (h *Handler) AddDecisionTask(ctx context.Context, addRequest *m.AddDecisionTaskRequest) (retError error)

AddDecisionTask - adds a decision task.

func (*Handler) CancelOutstandingPoll added in v0.3.2

func (h *Handler) CancelOutstandingPoll(ctx context.Context,
	request *m.CancelOutstandingPollRequest) (retError error)

CancelOutstandingPoll is used to cancel outstanding pollers

func (*Handler) DescribeTaskList added in v0.3.5

func (h *Handler) DescribeTaskList(ctx context.Context, request *m.DescribeTaskListRequest) (resp *gen.DescribeTaskListResponse, retError error)

DescribeTaskList returns information about the target tasklist, right now this API returns the pollers which polled this tasklist in last few minutes. If includeTaskListStatus field is true, it will also return status of tasklist's ackManager (readLevel, ackLevel, backlogCountHint and taskIDBlock).

func (*Handler) Health added in v0.3.0

func (h *Handler) Health(ctx context.Context) (*health.HealthStatus, error)

Health is for health check

func (*Handler) PollForActivityTask

func (h *Handler) PollForActivityTask(ctx context.Context,
	pollRequest *m.PollForActivityTaskRequest) (resp *gen.PollForActivityTaskResponse, retError error)

PollForActivityTask - long poll for an activity task.

func (*Handler) PollForDecisionTask

func (h *Handler) PollForDecisionTask(ctx context.Context,
	pollRequest *m.PollForDecisionTaskRequest) (resp *m.PollForDecisionTaskResponse, retError error)

PollForDecisionTask - long poll for a decision task.

func (*Handler) QueryWorkflow added in v0.3.2

func (h *Handler) QueryWorkflow(ctx context.Context,
	queryRequest *m.QueryWorkflowRequest) (resp *gen.QueryWorkflowResponse, retError error)

QueryWorkflow queries a given workflow synchronously and return the query result.

func (*Handler) RegisterHandler added in v0.5.8

func (h *Handler) RegisterHandler()

RegisterHandler register this handler, must be called before Start()

func (*Handler) RespondQueryTaskCompleted added in v0.3.2

func (h *Handler) RespondQueryTaskCompleted(ctx context.Context, request *m.RespondQueryTaskCompletedRequest) (retError error)

RespondQueryTaskCompleted responds a query task completed

func (*Handler) Start

func (h *Handler) Start() error

Start starts the handler

func (*Handler) Stop

func (h *Handler) Stop()

Stop stops the handler

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service represents the cadence-matching service

func (*Service) Start

func (s *Service) Start()

Start starts the service

func (*Service) Stop

func (s *Service) Stop()

Stop stops the service

type TaskMatcher added in v0.6.0

type TaskMatcher struct {
	// contains filtered or unexported fields
}

TaskMatcher matches a task producer with a task consumer Producers are usually rpc calls from history or taskReader that drains backlog from db. Consumers are the task list pollers

func (*TaskMatcher) MustOffer added in v0.6.0

func (tm *TaskMatcher) MustOffer(ctx context.Context, task *internalTask) error

MustOffer blocks until a consumer is found to handle this task Returns error only when context is canceled or the ratelimit is set to zero (allow nothing) The passed in context MUST NOT have a deadline associated with it

func (*TaskMatcher) Offer added in v0.6.0

func (tm *TaskMatcher) Offer(ctx context.Context, task *internalTask) (bool, error)

Offer offers a task to a potential consumer (poller) If the task is successfully matched with a consumer, this method will return true and no error. If the task is matched but consumer returned error, then this method will return true and error message. Both regular tasks and query tasks should use this method to match with a consumer. Likewise, sync matches and non-sync matches both should use this method. returns error when:

  • ratelimit is exceeded (does not apply to query task)
  • context deadline is exceeded
  • task is matched and consumer returns error in response channel

func (*TaskMatcher) OfferQuery added in v0.7.0

func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *internalTask) ([]byte, error)

OfferQuery offers a query task to a potential consumer (poller). If the task is successfully matched, this method will return the query response. Otherwise it returns error

func (*TaskMatcher) Poll added in v0.6.0

func (tm *TaskMatcher) Poll(ctx context.Context) (*internalTask, error)

Poll blocks until a task is found or context deadline is exceeded On success, the returned task could be a query task or a regular task Returns ErrNoTasks when context deadline is exceeded

func (*TaskMatcher) PollForQuery added in v0.6.0

func (tm *TaskMatcher) PollForQuery(ctx context.Context) (*internalTask, error)

PollForQuery blocks until a *query* task is found or context deadline is exceeded Returns ErrNoTasks when context deadline is exceeded

func (*TaskMatcher) Rate added in v0.6.0

func (tm *TaskMatcher) Rate() float64

Rate returns the current rate at which tasks are dispatched

func (*TaskMatcher) UpdateRatelimit added in v0.6.0

func (tm *TaskMatcher) UpdateRatelimit(rps *float64)

UpdateRatelimit updates the task dispatch rate

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL