worker

package
v1.27.0-126.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2025 License: MIT Imports: 53 Imported by: 0

README

Temporal Worker

Temporal Worker is a role of the Temporal service that hosts the components responsible for background processing on the Temporal cluster.

Replicator

Replicator is a background worker responsible for consuming replication tasks generated by remote Temporal clusters and passing them down to the processor, so that they can be applied to the local Temporal cluster.

Quickstart for localhost development
  1. Start Temporal development server for active zone:

    make start-cdc-active
    
  2. Start Temporal development server for standby(passive) zone:

    make start-cdc-standby
    
  3. Connect two Temporal clusters:

    tctl --ad 127.0.0.1:7233 adm cl upsert-remote-cluster --frontend_address "localhost:8233"
    tctl --ad 127.0.0.1:8233 adm cl upsert-remote-cluster --frontend_address "localhost:7233"
    
  4. Create a global namespace:

    tctl --ns sample namespace register --gd true --ac active --cl active standby
    
  5. Failover between zones:

    Failover to standby:

    tctl --ns sample namespace update --ac standby
    

    Failback to active:

    tctl --ns sample namespace update --ac active
    

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Module is the fx option set for the worker service. It combines the
// worker sub-component modules, adapter providers for the types declared by
// the dlq package, the service-level providers, and the lifecycle hook
// invocation.
var Module = fx.Options(
	migration.Module,
	addsearchattributes.Module,
	resource.Module,
	deletenamespace.Module,
	scheduler.Module,
	batcher.Module,
	deployment.Module,
	dlq.Module,
	dynamicconfig.Module,
	// Adapters: re-expose already-provided dependencies under the types
	// that the dlq package declares.
	fx.Provide(
		func(historyClient resource.HistoryClient) dlq.HistoryClient {
			return historyClient
		},
		func(metadata cluster.Metadata) dlq.CurrentClusterName {
			name := metadata.GetCurrentClusterName()
			return dlq.CurrentClusterName(name)
		},
		func(bean client.Bean) dlq.TaskClientDialer {
			// dial resolves the remote admin client for address and wraps its
			// AddTasks method as a dlq.TaskClient. The context argument is unused.
			dial := func(_ context.Context, address string) (dlq.TaskClient, error) {
				adminClient, err := bean.GetRemoteAdminClient(address)
				if err != nil {
					return nil, err
				}
				addTasks := func(
					ctx context.Context,
					request *adminservice.AddTasksRequest,
				) (*adminservice.AddTasksResponse, error) {
					return adminClient.AddTasks(ctx, request)
				}
				return dlq.AddTasksFn(addTasks), nil
			}
			return dlq.TaskClientDialerFn(dial)
		},
	),
	fx.Provide(HostInfoProvider),
	fx.Provide(VisibilityManagerProvider),
	fx.Provide(ThrottledLoggerRpsFnProvider),
	fx.Provide(ConfigProvider),
	fx.Provide(PersistenceRateLimitingParamsProvider),
	service.PersistenceLazyLoadedServiceResolverModule,
	fx.Provide(ServiceResolverProvider),
	// Namespace replication task executor bound to the current cluster name.
	fx.Provide(func(
		metadata cluster.Metadata,
		metadataMgr persistence.MetadataManager,
		lg log.Logger,
	) namespace.ReplicationTaskExecutor {
		currentCluster := metadata.GetCurrentClusterName()
		return namespace.NewReplicationTaskExecutor(currentCluster, metadataMgr, lg)
	}),
	fx.Provide(NewService),
	// The param tag is positional: it applies to NewWorkerManager's first
	// parameter (workerComponents).
	fx.Provide(fx.Annotate(NewWorkerManager, fx.ParamTags(workercommon.WorkerComponentTag))),
	fx.Provide(NewPerNamespaceWorkerManager),
	fx.Invoke(ServiceLifetimeHooks),
)

Functions

func HostInfoProvider added in v1.26.2

func HostInfoProvider() (membership.HostInfo, error)

func NewPerNamespaceWorkerManager added in v1.17.0

func NewPerNamespaceWorkerManager(params perNamespaceWorkerManagerInitParams) *perNamespaceWorkerManager

func NewWorkerManager added in v1.14.0

func NewWorkerManager(
	workerComponents []workercommon.WorkerComponent,
	logger log.Logger,
	sdkClientFactory sdk.ClientFactory,
	hostInfo membership.HostInfo,
) *workerManager

NewWorkerManager creates a new worker manager. The workerComponents argument must be first in order for the fx param tag to work correctly.

func PersistenceRateLimitingParamsProvider added in v1.17.2

func PersistenceRateLimitingParamsProvider(
	serviceConfig *Config,
	persistenceLazyLoadedServiceResolver service.PersistenceLazyLoadedServiceResolver,
	logger log.SnTaggedLogger,
) service.PersistenceRateLimitingParams

func ServiceLifetimeHooks added in v1.13.0

func ServiceLifetimeHooks(lc fx.Lifecycle, svc *Service)

func ServiceResolverProvider added in v1.23.0

func ServiceResolverProvider(
	membershipMonitor membership.Monitor,
) (membership.ServiceResolver, error)

func ThrottledLoggerRpsFnProvider added in v1.13.0

func ThrottledLoggerRpsFnProvider(serviceConfig *Config) resource.ThrottledLoggerRpsFn

func VisibilityManagerProvider added in v1.16.0

