Documentation ¶
Index ¶
- Constants
- Variables
- func DataStoreFactoryLifetimeHooks(lc fx.Lifecycle, f persistence.DataStoreFactory)
- func DataStoreFactoryProvider(clusterName ClusterName, r resolver.ServiceResolver, cfg *config.Persistence, ...) persistence.DataStoreFactory
- func EventBlobCacheProvider(dc *dynamicconfig.Collection) persistence.XDCCache
- func HealthSignalAggregatorProvider(dynamicCollection *dynamicconfig.Collection, metricsHandler metrics.Handler, ...) persistence.HealthSignalAggregator
- func IsPersistenceTransientError(err error) bool
- func NewPriorityNamespaceRateLimiter(hostMaxQPS PersistenceMaxQps, namespaceMaxQPS PersistenceNamespaceMaxQps, ...) quotas.RequestRateLimiter
- func NewPriorityRateLimiter(hostMaxQPS PersistenceMaxQps, requestPriorityFn quotas.RequestPriorityFn, ...) quotas.RequestRateLimiter
- func RequestPriorityFn(req quotas.Request) int
- type AbstractDataStoreFactory
- type ClusterName
- type DynamicRateLimitingParams
- type Factory
- type FactoryProviderFn
- type HealthRequestRateLimiterImpl
- type NewFactoryParams
- type OperatorRPSRatio
- type PersistenceBurstRatio
- type PersistenceMaxQps
- type PersistenceNamespaceMaxQps
- type PersistencePerShardNamespaceMaxQPS
Constants ¶
const ( DefaultRefreshInterval = 10 * time.Second DefaultRateBurstRatio = 1.0 DefaultInitialRateMultiplier = 1.0 )
Variables ¶
var ( CallerTypeDefaultPriority = map[string]int{ headers.CallerTypeOperator: 0, headers.CallerTypeAPI: 2, headers.CallerTypeBackground: 4, headers.CallerTypePreemptable: 5, } APITypeCallOriginPriorityOverride = map[string]int{ "StartWorkflowExecution": 1, "SignalWithStartWorkflowExecution": 1, "SignalWorkflowExecution": 1, "RequestCancelWorkflowExecution": 1, "TerminateWorkflowExecution": 1, "GetWorkflowExecutionHistory": 1, "UpdateWorkflowExecution": 1, } BackgroundTypeAPIPriorityOverride = map[string]int{ "GetOrCreateShard": 1, "UpdateShard": 1, p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryTransfer): 1, p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryTimer): 1, p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryVisibility): 1, p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryTransfer): 3, p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryTimer): 3, p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryVisibility): 3, } RequestPrioritiesOrdered = []int{0, 1, 2, 3, 4, 5} )
var Module = fx.Options( fx.Provide(DataStoreFactoryProvider), fx.Invoke(DataStoreFactoryLifetimeHooks), fx.Provide(managerProvider(Factory.NewClusterMetadataManager)), fx.Provide(managerProvider(Factory.NewMetadataManager)), fx.Provide(managerProvider(Factory.NewTaskManager)), fx.Provide(managerProvider(Factory.NewNamespaceReplicationQueue)), fx.Provide(managerProvider(Factory.NewShardManager)), fx.Provide(managerProvider(Factory.NewExecutionManager)), fx.Provide(managerProvider(Factory.NewHistoryTaskQueueManager)), fx.Provide(managerProvider(Factory.NewNexusEndpointManager)), fx.Provide(ClusterNameProvider), fx.Provide(HealthSignalAggregatorProvider), fx.Provide(persistence.NewDLQMetricsEmitter), fx.Provide(EventBlobCacheProvider), )
Functions ¶
func DataStoreFactoryLifetimeHooks ¶ added in v1.25.0
func DataStoreFactoryLifetimeHooks(lc fx.Lifecycle, f persistence.DataStoreFactory)
func DataStoreFactoryProvider ¶ added in v1.16.0
func DataStoreFactoryProvider( clusterName ClusterName, r resolver.ServiceResolver, cfg *config.Persistence, abstractDataStoreFactory AbstractDataStoreFactory, logger log.Logger, metricsHandler metrics.Handler, ) persistence.DataStoreFactory
func EventBlobCacheProvider ¶ added in v1.22.0
func EventBlobCacheProvider( dc *dynamicconfig.Collection, ) persistence.XDCCache
func HealthSignalAggregatorProvider ¶ added in v1.21.0
func HealthSignalAggregatorProvider( dynamicCollection *dynamicconfig.Collection, metricsHandler metrics.Handler, logger log.ThrottledLogger, ) persistence.HealthSignalAggregator
func IsPersistenceTransientError ¶ added in v1.17.3
func NewPriorityNamespaceRateLimiter ¶ added in v1.24.0
func NewPriorityNamespaceRateLimiter( hostMaxQPS PersistenceMaxQps, namespaceMaxQPS PersistenceNamespaceMaxQps, perShardNamespaceMaxQPS PersistencePerShardNamespaceMaxQPS, requestPriorityFn quotas.RequestPriorityFn, operatorRPSRatio OperatorRPSRatio, burstRatio PersistenceBurstRatio, ) quotas.RequestRateLimiter
func NewPriorityRateLimiter ¶ added in v1.17.2
func NewPriorityRateLimiter( hostMaxQPS PersistenceMaxQps, requestPriorityFn quotas.RequestPriorityFn, operatorRPSRatio OperatorRPSRatio, burstRatio PersistenceBurstRatio, healthSignals p.HealthSignalAggregator, dynamicParams DynamicRateLimitingParams, metricsHandler metrics.Handler, logger log.Logger, ) quotas.RequestRateLimiter
func RequestPriorityFn ¶ added in v1.18.0
Types ¶
type AbstractDataStoreFactory ¶
type AbstractDataStoreFactory interface { NewFactory( cfg config.CustomDatastoreConfig, r resolver.ServiceResolver, clusterName string, logger log.Logger, metricsHandler metrics.Handler, ) persistence.DataStoreFactory }
AbstractDataStoreFactory creates a DataStoreFactory. It can be used to implement custom datastore support outside of the Temporal core.
type ClusterName ¶ added in v1.13.0
type ClusterName string
func ClusterNameProvider ¶ added in v1.14.0
func ClusterNameProvider(config *cluster.Config) ClusterName
type DynamicRateLimitingParams ¶ added in v1.21.0
type DynamicRateLimitingParams dynamicconfig.TypedPropertyFn[dynamicconfig.DynamicRateLimitingParams]
type Factory ¶
type Factory interface { // Close the factory Close() // NewTaskManager returns a new task manager NewTaskManager() (persistence.TaskManager, error) // NewShardManager returns a new shard manager NewShardManager() (persistence.ShardManager, error) // NewMetadataManager returns a new metadata manager NewMetadataManager() (persistence.MetadataManager, error) // NewExecutionManager returns a new execution manager NewExecutionManager() (persistence.ExecutionManager, error) // NewNamespaceReplicationQueue returns a new queue for namespace replication NewNamespaceReplicationQueue() (persistence.NamespaceReplicationQueue, error) // NewClusterMetadataManager returns a new manager for cluster specific metadata NewClusterMetadataManager() (persistence.ClusterMetadataManager, error) // NewHistoryTaskQueueManager returns a new manager for history task queues NewHistoryTaskQueueManager() (persistence.HistoryTaskQueueManager, error) // NewNexusEndpointManager returns a new manager for nexus endpoints NewNexusEndpointManager() (persistence.NexusEndpointManager, error) }
Factory defines the interface for any implementation that can vend persistence layer objects backed by a datastore. The actual datastore is an implementation detail hidden behind this interface.
func FactoryProvider ¶ added in v1.16.0
func FactoryProvider( params NewFactoryParams, ) Factory
func NewFactory ¶
func NewFactory( dataStoreFactory persistence.DataStoreFactory, cfg *config.Persistence, systemRateLimiter quotas.RequestRateLimiter, namespaceRateLimiter quotas.RequestRateLimiter, serializer serialization.Serializer, eventBlobCache persistence.XDCCache, clusterName string, metricsHandler metrics.Handler, logger log.Logger, healthSignals persistence.HealthSignalAggregator, ) Factory
NewFactory returns an implementation of Factory that vends persistence objects based on the specified configuration. This factory takes as input a config.Persistence object, which specifies the datastore to be used for a given type of object. This config also contains the configuration for the individual datastores themselves.
The objects returned by this factory enforce the rate limit and max connections (maxconns) settings from the given configuration. In addition, all objects emit metrics automatically.
type FactoryProviderFn ¶ added in v1.16.0
type FactoryProviderFn func(NewFactoryParams) Factory
type HealthRequestRateLimiterImpl ¶ added in v1.21.0
type HealthRequestRateLimiterImpl struct {
// contains filtered or unexported fields
}
func NewHealthRequestRateLimiterImpl ¶ added in v1.21.0
func NewHealthRequestRateLimiterImpl( healthSignals persistence.HealthSignalAggregator, rateFn quotas.RateFn, params DynamicRateLimitingParams, burstRatio PersistenceBurstRatio, metricsHandler metrics.Handler, logger log.Logger, ) *HealthRequestRateLimiterImpl
func (*HealthRequestRateLimiterImpl) Reserve ¶ added in v1.21.0
func (rl *HealthRequestRateLimiterImpl) Reserve(now time.Time, request quotas.Request) quotas.Reservation
type NewFactoryParams ¶ added in v1.16.0
type NewFactoryParams struct { fx.In DataStoreFactory persistence.DataStoreFactory EventBlobCache persistence.XDCCache Cfg *config.Persistence PersistenceMaxQPS PersistenceMaxQps PersistenceNamespaceMaxQPS PersistenceNamespaceMaxQps PersistencePerShardNamespaceMaxQPS PersistencePerShardNamespaceMaxQPS OperatorRPSRatio OperatorRPSRatio PersistenceBurstRatio PersistenceBurstRatio ClusterName ClusterName ServiceName primitives.ServiceName MetricsHandler metrics.Handler Logger log.Logger HealthSignals persistence.HealthSignalAggregator DynamicRateLimitingParams DynamicRateLimitingParams }
type OperatorRPSRatio ¶ added in v1.22.0
type OperatorRPSRatio dynamicconfig.FloatPropertyFn
type PersistenceBurstRatio ¶ added in v1.24.0
type PersistenceBurstRatio dynamicconfig.FloatPropertyFn
type PersistenceMaxQps ¶ added in v1.13.0
type PersistenceMaxQps dynamicconfig.IntPropertyFn
type PersistenceNamespaceMaxQps ¶ added in v1.17.3
type PersistenceNamespaceMaxQps dynamicconfig.IntPropertyFnWithNamespaceFilter
type PersistencePerShardNamespaceMaxQPS ¶ added in v1.21.0
type PersistencePerShardNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter