Documentation ¶
Index ¶
- Variables
- func ApplyClusterMetadataConfigProvider(logger log.Logger, config *config.Config, ...) (*cluster.Config, config.Persistence, error)
- func InterruptCh() <-chan interface{}
- func PersistenceFactoryProvider() persistenceClient.FactoryProviderFn
- func ServerLifetimeHooks(lc fx.Lifecycle, svr Server)
- func ServerOptionsProvider(opts []ServerOption) (serverOptionsProvider, error)
- func StopService(logger log.Logger, app *fx.App, svcName primitives.ServiceName, ...)
- type Server
- type ServerFx
- type ServerImpl
- type ServerOption
- func ForServices(names []string) ServerOption
- func InterruptOn(interruptCh <-chan interface{}) ServerOption
- func WithAudienceGetter(audienceGetter func(cfg *config.Config) authorization.JWTAudienceMapper) ServerOption
- func WithAuthorizer(authorizer authorization.Authorizer) ServerOption
- func WithChainedFrontendGrpcInterceptors(interceptors ...grpc.UnaryServerInterceptor) ServerOption
- func WithClaimMapper(claimMapper func(cfg *config.Config) authorization.ClaimMapper) ServerOption
- func WithClientFactoryProvider(clientFactoryProvider client.FactoryProvider) ServerOption
- func WithConfig(cfg *config.Config) ServerOption
- func WithConfigLoader(configDir string, env string, zone string) ServerOption
- func WithCustomDataStoreFactory(customFactory persistenceclient.AbstractDataStoreFactory) ServerOption
- func WithCustomMetricsHandler(provider metrics.Handler) ServerOption
- func WithDynamicConfigClient(c dynamicconfig.Client) ServerOption
- func WithElasticsearchHttpClient(c *http.Client) ServerOption
- func WithLogger(logger log.Logger) ServerOption
- func WithNamespaceLogger(namespaceLogger log.Logger) ServerOption
- func WithPersistenceServiceResolver(r resolver.ServiceResolver) ServerOption
- func WithSearchAttributesMapper(m searchattribute.Mapper) ServerOption
- func WithTLSConfigFactory(tlsConfigProvider encryption.TLSConfigProvider) ServerOption
- type ServiceProviderParamsCommon
- type ServiceStopFn
- type ServicesGroupIn
- type ServicesGroupOut
- func FrontendServiceProvider(params ServiceProviderParamsCommon) (ServicesGroupOut, error)
- func HistoryServiceProvider(params ServiceProviderParamsCommon) (ServicesGroupOut, error)
- func InternalFrontendServiceProvider(params ServiceProviderParamsCommon) (ServicesGroupOut, error)
- func MatchingServiceProvider(params ServiceProviderParamsCommon) (ServicesGroupOut, error)
- func WorkerServiceProvider(params ServiceProviderParamsCommon) (ServicesGroupOut, error)
- type ServicesMetadata
Constants ¶
This section is empty.
Variables ¶
var ( // Services is the set of all valid temporal services as strings (needs to be strings to // keep ServerOptions interface stable) Services = []string{ string(primitives.FrontendService), string(primitives.InternalFrontendService), string(primitives.HistoryService), string(primitives.MatchingService), string(primitives.WorkerService), } // DefaultServices is the set of services to start by default if services are not given on // the command line. DefaultServices = []string{ string(primitives.FrontendService), string(primitives.HistoryService), string(primitives.MatchingService), string(primitives.WorkerService), } )
var FxLogAdapter = fx.WithLogger(func(logger log.Logger) fxevent.Logger {
return &fxLogAdapter{logger: logger}
})
var ServerFxImplModule = fx.Options( fx.Provide(NewServerFxImpl), fx.Provide(func(src *ServerImpl) Server { return src }), )
var ServiceTracingModule = fx.Options( fx.Supply([]otelsdktrace.BatchSpanProcessorOption{}), fx.Provide( fx.Annotate( func(exps []otelsdktrace.SpanExporter, opts []otelsdktrace.BatchSpanProcessorOption) []otelsdktrace.SpanProcessor { sps := make([]otelsdktrace.SpanProcessor, 0, len(exps)) for _, exp := range exps { sps = append(sps, otelsdktrace.NewBatchSpanProcessor(exp, opts...)) } return sps }, fx.ParamTags(`optional:"true"`, ``), ), ), fx.Provide( fx.Annotate( func(rsn primitives.ServiceName, rsi resource.InstanceID) (*otelresource.Resource, error) { if rsn == primitives.InternalFrontendService { rsn = primitives.FrontendService } serviceName := string(rsn) if !strings.HasPrefix(serviceName, "io.temporal.") { serviceName = fmt.Sprintf("io.temporal.%s", serviceName) } attrs := []attribute.KeyValue{ semconv.ServiceNameKey.String(serviceName), semconv.ServiceVersionKey.String(headers.ServerVersion), } if rsi != "" { attrs = append(attrs, semconv.ServiceInstanceIDKey.String(string(rsi))) } return otelresource.New(context.Background(), otelresource.WithProcess(), otelresource.WithOS(), otelresource.WithHost(), otelresource.WithContainer(), otelresource.WithAttributes(attrs...), ) }, fx.ParamTags(``, `optional:"true"`), ), ), fx.Provide( func(r *otelresource.Resource, sps []otelsdktrace.SpanProcessor) []otelsdktrace.TracerProviderOption { opts := make([]otelsdktrace.TracerProviderOption, 0, len(sps)+1) opts = append(opts, otelsdktrace.WithResource(r)) for _, sp := range sps { opts = append(opts, otelsdktrace.WithSpanProcessor(sp)) } return opts }, ), fx.Provide(func(lc fx.Lifecycle, opts []otelsdktrace.TracerProviderOption) trace.TracerProvider { tp := otelsdktrace.NewTracerProvider(opts...) lc.Append(fx.Hook{OnStop: func(ctx context.Context) error { return tp.Shutdown(ctx) }}) return tp }), fx.Provide(func() propagation.TextMapPropagator { return propagation.TraceContext{} }), fx.Provide(telemetry.NewServerTraceInterceptor), fx.Provide(telemetry.NewClientTraceInterceptor), )
ServiceTracingModule holds per-service (i.e. frontend/history/matching/worker) fx state. The following types can be overriden with fx.Replace/fx.Decorate:
- []go.opentelemetry.io/otel/sdk/trace.BatchSpanProcessorOption default: empty slice
- []go.opentelemetry.io/otel/sdk/trace.SpanProcessor default: wrap each otelsdktrace.SpanExporter with otelsdktrace.NewBatchSpanProcessor
- *go.opentelemetry.io/otel/sdk/resource.Resource default: resource.Default() augmented with the supplied serviceName
- []go.opentelemetry.io/otel/sdk/trace.TracerProviderOption default: the provided resource.Resource and each of the otelsdktrace.SpanExporter
- go.opentelemetry.io/otel/trace.TracerProvider default: otelsdktrace.NewTracerProvider with each of the otelsdktrace.TracerProviderOption
- go.opentelemetry.io/otel/ppropagation.TextMapPropagator default: propagation.TraceContext{}
- telemetry.ServerTraceInterceptor
- telemetry.ClientTraceInterceptor
var TraceExportModule = fx.Options( fx.Invoke(func(log log.Logger) { otel.SetErrorHandler(otel.ErrorHandlerFunc( func(err error) { log.Warn("OTEL error", tag.Error(err), tag.ErrorType(err)) }), ) }), fx.Provide(func(lc fx.Lifecycle, c *config.Config) ([]otelsdktrace.SpanExporter, error) { exporters, err := c.ExporterConfig.SpanExporters() if err != nil { return nil, err } lc.Append(fx.Hook{ OnStart: startAll(exporters), OnStop: shutdownAll(exporters), }) return exporters, nil }), )
TraceExportModule holds process-global telemetry fx state defining the set of OTEL trace/span exporters used by tracing instrumentation. The following types can be overriden/augmented with fx.Replace/fx.Decorate:
- []go.opentelemetry.io/otel/sdk/trace.SpanExporter
Functions ¶
func ApplyClusterMetadataConfigProvider ¶ added in v1.14.0
func ApplyClusterMetadataConfigProvider( logger log.Logger, config *config.Config, persistenceServiceResolver resolver.ServiceResolver, persistenceFactoryProvider persistenceClient.FactoryProviderFn, customDataStoreFactory persistenceClient.AbstractDataStoreFactory, ) (*cluster.Config, config.Persistence, error)
ApplyClusterMetadataConfigProvider performs a config check against the configured persistence store for cluster metadata. If there is a mismatch, the persisted values take precedence and will be written over in the config objects. This is to keep this check hidden from downstream calls. TODO: move this to cluster.fx
func InterruptCh ¶
func InterruptCh() <-chan interface{}
func PersistenceFactoryProvider ¶ added in v1.16.0
func PersistenceFactoryProvider() persistenceClient.FactoryProviderFn
func ServerLifetimeHooks ¶ added in v1.14.0
func ServerOptionsProvider ¶ added in v1.14.0
func ServerOptionsProvider(opts []ServerOption) (serverOptionsProvider, error)
func StopService ¶ added in v1.16.0
func StopService(logger log.Logger, app *fx.App, svcName primitives.ServiceName, stopChan chan struct{})
Types ¶
type Server ¶
func NewServer ¶
func NewServer(opts ...ServerOption) (Server, error)
NewServer returns a new instance of server that serves one or many services.
type ServerFx ¶ added in v1.14.0
type ServerFx struct {
// contains filtered or unexported fields
}
func NewServerFx ¶ added in v1.14.0
func NewServerFx(opts ...ServerOption) (*ServerFx, error)
type ServerImpl ¶ added in v1.14.0
type ServerImpl struct {
// contains filtered or unexported fields
}
ServerImpl is temporal server.
func NewServerFxImpl ¶ added in v1.14.0
func NewServerFxImpl( opts *serverOptions, logger log.Logger, namespaceLogger resource.NamespaceLogger, stoppedCh chan interface{}, dcCollection *dynamicconfig.Collection, servicesGroup ServicesGroupIn, persistenceConfig config.Persistence, clusterMetadata *cluster.Config, persistenceFactoryProvider persistenceClient.FactoryProviderFn, ) *ServerImpl
NewServer returns a new instance of server that serves one or many services.
func (*ServerImpl) Start ¶ added in v1.14.0
func (s *ServerImpl) Start() error
Start temporal server. This function should be called only once, Server doesn't support multiple restarts.
type ServerOption ¶
type ServerOption interface {
// contains filtered or unexported methods
}
func ForServices ¶
func ForServices(names []string) ServerOption
ForServices indicates which supplied services (e.g. frontend, history, matching, worker) within the server to start
func InterruptOn ¶
func InterruptOn(interruptCh <-chan interface{}) ServerOption
InterruptOn interrupts server on the signal from server. If channel is nil Start() will block forever.
func WithAudienceGetter ¶ added in v1.10.3
func WithAudienceGetter(audienceGetter func(cfg *config.Config) authorization.JWTAudienceMapper) ServerOption
WithAudienceGetter configures JWT audience getter for authorization
func WithAuthorizer ¶
func WithAuthorizer(authorizer authorization.Authorizer) ServerOption
WithAuthorizer sets a low level authorizer to allow/deny all API calls
func WithChainedFrontendGrpcInterceptors ¶ added in v1.14.0
func WithChainedFrontendGrpcInterceptors( interceptors ...grpc.UnaryServerInterceptor, ) ServerOption
WithChainedFrontendGrpcInterceptors sets a chain of ordered custom grpc interceptors that will be invoked for all Frontend gRPC API calls. The list of custom interceptors will be appended to the end of the internal ServerInterceptors. The custom interceptors will be invoked in the order as they appear in the supplied list, after the internal ServerInterceptors.
func WithClaimMapper ¶ added in v1.4.0
func WithClaimMapper(claimMapper func(cfg *config.Config) authorization.ClaimMapper) ServerOption
WithClaimMapper configures a role mapper for authorization
func WithClientFactoryProvider ¶ added in v1.11.0
func WithClientFactoryProvider(clientFactoryProvider client.FactoryProvider) ServerOption
WithClientFactoryProvider sets a custom ClientFactoryProvider NOTE: this option is experimental and may be changed or removed in future release.
func WithConfig ¶
func WithConfig(cfg *config.Config) ServerOption
WithConfig sets a custom configuration
func WithConfigLoader ¶
func WithConfigLoader(configDir string, env string, zone string) ServerOption
WithConfigLoader sets a custom configuration load
func WithCustomDataStoreFactory ¶ added in v1.11.0
func WithCustomDataStoreFactory(customFactory persistenceclient.AbstractDataStoreFactory) ServerOption
WithCustomDataStoreFactory sets a custom AbstractDataStoreFactory NOTE: this option is experimental and may be changed or removed in future release.
func WithCustomMetricsHandler ¶ added in v1.17.0
func WithCustomMetricsHandler(provider metrics.Handler) ServerOption
WithCustomerMetricsProvider sets a custom implementation of the metrics.MetricsHandler interface metrics.MetricsHandler is the base interface for publishing metric events
func WithDynamicConfigClient ¶ added in v1.5.7
func WithDynamicConfigClient(c dynamicconfig.Client) ServerOption
WithDynamicConfigClient sets custom client for reading dynamic configuration.
func WithElasticsearchHttpClient ¶ added in v1.5.7
func WithElasticsearchHttpClient(c *http.Client) ServerOption
WithElasticsearchHttpClient sets a custom HTTP client which is used to make requests to Elasticsearch
func WithLogger ¶ added in v1.5.7
func WithLogger(logger log.Logger) ServerOption
WithLogger sets a custom logger
func WithNamespaceLogger ¶ added in v1.11.0
func WithNamespaceLogger(namespaceLogger log.Logger) ServerOption
WithNamespaceLogger sets an optional logger for all frontend operations
func WithPersistenceServiceResolver ¶ added in v1.5.7
func WithPersistenceServiceResolver(r resolver.ServiceResolver) ServerOption
WithPersistenceServiceResolver sets a custom persistence service resolver which will convert service name or address value from config to another address
func WithSearchAttributesMapper ¶ added in v1.12.1
func WithSearchAttributesMapper(m searchattribute.Mapper) ServerOption
WithSearchAttributesMapper sets a custom search attributes mapper which converts search attributes aliases to field names and vice versa.
func WithTLSConfigFactory ¶
func WithTLSConfigFactory(tlsConfigProvider encryption.TLSConfigProvider) ServerOption
WithTLSConfigFactory overrides default provider of TLS configuration
type ServiceProviderParamsCommon ¶ added in v1.14.0
type ServiceProviderParamsCommon struct { fx.In Cfg *config.Config ServiceNames resource.ServiceNames Logger log.Logger NamespaceLogger resource.NamespaceLogger DynamicConfigClient dynamicconfig.Client MetricsHandler metrics.Handler EsConfig *esclient.Config EsClient esclient.Client TlsConfigProvider encryption.TLSConfigProvider PersistenceConfig config.Persistence ClusterMetadata *cluster.Config ClientFactoryProvider client.FactoryProvider AudienceGetter authorization.JWTAudienceMapper PersistenceServiceResolver resolver.ServiceResolver PersistenceFactoryProvider persistenceClient.FactoryProviderFn SearchAttributesMapper searchattribute.Mapper CustomInterceptors []grpc.UnaryServerInterceptor Authorizer authorization.Authorizer ClaimMapper authorization.ClaimMapper DataStoreFactory persistenceClient.AbstractDataStoreFactory SpanExporters []otelsdktrace.SpanExporter InstanceID resource.InstanceID `optional:"true"` }
type ServiceStopFn ¶ added in v1.14.0
type ServiceStopFn func()
type ServicesGroupIn ¶ added in v1.14.0
type ServicesGroupIn struct { fx.In Services []*ServicesMetadata `group:"services"` }
type ServicesGroupOut ¶ added in v1.14.0
type ServicesGroupOut struct { fx.Out Services *ServicesMetadata `group:"services"` }
func FrontendServiceProvider ¶ added in v1.14.0
func FrontendServiceProvider( params ServiceProviderParamsCommon, ) (ServicesGroupOut, error)
func HistoryServiceProvider ¶ added in v1.14.0
func HistoryServiceProvider( params ServiceProviderParamsCommon, ) (ServicesGroupOut, error)
func InternalFrontendServiceProvider ¶ added in v1.20.0
func InternalFrontendServiceProvider( params ServiceProviderParamsCommon, ) (ServicesGroupOut, error)
func MatchingServiceProvider ¶ added in v1.14.0
func MatchingServiceProvider( params ServiceProviderParamsCommon, ) (ServicesGroupOut, error)
func WorkerServiceProvider ¶ added in v1.14.0
func WorkerServiceProvider( params ServiceProviderParamsCommon, ) (ServicesGroupOut, error)
type ServicesMetadata ¶ added in v1.14.0
type ServicesMetadata struct { App *fx.App // Added for info. ServiceStopFn is enough. ServiceName primitives.ServiceName ServiceStopFn ServiceStopFn }