Documentation ¶
Index ¶
- Constants
- func NewErrDeserializationFailed(encoding string, err error) error
- func NewErrSerializationFailed(encoding string, err error) error
- func NewErrUnexpectedPayloadType(expected, actual string) error
- type Checkpointer
- type EnvelopedRawMessageSerDe
- type ErrBadMessage
- type ErrBadPayload
- type ErrDeserializationFailed
- type ErrSerializationFailed
- type ErrUnexpectedPayloadType
- type ErrUnsupportedEncoding
- type ErrUnsupportedMessageType
- type JSONMessageSerDe
- type JSONRawMessageSerializer
- type Message
- type MessageFilter
- type MessageFilterFunc
- type MessageHandler
- type MessageHandlerFunc
- type MessageSerDe
- type MessageSerDeRegistry
- type Metadata
- func (m Metadata) Get(key string) string
- func (m Metadata) GetInt(key string) int
- func (m Metadata) GetInt64(key string) int64
- func (m Metadata) GetTime(key string) time.Time
- func (m Metadata) Has(key string) bool
- func (m Metadata) Set(key, value string)
- func (m Metadata) SetInt(key string, value int)
- func (m Metadata) SetInt64(key string, value int64)
- func (m Metadata) SetTime(key string, value time.Time)
- func (m Metadata) ToMap() map[string]string
- type NoopCheckpointer
- type NoopMessageFilter
- type ProtoSerializer
- type Publisher
- type PublisherOption
- func WithPublisherDestination(destination string) PublisherOption
- func WithPublisherDestinationPrefix(prefix string) PublisherOption
- func WithPublisherMessageSerDeRegistry(registry *MessageSerDeRegistry) PublisherOption
- func WithPublisherMessageSerializer(serializer RawMessageSerDe) PublisherOption
- func WithPublisherName(name string) PublisherOption
- type RawMessage
- type RawMessageSerDe
- type SchemaVersion
- type Subscriber
- type SubscriberOption
- func WithSubscriberCheckpointer(checkpointer Checkpointer) SubscriberOption
- func WithSubscriberMessageDeserializer(deserializer RawMessageSerDe) SubscriberOption
- func WithSubscriberMessageFilter(filter MessageFilter) SubscriberOption
- func WithSubscriberMessageHandlers(handlers ...MessageHandler) SubscriberOption
- func WithSubscriberMessageSerDeRegistry(registry *MessageSerDeRegistry) SubscriberOption
Constants ¶
const (
	KeyMessageID       = "MessageID"
	KeyMessageType     = "Type"
	KeySource          = "Source"
	KeyEventTime       = "EventTime"
	KeyPayloadEncoding = "PayloadEncoding"
)
Metadata keys
const (
	// JSONPayloadSerDeType is the identifier for JSON serialization/deserialization
	JSONPayloadSerDeType = "json"
	// DefaultPayloadEncoding is the default encoding used if none is specified
	DefaultPayloadEncoding = JSONPayloadSerDeType
)
const (
	ErrNilMessage = "message is nil"
	ErrEmptyData  = "data is empty"
)
const (
	ErrMsgTooShort  = "too short"
	ErrMsgCRCFailed = "CRC check failed"
)
Error message constants
const (
	VersionSize = 1
	CRCSize     = 4
	HeaderSize  = VersionSize + CRCSize
)
Variables ¶
This section is empty.
Functions ¶
func NewErrDeserializationFailed ¶
NewErrDeserializationFailed creates a new ErrDeserializationFailed error.
func NewErrSerializationFailed ¶
NewErrSerializationFailed creates a new ErrSerializationFailed error.
func NewErrUnexpectedPayloadType ¶
NewErrUnexpectedPayloadType creates a new ErrUnexpectedPayloadType error.
Types ¶
type Checkpointer ¶
type Checkpointer interface {
	Checkpoint(message *Message) error
	GetLastCheckpoint() (int64, error)
}
Checkpointer interface for managing checkpoints
type EnvelopedRawMessageSerDe ¶
type EnvelopedRawMessageSerDe struct {
// contains filtered or unexported fields
}
EnvelopedRawMessageSerDe handles the serialization and deserialization of messages with version information and CRC checks. It wraps the actual message serialization with additional metadata for versioning and integrity checking.
Envelope Structure:
+------------------+---------------+--------------------+
| Version (1 byte) | CRC (4 bytes) | Serialized Message |
+------------------+---------------+--------------------+
- Version: Indicates the schema version used for serialization (1 byte)
- CRC: A 32-bit CRC checksum of the serialized message (4 bytes)
- Serialized Message: The actual message content, serialized by a version-specific serializer
The EnvelopedRawMessageSerDe adds a version byte and a CRC checksum to each serialized message, allowing for future extensibility, backward compatibility, and data integrity verification.
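The envelope layout described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: the lowercase names are hypothetical, and the CRC polynomial (IEEE) and byte order (big-endian) are assumptions, since the documentation only states the field sizes.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
)

// Sizes mirror the documented VersionSize, CRCSize and HeaderSize constants.
const (
	versionSize = 1
	crcSize     = 4
	headerSize  = versionSize + crcSize
)

// wrap prepends a version byte and a CRC-32 of the payload.
// Assumption: IEEE polynomial and big-endian byte order.
func wrap(version byte, payload []byte) []byte {
	out := make([]byte, headerSize+len(payload))
	out[0] = version
	binary.BigEndian.PutUint32(out[versionSize:headerSize], crc32.ChecksumIEEE(payload))
	copy(out[headerSize:], payload)
	return out
}

// unwrap checks the header length and the checksum, then returns the
// version byte and the still-serialized message.
func unwrap(data []byte) (byte, []byte, error) {
	if len(data) < headerSize {
		return 0, nil, errors.New("too short")
	}
	payload := data[headerSize:]
	want := binary.BigEndian.Uint32(data[versionSize:headerSize])
	if crc32.ChecksumIEEE(payload) != want {
		return 0, nil, errors.New("CRC check failed")
	}
	return data[0], payload, nil
}

func main() {
	framed := wrap(1, []byte(`{"hello":"world"}`))
	version, payload, err := unwrap(framed)
	fmt.Println(version, string(payload), err)

	framed[len(framed)-1] ^= 0xFF // corrupt one payload byte
	_, _, err = unwrap(framed)
	fmt.Println(err)
}
```

Because the version byte sits outside the checksummed region in this sketch, a reader can pick the matching deserializer before touching the payload.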
func NewEnvelopedRawMessageSerDe ¶
func NewEnvelopedRawMessageSerDe() *EnvelopedRawMessageSerDe
NewEnvelopedRawMessageSerDe creates a new EnvelopedRawMessageSerDe with default serializers
func (*EnvelopedRawMessageSerDe) Deserialize ¶
func (v *EnvelopedRawMessageSerDe) Deserialize(data []byte) (*RawMessage, error)
Deserialize decodes a byte slice into a RawMessage. It verifies the schema version and CRC checksum before using the appropriate deserializer to decode the message.
func (*EnvelopedRawMessageSerDe) Serialize ¶
func (v *EnvelopedRawMessageSerDe) Serialize(msg *RawMessage) ([]byte, error)
Serialize encodes a RawMessage into a byte slice, adding version information and a CRC checksum. It uses the serializer corresponding to the current serializationVersion.
func (*EnvelopedRawMessageSerDe) WithSerializationVersion ¶
func (v *EnvelopedRawMessageSerDe) WithSerializationVersion(version SchemaVersion) *EnvelopedRawMessageSerDe
WithSerializationVersion sets the schema version used for serialization. This version will be used for all subsequent Serialize calls. It does not affect the deserialization of messages.
type ErrBadMessage ¶
type ErrBadMessage struct {
Reason string
}
ErrBadMessage is returned when a message is malformed or invalid.
func NewErrBadMessage ¶
func NewErrBadMessage(reason string) *ErrBadMessage
NewErrBadMessage creates a new ErrBadMessage error.
func (*ErrBadMessage) Error ¶
func (e *ErrBadMessage) Error() string
Error implements the error interface for ErrBadMessage.
type ErrBadPayload ¶
type ErrBadPayload struct {
Reason string
}
ErrBadPayload is returned when a payload is malformed or invalid.
func NewErrBadPayload ¶
func NewErrBadPayload(reason string) *ErrBadPayload
NewErrBadPayload creates a new ErrBadPayload error.
func (*ErrBadPayload) Error ¶
func (e *ErrBadPayload) Error() string
Error implements the error interface for ErrBadPayload.
type ErrDeserializationFailed ¶
ErrDeserializationFailed is returned when payload deserialization fails.
func (*ErrDeserializationFailed) Error ¶
func (e *ErrDeserializationFailed) Error() string
Error implements the error interface for ErrDeserializationFailed.
func (*ErrDeserializationFailed) Unwrap ¶
func (e *ErrDeserializationFailed) Unwrap() error
Unwrap returns the underlying error for ErrDeserializationFailed.
type ErrSerializationFailed ¶
ErrSerializationFailed is returned when serialization fails.
func (*ErrSerializationFailed) Error ¶
func (e *ErrSerializationFailed) Error() string
Error implements the error interface for ErrSerializationFailed.
func (*ErrSerializationFailed) Unwrap ¶
func (e *ErrSerializationFailed) Unwrap() error
Unwrap returns the underlying error for ErrSerializationFailed.
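Since these error types expose Unwrap, callers can presumably inspect them with errors.As and errors.Is. The sketch below uses a hypothetical local type, serializationFailed, whose fields are assumed from the NewErrSerializationFailed(encoding, err) constructor arguments; the real struct fields are not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// serializationFailed is a hypothetical stand-in for ErrSerializationFailed.
type serializationFailed struct {
	Encoding string
	Err      error
}

func (e *serializationFailed) Error() string {
	return fmt.Sprintf("failed to serialize with encoding %s: %v", e.Encoding, e.Err)
}

// Unwrap exposes the cause so errors.Is and errors.As can walk the chain.
func (e *serializationFailed) Unwrap() error { return e.Err }

var errCause = errors.New("unsupported value")

// publish simulates a caller that adds its own context on top of the typed error.
func publish() error {
	return fmt.Errorf("publish: %w", &serializationFailed{Encoding: "json", Err: errCause})
}

// encodingOf extracts the encoding if err wraps a serializationFailed.
func encodingOf(err error) (string, bool) {
	var sf *serializationFailed
	if errors.As(err, &sf) {
		return sf.Encoding, true
	}
	return "", false
}

func main() {
	err := publish()
	enc, ok := encodingOf(err)
	fmt.Println(enc, ok)
	fmt.Println(errors.Is(err, errCause))
}
```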
type ErrUnexpectedPayloadType ¶
ErrUnexpectedPayloadType is returned when the payload type is unexpected.
func (*ErrUnexpectedPayloadType) Error ¶
func (e *ErrUnexpectedPayloadType) Error() string
Error implements the error interface for ErrUnexpectedPayloadType.
type ErrUnsupportedEncoding ¶
type ErrUnsupportedEncoding struct {
Encoding string
}
ErrUnsupportedEncoding is returned when an unsupported encoding is encountered.
func NewErrUnsupportedEncoding ¶
func NewErrUnsupportedEncoding(encoding string) *ErrUnsupportedEncoding
NewErrUnsupportedEncoding creates a new ErrUnsupportedEncoding error.
func (*ErrUnsupportedEncoding) Error ¶
func (e *ErrUnsupportedEncoding) Error() string
Error implements the error interface for ErrUnsupportedEncoding.
type ErrUnsupportedMessageType ¶
type ErrUnsupportedMessageType struct {
Type string
}
ErrUnsupportedMessageType is returned when an unsupported message type is encountered.
func NewErrUnsupportedMessageType ¶
func NewErrUnsupportedMessageType(messageType string) *ErrUnsupportedMessageType
NewErrUnsupportedMessageType creates a new ErrUnsupportedMessageType error.
func (*ErrUnsupportedMessageType) Error ¶
func (e *ErrUnsupportedMessageType) Error() string
Error implements the error interface for ErrUnsupportedMessageType.
type JSONMessageSerDe ¶
type JSONMessageSerDe struct{}
JSONMessageSerDe is a serializer/deserializer for JSON payloads
func (*JSONMessageSerDe) Deserialize ¶
func (j *JSONMessageSerDe) Deserialize(rMsg *RawMessage, payloadType reflect.Type) (*Message, error)
Deserialize deserializes a JSON payload
func (*JSONMessageSerDe) Serialize ¶
func (j *JSONMessageSerDe) Serialize(message *Message) (*RawMessage, error)
Serialize serializes a payload to JSON
type JSONRawMessageSerializer ¶
type JSONRawMessageSerializer struct{}
JSONRawMessageSerializer handles serialization and deserialization using JSON
func (*JSONRawMessageSerializer) Deserialize ¶
func (j *JSONRawMessageSerializer) Deserialize(data []byte) (*RawMessage, error)
Deserialize decodes a JSON message into a RawMessage
func (*JSONRawMessageSerializer) Serialize ¶
func (j *JSONRawMessageSerializer) Serialize(msg *RawMessage) ([]byte, error)
Serialize encodes a RawMessage into a JSON message
type Message ¶
Message represents a fully deserialized message ready for processing by the application. It contains metadata and a deserialized payload of any type. Message is used by message handlers, filters, and subscribers.
func NewMessage ¶
NewMessage creates a new Message with the given payload
func (*Message) GetPayload ¶
GetPayload retrieves the Payload as type T, handling both value and pointer types
func (*Message) WithMetadata ¶
WithMetadata sets the metadata for the message
func (*Message) WithMetadataValue ¶
WithMetadataValue sets a key-value pair in the metadata
type MessageFilter ¶
MessageFilter interface for filtering messages
type MessageFilterFunc ¶
MessageFilterFunc is a function type that implements MessageFilter
func (MessageFilterFunc) ShouldFilter ¶
func (f MessageFilterFunc) ShouldFilter(metadata *Metadata) bool
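A function-typed filter is an instance of the adapter idiom, where a plain function satisfies an interface. The sketch below uses hypothetical local types (metadata, filter, filterFunc) and assumes that a true result means the message is dropped, which is consistent with NoopMessageFilter always returning false.

```go
package main

import "fmt"

// metadata is a hypothetical stand-in for the package's Metadata type.
type metadata map[string]string

// filter mirrors the shape of the ShouldFilter method.
type filter interface {
	ShouldFilter(md *metadata) bool
}

// filterFunc adapts a plain function to the filter interface.
type filterFunc func(md *metadata) bool

func (f filterFunc) ShouldFilter(md *metadata) bool { return f(md) }

// deliver returns the IDs of messages that pass the filter.
// Assumption: ShouldFilter returning true means "drop this message".
func deliver(batch []metadata, f filter) []string {
	var kept []string
	for i := range batch {
		if f.ShouldFilter(&batch[i]) {
			continue
		}
		kept = append(kept, batch[i]["MessageID"])
	}
	return kept
}

func main() {
	// Drop messages that originate from this node.
	skipSelf := filterFunc(func(md *metadata) bool {
		return (*md)["Source"] == "node-a"
	})
	batch := []metadata{
		{"MessageID": "1", "Source": "node-a"},
		{"MessageID": "2", "Source": "node-b"},
	}
	fmt.Println(deliver(batch, skipSelf))
}
```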
type MessageHandler ¶
type MessageHandler interface {
	ShouldProcess(ctx context.Context, message *Message) bool
	HandleMessage(ctx context.Context, message *Message) error
}
MessageHandler interface for processing messages
type MessageHandlerFunc ¶
MessageHandlerFunc is a function type that implements MessageHandler
func (MessageHandlerFunc) HandleMessage ¶
func (f MessageHandlerFunc) HandleMessage(ctx context.Context, message *Message) error
func (MessageHandlerFunc) ShouldProcess ¶
func (f MessageHandlerFunc) ShouldProcess(ctx context.Context, message *Message) bool
type MessageSerDe ¶
type MessageSerDe interface {
	Serialize(message *Message) (*RawMessage, error)
	Deserialize(rawMessage *RawMessage, payloadType reflect.Type) (*Message, error)
}
MessageSerDe interface for serializing and deserializing messages to and from raw messages.
type MessageSerDeRegistry ¶
type MessageSerDeRegistry struct {
// contains filtered or unexported fields
}
MessageSerDeRegistry manages the serialization and deserialization of the Payload field in NATS Message structs. It simplifies payload handling in your NCL library by providing:
1. Type Registration: Allows registering custom payload types with unique names.
2. Serialization Management: Handles serialization and deserialization of payloads using different encoding methods.
3. Type Resolution: Provides a mechanism to resolve between type names and their corresponding Go types.
The MessageSerDeRegistry adds value to your NATS-based communication library by:
- Automatic Payload Handling: Users can set any registered Go struct as the Message.Payload without worrying about serialization. The registry handles this based on its configuration.
- Type Safety: By registering payload types, the system ensures that only known, expected types are used as payloads, reducing runtime errors and improving system reliability.
- Flexibility: Supports multiple serialization formats for payloads, allowing different message types to use the most appropriate format for their needs.
- Centralized Payload Type Management: Provides a single point of configuration for all payload types used in the system, simplifying maintenance and reducing code duplication.
This abstraction significantly reduces the complexity of working with payload data in NATS messages, allowing developers to focus on business logic rather than payload encoding details.
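The registration and type-resolution idea can be sketched as two maps keyed by name and by reflect.Type. This is an illustration under stated assumptions, not the registry's actual code: the registry, register, encode and decode names are hypothetical, pointer payloads are normalized to their element type, and JSON is the only encoding.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// registry is a hypothetical, simplified stand-in for MessageSerDeRegistry.
type registry struct {
	byName map[string]reflect.Type
	byType map[reflect.Type]string
}

func newRegistry() *registry {
	return &registry{byName: map[string]reflect.Type{}, byType: map[reflect.Type]string{}}
}

// elemType strips pointer indirection so T and *T resolve to the same entry.
func elemType(v any) reflect.Type {
	t := reflect.TypeOf(v)
	for t != nil && t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return t
}

// register records both the name-to-type and the type-to-name mapping.
func (r *registry) register(name string, payload any) error {
	t := elemType(payload)
	if t == nil {
		return fmt.Errorf("cannot register nil payload as %q", name)
	}
	if _, dup := r.byName[name]; dup {
		return fmt.Errorf("type name %q already registered", name)
	}
	r.byName[name] = t
	r.byType[t] = name
	return nil
}

// encode resolves the registered name for the payload and marshals it.
func (r *registry) encode(payload any) (string, []byte, error) {
	name, ok := r.byType[elemType(payload)]
	if !ok {
		return "", nil, fmt.Errorf("unsupported message type %T", payload)
	}
	data, err := json.Marshal(payload)
	return name, data, err
}

// decode allocates a new value of the registered type and unmarshals into it.
func (r *registry) decode(name string, data []byte) (any, error) {
	t, ok := r.byName[name]
	if !ok {
		return nil, fmt.Errorf("unsupported message type %q", name)
	}
	ptr := reflect.New(t)
	if err := json.Unmarshal(data, ptr.Interface()); err != nil {
		return nil, err
	}
	return ptr.Interface(), nil
}

// orderCreated is an example payload type used only in this sketch.
type orderCreated struct {
	ID string `json:"id"`
}

func main() {
	r := newRegistry()
	if err := r.register("OrderCreated", orderCreated{}); err != nil {
		panic(err)
	}
	name, data, _ := r.encode(&orderCreated{ID: "42"})
	got, err := r.decode(name, data)
	fmt.Println(name, string(data), got.(*orderCreated).ID, err)
}
```

In this sketch the type name travels alongside the bytes, which is the role the KeyMessageType metadata key appears to play in the real package.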
func NewMessageSerDeRegistry ¶
func NewMessageSerDeRegistry() *MessageSerDeRegistry
NewMessageSerDeRegistry creates and initializes a new MessageSerDeRegistry. It sets up the internal maps and registers the default JSON serializer.
func (*MessageSerDeRegistry) Deserialize ¶
func (r *MessageSerDeRegistry) Deserialize(rawMessage *RawMessage) (*Message, error)
Deserialize deserializes a raw message using the specified deserializer. It retrieves the correct deserializer, gets the payload type, and performs the deserialization. Usage:
message, err := manager.Deserialize(rawMessage)
func (*MessageSerDeRegistry) Register ¶
func (r *MessageSerDeRegistry) Register(name string, payload any) error
Register adds a new payload type to the registry. It registers both the name-to-type and type-to-name mappings. Usage:
manager.Register("MyCustomType", MyCustomType{})
func (*MessageSerDeRegistry) Serialize ¶
func (r *MessageSerDeRegistry) Serialize(message *Message) (*RawMessage, error)
Serialize serializes a message using the specified serializer. It handles default encoding, retrieves the correct serializer, and performs the serialization. Usage:
rawMessage, err := manager.Serialize(message)
type Metadata ¶
Metadata contains metadata about the message
func NewMetadataFromMap ¶
NewMetadataFromMap creates a new Metadata object that shares the given map without copying it. Changes to the map will be reflected in the Metadata object. It is more efficient than NewMetadataFromMapCopy.
func NewMetadataFromMapCopy ¶
NewMetadataFromMapCopy creates a new Metadata object as a deep copy of a map. Changes to the map will not be reflected in the Metadata object.
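The difference between the two constructors comes down to map aliasing. The sketch below assumes Metadata is a map-backed type (suggested by ToMap returning map[string]string); metadata, fromMap and fromMapCopy are hypothetical names.

```go
package main

import "fmt"

// metadata is a hypothetical map-backed stand-in for Metadata.
type metadata map[string]string

// fromMap reuses the caller's map: no allocation, but shared state.
func fromMap(m map[string]string) metadata { return metadata(m) }

// fromMapCopy copies every entry, so later writes to m are not visible.
func fromMapCopy(m map[string]string) metadata {
	out := make(metadata, len(m))
	for k, v := range m {
		out[k] = v
	}
	return out
}

func main() {
	src := map[string]string{"Type": "OrderCreated"}
	shared := fromMap(src)
	copied := fromMapCopy(src)

	src["Type"] = "OrderCancelled"
	fmt.Println(shared["Type"], copied["Type"]) // prints "OrderCancelled OrderCreated"
}
```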
func (Metadata) Get ¶
Get returns the value for a given key, or an empty string if the key doesn't exist
func (Metadata) GetInt ¶
GetInt gets the value as an int, returning 0 if the key doesn't exist or the value isn't a valid int
func (Metadata) GetInt64 ¶
GetInt64 gets the value as an int64, returning 0 if the key doesn't exist or the value isn't a valid int64
func (Metadata) GetTime ¶
GetTime gets the value as a time.Time, returning the zero time if the key doesn't exist or the value isn't a valid time
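Because the typed getters fall back to a zero value, a stored "0" and a missing key look the same unless the caller also checks Has. The sketch below illustrates that behavior on a hypothetical map-backed type; the RFC 3339 time layout is an assumption, since the documentation does not say how times are encoded.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// metadata is a hypothetical map-backed stand-in for Metadata.
type metadata map[string]string

// Get returns the value or "" when the key is absent.
func (m metadata) Get(key string) string { return m[key] }

// Has reports whether the key is present at all.
func (m metadata) Has(key string) bool {
	_, ok := m[key]
	return ok
}

// GetInt returns 0 for a missing key or a value that is not an int.
func (m metadata) GetInt(key string) int {
	n, err := strconv.Atoi(m[key])
	if err != nil {
		return 0
	}
	return n
}

// GetTime returns the zero time for a missing or unparsable value.
// Assumption: times are stored in RFC 3339 form.
func (m metadata) GetTime(key string) time.Time {
	t, err := time.Parse(time.RFC3339Nano, m[key])
	if err != nil {
		return time.Time{}
	}
	return t
}

func main() {
	md := metadata{"Retries": "0", "EventTime": "2024-01-02T03:04:05Z"}

	// Both calls return 0; Has tells the two cases apart.
	fmt.Println(md.GetInt("Retries"), md.Has("Retries"))
	fmt.Println(md.GetInt("Missing"), md.Has("Missing"))
	fmt.Println(md.GetTime("EventTime").Year(), md.GetTime("Missing").IsZero())
}
```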
type NoopCheckpointer ¶
type NoopCheckpointer struct{}
NoopCheckpointer is a Checkpointer that does nothing
func (*NoopCheckpointer) Checkpoint ¶
func (n *NoopCheckpointer) Checkpoint(_ *Message) error
Checkpoint does nothing
func (*NoopCheckpointer) GetLastCheckpoint ¶
func (n *NoopCheckpointer) GetLastCheckpoint() (int64, error)
GetLastCheckpoint returns 0
type NoopMessageFilter ¶
type NoopMessageFilter struct{}
NoopMessageFilter is a no-op message filter
func (NoopMessageFilter) ShouldFilter ¶
func (n NoopMessageFilter) ShouldFilter(_ *Metadata) bool
ShouldFilter always returns false
type ProtoSerializer ¶
type ProtoSerializer struct{}
ProtoSerializer handles serialization and deserialization using Protocol Buffers v1
func (*ProtoSerializer) Deserialize ¶
func (p *ProtoSerializer) Deserialize(data []byte) (*RawMessage, error)
Deserialize decodes a Protocol Buffers message into a RawMessage
func (*ProtoSerializer) Serialize ¶
func (p *ProtoSerializer) Serialize(msg *RawMessage) ([]byte, error)
Serialize encodes a RawMessage into a Protocol Buffers message
type Publisher ¶
Publisher publishes messages to a NATS server
func NewPublisher ¶
func NewPublisher(nc *nats.Conn, opts ...PublisherOption) (Publisher, error)
NewPublisher creates a new publisher with the given options
type PublisherOption ¶
type PublisherOption func(*publisher)
PublisherOption is a function type for configuring a publisher
func WithPublisherDestination ¶
func WithPublisherDestination(destination string) PublisherOption
WithPublisherDestination sets the destination for the publisher. The destination is used as the subject for the message to be published. Caution: cannot be used with WithPublisherDestinationPrefix.
func WithPublisherDestinationPrefix ¶
func WithPublisherDestinationPrefix(prefix string) PublisherOption
WithPublisherDestinationPrefix sets the destination prefix for the publisher. The destination prefix is used to construct the subject for the message to be published. The subject is constructed as follows: destinationPrefix + "." + messageType. Caution: cannot be used with WithPublisherDestination.
func WithPublisherMessageSerDeRegistry ¶
func WithPublisherMessageSerDeRegistry(registry *MessageSerDeRegistry) PublisherOption
WithPublisherMessageSerDeRegistry sets the payload registry for the publisher
func WithPublisherMessageSerializer ¶
func WithPublisherMessageSerializer(serializer RawMessageSerDe) PublisherOption
WithPublisherMessageSerializer sets the message serializer for the publisher
func WithPublisherName ¶
func WithPublisherName(name string) PublisherOption
WithPublisherName sets the name for the publisher
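The publisher options follow the functional-options idiom. The sketch below shows how such options might compose and how the subject could be derived from either a fixed destination or a prefix. All names are hypothetical, and rejecting the destination/prefix combination in the constructor is an assumption about how the documented caution is enforced.

```go
package main

import (
	"errors"
	"fmt"
)

// publisher is a hypothetical stand-in holding only the configurable fields.
type publisher struct {
	name              string
	destination       string
	destinationPrefix string
}

// option mutates the publisher during construction.
type option func(*publisher)

func withName(name string) option { return func(p *publisher) { p.name = name } }

func withDestination(d string) option { return func(p *publisher) { p.destination = d } }

func withDestinationPrefix(prefix string) option {
	return func(p *publisher) { p.destinationPrefix = prefix }
}

// newPublisher applies the options in order, then validates the result.
// Assumption: the mutually exclusive options are rejected here.
func newPublisher(opts ...option) (*publisher, error) {
	p := &publisher{}
	for _, opt := range opts {
		opt(p)
	}
	if p.destination != "" && p.destinationPrefix != "" {
		return nil, errors.New("destination and destination prefix are mutually exclusive")
	}
	return p, nil
}

// subject returns prefix + "." + messageType when a prefix is set,
// otherwise the fixed destination.
func (p *publisher) subject(messageType string) string {
	if p.destinationPrefix != "" {
		return p.destinationPrefix + "." + messageType
	}
	return p.destination
}

func main() {
	p, err := newPublisher(withName("orders"), withDestinationPrefix("orders.events"))
	if err != nil {
		panic(err)
	}
	fmt.Println(p.subject("OrderCreated")) // prints "orders.events.OrderCreated"

	_, err = newPublisher(withDestination("a"), withDestinationPrefix("b"))
	fmt.Println(err)
}
```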
type RawMessage ¶
RawMessage represents a message after envelope handling but before payload deserialization. It contains metadata and a byte slice payload, which is the serialized form of the actual message content. RawMessage is the interface between RawMessageSerDe and MessageSerDe.
type RawMessageSerDe ¶
type RawMessageSerDe interface {
	Serialize(*RawMessage) ([]byte, error)
	Deserialize([]byte) (*RawMessage, error)
}
RawMessageSerDe interface for serializing and deserializing raw messages to and from byte slices.
type SchemaVersion ¶
type SchemaVersion byte
SchemaVersion represents the version of the serialization schema
const (
	SchemaVersionJSONV1     SchemaVersion = 1
	SchemaVersionProtobufV1 SchemaVersion = 2
	DefaultSchemaVersion                  = SchemaVersionJSONV1
)
Version and size constants
func (SchemaVersion) String ¶
func (v SchemaVersion) String() string
String returns a string representation of the schema version
type Subscriber ¶
Subscriber subscribes to messages from a NATS server
func NewSubscriber ¶
func NewSubscriber(nc *nats.Conn, opts ...SubscriberOption) (Subscriber, error)
NewSubscriber creates a new subscriber with the given options
type SubscriberOption ¶
type SubscriberOption func(*subscriber)
SubscriberOption is a function type for configuring a subscriber
func WithSubscriberCheckpointer ¶
func WithSubscriberCheckpointer(checkpointer Checkpointer) SubscriberOption
WithSubscriberCheckpointer sets the checkpointer for the subscriber
func WithSubscriberMessageDeserializer ¶
func WithSubscriberMessageDeserializer(deserializer RawMessageSerDe) SubscriberOption
WithSubscriberMessageDeserializer sets the message deserializer for the subscriber
func WithSubscriberMessageFilter ¶
func WithSubscriberMessageFilter(filter MessageFilter) SubscriberOption
WithSubscriberMessageFilter sets the message filter for the subscriber
func WithSubscriberMessageHandlers ¶
func WithSubscriberMessageHandlers(handlers ...MessageHandler) SubscriberOption
WithSubscriberMessageHandlers sets the message handlers for the subscriber
func WithSubscriberMessageSerDeRegistry ¶
func WithSubscriberMessageSerDeRegistry(registry *MessageSerDeRegistry) SubscriberOption
WithSubscriberMessageSerDeRegistry sets the payload registry for the subscriber