Documentation ¶
Index ¶
- Constants
- Variables
- func APIFailureToNexusFailure(failure *failurepb.Failure) *nexus.Failure
- func AdaptAuthorizeError(err error) error
- func ConvertGRPCError(err error, exposeDetails bool) error
- func IncomingServicePersistedEntryToExternalAPI(entry *persistencepb.NexusIncomingServiceEntry) *nexus.IncomingService
- func NexusFailureToProtoFailure(failure *nexus.Failure) *nexuspb.Failure
- func ProtoFailureToNexusFailure(failure *nexuspb.Failure) *nexus.Failure
- func UnsuccessfulOperationErrorToTemporalFailure(err *nexus.UnsuccessfulOperationError) *failurepb.Failure
- type CallbackToken
- type CallbackTokenGenerator
- type IncomingServiceRegistry
- type IncomingServiceRegistryConfig
- type NamespaceAndTaskQueue
- type NamespaceService
- type OutgoingServiceRegistry
- func (h *OutgoingServiceRegistry) Create(ctx context.Context, req *operatorservice.CreateNexusOutgoingServiceRequest) (*operatorservice.CreateNexusOutgoingServiceResponse, error)
- func (h *OutgoingServiceRegistry) Delete(ctx context.Context, req *operatorservice.DeleteNexusOutgoingServiceRequest) (*operatorservice.DeleteNexusOutgoingServiceResponse, error)
- func (h *OutgoingServiceRegistry) Get(ctx context.Context, req *operatorservice.GetNexusOutgoingServiceRequest) (*operatorservice.GetNexusOutgoingServiceResponse, error)
- func (h *OutgoingServiceRegistry) List(ctx context.Context, req *operatorservice.ListNexusOutgoingServicesRequest) (*operatorservice.ListNexusOutgoingServicesResponse, error)
- func (h *OutgoingServiceRegistry) Update(ctx context.Context, req *operatorservice.UpdateNexusOutgoingServiceRequest) (*operatorservice.UpdateNexusOutgoingServiceResponse, error)
- type OutgoingServiceRegistryConfig
Constants ¶
const ( // Currently supported token version. TokenVersion = 1 // Header key for the callback token in StartOperation requests. CallbackTokenHeader = "Temporal-Callback-Token" )
const ( IssueNamespaceNotSet = "namespace is not set on request" IssueNameNotSet = "name is not set on request" IssueSpecNotSet = "spec is not set on request" IssueURLNotSet = "url is not set on request" IssuePublicCallbackURLNotSet = "public callback url is not set on request" )
Variables ¶
var PayloadSerializer nexus.Serializer = payloadSerializer{}
var RouteCompletionCallback = routing.NewBuilder[string](). Constant("api", "v1", "namespaces"). StringVariable("namespace", func(namespace *string) *string { return namespace }). Constant("nexus", "callback"). Build()
RouteCompletionCallback is an HTTP route for completing a Nexus operation via callback.
var RouteDispatchNexusTaskByNamespaceAndTaskQueue = routing.NewBuilder[NamespaceAndTaskQueue](). Constant("api", "v1", "namespaces"). StringVariable("namespace", func(params *NamespaceAndTaskQueue) *string { return ¶ms.Namespace }). Constant("task-queues"). StringVariable("task_queue", func(params *NamespaceAndTaskQueue) *string { return ¶ms.TaskQueue }). Constant("nexus-operations"). Build()
var RouteDispatchNexusTaskByService = routing.NewBuilder[string](). Constant("api", "v1", "nexus", "services"). StringVariable("service", func(service *string) *string { return service }). Constant("operations"). Build()
var ServiceNameRegex = regexp.MustCompile(`[a-zA-Z_][a-zA-Z0-9_]*`)
ServiceNameRegex is the regular expression that outgoing service names must match.
Functions ¶
func APIFailureToNexusFailure ¶
APIFailureToNexusFailure converts an API proto Failure to a Nexus SDK Failure taking only the failure message to avoid leaking too many details to 3rd party callers. Always returns a non-nil value.
func AdaptAuthorizeError ¶
func ConvertGRPCError ¶
ConvertGRPCError converts either a serviceerror or a gRPC status error into a Nexus HandlerError if possible. If exposeDetails is true, the error message from the given error is exposed in the converted HandlerError, otherwise, a default message with minimal information is attached to the returned error. Roughly taken from https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto and https://github.com/grpc-ecosystem/grpc-gateway/blob/a7cf811e6ffabeaddcfb4ff65602c12671ff326e/runtime/errors.go#L56.
func IncomingServicePersistedEntryToExternalAPI ¶
func IncomingServicePersistedEntryToExternalAPI(entry *persistencepb.NexusIncomingServiceEntry) *nexus.IncomingService
func NexusFailureToProtoFailure ¶
NexusFailureToProtoFailure converts a Nexus SDK Failure to a proto Nexus Failure. Always returns a non-nil value.
func ProtoFailureToNexusFailure ¶
ProtoFailureToNexusFailure converts a proto Nexus Failure to a Nexus SDK Failure. Always returns a non-nil value.
func UnsuccessfulOperationErrorToTemporalFailure ¶
func UnsuccessfulOperationErrorToTemporalFailure(err *nexus.UnsuccessfulOperationError) *failurepb.Failure
Types ¶
type CallbackToken ¶
type CallbackToken struct { // Token version - currently only [TokenVersion] is supported. Version int `json:"v"` // Encoded [tokenspb.NexusOperationCompletion]. Data string `json:"d"` }
CallbackToken contains an encoded NexusOperationCompletion message.
func DecodeCallbackToken ¶
func DecodeCallbackToken(encoded string) (token *CallbackToken, err error)
DecodeCallbackToken unmarshals the given token applying minimal data verification.
type CallbackTokenGenerator ¶
type CallbackTokenGenerator struct { }
func NewCallbackTokenGenerator ¶
func NewCallbackTokenGenerator() *CallbackTokenGenerator
func (*CallbackTokenGenerator) DecodeCompletion ¶
func (g *CallbackTokenGenerator) DecodeCompletion(token *CallbackToken) (*tokenspb.NexusOperationCompletion, error)
DecodeCompletion decodes a callback token unwrapping the contained NexusOperationCompletion proto struct.
func (*CallbackTokenGenerator) Tokenize ¶
func (g *CallbackTokenGenerator) Tokenize(completion *tokenspb.NexusOperationCompletion) (string, error)
type IncomingServiceRegistry ¶
type IncomingServiceRegistry struct {
// contains filtered or unexported fields
}
IncomingServiceRegistry manages a cached view of Nexus incoming services. Services are lazily-loaded into memory on the first read. Thereafter, service data is kept up to date by background long polling on matching service ListNexusIncomingServices.
func NewIncomingServiceRegistry ¶
func NewIncomingServiceRegistry( config *IncomingServiceRegistryConfig, matchingClient matchingservice.MatchingServiceClient, persistence p.NexusIncomingServiceManager, logger log.Logger, ) *IncomingServiceRegistry
func (*IncomingServiceRegistry) Get ¶
func (r *IncomingServiceRegistry) Get(ctx context.Context, id string) (*nexus.IncomingService, error)
func (*IncomingServiceRegistry) StartLifecycle ¶
func (r *IncomingServiceRegistry) StartLifecycle()
StartLifecycle starts this component. It should only be invoked by an fx lifecycle hook. Should not be called multiple times or concurrently with StopLifecycle()
func (*IncomingServiceRegistry) StopLifecycle ¶
func (r *IncomingServiceRegistry) StopLifecycle()
StopLifecycle stops this component. It should only be invoked by an fx lifecycle hook. Should not be called multiple times or concurrently with StartLifecycle()
type IncomingServiceRegistryConfig ¶
type IncomingServiceRegistryConfig struct {
// contains filtered or unexported fields
}
func NewIncomingServiceRegistryConfig ¶
func NewIncomingServiceRegistryConfig(dc *dynamicconfig.Collection) *IncomingServiceRegistryConfig
type NamespaceAndTaskQueue ¶
type NamespaceService ¶
type NamespaceService interface { GetNamespace(ctx context.Context, request *persistence.GetNamespaceRequest) (*persistence.GetNamespaceResponse, error) UpdateNamespace(ctx context.Context, request *persistence.UpdateNamespaceRequest) error }
NamespaceService is an interface which contains only the methods we need from go.temporal.io/server/common/persistence.MetadataManager.
type OutgoingServiceRegistry ¶
type OutgoingServiceRegistry struct {
// contains filtered or unexported fields
}
OutgoingServiceRegistry manages the registration and retrieval of "outgoing" services from the namespace metadata in the persistence layer. An outgoing service is a Nexus service to which we send traffic. For example, we might call one of these services to start an operation using this API: https://github.com/nexus-rpc/api/blob/main/SPEC.md#start-operation. We need a registry for these services, so that we can look up the URL for a service by name. Later, we may add more service-specific information to the registry.
func NewOutgoingServiceRegistry ¶
func NewOutgoingServiceRegistry( namespaceService NamespaceService, namespaceReplicator namespace.Replicator, config *OutgoingServiceRegistryConfig, ) *OutgoingServiceRegistry
NewOutgoingServiceRegistry creates a new OutgoingServiceRegistry with the given namespace service and configuration.
func (*OutgoingServiceRegistry) Create ¶
func (h *OutgoingServiceRegistry) Create( ctx context.Context, req *operatorservice.CreateNexusOutgoingServiceRequest, ) (*operatorservice.CreateNexusOutgoingServiceResponse, error)
Create implements operatorservice.OperatorServiceServer.CreateNexusOutgoingService.
func (*OutgoingServiceRegistry) Delete ¶
func (h *OutgoingServiceRegistry) Delete( ctx context.Context, req *operatorservice.DeleteNexusOutgoingServiceRequest, ) (*operatorservice.DeleteNexusOutgoingServiceResponse, error)
Delete implements operatorservice.OperatorServiceServer.DeleteNexusOutgoingService.
func (*OutgoingServiceRegistry) Get ¶
func (h *OutgoingServiceRegistry) Get( ctx context.Context, req *operatorservice.GetNexusOutgoingServiceRequest, ) (*operatorservice.GetNexusOutgoingServiceResponse, error)
Get implements operatorservice.OperatorServiceServer.GetNexusOutgoingService.
func (*OutgoingServiceRegistry) List ¶
func (h *OutgoingServiceRegistry) List( ctx context.Context, req *operatorservice.ListNexusOutgoingServicesRequest, ) (*operatorservice.ListNexusOutgoingServicesResponse, error)
List implements operatorservice.OperatorServiceServer.ListNexusOutgoingServices.
func (*OutgoingServiceRegistry) Update ¶
func (h *OutgoingServiceRegistry) Update( ctx context.Context, req *operatorservice.UpdateNexusOutgoingServiceRequest, ) (*operatorservice.UpdateNexusOutgoingServiceResponse, error)
Update implements operatorservice.OperatorServiceServer.UpdateNexusOutgoingService.
type OutgoingServiceRegistryConfig ¶
type OutgoingServiceRegistryConfig struct { MaxURLLength dynamicconfig.IntPropertyFn MaxNameLength dynamicconfig.IntPropertyFn DefaultPageSize dynamicconfig.IntPropertyFn MaxPageSize dynamicconfig.IntPropertyFn }
OutgoingServiceRegistryConfig contains the dynamic configuration values for the OutgoingServiceRegistry.
func NewOutgoingServiceRegistryConfig ¶
func NewOutgoingServiceRegistryConfig(dc *dynamicconfig.Collection) *OutgoingServiceRegistryConfig
NewOutgoingServiceRegistryConfig creates a new OutgoingServiceRegistryConfig with the given dynamic configuration.