client

package
v0.0.0-...-c935d06 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 31, 2024 License: Apache-2.0 Imports: 32 Imported by: 20

Documentation

Overview

Package client is an interface for an RPC client.

Index

Constants

This section is empty.

Variables

View Source
var (
	// DefaultBackoff is the default backoff function for retries.
	DefaultBackoff = exponentialBackoff
	// DefaultRetry is the default check-for-retry function for retries.
	DefaultRetry = RetryOnError
	// DefaultRetries is the default number of times a request is tried.
	DefaultRetries = 5
	// DefaultRequestTimeout is the default request timeout.
	DefaultRequestTimeout = time.Second * 30
	// DefaultConnectionTimeout is the default connection timeout.
	DefaultConnectionTimeout = time.Second * 5
	// DefaultPoolSize sets the connection pool size.
	DefaultPoolSize = 100
	// DefaultPoolTTL sets the connection pool ttl.
	DefaultPoolTTL = time.Minute
	// DefaultPoolCloseTimeout sets the connection pool close timeout.
	DefaultPoolCloseTimeout = time.Second
)
View Source
var (
	// DefaultContentType header.
	DefaultContentType = "application/json"

	// DefaultCodecs map.
	DefaultCodecs = map[string]codec.NewCodec{
		"application/grpc":         grpc.NewCodec,
		"application/grpc+json":    grpc.NewCodec,
		"application/grpc+proto":   grpc.NewCodec,
		"application/protobuf":     proto.NewCodec,
		"application/json":         json.NewCodec,
		"application/json-rpc":     jsonrpc.NewCodec,
		"application/proto-rpc":    protorpc.NewCodec,
		"application/octet-stream": raw.NewCodec,
	}
)

Functions

func Call

func Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error

Makes a synchronous call to a service using the default client.

func NewContext

func NewContext(ctx context.Context, c Client) context.Context

func Publish

func Publish(ctx context.Context, msg Message, opts ...PublishOption) error

Publishes a message using the default client, via the underlying broker set within the options.

func RetryAlways

func RetryAlways(ctx context.Context, req Request, retryCount int, err error) (bool, error)

RetryAlways always retries on error.

func RetryOnError

func RetryOnError(ctx context.Context, req Request, retryCount int, err error) (bool, error)

RetryOnError retries a request on a 500 or timeout error.

func String

func String() string

Types

type BackoffFunc

type BackoffFunc func(ctx context.Context, req Request, attempts int) (time.Duration, error)

type Cache

type Cache struct {
	// contains filtered or unexported fields
}

Cache for responses.

func NewCache

func NewCache() *Cache

NewCache returns an initialized cache.

func (*Cache) Get

func (c *Cache) Get(ctx context.Context, req *Request) (interface{}, bool)

Get a response from the cache.

func (*Cache) List

func (c *Cache) List() map[string]string

List the key value pairs in the cache.

func (*Cache) Set

func (c *Cache) Set(ctx context.Context, req *Request, rsp interface{}, expiry time.Duration)

Set a response in the cache.

type CallFunc

type CallFunc func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error

CallFunc represents the individual call func.

type CallOption

type CallOption func(*CallOptions)

CallOption used by Call or Stream.

func WithAddress

func WithAddress(a ...string) CallOption

WithAddress sets the remote addresses to use rather than using service discovery.

func WithBackoff

func WithBackoff(fn BackoffFunc) CallOption

WithBackoff is a CallOption which overrides the backoff function set in Options.CallOptions.

func WithCache

func WithCache(c time.Duration) CallOption

WithCache is a CallOption which sets the duration the response should be cached for.

func WithCallWrapper

func WithCallWrapper(cw ...CallWrapper) CallOption

WithCallWrapper is a CallOption which adds to the existing CallFunc wrappers.

func WithConnClose

func WithConnClose() CallOption

WithConnClose sets the Connection header to close.

func WithDialTimeout

func WithDialTimeout(d time.Duration) CallOption

WithDialTimeout is a CallOption which overrides the dial timeout set in Options.CallOptions.

func WithRequestTimeout

func WithRequestTimeout(d time.Duration) CallOption

WithRequestTimeout is a CallOption which overrides the request timeout set in Options.CallOptions.

func WithRetries

func WithRetries(i int) CallOption

WithRetries sets the number of tries for a call. This CallOption overrides Options.CallOptions.

func WithRetry

func WithRetry(fn RetryFunc) CallOption

WithRetry is a CallOption which overrides the retry function set in Options.CallOptions.

func WithSelectOption

func WithSelectOption(so ...selector.SelectOption) CallOption

func WithServiceToken

func WithServiceToken() CallOption

WithServiceToken is a CallOption which overrides the authorization header with the services own auth token.

func WithStreamTimeout

func WithStreamTimeout(d time.Duration) CallOption

WithStreamTimeout sets the stream timeout.

type CallOptions

type CallOptions struct {

	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context
	// Backoff func
	Backoff BackoffFunc
	// Check if retriable func
	Retry         RetryFunc
	SelectOptions []selector.SelectOption

	// Address of remote hosts
	Address []string

	// Middleware for low level call func
	CallWrappers []CallWrapper

	// ConnectionTimeout of one request to the server.
	// Set this lower than the RequestTimeout to enable retries on connection timeout.
	ConnectionTimeout time.Duration
	// Request/Response timeout of entire srv.Call, for single request timeout set ConnectionTimeout.
	RequestTimeout time.Duration
	// Stream timeout for the stream
	StreamTimeout time.Duration
	// Duration to cache the response for
	CacheExpiry time.Duration
	// Transport Dial Timeout. Used for initial dial to establish a connection.
	DialTimeout time.Duration
	// Number of Call attempts
	Retries int
	// Use the services own auth token
	ServiceToken bool
	// ConnClose sets the Connection: close header.
	ConnClose bool
}

CallOptions are options used to make calls to a server.

type CallWrapper

type CallWrapper func(CallFunc) CallFunc

CallWrapper is a low level wrapper for the CallFunc.

type Client

type Client interface {
	Init(...Option) error
	Options() Options
	NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
	NewRequest(service, endpoint string, req interface{}, reqOpts ...RequestOption) Request
	Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
	Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
	Publish(ctx context.Context, msg Message, opts ...PublishOption) error
	String() string
}

Client is the interface used to make requests to services. It supports Request/Response via Transport and Publishing via the Broker. It also supports bidirectional streaming of requests.

var (
	// NewClient returns a new client.
	NewClient func(...Option) Client = newRPCClient
	// DefaultClient is a default client to use out of the box.
	DefaultClient Client = newRPCClient()
)

func FromContext

func FromContext(ctx context.Context) (Client, bool)

type Closer

type Closer interface {
	// CloseSend closes the send direction of the stream.
	CloseSend() error
}

Closer handles client close.

type Message

type Message interface {
	Topic() string
	Payload() interface{}
	ContentType() string
}

Message is the interface for publishing asynchronously.

func NewMessage

func NewMessage(topic string, payload interface{}, opts ...MessageOption) Message

Creates a new message using the default client.

type MessageOption

type MessageOption func(*MessageOptions)

MessageOption used by NewMessage.

func WithMessageContentType

func WithMessageContentType(ct string) MessageOption

type MessageOptions

type MessageOptions struct {
	ContentType string
}

type Option

type Option func(*Options)

Option used by the Client.

func Backoff

func Backoff(fn BackoffFunc) Option

Backoff is used to set the backoff function used when retrying Calls.

func Broker

func Broker(b broker.Broker) Option

Broker to be used for pub/sub.

func Codec

func Codec(contentType string, c codec.NewCodec) Option

Codec to be used to encode/decode requests for a given content type.

func ContentType

func ContentType(ct string) Option

ContentType sets the default content type of the client.

func DialTimeout

func DialTimeout(d time.Duration) Option

DialTimeout sets the transport dial timeout.

func PoolCloseTimeout

func PoolCloseTimeout(d time.Duration) Option

PoolCloseTimeout sets the connection pool close timeout.

func PoolSize

func PoolSize(d int) Option

PoolSize sets the connection pool size.

func PoolTTL

func PoolTTL(d time.Duration) Option

PoolTTL sets the connection pool ttl.

func Registry

func Registry(r registry.Registry) Option

Registry to find nodes for a given service.

func RequestTimeout

func RequestTimeout(d time.Duration) Option

RequestTimeout sets the request timeout.

func Retries

func Retries(i int) Option

Retries sets the number of retries when making the request.

func Retry

func Retry(fn RetryFunc) Option

Retry sets the retry function to be used when re-trying.

func Selector

func Selector(s selector.Selector) Option

Selector is used to select a node to route a request to.

func StreamTimeout

func StreamTimeout(d time.Duration) Option

StreamTimeout sets the stream timeout.

func Transport

func Transport(t transport.Transport) Option

Transport to use for communication e.g http, rabbitmq, etc.

func WithLogger

func WithLogger(l logger.Logger) Option

WithLogger sets the underlying logger.

func WithRouter

func WithRouter(r Router) Option

WithRouter sets the client router.

func Wrap

func Wrap(w Wrapper) Option

Adds a Wrapper to a list of options passed into the client.

func WrapCall

func WrapCall(cw ...CallWrapper) Option

Adds a Wrapper to the list of CallFunc wrappers.

type Options

type Options struct {

	// Default Call Options
	CallOptions CallOptions

	// Router sets the router
	Router Router

	Registry  registry.Registry
	Selector  selector.Selector
	Transport transport.Transport

	// Plugged interfaces
	Broker broker.Broker

	// Logger is the underlying logger
	Logger logger.Logger

	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context
	Codecs  map[string]codec.NewCodec

	// Response cache
	Cache *Cache

	// Used to select codec
	ContentType string

	// Middleware for client
	Wrappers []Wrapper

	// Connection Pool
	PoolSize         int
	PoolTTL          time.Duration
	PoolCloseTimeout time.Duration
}

Options are the Client options.

func NewOptions

func NewOptions(options ...Option) Options

NewOptions creates new Client options.

type PublishOption

type PublishOption func(*PublishOptions)

PublishOption used by Publish.

func PublishContext

func PublishContext(ctx context.Context) PublishOption

PublishContext sets the context in publish options.

func WithExchange

func WithExchange(e string) PublishOption

WithExchange sets the exchange to route a message through.

type PublishOptions

type PublishOptions struct {
	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context
	// Exchange is the routing exchange for the message
	Exchange string
}

type Request

type Request interface {
	// The service to call
	Service() string
	// The action to take
	Method() string
	// The endpoint to invoke
	Endpoint() string
	// The content type
	ContentType() string
	// The unencoded request body
	Body() interface{}
	// Write to the encoded request writer. This is nil before a call is made
	Codec() codec.Writer
	// indicates whether the request will be a streaming one rather than unary
	Stream() bool
}

Request is the interface for a synchronous request used by Call or Stream.

func NewRequest

func NewRequest(service, endpoint string, request interface{}, reqOpts ...RequestOption) Request

Creates a new request using the default client. Content Type will be set to the default within options and use the appropriate codec.

type RequestOption

type RequestOption func(*RequestOptions)

RequestOption used by NewRequest.

func StreamingRequest

func StreamingRequest() RequestOption

func WithContentType

func WithContentType(ct string) RequestOption

type RequestOptions

type RequestOptions struct {

	// Other options for implementations of the interface
	// can be stored in a context
	Context     context.Context
	ContentType string
	Stream      bool
}

type Response

type Response interface {
	// Read the response
	Codec() codec.Reader
	// read the header
	Header() map[string]string
	// Read the undecoded response
	Read() ([]byte, error)
}

Response is the response received from a service.

type RetryFunc

type RetryFunc func(ctx context.Context, req Request, retryCount int, err error) (bool, error)

Note that returning either false or a non-nil error will result in the call not being retried.

type Router

type Router interface {
	SendRequest(context.Context, Request) (Response, error)
}

Router manages request routing.

type Stream

type Stream interface {
	Closer
	// Context for the stream
	Context() context.Context
	// The request made
	Request() Request
	// The response read
	Response() Response
	// Send will encode and send a request
	Send(interface{}) error
	// Recv will decode and read a response
	Recv(interface{}) error
	// Error returns the stream error
	Error() error
	// Close closes the stream
	Close() error
}

Stream is the interface for a bidirectional synchronous stream.

func NewStream

func NewStream(ctx context.Context, request Request, opts ...CallOption) (Stream, error)

Creates a streaming connection with a service and returns the resulting Stream. It's up to the user to close the stream.

type StreamWrapper

type StreamWrapper func(Stream) Stream

StreamWrapper wraps a Stream and returns the equivalent.

type Wrapper

type Wrapper func(Client) Client

Wrapper wraps a client and returns a client.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL