Documentation ¶
Index ¶
- Constants
- Variables
- func Decode(rawResp *Message, into any) error
- func EncodeMimetype(payload any, mimeType string) ([]byte, error)
- func NatsConnect(url string, options ...NatsOption) (*nats.Conn, error)
- func NatsDefaultServerOptions() *server.Options
- func NatsEmbeddedServer(opts *server.Options, startTimeout time.Duration) (*server.Server, error)
- func NewInbox() string
- type AnyServerHandler
- type Bus
- type Header
- type LatticeRequest
- type Message
- type NatsBus
- func (c *NatsBus) Publish(msg *Message) error
- func (c *NatsBus) QueueSubscribe(subject string, queue string, backlog int) (Subscription, error)
- func (c *NatsBus) Request(ctx context.Context, msg *Message) (*Message, error)
- func (c *NatsBus) Subscribe(subject string, backlog int) (Subscription, error)
- type NatsOption
- type NatsSubscription
- type RequestHandler
- type Server
- type ServerError
- type ServerHandlerFunc
- type Subscription
- type SubscriptionCallback
Constants ¶
const (
	// PatternAll is a wildcard pattern that matches all subjects.
	PatternAll = "*"
	// NoBackLog is used to indicate that the subscription should not have a backlog.
	NoBackLog = 0
	// PrefixWadm is the prefix for WasmCloud Admin API.
	PrefixWadm = "wadm.api"
	// PrefixEvents is the prefix for Lattice Events.
	PrefixEvents = "wasmbus.evt"
	// PrefixCtlV1 is the prefix for Lattice RPC.
	PrefixCtlV1 = "wasmbus.ctl.v1"
)
const NatsDefaultURL = nats.DefaultURL
Variables ¶
var (
	// ErrEncode is returned when encoding a message fails.
	ErrEncode = errors.New("encode error")
	// ErrInternal is returned when an internal error occurs.
	ErrInternal = errors.New("internal error")
	// ErrDecode is returned when decoding a message fails.
	ErrDecode = errors.New("decode error")
	// ErrTransport is returned when a transport error occurs.
	ErrTransport = errors.New("transport error")
	// ErrOperation is returned when an operation error occurs.
	ErrOperation = errors.New("operation error")
	// ErrValidation is returned when a validation error occurs.
	ErrValidation = errors.New("validation error")
)
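Sentinel errors like these are usually matched with `errors.Is`. The sketch below redeclares two of them locally so that it compiles on its own. Whether the package wraps its errors with `%w` is an assumption, and `decodePayload` and `classify` are hypothetical helpers that are not part of the package.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Local copies of two sentinels from the list above, redeclared so the
// sketch compiles without importing the package.
var (
	ErrDecode    = errors.New("decode error")
	ErrTransport = errors.New("transport error")
)

// decodePayload is a hypothetical helper. It wraps the underlying JSON
// failure with ErrDecode via %w so callers can match it with errors.Is.
func decodePayload(data []byte, into any) error {
	if err := json.Unmarshal(data, into); err != nil {
		return fmt.Errorf("%w: %v", ErrDecode, err)
	}
	return nil
}

// classify maps an error to a short label by checking the sentinels.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrDecode):
		return "decode"
	case errors.Is(err, ErrTransport):
		return "transport"
	default:
		return "other"
	}
}

func main() {
	var out map[string]string
	fmt.Println(classify(decodePayload([]byte(`{"a":"b"}`), &out)))
	fmt.Println(classify(decodePayload([]byte(`not json`), &out)))
}
```

Matching on the sentinel rather than on the message text keeps callers working if the wrapped detail changes.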
Functions ¶
func Decode ¶
func Decode(rawResp *Message, into any) error
Decode unmarshals the raw response data into the provided struct. The content type is used to determine the unmarshaling format. If the content type is not supported, an error is returned. Acceptable content types are "application/json" and "application/yaml".
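The content-type dispatch described above might look roughly like the following. This is a standalone sketch, not the package's implementation: `message` is a local stand-in, the `Content-Type` header key and the map-of-slices header layout are assumptions, and only the JSON branch is wired up because the standard library has no YAML decoder.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// message is a local stand-in for the package's Message type. The header
// layout (a map of string slices) is an assumption made for this sketch.
type message struct {
	Header map[string][]string
	Data   []byte
}

var errUnsupported = errors.New("unsupported content type")

// decodeByContentType picks an unmarshaler based on the Content-Type header.
// Only JSON is implemented; YAML would need a third-party decoder.
func decodeByContentType(msg *message, into any) error {
	var contentType string
	if values := msg.Header["Content-Type"]; len(values) > 0 {
		contentType = values[0]
	}
	switch contentType {
	case "application/json":
		return json.Unmarshal(msg.Data, into)
	case "application/yaml":
		return fmt.Errorf("%w: yaml decoder not wired into this sketch", errUnsupported)
	default:
		return fmt.Errorf("%w: %q", errUnsupported, contentType)
	}
}

type greeting struct {
	Name string `json:"name"`
}

// decodeName builds a message with the given content type and decodes it.
func decodeName(contentType, body string) (string, error) {
	msg := &message{
		Header: map[string][]string{"Content-Type": {contentType}},
		Data:   []byte(body),
	}
	var g greeting
	err := decodeByContentType(msg, &g)
	return g.Name, err
}

func main() {
	name, err := decodeName("application/json", `{"name":"lattice"}`)
	fmt.Println(name, err)
	_, err = decodeName("text/plain", "hi")
	fmt.Println(err)
}
```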
func NatsConnect ¶
func NatsConnect(url string, options ...NatsOption) (*nats.Conn, error)
NatsConnect connects to a NATS server at the given URL. The URL should be in the form of "nats://host:port". This helper function sets some default options and calls `nats.Connect`.
func NatsEmbeddedServer ¶
func NatsEmbeddedServer(opts *server.Options, startTimeout time.Duration) (*server.Server, error)
Types ¶
type AnyServerHandler ¶
AnyServerHandler is an interface that can be implemented by any handler that can be registered with a server. Primary implementations are `RequestHandler` and `ServerHandlerFunc`.
type Bus ¶
type Bus interface {
	// Subscribe creates a subscription for the given subject.
	// The backlog parameter is the maximum number of messages that can be buffered in memory.
	Subscribe(subject string, backlog int) (Subscription, error)
	// QueueSubscribe creates a subscription for the given subject and queue group.
	// The backlog parameter is the maximum number of messages that can be buffered in memory.
	QueueSubscribe(subject string, queue string, backlog int) (Subscription, error)
	// Request sends a request message and waits for a reply.
	// The context is used for the request timeout.
	Request(ctx context.Context, msg *Message) (*Message, error)
	// Publish sends a message to `msg.Subject`.
	Publish(msg *Message) error
}
Bus is the interface for the message bus. It provides methods for subscribing to messages and sending messages. It doesn't hold any state and is safe for concurrent use. See `Subscription` for stateful operations. Modeled after the NATS API.
type LatticeRequest ¶
type LatticeRequest[T any, Y any] struct {
	// Request should be a reference to the request struct
	Request T
	// Response should be a struct that the response will be unmarshaled into
	// and should be passed by value
	Response Y
	Subject string
	Bus Bus
	PreRequest func(context.Context, T, *Message) error
	PostRequest func(context.Context, *Y, *Message) error
}
LatticeRequest encodes the round-trip logic for a Bus request. This is a generic implementation that can be used with any Bus and any Request/Response pair. Requests are encoded as JSON, and responses can be either JSON or YAML. Use the Pre and Post Request hooks to modify the request before it is sent and the response after it is received. The `T` and `Y` types define the Request and Response types. `T` should be passed by reference (pointer) and `Y` by value (struct).
func NewLatticeRequest ¶
func NewLatticeRequest[T any, Y any](bus Bus, subject string, in T, out Y) *LatticeRequest[T, Y]
NewLatticeRequest returns a `LatticeRequest` for a given type. The `T` and `Y` types are used to define the Request and Response types. `T` should be passed by reference (pointer) and `Y` by value (struct).
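To make the round trip concrete, here is a minimal standalone sketch of the flow described for `LatticeRequest`: encode the request as JSON, run the pre-request hook, send over the bus, decode the response, run the post-request hook. Everything in it is a local stand-in. `roundTrip`, `requester`, `echoBus`, and the ping types are hypothetical names, and the package's own method for executing a `LatticeRequest` is not shown in this section.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

type message struct {
	Subject string
	Data    []byte
}

// requester is the one method of the Bus interface this sketch needs.
type requester interface {
	Request(ctx context.Context, msg *message) (*message, error)
}

// roundTrip is a hypothetical free function mirroring the described flow.
// T is a pointer to the request struct; Y is the response struct by value.
func roundTrip[T any, Y any](
	ctx context.Context, bus requester, subject string, in T, out Y,
	pre func(context.Context, T, *message) error,
	post func(context.Context, *Y, *message) error,
) (*Y, error) {
	data, err := json.Marshal(in)
	if err != nil {
		return nil, fmt.Errorf("encode: %w", err)
	}
	req := &message{Subject: subject, Data: data}
	if pre != nil { // hook may adjust the outgoing message
		if err := pre(ctx, in, req); err != nil {
			return nil, err
		}
	}
	resp, err := bus.Request(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("transport: %w", err)
	}
	if err := json.Unmarshal(resp.Data, &out); err != nil {
		return nil, fmt.Errorf("decode: %w", err)
	}
	if post != nil { // hook may inspect or adjust the decoded response
		if err := post(ctx, &out, resp); err != nil {
			return nil, err
		}
	}
	return &out, nil
}

type pingRequest struct {
	Name string `json:"name"`
}

type pingResponse struct {
	Greeting string `json:"greeting"`
}

// echoBus is a fake bus that answers every request in-process.
type echoBus struct{}

func (echoBus) Request(ctx context.Context, msg *message) (*message, error) {
	var req pingRequest
	if err := json.Unmarshal(msg.Data, &req); err != nil {
		return nil, err
	}
	data, err := json.Marshal(pingResponse{Greeting: "hello " + req.Name})
	if err != nil {
		return nil, err
	}
	return &message{Subject: msg.Subject, Data: data}, nil
}

// greet runs one round trip against the fake bus without hooks.
func greet(name string) (string, error) {
	resp, err := roundTrip[*pingRequest, pingResponse](
		context.Background(), echoBus{}, "demo.ping",
		&pingRequest{Name: name}, pingResponse{}, nil, nil,
	)
	if err != nil {
		return "", err
	}
	return resp.Greeting, nil
}

func main() {
	fmt.Println(greet("lattice"))
}
```

Passing the request as a pointer and the response as a value matches the `T` by reference, `Y` by value convention stated above.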
type Message ¶
type Message struct {
	Subject string
	Reply string
	Header Header
	Data []byte
	// contains filtered or unexported fields
}
Message is the message type for the message bus. Modeled after the NATS message type.
func NewMessage ¶
NewMessage creates a new message with the given subject.
func (*Message) Bus ¶
Bus returns the bus that the message was received on or sent to. Might be nil.
func (*Message) LastSubjectPart ¶
LastSubjectPart returns the last part of the subject. Example: "a.b.c" -> "c"
func (*Message) SubjectParts ¶
SubjectParts returns the parts of the subject. Example: "a.b.c" -> ["a", "b", "c"]
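The two subject helpers behave like a split on the `.` separator. A minimal sketch with standalone functions (hypothetical names, not the package's methods) could be:

```go
package main

import (
	"fmt"
	"strings"
)

// subjectParts splits a dot-separated subject into its tokens.
func subjectParts(subject string) []string {
	return strings.Split(subject, ".")
}

// lastSubjectPart returns the final token of the subject.
// strings.Split always yields at least one element, so indexing is safe.
func lastSubjectPart(subject string) string {
	parts := subjectParts(subject)
	return parts[len(parts)-1]
}

func main() {
	fmt.Println(subjectParts("a.b.c"))
	fmt.Println(lastSubjectPart("a.b.c"))
}
```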
type NatsBus ¶
type NatsBus struct {
// contains filtered or unexported fields
}
NatsBus is a Bus implementation that uses NATS as the transport.
func NewNatsBus ¶
func NewNatsBus(nc *nats.Conn) *NatsBus
NewNatsBus creates a new NATS bus using the given NATS connection.
func (*NatsBus) QueueSubscribe ¶
func (c *NatsBus) QueueSubscribe(subject string, queue string, backlog int) (Subscription, error)
QueueSubscribe implements `Bus.QueueSubscribe` for NATS.
type NatsOption ¶
type NatsOption = nats.Option
NatsOption is an option for configuring a NATS connection.
type NatsSubscription ¶
type NatsSubscription struct {
// contains filtered or unexported fields
}
NatsSubscription is a Subscription implementation for NATS.
func (*NatsSubscription) Drain ¶
func (s *NatsSubscription) Drain() error
Drain implements `Subscription.Drain` for NATS.
func (*NatsSubscription) Handle ¶
func (s *NatsSubscription) Handle(callback SubscriptionCallback)
Handle implements `Subscription.Handle` for NATS.
type RequestHandler ¶
type RequestHandler[T any, Y any] struct {
	Request T
	Response Y
	PreRequest func(context.Context, *T, *Message) error
	PostRequest func(context.Context, *Y, *Message) error
	Handler func(context.Context, *T) (*Y, error)
}
RequestHandler is a generic handler that can be used to implement a server handler. It encodes the logic for handling a message and sending a response.
func NewRequestHandler ¶
func NewRequestHandler[T any, Y any](req T, resp Y, handler func(context.Context, *T) (*Y, error)) *RequestHandler[T, Y]
NewRequestHandler returns a new server handler instance. The `T` and `Y` types define the Request and Response types. Both should be structs. They are used as templates for requests and responses.
func (*RequestHandler[T, Y]) HandleMessage ¶
func (s *RequestHandler[T, Y]) HandleMessage(ctx context.Context, msg *Message) error
HandleMessage implements the `AnyServerHandler` interface.
type Server ¶
type Server struct {
	Bus
	// Lattice is an informative field containing the lattice name.
	// It is NOT used when manipulating subjects.
	Lattice string
	// ContextFunc is a function that returns a new context for each message.
	// Defaults to `context.Background`.
	ContextFunc func() context.Context
	// contains filtered or unexported fields
}
Server is a higher-level abstraction that can be used to register handlers for specific subjects. See `AnyServerHandler` for more information.
func (*Server) Drain ¶
Drain walks through all subscriptions and drains them. It also closes the error stream. This is a blocking operation.
func (*Server) ErrorStream ¶
func (s *Server) ErrorStream() <-chan *ServerError
ErrorStream returns a channel that can be used to listen for Transport / Encoding level errors. See `ServerError` for more information.
func (*Server) RegisterHandler ¶
func (s *Server) RegisterHandler(subject string, handler AnyServerHandler) error
RegisterHandler registers a handler for a given subject. Each handler gets its own channel subscription with no backlog and its own goroutine for queue consumption. Callers should handle concurrency and synchronization themselves.
type ServerError ¶
ServerError carries information about transport & encoding errors outside Request/Response scope.
type ServerHandlerFunc ¶
ServerHandlerFunc is a function type that can be used to implement a server handler from a function.
func (ServerHandlerFunc) HandleMessage ¶
func (f ServerHandlerFunc) HandleMessage(ctx context.Context, msg *Message) error
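This is the same adapter pattern as `http.HandlerFunc`: a function type with a method that calls itself, so a plain function satisfies the handler interface. The sketch below uses local stand-ins. The underlying function signature of `ServerHandlerFunc` is not shown in this section, so the one used here is an assumption inferred from `HandleMessage`, and `dispatch` and `recordSubject` are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
)

type message struct {
	Subject string
	Data    []byte
}

// anyServerHandler mirrors the single-method shape implied by HandleMessage.
type anyServerHandler interface {
	HandleMessage(ctx context.Context, msg *message) error
}

// serverHandlerFunc adapts a plain function to anyServerHandler.
type serverHandlerFunc func(ctx context.Context, msg *message) error

// HandleMessage calls the function itself.
func (f serverHandlerFunc) HandleMessage(ctx context.Context, msg *message) error {
	return f(ctx, msg)
}

// dispatch invokes a handler through the interface, as a server would.
func dispatch(h anyServerHandler, subject string) error {
	return h.HandleMessage(context.Background(), &message{Subject: subject})
}

// recordSubject returns a handler that stores the subject it was called with.
func recordSubject(dst *string) serverHandlerFunc {
	return func(ctx context.Context, msg *message) error {
		*dst = msg.Subject
		return nil
	}
}

func main() {
	var seen string
	err := dispatch(recordSubject(&seen), "wasmbus.evt.default")
	fmt.Println(seen, err)
}
```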
type Subscription ¶
type Subscription interface {
	// Handle starts the subscription and calls the callback for each message.
	// Does not block.
	Handle(callback SubscriptionCallback)
	// Drain stops the subscription and closes the channel.
	// Blocks until all messages are processed, releasing the Subscription Thread.
	Drain() error
}
Subscription is the interface for a message subscription. It provides methods for handling messages and draining the subscription. Subscriptions run in their own goroutine.
type SubscriptionCallback ¶
type SubscriptionCallback func(msg *Message)
SubscriptionCallback is the callback type for message subscriptions. Subscriptions are single threaded and the callback is called sequentially for each message. Callers should handle concurrency and synchronization themselves.
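Because the callback runs on the subscription's goroutine, any state it shares with other goroutines needs its own synchronization. The sketch below shows one way to do that with a mutex. It uses local stand-ins only: `deliver` imitates a subscription by calling the callback sequentially from a single goroutine, and `counter` and `countSubjects` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

type message struct{ Subject string }

type subscriptionCallback func(msg *message)

// counter is shared between the callback and other goroutines,
// so every access goes through the mutex.
type counter struct {
	mu    sync.Mutex
	count map[string]int
}

// callback returns a subscriptionCallback that counts messages per subject.
func (c *counter) callback() subscriptionCallback {
	return func(msg *message) {
		c.mu.Lock()
		defer c.mu.Unlock()
		c.count[msg.Subject]++
	}
}

// get reads a count under the same lock the callback uses.
func (c *counter) get(subject string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.count[subject]
}

// deliver stands in for a subscription: one goroutine calls the callback
// once per message, in order, and deliver returns when all are processed.
func deliver(subjects []string, cb subscriptionCallback) {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for _, s := range subjects {
			cb(&message{Subject: s})
		}
	}()
	<-done
}

// countSubjects delivers the subjects and reports how often one was seen.
func countSubjects(subjects []string, subject string) int {
	c := &counter{count: map[string]int{}}
	deliver(subjects, c.callback())
	return c.get(subject)
}

func main() {
	fmt.Println(countSubjects([]string{"a", "b", "a"}, "a"))
}
```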