Documentation ¶
Overview ¶
Package grpcutil provides helpers for writing GRPC clients and servers.
Index ¶
- Constants
- Variables
- func CheckConnectivityState(client ConnectionStateProvider, opts ...RetryCheckConnectivityStateOption) async.Checker
- func CreateListener(bindAddr string) (net.Listener, error)
- func ExponentBase2(a uint) uint
- func GetClientCommonName(ctx context.Context) (clientCommonName string)
- func GetMethod(ctx context.Context) string
- func JitterUp(duration time.Duration, jitter float64) time.Duration
- func LoggedClientStream(log logger.Triggerable) grpc.StreamClientInterceptor
- func LoggedClientUnary(log logger.Triggerable) grpc.UnaryClientInterceptor
- func LoggedServerStream(log logger.Triggerable) grpc.StreamServerInterceptor
- func LoggedServerUnary(log logger.Triggerable) grpc.UnaryServerInterceptor
- func MetaValue(md metadata.MD, key string) string
- func NewRPCEventFilter(filter func(context.Context, RPCEvent) (RPCEvent, bool)) logger.Filter
- func NewRPCEventListener(listener func(context.Context, RPCEvent)) logger.Listener
- func NewRPCStreamMessageEventFilter(...) logger.Filter
- func NewRPCStreamMessageEventListener(listener func(context.Context, RPCStreamMessageEvent)) logger.Listener
- func RecoverServerStream(opts ...ServerRecoveryOption) grpc.StreamServerInterceptor
- func RecoverServerUnary(opts ...ServerRecoveryOption) grpc.UnaryServerInterceptor
- func RetryCheckConnectivityState(ctx context.Context, client ConnectionStateProvider, ...) (state connectivity.State, err error)
- func RetryStreamClientInterceptor(optFuncs ...CallOption) grpc.StreamClientInterceptor
- func RetryUnaryClientInterceptor(optFuncs ...CallOption) grpc.UnaryClientInterceptor
- func StreamClientChain(interceptors ...grpc.StreamClientInterceptor) grpc.StreamClientInterceptor
- func StreamServerChain(interceptors ...grpc.StreamServerInterceptor) grpc.StreamServerInterceptor
- func TracedClientStream(tracer ClientTracer) grpc.StreamClientInterceptor
- func TracedClientUnary(tracer ClientTracer) grpc.UnaryClientInterceptor
- func TracedServerStream(tracer ServerTracer) grpc.StreamServerInterceptor
- func TracedServerUnary(tracer ServerTracer) grpc.UnaryServerInterceptor
- func UnaryClientChain(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor
- func UnaryClientTimeout(timeout time.Duration) grpc.UnaryClientInterceptor
- func UnaryServerChain(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor
- func WithClientCommonName(ctx context.Context, commonName string) context.Context
- func WithMethod(ctx context.Context, fullMethod string) context.Context
- type BackoffFunc
- type BackoffFuncContext
- type CallOption
- func WithClientRetries(maxRetries uint) CallOption
- func WithClientRetryBackoffContext(bf BackoffFuncContext) CallOption
- func WithClientRetryBackoffFunc(bf BackoffFunc) CallOption
- func WithClientRetryBackoffLinear(d time.Duration) CallOption
- func WithClientRetryCodes(retryCodes ...codes.Code) CallOption
- func WithClientRetryPerRetryTimeout(timeout time.Duration) CallOption
- func WithRetriesDisabled() CallOption
- type ClientTracer
- type ConnectionStateProvider
- type Graceful
- type RPCEvent
- type RPCEventOption
- func OptRPCAuthority(value string) RPCEventOption
- func OptRPCContentType(value string) RPCEventOption
- func OptRPCElapsed(value time.Duration) RPCEventOption
- func OptRPCEngine(value string) RPCEventOption
- func OptRPCErr(value error) RPCEventOption
- func OptRPCMethod(value string) RPCEventOption
- func OptRPCPeer(value string) RPCEventOption
- func OptRPCUserAgent(value string) RPCEventOption
- type RPCStreamMessageEvent
- type RPCStreamMessageEventOption
- func OptRPCStreamMessageAuthority(value string) RPCStreamMessageEventOption
- func OptRPCStreamMessageContentType(value string) RPCStreamMessageEventOption
- func OptRPCStreamMessageDirection(value StreamMessageDirection) RPCStreamMessageEventOption
- func OptRPCStreamMessageElapsed(value time.Duration) RPCStreamMessageEventOption
- func OptRPCStreamMessageEngine(value string) RPCStreamMessageEventOption
- func OptRPCStreamMessageErr(value error) RPCStreamMessageEventOption
- func OptRPCStreamMessageMethod(value string) RPCStreamMessageEventOption
- func OptRPCStreamMessagePeer(value string) RPCStreamMessageEventOption
- func OptRPCStreamMessageUserAgent(value string) RPCStreamMessageEventOption
- type RecoveryHandlerFunc
- type RetryCheckConnectivityStateOption
- type RetryCheckConnectivityStateOptions
- type ServerRecoveryOption
- type ServerTracer
- type StreamMessageDirection
- type TraceFinisher
- type Tracer
Constants ¶
const ( MetaTagAuthority = "authority" MetaTagContentType = "content-type" MetaTagUserAgent = "user-agent" )
MetaTags are common tags found in the metadata for rpc calls, both unary and streaming.
const ( StreamMessageDirectionReceive = "recv" StreamMessageDirectionSend = "send" )
Stream message directions.
const (
EngineGRPC = "grpc"
)
Our default engine
const ErrConnectionNotReady ex.Class = "grpc connection not ready"
ErrConnectionNotReady is returned by the connectivity state checks (see CheckConnectivityState and RetryCheckConnectivityState).
const (
FlagRPC = "rpc"
)
Logger flags
const (
FlagRPCStreamMessage = "rpc.stream.message"
)
Logger flags
const (
MetadataKeyAttempt = "x-retry-attempty"
)
Metadata Keys
Variables ¶
var ( // DefaultRetriableCodes is a set of well-known gRPC codes that should be retriable. // // `ResourceExhausted` means that the user quota, e.g. per-RPC limits, has been reached. // `Unavailable` means that the system is currently unavailable and the client should retry again. DefaultRetriableCodes = []codes.Code{codes.ResourceExhausted, codes.Unavailable} )
Functions ¶
func CheckConnectivityState ¶ added in v1.20210615.7
func CheckConnectivityState(client ConnectionStateProvider, opts ...RetryCheckConnectivityStateOption) async.Checker
CheckConnectivityState returns an async checker for a client that provides a connection state.
func CreateListener ¶
CreateListener creates a net listener for a given bind address. It handles detecting if we should create a unix socket address.
func ExponentBase2 ¶
ExponentBase2 computes 2^(a-1) where a >= 1. If a is 0, the result is 0.
func GetClientCommonName ¶
GetClientCommonName fetches the client common name from the context.
func JitterUp ¶
JitterUp adds random jitter to the duration.
This adds or subtracts time from the duration within a given jitter fraction. For example, for 10s and jitter 0.1, it will return a time within [9s, 11s].
func LoggedClientStream ¶
func LoggedClientStream(log logger.Triggerable) grpc.StreamClientInterceptor
LoggedClientStream returns a stream client interceptor.
func LoggedClientUnary ¶
func LoggedClientUnary(log logger.Triggerable) grpc.UnaryClientInterceptor
LoggedClientUnary returns a unary client interceptor.
func LoggedServerStream ¶
func LoggedServerStream(log logger.Triggerable) grpc.StreamServerInterceptor
LoggedServerStream returns a stream server interceptor.
func LoggedServerUnary ¶
func LoggedServerUnary(log logger.Triggerable) grpc.UnaryServerInterceptor
LoggedServerUnary returns a unary server interceptor.
func NewRPCEventFilter ¶
NewRPCEventFilter returns a new rpc event filter.
func NewRPCEventListener ¶
NewRPCEventListener returns a new rpc event listener.
func NewRPCStreamMessageEventFilter ¶ added in v1.20210103.1
func NewRPCStreamMessageEventFilter(filter func(context.Context, RPCStreamMessageEvent) (RPCStreamMessageEvent, bool)) logger.Filter
NewRPCStreamMessageEventFilter returns a new rpc stream message event filter.
func NewRPCStreamMessageEventListener ¶ added in v1.20210103.1
func NewRPCStreamMessageEventListener(listener func(context.Context, RPCStreamMessageEvent)) logger.Listener
NewRPCStreamMessageEventListener returns a new rpc stream message event listener.
func RecoverServerStream ¶
func RecoverServerStream(opts ...ServerRecoveryOption) grpc.StreamServerInterceptor
RecoverServerStream returns a new streaming server interceptor for panic recovery.
func RecoverServerUnary ¶
func RecoverServerUnary(opts ...ServerRecoveryOption) grpc.UnaryServerInterceptor
RecoverServerUnary returns a new unary server interceptor for panic recovery.
func RetryCheckConnectivityState ¶ added in v1.20210615.7
func RetryCheckConnectivityState(ctx context.Context, client ConnectionStateProvider, opts ...RetryCheckConnectivityStateOption) (state connectivity.State, err error)
RetryCheckConnectivityState implements a retry checker for connectivity state.
func RetryStreamClientInterceptor ¶
func RetryStreamClientInterceptor(optFuncs ...CallOption) grpc.StreamClientInterceptor
RetryStreamClientInterceptor returns a new retrying stream client interceptor for server side streaming calls.
The default configuration of the interceptor is to not retry *at all*. This behavior can be changed through options (e.g. WithClientRetries) on creation of the interceptor or on call (through grpc.CallOptions).
Retry logic is available *only for ServerStreams*, i.e. 1:n streams, as the internal logic needs to buffer the messages sent by the client. If retry is enabled on any other streams (ClientStreams, BidiStreams), the retry interceptor will fail the call.
func RetryUnaryClientInterceptor ¶
func RetryUnaryClientInterceptor(optFuncs ...CallOption) grpc.UnaryClientInterceptor
RetryUnaryClientInterceptor returns a new retrying unary client interceptor.
The default configuration of the interceptor is to not retry *at all*. This behavior can be changed through options (e.g. WithClientRetries) on creation of the interceptor or on call (through grpc.CallOptions).
func StreamClientChain ¶
func StreamClientChain(interceptors ...grpc.StreamClientInterceptor) grpc.StreamClientInterceptor
StreamClientChain creates a single interceptor out of a chain of many interceptors.
Execution is done in left-to-right order, including passing of context. For example, StreamClientChain(one, two, three) will execute one before two before three.
func StreamServerChain ¶
func StreamServerChain(interceptors ...grpc.StreamServerInterceptor) grpc.StreamServerInterceptor
StreamServerChain reads the middleware variadic args and organizes the calls recursively in the order they appear.
func TracedClientStream ¶
func TracedClientStream(tracer ClientTracer) grpc.StreamClientInterceptor
TracedClientStream implements the stream client interceptor based on a tracer.
func TracedClientUnary ¶
func TracedClientUnary(tracer ClientTracer) grpc.UnaryClientInterceptor
TracedClientUnary implements the unary client interceptor based on a tracer.
func TracedServerStream ¶
func TracedServerStream(tracer ServerTracer) grpc.StreamServerInterceptor
TracedServerStream returns a grpc streaming interceptor.
func TracedServerUnary ¶
func TracedServerUnary(tracer ServerTracer) grpc.UnaryServerInterceptor
TracedServerUnary returns a unary server interceptor.
func UnaryClientChain ¶
func UnaryClientChain(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor
UnaryClientChain creates a single interceptor out of a chain of many interceptors.
Execution is done in left-to-right order, including passing of context. For example, UnaryClientChain(one, two, three) will execute one before two before three.
func UnaryClientTimeout ¶ added in v1.20210215.2
func UnaryClientTimeout(timeout time.Duration) grpc.UnaryClientInterceptor
UnaryClientTimeout returns a unary client interceptor.
func UnaryServerChain ¶
func UnaryServerChain(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor
UnaryServerChain reads the middleware variadic args and organizes the calls recursively in the order they appear.
func WithClientCommonName ¶
WithClientCommonName adds a client common name to a context as a value. This value will supersede parsing the value.
Types ¶
type BackoffFunc ¶
BackoffFunc denotes a family of functions that control the backoff duration between call retries.
They are called with an identifier of the attempt, and should return a time the system client should hold off for. If the time returned is longer than the `context.Context.Deadline` of the request the deadline of the request takes precedence and the wait will be interrupted before proceeding with the next iteration.
func BackoffExponential ¶
func BackoffExponential(scalar time.Duration) BackoffFunc
BackoffExponential produces increasing intervals for each attempt.
The scalar is multiplied by 2 raised to the power of (attempt - 1). So the first retry with a scalar of 100ms waits 100ms, while the 5th attempt would wait 1.6s.
func BackoffExponentialWithJitter ¶
func BackoffExponentialWithJitter(scalar time.Duration, jitterFraction float64) BackoffFunc
BackoffExponentialWithJitter creates an exponential backoff like BackoffExponential does, but adds jitter.
func BackoffLinear ¶
func BackoffLinear(waitBetween time.Duration) BackoffFunc
BackoffLinear is very simple: it waits for a fixed period of time between calls.
func BackoffLinearWithJitter ¶
func BackoffLinearWithJitter(waitBetween time.Duration, jitterFraction float64) BackoffFunc
BackoffLinearWithJitter waits a set period of time, allowing for jitter (fractional adjustment).
For example waitBetween=1s and jitter=0.10 can generate waits between 900ms and 1100ms.
type BackoffFuncContext ¶
BackoffFuncContext denotes a family of functions that control the backoff duration between call retries.
They are called with an identifier of the attempt, and should return a time the system client should hold off for. If the time returned is longer than the `context.Context.Deadline` of the request the deadline of the request takes precedence and the wait will be interrupted before proceeding with the next iteration. The context can be used to extract request scoped metadata and context values.
type CallOption ¶
type CallOption struct { grpc.EmptyCallOption // make sure we implement private after() and before() fields so we don't panic. // contains filtered or unexported fields }
CallOption is a grpc.CallOption that is local to the grpcutil retry interceptors.
func WithClientRetries ¶
func WithClientRetries(maxRetries uint) CallOption
WithClientRetries sets the maximum number of retries on this call, or this interceptor.
func WithClientRetryBackoffContext ¶
func WithClientRetryBackoffContext(bf BackoffFuncContext) CallOption
WithClientRetryBackoffContext sets the `BackoffFuncContext` used to control time between retries.
func WithClientRetryBackoffFunc ¶
func WithClientRetryBackoffFunc(bf BackoffFunc) CallOption
WithClientRetryBackoffFunc sets the `BackoffFunc` used to control time between retries.
func WithClientRetryBackoffLinear ¶
func WithClientRetryBackoffLinear(d time.Duration) CallOption
WithClientRetryBackoffLinear sets the retry backoff to a fixed duration.
func WithClientRetryCodes ¶
func WithClientRetryCodes(retryCodes ...codes.Code) CallOption
WithClientRetryCodes sets which codes should be retried.
Please *use with care*, as you may be retrying non-idempotent calls.
You cannot automatically retry on Canceled and DeadlineExceeded; please use `WithClientRetryPerRetryTimeout` for these.
func WithClientRetryPerRetryTimeout ¶
func WithClientRetryPerRetryTimeout(timeout time.Duration) CallOption
WithClientRetryPerRetryTimeout sets the RPC timeout per call (including initial call) on this call, or this interceptor.
The context.Deadline of the call takes precedence and sets the maximum time the whole invocation will take, but WithClientRetryPerRetryTimeout can be used to limit the RPC time per each call.
For example, with context.Deadline = now + 10s, and WithClientRetryPerRetryTimeout(3 * time.Second), each of the retry calls (including the initial one) will have a deadline of now + 3s.
A value of 0 disables the timeout overrides completely and returns to each retry call using the parent `context.Deadline`.
Note that when this is enabled, any DeadlineExceeded errors that are propagated up will be retried.
func WithRetriesDisabled ¶
func WithRetriesDisabled() CallOption
WithRetriesDisabled disables the retry behavior on this call, or this interceptor.
It is semantically the same as `WithClientRetries(0)`.
type ClientTracer ¶
type ClientTracer interface { StartClientUnary(ctx context.Context, remoteAddr, method string) (context.Context, TraceFinisher, error) StartClientStream(ctx context.Context, remoteAddr, method string) (context.Context, TraceFinisher, error) }
ClientTracer is a type that starts traces.
type ConnectionStateProvider ¶ added in v1.20210615.7
type ConnectionStateProvider interface {
GetConnectionState() connectivity.State
}
ConnectionStateProvider is a type that provides a connection state.
type Graceful ¶
Graceful is a shim for gracefully hosting grpc servers.
func NewGraceful ¶
NewGraceful returns a new graceful host for a grpc server.
func (*Graceful) NotifyStarted ¶
func (gz *Graceful) NotifyStarted() <-chan struct{}
NotifyStarted returns the notify started signal.
func (*Graceful) NotifyStopped ¶
func (gz *Graceful) NotifyStopped() <-chan struct{}
NotifyStopped returns the notify stopped signal.
type RPCEvent ¶
type RPCEvent struct { Engine string Peer string Method string UserAgent string Authority string ContentType string Elapsed time.Duration Err error }
RPCEvent is an event type for rpc
func NewRPCEvent ¶
func NewRPCEvent(method string, elapsed time.Duration, options ...RPCEventOption) RPCEvent
NewRPCEvent creates a new rpc event.
type RPCEventOption ¶
type RPCEventOption func(*RPCEvent)
RPCEventOption is a mutator for RPCEvents.
func OptRPCAuthority ¶
func OptRPCAuthority(value string) RPCEventOption
OptRPCAuthority sets a field on the event.
func OptRPCContentType ¶
func OptRPCContentType(value string) RPCEventOption
OptRPCContentType sets a field on the event.
func OptRPCElapsed ¶
func OptRPCElapsed(value time.Duration) RPCEventOption
OptRPCElapsed sets a field on the event.
func OptRPCEngine ¶
func OptRPCEngine(value string) RPCEventOption
OptRPCEngine sets a field on the event.
func OptRPCMethod ¶
func OptRPCMethod(value string) RPCEventOption
OptRPCMethod sets a field on the event.
func OptRPCPeer ¶
func OptRPCPeer(value string) RPCEventOption
OptRPCPeer sets a field on the event.
func OptRPCUserAgent ¶
func OptRPCUserAgent(value string) RPCEventOption
OptRPCUserAgent sets a field on the event.
type RPCStreamMessageEvent ¶ added in v1.20210103.1
type RPCStreamMessageEvent struct { RPCEvent Direction StreamMessageDirection }
RPCStreamMessageEvent is an event type for rpc stream messages.
func NewRPCStreamMessageEvent ¶ added in v1.20210103.1
func NewRPCStreamMessageEvent(method string, direction StreamMessageDirection, elapsed time.Duration, options ...RPCStreamMessageEventOption) RPCStreamMessageEvent
NewRPCStreamMessageEvent creates a new rpc stream message event.
func (RPCStreamMessageEvent) Decompose ¶ added in v1.20210103.1
func (e RPCStreamMessageEvent) Decompose() map[string]interface{}
Decompose implements JSONWritable.
func (RPCStreamMessageEvent) GetFlag ¶ added in v1.20210103.1
func (e RPCStreamMessageEvent) GetFlag() string
GetFlag implements Event.
func (RPCStreamMessageEvent) WriteText ¶ added in v1.20210103.1
func (e RPCStreamMessageEvent) WriteText(tf logger.TextFormatter, wr io.Writer)
WriteText implements TextWritable.
type RPCStreamMessageEventOption ¶ added in v1.20210103.1
type RPCStreamMessageEventOption func(*RPCStreamMessageEvent)
RPCStreamMessageEventOption is a mutator for RPCStreamMessageEvents.
func OptRPCStreamMessageAuthority ¶ added in v1.20210103.1
func OptRPCStreamMessageAuthority(value string) RPCStreamMessageEventOption
OptRPCStreamMessageAuthority sets a field on the event.
func OptRPCStreamMessageContentType ¶ added in v1.20210103.1
func OptRPCStreamMessageContentType(value string) RPCStreamMessageEventOption
OptRPCStreamMessageContentType sets a field on the event.
func OptRPCStreamMessageDirection ¶ added in v1.20210103.1
func OptRPCStreamMessageDirection(value StreamMessageDirection) RPCStreamMessageEventOption
OptRPCStreamMessageDirection sets a field on the event.
func OptRPCStreamMessageElapsed ¶ added in v1.20210103.1
func OptRPCStreamMessageElapsed(value time.Duration) RPCStreamMessageEventOption
OptRPCStreamMessageElapsed sets a field on the event.
func OptRPCStreamMessageEngine ¶ added in v1.20210103.1
func OptRPCStreamMessageEngine(value string) RPCStreamMessageEventOption
OptRPCStreamMessageEngine sets a field on the event.
func OptRPCStreamMessageErr ¶ added in v1.20210103.1
func OptRPCStreamMessageErr(value error) RPCStreamMessageEventOption
OptRPCStreamMessageErr sets a field on the event.
func OptRPCStreamMessageMethod ¶ added in v1.20210103.1
func OptRPCStreamMessageMethod(value string) RPCStreamMessageEventOption
OptRPCStreamMessageMethod sets a field on the event.
func OptRPCStreamMessagePeer ¶ added in v1.20210103.1
func OptRPCStreamMessagePeer(value string) RPCStreamMessageEventOption
OptRPCStreamMessagePeer sets a field on the event.
func OptRPCStreamMessageUserAgent ¶ added in v1.20210103.1
func OptRPCStreamMessageUserAgent(value string) RPCStreamMessageEventOption
OptRPCStreamMessageUserAgent sets a field on the event.
type RecoveryHandlerFunc ¶
type RecoveryHandlerFunc func(p interface{}) (err error)
RecoveryHandlerFunc is a function that recovers from the panic `p` by returning an `error`.
func LoggedRecoveryHandler ¶
func LoggedRecoveryHandler(log logger.Log) RecoveryHandlerFunc
LoggedRecoveryHandler is a recovery handler shim.
type RetryCheckConnectivityStateOption ¶ added in v1.20210615.7
type RetryCheckConnectivityStateOption func(*RetryCheckConnectivityStateOptions)
RetryCheckConnectivityStateOption mutates RetryCheckConnectivityStateOptions.
func OptRetryCheckConnectivityStateMaxRetries ¶ added in v1.20210615.7
func OptRetryCheckConnectivityStateMaxRetries(maxRetries uint) RetryCheckConnectivityStateOption
OptRetryCheckConnectivityStateMaxRetries sets the MaxRetries.
func OptRetryCheckConnectivityStateRetryBackoff ¶ added in v1.20210615.7
func OptRetryCheckConnectivityStateRetryBackoff(d time.Duration) RetryCheckConnectivityStateOption
OptRetryCheckConnectivityStateRetryBackoff sets the RetryBackoff.
func OptRetryCheckConnectivityStateRetryTimeout ¶ added in v1.20210615.7
func OptRetryCheckConnectivityStateRetryTimeout(d time.Duration) RetryCheckConnectivityStateOption
OptRetryCheckConnectivityStateRetryTimeout sets the RetryTimeout.
type RetryCheckConnectivityStateOptions ¶ added in v1.20210615.7
type RetryCheckConnectivityStateOptions struct { RetryTimeout time.Duration RetryBackoff time.Duration MaxRetries uint }
RetryCheckConnectivityStateOptions are options for checking the connectivity state.
type ServerRecoveryOption ¶
type ServerRecoveryOption func(*serverRecoveryOptions)
ServerRecoveryOption is a type that provides a recovery option.
func WithServerRecoveryHandler ¶
func WithServerRecoveryHandler(f RecoveryHandlerFunc) ServerRecoveryOption
WithServerRecoveryHandler customizes the function for recovering from a panic.
type ServerTracer ¶
type ServerTracer interface { StartServerUnary(ctx context.Context, method string) (context.Context, TraceFinisher, error) StartServerStream(ctx context.Context, method string) (context.Context, TraceFinisher, error) }
ServerTracer is a type that starts traces.
type StreamMessageDirection ¶ added in v1.20210103.1
type StreamMessageDirection string
StreamMessageDirection is the direction the message was sent.
type TraceFinisher ¶
type TraceFinisher interface {
Finish(err error)
}
TraceFinisher is a finisher for traces
Source Files ¶
- backoff.go
- check_connectivity_state.go
- client_common_name.go
- client_retry.go
- constants.go
- create_listener.go
- doc.go
- graceful.go
- logged.go
- meta_value.go
- method.go
- rpc_event.go
- rpc_stream_message_event.go
- server_recovery.go
- stream_client_chain.go
- stream_server_chain.go
- tracer.go
- unary_client_chain.go
- unary_client_timeout.go
- unary_server_chain.go