Documentation ¶
Index ¶
- Variables
- func ClearTombstones(versioningData *persistencespb.VersioningData) *persistencespb.VersioningData
- func GetBuildIdDeltas(prev *persistencespb.VersioningData, curr *persistencespb.VersioningData) (added []string, removed []string)
- func IsTaskExpired(t *persistencespb.AllocatedTaskInfo) bool
- func MergeVersioningData(a *persistencespb.VersioningData, b *persistencespb.VersioningData) *persistencespb.VersioningData
- func PersistUnknownBuildId(clock *hlc.Clock, data *persistencespb.VersioningData, buildId string) *persistencespb.VersioningData
- func PersistenceRateLimitingParamsProvider(serviceConfig *Config, ...) service.PersistenceRateLimitingParams
- func RateLimitInterceptorProvider(serviceConfig *Config) *interceptor.RateLimitInterceptor
- func RemoveBuildIds(clock *hlc.Clock, versioningData *persistencespb.VersioningData, ...) *persistencespb.VersioningData
- func RetryableInterceptorProvider() *interceptor.RetryableInterceptor
- func ServiceLifetimeHooks(lc fx.Lifecycle, svc *Service)
- func ServiceResolverProvider(membershipMonitor membership.Monitor) (membership.ServiceResolver, error)
- func TelemetryInterceptorProvider(logger log.Logger, namespaceRegistry namespace.Registry, ...) *interceptor.TelemetryInterceptor
- func ThrottledLoggerRpsFnProvider(serviceConfig *Config) resource.ThrottledLoggerRpsFn
- func ToBuildIdOrderingResponse(data *persistencespb.VersioningData, maxSets int) *workflowservice.GetWorkerBuildIdCompatibilityResponse
- func UpdateVersionSets(clock *hlc.Clock, data *persistencespb.VersioningData, ...) (*persistencespb.VersioningData, error)
- func VisibilityManagerProvider(logger log.Logger, persistenceConfig *config.Persistence, ...) (manager.VisibilityManager, error)
- type Config
- type Engine
- type Forwarder
- func (fwdr *Forwarder) AddReqTokenC() <-chan *ForwarderReqToken
- func (fwdr *Forwarder) ForwardNexusTask(ctx context.Context, task *internalTask) (*matchingservice.DispatchNexusTaskResponse, error)
- func (fwdr *Forwarder) ForwardPoll(ctx context.Context, pollMetadata *pollMetadata) (*internalTask, error)
- func (fwdr *Forwarder) ForwardQueryTask(ctx context.Context, task *internalTask) (*matchingservice.QueryWorkflowResponse, error)
- func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *internalTask) error
- func (fwdr *Forwarder) PollReqTokenC() <-chan *ForwarderReqToken
- type ForwarderReqToken
- type Handler
- func (h *Handler) AddActivityTask(ctx context.Context, request *matchingservice.AddActivityTaskRequest) (_ *matchingservice.AddActivityTaskResponse, retError error)
- func (h *Handler) AddWorkflowTask(ctx context.Context, request *matchingservice.AddWorkflowTaskRequest) (_ *matchingservice.AddWorkflowTaskResponse, retError error)
- func (h *Handler) ApplyTaskQueueUserDataReplicationEvent(ctx context.Context, ...) (_ *matchingservice.ApplyTaskQueueUserDataReplicationEventResponse, ...)
- func (h *Handler) CancelOutstandingPoll(ctx context.Context, request *matchingservice.CancelOutstandingPollRequest) (_ *matchingservice.CancelOutstandingPollResponse, retError error)
- func (h *Handler) CreateNexusIncomingService(ctx context.Context, ...) (_ *matchingservice.CreateNexusIncomingServiceResponse, retError error)
- func (h *Handler) DeleteNexusIncomingService(ctx context.Context, ...) (_ *matchingservice.DeleteNexusIncomingServiceResponse, retError error)
- func (h *Handler) DescribeTaskQueue(ctx context.Context, request *matchingservice.DescribeTaskQueueRequest) (_ *matchingservice.DescribeTaskQueueResponse, retError error)
- func (h *Handler) DispatchNexusTask(ctx context.Context, request *matchingservice.DispatchNexusTaskRequest) (_ *matchingservice.DispatchNexusTaskResponse, retError error)
- func (h *Handler) ForceUnloadTaskQueue(ctx context.Context, request *matchingservice.ForceUnloadTaskQueueRequest) (_ *matchingservice.ForceUnloadTaskQueueResponse, retError error)
- func (h *Handler) GetBuildIdTaskQueueMapping(ctx context.Context, ...) (_ *matchingservice.GetBuildIdTaskQueueMappingResponse, retError error)
- func (h *Handler) GetTaskQueueUserData(ctx context.Context, request *matchingservice.GetTaskQueueUserDataRequest) (_ *matchingservice.GetTaskQueueUserDataResponse, retError error)
- func (h *Handler) GetWorkerBuildIdCompatibility(ctx context.Context, ...) (_ *matchingservice.GetWorkerBuildIdCompatibilityResponse, retError error)
- func (h *Handler) ListNexusIncomingServices(ctx context.Context, request *matchingservice.ListNexusIncomingServicesRequest) (_ *matchingservice.ListNexusIncomingServicesResponse, retError error)
- func (h *Handler) ListTaskQueuePartitions(ctx context.Context, request *matchingservice.ListTaskQueuePartitionsRequest) (_ *matchingservice.ListTaskQueuePartitionsResponse, retError error)
- func (h *Handler) PollActivityTaskQueue(ctx context.Context, request *matchingservice.PollActivityTaskQueueRequest) (_ *matchingservice.PollActivityTaskQueueResponse, retError error)
- func (h *Handler) PollNexusTaskQueue(ctx context.Context, request *matchingservice.PollNexusTaskQueueRequest) (_ *matchingservice.PollNexusTaskQueueResponse, retError error)
- func (h *Handler) PollWorkflowTaskQueue(ctx context.Context, request *matchingservice.PollWorkflowTaskQueueRequest) (_ *matchingservice.PollWorkflowTaskQueueResponse, retError error)
- func (h *Handler) QueryWorkflow(ctx context.Context, request *matchingservice.QueryWorkflowRequest) (_ *matchingservice.QueryWorkflowResponse, retError error)
- func (h *Handler) ReplicateTaskQueueUserData(ctx context.Context, ...) (_ *matchingservice.ReplicateTaskQueueUserDataResponse, retError error)
- func (h *Handler) RespondNexusTaskCompleted(ctx context.Context, request *matchingservice.RespondNexusTaskCompletedRequest) (_ *matchingservice.RespondNexusTaskCompletedResponse, retError error)
- func (h *Handler) RespondNexusTaskFailed(ctx context.Context, request *matchingservice.RespondNexusTaskFailedRequest) (_ *matchingservice.RespondNexusTaskFailedResponse, retError error)
- func (h *Handler) RespondQueryTaskCompleted(ctx context.Context, request *matchingservice.RespondQueryTaskCompletedRequest) (_ *matchingservice.RespondQueryTaskCompletedResponse, retError error)
- func (h *Handler) Start()
- func (h *Handler) Stop()
- func (h *Handler) UpdateNexusIncomingService(ctx context.Context, ...) (_ *matchingservice.UpdateNexusIncomingServiceResponse, retError error)
- func (h *Handler) UpdateTaskQueueUserData(ctx context.Context, request *matchingservice.UpdateTaskQueueUserDataRequest) (_ *matchingservice.UpdateTaskQueueUserDataResponse, retError error)
- func (h *Handler) UpdateWorkerBuildIdCompatibility(ctx context.Context, ...) (_ *matchingservice.UpdateWorkerBuildIdCompatibilityResponse, retError error)
- type Service
- type TaskMatcher
- func (tm *TaskMatcher) MustOffer(ctx context.Context, task *internalTask, interruptCh chan struct{}) error
- func (tm *TaskMatcher) Offer(ctx context.Context, task *internalTask) (bool, error)
- func (tm *TaskMatcher) OfferNexusTask(ctx context.Context, task *internalTask) (*matchingservice.DispatchNexusTaskResponse, error)
- func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *internalTask) (*matchingservice.QueryWorkflowResponse, error)
- func (tm *TaskMatcher) Poll(ctx context.Context, pollMetadata *pollMetadata) (*internalTask, error)
- func (tm *TaskMatcher) PollForQuery(ctx context.Context, pollMetadata *pollMetadata) (*internalTask, error)
- func (tm *TaskMatcher) Rate() float64
- func (tm *TaskMatcher) Stop()
- func (tm *TaskMatcher) UpdateRatelimit(rpsPtr *float64)
- type TaskQueueReplicatorNamespaceReplicationQueue
- type UserDataUpdateFunc
- type UserDataUpdateOptions
Constants ¶
This section is empty.
Variables ¶
var Module = fx.Options( resource.Module, fx.Provide(dynamicconfig.NewCollection), fx.Provide(ConfigProvider), fx.Provide(PersistenceRateLimitingParamsProvider), service.PersistenceLazyLoadedServiceResolverModule, fx.Provide(ThrottledLoggerRpsFnProvider), fx.Provide(RetryableInterceptorProvider), fx.Provide(TelemetryInterceptorProvider), fx.Provide(RateLimitInterceptorProvider), fx.Provide(VisibilityManagerProvider), fx.Provide(NewHandler), fx.Provide(service.GrpcServerOptionsProvider), fx.Provide(NamespaceReplicationQueueProvider), fx.Provide(ServiceResolverProvider), fx.Provide(NewService), fx.Invoke(ServiceLifetimeHooks), )
Functions ¶
func ClearTombstones ¶ added in v1.21.0
func ClearTombstones(versioningData *persistencespb.VersioningData) *persistencespb.VersioningData
ClearTombstones clears all tombstone build ids (with STATE_DELETED) from versioning data. Clones data to avoid mutating in place.
func GetBuildIdDeltas ¶ added in v1.21.0
func GetBuildIdDeltas(prev *persistencespb.VersioningData, curr *persistencespb.VersioningData) (added []string, removed []string)
GetBuildIdDeltas compares all active build ids in prev and curr sets and returns sets of added and removed build ids.
func IsTaskExpired ¶ added in v1.22.0
func IsTaskExpired(t *persistencespb.AllocatedTaskInfo) bool
TODO https://github.com/temporalio/temporal/issues/1021
there should be more validation logic here 1. if task has valid TTL -> TTL reached -> delete 2. if task has 0 TTL / no TTL -> logic need to additionally check if corresponding workflow still exists
func MergeVersioningData ¶ added in v1.21.0
func MergeVersioningData(a *persistencespb.VersioningData, b *persistencespb.VersioningData) *persistencespb.VersioningData
MergeVersioningData merges two VersioningData structs. If a build ID appears in both data structures, the merged structure will include that latest status and timestamp. If a build ID appears in different sets in the different structures, those sets will be merged. The merged data's per set default and global default will be set according to the latest timestamps in the sources. if (a) is nil, (b) is returned as is, otherwise, if (b) is nil (a) is returned as is.
func PersistUnknownBuildId ¶ added in v1.22.0
func PersistUnknownBuildId(clock *hlc.Clock, data *persistencespb.VersioningData, buildId string) *persistencespb.VersioningData
func PersistenceRateLimitingParamsProvider ¶ added in v1.17.2
func PersistenceRateLimitingParamsProvider( serviceConfig *Config, persistenceLazyLoadedServiceResolver service.PersistenceLazyLoadedServiceResolver, ) service.PersistenceRateLimitingParams
PersistenceRateLimitingParamsProvider is the same between services but uses different config sources. if-case comes from resourceImpl.New.
func RateLimitInterceptorProvider ¶ added in v1.13.0
func RateLimitInterceptorProvider( serviceConfig *Config, ) *interceptor.RateLimitInterceptor
func RemoveBuildIds ¶ added in v1.21.0
func RemoveBuildIds(clock *hlc.Clock, versioningData *persistencespb.VersioningData, buildIds []string) *persistencespb.VersioningData
RemoveBuildIds removes given buildIds from versioning data. Assumes that build ids are safe to remove, ex: a set default is never removed unless it is a single set member and that set is not default for the queue.
func RetryableInterceptorProvider ¶ added in v1.17.3
func RetryableInterceptorProvider() *interceptor.RetryableInterceptor
func ServiceLifetimeHooks ¶ added in v1.13.0
func ServiceResolverProvider ¶ added in v1.14.0
func ServiceResolverProvider( membershipMonitor membership.Monitor, ) (membership.ServiceResolver, error)
func TelemetryInterceptorProvider ¶ added in v1.13.0
func TelemetryInterceptorProvider( logger log.Logger, namespaceRegistry namespace.Registry, metricsHandler metrics.Handler, ) *interceptor.TelemetryInterceptor
func ThrottledLoggerRpsFnProvider ¶ added in v1.13.0
func ThrottledLoggerRpsFnProvider(serviceConfig *Config) resource.ThrottledLoggerRpsFn
func ToBuildIdOrderingResponse ¶ added in v1.17.3
func ToBuildIdOrderingResponse(data *persistencespb.VersioningData, maxSets int) *workflowservice.GetWorkerBuildIdCompatibilityResponse
ToBuildIdOrderingResponse transforms the internal VersioningData representation to public representation. If maxSets is given, the last sets up to maxSets will be returned.
func UpdateVersionSets ¶ added in v1.21.0
func UpdateVersionSets(clock *hlc.Clock, data *persistencespb.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest, maxSets, maxBuildIds int) (*persistencespb.VersioningData, error)
UpdateVersionSets updates version sets given existing versioning data and an update request. The request is expected to have already been validated.
See the API docs for more detail. In short, the versioning data representation consists of a sequence of sequences of compatible versions. Like so:
* ┬─1.0───2.0─┬─3.0───4.0 │ ├─3.1 │ └─3.2 ├─1.1 ├─1.2 └─1.3
In the above example, 4.0 is the current default version and no other versions are compatible with it. The previous compatible set is the 3.x set, with 3.2 being the current default for that set, and so on. The * represents the current default set pointer, which can be shifted around by the user.
A request may:
- Add a new version possibly as the new overall default version, creating a new set.
- Add a new version, compatible with some existing version, adding it to that existing set and making it the new default for that set.
- Target some existing version, marking it (and thus its set) as the default set.
Deletions are performed by a background process which verifies build IDs are no longer in use and safe to delete (not yet implemented).
Update may fail with FailedPrecondition if it would cause exceeding the supplied limits.
func VisibilityManagerProvider ¶ added in v1.21.0
func VisibilityManagerProvider( logger log.Logger, persistenceConfig *config.Persistence, customVisibilityStoreFactory visibility.VisibilityStoreFactory, metricsHandler metrics.Handler, serviceConfig *Config, esClient esclient.Client, persistenceServiceResolver resolver.ServiceResolver, searchAttributesMapperProvider searchattribute.MapperProvider, saProvider searchattribute.Provider, ) (manager.VisibilityManager, error)
Types ¶
type Config ¶ added in v0.3.0
type Config struct { PersistenceMaxQPS dynamicconfig.IntPropertyFn PersistenceGlobalMaxQPS dynamicconfig.IntPropertyFn PersistenceNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter PersistenceGlobalNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter PersistencePerShardNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter EnablePersistencePriorityRateLimiting dynamicconfig.BoolPropertyFn PersistenceDynamicRateLimitingParams dynamicconfig.MapPropertyFn SyncMatchWaitDuration dynamicconfig.DurationPropertyFnWithTaskQueueInfoFilters TestDisableSyncMatch dynamicconfig.BoolPropertyFn RPS dynamicconfig.IntPropertyFn OperatorRPSRatio dynamicconfig.FloatPropertyFn AlignMembershipChange dynamicconfig.DurationPropertyFn ShutdownDrainDuration dynamicconfig.DurationPropertyFn HistoryMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter RangeSize int64 GetTasksBatchSize dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters UpdateAckInterval dynamicconfig.DurationPropertyFnWithTaskQueueInfoFilters MaxTaskQueueIdleTime dynamicconfig.DurationPropertyFnWithTaskQueueInfoFilters NumTaskqueueWritePartitions dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters NumTaskqueueReadPartitions dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters ForwarderMaxOutstandingPolls dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters ForwarderMaxOutstandingTasks dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters ForwarderMaxRatePerSecond dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters ForwarderMaxChildrenPerNode dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters VersionCompatibleSetLimitPerQueue dynamicconfig.IntPropertyFnWithNamespaceFilter VersionBuildIdLimitPerQueue dynamicconfig.IntPropertyFnWithNamespaceFilter TaskQueueLimitPerBuildId dynamicconfig.IntPropertyFnWithNamespaceFilter GetUserDataLongPollTimeout dynamicconfig.DurationPropertyFn BacklogNegligibleAge dynamicconfig.DurationPropertyFnWithTaskQueueInfoFilters MaxWaitForPollerBeforeFwd dynamicconfig.DurationPropertyFnWithTaskQueueInfoFilters QueryWorkflowTaskTimeoutLogRate dynamicconfig.FloatPropertyFnWithTaskQueueInfoFilters MembershipUnloadDelay dynamicconfig.DurationPropertyFn // Time to hold a poll request before returning an empty response if there are no tasks LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskQueueInfoFilters MinTaskThrottlingBurstSize dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters MaxTaskDeleteBatchSize dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters // taskWriter configuration OutstandingTaskAppendsThreshold dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters MaxTaskBatchSize dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters ThrottledLogRPS dynamicconfig.IntPropertyFn AdminNamespaceToPartitionDispatchRate dynamicconfig.FloatPropertyFnWithNamespaceFilter AdminNamespaceTaskqueueToPartitionDispatchRate dynamicconfig.FloatPropertyFnWithTaskQueueInfoFilters VisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn VisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn EnableReadFromSecondaryVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter VisibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter VisibilityEnableManualPagination dynamicconfig.BoolPropertyFnWithNamespaceFilter LoadUserData dynamicconfig.BoolPropertyFnWithTaskQueueInfoFilters ListNexusIncomingServicesLongPollTimeout dynamicconfig.DurationPropertyFn // FrontendAccessHistoryFraction is an interim flag across 2 minor releases and will be removed once fully enabled. FrontendAccessHistoryFraction dynamicconfig.FloatPropertyFn }
Config represents configuration for matching service
func ConfigProvider ¶ added in v1.21.0
func ConfigProvider( dc *dynamicconfig.Collection, persistenceConfig config.Persistence, ) *Config
func NewConfig ¶ added in v0.3.0
func NewConfig( dc *dynamicconfig.Collection, ) *Config
NewConfig returns new service config with default values
type Engine ¶
type Engine interface { Start() Stop() AddWorkflowTask(ctx context.Context, addRequest *matchingservice.AddWorkflowTaskRequest) (syncMatch bool, err error) AddActivityTask(ctx context.Context, addRequest *matchingservice.AddActivityTaskRequest) (syncMatch bool, err error) PollWorkflowTaskQueue(ctx context.Context, request *matchingservice.PollWorkflowTaskQueueRequest, opMetrics metrics.Handler) (*matchingservice.PollWorkflowTaskQueueResponse, error) PollActivityTaskQueue(ctx context.Context, request *matchingservice.PollActivityTaskQueueRequest, opMetrics metrics.Handler) (*matchingservice.PollActivityTaskQueueResponse, error) QueryWorkflow(ctx context.Context, request *matchingservice.QueryWorkflowRequest) (*matchingservice.QueryWorkflowResponse, error) RespondQueryTaskCompleted(ctx context.Context, request *matchingservice.RespondQueryTaskCompletedRequest, opMetrics metrics.Handler) error CancelOutstandingPoll(ctx context.Context, request *matchingservice.CancelOutstandingPollRequest) error DescribeTaskQueue(ctx context.Context, request *matchingservice.DescribeTaskQueueRequest) (*matchingservice.DescribeTaskQueueResponse, error) ListTaskQueuePartitions(ctx context.Context, request *matchingservice.ListTaskQueuePartitionsRequest) (*matchingservice.ListTaskQueuePartitionsResponse, error) UpdateWorkerBuildIdCompatibility(ctx context.Context, request *matchingservice.UpdateWorkerBuildIdCompatibilityRequest) (*matchingservice.UpdateWorkerBuildIdCompatibilityResponse, error) GetWorkerBuildIdCompatibility(ctx context.Context, request *matchingservice.GetWorkerBuildIdCompatibilityRequest) (*matchingservice.GetWorkerBuildIdCompatibilityResponse, error) GetTaskQueueUserData(ctx context.Context, request *matchingservice.GetTaskQueueUserDataRequest) (*matchingservice.GetTaskQueueUserDataResponse, error) ApplyTaskQueueUserDataReplicationEvent(ctx context.Context, request *matchingservice.ApplyTaskQueueUserDataReplicationEventRequest) (*matchingservice.ApplyTaskQueueUserDataReplicationEventResponse, error) GetBuildIdTaskQueueMapping(ctx context.Context, request *matchingservice.GetBuildIdTaskQueueMappingRequest) (*matchingservice.GetBuildIdTaskQueueMappingResponse, error) ForceUnloadTaskQueue(ctx context.Context, request *matchingservice.ForceUnloadTaskQueueRequest) (*matchingservice.ForceUnloadTaskQueueResponse, error) UpdateTaskQueueUserData(ctx context.Context, request *matchingservice.UpdateTaskQueueUserDataRequest) (*matchingservice.UpdateTaskQueueUserDataResponse, error) ReplicateTaskQueueUserData(ctx context.Context, request *matchingservice.ReplicateTaskQueueUserDataRequest) (*matchingservice.ReplicateTaskQueueUserDataResponse, error) DispatchNexusTask(ctx context.Context, request *matchingservice.DispatchNexusTaskRequest) (*matchingservice.DispatchNexusTaskResponse, error) PollNexusTaskQueue(ctx context.Context, request *matchingservice.PollNexusTaskQueueRequest, opMetrics metrics.Handler) (*matchingservice.PollNexusTaskQueueResponse, error) RespondNexusTaskCompleted(ctx context.Context, request *matchingservice.RespondNexusTaskCompletedRequest, opMetrics metrics.Handler) (*matchingservice.RespondNexusTaskCompletedResponse, error) RespondNexusTaskFailed(ctx context.Context, request *matchingservice.RespondNexusTaskFailedRequest, opMetrics metrics.Handler) (*matchingservice.RespondNexusTaskFailedResponse, error) CreateNexusIncomingService(ctx context.Context, request *matchingservice.CreateNexusIncomingServiceRequest) (*matchingservice.CreateNexusIncomingServiceResponse, error) UpdateNexusIncomingService(ctx context.Context, request *matchingservice.UpdateNexusIncomingServiceRequest) (*matchingservice.UpdateNexusIncomingServiceResponse, error) DeleteNexusIncomingService(ctx context.Context, request *matchingservice.DeleteNexusIncomingServiceRequest) (*matchingservice.DeleteNexusIncomingServiceResponse, error) ListNexusIncomingServices(ctx context.Context, request *matchingservice.ListNexusIncomingServicesRequest) (*matchingservice.ListNexusIncomingServicesResponse, error) }
Engine exposes interfaces for clients to interact with the matching engine
func NewEngine ¶
func NewEngine( taskManager persistence.TaskManager, historyClient resource.HistoryClient, matchingRawClient resource.MatchingRawClient, config *Config, logger log.Logger, throttledLogger log.ThrottledLogger, metricsHandler metrics.Handler, namespaceRegistry namespace.Registry, hostInfoProvider membership.HostInfoProvider, resolver membership.ServiceResolver, clusterMeta cluster.Metadata, namespaceReplicationQueue persistence.NamespaceReplicationQueue, visibilityManager manager.VisibilityManager, nexusIncomingServiceManager persistence.NexusIncomingServiceManager, ) Engine
NewEngine creates an instance of matching engine
type Forwarder ¶ added in v0.7.0
type Forwarder struct {
// contains filtered or unexported fields
}
Forwarder is the type that contains state pertaining to the api call forwarder component
func (*Forwarder) AddReqTokenC ¶ added in v0.7.0
func (fwdr *Forwarder) AddReqTokenC() <-chan *ForwarderReqToken
AddReqTokenC returns a channel that can be used to wait for a token that's necessary before making a ForwardTask or ForwardQueryTask API call. After the API call is invoked, token.release() must be invoked
func (*Forwarder) ForwardNexusTask ¶ added in v1.24.0
func (fwdr *Forwarder) ForwardNexusTask(ctx context.Context, task *internalTask) (*matchingservice.DispatchNexusTaskResponse, error)
ForwardNexusTask forwards a nexus task to parent task queue partition, if it exists.
func (*Forwarder) ForwardPoll ¶ added in v0.7.0
func (fwdr *Forwarder) ForwardPoll(ctx context.Context, pollMetadata *pollMetadata) (*internalTask, error)
ForwardPoll forwards a poll request to parent task queue partition if it exist
func (*Forwarder) ForwardQueryTask ¶ added in v0.7.0
func (fwdr *Forwarder) ForwardQueryTask( ctx context.Context, task *internalTask, ) (*matchingservice.QueryWorkflowResponse, error)
ForwardQueryTask forwards a query task to parent task queue partition, if it exists
func (*Forwarder) ForwardTask ¶ added in v0.7.0
ForwardTask forwards an activity or workflow task to the parent task queue partition if it exists
func (*Forwarder) PollReqTokenC ¶ added in v0.7.0
func (fwdr *Forwarder) PollReqTokenC() <-chan *ForwarderReqToken
PollReqTokenC returns a channel that can be used to wait for a token that's necessary before making a ForwardPoll API call. After the API call is invoked, token.release() must be invoked
type ForwarderReqToken ¶ added in v0.7.0
type ForwarderReqToken struct {
// contains filtered or unexported fields
}
ForwarderReqToken is the token that must be acquired before making forwarder API calls. This type contains the state for the token itself
type Handler ¶
type Handler struct { matchingservice.UnsafeMatchingServiceServer // contains filtered or unexported fields }
Handler - gRPC handler interface for matchingservice
func NewHandler ¶
func NewHandler( config *Config, logger log.Logger, throttledLogger log.Logger, taskManager persistence.TaskManager, historyClient resource.HistoryClient, matchingRawClient resource.MatchingRawClient, hostInfoProvider membership.HostInfoProvider, matchingServiceResolver membership.ServiceResolver, metricsHandler metrics.Handler, namespaceRegistry namespace.Registry, clusterMetadata cluster.Metadata, namespaceReplicationQueue persistence.NamespaceReplicationQueue, visibilityManager manager.VisibilityManager, nexusIncomingServiceManager persistence.NexusIncomingServiceManager, ) *Handler
NewHandler creates a gRPC handler for the matchingservice
func (*Handler) AddActivityTask ¶
func (h *Handler) AddActivityTask( ctx context.Context, request *matchingservice.AddActivityTaskRequest, ) (_ *matchingservice.AddActivityTaskResponse, retError error)
AddActivityTask - adds an activity task.
func (*Handler) AddWorkflowTask ¶ added in v0.27.0
func (h *Handler) AddWorkflowTask( ctx context.Context, request *matchingservice.AddWorkflowTaskRequest, ) (_ *matchingservice.AddWorkflowTaskResponse, retError error)
AddWorkflowTask - adds a workflow task.
func (*Handler) ApplyTaskQueueUserDataReplicationEvent ¶ added in v1.21.0
func (*Handler) CancelOutstandingPoll ¶ added in v0.3.2
func (h *Handler) CancelOutstandingPoll(ctx context.Context, request *matchingservice.CancelOutstandingPollRequest) (_ *matchingservice.CancelOutstandingPollResponse, retError error)
CancelOutstandingPoll is used to cancel outstanding pollers
func (*Handler) CreateNexusIncomingService ¶ added in v1.24.0
func (*Handler) DeleteNexusIncomingService ¶ added in v1.24.0
func (*Handler) DescribeTaskQueue ¶ added in v0.27.0
func (h *Handler) DescribeTaskQueue( ctx context.Context, request *matchingservice.DescribeTaskQueueRequest, ) (_ *matchingservice.DescribeTaskQueueResponse, retError error)
DescribeTaskQueue returns information about the target task queue, right now this API returns the pollers which polled this task queue in last few minutes. If includeTaskQueueStatus field is true, it will also return status of task queue's ackManager (readLevel, ackLevel, backlogCountHint and taskIDBlock).
func (*Handler) DispatchNexusTask ¶ added in v1.24.0
func (*Handler) ForceUnloadTaskQueue ¶ added in v1.21.0
func (*Handler) GetBuildIdTaskQueueMapping ¶ added in v1.21.0
func (*Handler) GetTaskQueueUserData ¶ added in v1.21.0
func (*Handler) GetWorkerBuildIdCompatibility ¶ added in v1.21.0
func (h *Handler) GetWorkerBuildIdCompatibility( ctx context.Context, request *matchingservice.GetWorkerBuildIdCompatibilityRequest, ) (_ *matchingservice.GetWorkerBuildIdCompatibilityResponse, retError error)
GetWorkerBuildIdCompatibility fetches the worker versioning data for a task queue
func (*Handler) ListNexusIncomingServices ¶ added in v1.24.0
func (*Handler) ListTaskQueuePartitions ¶ added in v0.27.0
func (h *Handler) ListTaskQueuePartitions( ctx context.Context, request *matchingservice.ListTaskQueuePartitionsRequest, ) (_ *matchingservice.ListTaskQueuePartitionsResponse, retError error)
ListTaskQueuePartitions returns information about partitions for a taskQueue
func (*Handler) PollActivityTaskQueue ¶ added in v0.27.0
func (h *Handler) PollActivityTaskQueue( ctx context.Context, request *matchingservice.PollActivityTaskQueueRequest, ) (_ *matchingservice.PollActivityTaskQueueResponse, retError error)
PollActivityTaskQueue - long poll for an activity task.
func (*Handler) PollNexusTaskQueue ¶ added in v1.24.0
func (*Handler) PollWorkflowTaskQueue ¶ added in v0.27.0
func (h *Handler) PollWorkflowTaskQueue( ctx context.Context, request *matchingservice.PollWorkflowTaskQueueRequest, ) (_ *matchingservice.PollWorkflowTaskQueueResponse, retError error)
PollWorkflowTaskQueue - long poll for a workflow task.
func (*Handler) QueryWorkflow ¶ added in v0.3.2
func (h *Handler) QueryWorkflow( ctx context.Context, request *matchingservice.QueryWorkflowRequest, ) (_ *matchingservice.QueryWorkflowResponse, retError error)
QueryWorkflow queries a given workflow synchronously and return the query result.
func (*Handler) ReplicateTaskQueueUserData ¶ added in v1.21.0
func (*Handler) RespondNexusTaskCompleted ¶ added in v1.24.0
func (*Handler) RespondNexusTaskFailed ¶ added in v1.24.0
func (*Handler) RespondQueryTaskCompleted ¶ added in v0.3.2
func (h *Handler) RespondQueryTaskCompleted( ctx context.Context, request *matchingservice.RespondQueryTaskCompletedRequest, ) (_ *matchingservice.RespondQueryTaskCompletedResponse, retError error)
RespondQueryTaskCompleted responds a query task completed
func (*Handler) UpdateNexusIncomingService ¶ added in v1.24.0
func (*Handler) UpdateTaskQueueUserData ¶ added in v1.21.0
func (*Handler) UpdateWorkerBuildIdCompatibility ¶ added in v1.21.0
func (h *Handler) UpdateWorkerBuildIdCompatibility( ctx context.Context, request *matchingservice.UpdateWorkerBuildIdCompatibilityRequest, ) (_ *matchingservice.UpdateWorkerBuildIdCompatibilityResponse, retError error)
UpdateWorkerBuildIdCompatibility allows changing the worker versioning graph for a task queue
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service represents the matching service
func NewService ¶
func NewService( grpcServerOptions []grpc.ServerOption, serviceConfig *Config, logger log.SnTaggedLogger, membershipMonitor membership.Monitor, grpcListener net.Listener, runtimeMetricsReporter *metrics.RuntimeMetricsReporter, handler *Handler, metricsHandler metrics.Handler, faultInjectionDataStoreFactory *client.FaultInjectionDataStoreFactory, healthServer *health.Server, visibilityManager manager.VisibilityManager, ) *Service
func (*Service) GetFaultInjection ¶ added in v1.14.0
func (s *Service) GetFaultInjection() *client.FaultInjectionDataStoreFactory
type TaskMatcher ¶ added in v0.6.0
type TaskMatcher struct {
// contains filtered or unexported fields
}
TaskMatcher matches a task producer with a task consumer Producers are usually rpc calls from history or taskReader that drains backlog from db. Consumers are the task queue pollers
func (*TaskMatcher) MustOffer ¶ added in v0.6.0
func (tm *TaskMatcher) MustOffer(ctx context.Context, task *internalTask, interruptCh chan struct{}) error
MustOffer blocks until a consumer is found to handle this task Returns error only when context is canceled or the ratelimit is set to zero (allow nothing) The passed in context MUST NOT have a deadline associated with it Note that calling MustOffer is the only way that matcher knows there are spooled tasks in the backlog, in absence of a pending MustOffer call, the forwarding logic assumes that backlog is empty.
func (*TaskMatcher) Offer ¶ added in v0.6.0
func (tm *TaskMatcher) Offer(ctx context.Context, task *internalTask) (bool, error)
Offer offers a task to a potential consumer (poller) If the task is successfully matched with a consumer, this method will return true and no error. If the task is matched but consumer returned error, then this method will return true and error message. This method should not be used for query task. This method should ONLY be used for sync match.
When a local poller is not available and forwarding to a parent task queue partition is possible, this method will attempt forwarding to the parent partition.
Cases when this method will block:
Ratelimit: When a ratelimit token is not available, this method might block waiting for a token until the provided context timeout. Rate limits are not enforced for forwarded tasks from child partition.
Forwarded tasks that originated from db backlog: When this method is called with a task that is forwarded from a remote partition and if (1) this task queue is root (2) task was from db backlog - this method will block until context timeout trying to match with a poller. The caller is expected to set the correct context timeout.
returns error when:
- ratelimit is exceeded (does not apply to query task)
- context deadline is exceeded
- task is matched and consumer returns error in response channel
func (*TaskMatcher) OfferNexusTask ¶ added in v1.24.0
func (tm *TaskMatcher) OfferNexusTask(ctx context.Context, task *internalTask) (*matchingservice.DispatchNexusTaskResponse, error)
OfferNexusTask either matchs a task to a local poller or forwards it if no local pollers available. Local match is always attempted before forwarding. If local match occurs response and error are both nil, if forwarding occurs then response or error is returned.
func (*TaskMatcher) OfferQuery ¶ added in v0.7.0
func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *internalTask) (*matchingservice.QueryWorkflowResponse, error)
OfferQuery will either match task to local poller or will forward query task. Local match is always attempted before forwarding is attempted. If local match occurs response and error are both nil, if forwarding occurs then response or error is returned.
func (*TaskMatcher) Poll ¶ added in v0.6.0
func (tm *TaskMatcher) Poll(ctx context.Context, pollMetadata *pollMetadata) (*internalTask, error)
Poll blocks until a task is found or context deadline is exceeded On success, the returned task could be a query task or a regular task Returns errNoTasks when context deadline is exceeded
func (*TaskMatcher) PollForQuery ¶ added in v0.6.0
func (tm *TaskMatcher) PollForQuery(ctx context.Context, pollMetadata *pollMetadata) (*internalTask, error)
PollForQuery blocks until a *query* task is found or context deadline is exceeded Returns errNoTasks when context deadline is exceeded
func (*TaskMatcher) Rate ¶ added in v0.6.0
func (tm *TaskMatcher) Rate() float64
Rate returns the current rate at which tasks are dispatched
func (*TaskMatcher) Stop ¶ added in v1.24.0
func (tm *TaskMatcher) Stop()
func (*TaskMatcher) UpdateRatelimit ¶ added in v0.6.0
func (tm *TaskMatcher) UpdateRatelimit(rpsPtr *float64)
UpdateRatelimit updates the task dispatch rate
type TaskQueueReplicatorNamespaceReplicationQueue ¶ added in v1.21.0
type TaskQueueReplicatorNamespaceReplicationQueue persistence.NamespaceReplicationQueue
TaskQueueReplicatorNamespaceReplicationQueue is used to ensure the replicator only gets set if global namespaces are enabled on this cluster. See NamespaceReplicationQueueProvider below.
func NamespaceReplicationQueueProvider ¶ added in v1.21.0
func NamespaceReplicationQueueProvider( namespaceReplicationQueue persistence.NamespaceReplicationQueue, clusterMetadata cluster.Metadata, ) TaskQueueReplicatorNamespaceReplicationQueue
type UserDataUpdateFunc ¶ added in v1.21.0
type UserDataUpdateFunc func(*persistencespb.TaskQueueUserData) (*persistencespb.TaskQueueUserData, bool, error)
UserDataUpdateFunc accepts the current user data for a task queue and returns the updated user data, a boolean indicating whether this data should be replicated, and an error. Extra care should be taken to avoid mutating the current user data to avoid keeping uncommitted data in memory.
type UserDataUpdateOptions ¶ added in v1.21.0
Source Files ¶
- ack_manager.go
- config.go
- db.go
- forwarder.go
- fx.go
- handler.go
- incoming_nexus_service_manager.go
- liveness.go
- matcher.go
- matching_engine.go
- matching_engine_interfaces.go
- poller_history.go
- service.go
- task.go
- task_gc.go
- task_queue_manager.go
- task_reader.go
- task_validation.go
- task_writer.go
- taskqueue.go
- version_sets.go
- version_sets_merge.go