Documentation
¶
Index ¶
- Constants
- Variables
- func APIFailureToNexusFailure(failure *failurepb.Failure) (nexus.Failure, error)
- func AdaptAuthorizeError(err error) error
- func ConvertGRPCError(err error, exposeDetails bool) error
- func FormatDuration(d time.Duration) string
- func HandlerErrorTypeFromHTTPStatus(statusCode int) nexus.HandlerErrorType
- func NexusFailureToAPIFailure(failure nexus.Failure, retryable bool) (*failurepb.Failure, error)
- func NexusFailureToProtoFailure(failure nexus.Failure) *nexuspb.Failure
- func OperationErrorToTemporalFailure(opErr *nexus.OperationError) (*failurepb.Failure, error)
- func ProtoFailureToNexusFailure(failure *nexuspb.Failure) nexus.Failure
- func SetFailureSourceOnContext(ctx context.Context, response *http.Response)
- type CallbackToken
- type CallbackTokenGenerator
- type EndpointRegistry
- type EndpointRegistryConfig
- type EndpointRegistryImpl
- func (r *EndpointRegistryImpl) GetByID(ctx context.Context, id string) (*persistencespb.NexusEndpointEntry, error)
- func (r *EndpointRegistryImpl) GetByName(ctx context.Context, _ namespace.ID, endpointName string) (*persistencespb.NexusEndpointEntry, error)
- func (r *EndpointRegistryImpl) StartLifecycle()
- func (r *EndpointRegistryImpl) StopLifecycle()
- type HTTPClientTraceConfig
- type HTTPClientTraceProvider
- type LoggedHTTPClientTraceProvider
- type NamespaceAndTaskQueue
Constants ¶
const ( // Currently supported token version. TokenVersion = 1 // Header key for the callback token in StartOperation requests. CallbackTokenHeader = "Temporal-Callback-Token" )
const ( // FailureSourceHeaderName is the header used to indicate from where the Nexus failure originated. FailureSourceHeaderName = "Temporal-Nexus-Failure-Source" // FailureSourceWorker indicates the failure originated from outside the server (e.g. bad request or on the Nexus worker). FailureSourceWorker = "worker" )
Variables ¶
var ErrNexusDisabled = serviceerror.NewFailedPrecondition("nexus is disabled")
var FailureSourceContextKey = failureSourceContextKeyType{}
var HTTPTraceConfig = dynamicconfig.NewGlobalTypedSettingWithConverter( "system.nexusHTTPTraceConfig", func(a any) (any, error) { return a, nil }, nil, `Configuration options for controlling additional tracing for Nexus HTTP requests. Fields: Enabled, MinAttempt, MaxAttempt, Hooks. See HTTPClientTraceConfig comments for more detail.`, )
HTTPTraceConfig is the dynamic config for controlling Nexus HTTP request tracing behavior. The default is nil and the conversion function does not do any actual conversion because this should be wrapped by a dynamicconfig.NewGlobalCachedTypedValue with the actual conversion function so that it is cached.
var PayloadSerializer nexus.Serializer = payloadSerializer{}
var RouteCompletionCallback = routing.NewBuilder[string](). Constant("namespaces"). StringVariable("namespace", func(namespace *string) *string { return namespace }). Constant("nexus", "callback"). Build()
RouteCompletionCallback is an HTTP route for completing a Nexus operation via callback.
var RouteDispatchNexusTaskByEndpoint = routing.NewBuilder[string](). Constant("nexus", "endpoints"). StringVariable("endpoint", func(endpoint *string) *string { return endpoint }). Constant("services"). Build()
var RouteDispatchNexusTaskByNamespaceAndTaskQueue = routing.NewBuilder[NamespaceAndTaskQueue](). Constant("namespaces"). StringVariable("namespace", func(params *NamespaceAndTaskQueue) *string { return ¶ms.Namespace }). Constant("task-queues"). StringVariable("task_queue", func(params *NamespaceAndTaskQueue) *string { return ¶ms.TaskQueue }). Constant("nexus-services"). Build()
Functions ¶
func APIFailureToNexusFailure ¶
APIFailureToNexusFailure converts an API proto Failure to a Nexus SDK Failure setting the metadata "type" field to the proto fullname of the temporal API Failure message. Mutates the failure temporarily, unsetting the Message field to avoid duplicating the information in the serialized failure. Mutating was chosen over cloning for performance reasons since this function may be called frequently.
func AdaptAuthorizeError ¶
func ConvertGRPCError ¶
ConvertGRPCError converts either a serviceerror or a gRPC status error into a Nexus HandlerError if possible. If exposeDetails is true, the error message from the given error is exposed in the converted HandlerError, otherwise, a default message with minimal information is attached to the returned error. Roughly taken from https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto and https://github.com/grpc-ecosystem/grpc-gateway/blob/a7cf811e6ffabeaddcfb4ff65602c12671ff326e/runtime/errors.go#L56.
func FormatDuration ¶ added in v1.26.2
FormatDuration converts a duration into a string representation in millisecond resolution. TODO: replace this with the version exported from the Nexus SDK
func HandlerErrorTypeFromHTTPStatus ¶ added in v1.25.0
func HandlerErrorTypeFromHTTPStatus(statusCode int) nexus.HandlerErrorType
func NexusFailureToAPIFailure ¶ added in v1.26.2
NexusFailureToAPIFailure converts a Nexus Failure to an API proto Failure. If the failure metadata "type" field is set to the fullname of the temporal API Failure message, the failure is reconstructed using protojson.Unmarshal on the failure details field.
func NexusFailureToProtoFailure ¶
NexusFailureToProtoFailure converts a Nexus SDK Failure to a proto Nexus Failure. Always returns a non-nil value.
func OperationErrorToTemporalFailure ¶
func OperationErrorToTemporalFailure(opErr *nexus.OperationError) (*failurepb.Failure, error)
func ProtoFailureToNexusFailure ¶
ProtoFailureToNexusFailure converts a proto Nexus Failure to a Nexus SDK Failure.
Types ¶
type CallbackToken ¶
type CallbackToken struct { // Token version - currently only [TokenVersion] is supported. Version int `json:"v"` // Encoded [tokenspb.NexusOperationCompletion]. Data string `json:"d"` }
CallbackToken contains an encoded NexusOperationCompletion message.
func DecodeCallbackToken ¶
func DecodeCallbackToken(encoded string) (token *CallbackToken, err error)
DecodeCallbackToken unmarshals the given token applying minimal data verification.
type CallbackTokenGenerator ¶
type CallbackTokenGenerator struct { }
func NewCallbackTokenGenerator ¶
func NewCallbackTokenGenerator() *CallbackTokenGenerator
func (*CallbackTokenGenerator) DecodeCompletion ¶
func (g *CallbackTokenGenerator) DecodeCompletion(token *CallbackToken) (*tokenspb.NexusOperationCompletion, error)
DecodeCompletion decodes a callback token unwrapping the contained NexusOperationCompletion proto struct.
func (*CallbackTokenGenerator) Tokenize ¶
func (g *CallbackTokenGenerator) Tokenize(completion *tokenspb.NexusOperationCompletion) (string, error)
type EndpointRegistry ¶ added in v1.25.0
type EndpointRegistry interface { // GetByName returns an endpoint entry for the endpoint name for a caller from the given namespace ID. // Note that the default implementation is global to the cluster and can ignore the namespace ID param. GetByName(ctx context.Context, namespaceID namespace.ID, endpointName string) (*persistencespb.NexusEndpointEntry, error) GetByID(ctx context.Context, endpointID string) (*persistencespb.NexusEndpointEntry, error) StartLifecycle() StopLifecycle() }
type EndpointRegistryConfig ¶ added in v1.25.0
type EndpointRegistryConfig struct {
// contains filtered or unexported fields
}
func NewEndpointRegistryConfig ¶ added in v1.25.0
func NewEndpointRegistryConfig(dc *dynamicconfig.Collection) *EndpointRegistryConfig
type EndpointRegistryImpl ¶ added in v1.25.0
type EndpointRegistryImpl struct {
// contains filtered or unexported fields
}
EndpointRegistryImpl manages a cached view of Nexus endpoints. Endpoints are lazily-loaded into memory on the first read. Thereafter, endpoint data is kept up to date by background long polling on matching service ListNexusEndpoints.
func NewEndpointRegistry ¶ added in v1.25.0
func NewEndpointRegistry( config *EndpointRegistryConfig, matchingClient matchingservice.MatchingServiceClient, persistence p.NexusEndpointManager, logger log.Logger, metricsHandler metrics.Handler, ) *EndpointRegistryImpl
func (*EndpointRegistryImpl) GetByID ¶ added in v1.25.0
func (r *EndpointRegistryImpl) GetByID(ctx context.Context, id string) (*persistencespb.NexusEndpointEntry, error)
func (*EndpointRegistryImpl) GetByName ¶ added in v1.25.0
func (r *EndpointRegistryImpl) GetByName(ctx context.Context, _ namespace.ID, endpointName string) (*persistencespb.NexusEndpointEntry, error)
func (*EndpointRegistryImpl) StartLifecycle ¶ added in v1.25.0
func (r *EndpointRegistryImpl) StartLifecycle()
StartLifecycle starts this component. It should only be invoked by an fx lifecycle hook. Should not be called multiple times or concurrently with StopLifecycle()
func (*EndpointRegistryImpl) StopLifecycle ¶ added in v1.25.0
func (r *EndpointRegistryImpl) StopLifecycle()
StopLifecycle stops this component. It should only be invoked by an fx lifecycle hook. Should not be called multiple times or concurrently with StartLifecycle()
type HTTPClientTraceConfig ¶
type HTTPClientTraceConfig struct { // Enabled controls whether any additional tracing will be invoked. Default false. Enabled bool // MinAttempt is the first operation attempt to include additional tracing. Default 2. Setting to 0 or 1 will add tracing to all requests and may be expensive. MinAttempt int32 // MaxAttempt is the maximum operation attempt to include additional tracing. Default 2. Setting to 0 means no maximum. MaxAttempt int32 // Hooks is the list of method names to invoke with extra tracing. See httptrace.ClientTrace for more detail. // Defaults to all implemented hooks: GetConn, GotConn, ConnectStart, ConnectDone, DNSStart, DNSDone, TLSHandshakeStart, TLSHandshakeDone, WroteRequest, GotFirstResponseByte. Hooks []string }
type HTTPClientTraceProvider ¶
type HTTPClientTraceProvider interface { // NewTrace returns a *httptrace.ClientTrace which provides hooks to invoke at each point in the HTTP request // lifecycle. This trace must be added to the HTTP request context using httptrace.WithClientTrace for the hooks to // be invoked. The provided logger should already be tagged with relevant request information // e.g. using log.With(logger, tag.RequestID(id), tag.Operation(op), ...). NewTrace(attempt int32, logger log.Logger) *httptrace.ClientTrace }
func NewLoggedHTTPClientTraceProvider ¶
func NewLoggedHTTPClientTraceProvider(dc *dynamicconfig.Collection) HTTPClientTraceProvider
type LoggedHTTPClientTraceProvider ¶
type LoggedHTTPClientTraceProvider struct {
Config *dynamicconfig.GlobalCachedTypedValue[HTTPClientTraceConfig]
}
func (*LoggedHTTPClientTraceProvider) NewTrace ¶
func (p *LoggedHTTPClientTraceProvider) NewTrace(attempt int32, logger log.Logger) *httptrace.ClientTrace