func VisibilityManagerProvider(
	logger log.Logger,
	metricsHandler metrics.Handler,
	persistenceConfig *config.Persistence,
	customVisibilityStoreFactory visibility.VisibilityStoreFactory,
	serviceConfig *Config,
	persistenceServiceResolver resolver.ServiceResolver,
	searchAttributesMapperProvider searchattribute.MapperProvider,
	saProvider searchattribute.Provider,
	namespaceRegistry namespace.Registry,
) (manager.VisibilityManager, error)

Types

type Config

// Config contains all the service config for worker. Apart from the two
// sub-component config pointers (ScannerCfg, ParentCloseCfg), every field is
// a dynamicconfig property function or subscribable.
type Config struct {
	ScannerCfg                           *scanner.Config
	ParentCloseCfg                       *parentclosepolicy.Config
	ThrottledLogRPS                      dynamicconfig.IntPropertyFn
	PersistenceMaxQPS                    dynamicconfig.IntPropertyFn
	PersistenceGlobalMaxQPS              dynamicconfig.IntPropertyFn
	PersistenceNamespaceMaxQPS           dynamicconfig.IntPropertyFnWithNamespaceFilter
	PersistenceGlobalNamespaceMaxQPS     dynamicconfig.IntPropertyFnWithNamespaceFilter
	PersistencePerShardNamespaceMaxQPS   dynamicconfig.IntPropertyFnWithNamespaceFilter
	PersistenceDynamicRateLimitingParams dynamicconfig.TypedPropertyFn[dynamicconfig.DynamicRateLimitingParams]
	PersistenceQPSBurstRatio             dynamicconfig.FloatPropertyFn
	OperatorRPSRatio                     dynamicconfig.FloatPropertyFn
	EnableBatcher                        dynamicconfig.BoolPropertyFn
	BatcherRPS                           dynamicconfig.IntPropertyFnWithNamespaceFilter
	BatcherConcurrency                   dynamicconfig.IntPropertyFnWithNamespaceFilter
	EnableParentClosePolicyWorker        dynamicconfig.BoolPropertyFn
	PerNamespaceWorkerCount              dynamicconfig.TypedSubscribableWithNamespaceFilter[int]
	PerNamespaceWorkerOptions            dynamicconfig.TypedSubscribableWithNamespaceFilter[sdkworker.Options]
	PerNamespaceWorkerStartRate          dynamicconfig.FloatPropertyFn

	// NOTE(review): judging by their names, the fields below configure the
	// visibility store — confirm against VisibilityManagerProvider.
	VisibilityPersistenceMaxReadQPS         dynamicconfig.IntPropertyFn
	VisibilityPersistenceMaxWriteQPS        dynamicconfig.IntPropertyFn
	VisibilityPersistenceSlowQueryThreshold dynamicconfig.DurationPropertyFn
	EnableReadFromSecondaryVisibility       dynamicconfig.BoolPropertyFnWithNamespaceFilter
	VisibilityEnableShadowReadMode          dynamicconfig.BoolPropertyFn
	VisibilityDisableOrderByClause          dynamicconfig.BoolPropertyFnWithNamespaceFilter
	VisibilityEnableManualPagination        dynamicconfig.BoolPropertyFnWithNamespaceFilter
}

Config contains all the service config for worker

func ConfigProvider added in v1.16.0

func ConfigProvider(
	dc *dynamicconfig.Collection,
	persistenceConfig *config.Persistence,
) *Config

func NewConfig

func NewConfig(
	dc *dynamicconfig.Collection,
	persistenceConfig *config.Persistence,
) *Config

NewConfig builds the new Config for worker service

type Service

// Service represents the temporal-worker service, which hosts the background
// processing needed for a Temporal cluster. It is constructed with NewService
// and driven through its Start and Stop methods.
type Service struct {
	// contains filtered or unexported fields
}

Service represents the temporal-worker service. This service hosts all background processing needed for the Temporal cluster, including: the Replicator, which handles applying replication tasks generated by remote clusters; and the Archiver, which handles archival of workflow histories.

func NewService

func NewService(
	logger log.SnTaggedLogger,
	serviceConfig *Config,
	sdkClientFactory sdk.ClientFactory,
	esClient esclient.Client,
	clusterMetadata cluster.Metadata,
	clientBean client.Bean,
	clusterMetadataManager persistence.ClusterMetadataManager,
	namespaceRegistry namespace.Registry,
	executionManager persistence.ExecutionManager,
	membershipMonitor membership.Monitor,
	hostInfoProvider membership.HostInfoProvider,
	namespaceReplicationQueue persistence.NamespaceReplicationQueue,
	metricsHandler metrics.Handler,
	metadataManager persistence.MetadataManager,
	taskManager persistence.TaskManager,
	historyClient resource.HistoryClient,
	workerManager *workerManager,
	perNamespaceWorkerManager *perNamespaceWorkerManager,
	visibilityManager manager.VisibilityManager,
	matchingClient resource.MatchingClient,
	namespaceReplicationTaskExecutor namespace.ReplicationTaskExecutor,
) (*Service, error)

func (*Service) Start

func (s *Service) Start()

Start is called to start the service

func (*Service) Stop

func (s *Service) Stop()

Stop is called to stop the service

Directories

Path Synopsis
Package common is a generated GoMock package.
Package common is a generated GoMock package.
Package dlq contains the workflow for deleting and re-enqueueing DLQ tasks.
Package dlq contains the workflow for deleting and re-enqueueing DLQ tasks.
Package parentclosepolicy is a generated GoMock package.
Package parentclosepolicy is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL