ncl

package
v1.6.0-rc4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

README

Bacalhau NCL (NATS Client Library)

Overview

The NCL (NATS Client Library) is an internal library for Bacalhau, designed to provide reliable, scalable, and efficient communication between orchestrator and compute nodes. It leverages NATS for messaging and implements an event-driven architecture with support for both publish-subscribe and request-response patterns.

Key Components

  1. Publisher: Handles all message delivery patterns

    • Synchronous publishing with delivery guarantees
    • Request-response communication
    • Optional message ordering and delivery tracking
    • Configurable subjects and message routing
  2. Subscriber: Manages message consumption

    • Subject-based subscription management
    • Message filtering capabilities
    • Automatic acknowledgments and retries
    • Processing notifications
    • Backoff strategies for failures
  3. Responder: Handles request-response pattern

    • Type-based request routing
    • Handler registration
    • Automatic error responses
    • Request timeouts
  4. Encoder: Manages message serialization

    • Consistent message encoding/decoding
    • Metadata enrichment
    • Error handling
    • Type registration

Technical Details

Message Flow
  1. Publishing and Requesting:

    • Messages are encoded using the encoder
    • Messages are published to configured subjects
    • Optional request-response with reply subjects
    • OrderedPublisher ensures delivery ordering
  2. Subscribing:

    • Subscribers set up NATS subscriptions
    • Messages are decoded using the encoder
    • Filters determine message processing
    • Handlers process valid messages
    • Success/failure notifications are sent
    • Automatic ack/nack with backoff
Component Interfaces
Publisher
type Publisher interface {
    // Fire-and-forget publishing
    Publish(ctx context.Context, request PublishRequest) error
    
    // Request-response pattern
    Request(ctx context.Context, request PublishRequest) (*envelope.Message, error)
}
OrderedPublisher
type OrderedPublisher interface {
    Publisher
    PublishAsync(ctx context.Context, request PublishRequest) (PubFuture, error)
    Reset(ctx context.Context)
    Close(ctx context.Context) error
}
Subscriber
type Subscriber interface {
    Subscribe(ctx context.Context, subjects ...string) error
    Close(ctx context.Context) error
}
Responder
type Responder interface {
    Listen(ctx context.Context, messageType string, handler RequestHandler) error
    Close(ctx context.Context) error
}
Example Usage
// Create a publisher for both publishing and requests
publisher, _ := NewPublisher(nc, PublisherConfig{
    Name:            "compute-node",
    MessageRegistry: registry,
})

// Publish a message
err := publisher.Publish(ctx, NewPublishRequest(message))

// Make a request
response, err := publisher.Request(ctx, NewPublishRequest(request))

// Create a responder for handling requests
responder, _ := NewResponder(nc, ResponderConfig{
    Name:     "orchestrator",
    Subject:  "requests",
})

// Register request handlers
err = responder.Listen(ctx, "JobRequest", handler)

// Create a subscriber for message consumption
subscriber, _ := NewSubscriber(nc, SubscriberConfig{
    Name:           "worker",
    MessageHandler: handler,
})

// Subscribe to subjects
err = subscriber.Subscribe(ctx, "updates.>")

Usage Within Bacalhau

This library is designed to be used internally within the Bacalhau project. It integrates into the orchestrator and compute node components to handle all inter-node communication.

Example integration points:

  1. Job assignment from orchestrator to compute nodes
  2. Status updates from compute nodes to orchestrator
  3. Heartbeat messages for health monitoring
  4. Compute node registration and discovery

Each component type (Publisher, Subscriber, Responder) handles specific communication patterns while sharing the same underlying message encoding and metadata handling through the encoder.

Documentation

Overview

Package ncl is a generated GoMock package.

Index

Constants

View Source
const (
	KeySource      = "Bacalhau-Source"
	KeyEventTime   = "Bacalhau-EventTime"
	KeyMessageUUID = "Bacalhau-MessageUUID"
	KeyMessageID   = "Bacalhau-MessageID"
	KeySubject     = "Bacalhau-Subject"
)

Metadata keys

View Source
const (
	BacErrorMessageType = "BacError"

	// KeyStatusCode is the key for the status code
	KeyStatusCode = "Bacalhau-StatusCode"

	// KeyErrorCode is the key for the error code
	KeyErrorCode = "Bacalhau-ErrorCode"
)
View Source
const (
	DefaultProcessingTimeout   = 5 * time.Second
	DefaultBackoffInitialDelay = 100 * time.Millisecond
	DefaultBackoffMaximumDelay = 5 * time.Second
)
View Source
const (
	DefaultResponderProcessingTimeout = 5 * time.Second
)

Variables

View Source
var (
	// ErrHandlerExists is returned when attempting to register a handler for a message type that already has one
	ErrHandlerExists = errors.New("handler already exists for message type")

	// ErrNoHandler is returned when no handler is found for a message type
	ErrNoHandler = errors.New("no handler found for message type")
)

Functions

func Ack added in v1.5.2

func Ack(m *nats.Msg) error

Ack creates a success result and publishes it

func BacErrorToEnvelope added in v1.6.0

func BacErrorToEnvelope(err bacerrors.Error) *envelope.Message

BacErrorToEnvelope converts the error to an envelope

func Nack added in v1.5.2

func Nack(m *nats.Msg, err error) error

Nack creates an error result

func NackWithDelay added in v1.5.2

func NackWithDelay(m *nats.Msg, err error, delay time.Duration) error

Types

type AckMode added in v1.5.2

type AckMode int

AckMode determines how published messages should be acknowledged

const (
	// ExplicitAck requires explicit acknowledgment from one subscriber
	ExplicitAck AckMode = iota

	// NoAck means the message is considered delivered as soon as it's published
	NoAck
)

type MessageFilter

type MessageFilter interface {
	ShouldFilter(metadata nats.Header) bool
}

MessageFilter interface for filtering messages

type MessageFilterFunc

type MessageFilterFunc func(metadata nats.Header) bool

MessageFilterFunc is a function type that implements MessageFilter

func (MessageFilterFunc) ShouldFilter

func (f MessageFilterFunc) ShouldFilter(metadata nats.Header) bool

type MessageHandler

type MessageHandler interface {
	ShouldProcess(ctx context.Context, message *envelope.Message) bool
	HandleMessage(ctx context.Context, message *envelope.Message) error
}

MessageHandler interface for processing messages

type MessageHandlerFunc

type MessageHandlerFunc func(ctx context.Context, message *envelope.Message) error

MessageHandlerFunc is a function type that implements MessageHandler

func (MessageHandlerFunc) HandleMessage

func (f MessageHandlerFunc) HandleMessage(ctx context.Context, message *envelope.Message) error

func (MessageHandlerFunc) ShouldProcess

func (f MessageHandlerFunc) ShouldProcess(ctx context.Context, message *envelope.Message) bool

type MockMessageFilter added in v1.5.2

type MockMessageFilter struct {
	// contains filtered or unexported fields
}

MockMessageFilter is a mock of MessageFilter interface.

func NewMockMessageFilter added in v1.5.2

func NewMockMessageFilter(ctrl *gomock.Controller) *MockMessageFilter

NewMockMessageFilter creates a new mock instance.

func (*MockMessageFilter) EXPECT added in v1.5.2

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockMessageFilter) ShouldFilter added in v1.5.2

func (m *MockMessageFilter) ShouldFilter(metadata nats.Header) bool

ShouldFilter mocks base method.

type MockMessageFilterMockRecorder added in v1.5.2

type MockMessageFilterMockRecorder struct {
	// contains filtered or unexported fields
}

MockMessageFilterMockRecorder is the mock recorder for MockMessageFilter.

func (*MockMessageFilterMockRecorder) ShouldFilter added in v1.5.2

func (mr *MockMessageFilterMockRecorder) ShouldFilter(metadata interface{}) *gomock.Call

ShouldFilter indicates an expected call of ShouldFilter.

type MockMessageHandler added in v1.5.2

type MockMessageHandler struct {
	// contains filtered or unexported fields
}

MockMessageHandler is a mock of MessageHandler interface.

func NewMockMessageHandler added in v1.5.2

func NewMockMessageHandler(ctrl *gomock.Controller) *MockMessageHandler

NewMockMessageHandler creates a new mock instance.

func (*MockMessageHandler) EXPECT added in v1.5.2

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockMessageHandler) HandleMessage added in v1.5.2

func (m *MockMessageHandler) HandleMessage(ctx context.Context, message *envelope.Message) error

HandleMessage mocks base method.

func (*MockMessageHandler) ShouldProcess added in v1.5.2

func (m *MockMessageHandler) ShouldProcess(ctx context.Context, message *envelope.Message) bool

ShouldProcess mocks base method.

type MockMessageHandlerMockRecorder added in v1.5.2

type MockMessageHandlerMockRecorder struct {
	// contains filtered or unexported fields
}

MockMessageHandlerMockRecorder is the mock recorder for MockMessageHandler.

func (*MockMessageHandlerMockRecorder) HandleMessage added in v1.5.2

func (mr *MockMessageHandlerMockRecorder) HandleMessage(ctx, message interface{}) *gomock.Call

HandleMessage indicates an expected call of HandleMessage.

func (*MockMessageHandlerMockRecorder) ShouldProcess added in v1.5.2

func (mr *MockMessageHandlerMockRecorder) ShouldProcess(ctx, message interface{}) *gomock.Call

ShouldProcess indicates an expected call of ShouldProcess.

type MockOrderedPublisher added in v1.5.2

type MockOrderedPublisher struct {
	// contains filtered or unexported fields
}

MockOrderedPublisher is a mock of OrderedPublisher interface.

func NewMockOrderedPublisher added in v1.5.2

func NewMockOrderedPublisher(ctrl *gomock.Controller) *MockOrderedPublisher

NewMockOrderedPublisher creates a new mock instance.

func (*MockOrderedPublisher) Close added in v1.5.2

func (m *MockOrderedPublisher) Close(ctx context.Context) error

Close mocks base method.

func (*MockOrderedPublisher) EXPECT added in v1.5.2

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockOrderedPublisher) Publish added in v1.5.2

func (m *MockOrderedPublisher) Publish(ctx context.Context, request PublishRequest) error

Publish mocks base method.

func (*MockOrderedPublisher) PublishAsync added in v1.5.2

func (m *MockOrderedPublisher) PublishAsync(ctx context.Context, request PublishRequest) (PubFuture, error)

PublishAsync mocks base method.

func (*MockOrderedPublisher) Request added in v1.6.0

Request mocks base method.

func (*MockOrderedPublisher) Reset added in v1.5.2

func (m *MockOrderedPublisher) Reset(ctx context.Context)

Reset mocks base method.

type MockOrderedPublisherMockRecorder added in v1.5.2

type MockOrderedPublisherMockRecorder struct {
	// contains filtered or unexported fields
}

MockOrderedPublisherMockRecorder is the mock recorder for MockOrderedPublisher.

func (*MockOrderedPublisherMockRecorder) Close added in v1.5.2

func (mr *MockOrderedPublisherMockRecorder) Close(ctx interface{}) *gomock.Call

Close indicates an expected call of Close.

func (*MockOrderedPublisherMockRecorder) Publish added in v1.5.2

func (mr *MockOrderedPublisherMockRecorder) Publish(ctx, request interface{}) *gomock.Call

Publish indicates an expected call of Publish.

func (*MockOrderedPublisherMockRecorder) PublishAsync added in v1.5.2

func (mr *MockOrderedPublisherMockRecorder) PublishAsync(ctx, request interface{}) *gomock.Call

PublishAsync indicates an expected call of PublishAsync.

func (*MockOrderedPublisherMockRecorder) Request added in v1.6.0

func (mr *MockOrderedPublisherMockRecorder) Request(ctx, request interface{}) *gomock.Call

Request indicates an expected call of Request.

func (*MockOrderedPublisherMockRecorder) Reset added in v1.5.2

func (mr *MockOrderedPublisherMockRecorder) Reset(ctx interface{}) *gomock.Call

Reset indicates an expected call of Reset.

type MockProcessingNotifier added in v1.6.0

type MockProcessingNotifier struct {
	// contains filtered or unexported fields
}

MockProcessingNotifier is a mock of ProcessingNotifier interface.

func NewMockProcessingNotifier added in v1.6.0

func NewMockProcessingNotifier(ctrl *gomock.Controller) *MockProcessingNotifier

NewMockProcessingNotifier creates a new mock instance.

func (*MockProcessingNotifier) EXPECT added in v1.6.0

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockProcessingNotifier) OnProcessed added in v1.6.0

func (m *MockProcessingNotifier) OnProcessed(ctx context.Context, message *envelope.Message)

OnProcessed mocks base method.

type MockProcessingNotifierMockRecorder added in v1.6.0

type MockProcessingNotifierMockRecorder struct {
	// contains filtered or unexported fields
}

MockProcessingNotifierMockRecorder is the mock recorder for MockProcessingNotifier.

func (*MockProcessingNotifierMockRecorder) OnProcessed added in v1.6.0

func (mr *MockProcessingNotifierMockRecorder) OnProcessed(ctx, message interface{}) *gomock.Call

OnProcessed indicates an expected call of OnProcessed.

type MockPubFuture added in v1.5.2

type MockPubFuture struct {
	// contains filtered or unexported fields
}

MockPubFuture is a mock of PubFuture interface.

func NewMockPubFuture added in v1.5.2

func NewMockPubFuture(ctrl *gomock.Controller) *MockPubFuture

NewMockPubFuture creates a new mock instance.

func (*MockPubFuture) Done added in v1.5.2

func (m *MockPubFuture) Done() <-chan struct{}

Done mocks base method.

func (*MockPubFuture) EXPECT added in v1.5.2

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockPubFuture) Err added in v1.5.2

func (m *MockPubFuture) Err() error

Err mocks base method.

func (*MockPubFuture) Msg added in v1.5.2

func (m *MockPubFuture) Msg() *nats.Msg

Msg mocks base method.

func (*MockPubFuture) Result added in v1.5.2

func (m *MockPubFuture) Result() *Result

Result mocks base method.

func (*MockPubFuture) Wait added in v1.5.2

func (m *MockPubFuture) Wait(ctx context.Context) error

Wait mocks base method.

type MockPubFutureMockRecorder added in v1.5.2

type MockPubFutureMockRecorder struct {
	// contains filtered or unexported fields
}

MockPubFutureMockRecorder is the mock recorder for MockPubFuture.

func (*MockPubFutureMockRecorder) Done added in v1.5.2

Done indicates an expected call of Done.

func (*MockPubFutureMockRecorder) Err added in v1.5.2

Err indicates an expected call of Err.

func (*MockPubFutureMockRecorder) Msg added in v1.5.2

Msg indicates an expected call of Msg.

func (*MockPubFutureMockRecorder) Result added in v1.5.2

func (mr *MockPubFutureMockRecorder) Result() *gomock.Call

Result indicates an expected call of Result.

func (*MockPubFutureMockRecorder) Wait added in v1.5.2

func (mr *MockPubFutureMockRecorder) Wait(ctx interface{}) *gomock.Call

Wait indicates an expected call of Wait.

type MockPublisher added in v1.5.2

type MockPublisher struct {
	// contains filtered or unexported fields
}

MockPublisher is a mock of Publisher interface.

func NewMockPublisher added in v1.5.2

func NewMockPublisher(ctrl *gomock.Controller) *MockPublisher

NewMockPublisher creates a new mock instance.

func (*MockPublisher) EXPECT added in v1.5.2

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockPublisher) Publish added in v1.5.2

func (m *MockPublisher) Publish(ctx context.Context, request PublishRequest) error

Publish mocks base method.

func (*MockPublisher) Request added in v1.6.0

func (m *MockPublisher) Request(ctx context.Context, request PublishRequest) (*envelope.Message, error)

Request mocks base method.

type MockPublisherMockRecorder added in v1.5.2

type MockPublisherMockRecorder struct {
	// contains filtered or unexported fields
}

MockPublisherMockRecorder is the mock recorder for MockPublisher.

func (*MockPublisherMockRecorder) Publish added in v1.5.2

func (mr *MockPublisherMockRecorder) Publish(ctx, request interface{}) *gomock.Call

Publish indicates an expected call of Publish.

func (*MockPublisherMockRecorder) Request added in v1.6.0

func (mr *MockPublisherMockRecorder) Request(ctx, request interface{}) *gomock.Call

Request indicates an expected call of Request.

type MockRequestHandler added in v1.6.0

type MockRequestHandler struct {
	// contains filtered or unexported fields
}

MockRequestHandler is a mock of RequestHandler interface.

func NewMockRequestHandler added in v1.6.0

func NewMockRequestHandler(ctrl *gomock.Controller) *MockRequestHandler

NewMockRequestHandler creates a new mock instance.

func (*MockRequestHandler) EXPECT added in v1.6.0

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockRequestHandler) HandleRequest added in v1.6.0

func (m *MockRequestHandler) HandleRequest(ctx context.Context, message *envelope.Message) (*envelope.Message, error)

HandleRequest mocks base method.

type MockRequestHandlerMockRecorder added in v1.6.0

type MockRequestHandlerMockRecorder struct {
	// contains filtered or unexported fields
}

MockRequestHandlerMockRecorder is the mock recorder for MockRequestHandler.

func (*MockRequestHandlerMockRecorder) HandleRequest added in v1.6.0

func (mr *MockRequestHandlerMockRecorder) HandleRequest(ctx, message interface{}) *gomock.Call

HandleRequest indicates an expected call of HandleRequest.

type MockResponder added in v1.6.0

type MockResponder struct {
	// contains filtered or unexported fields
}

MockResponder is a mock of Responder interface.

func NewMockResponder added in v1.6.0

func NewMockResponder(ctrl *gomock.Controller) *MockResponder

NewMockResponder creates a new mock instance.

func (*MockResponder) Close added in v1.6.0

func (m *MockResponder) Close(ctx context.Context) error

Close mocks base method.

func (*MockResponder) EXPECT added in v1.6.0

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockResponder) Listen added in v1.6.0

func (m *MockResponder) Listen(ctx context.Context, messageType string, handler RequestHandler) error

Listen mocks base method.

type MockResponderMockRecorder added in v1.6.0

type MockResponderMockRecorder struct {
	// contains filtered or unexported fields
}

MockResponderMockRecorder is the mock recorder for MockResponder.

func (*MockResponderMockRecorder) Close added in v1.6.0

func (mr *MockResponderMockRecorder) Close(ctx interface{}) *gomock.Call

Close indicates an expected call of Close.

func (*MockResponderMockRecorder) Listen added in v1.6.0

func (mr *MockResponderMockRecorder) Listen(ctx, messageType, handler interface{}) *gomock.Call

Listen indicates an expected call of Listen.

type MockSubscriber added in v1.5.2

type MockSubscriber struct {
	// contains filtered or unexported fields
}

MockSubscriber is a mock of Subscriber interface.

func NewMockSubscriber added in v1.5.2

func NewMockSubscriber(ctrl *gomock.Controller) *MockSubscriber

NewMockSubscriber creates a new mock instance.

func (*MockSubscriber) Close added in v1.5.2

func (m *MockSubscriber) Close(ctx context.Context) error

Close mocks base method.

func (*MockSubscriber) EXPECT added in v1.5.2

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockSubscriber) Subscribe added in v1.5.2

func (m *MockSubscriber) Subscribe(ctx context.Context, subjects ...string) error

Subscribe mocks base method.

type MockSubscriberMockRecorder added in v1.5.2

type MockSubscriberMockRecorder struct {
	// contains filtered or unexported fields
}

MockSubscriberMockRecorder is the mock recorder for MockSubscriber.

func (*MockSubscriberMockRecorder) Close added in v1.5.2

func (mr *MockSubscriberMockRecorder) Close(ctx interface{}) *gomock.Call

Close indicates an expected call of Close.

func (*MockSubscriberMockRecorder) Subscribe added in v1.5.2

func (mr *MockSubscriberMockRecorder) Subscribe(ctx interface{}, subjects ...interface{}) *gomock.Call

Subscribe indicates an expected call of Subscribe.

type NoopMessageFilter

type NoopMessageFilter struct{}

NoopMessageFilter is a no-op message filter

func (NoopMessageFilter) ShouldFilter

func (n NoopMessageFilter) ShouldFilter(_ nats.Header) bool

ShouldFilter always returns false

type NoopNotifier added in v1.6.0

type NoopNotifier struct{}

NoopNotifier is a ProcessingNotifier that does nothing

func (*NoopNotifier) OnProcessed added in v1.6.0

func (n *NoopNotifier) OnProcessed(ctx context.Context, message *envelope.Message)

OnProcessed does nothing

type OrderedPublisher added in v1.5.2

type OrderedPublisher interface {
	Publisher // Embed the Publisher interface
	PublishAsync(ctx context.Context, request PublishRequest) (PubFuture, error)
	Reset(ctx context.Context)
	Close(ctx context.Context) error
}

func NewOrderedPublisher added in v1.5.2

func NewOrderedPublisher(nc *nats.Conn, config OrderedPublisherConfig) (OrderedPublisher, error)

type OrderedPublisherConfig added in v1.5.2

type OrderedPublisherConfig struct {
	// Name identifies this publisher instance
	Name string

	// MessageSerializer handles message envelope serialization
	// Optional: defaults to envelope.NewSerializer()
	MessageSerializer envelope.MessageSerializer

	// MessageRegistry for registering and deserializing message types
	MessageRegistry *envelope.Registry

	// Destination is the exact NATS subject for all messages
	Destination string

	// DestinationPrefix is used to construct the subject by appending the message type
	// e.g. if prefix is "events", a UserCreated message type will be published to "events.UserCreated"
	DestinationPrefix string

	// AckWait is how long to wait for publish acknowledgement
	// Optional: defaults to 5s
	AckWait time.Duration

	// AckMode determines how messages should be acknowledged
	// Optional: defaults to ExplicitAck for backwards compatibility
	AckMode AckMode

	// MaxPending is the maximum number of queued messages
	// Optional: defaults to 1000
	MaxPending int

	// RetryAttempts is the number of publish retry attempts
	// Optional: defaults to 3
	RetryAttempts int

	// RetryWait is how long to wait between retry attempts
	// Optional: defaults to 1s
	RetryWait time.Duration
}

OrderedPublisherConfig defines configuration for ordered publisher

func DefaultOrderedPublisherConfig added in v1.5.2

func DefaultOrderedPublisherConfig() OrderedPublisherConfig

func (*OrderedPublisherConfig) Validate added in v1.5.2

func (c *OrderedPublisherConfig) Validate() error

type ProcessingNotifier added in v1.6.0

type ProcessingNotifier interface {
	// OnProcessed is called when a message has been successfully processed
	OnProcessed(ctx context.Context, message *envelope.Message)
}

ProcessingNotifier provides callbacks for message processing events

type PubFuture added in v1.5.2

type PubFuture interface {
	// Done returns a receive only channel that can be used to wait for the future to be done.
	Done() <-chan struct{}

	// Err returns
	// If Done is not yet closed, Err returns nil.
	Err() error

	// Result returns the result of the future.
	// If Done is not yet closed, Result returns nil.
	Result() *Result

	// Msg returns the message that was sent to the server.
	Msg() *nats.Msg

	// Wait blocks until the future is done or the context is cancelled.
	Wait(ctx context.Context) error
}

type PublishRequest added in v1.5.2

type PublishRequest struct {
	// Message is the payload to be published (required)
	Message *envelope.Message
	// Subject is the exact NATS subject to publish to
	Subject string
	// SubjectPrefix is used to construct the final subject by appending additional information
	SubjectPrefix string
}

PublishRequest encapsulates the parameters needed to publish a message. Only one of Subject or SubjectPrefix should be set, not both.

func NewPublishRequest added in v1.5.2

func NewPublishRequest(message *envelope.Message) PublishRequest

NewPublishRequest creates a new PublishRequest

func (PublishRequest) WithSubject added in v1.5.2

func (r PublishRequest) WithSubject(subject string) PublishRequest

WithSubject sets the subject for the PublishRequest

func (PublishRequest) WithSubjectPrefix added in v1.5.2

func (r PublishRequest) WithSubjectPrefix(prefix string) PublishRequest

WithSubjectPrefix sets the subject prefix for the PublishRequest

type Publisher

type Publisher interface {
	// Publish sends a message without expecting a response
	Publish(ctx context.Context, request PublishRequest) error

	// Request sends a message and waits for a response
	// Returns error if no response is received within the timeout
	Request(ctx context.Context, request PublishRequest) (*envelope.Message, error)
}

Publisher interface combines publish and request operations

func NewPublisher

func NewPublisher(nc *nats.Conn, config PublisherConfig) (Publisher, error)

NewPublisher creates a new publisher that can handle both publish and request operations

type PublisherConfig added in v1.5.2

type PublisherConfig struct {
	// Name identifies this publisher instance
	Name string

	// MessageSerializer handles message envelope serialization
	// Optional: defaults to envelope.NewSerializer()
	MessageSerializer envelope.MessageSerializer

	// MessageRegistry for registering and deserializing message types
	MessageRegistry *envelope.Registry

	// Destination is the exact NATS subject for all messages
	Destination string

	// DestinationPrefix is used to construct the subject by appending the message type
	// e.g. if prefix is "events", a UserCreated message type will be published to "events.UserCreated"
	DestinationPrefix string
}

PublisherConfig defines configuration for a NATS publisher

func DefaultPublisherConfig added in v1.5.2

func DefaultPublisherConfig() PublisherConfig

func (*PublisherConfig) Validate added in v1.5.2

func (c *PublisherConfig) Validate() error

Validate checks if the publisher options are properly configured

type RequestHandler added in v1.6.0

type RequestHandler interface {
	// HandleRequest processes a request message and returns a response
	HandleRequest(ctx context.Context, message *envelope.Message) (*envelope.Message, error)
}

RequestHandler processes incoming requests and returns responses

type RequestHandlerFunc added in v1.6.0

type RequestHandlerFunc func(ctx context.Context, message *envelope.Message) (*envelope.Message, error)

RequestHandlerFunc is a function type that implements RequestHandler

func (RequestHandlerFunc) HandleRequest added in v1.6.0

func (f RequestHandlerFunc) HandleRequest(ctx context.Context, message *envelope.Message) (*envelope.Message, error)

type Responder added in v1.6.0

type Responder interface {
	// Listen starts listening for requests of the given type
	Listen(ctx context.Context, messageType string, handler RequestHandler) error
	// Close stops listening for requests
	Close(ctx context.Context) error
}

Responder handles incoming requests and sends back responses

func NewResponder added in v1.6.0

func NewResponder(nc *nats.Conn, config ResponderConfig) (Responder, error)

NewResponder creates a new responder instance

type ResponderConfig added in v1.6.0

type ResponderConfig struct {
	// Name identifies this responder instance
	Name string

	// MessageSerializer handles message envelope serialization
	// Optional: defaults to envelope.NewSerializer()
	MessageSerializer envelope.MessageSerializer

	// MessageRegistry for registering and deserializing message types
	MessageRegistry *envelope.Registry

	// Subject is the NATS subject to subscribe to
	Subject string

	// ProcessingTimeout is the maximum time allowed for processing a request
	// Optional: defaults to 5 seconds
	ProcessingTimeout time.Duration
}

ResponderConfig defines configuration for request handlers

func DefaultResponderConfig added in v1.6.0

func DefaultResponderConfig() ResponderConfig

DefaultResponderConfig returns a new ResponderConfig with default values

func (*ResponderConfig) Validate added in v1.6.0

func (c *ResponderConfig) Validate() error

Validate checks if the config is valid

type Result added in v1.5.2

type Result struct {
	Error string `json:"error,omitempty"`
	Delay int64  `json:"delay,omitempty"`
}

Result represents the result of message processing

func NewResult added in v1.5.2

func NewResult() *Result

NewResult creates a new reply

func ParseResult added in v1.5.2

func ParseResult(msg *nats.Msg) (*Result, error)

ParseResult parses result from NATS message

func (*Result) DelayDuration added in v1.5.2

func (r *Result) DelayDuration() time.Duration

DelayDuration converts delay to time.Duration

func (*Result) Err added in v1.5.2

func (r *Result) Err() error

Err converts result to error if it represents a failure

func (*Result) WithDelay added in v1.5.2

func (r *Result) WithDelay(delay time.Duration) *Result

WithDelay sets the delay

func (*Result) WithError added in v1.5.2

func (r *Result) WithError(err error) *Result

WithError sets the error message

type Subscriber

type Subscriber interface {
	Subscribe(ctx context.Context, subjects ...string) error
	Close(ctx context.Context) error
}

Subscriber subscribes to messages from a NATS server

func NewSubscriber

func NewSubscriber(nc *nats.Conn, config SubscriberConfig) (Subscriber, error)

NewSubscriber creates a new subscriber with the given options

type SubscriberConfig added in v1.5.2

type SubscriberConfig struct {
	// Name identifies this subscriber instance
	Name string

	// MessageSerializer handles message envelope serialization
	// Optional: defaults to envelope.NewSerializer()
	MessageSerializer envelope.MessageSerializer

	// MessageRegistry for registering and deserializing message types
	MessageRegistry *envelope.Registry

	// MessageHandler processes received messages
	MessageHandler MessageHandler

	// MessageFilter determines which messages should be processed
	// Optional: defaults to NoopMessageFilter which processes all messages
	MessageFilter MessageFilter

	// ProcessingNotifier is notified when messages are successfully processed
	// Optional: defaults to NoopNotifier
	ProcessingNotifier ProcessingNotifier

	// ProcessingTimeout is the maximum time allowed for processing a message
	// Optional: defaults to 5 seconds
	ProcessingTimeout time.Duration

	// Backoff strategy for failed message processing
	// Optional: defaults to exponential backoff
	Backoff backoff.Backoff
}

SubscriberConfig defines configuration for a NATS subscriber

func DefaultSubscriberConfig added in v1.5.2

func DefaultSubscriberConfig() SubscriberConfig

func (*SubscriberConfig) Validate added in v1.5.2

func (c *SubscriberConfig) Validate() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL