client

package
v1.6.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 7, 2023 License: MIT Imports: 9 Imported by: 20

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// DefaultClient is a default client to use out of the box
	DefaultClient Client
	// DefaultContentType is the default content type for client
	DefaultContentType = "application/protobuf"
	// DefaultBackoff is the default backoff function for retries
	DefaultBackoff = exponentialBackoff
	// DefaultRetry is the default check-for-retry function for retries
	DefaultRetry = RetryOnError
	// DefaultRetries is the default number of times a request is tried
	DefaultRetries = 1
	// DefaultDialTimeout is the default dial timeout
	DefaultDialTimeout = time.Second * 30
	// DefaultRequestTimeout is the default request timeout
	DefaultRequestTimeout = time.Second * 30
	// DefaultPoolSize sets the connection pool size
	DefaultPoolSize = 100
	// DefaultPoolTTL sets the connection pool ttl
	DefaultPoolTTL = time.Minute

	Flag = pflag.NewFlagSet("client", pflag.ExitOnError)
)

Functions

func Call

func Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error

Call makes a synchronous call to a service using the default client

func NewContext

func NewContext(ctx context.Context, c Client) context.Context

func Publish

func Publish(ctx context.Context, msg Message, opts ...PublishOption) error

Publish publishes a publication using the default client. Using the underlying broker set within the options.

func RetryAlways

func RetryAlways(ctx context.Context, req Request, retryCount int, err error) (bool, error)

RetryAlways always retry on error

func RetryOnError

func RetryOnError(ctx context.Context, req Request, retryCount int, err error) (bool, error)

RetryOnError retries a request on a 500 or timeout error

func String

func String() string

Types

type BackoffFunc

type BackoffFunc func(ctx context.Context, req Request, attempts int) (time.Duration, error)

type CallFunc

type CallFunc func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error

CallFunc represents the individual call func

type CallOption

type CallOption func(*CallOptions)

CallOption used by Call or Stream

func WithAddress

func WithAddress(a ...string) CallOption

WithAddress sets the remote addresses to use rather than using service discovery

func WithBackoff

func WithBackoff(fn BackoffFunc) CallOption

WithBackoff is a CallOption which overrides that which set in Options.CallOptions

func WithCache

func WithCache(c time.Duration) CallOption

WithCache is a CallOption which sets the duration the response should be cached for

func WithCallWrapper

func WithCallWrapper(cw ...CallWrapper) CallOption

WithCallWrapper is a CallOption which adds to the existing CallFunc wrappers

func WithDialTimeout

func WithDialTimeout(d time.Duration) CallOption

WithDialTimeout is a CallOption which overrides that which set in Options.CallOptions

func WithRequestTimeout

func WithRequestTimeout(d time.Duration) CallOption

WithRequestTimeout is a CallOption which overrides that which set in Options.CallOptions

func WithRetries

func WithRetries(i int) CallOption

WithRetries is a CallOption which overrides that which set in Options.CallOptions

func WithRetry

func WithRetry(fn RetryFunc) CallOption

WithRetry is a CallOption which overrides that which set in Options.CallOptions

func WithSelectOption

func WithSelectOption(so ...selector.SelectOption) CallOption

func WithServiceToken

func WithServiceToken() CallOption

WithServiceToken is a CallOption which overrides the authorization header with the services own auth token

func WithStreamTimeout

func WithStreamTimeout(d time.Duration) CallOption

WithStreamTimeout sets the stream timeout

type CallOptions

type CallOptions struct {
	SelectOptions []selector.SelectOption

	// Address of remote hosts
	Address []string
	// Backoff func
	Backoff BackoffFunc
	// Check if retryable func
	Retry RetryFunc
	// Transport Dial Timeout
	DialTimeout time.Duration
	// Number of Call attempts
	Retries int
	// Request/Response timeout
	RequestTimeout time.Duration
	// Stream timeout for the stream
	StreamTimeout time.Duration
	// Use the services own auth token
	ServiceToken bool
	// Duration to cache the response for
	CacheExpiry time.Duration

	// Middleware for low level call func
	CallWrappers []CallWrapper

	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context
}

type CallWrapper

type CallWrapper func(CallFunc) CallFunc

CallWrapper is a low level wrapper for the CallFunc

type Client

type Client interface {
	Init(...Option) error
	Options() Options
	NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
	NewRequest(service, endpoint string, req interface{}, reqOpts ...RequestOption) Request
	Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
	Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
	Publish(ctx context.Context, msg Message, opts ...PublishOption) error
	String() string
}

Client is the interface used to make requests to services. It supports Request/Response via Transport and Publishing via the Broker. It also supports bidirectional streaming of requests.

func FromContext

func FromContext(ctx context.Context) (Client, bool)

type Message

type Message interface {
	Topic() string
	Payload() interface{}
	ContentType() string
}

Message is the interface for publishing asynchronously

func NewMessage

func NewMessage(topic string, payload interface{}, opts ...MessageOption) Message

NewMessage creates a new message using the default client

type MessageOption

type MessageOption func(*MessageOptions)

MessageOption used by NewMessage

func WithMessageContentType

func WithMessageContentType(ct string) MessageOption

type MessageOptions

type MessageOptions struct {
	ContentType string
}

type Option

type Option func(*Options)

Option used by the Client

func Backoff

func Backoff(fn BackoffFunc) Option

Backoff is used to set the backoff function used when retrying Calls

func Broker

func Broker(b broker.Broker) Option

Broker to be used for pub/sub

func Codec

func Codec(contentType string, c codec.NewCodec) Option

Codec to be used to encode/decode requests for a given content type

func ContentType

func ContentType(ct string) Option

ContentType default content type of the client

func DialTimeout

func DialTimeout(d time.Duration) Option

DialTimeout transport dial timeout

func PoolSize

func PoolSize(d int) Option

PoolSize sets the connection pool size

func PoolTTL

func PoolTTL(d time.Duration) Option

PoolTTL sets the connection pool ttl

func Registry

func Registry(r registry.Registry) Option

Registry to find nodes for a given service

func RequestTimeout

func RequestTimeout(d time.Duration) Option

RequestTimeout the request timeout. Should this be a Call Option?

func Retries

func Retries(i int) Option

Retries number of retries when making the request. Should this be a Call Option?

func Retry

func Retry(fn RetryFunc) Option

Retry sets the retry function to be used when re-trying.

func Selector

func Selector(s selector.Selector) Option

Selector select is used to select a node to route a request to

func StreamTimeout

func StreamTimeout(d time.Duration) Option

StreamTimeout sets the stream timeout

func WithRouter

func WithRouter(r Router) Option

WithRouter sets the client router

func Wrap

func Wrap(w Wrapper) Option

Wrap adds a Wrapper to a list of options passed into the client

func WrapCall

func WrapCall(cw ...CallWrapper) Option

WrapCall adds a Wrapper to the list of CallFunc wrappers

type Options

type Options struct {
	// Used to select codec
	ContentType string

	// Plugged interfaces
	Broker   broker.Broker
	Codecs   map[string]codec.NewCodec
	Registry registry.Registry
	Selector selector.Selector

	// Router sets the router
	Router Router

	// Connection Pool
	PoolSize int
	PoolTTL  time.Duration

	// Middleware for client
	Wrappers []Wrapper

	// Default Call Options
	CallOptions CallOptions

	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context
}

func NewOptions

func NewOptions(options ...Option) Options

type PublishOption

type PublishOption func(*PublishOptions)

PublishOption used by Publish

func WithExchange

func WithExchange(e string) PublishOption

WithExchange sets the exchange to route a message through

type PublishOptions

type PublishOptions struct {
	// Exchange is the routing exchange for the message
	Exchange string
}

type Request

type Request interface {
	// Service the service to call
	Service() string
	// Method the action to take
	Method() string
	// Endpoint the endpoint to invoke
	Endpoint() string
	// ContentType the content type
	ContentType() string
	// Body the unencoded request body
	Body() interface{}
	// Codec writes to the encoded request writer. This is nil before a call is made
	Codec() codec.Writer
	// Stream indicates whether the request will be a streaming one rather than unary
	Stream() bool
}

Request is the interface for a synchronous request used by Call or Stream

func NewRequest

func NewRequest(service, endpoint string, request interface{}, reqOpts ...RequestOption) Request

NewRequest creates a new request using the default client. Content Type will be set to the default within options and use the appropriate codec

type RequestOption

type RequestOption func(*RequestOptions)

RequestOption used by NewRequest

func StreamingRequest

func StreamingRequest() RequestOption

func WithContentType

func WithContentType(ct string) RequestOption

type RequestOptions

type RequestOptions struct {
	ContentType string
	Stream      bool

	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context
}

type Response

type Response interface {
	// Codec reads the response
	Codec() codec.Reader
	// Header reads the header
	Header() map[string]string
	// Read the undecoded response
	Read() ([]byte, error)
}

Response is the response received from a service

type RetryFunc

type RetryFunc func(ctx context.Context, req Request, retryCount int, err error) (bool, error)

RetryFunc note that returning either false or a non-nil error will result in the call not being retried

type Router

type Router interface {
	SendRequest(context.Context, Request) (Response, error)
}

Router manages request routing

type Stream

type Stream interface {
	// Context for the stream
	Context() context.Context
	// Request the request made
	Request() Request
	// Response the response read
	Response() Response
	// Send will encode and send a request
	Send(interface{}) error
	// Recv will decode and read a response
	Recv(interface{}) error
	// Error returns the stream error
	Error() error
	// Close closes the stream and close Conn
	Close() error
	// CloseSend closes the stream
	CloseSend() error
}

Stream is the interface for a bidirectional synchronous stream

func NewStream

func NewStream(ctx context.Context, request Request, opts ...CallOption) (Stream, error)

NewStream creates a streaming connection with a service and returns responses on the channel passed in. It's up to the user to close the streamer.

type StreamWrapper

type StreamWrapper func(Stream) Stream

StreamWrapper wraps a Stream and returns the equivalent

type Wrapper

type Wrapper func(Client) Client

Wrapper wraps a client and returns a client

Directories

Path Synopsis
Package selector is a way to pick a list of service nodes
Package selector is a way to pick a list of service nodes
dns
Package dns provides a dns SRV selector
Package dns provides a dns SRV selector
static
Package static provides a static resolver which returns the name/ip passed in without any change
Package static provides a static resolver which returns the name/ip passed in without any change

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL