Documentation ¶
Overview ¶
Package call implements an RPC mechanism.
Index ¶
- Constants
- Variables
- func NewStub(name string, reg *codegen.Registration, conn Connection, tracer trace.Tracer, ...) codegen.Stub
- func RoundRobin() *roundRobin
- func Serve(ctx context.Context, l Listener, opts ServerOptions) error
- func ServeOn(ctx context.Context, conn net.Conn, hmap *HandlerMap, opts ServerOptions)
- type Balancer
- type CallOptions
- type ClientOptions
- type Connection
- type Endpoint
- type Handler
- type HandlerMap
- type Listener
- type MethodKey
- type NetEndpoint
- type ReplicaConnection
- type Resolver
- type ServerOptions
- type Version
Constants ¶
const ( // CommunicationError is the type of the error returned by a call when some // communication error is encountered, typically a process or network // error. Check for it via errors.Is(call.CommunicationError). CommunicationError transportError = iota // Unreachable is the type of the error returned by a call when every // server is unreachable. Check for it via errors.Is(call.Unreachable). Unreachable )
Variables ¶
var Missing = Version{"__tombstone__"}
Missing is the version associated with a value that does not exist in the store.
TODO(rgrandl): this should be the same as the one in gke/internal/store/store.go. Should we move the version into a common file? Right now we have a duplicate version struct that is used both by the gke/store and stub/resolver.
Functions ¶
func NewStub ¶
func NewStub(name string, reg *codegen.Registration, conn Connection, tracer trace.Tracer, injectRetries int) codegen.Stub
NewStub creates a client-side stub of the type matching reg. Calls on the stub are sent on conn to the component with the specified name.
func Serve ¶
func Serve(ctx context.Context, l Listener, opts ServerOptions) error
Serve starts listening for connections and requests on l. It always returns a non-nil error and closes l.
func ServeOn ¶
func ServeOn(ctx context.Context, conn net.Conn, hmap *HandlerMap, opts ServerOptions)
ServeOn serves client requests received over an already established network connection with a client. This can be useful in tests or when using custom networking transports.
Types ¶
type Balancer ¶
type Balancer interface { // Add adds a ReplicaConnection to the set of connections. Add(ReplicaConnection) // Remove removes a ReplicaConnection from the set of connections. Remove(ReplicaConnection) // Pick picks a ReplicaConnection from the set of connections. // Pick returns _,false if no connections are available. Pick(CallOptions) (ReplicaConnection, bool) }
Balancer manages a set of ReplicaConnections and picks one of them per call. A Balancer requires external synchronization (no concurrent calls should be made to the same Balancer).
TODO(mwhittaker): Right now, balancers have no load information about endpoints. In the short term, we can at least add information about the number of pending requests for every endpoint.
func BalancerFunc ¶
func BalancerFunc(pick func([]ReplicaConnection, CallOptions) (ReplicaConnection, bool)) Balancer
BalancerFunc returns a stateless, purely functional load balancer that calls pick to pick the connection to use.
type CallOptions ¶
type CallOptions struct { // Retry indicates whether or not calls that failed due to communication // errors should be retried. Retry bool // ShardKey, if not 0, is the shard key that a Balancer can use to route a // call. A Balancer can always choose to ignore the ShardKey. // // TODO(mwhittaker): Figure out a way to have 0 be a valid shard key. Could // change to *uint64 for example. ShardKey uint64 }
CallOptions are call-specific options.
type ClientOptions ¶
type ClientOptions struct { // Load balancer. Defaults to RoundRobin() if nil. Balancer Balancer // Logger. Defaults to a logger that logs to stderr. Logger *slog.Logger // If non-zero, each call will optimistically spin for a given duration // before blocking, waiting for the results. OptimisticSpinDuration time.Duration // All writes smaller than this limit are flattened into a single // buffer before being written on the connection. If zero, an appropriate // value is picked automatically. If negative, no flattening is done. WriteFlattenLimit int }
ClientOptions are the options to configure an RPC client.
type Connection ¶
type Connection interface { // Call makes an RPC over a Connection. Call(context.Context, MethodKey, []byte, CallOptions) ([]byte, error) // Close closes a connection. Pending invocations of Call are cancelled and // return an error. All future invocations of Call fail and return an error // immediately. Close can be called more than once. Close() }
Connection allows a client to send RPCs.
func Connect ¶
func Connect(ctx context.Context, resolver Resolver, opts ClientOptions) (Connection, error)
Connect creates a connection to the servers at the endpoints returned by the resolver.
type Endpoint ¶
type Endpoint interface { // Dial returns an network connection to the endpoint. Dial(ctx context.Context) (net.Conn, error) // Address returns the address of the endpoint. If two endpoints have the // same address, then they are guaranteed to represent the same endpoint. // But, two endpoints with different addresses may also represent the same // endpoint (e.g., TCP("golang.org:http") and TCP("golang.org:80")). Address() string }
An endpoint is a dialable entity with an address. For example, TCP("localhost:8000") returns an endpoint that dials the TCP server running on localhost:8000, and Unix("/tmp/unix.sock") returns an endpoint that dials the Unix socket /tmp/unix.sock.
type Handler ¶
Handler is a function that handles remote procedure calls. Regular application errors should be serialized in the returned bytes. A Handler should only return a non-nil error if the handler was not able to execute successfully.
type HandlerMap ¶
type HandlerMap struct {
// contains filtered or unexported fields
}
HandlerMap is a mapping from MethodID to a Handler. The zero value for a HandlerMap is an empty map.
func NewHandlerMap ¶
func NewHandlerMap() *HandlerMap
NewHandlerMap returns a handler map to which the server handlers can be added.
func (*HandlerMap) AddHandlers ¶
func (hm *HandlerMap) AddHandlers(name string, impl any) error
AddHandlers adds handlers for all methods of the component with the specified name. The handlers invoke methods on the specified impl.
func (*HandlerMap) Set ¶
func (hm *HandlerMap) Set(component, method string, handler Handler)
Set registers a handler for the specified method of component.
type MethodKey ¶
type MethodKey [16]byte
MethodKey identifies a particular method on a component (formed by fingerprinting the component and method name).
func MakeMethodKey ¶
MakeMethodKey returns the fingerprint for the specified method on component.
type NetEndpoint ¶
type NetEndpoint struct { Net string // e.g., "tcp", "udp", "unix" Addr string // e.g., "localhost:8000", "/tmp/unix.sock" }
NetEndpoint is an Endpoint that implements Dial using net.Dial.
func ParseNetEndpoint ¶
func ParseNetEndpoint(endpoint string) (NetEndpoint, error)
ParseNetEndpoint parses a string with a format of net://addr into a NetAddress. For example,
ParseNetEndpoint("tcp://localhost:80") // NetEndpoint{"tcp", "localhost:80"} ParseNetEndpoint("unix://unix.sock") // NetEndpoint{"unix", "unix.sock"}
func TCP ¶
func TCP(address string) NetEndpoint
TCP returns a TCP endpoint. The provided address is passed to net.Dial. For example:
TCP("golang.org:http") TCP("192.0.2.1:http") TCP("198.51.100.1:80") TCP("[2001:db8::1]:domain") TCP("[fe80::1%lo0]:53") TCP(":80")
func Unix ¶
func Unix(filename string) NetEndpoint
Unix returns an endpoint that uses Unix sockets. The provided filename is passed to net.Dial. For example:
Unix("unix.sock") Unix("/tmp/unix.sock")
func (NetEndpoint) Address ¶
func (ne NetEndpoint) Address() string
Address implements the Endpoint interface.
func (NetEndpoint) String ¶
func (ne NetEndpoint) String() string
type ReplicaConnection ¶
type ReplicaConnection interface { // Address returns the name of the endpoint to which the ReplicaConnection // is connected. Address() string }
ReplicaConnection is a connection to a single replica. A single Connection may consist of many ReplicaConnections (typically one per replica).
type Resolver ¶
type Resolver interface { // IsConstant returns whether a resolver is constant. A constant resolver // returns a fixed set of endpoints that doesn't change over time. A // non-constant resolver manages a set of endpoints that does change over // time. IsConstant() bool // Resolve returns a resolver's set of dialable endpoints. For non-constant // resolvers, this set of endpoints may change over time. Every snapshot of // the set of endpoints is assigned a unique version. If you call the // Resolve method with a nil version, Resolve returns the current set of // endpoints and its version. If you call the Resolve method with a non-nil // version, then a Resolver either: // 1. Blocks until the latest set of endpoints has a version newer than // the one provided, returning the new set of endpoints and a new // version. // 2. Returns the same version, indicating that the Resolve should // be called again after an appropriate delay. // // Example: // if !resolver.IsConstant() { // // Perform an unversioned, non-blocking Resolve to get the the // // latest set of endpoints and its version. // endpoints, version, err := resolver.Resolve(ctx, nil) // // // Perform a versioned Resolve that either (1) blocks until a set // // of endpoints exists with a version newer than `version`, or // // (2) returns `version`, indicating that the Resolve should be // // called again after an appropriate delay. // newEndpoints, newVersion, err := resolver.Resolve(ctx, version) // } // // If the resolver is constant, then Resolve only needs to be called once // with a nil version. The returned set of endpoints will never change, and // the returned version is nil. // // if resolver.IsConstant() { // // endpoints1 == endpoints2, and version1 == version2 == nil. // endpoints1, version1, err := resolver.Resolve(ctx, nil) // endpoints2, version2, err := resolver.Resolve(ctx, nil) // } Resolve(ctx context.Context, version *Version) ([]Endpoint, *Version, error) }
A Resolver manages a potentially changing set of endpoints. For example:
- A DNS resolver might resolve a hostname like "google.com" into a set of IP addresses like ["74.125.142.106", "74.125.142.103", "74.125.142.99"].
- A Kubernetes Service resolver might resolve a service name like "backend" into the IP addresses of the pods that implement the service.
- A unix resolver might resolve a directory name like "/tmp/workers" into the set of unix socket files within the directory.
A Resolver can safely be used concurrently from multiple goroutines.
Example usage:
func printAddrs(ctx context.Context, resolver Resolver) error { var version *Version for ctx.Err() == nil { endpoints, newVersion, err = resolver.Resolve(ctx, version) if err != nil { return err } version = newVersion for _, endpoint := range endpoints { fmt.Println(endpoint.Address()) } if resolver.IsConstant() { return nil } } return ctx.Err() }
func NewConstantResolver ¶
NewConstantResolver returns a new resolver that returns the provided set of endpoints.
type ServerOptions ¶
type ServerOptions struct { // Logger. Defaults to a logger that logs to stderr. Logger *slog.Logger // Tracer. Defaults to a discarding tracer. Tracer trace.Tracer // If positive, calls on the server are inlined and a new goroutine is // launched only if the call takes longer than the provided duration. // If zero, the system inlines call execution and automatically picks a // reasonable delay before the new goroutine is launched. // If negative, handlers are always started in a new goroutine. InlineHandlerDuration time.Duration // All writes smaller than this limit are flattened into a single // buffer before being written on the connection. If zero, an appropriate // value is picked automatically. If negative, no flattening is done. WriteFlattenLimit int }
ServerOption are the options to configure an RPC server.
type Version ¶
type Version struct {
Opaque string
}
Version is the version associated with a resolver's set of endpoints. Versions are opaque entities and should not be inspected or interpreted. Versions should only ever be constructed by calling a resolver's Resolve method and should only ever be used by being passed to the same resolver's Resolve method.