rpc

package
v0.420.1
Warning

This package is not in the latest version of its module.

Published: Jan 3, 2025 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Constants

const ShutdownGracePeriod = time.Second * 5

Variables

This section is empty.

Functions

func AlwaysRetry

func AlwaysRetry() func(error) bool

AlwaysRetry returns a callback that reports true for every error. Pass it as the errorRetryCallback argument to RetryStreamingServerStream to retry on every error the stream encounters.

func ClientFromContext

func ClientFromContext[Client Pingable[Req, Resp, RespPtr], Req any, Resp any, RespPtr PingResponse[Resp]](ctx context.Context) Client

ClientFromContext returns the given RPC client from the context, or panics.

func ContextValuesMiddleware

func ContextValuesMiddleware(ctx context.Context, handler http.Handler) http.HandlerFunc

ContextValuesMiddleware injects values from a Context into the request Context.

func ContextWithClient

func ContextWithClient[Client Pingable[Req, Resp, RespPtr], Req any, Resp any, RespPtr PingResponse[Resp]](ctx context.Context, client Client) context.Context

ContextWithClient returns a context with an RPC client attached.

func CustomOtelInterceptor

func CustomOtelInterceptor() connect.Interceptor

func DefaultClientOptions

func DefaultClientOptions(level log.Level) []connect.ClientOption

func DefaultHandlerOptions

func DefaultHandlerOptions() []connect.HandlerOption

func Dial

func Dial[Client Pingable[Req, Resp, RespPtr], Req any, Resp any, RespPtr PingResponse[Resp]](factory ClientFactory[Client, Req, Resp, RespPtr], baseURL string, errorLevel log.Level, opts ...connect.ClientOption) Client

func GetHTTPClient

func GetHTTPClient(url string) *http.Client

GetHTTPClient returns an HTTP client usable for the given URL.

func InitialiseClients

func InitialiseClients(authenticators map[string]string, allowInsecure bool)

InitialiseClients initialises global HTTP clients used by the RPC system.

"authenticators" are authenticator executables to use for each endpoint. The key is the URL of the endpoint, the value is the path to the authenticator executable.

"allowInsecure" skips certificate verification, making TLS susceptible to machine-in-the-middle attacks.

func IsClientAvailableInContext

func IsClientAvailableInContext[Client Pingable[Req, Resp, RespPtr], Req any, Resp any, RespPtr PingResponse[Resp]](ctx context.Context) bool

func IsDirectRouted

func IsDirectRouted(ctx context.Context) bool

IsDirectRouted returns true if the incoming request should be directly routed and never redirected.

func MetadataInterceptor

func MetadataInterceptor(errorLevel log.Level) connect.Interceptor

MetadataInterceptor propagates FTL metadata through servers and clients.

"errorLevel" is the level at which errors will be logged.

func PanicInterceptor

func PanicInterceptor() connect.Interceptor

PanicInterceptor intercepts panics and logs them.

func ParentRequestKeyFromContext

func ParentRequestKeyFromContext(ctx context.Context) (optional.Option[model.RequestKey], error)

func RequestKeyFromContext

func RequestKeyFromContext(ctx context.Context) (optional.Option[model.RequestKey], error)

RequestKeyFromContext returns the request key from the context, if any.

TODO: Return an Option here instead of a bool.

func RetryStreamingClientStream

func RetryStreamingClientStream[Req any, Resp any](
	ctx context.Context,
	retry backoff.Backoff,
	rpc func(context.Context) *connect.ClientStreamForClient[Req, Resp],
	handler func(ctx context.Context, send func(*Req) error) error,
)

RetryStreamingClientStream will repeatedly call handler with the stream returned by "rpc" until handler returns an error or the context is cancelled.

If the stream errors, it will be closed and a new call will be issued.

func RetryStreamingServerStream

func RetryStreamingServerStream[Req, Resp any](
	ctx context.Context,
	name string,
	retry backoff.Backoff,
	req *Req,
	rpc func(context.Context, *connect.Request[Req]) (*connect.ServerStreamForClient[Resp], error),
	handler func(ctx context.Context, resp *Resp) error,
	errorRetryCallback func(err error) bool,
)

RetryStreamingServerStream will repeatedly call handler with responses from the stream returned by "rpc" until either the context is cancelled or the errorRetryCallback returns false.

func Serve

func Serve(ctx context.Context, listen *url.URL, options ...Option) error

Serve starts an HTTP and Connect gRPC server with sane defaults for FTL.

Blocks until the context is cancelled.

func VerbFromContext

func VerbFromContext(ctx context.Context) (*schema.Ref, bool)

VerbFromContext returns the current module.verb of the current request.

func VerbsFromContext

func VerbsFromContext(ctx context.Context) ([]*schema.Ref, bool)

VerbsFromContext returns the module.verb chain of the current request.

func Wait

func Wait[Req any, Resp any, RespPtr PingResponse[Resp]](ctx context.Context, retry backoff.Backoff, deadline time.Duration, client Pingable[Req, Resp, RespPtr]) error

Wait for a client to become available.

This will repeatedly call Ping() according to the retry policy until the client is ready or the deadline is reached.

If "ctx" is cancelled, this will return ctx.Err().

RPC errors are usually logged, but this function silences ping call errors and returns the last error if the deadline is reached.

func WithDirectRouting

func WithDirectRouting(ctx context.Context) context.Context

WithDirectRouting ensures any hops in Verb routing do not redirect.

This is used so that, e.g., calls from Drives do not create recursive loops when calling back to the Agent.

func WithParentRequestKey

func WithParentRequestKey(ctx context.Context, key model.RequestKey) context.Context

func WithRequestKey

func WithRequestKey(ctx context.Context, key model.RequestKey) context.Context

WithRequestKey adds the request key to the context.

func WithVerbs

func WithVerbs(ctx context.Context, verbs []*schema.Ref) context.Context

WithVerbs adds the module.verb chain from the current request to the context.

Types

type ClientFactory

type ClientFactory[Client Pingable[Req, Resp, RespPtr], Req any, Resp any, RespPtr PingResponse[Resp]] func(httpClient connect.HTTPClient, baseURL string, opts ...connect.ClientOption) Client

ClientFactory is a function that creates a new client and is typically one of the New*Client functions generated by protoc-gen-connect-go.

type GRPCServerConstructor

type GRPCServerConstructor[Iface Pingable[Req, Resp, RespPtr], Req any, Resp any, RespPtr PingResponse[Resp]] func(svc Iface, opts ...connect.HandlerOption) (string, http.Handler)

type Option

type Option func(*serverOptions)

func GRPC

func GRPC[Iface, Impl Pingable[Req, Resp, RespPtr], Req any, Resp any, RespPtr PingResponse[Resp]](constructor GRPCServerConstructor[Iface, Req, Resp, RespPtr], impl Impl, options ...connect.HandlerOption) Option

GRPC is a convenience function for registering a GRPC server with default options.

TODO(aat): Do we need pingable here?

func HTTP

func HTTP(prefix string, handler http.Handler) Option

HTTP adds an HTTP route to the server.

func HealthCheck

func HealthCheck(check http.HandlerFunc) Option

func PProf

func PProf() Option

PProf adds /debug/pprof routes to the server.

func RawGRPC

func RawGRPC[Iface, Impl any](constructor RawGRPCServerConstructor[Iface], impl Impl, options ...connect.HandlerOption) Option

RawGRPC is a convenience function for registering a GRPC server with default options without Pingable.

type PingResponse

type PingResponse[T any] interface {
	*T
	GetNotReady() string
}

PingResponse is a constraint that is used to enforce that a pointer to the Pingable response message has a GetNotReady() method.
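The pointer-constraint pattern behind PingResponse can be shown in isolation. In the sketch below, pingResponse copies the documented constraint, while fakePingResponse and notReadyReason are hypothetical names introduced only for illustration; a real response type would be a generated protobuf message.

```go
package main

import "fmt"

// pingResponse mirrors PingResponse: the type argument must be *T and must
// provide GetNotReady.
type pingResponse[T any] interface {
	*T
	GetNotReady() string
}

// fakePingResponse is a hypothetical message type with a pointer-receiver
// getter, as generated protobuf messages have.
type fakePingResponse struct{ notReady string }

func (r *fakePingResponse) GetNotReady() string {
	if r == nil {
		return ""
	}
	return r.notReady
}

// notReadyReason works for any Resp whose pointer type satisfies the
// constraint. RespPtr is inferred from Resp, so callers pass only the value.
func notReadyReason[Resp any, RespPtr pingResponse[Resp]](resp *Resp) string {
	return RespPtr(resp).GetNotReady()
}

func main() {
	fmt.Println(notReadyReason(&fakePingResponse{notReady: "starting"}))
}
```

Splitting the value type (Resp) from its pointer type (RespPtr) lets generic code name the message type while still calling methods declared on the pointer receiver.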

type Pingable

type Pingable[Req any, Resp any, RespPtr PingResponse[Resp]] interface {
	Ping(ctx context.Context, req *connect.Request[Req]) (*connect.Response[Resp], error)
}

Pingable is an interface that is used to indicate that a client can be pinged.

type RawGRPCServerConstructor

type RawGRPCServerConstructor[Iface any] func(svc Iface, opts ...connect.HandlerOption) (string, http.Handler)

type Server

type Server struct {
	Bind   *pubsub.Topic[*url.URL] // Will be updated with the actual bind address.
	Server *http.Server
	// contains filtered or unexported fields
}

func NewServer

func NewServer(ctx context.Context, listen *url.URL, options ...Option) (*Server, error)

func (*Server) Serve

func (s *Server) Serve(ctx context.Context) error

Serve runs the server, updating .Bind with the actual bind address.
