Documentation ¶
Overview ¶
Package call implements an RPC mechanism.
Index ¶
- Constants
- Variables
- func RoundRobin() *roundRobin
- func Serve(ctx context.Context, l net.Listener, hmap *HandlerMap, opts ServerOptions) error
- func ServeOn(ctx context.Context, conn net.Conn, hmap *HandlerMap, opts ServerOptions)
- type Balancer
- type CallOptions
- type ClientOptions
- type Connection
- type Endpoint
- type Handler
- type HandlerMap
- type MethodKey
- type NetEndpoint
- type NetworkAddress
- type Resolver
- type ServerOptions
- type Version
Constants ¶
const (
    // CommunicationError is the type of the error returned by a call when some
    // communication error is encountered, typically a process or network
    // error. Check for it via errors.Is(err, call.CommunicationError).
    CommunicationError transportError = iota

    // Unreachable is the type of the error returned by a call when every
    // server is unreachable. Check for it via errors.Is(err, call.Unreachable).
    Unreachable
)
Variables ¶
var Missing = Version{"__tombstone__"}
Missing is the version associated with a value that does not exist in the store.
TODO(rgrandl): this should be the same as the one in gke/internal/store/store.go. Should we move the version into a common file? Right now we have a duplicate version struct that is used both by the gke/store and stub/resolver.
Functions ¶
func Serve ¶
func Serve(ctx context.Context, l net.Listener, hmap *HandlerMap, opts ServerOptions) error
Serve starts listening for connections and requests on l. Handlers to handle incoming requests are found in hmap.
func ServeOn ¶
func ServeOn(ctx context.Context, conn net.Conn, hmap *HandlerMap, opts ServerOptions)
ServeOn serves client requests received over an already established network connection with a client. This can be useful in tests or when using custom networking transports.
Types ¶
type Balancer ¶
type Balancer interface {
    // Update updates the current set of endpoints from which the Balancer can
    // pick. Before Update is called for the first time, the set of endpoints
    // is empty.
    Update(endpoints []Endpoint)

    // Pick picks an endpoint. Pick is guaranteed to return an endpoint that
    // was passed to the most recent call of Update. If there are no endpoints,
    // then Pick returns an error that includes Unreachable.
    Pick(CallOptions) (Endpoint, error)
}
A Balancer picks the endpoint to which an RPC client performs a call. A Balancer should only be used by a single goroutine.
TODO(mwhittaker): Right now, balancers have no load information about endpoints. In the short term, we can at least add information about the number of pending requests for every endpoint.
TODO(mwhittaker): Right now, we pass a balancer the set of all endpoints. We instead probably want to pass it only the endpoints for which we have a connection. This means we may have to form connections more eagerly.
TODO(mwhittaker): We may want to guarantee that Update() is never called with an empty list of addresses. If we don't have addresses, then we don't need to do balancing.
func BalancerFunc ¶
func BalancerFunc(pick func([]Endpoint, CallOptions) (Endpoint, error)) Balancer
BalancerFunc returns a stateless, purely functional load balancer that uses the provided picking function.
type CallOptions ¶
type CallOptions struct {
    // ShardKey, if not 0, is the shard key that a Balancer can use to route a
    // call. A Balancer can always choose to ignore the ShardKey.
    //
    // TODO(mwhittaker): Figure out a way to have 0 be a valid shard key. Could
    // change to *uint64 for example.
    ShardKey uint64

    // Balancer, if not nil, is the Balancer to use for a call, instead of the
    // Balancer that the client was constructed with (provided in
    // ClientOptions).
    Balancer Balancer
}
CallOptions are call-specific options.
type ClientOptions ¶
type ClientOptions struct {
    // Load balancer. Defaults to RoundRobin() if nil.
    Balancer Balancer

    // Logger. Defaults to a logger that logs to stderr.
    Logger logtype.Logger

    // If non-zero, each call will optimistically spin for a given duration
    // before blocking, waiting for the results.
    OptimisticSpinDuration time.Duration

    // If non-zero, all writes smaller than this limit are flattened into
    // a single buffer before being written on the connection.
    WriteFlattenLimit int
}
ClientOptions are the options to configure an RPC client.
type Connection ¶
type Connection interface {
    // Call makes an RPC over a Connection.
    Call(context.Context, MethodKey, []byte, CallOptions) ([]byte, error)

    // Close closes a connection. Pending invocations of Call are cancelled and
    // return an error. All future invocations of Call fail and return an error
    // immediately. Close can be called more than once.
    Close()
}
Connection allows a client to send RPCs.
func Connect ¶
func Connect(ctx context.Context, resolver Resolver, opts ClientOptions) (Connection, error)
Connect creates a connection to the servers at the endpoints returned by the resolver.
type Endpoint ¶
type Endpoint interface {
    // Dial returns a network connection to the endpoint.
    Dial(ctx context.Context) (net.Conn, error)

    // Address returns the address of the endpoint. If two endpoints have the
    // same address, then they are guaranteed to represent the same endpoint.
    // But, two endpoints with different addresses may also represent the same
    // endpoint (e.g., TCP("golang.org:http") and TCP("golang.org:80")).
    Address() string
}
An endpoint is a dialable entity with an address. For example, TCP("localhost:8000") returns an endpoint that dials the TCP server running on localhost:8000, and Unix("/tmp/unix.sock") returns an endpoint that dials the Unix socket /tmp/unix.sock.
type Handler ¶
Handler is a function that handles remote procedure calls. Regular application errors should be serialized in the returned bytes. A Handler should only return a non-nil error if the handler was not able to execute successfully.
type HandlerMap ¶
type HandlerMap struct {
// contains filtered or unexported fields
}
HandlerMap is a mapping from MethodKey to a Handler. The zero value for a HandlerMap is an empty map.
func (*HandlerMap) Set ¶
func (hm *HandlerMap) Set(component, method string, handler Handler)
Set registers a handler for the specified method of component.
type MethodKey ¶
type MethodKey [16]byte
MethodKey identifies a particular method on a component (formed by fingerprinting the component and method name).
func MakeMethodKey ¶
MakeMethodKey returns the fingerprint for the specified method on component.
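One plausible way to derive a 16-byte fingerprint is to hash the component and method names and truncate the digest, as sketched here. The makeMethodKey name, the choice of SHA-256, and the "component.method" input format are assumptions; the page does not state which hash or signature MakeMethodKey actually uses.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// MethodKey matches the documented type.
type MethodKey [16]byte

// makeMethodKey is a hypothetical fingerprint function: it hashes
// "component.method" with SHA-256 and keeps the first 16 bytes.
func makeMethodKey(component, method string) MethodKey {
	sum := sha256.Sum256([]byte(component + "." + method))
	var key MethodKey
	copy(key[:], sum[:]) // copy stops after len(key) == 16 bytes
	return key
}

func main() {
	k := makeMethodKey("example.Cache", "Get")
	fmt.Printf("%x\n", k[:])
}
```

Since MethodKey is a fixed-size array, keys are comparable with == and can be used directly as map keys.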
type NetEndpoint ¶
type NetEndpoint struct {
    Net  string // e.g., "tcp", "udp", "unix"
    Addr string // e.g., "localhost:8000", "/tmp/unix.sock"
}
NetEndpoint is an Endpoint that implements Dial using net.Dial.
func TCP ¶
func TCP(address string) NetEndpoint
TCP returns a TCP endpoint. The provided address is passed to net.Dial. For example:
TCP("golang.org:http")
TCP("192.0.2.1:http")
TCP("198.51.100.1:80")
TCP("[2001:db8::1]:domain")
TCP("[fe80::1%lo0]:53")
TCP(":80")
func Unix ¶
func Unix(filename string) NetEndpoint
Unix returns an endpoint that uses Unix sockets. The provided filename is passed to net.Dial. For example:
Unix("unix.sock")
Unix("/tmp/unix.sock")
func (NetEndpoint) Address ¶
func (ne NetEndpoint) Address() string
Address implements the Endpoint interface.
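To make the pieces above concrete, here is a minimal local sketch of a NetEndpoint-like type. The struct fields and the TCP and Unix constructors follow the documented declarations. The Dial body uses net.Dialer.DialContext so that the context is honored, and the "<network>://<address>" format returned by Address is an assumption; neither is confirmed by this page.

```go
package main

import (
	"context"
	"fmt"
	"net"
)

// NetEndpoint mirrors the documented struct.
type NetEndpoint struct {
	Net  string // e.g., "tcp", "unix"
	Addr string // e.g., "localhost:8000", "/tmp/unix.sock"
}

// Dial connects to the endpoint. DialContext is used here (an assumption)
// so that cancelling ctx aborts the dial.
func (ne NetEndpoint) Dial(ctx context.Context) (net.Conn, error) {
	var d net.Dialer
	return d.DialContext(ctx, ne.Net, ne.Addr)
}

// Address returns "<network>://<address>". This format is an assumption.
func (ne NetEndpoint) Address() string {
	return ne.Net + "://" + ne.Addr
}

// TCP and Unix mirror the documented constructors.
func TCP(address string) NetEndpoint   { return NetEndpoint{Net: "tcp", Addr: address} }
func Unix(filename string) NetEndpoint { return NetEndpoint{Net: "unix", Addr: filename} }

func main() {
	fmt.Println(TCP("localhost:8000").Address())
	fmt.Println(Unix("/tmp/unix.sock").Address())
}
```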
type NetworkAddress ¶
type NetworkAddress string
A NetworkAddress is a string of the form <network>://<address> (e.g., "tcp://localhost:8000", "unix:///tmp/unix.sock").
func (NetworkAddress) Split ¶
func (na NetworkAddress) Split() (network string, address string, err error)
Split splits the network and address from a NetworkAddress. For example,
NetworkAddress("tcp://localhost:80").Split() // "tcp", "localhost:80"
NetworkAddress("unix://unix.sock").Split()   // "unix", "unix.sock"
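The splitting behavior described above can be sketched with strings.Cut on the first "://" separator. This is a local reimplementation for illustration; the error message and the exact validation the real Split performs are assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// NetworkAddress matches the documented type.
type NetworkAddress string

// Split cuts the address at the first "://". An address without that
// separator is rejected; the error text is illustrative.
func (na NetworkAddress) Split() (network string, address string, err error) {
	network, address, ok := strings.Cut(string(na), "://")
	if !ok {
		return "", "", fmt.Errorf("%q does not have the form <network>://<address>", string(na))
	}
	return network, address, nil
}

func main() {
	for _, na := range []NetworkAddress{"tcp://localhost:80", "unix:///tmp/unix.sock", "localhost:80"} {
		network, address, err := na.Split()
		fmt.Printf("%q %q %v\n", network, address, err)
	}
}
```

Cutting at the first separator keeps the leading slash of absolute Unix socket paths, so "unix:///tmp/unix.sock" yields the address "/tmp/unix.sock".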
type Resolver ¶
type Resolver interface {
    // IsConstant returns whether a resolver is constant. A constant resolver
    // returns a fixed set of endpoints that doesn't change over time. A
    // non-constant resolver manages a set of endpoints that does change over
    // time.
    IsConstant() bool

    // Resolve returns a resolver's set of dialable endpoints. For non-constant
    // resolvers, this set of endpoints may change over time. Every snapshot of
    // the set of endpoints is assigned a unique version. If you call the
    // Resolve method with a nil version, Resolve returns the current set of
    // endpoints and its version. If you call the Resolve method with a non-nil
    // version, then a Resolver either:
    //    1. Blocks until the latest set of endpoints has a version newer than
    //       the one provided, returning the new set of endpoints and a new
    //       version.
    //    2. Returns the same version, indicating that Resolve should
    //       be called again after an appropriate delay.
    //
    // Example:
    //     if !resolver.IsConstant() {
    //         // Perform an unversioned, non-blocking Resolve to get the
    //         // latest set of endpoints and its version.
    //         endpoints, version, err := resolver.Resolve(ctx, nil)
    //
    //         // Perform a versioned Resolve that either (1) blocks until a set
    //         // of endpoints exists with a version newer than `version`, or
    //         // (2) returns `version`, indicating that Resolve should be
    //         // called again after an appropriate delay.
    //         newEndpoints, newVersion, err := resolver.Resolve(ctx, version)
    //     }
    //
    // If the resolver is constant, then Resolve only needs to be called once
    // with a nil version. The returned set of endpoints will never change, and
    // the returned version is nil.
    //
    //     if resolver.IsConstant() {
    //         // endpoints1 == endpoints2, and version1 == version2 == nil.
    //         endpoints1, version1, err := resolver.Resolve(ctx, nil)
    //         endpoints2, version2, err := resolver.Resolve(ctx, nil)
    //     }
    Resolve(ctx context.Context, version *Version) ([]Endpoint, *Version, error)
}
A Resolver manages a potentially changing set of endpoints. For example:
- A DNS resolver might resolve a hostname like "google.com" into a set of IP addresses like ["74.125.142.106", "74.125.142.103", "74.125.142.99"].
- A Kubernetes Service resolver might resolve a service name like "backend" into the IP addresses of the pods that implement the service.
- A unix resolver might resolve a directory name like "/tmp/workers" into the set of unix socket files within the directory.
A Resolver can safely be used concurrently from multiple goroutines.
Example usage:
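func printAddrs(ctx context.Context, resolver Resolver) error {
    var version *Version
    for ctx.Err() == nil {
        // Resolve returns immediately for a nil version; for a non-nil
        // version it may block until a newer set of endpoints exists.
        endpoints, newVersion, err := resolver.Resolve(ctx, version)
        if err != nil {
            return err
        }
        version = newVersion

        for _, endpoint := range endpoints {
            fmt.Println(endpoint.Address())
        }

        // A constant resolver never changes, so one pass is enough.
        if resolver.IsConstant() {
            return nil
        }
    }
    return ctx.Err()
}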
func NewConstantResolver ¶
NewConstantResolver returns a new resolver that returns the provided set of endpoints.
func NewFileResolver ¶
NewFileResolver returns a new resolver that returns a given set of endpoints when a given file is created.
type ServerOptions ¶
type ServerOptions struct {
    // Logger. Defaults to a logger that logs to stderr.
    Logger logtype.Logger

    // Tracer. Defaults to a discarding tracer.
    Tracer trace.Tracer

    // If non-zero, calls on the server are inlined and a new goroutine is
    // launched only if the call takes longer than the provided duration.
    InlineHandlerDuration time.Duration

    // If non-zero, all writes smaller than this limit are flattened into
    // a single buffer before being written on the connection.
    WriteFlattenLimit int
}
ServerOptions are the options to configure an RPC server.
type Version ¶
type Version struct {
Opaque string
}
Version is the version associated with a resolver's set of endpoints. Versions are opaque entities and should not be inspected or interpreted. Versions should only ever be constructed by calling a resolver's Resolve method and should only ever be used by being passed to the same resolver's Resolve method.