Documentation ¶
Index ¶
- Constants
- Variables
- func Publish(bus MessageBus, ctx context.Context, channel string, msg proto.Message) error
- func RequestAll[ResponseType proto.Message](ctx context.Context, client RPCClient, rpc string, request proto.Message, ...) (<-chan *Response[ResponseType], error)
- func RequestSingle[ResponseType proto.Message](ctx context.Context, client RPCClient, rpc string, request proto.Message, ...) (ResponseType, error)
- func RequestTopicAll[ResponseType proto.Message](ctx context.Context, client RPCClient, rpc string, topic string, ...) (<-chan *Response[ResponseType], error)
- func RequestTopicSingle[ResponseType proto.Message](ctx context.Context, client RPCClient, rpc string, topic string, ...) (ResponseType, error)
- func SetLogger(l logr.Logger)
- type AffinityFunc
- type ClientOpt
- type Handler
- func NewHandler[RequestType proto.Message, ResponseType proto.Message](rpc string, ...) Handler
- func NewHandlerWithAffinity[RequestType proto.Message, ResponseType proto.Message](rpc string, ...) Handler
- func NewTopicHandler[RequestType proto.Message, ResponseType proto.Message](rpc string, topic string, ...) Handler
- func NewTopicHandlerWithAffinity[RequestType proto.Message, ResponseType proto.Message](rpc string, topic string, ...) Handler
- type MessageBus
- type RPCClient
- type RPCServer
- type RequestOpt
- type Response
- type SelectionOpts
- type ServerOpt
- type Subscription
- func Subscribe[MessageType proto.Message](bus MessageBus, ctx context.Context, channel string) (Subscription[MessageType], error)
- func SubscribeQueue[MessageType proto.Message](bus MessageBus, ctx context.Context, channel string) (Subscription[MessageType], error)
- func SubscribeStream[ResponseType proto.Message](ctx context.Context, client RPCClient, rpc string) (Subscription[ResponseType], error)
- func SubscribeStreamQueue[ResponseType proto.Message](ctx context.Context, client RPCClient, rpc string) (Subscription[ResponseType], error)
- func SubscribeTopic[ResponseType proto.Message](ctx context.Context, client RPCClient, rpc string, topic string) (Subscription[ResponseType], error)
- func SubscribeTopicQueue[ResponseType proto.Message](ctx context.Context, client RPCClient, rpc string, topic string) (Subscription[ResponseType], error)
Constants ¶
View Source
const (
	DefaultTimeout = time.Second * 3
	ChannelSize    = 100
)
Variables ¶
View Source
var (
	ErrRequestTimedOut = errors.New("request timed out")
	ErrNoResponse      = errors.New("no response from servers")
)
View Source
var ErrBusNotConnected = errors.New("bus not connected")
Functions ¶
func RequestAll ¶
func RequestSingle ¶
func RequestTopicAll ¶
func RequestTopicSingle ¶
Types ¶
type AffinityFunc ¶
type Handler ¶
type Handler interface {
// contains filtered or unexported methods
}
func NewHandler ¶
func NewHandlerWithAffinity ¶
func NewTopicHandler ¶
type MessageBus ¶
type MessageBus interface {
// contains filtered or unexported methods
}
func NewNatsMessageBus ¶
func NewNatsMessageBus(nc *nats.Conn) MessageBus
func NewRedisMessageBus ¶
func NewRedisMessageBus(rc redis.UniversalClient) MessageBus
type RPCClient ¶
type RPCClient interface {
	// close all subscriptions and stop
	Close()
	// contains filtered or unexported methods
}
func NewRPCClient ¶
func NewRPCClient(serviceName, clientID string, bus MessageBus, opts ...ClientOpt) (RPCClient, error)
type RPCServer ¶
type RPCServer interface {
	// register a handler
	RegisterHandler(h Handler) error
	// publish updates to a streaming rpc
	Publish(ctx context.Context, rpc string, message proto.Message) error
	// publish updates to a topic within a streaming rpc
	PublishTopic(ctx context.Context, rpc, topic string, message proto.Message) error
	// stop listening for requests for a rpc
	DeregisterHandler(rpc string) error
	// stop listening on a topic for a rpc
	DeregisterTopic(rpc, topic string) error
	// close all subscriptions and stop
	Close()
}
func NewRPCServer ¶
func NewRPCServer(serviceName, serverID string, bus MessageBus, opts ...ServerOpt) RPCServer
type RequestOpt ¶
type RequestOpt func(reqOpts) reqOpts
func WithRequestTimeout ¶
func WithRequestTimeout(timeout time.Duration) RequestOpt
func WithSelectionOpts ¶
func WithSelectionOpts(opts SelectionOpts) RequestOpt
type SelectionOpts ¶
type SelectionOpts struct {
	MinimumAffinity      float32       // minimum affinity for a server to be considered a valid handler
	AcceptFirstAvailable bool          // go fast
	AffinityTimeout      time.Duration // server selection deadline
	ShortCircuitTimeout  time.Duration // deadline imposed after receiving first response
}
type Subscription ¶
type Subscription[MessageType proto.Message] interface {
	Channel() <-chan MessageType
	Close() error
}
func Subscribe ¶
func Subscribe[MessageType proto.Message](bus MessageBus, ctx context.Context, channel string) (Subscription[MessageType], error)
func SubscribeQueue ¶
func SubscribeQueue[MessageType proto.Message](bus MessageBus, ctx context.Context, channel string) (Subscription[MessageType], error)
func SubscribeStream ¶
func SubscribeStreamQueue ¶
func SubscribeTopic ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.