wasmbus

package module
v0.0.0-...-a465894 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 20, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// PatternAll is a wildcard pattern that matches all subjects.
	PatternAll = "*"

	// NoBackLog is used to indicate that the subscription should not have a backlog.
	NoBackLog = 0

	// PrefixWadm is the prefix for WasmCloud Admin API.
	PrefixWadm = "wadm.api"
	// PrefixEvents is the prefix for Lattice Events.
	PrefixEvents = "wasmbus.evt"
	// PrefixControl is the prefix for Lattice RPC.
	PrefixCtlV1 = "wasmbus.ctl.v1"
)
View Source
const NatsDefaultURL = nats.DefaultURL

Variables

View Source
var (
	// ErrEncode is returned when encoding a message fails.
	ErrEncode = errors.New("encode error")
	// ErrInternal is returned when an internal error occurs.
	ErrInternal = errors.New("internal error")
	// ErrDecode is returned when decoding a message fails.
	ErrDecode = errors.New("decode error")
	// ErrTransport is returned when a transport error occurs.
	ErrTransport = errors.New("transport error")
	// ErrOperation is returned when an operation error occurs.
	ErrOperation = errors.New("operation error")
	// ErrValidation is returned when a validation error occurs.
	ErrValidation = errors.New("validation error")
)

Functions

func Decode

func Decode(rawResp *Message, into any) error

Decode unmarshals the raw response data into the provided struct. The content type is used to determine the unmarshaling format. If the content type is not supported, an error is returned. Acceptable content types are "application/json" and "application/yaml".

func EncodeMimetype

func EncodeMimetype(payload any, mimeType string) ([]byte, error)

func NatsConnect

func NatsConnect(url string, options ...NatsOption) (*nats.Conn, error)

NatsConnect connects to a NATS server at the given URL. The URL should be in the form of "nats://host:port". This helper function sets some default options and calls `nats.Connect`.

func NatsDefaultServerOptions

func NatsDefaultServerOptions() *server.Options

func NatsEmbeddedServer

func NatsEmbeddedServer(opts *server.Options, startTimeout time.Duration) (*server.Server, error)

func NewInbox

func NewInbox() string

Types

type AnyServerHandler

type AnyServerHandler interface {
	HandleMessage(ctx context.Context, msg *Message) error
}

AnyServerHandler is an interface that can be implemented by any handler that can be registered with a server. Primary implementations are `RequestHandler` and `ServerHandlerFunc`.

type Bus

type Bus interface {
	// Subscribe creates a subscription for the given subject.
	// The backlog parameter is the maximum number of messages that can be buffered in memory.
	Subscribe(subject string, backlog int) (Subscription, error)
	// QueueSubscribe creates a subscription for the given subject and queue group.
	// The backlog parameter is the maximum number of messages that can be buffered in memory.
	QueueSubscribe(subject string, queue string, backlog int) (Subscription, error)
	// Request sends a request message and waits for a reply.
	// The context is used for the request timeout.
	Request(ctx context.Context, msg *Message) (*Message, error)
	// Publish sends a message to `msg.Subject`.
	Publish(msg *Message) error
}

Bus is the interface for the message bus. It provides methods for subscribing to messages and sending messages. It doesn't hold any state and is safe for concurrent use. See `Subscription` for stateful operations. Modeled after the NATS API.

type Header = http.Header

Header is the Message header type.

type LatticeRequest

type LatticeRequest[T any, Y any] struct {
	// Request should be a reference to the request struct
	Request T
	// Response should be a struct that the response will be unmarshaled into
	// and should be passed by value
	Response    Y
	Subject     string
	Bus         Bus
	PreRequest  func(context.Context, T, *Message) error
	PostRequest func(context.Context, *Y, *Message) error
}

LatticeRequest encodes the Roundtrip logic for a Bus Request This is a generic implementation that can be used with any Bus and any Request/Response pair. Requests are encoded in json and Responses can be either json or yaml. Use Pre & Post Request hooks to modify the request/response before/after. The `T` and `Y` types are used to define the Request and Response types. `T` should be passed by reference (pointer) and `Y` by value (struct).

func NewLatticeRequest

func NewLatticeRequest[T any, Y any](bus Bus, subject string, in T, out Y) *LatticeRequest[T, Y]

NewLatticeRequest returns a `LatticeRequest` for a given type. The `T` and `Y` types are used to define the Request and Response types. `T` should be passed by reference (pointer) and `Y` by value (struct).

func (*LatticeRequest[T, Y]) Execute

func (l *LatticeRequest[T, Y]) Execute(ctx context.Context) (*Y, error)

Execute sends the request to the bus and returns the response.

type Message

type Message struct {
	Subject string
	Reply   string
	Header  Header
	Data    []byte
	// contains filtered or unexported fields
}

Message is the message type for the message bus. Modeled after the NATS message type.

func Encode

func Encode(subject string, payload any) (*Message, error)

Encode marshals the payload into a Message. The payload is encoded as json.

func NewMessage

func NewMessage(subject string) *Message

NewMessage creates a new message with the given subject.

func (*Message) Bus

func (m *Message) Bus() Bus

Bus returns the bus that the message was received on or sent to Might be null.

func (*Message) LastSubjectPart

func (m *Message) LastSubjectPart() string

LastSubjectPart returns the last part of the subject. Example: "a.b.c" -> "c"

func (*Message) SubjectParts

func (m *Message) SubjectParts() []string

SubjectParts returns the parts of the subject. Example: "a.b.c" -> ["a", "b", "c"]

type NatsBus

type NatsBus struct {
	// contains filtered or unexported fields
}

NatsBus is a Bus implementation that uses NATS as the transport.

func NewNatsBus

func NewNatsBus(nc *nats.Conn) *NatsBus

NewNatsBus creates a new NATS bus using the given NATS connection.

func (*NatsBus) Publish

func (c *NatsBus) Publish(msg *Message) error

Publish implements `Bus.Publish` for NAT

func (*NatsBus) QueueSubscribe

func (c *NatsBus) QueueSubscribe(subject string, queue string, backlog int) (Subscription, error)

QueueSubscribe implements `Bus.QueueSubscribe` for NATS.

func (*NatsBus) Request

func (c *NatsBus) Request(ctx context.Context, msg *Message) (*Message, error)

Request implements `Bus.Request` for NATS.

func (*NatsBus) Subscribe

func (c *NatsBus) Subscribe(subject string, backlog int) (Subscription, error)

Subscribe implements `Bus.Subscribe` for NATS.

type NatsOption

type NatsOption = nats.Option

NatsOption is an option for configuring a NATS connection.

type NatsSubscription

type NatsSubscription struct {
	// contains filtered or unexported fields
}

NatsSubscription is a Subscription implementation for NATS.

func (*NatsSubscription) Drain

func (s *NatsSubscription) Drain() error

Drain implements `Subscription.Drain` for NATS.

func (*NatsSubscription) Handle

func (s *NatsSubscription) Handle(callback SubscriptionCallback)

Handle implements `Subscription.Handle` for NATS.

type RequestHandler

type RequestHandler[T any, Y any] struct {
	Request     T
	Response    Y
	PreRequest  func(context.Context, *T, *Message) error
	PostRequest func(context.Context, *Y, *Message) error
	Handler     func(context.Context, *T) (*Y, error)
}

RequestHandler is a generic handler that can be used to implement a server handler. It encodes the logic for handling a message and sending a response.

func NewRequestHandler

func NewRequestHandler[T any, Y any](req T, resp Y, handler func(context.Context, *T) (*Y, error)) *RequestHandler[T, Y]

NewRequestHandler returns a new server handler instance. The `T` and `Y` types are used to define the Request and Response types. Both should be structs. They will be used as template for request/responses.

func (*RequestHandler[T, Y]) HandleMessage

func (s *RequestHandler[T, Y]) HandleMessage(ctx context.Context, msg *Message) error

HandleMessage implements the `AnyServerHandler` interface.

type Server

type Server struct {
	Bus
	// Lattice is an informative field containing the lattice name.
	// It is NOT used when manipulating subjects.
	Lattice string
	// ContextFunc is a function that returns a new context for each message.
	// Defaults to `context.Background`.
	ContextFunc func() context.Context
	// contains filtered or unexported fields
}

Server is a higher-level abstraction that can be used to register handlers for specific subjects. See `AnyServerHandler` for more information.

func NewServer

func NewServer(bus Bus, lattice string) *Server

NewServer returns a new server instance.

func (*Server) Drain

func (s *Server) Drain() error

Drain walks through all subscriptions and drains them. It also closes the error stream. This is a blocking operation.

func (*Server) ErrorStream

func (s *Server) ErrorStream() <-chan *ServerError

ErrorStream returns a channel that can be used to listen for Transport / Encoding level errors. See `ServerError` for more information.

func (*Server) RegisterHandler

func (s *Server) RegisterHandler(subject string, handler AnyServerHandler) error

RegisterHandler registers a handler for a given subject. Each handler gets their channel subscription with no backlog, and their own goroutine for queue consumption. Callers should handle concurrency and synchronization themselves.

type ServerError

type ServerError struct {
	Context context.Context
	Err     error
	Request *Message
}

ServerError carries information about transport & encoding errors outside Request/Response scope.

type ServerHandlerFunc

type ServerHandlerFunc func(context.Context, *Message) error

ServerHandlerFunc is a function type that can be used to implement a server handler from a function.

func (ServerHandlerFunc) HandleMessage

func (f ServerHandlerFunc) HandleMessage(ctx context.Context, msg *Message) error

type Subscription

type Subscription interface {
	// Handle starts the subscription and calls the callback for each message.
	// Does not block.
	Handle(callback SubscriptionCallback)
	// Drain stops the subscription and closes the channel.
	// Blocks until all messages are processed, releasing the Subscription Thread.
	Drain() error
}

Subscription is the interface for a message subscription. It provides methods for handling messages and draining the subscription. Subscriptions run in their own goroutine.

type SubscriptionCallback

type SubscriptionCallback func(msg *Message)

SubscriptionCallback is the callback type for message subscriptions. Subscriptions are single threaded and the callback is called sequentially for each message. Callers should handle concurrency and synchronization themselves.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL