call

package
v0.7.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 2, 2023 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Overview

Package call implements an RPC mechanism.

Index

Constants

View Source
const (
	// CommunicationError is the type of the error returned by a call when some
	// communication error is encountered, typically a process or network
	// error. Check for it via errors.Is(call.CommunicationError).
	CommunicationError transportError = iota

	// Unreachable is the type of the error returned by a call when every
	// server is unreachable. Check for it via errors.Is(call.Unreachable).
	Unreachable
)

Variables

View Source
var Missing = Version{"__tombstone__"}

Missing is the version associated with a value that does not exist in the store.

TODO(rgrandl): this should be the same as the one in gke/internal/store/store.go. Should we move the version into a common file? Right now we have a duplicate version struct that is used both by the gke/store and stub/resolver.

Functions

func RoundRobin

func RoundRobin() *roundRobin

RoundRobin returns a round-robin balancer.

func Serve

func Serve(ctx context.Context, l Listener, opts ServerOptions) error

Serve starts listening for connections and requests on l. It always returns a non-nil error and closes l.

func ServeOn

func ServeOn(ctx context.Context, conn net.Conn, hmap *HandlerMap, opts ServerOptions)

ServeOn serves client requests received over an already established network connection with a client. This can be useful in tests or when using custom networking transports.

Types

type Balancer

type Balancer interface {
	// Update updates the current set of endpoints from which the Balancer can
	// pick. Before Update is called for the first time, the set of endpoints
	// is empty.
	Update(endpoints []Endpoint)

	// Pick picks an endpoint. Pick is guaranteed to return an endpoint that
	// was passed to the most recent call of Update. If there are no endpoints,
	// then Pick returns an error that includes Unreachable.
	Pick(CallOptions) (Endpoint, error)
}

A Balancer picks the endpoint to which which an RPC client performs a call. A Balancer should only be used by a single goroutine.

TODO(mwhittaker): Right now, balancers have no load information about endpoints. In the short term, we can at least add information about the number of pending requests for every endpoint.

TODO(mwhittaker): Right now, we pass a balancer the set of all endpoints. We instead probably want to pass it only the endpoints for which we have a connection. This means we may have to form connections more eagerly.

TODO(mwhittaker): We may want to guarantee that Update() is never called with an empty list of addresses. If we don't have addresses, then we don't need to do balancing.

func BalancerFunc

func BalancerFunc(pick func([]Endpoint, CallOptions) (Endpoint, error)) Balancer

BalancerFunc returns a stateless, purely functional load balancer that uses the provided picking function.

func Sharded

func Sharded() Balancer

Sharded returns a new sharded balancer.

Given a list of n endpoints e1, ..., en, for a request with shard key k, a sharded balancer will pick endpoint ei where i = k mod n. If no shard key is provided, an endpoint is picked at random.

type CallOptions

type CallOptions struct {
	// ShardKey, if not 0, is the shard key that a Balancer can use to route a
	// call. A Balancer can always choose to ignore the ShardKey.
	//
	// TODO(mwhittaker): Figure out a way to have 0 be a valid shard key. Could
	// change to *uint64 for example.
	ShardKey uint64

	// Balancer, if not nil, is the Balancer to use for a call, instead of the
	// Balancer that the client was constructed with (provided in
	// ClientOptions).
	Balancer Balancer
}

CallOptions are call-specific options.

type ClientOptions

type ClientOptions struct {
	// Load balancer. Defaults to RoundRobin() if nil.
	Balancer Balancer

	// Logger. Defaults to a logger that logs to stderr.
	Logger *slog.Logger

	// If non-zero, each call will optimistically spin for a given duration
	// before blocking, waiting for the results.
	OptimisticSpinDuration time.Duration

	// If non-zero, all writes smaller than this limit are flattened into
	// a single buffer before being written on the connection.
	WriteFlattenLimit int
}

ClientOptions are the options to configure an RPC client.

type Connection

type Connection interface {
	// Call makes an RPC over a Connection.
	Call(context.Context, MethodKey, []byte, CallOptions) ([]byte, error)

	// Close closes a connection. Pending invocations of Call are cancelled and
	// return an error. All future invocations of Call fail and return an error
	// immediately. Close can be called more than once.
	Close()
}

Connection allows a client to send RPCs.

func Connect

func Connect(ctx context.Context, resolver Resolver, opts ClientOptions) (Connection, error)

Connect creates a connection to the servers at the endpoints returned by the resolver.

type Endpoint

type Endpoint interface {
	// Dial returns an network connection to the endpoint.
	Dial(ctx context.Context) (net.Conn, error)

	// Address returns the address of the endpoint. If two endpoints have the
	// same address, then they are guaranteed to represent the same endpoint.
	// But, two endpoints with different addresses may also represent the same
	// endpoint (e.g., TCP("golang.org:http") and TCP("golang.org:80")).
	Address() string
}

An endpoint is a dialable entity with an address. For example, TCP("localhost:8000") returns an endpoint that dials the TCP server running on localhost:8000, and Unix("/tmp/unix.sock") returns an endpoint that dials the Unix socket /tmp/unix.sock.

func MTLS added in v0.6.0

func MTLS(config *tls.Config, ep Endpoint) Endpoint

MTLS returns an endpoint that performs MTLS authentication over the underlying endpoint. For example:

MTLS(&tls.Config{...}, TCP("golang.org:http"))
MTLS(&tls.Config{...}, Unix("unix.sock"))

REQUIRES: config is not nil

type Handler

type Handler func(ctx context.Context, args []byte) ([]byte, error)

Handler is a function that handles remote procedure calls. Regular application errors should be serialized in the returned bytes. A Handler should only return a non-nil error if the handler was not able to execute successfully.

type HandlerMap

type HandlerMap struct {
	// contains filtered or unexported fields
}

HandlerMap is a mapping from MethodID to a Handler. The zero value for a HandlerMap is an empty map.

func (*HandlerMap) Set

func (hm *HandlerMap) Set(component, method string, handler Handler)

Set registers a handler for the specified method of component.

type Listener added in v0.6.0

type Listener interface {
	Accept() (net.Conn, *HandlerMap, error)
	Close() error
	Addr() net.Addr
}

Listener allows the server to accept RPCs.

type MethodKey

type MethodKey [16]byte

MethodKey identifies a particular method on a component (formed by fingerprinting the component and method name).

func MakeMethodKey

func MakeMethodKey(component, method string) MethodKey

MakeMethodKey returns the fingerprint for the specified method on component.

type NetEndpoint

type NetEndpoint struct {
	Net  string // e.g., "tcp", "udp", "unix"
	Addr string // e.g., "localhost:8000", "/tmp/unix.sock"
}

NetEndpoint is an Endpoint that implements Dial using net.Dial.

func ParseNetEndpoint added in v0.4.0

func ParseNetEndpoint(endpoint string) (NetEndpoint, error)

ParseNetEndpoint parses a string with a format of net://addr into a NetAddress. For example,

ParseNetEndpoint("tcp://localhost:80") // NetEndpoint{"tcp", "localhost:80"}
ParseNetEndpoint("unix://unix.sock")   // NetEndpoint{"unix", "unix.sock"}

func TCP

func TCP(address string) NetEndpoint

TCP returns a TCP endpoint. The provided address is passed to net.Dial. For example:

TCP("golang.org:http")
TCP("192.0.2.1:http")
TCP("198.51.100.1:80")
TCP("[2001:db8::1]:domain")
TCP("[fe80::1%lo0]:53")
TCP(":80")

func Unix

func Unix(filename string) NetEndpoint

Unix returns an endpoint that uses Unix sockets. The provided filename is passed to net.Dial. For example:

Unix("unix.sock")
Unix("/tmp/unix.sock")

func (NetEndpoint) Address

func (ne NetEndpoint) Address() string

Address implements the Endpoint interface.

func (NetEndpoint) Dial

func (ne NetEndpoint) Dial(ctx context.Context) (net.Conn, error)

Dial implements the Endpoint interface.

func (NetEndpoint) String added in v0.4.0

func (ne NetEndpoint) String() string

type Resolver

type Resolver interface {
	// IsConstant returns whether a resolver is constant. A constant resolver
	// returns a fixed set of endpoints that doesn't change over time. A
	// non-constant resolver manages a set of endpoints that does change over
	// time.
	IsConstant() bool

	// Resolve returns a resolver's set of dialable endpoints. For non-constant
	// resolvers, this set of endpoints may change over time. Every snapshot of
	// the set of endpoints is assigned a unique version. If you call the
	// Resolve method with a nil version, Resolve returns the current set of
	// endpoints and its version. If you call the Resolve method with a non-nil
	// version, then a Resolver either:
	//    1. Blocks until the latest set of endpoints has a version newer than
	//       the one provided, returning the new set of endpoints and a new
	//       version.
	//    2. Returns the same version, indicating that the Resolve should
	//       be called again after an appropriate delay.
	//
	// Example:
	//     if !resolver.IsConstant() {
	//         // Perform an unversioned, non-blocking Resolve to get the the
	//         // latest set of endpoints and its version.
	//         endpoints, version, err := resolver.Resolve(ctx, nil)
	//
	//         // Perform a versioned Resolve that either (1) blocks until a set
	//         // of endpoints exists with a version newer than `version`, or
	//         // (2) returns `version`, indicating that the Resolve should be
	//         // called again after an appropriate delay.
	//         newEndpoints, newVersion, err := resolver.Resolve(ctx, version)
	//     }
	//
	// If the resolver is constant, then Resolve only needs to be called once
	// with a nil version. The returned set of endpoints will never change, and
	// the returned version is nil.
	//
	//     if resolver.IsConstant() {
	//         // endpoints1 == endpoints2, and version1 == version2 == nil.
	//         endpoints1, version1, err := resolver.Resolve(ctx, nil)
	//         endpoints2, version2, err := resolver.Resolve(ctx, nil)
	//     }
	Resolve(ctx context.Context, version *Version) ([]Endpoint, *Version, error)
}

A Resolver manages a potentially changing set of endpoints. For example:

  • A DNS resolver might resolve a hostname like "google.com" into a set of IP addresses like ["74.125.142.106", "74.125.142.103", "74.125.142.99"].
  • A Kubernetes Service resolver might resolve a service name like "backend" into the IP addresses of the pods that implement the service.
  • A unix resolver might resolve a directory name like "/tmp/workers" into the set of unix socket files within the directory.

A Resolver can safely be used concurrently from multiple goroutines.

Example usage:

func printAddrs(ctx context.Context, resolver Resolver) error {
    var version *Version
    for ctx.Err() == nil {
        endpoints, newVersion, err = resolver.Resolve(ctx, version)
        if err != nil {
            return err
        }
        version = newVersion

        for _, endpoint := range endpoints {
            fmt.Println(endpoint.Address())
        }

        if resolver.IsConstant() {
            return nil
        }
    }
    return ctx.Err()
}

func NewConstantResolver

func NewConstantResolver(endpoints ...Endpoint) Resolver

NewConstantResolver returns a new resolver that returns the provided set of endpoints.

func NewFileResolver

func NewFileResolver(filename string, endpoints ...Endpoint) Resolver

NewFileResolver returns a new resolver that returns a given set of endpoints when a given file is created.

type ServerOptions

type ServerOptions struct {
	// Logger. Defaults to a logger that logs to stderr.
	Logger *slog.Logger

	// Tracer. Defaults to a discarding tracer.
	Tracer trace.Tracer

	// If non-zero, calls on the server are inlined and a new goroutine is
	// launched only if the call takes longer than the provided duration.
	InlineHandlerDuration time.Duration

	// If non-zero, all writes smaller than this limit are flattened into
	// a single buffer before being written on the connection.
	WriteFlattenLimit int
}

ServerOption are the options to configure an RPC server.

type Version

type Version struct {
	Opaque string
}

Version is the version associated with a resolver's set of endpoints. Versions are opaque entities and should not be inspected or interpreted. Versions should only ever be constructed by calling a resolver's Resolve method and should only ever be used by being passed to the same resolver's Resolve method.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL