ncl

package
v1.5.0-dev14
Published: Oct 2, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

README

Bacalhau NCL (NATS Client Library)

Overview

The NCL (NATS Client Library) is an internal library for Bacalhau, designed to provide reliable, scalable, and efficient communication between orchestrator and compute nodes. It leverages NATS for messaging and implements an event-driven architecture with support for asynchronous communication, granular event logging, and robust state management.

Key Components

  1. EnvelopedRawMessageSerDe: Handles serialization and deserialization of RawMessages with versioning and CRC checks.
  2. MessageSerDeRegistry: Manages serialization and deserialization of different message types.
  3. Publisher: Handles asynchronous message publishing.
  4. Subscriber: Manages message consumption and processing.
  5. MessageHandler: Interface for processing received messages.
  6. MessageFilter: Interface for filtering incoming messages.
  7. Checkpointer: Interface for managing checkpoints in message processing.

Technical Details

Message Flow

  1. Publishing:

    • The publisher accepts a Message struct through its Publish method.
    • The MessageSerDeRegistry serializes the Message into a RawMessage using the appropriate MessageSerDe for the message type.
    • The EnvelopedRawMessageSerDe serializes the RawMessage into a byte slice with an envelope containing a version byte and a CRC checksum.
    • The serialized message is published to NATS using the configured subject.
  2. Subscribing:

    • The subscriber sets up a NATS subscription for specified subjects.
    • When a message is received, it's passed to the processMessage method.
    • The EnvelopedRawMessageSerDe deserializes the raw bytes into a RawMessage. The envelope version helps determine the deserialization method, and the CRC checksum is used to verify the message integrity.
    • The message filter is applied to determine if the message should be processed.
    • The MessageSerDeRegistry deserializes the RawMessage into a Message using the appropriate MessageSerDe for the message type.
    • The deserialized Message is passed to each configured MessageHandler.

Serialization/Deserialization (SerDe) Flow
  1. Message to bytes (for sending): Message -> MessageSerDe.Serialize() -> RawMessage -> EnvelopedRawMessageSerDe.Serialize() -> []byte

  2. Bytes to Message (when receiving): []byte -> EnvelopedRawMessageSerDe.Deserialize() -> RawMessage -> MessageSerDe.Deserialize() -> Message

EnvelopedRawMessageSerDe

The EnvelopedRawMessageSerDe adds a version byte and a CRC checksum to each serialized RawMessage. The envelope structure is as follows:

+----------------+----------------+--------------------+
| Version (1 byte)| CRC (4 bytes) | Serialized Message |
+----------------+----------------+--------------------+

This allows for future extensibility, backward compatibility, and data integrity verification.
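As an illustration of the envelope layout, here is a minimal sketch using only the Go standard library. It is not the package's implementation: the helper names `wrap` and `unwrap` are hypothetical, and the CRC-32 IEEE polynomial and big-endian encoding of the CRC field are assumptions, since the README does not state either.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
)

const (
	versionSize = 1
	crcSize     = 4
	headerSize  = versionSize + crcSize
)

// wrap prepends a version byte and a CRC-32 of the payload.
// Assumption: IEEE polynomial, CRC stored big-endian.
func wrap(version byte, payload []byte) []byte {
	out := make([]byte, headerSize+len(payload))
	out[0] = version
	binary.BigEndian.PutUint32(out[versionSize:headerSize], crc32.ChecksumIEEE(payload))
	copy(out[headerSize:], payload)
	return out
}

// unwrap validates the header and checksum, then returns the version and payload.
func unwrap(data []byte) (byte, []byte, error) {
	if len(data) < headerSize {
		return 0, nil, errors.New("too short")
	}
	want := binary.BigEndian.Uint32(data[versionSize:headerSize])
	payload := data[headerSize:]
	if crc32.ChecksumIEEE(payload) != want {
		return 0, nil, errors.New("CRC check failed")
	}
	return data[0], payload, nil
}

func main() {
	framed := wrap(1, []byte(`{"hello":"world"}`))
	version, payload, err := unwrap(framed)
	fmt.Println(version, string(payload), err)

	framed[len(framed)-1] ^= 0xFF // corrupt the last payload byte
	_, _, err = unwrap(framed)
	fmt.Println(err) // the checksum no longer matches
}
```

Because the version byte sits outside the checksummed region in this sketch, a reader can pick a version-specific decoder before touching the payload.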

MessageSerDeRegistry

The MessageSerDeRegistry manages the serialization and deserialization of different message types. It allows registering custom message types with unique names and provides methods for serializing and deserializing messages.

Key methods:

  • Register(name string, payload any) error
  • Serialize(message *Message) (*RawMessage, error)
  • Deserialize(rawMessage *RawMessage) (*Message, error)

Publisher

The publisher struct handles message publishing. It supports asynchronous publishing and can be configured with options like message serializer, MessageSerDeRegistry, and destination subject or prefix.

Key method:

  • Publish(ctx context.Context, message *Message) error

Subscriber

The subscriber struct manages message consumption. It sets up NATS subscriptions, processes incoming messages, and routes them to the appropriate message handlers.

Key methods:

  • Subscribe(subjects ...string) error
  • Close(ctx context.Context) error

Usage Within Bacalhau

This library is designed to be used internally within the Bacalhau project. It should be integrated into the orchestrator and compute node components to handle all inter-node communication.

Example integration points:

  1. Job assignment from orchestrator to compute nodes
  2. Status updates from compute nodes to orchestrator
  3. Heartbeat messages for health monitoring

Documentation

Constants

const (
	KeyMessageID       = "MessageID"
	KeyMessageType     = "Type"
	KeySource          = "Source"
	KeyEventTime       = "EventTime"
	KeyPayloadEncoding = "PayloadEncoding"
)

Metadata keys

const (
	// JSONPayloadSerDeType is the identifier for JSON serialization/deserialization
	JSONPayloadSerDeType = "json"
	// DefaultPayloadEncoding is the default encoding used if none is specified
	DefaultPayloadEncoding = JSONPayloadSerDeType
)
const (
	ErrNilMessage = "message is nil"
	ErrEmptyData  = "data is empty"
)
const (
	ErrMsgTooShort  = "too short"
	ErrMsgCRCFailed = "CRC check failed"
)

Error message constants

const (
	VersionSize = 1
	CRCSize     = 4
	HeaderSize  = VersionSize + CRCSize
)

Functions

func NewErrDeserializationFailed

func NewErrDeserializationFailed(encoding string, err error) error

NewErrDeserializationFailed creates a new ErrDeserializationFailed error.

func NewErrSerializationFailed

func NewErrSerializationFailed(encoding string, err error) error

NewErrSerializationFailed creates a new ErrSerializationFailed error.

func NewErrUnexpectedPayloadType

func NewErrUnexpectedPayloadType(expected, actual string) error

NewErrUnexpectedPayloadType creates a new ErrUnexpectedPayloadType error.

Types

type Checkpointer

type Checkpointer interface {
	Checkpoint(message *Message) error
	GetLastCheckpoint() (int64, error)
}

Checkpointer interface for managing checkpoints

type EnvelopedRawMessageSerDe

type EnvelopedRawMessageSerDe struct {
	// contains filtered or unexported fields
}

EnvelopedRawMessageSerDe handles the serialization and deserialization of messages with version information and CRC checks. It wraps the actual message serialization with additional metadata for versioning and integrity checking.

Envelope Structure:

+----------------+----------------+--------------------+
| Version (1 byte)| CRC (4 bytes) | Serialized Message |
+----------------+----------------+--------------------+

  • Version: Indicates the schema version used for serialization (1 byte)
  • CRC: A 32-bit CRC checksum of the serialized message (4 bytes)
  • Serialized Message: The actual message content, serialized by a version-specific serializer

The EnvelopedRawMessageSerDe adds a version byte and a CRC checksum to each serialized message, allowing for future extensibility, backward compatibility, and data integrity verification.

func NewEnvelopedRawMessageSerDe

func NewEnvelopedRawMessageSerDe() *EnvelopedRawMessageSerDe

NewEnvelopedRawMessageSerDe creates a new EnvelopedRawMessageSerDe with default serializers

func (*EnvelopedRawMessageSerDe) Deserialize

func (v *EnvelopedRawMessageSerDe) Deserialize(data []byte) (*RawMessage, error)

Deserialize decodes a byte slice into a RawMessage. It verifies the schema version and CRC checksum before using the appropriate deserializer to decode the message.

func (*EnvelopedRawMessageSerDe) Serialize

func (v *EnvelopedRawMessageSerDe) Serialize(msg *RawMessage) ([]byte, error)

Serialize encodes a RawMessage into a byte slice, adding version information and a CRC checksum. It uses the serializer corresponding to the current serializationVersion.

func (*EnvelopedRawMessageSerDe) WithSerializationVersion

func (v *EnvelopedRawMessageSerDe) WithSerializationVersion(version SchemaVersion) *EnvelopedRawMessageSerDe

WithSerializationVersion sets the schema version used for serialization. This version will be used for all subsequent Serialize calls. It does not affect the deserialization of messages.

type ErrBadMessage

type ErrBadMessage struct {
	Reason string
}

ErrBadMessage is returned when a message is malformed or invalid.

func NewErrBadMessage

func NewErrBadMessage(reason string) *ErrBadMessage

NewErrBadMessage creates a new ErrBadMessage error.

func (*ErrBadMessage) Error

func (e *ErrBadMessage) Error() string

Error implements the error interface for ErrBadMessage.

type ErrBadPayload

type ErrBadPayload struct {
	Reason string
}

ErrBadPayload is returned when a payload is malformed or invalid.

func NewErrBadPayload

func NewErrBadPayload(reason string) *ErrBadPayload

NewErrBadPayload creates a new ErrBadPayload error.

func (*ErrBadPayload) Error

func (e *ErrBadPayload) Error() string

Error implements the error interface for ErrBadPayload.

type ErrDeserializationFailed

type ErrDeserializationFailed struct {
	Encoding string
	Err      error
}

ErrDeserializationFailed is returned when payload deserialization fails.

func (*ErrDeserializationFailed) Error

func (e *ErrDeserializationFailed) Error() string

Error implements the error interface for ErrDeserializationFailed.

func (*ErrDeserializationFailed) Unwrap

func (e *ErrDeserializationFailed) Unwrap() error

Unwrap returns the underlying error for ErrDeserializationFailed.

type ErrSerializationFailed

type ErrSerializationFailed struct {
	Encoding string
	Err      error
}

ErrSerializationFailed is returned when serialization fails.

func (*ErrSerializationFailed) Error

func (e *ErrSerializationFailed) Error() string

Error implements the error interface for ErrSerializationFailed.

func (*ErrSerializationFailed) Unwrap

func (e *ErrSerializationFailed) Unwrap() error

Unwrap returns the underlying error for ErrSerializationFailed.

type ErrUnexpectedPayloadType

type ErrUnexpectedPayloadType struct {
	Expected string
	Actual   string
}

ErrUnexpectedPayloadType is returned when the payload type is unexpected.

func (*ErrUnexpectedPayloadType) Error

func (e *ErrUnexpectedPayloadType) Error() string

Error implements the error interface for ErrUnexpectedPayloadType.

type ErrUnsupportedEncoding

type ErrUnsupportedEncoding struct {
	Encoding string
}

ErrUnsupportedEncoding is returned when an unsupported encoding is encountered.

func NewErrUnsupportedEncoding

func NewErrUnsupportedEncoding(encoding string) *ErrUnsupportedEncoding

NewErrUnsupportedEncoding creates a new ErrUnsupportedEncoding error.

func (*ErrUnsupportedEncoding) Error

func (e *ErrUnsupportedEncoding) Error() string

Error implements the error interface for ErrUnsupportedEncoding.

type ErrUnsupportedMessageType

type ErrUnsupportedMessageType struct {
	Type string
}

ErrUnsupportedMessageType is returned when an unsupported message type is encountered.

func NewErrUnsupportedMessageType

func NewErrUnsupportedMessageType(messageType string) *ErrUnsupportedMessageType

NewErrUnsupportedMessageType creates a new ErrUnsupportedMessageType error.

func (*ErrUnsupportedMessageType) Error

func (e *ErrUnsupportedMessageType) Error() string

Error implements the error interface for ErrUnsupportedMessageType.

type JSONMessageSerDe

type JSONMessageSerDe struct{}

JSONMessageSerDe is a serializer/deserializer for JSON payloads

func (*JSONMessageSerDe) Deserialize

func (j *JSONMessageSerDe) Deserialize(rMsg *RawMessage, payloadType reflect.Type) (*Message, error)

Deserialize deserializes a JSON payload

func (*JSONMessageSerDe) Serialize

func (j *JSONMessageSerDe) Serialize(message *Message) (*RawMessage, error)

Serialize serializes a payload to JSON

type JSONRawMessageSerializer

type JSONRawMessageSerializer struct{}

JSONRawMessageSerializer handles serialization and deserialization using JSON

func (*JSONRawMessageSerializer) Deserialize

func (j *JSONRawMessageSerializer) Deserialize(data []byte) (*RawMessage, error)

Deserialize decodes a JSON message into a RawMessage

func (*JSONRawMessageSerializer) Serialize

func (j *JSONRawMessageSerializer) Serialize(msg *RawMessage) ([]byte, error)

Serialize encodes a RawMessage into a JSON message

type Message

type Message struct {
	Metadata *Metadata
	Payload  any
}

Message represents a fully deserialized message ready for processing by the application. It contains metadata and a deserialized payload of any type. Message is used by message handlers, filters, and subscribers.

func NewMessage

func NewMessage(payload any) *Message

NewMessage creates a new Message with the given payload

func (*Message) GetPayload

func (m *Message) GetPayload(t interface{}) (interface{}, bool)

GetPayload retrieves the Payload as type T (the type of the argument t), handling both value and pointer types

func (*Message) IsType

func (m *Message) IsType(t interface{}) bool

IsType checks if the Message's Payload is of a specific type T (the type of the argument t)

func (*Message) WithMetadata

func (m *Message) WithMetadata(metadata *Metadata) *Message

WithMetadata sets the metadata for the message

func (*Message) WithMetadataValue

func (m *Message) WithMetadataValue(key, value string) *Message

WithMetadataValue sets a key-value pair in the metadata

type MessageFilter

type MessageFilter interface {
	ShouldFilter(metadata *Metadata) bool
}

MessageFilter interface for filtering messages

type MessageFilterFunc

type MessageFilterFunc func(metadata *Metadata) bool

MessageFilterFunc is a function type that implements MessageFilter

func (MessageFilterFunc) ShouldFilter

func (f MessageFilterFunc) ShouldFilter(metadata *Metadata) bool
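The function-type adapter lets a plain closure satisfy MessageFilter. The sketch below uses local copies of the documented types. The body of ShouldFilter (simply calling f) and the `skipType` helper are assumptions for illustration. Since NoopMessageFilter is documented as always returning false, a return value of true is taken here to mean "drop this message".

```go
package main

import "fmt"

type Metadata map[string]string

type MessageFilter interface {
	ShouldFilter(metadata *Metadata) bool
}

type MessageFilterFunc func(metadata *Metadata) bool

// Assumed implementation: delegate to the wrapped function.
func (f MessageFilterFunc) ShouldFilter(metadata *Metadata) bool { return f(metadata) }

// skipType (hypothetical) builds a filter that drops one message type.
// "Type" is the value of the documented KeyMessageType constant.
func skipType(t string) MessageFilter {
	return MessageFilterFunc(func(md *Metadata) bool {
		return md != nil && (*md)["Type"] == t
	})
}

func main() {
	f := skipType("Heartbeat")
	hb := Metadata{"Type": "Heartbeat"}
	job := Metadata{"Type": "JobAssigned"}
	fmt.Println(f.ShouldFilter(&hb), f.ShouldFilter(&job)) // true false
}
```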

type MessageHandler

type MessageHandler interface {
	ShouldProcess(ctx context.Context, message *Message) bool
	HandleMessage(ctx context.Context, message *Message) error
}

MessageHandler interface for processing messages

type MessageHandlerFunc

type MessageHandlerFunc func(ctx context.Context, message *Message) error

MessageHandlerFunc is a function type that implements MessageHandler

func (MessageHandlerFunc) HandleMessage

func (f MessageHandlerFunc) HandleMessage(ctx context.Context, message *Message) error

func (MessageHandlerFunc) ShouldProcess

func (f MessageHandlerFunc) ShouldProcess(ctx context.Context, message *Message) bool

type MessageSerDe

type MessageSerDe interface {
	Serialize(message *Message) (*RawMessage, error)
	Deserialize(rawMessage *RawMessage, payloadType reflect.Type) (*Message, error)
}

MessageSerDe interface for serializing and deserializing messages to and from raw messages.

type MessageSerDeRegistry

type MessageSerDeRegistry struct {
	// contains filtered or unexported fields
}

MessageSerDeRegistry manages the serialization and deserialization of the Payload field in NATS Message structs. It simplifies payload handling in your NCL library by providing:

  1. Type Registration: Allows registering custom payload types with unique names.
  2. Serialization Management: Handles serialization and deserialization of payloads using different encoding methods.
  3. Type Resolution: Provides a mechanism to resolve between type names and their corresponding Go types.

The MessageSerDeRegistry adds value to your NATS-based communication library by:

  • Automatic Payload Handling: Users can set any registered Go struct as the Message.Payload without worrying about serialization. The manager handles this based on pre-configuration.

  • Type Safety: By registering payload types, the system ensures that only known, expected types are used as payloads, reducing runtime errors and improving system reliability.

  • Flexibility: Supports multiple serialization formats for payloads, allowing different message types to use the most appropriate format for their needs.

  • Centralized Payload Type Management: Provides a single point of configuration for all payload types used in the system, simplifying maintenance and reducing code duplication.

This abstraction significantly reduces the complexity of working with payload data in NATS messages, allowing developers to focus on business logic rather than payload encoding details.
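The two-way mapping between type names and Go types can be sketched with reflect and encoding/json. This is a simplified stand-in, not the registry's code: the `registry` type, its `register`, `encode`, and `decode` methods, and the `JobAssigned` payload are all hypothetical, and only JSON is supported.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// registry keeps name-to-type and type-to-name mappings.
type registry struct {
	nameToType map[string]reflect.Type
	typeToName map[reflect.Type]string
}

func newRegistry() *registry {
	return &registry{
		nameToType: map[string]reflect.Type{},
		typeToName: map[reflect.Type]string{},
	}
}

// baseType strips one level of pointer so T and *T resolve to the same entry.
func baseType(v any) reflect.Type {
	t := reflect.TypeOf(v)
	if t != nil && t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return t
}

func (r *registry) register(name string, payload any) error {
	t := baseType(payload)
	if t == nil {
		return fmt.Errorf("cannot register nil payload for %q", name)
	}
	if _, ok := r.nameToType[name]; ok {
		return fmt.Errorf("name %q already registered", name)
	}
	r.nameToType[name] = t
	r.typeToName[t] = name
	return nil
}

// encode returns the registered type name and the JSON-encoded payload.
func (r *registry) encode(payload any) (string, []byte, error) {
	t := baseType(payload)
	name, ok := r.typeToName[t]
	if !ok {
		return "", nil, fmt.Errorf("unregistered payload type %v", t)
	}
	data, err := json.Marshal(payload)
	return name, data, err
}

// decode allocates the registered type and fills it from JSON.
func (r *registry) decode(name string, data []byte) (any, error) {
	t, ok := r.nameToType[name]
	if !ok {
		return nil, fmt.Errorf("unknown message type %q", name)
	}
	ptr := reflect.New(t)
	if err := json.Unmarshal(data, ptr.Interface()); err != nil {
		return nil, err
	}
	return ptr.Interface(), nil
}

// JobAssigned is a hypothetical payload type.
type JobAssigned struct{ JobID string }

func main() {
	r := newRegistry()
	if err := r.register("JobAssigned", JobAssigned{}); err != nil {
		panic(err)
	}
	name, data, _ := r.encode(JobAssigned{JobID: "j-1"})
	out, _ := r.decode(name, data)
	fmt.Println(name, out.(*JobAssigned).JobID) // JobAssigned j-1
}
```

Carrying the type name alongside the bytes (the real library uses the "Type" metadata key for this) is what lets the receiver pick the concrete Go type without inspecting the payload.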

func NewMessageSerDeRegistry

func NewMessageSerDeRegistry() *MessageSerDeRegistry

NewMessageSerDeRegistry creates and initializes a new MessageSerDeRegistry. It sets up the internal maps and registers the default JSON serializer.

func (*MessageSerDeRegistry) Deserialize

func (r *MessageSerDeRegistry) Deserialize(rawMessage *RawMessage) (*Message, error)

Deserialize deserializes a raw message using the specified deserializer. It retrieves the correct deserializer, gets the payload type, and performs the deserialization. Usage:

message, err := manager.Deserialize(rawMessage)

func (*MessageSerDeRegistry) Register

func (r *MessageSerDeRegistry) Register(name string, payload any) error

Register adds a new payload type to the manager. It registers both the name-to-type and type-to-name mappings. Usage:

manager.Register("MyCustomType", MyCustomType{})

func (*MessageSerDeRegistry) Serialize

func (r *MessageSerDeRegistry) Serialize(message *Message) (*RawMessage, error)

Serialize serializes a message using the specified serializer. It handles default encoding, retrieves the correct serializer, and performs the serialization. Usage:

rawMessage, err := manager.Serialize(message)

type Metadata

type Metadata map[string]string

Metadata contains metadata about the message

func NewMetadataFromMap

func NewMetadataFromMap(m map[string]string) *Metadata

NewMetadataFromMap creates a new Metadata object as a shallow copy of a map. Changes to the map will be reflected in the Metadata object, but this is more efficient than NewMetadataFromMapCopy.

func NewMetadataFromMapCopy

func NewMetadataFromMapCopy(m map[string]string) *Metadata

NewMetadataFromMapCopy creates a new Metadata object as a deep copy of a map. Changes to the map will not be reflected in the Metadata object.
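The difference between the two constructors comes down to whether the underlying map storage is shared. A minimal sketch with a local Metadata type follows; `fromMap` and `fromMapCopy` are hypothetical stand-ins written to match the documented behavior, not the package's code.

```go
package main

import "fmt"

type Metadata map[string]string

// fromMap converts without copying, so both values share one map.
func fromMap(m map[string]string) *Metadata {
	md := Metadata(m)
	return &md
}

// fromMapCopy duplicates every entry into a fresh map.
func fromMapCopy(m map[string]string) *Metadata {
	md := make(Metadata, len(m))
	for k, v := range m {
		md[k] = v
	}
	return &md
}

func main() {
	src := map[string]string{"Source": "node-a"}
	shared := fromMap(src)
	copied := fromMapCopy(src)

	src["Source"] = "node-b" // mutate the original map afterwards
	fmt.Println((*shared)["Source"], (*copied)["Source"]) // node-b node-a
}
```

The shared form avoids an allocation per message, at the cost that the caller must not keep mutating the source map.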

func (Metadata) Get

func (m Metadata) Get(key string) string

Get returns the value for a given key, or an empty string if the key doesn't exist

func (Metadata) GetInt

func (m Metadata) GetInt(key string) int

GetInt gets the value as an int, returning 0 if the key doesn't exist or the value isn't a valid int

func (Metadata) GetInt64

func (m Metadata) GetInt64(key string) int64

GetInt64 gets the value as an int64, returning 0 if the key doesn't exist or the value isn't a valid int64

func (Metadata) GetTime

func (m Metadata) GetTime(key string) time.Time

GetTime gets the value as a time.Time, returning the zero time if the key doesn't exist or the value isn't a valid time

func (Metadata) Has

func (m Metadata) Has(key string) bool

Has checks if a key exists in the metadata
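Since the typed getters return a zero value both for a missing key and for an unparsable value, Has is the way to tell those cases apart from a stored zero. The sketch below uses a local Metadata type. Storing integers as base-10 strings is an assumption, and the "Attempt" and "Retries" keys are made up for the example.

```go
package main

import (
	"fmt"
	"strconv"
)

type Metadata map[string]string

func (m Metadata) Has(key string) bool {
	_, ok := m[key]
	return ok
}

// SetInt64 stores the value as a base-10 string (assumed encoding).
func (m Metadata) SetInt64(key string, value int64) {
	m[key] = strconv.FormatInt(value, 10)
}

// GetInt64 returns 0 when the key is missing or not a valid int64.
func (m Metadata) GetInt64(key string) int64 {
	v, err := strconv.ParseInt(m[key], 10, 64)
	if err != nil {
		return 0
	}
	return v
}

func main() {
	md := Metadata{}
	md.SetInt64("Attempt", 0)
	md["Retries"] = "many" // not a number

	fmt.Println(md.GetInt64("Attempt"), md.Has("Attempt")) // 0 true
	fmt.Println(md.GetInt64("Retries"), md.Has("Retries")) // 0 true
	fmt.Println(md.GetInt64("Missing"), md.Has("Missing")) // 0 false
}
```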

func (Metadata) Set

func (m Metadata) Set(key, value string)

Set sets the value for a given key

func (Metadata) SetInt

func (m Metadata) SetInt(key string, value int)

SetInt sets the value for a given key as an int

func (Metadata) SetInt64

func (m Metadata) SetInt64(key string, value int64)

SetInt64 sets the value for a given key as an int64

func (Metadata) SetTime

func (m Metadata) SetTime(key string, value time.Time)

SetTime sets the value for a given key as a time.Time

func (Metadata) ToMap

func (m Metadata) ToMap() map[string]string

ToMap returns the Metadata as a regular map[string]string

type NoopCheckpointer

type NoopCheckpointer struct{}

NoopCheckpointer is a Checkpointer that does nothing

func (*NoopCheckpointer) Checkpoint

func (n *NoopCheckpointer) Checkpoint(_ *Message) error

Checkpoint does nothing

func (*NoopCheckpointer) GetLastCheckpoint

func (n *NoopCheckpointer) GetLastCheckpoint() (int64, error)

GetLastCheckpoint returns 0

type NoopMessageFilter

type NoopMessageFilter struct{}

NoopMessageFilter is a no-op message filter

func (NoopMessageFilter) ShouldFilter

func (n NoopMessageFilter) ShouldFilter(_ *Metadata) bool

ShouldFilter always returns false

type ProtoSerializer

type ProtoSerializer struct{}

ProtoSerializer handles serialization and deserialization using Protocol Buffers v1

func (*ProtoSerializer) Deserialize

func (p *ProtoSerializer) Deserialize(data []byte) (*RawMessage, error)

Deserialize decodes a Protocol Buffers message into a RawMessage

func (*ProtoSerializer) Serialize

func (p *ProtoSerializer) Serialize(msg *RawMessage) ([]byte, error)

Serialize encodes a RawMessage into a Protocol Buffers message

type Publisher

type Publisher interface {
	Publish(ctx context.Context, message *Message) error
}

Publisher publishes messages to a NATS server

func NewPublisher

func NewPublisher(nc *nats.Conn, opts ...PublisherOption) (Publisher, error)

NewPublisher creates a new publisher with the given options

type PublisherOption

type PublisherOption func(*publisher)

PublisherOption is a function type for configuring a publisher

func WithPublisherDestination

func WithPublisherDestination(destination string) PublisherOption

WithPublisherDestination sets the destination for the publisher. The destination is used as the subject for the message to be published. Caution: cannot be used with WithPublisherDestinationPrefix.

func WithPublisherDestinationPrefix

func WithPublisherDestinationPrefix(prefix string) PublisherOption

WithPublisherDestinationPrefix sets the destination prefix for the publisher. The destination prefix is used to construct the subject for the message to be published. The subject is constructed as follows: destinationPrefix + "." + messageType. Caution: cannot be used with WithPublisherDestination.
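The subject selection rule for the two destination options can be expressed as a small function. `subjectFor` is a hypothetical helper, and returning an error when neither option is set is an assumption. The source only states that the two options are mutually exclusive and how the prefixed subject is built.

```go
package main

import (
	"errors"
	"fmt"
)

// subjectFor picks the NATS subject from either a fixed destination
// or a prefix combined with the message type.
func subjectFor(destination, prefix, messageType string) (string, error) {
	switch {
	case destination != "" && prefix != "":
		return "", errors.New("destination and destination prefix are mutually exclusive")
	case destination != "":
		return destination, nil
	case prefix != "":
		return prefix + "." + messageType, nil
	default:
		return "", errors.New("no destination configured") // assumed behavior
	}
}

func main() {
	s, err := subjectFor("", "bacalhau.compute", "JobAssigned")
	fmt.Println(s, err) // bacalhau.compute.JobAssigned <nil>

	_, err = subjectFor("fixed.subject", "bacalhau.compute", "JobAssigned")
	fmt.Println(err)
}
```

With the prefix form, a subscriber can use a NATS wildcard such as `bacalhau.compute.*` to receive every message type under that prefix.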

func WithPublisherMessageSerDeRegistry

func WithPublisherMessageSerDeRegistry(registry *MessageSerDeRegistry) PublisherOption

WithPublisherMessageSerDeRegistry sets the payload registry for the publisher

func WithPublisherMessageSerializer

func WithPublisherMessageSerializer(serializer RawMessageSerDe) PublisherOption

WithPublisherMessageSerializer sets the message serializer for the publisher

func WithPublisherName

func WithPublisherName(name string) PublisherOption

WithPublisherName sets the name for the publisher

type RawMessage

type RawMessage struct {
	Metadata *Metadata
	Payload  []byte
}

RawMessage represents a message after envelope handling but before payload deserialization. It contains metadata and a byte slice payload, which is the serialized form of the actual message content. RawMessage is the interface between RawMessageSerDe and MessageSerDe.

type RawMessageSerDe

type RawMessageSerDe interface {
	Serialize(*RawMessage) ([]byte, error)
	Deserialize([]byte) (*RawMessage, error)
}

RawMessageSerDe interface for serializing and deserializing raw messages to and from byte slices.

type SchemaVersion

type SchemaVersion byte

SchemaVersion represents the version of the serialization schema

const (
	SchemaVersionJSONV1     SchemaVersion = 1
	SchemaVersionProtobufV1 SchemaVersion = 2
	DefaultSchemaVersion                  = SchemaVersionJSONV1
)

Version and size constants

func (SchemaVersion) String

func (v SchemaVersion) String() string

String returns a string representation of the schema version

type Subscriber

type Subscriber interface {
	Subscribe(subjects ...string) error
	Close(ctx context.Context) error
}

Subscriber subscribes to messages from a NATS server

func NewSubscriber

func NewSubscriber(nc *nats.Conn, opts ...SubscriberOption) (Subscriber, error)

NewSubscriber creates a new subscriber with the given options

type SubscriberOption

type SubscriberOption func(*subscriber)

SubscriberOption is a function type for configuring a subscriber

func WithSubscriberCheckpointer

func WithSubscriberCheckpointer(checkpointer Checkpointer) SubscriberOption

WithSubscriberCheckpointer sets the checkpointer for the subscriber

func WithSubscriberMessageDeserializer

func WithSubscriberMessageDeserializer(deserializer RawMessageSerDe) SubscriberOption

WithSubscriberMessageDeserializer sets the message deserializer for the subscriber

func WithSubscriberMessageFilter

func WithSubscriberMessageFilter(filter MessageFilter) SubscriberOption

WithSubscriberMessageFilter sets the message filter for the subscriber

func WithSubscriberMessageHandlers

func WithSubscriberMessageHandlers(handlers ...MessageHandler) SubscriberOption

WithSubscriberMessageHandlers sets the message handlers for the subscriber

func WithSubscriberMessageSerDeRegistry

func WithSubscriberMessageSerDeRegistry(registry *MessageSerDeRegistry) SubscriberOption

WithSubscriberMessageSerDeRegistry sets the payload registry for the subscriber

Directories

Path Synopsis
proto
v1
