grpcutil

package
v1.20210204.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2021 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package grpcutil provides helpers for writing GRPC clients and servers.

Index

Constants

View Source
const (
	MetaTagAuthority   = "authority"
	MetaTagContentType = "content-type"
	MetaTagUserAgent   = "user-agent"
)

MetaTags These are common tags found in the metadata for rpc calls, both unary and streaming.

View Source
const (
	StreamMessageDirectionReceive = "recv"
	StreamMessageDirectionSend    = "send"
)

constants

View Source
const (
	EngineGRPC = "grpc"
)

Our default engine

View Source
const (
	FlagRPC = "rpc"
)

Logger flags

View Source
const (
	FlagRPCStreamMessage = "rpc.stream.message"
)

Logger flags

View Source
const (
	MetadataKeyAttempt = "x-retry-attempty"
)

Metadata Keys

Variables

View Source
var (
	// DefaultRetriableCodes is a set of well known types gRPC codes that should be retri-able.
	//
	// `ResourceExhausted` means that the user quota, e.g. per-RPC limits, have been reached.
	// `Unavailable` means that system is currently unavailable and the client should retry again.
	DefaultRetriableCodes = []codes.Code{codes.ResourceExhausted, codes.Unavailable}
)

Functions

func CreateListener

func CreateListener(bindAddr string) (net.Listener, error)

CreateListener creates a net listener for a given bind address. It handles detecting if we should create a unix socket address.

func DialAddress

func DialAddress(addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error)

DialAddress dials an address with a given set of dial options. It resolves how to dial unix sockets if the address is prefixed with `unix://`.

func ExponentBase2

func ExponentBase2(a uint) uint

ExponentBase2 computes 2^(a-1) where a >= 1. If a is 0, the result is 0.

func GetClientCommonName

func GetClientCommonName(ctx context.Context) (clientCommonName string)

GetClientCommonName fetches the client common name from the context.

func GetMethod

func GetMethod(ctx context.Context) string

GetMethod returns the rpc method from the context.

func JitterUp

func JitterUp(duration time.Duration, jitter float64) time.Duration

JitterUp adds random jitter to the duration.

This adds or subtracts time from the duration within a given jitter fraction. For example for 10s and jitter 0.1, it will return a time within [9s, 11s])

func LoggedClientStream

func LoggedClientStream(log logger.Triggerable) grpc.StreamClientInterceptor

LoggedClientStream returns a stream server interceptor.

func LoggedClientUnary

func LoggedClientUnary(log logger.Triggerable) grpc.UnaryClientInterceptor

LoggedClientUnary returns a unary client interceptor.

func LoggedServerStream

func LoggedServerStream(log logger.Triggerable) grpc.StreamServerInterceptor

LoggedServerStream returns a stream server interceptor.

func LoggedServerUnary

func LoggedServerUnary(log logger.Triggerable) grpc.UnaryServerInterceptor

LoggedServerUnary returns a unary server interceptor.

func MetaValue

func MetaValue(md metadata.MD, key string) string

MetaValue returns a value from a metadata set.

func NewRPCEventFilter

func NewRPCEventFilter(filter func(context.Context, RPCEvent) (RPCEvent, bool)) logger.Filter

NewRPCEventFilter returns a new rpc event filter.

func NewRPCEventListener

func NewRPCEventListener(listener func(context.Context, RPCEvent)) logger.Listener

NewRPCEventListener returns a new web request event listener.

func NewRPCStreamMessageEventFilter added in v1.20210103.1

func NewRPCStreamMessageEventFilter(filter func(context.Context, RPCStreamMessageEvent) (RPCStreamMessageEvent, bool)) logger.Filter

NewRPCStreamMessageEventFilter returns a new rpc stream message event filter.

func NewRPCStreamMessageEventListener added in v1.20210103.1

func NewRPCStreamMessageEventListener(listener func(context.Context, RPCStreamMessageEvent)) logger.Listener

NewRPCStreamMessageEventListener returns a new rpc stream message event event listener.

func RecoverServerStream

func RecoverServerStream(opts ...ServerRecoveryOption) grpc.StreamServerInterceptor

RecoverServerStream returns a new streaming server interceptor for panic recovery.

func RecoverServerUnary

func RecoverServerUnary(opts ...ServerRecoveryOption) grpc.UnaryServerInterceptor

RecoverServerUnary returns a new unary server interceptor for panic recovery.

func RetryStreamClientInterceptor

func RetryStreamClientInterceptor(optFuncs ...CallOption) grpc.StreamClientInterceptor

RetryStreamClientInterceptor returns a new retrying stream client interceptor for server side streaming calls.

The default configuration of the interceptor is to not retry *at all*. This behaviour can be changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions).

Retry logic is available *only for ServerStreams*, i.e. 1:n streams, as the internal logic needs to buffer the messages sent by the client. If retry is enabled on any other streams (ClientStreams, BidiStreams), the retry interceptor will fail the call.

func RetryUnaryClientInterceptor

func RetryUnaryClientInterceptor(optFuncs ...CallOption) grpc.UnaryClientInterceptor

RetryUnaryClientInterceptor returns a new retrying unary client interceptor.

The default configuration of the interceptor is to not retry *at all*. This behaviour can be changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions).

func StreamClientChain

func StreamClientChain(interceptors ...grpc.StreamClientInterceptor) grpc.StreamClientInterceptor

StreamClientChain creates a single interceptor out of a chain of many interceptors.

Execution is done in left-to-right order, including passing of context. For example ChainStreamClient(one, two, three) will execute one before two before three.

func StreamServerChain

func StreamServerChain(interceptors ...grpc.StreamServerInterceptor) grpc.StreamServerInterceptor

StreamServerChain reads the middleware variadic args and organizes the calls recursively in the order they appear.

func TracedClientStream

func TracedClientStream(tracer ClientTracer) grpc.StreamClientInterceptor

TracedClientStream implements the stream client interceptor based on a tracer.

func TracedClientUnary

func TracedClientUnary(tracer ClientTracer) grpc.UnaryClientInterceptor

TracedClientUnary implements the unary client interceptor based on a tracer.

func TracedServerStream

func TracedServerStream(tracer ServerTracer) grpc.StreamServerInterceptor

TracedServerStream returns a grpc streaming interceptor.

func TracedServerUnary

func TracedServerUnary(tracer ServerTracer) grpc.UnaryServerInterceptor

TracedServerUnary returns a unary server interceptor.

func UnaryClientChain

func UnaryClientChain(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor

UnaryClientChain creates a single interceptor out of a chain of many interceptors.

Execution is done in left-to-right order, including passing of context. For example ChainUnaryClient(one, two, three) will execute one before two before three.

func UnaryServerChain

func UnaryServerChain(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor

UnaryServerChain reads the middleware variadic args and organizes the calls recursively in the order they appear.

func WithClientCommonName

func WithClientCommonName(ctx context.Context, commonName string) context.Context

WithClientCommonName adds a client common name to a context as a value. This value will supercede parsing the value.

func WithMethod

func WithMethod(ctx context.Context, fullMethod string) context.Context

WithMethod adds a method to a context as a value.

Types

type BackoffFunc

type BackoffFunc func(attempt uint) time.Duration

BackoffFunc denotes a family of functions that control the backoff duration between call retries.

They are called with an identifier of the attempt, and should return a time the system client should hold off for. If the time returned is longer than the `context.Context.Deadline` of the request the deadline of the request takes precedence and the wait will be interrupted before proceeding with the next iteration.

func BackoffExponential

func BackoffExponential(scalar time.Duration) BackoffFunc

BackoffExponential produces increasing intervals for each attempt.

The scalar is multiplied times 2 raised to the current attempt. So the first retry with a scalar of 100ms is 100ms, while the 5th attempt would be 1.6s.

func BackoffExponentialWithJitter

func BackoffExponentialWithJitter(scalar time.Duration, jitterFraction float64) BackoffFunc

BackoffExponentialWithJitter creates an exponential backoff like BackoffExponential does, but adds jitter.

func BackoffLinear

func BackoffLinear(waitBetween time.Duration) BackoffFunc

BackoffLinear is very simple: it waits for a fixed period of time between calls.

func BackoffLinearWithJitter

func BackoffLinearWithJitter(waitBetween time.Duration, jitterFraction float64) BackoffFunc

BackoffLinearWithJitter waits a set period of time, allowing for jitter (fractional adjustment).

For example waitBetween=1s and jitter=0.10 can generate waits between 900ms and 1100ms.

type BackoffFuncContext

type BackoffFuncContext func(ctx context.Context, attempt uint) time.Duration

BackoffFuncContext denotes a family of functions that control the backoff duration between call retries.

They are called with an identifier of the attempt, and should return a time the system client should hold off for. If the time returned is longer than the `context.Context.Deadline` of the request the deadline of the request takes precedence and the wait will be interrupted before proceeding with the next iteration. The context can be used to extract request scoped metadata and context values.

type CallOption

type CallOption struct {
	grpc.EmptyCallOption // make sure we implement private after() and before() fields so we don't panic.
	// contains filtered or unexported fields
}

CallOption is a grpc.CallOption that is local to grpc_retry.

func WithClientRetries

func WithClientRetries(maxRetries uint) CallOption

WithClientRetries sets the maximum number of retries on this call, or this interceptor.

func WithClientRetryBackoffContext

func WithClientRetryBackoffContext(bf BackoffFuncContext) CallOption

WithClientRetryBackoffContext sets the `BackoffFuncContext` used to control time between retries.

func WithClientRetryBackoffFunc

func WithClientRetryBackoffFunc(bf BackoffFunc) CallOption

WithClientRetryBackoffFunc sets the `ClientRetryBackoffFunc` used to control time between retries.

func WithClientRetryBackoffLinear

func WithClientRetryBackoffLinear(d time.Duration) CallOption

WithClientRetryBackoffLinear sets the retry backoff to a fixed duration.

func WithClientRetryCodes

func WithClientRetryCodes(retryCodes ...codes.Code) CallOption

WithClientRetryCodes sets which codes should be retried.

Please *use with care*, as you may be retrying non-idempotent calls.

You cannot automatically retry on Cancelled and Deadline, please use `WithPerRetryTimeout` for these.

func WithClientRetryLog

func WithClientRetryLog(log logger.Log) CallOption

WithClientRetryLog sets the logger for the call.

func WithClientRetryPerRetryTimeout

func WithClientRetryPerRetryTimeout(timeout time.Duration) CallOption

WithClientRetryPerRetryTimeout sets the RPC timeout per call (including initial call) on this call, or this interceptor.

The context.Deadline of the call takes precedence and sets the maximum time the whole invocation will take, but WithPerRetryTimeout can be used to limit the RPC time per each call.

For example, with context.Deadline = now + 10s, and WithPerRetryTimeout(3 * time.Seconds), each of the retry calls (including the initial one) will have a deadline of now + 3s.

A value of 0 disables the timeout overrides completely and returns to each retry call using the parent `context.Deadline`.

Note that when this is enabled, any DeadlineExceeded errors that are propagated up will be retried.

func WithRetriesDisabled

func WithRetriesDisabled() CallOption

WithRetriesDisabled disables the retry behaviour on this call, or this interceptor.

Its semantically the same to `WithMax`

type ClientTracer

type ClientTracer interface {
	StartClientUnary(ctx context.Context, remoteAddr, method string) (context.Context, TraceFinisher, error)
	StartClientStream(ctx context.Context, remoteAddr, method string) (context.Context, TraceFinisher, error)
}

ClientTracer is a type that starts traces.

type Graceful

type Graceful struct {
	*async.Latch
	Log      logger.Log
	Listener net.Listener
	Server   *grpc.Server
}

Graceful is a shim for graceful hosting grpc servers.

func NewGraceful

func NewGraceful(listener net.Listener, server *grpc.Server) *Graceful

NewGraceful returns a new graceful host for a grpc server.

func (*Graceful) IsRunning

func (gz *Graceful) IsRunning() bool

IsRunning returns if the server is running.

func (*Graceful) NotifyStarted

func (gz *Graceful) NotifyStarted() <-chan struct{}

NotifyStarted returns the notify started signal.

func (*Graceful) NotifyStopped

func (gz *Graceful) NotifyStopped() <-chan struct{}

NotifyStopped returns the notify stopped signal.

func (*Graceful) Start

func (gz *Graceful) Start() error

Start starts the server.

func (*Graceful) Stop

func (gz *Graceful) Stop() error

Stop shuts the server down.

func (*Graceful) WithLogger

func (gz *Graceful) WithLogger(log logger.Log) *Graceful

WithLogger sets the logger.

type RPCEvent

type RPCEvent struct {
	Engine      string
	Peer        string
	Method      string
	UserAgent   string
	Authority   string
	ContentType string
	Elapsed     time.Duration
	Err         error
}

RPCEvent is an event type for rpc

func NewRPCEvent

func NewRPCEvent(method string, elapsed time.Duration, options ...RPCEventOption) RPCEvent

NewRPCEvent creates a new rpc event.

func (RPCEvent) Decompose

func (e RPCEvent) Decompose() map[string]interface{}

Decompose implements JSONWritable.

func (RPCEvent) GetFlag

func (e RPCEvent) GetFlag() string

GetFlag implements Event.

func (RPCEvent) WriteText

func (e RPCEvent) WriteText(tf logger.TextFormatter, wr io.Writer)

WriteText implements TextWritable.

type RPCEventOption

type RPCEventOption func(*RPCEvent)

RPCEventOption is a mutator for RPCEvents.

func OptRPCAuthority

func OptRPCAuthority(value string) RPCEventOption

OptRPCAuthority sets a field on the event.

func OptRPCContentType

func OptRPCContentType(value string) RPCEventOption

OptRPCContentType sets a field on the event.

func OptRPCElapsed

func OptRPCElapsed(value time.Duration) RPCEventOption

OptRPCElapsed sets a field on the event.

func OptRPCEngine

func OptRPCEngine(value string) RPCEventOption

OptRPCEngine sets a field on the event.

func OptRPCErr

func OptRPCErr(value error) RPCEventOption

OptRPCErr sets a field on the event.

func OptRPCMethod

func OptRPCMethod(value string) RPCEventOption

OptRPCMethod sets a field on the event.

func OptRPCPeer

func OptRPCPeer(value string) RPCEventOption

OptRPCPeer sets a field on the event.

func OptRPCUserAgent

func OptRPCUserAgent(value string) RPCEventOption

OptRPCUserAgent sets a field on the event.

type RPCStreamMessageEvent added in v1.20210103.1

type RPCStreamMessageEvent struct {
	RPCEvent
	Direction StreamMessageDirection
}

RPCStreamMessageEvent is an event type for rpc

func NewRPCStreamMessageEvent added in v1.20210103.1

func NewRPCStreamMessageEvent(method string, direction StreamMessageDirection, elapsed time.Duration, options ...RPCStreamMessageEventOption) RPCStreamMessageEvent

NewRPCStreamMessageEvent creates a new rpc stream message event.

func (RPCStreamMessageEvent) Decompose added in v1.20210103.1

func (e RPCStreamMessageEvent) Decompose() map[string]interface{}

Decompose implements JSONWritable.

func (RPCStreamMessageEvent) GetFlag added in v1.20210103.1

func (e RPCStreamMessageEvent) GetFlag() string

GetFlag implements Event.

func (RPCStreamMessageEvent) WriteText added in v1.20210103.1

func (e RPCStreamMessageEvent) WriteText(tf logger.TextFormatter, wr io.Writer)

WriteText implements TextWritable.

type RPCStreamMessageEventOption added in v1.20210103.1

type RPCStreamMessageEventOption func(*RPCStreamMessageEvent)

RPCStreamMessageEventOption is a mutator for RPCEvents.

func OptRPCStreamMessageAuthority added in v1.20210103.1

func OptRPCStreamMessageAuthority(value string) RPCStreamMessageEventOption

OptRPCStreamMessageAuthority sets a field on the event.

func OptRPCStreamMessageContentType added in v1.20210103.1

func OptRPCStreamMessageContentType(value string) RPCStreamMessageEventOption

OptRPCStreamMessageContentType sets a field on the event.

func OptRPCStreamMessageDirection added in v1.20210103.1

func OptRPCStreamMessageDirection(value StreamMessageDirection) RPCStreamMessageEventOption

OptRPCStreamMessageDirection sets a field on the event.

func OptRPCStreamMessageElapsed added in v1.20210103.1

func OptRPCStreamMessageElapsed(value time.Duration) RPCStreamMessageEventOption

OptRPCStreamMessageElapsed sets a field on the event.

func OptRPCStreamMessageEngine added in v1.20210103.1

func OptRPCStreamMessageEngine(value string) RPCStreamMessageEventOption

OptRPCStreamMessageEngine sets a field on the event.

func OptRPCStreamMessageErr added in v1.20210103.1

func OptRPCStreamMessageErr(value error) RPCStreamMessageEventOption

OptRPCStreamMessageErr sets a field on the event.

func OptRPCStreamMessageMethod added in v1.20210103.1

func OptRPCStreamMessageMethod(value string) RPCStreamMessageEventOption

OptRPCStreamMessageMethod sets a field on the event.

func OptRPCStreamMessagePeer added in v1.20210103.1

func OptRPCStreamMessagePeer(value string) RPCStreamMessageEventOption

OptRPCStreamMessagePeer sets a field on the event.

func OptRPCStreamMessageUserAgent added in v1.20210103.1

func OptRPCStreamMessageUserAgent(value string) RPCStreamMessageEventOption

OptRPCStreamMessageUserAgent sets a field on the event.

type RecoveryHandlerFunc

type RecoveryHandlerFunc func(p interface{}) (err error)

RecoveryHandlerFunc is a function that recovers from the panic `p` by returning an `error`.

func LoggedRecoveryHandler

func LoggedRecoveryHandler(log logger.Log) RecoveryHandlerFunc

LoggedRecoveryHandler is a recovery handler shim.

type ServerRecoveryOption

type ServerRecoveryOption func(*serverRecoveryOptions)

ServerRecoveryOption is a type that provides a recovery option.

func WithServerRecoveryHandler

func WithServerRecoveryHandler(f RecoveryHandlerFunc) ServerRecoveryOption

WithServerRecoveryHandler customizes the function for recovering from a panic.

type ServerTracer

type ServerTracer interface {
	StartServerUnary(ctx context.Context, method string) (context.Context, TraceFinisher, error)
	StartServerStream(ctx context.Context, method string) (context.Context, TraceFinisher, error)
}

ServerTracer is a type that starts traces.

type StreamMessageDirection added in v1.20210103.1

type StreamMessageDirection string

StreamMessageDirection is the direction the message was sent.

type TraceFinisher

type TraceFinisher interface {
	Finish(err error)
}

TraceFinisher is a finisher for traces

type Tracer

type Tracer interface {
	ServerTracer
	ClientTracer
}

Tracer is the full tracer.

Directories

Path Synopsis
v1

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL