Documentation ¶
Index ¶
- Constants
- func NewErrDeserializationFailed(encoding string, err error) error
- func NewErrSerializationFailed(encoding string, err error) error
- func NewErrUnexpectedPayloadType(expected, actual string) error
- type EncodedMessage
- type ErrAlreadyRegistered
- type ErrBadMessage
- type ErrBadPayload
- type ErrDeserializationFailed
- type ErrSerializationFailed
- type ErrUnexpectedPayloadType
- type ErrUnsupportedEncoding
- type ErrUnsupportedMessageType
- type JSONMessageSerializer
- type JSONPayloadSerializer
- type Message
- type MessageSerializer
- type Metadata
- func (m Metadata) Get(key string) string
- func (m Metadata) GetInt(key string) int
- func (m Metadata) GetInt64(key string) int64
- func (m Metadata) GetTime(key string) time.Time
- func (m Metadata) GetUint64(key string) uint64
- func (m Metadata) Has(key string) bool
- func (m Metadata) Set(key, value string)
- func (m Metadata) SetInt(key string, value int)
- func (m Metadata) SetInt64(key string, value int64)
- func (m Metadata) SetTime(key string, value time.Time)
- func (m Metadata) ToHeaders() map[string][]string
- func (m Metadata) ToMap() map[string]string
- type PayloadSerializer
- type ProtoMessageSerializer
- type Registry
- type SchemaVersion
- type Serializer
Constants ¶
const (
	ErrNilMessage = "message is nil"
	ErrEmptyData  = "data is empty"
)
const (
	KeyMessageType     = "Bacalhau-Type"
	KeyPayloadEncoding = "Bacalhau-PayloadEncoding"
	LegacyMessageType  = "Type"
	LegacyEncoding     = "PayloadEncoding"
)
Metadata keys
const (
	// JSONPayloadType is the identifier for JSON serialization/deserialization
	JSONPayloadType = "json"
	// DefaultPayloadEncoding is the default encoding used if none is specified
	DefaultPayloadEncoding = JSONPayloadType
)
const (
	ErrMsgTooShort  = "too short"
	ErrMsgCRCFailed = "CRC check failed"
)
Error message constants
const (
	VersionSize = 1
	CRCSize     = 4
	HeaderSize  = VersionSize + CRCSize
)
Variables ¶
This section is empty.
Functions ¶
func NewErrDeserializationFailed ¶
NewErrDeserializationFailed creates a new ErrDeserializationFailed error.
func NewErrSerializationFailed ¶
NewErrSerializationFailed creates a new ErrSerializationFailed error.
func NewErrUnexpectedPayloadType ¶
NewErrUnexpectedPayloadType creates a new ErrUnexpectedPayloadType error.
Types ¶
type EncodedMessage ¶
EncodedMessage represents a message after envelope handling but before payload deserialization. It contains metadata and a byte slice payload, which is the serialized form of the actual message content. EncodedMessage is the interface between MessageSerializer and PayloadSerializer.
type ErrAlreadyRegistered ¶ added in v1.6.0
type ErrAlreadyRegistered struct {
Type string
}
ErrAlreadyRegistered is returned when a type is already registered.
func NewErrAlreadyRegistered ¶ added in v1.6.0
func NewErrAlreadyRegistered(typeName string) *ErrAlreadyRegistered
NewErrAlreadyRegistered creates a new ErrAlreadyRegistered error.
func (ErrAlreadyRegistered) Error ¶ added in v1.6.0
func (e ErrAlreadyRegistered) Error() string
Error implements the error interface for ErrAlreadyRegistered.
type ErrBadMessage ¶
type ErrBadMessage struct {
Reason string
}
ErrBadMessage is returned when a message is malformed or invalid.
func NewErrBadMessage ¶
func NewErrBadMessage(reason string) *ErrBadMessage
NewErrBadMessage creates a new ErrBadMessage error.
func (*ErrBadMessage) Error ¶
func (e *ErrBadMessage) Error() string
Error implements the error interface for ErrBadMessage.
type ErrBadPayload ¶
type ErrBadPayload struct {
Reason string
}
ErrBadPayload is returned when a payload is malformed or invalid.
func NewErrBadPayload ¶
func NewErrBadPayload(reason string) *ErrBadPayload
NewErrBadPayload creates a new ErrBadPayload error.
func (*ErrBadPayload) Error ¶
func (e *ErrBadPayload) Error() string
Error implements the error interface for ErrBadPayload.
type ErrDeserializationFailed ¶
ErrDeserializationFailed is returned when payload deserialization fails.
func (*ErrDeserializationFailed) Error ¶
func (e *ErrDeserializationFailed) Error() string
Error implements the error interface for ErrDeserializationFailed.
func (*ErrDeserializationFailed) Unwrap ¶
func (e *ErrDeserializationFailed) Unwrap() error
Unwrap returns the underlying error for ErrDeserializationFailed.
type ErrSerializationFailed ¶
ErrSerializationFailed is returned when serialization fails.
func (*ErrSerializationFailed) Error ¶
func (e *ErrSerializationFailed) Error() string
Error implements the error interface for ErrSerializationFailed.
func (*ErrSerializationFailed) Unwrap ¶
func (e *ErrSerializationFailed) Unwrap() error
Unwrap returns the underlying error for ErrSerializationFailed.
type ErrUnexpectedPayloadType ¶
ErrUnexpectedPayloadType is returned when the payload type is unexpected.
func (*ErrUnexpectedPayloadType) Error ¶
func (e *ErrUnexpectedPayloadType) Error() string
Error implements the error interface for ErrUnexpectedPayloadType.
type ErrUnsupportedEncoding ¶
type ErrUnsupportedEncoding struct {
Encoding string
}
ErrUnsupportedEncoding is returned when an unsupported encoding is encountered.
func NewErrUnsupportedEncoding ¶
func NewErrUnsupportedEncoding(encoding string) *ErrUnsupportedEncoding
NewErrUnsupportedEncoding creates a new ErrUnsupportedEncoding error.
func (*ErrUnsupportedEncoding) Error ¶
func (e *ErrUnsupportedEncoding) Error() string
Error implements the error interface for ErrUnsupportedEncoding.
type ErrUnsupportedMessageType ¶
type ErrUnsupportedMessageType struct {
Type string
}
ErrUnsupportedMessageType is returned when an unsupported message type is encountered.
func NewErrUnsupportedMessageType ¶
func NewErrUnsupportedMessageType(messageType string) *ErrUnsupportedMessageType
NewErrUnsupportedMessageType creates a new ErrUnsupportedMessageType error.
func (*ErrUnsupportedMessageType) Error ¶
func (e *ErrUnsupportedMessageType) Error() string
Error implements the error interface for ErrUnsupportedMessageType.
type JSONMessageSerializer ¶
type JSONMessageSerializer struct{}
JSONMessageSerializer handles serialization and deserialization using JSON
func (*JSONMessageSerializer) Deserialize ¶
func (j *JSONMessageSerializer) Deserialize(data []byte) (*EncodedMessage, error)
Deserialize decodes a JSON message into an envelope.EncodedMessage
func (*JSONMessageSerializer) Serialize ¶
func (j *JSONMessageSerializer) Serialize(msg *EncodedMessage) ([]byte, error)
Serialize encodes an envelope.EncodedMessage into a JSON message
type JSONPayloadSerializer ¶
type JSONPayloadSerializer struct{}
JSONPayloadSerializer is a serializer/deserializer for JSON payloads
func (*JSONPayloadSerializer) Deserialize ¶
func (j *JSONPayloadSerializer) Deserialize(rMsg *EncodedMessage, payloadType reflect.Type) (*Message, error)
Deserialize deserializes a JSON payload
func (*JSONPayloadSerializer) Serialize ¶
func (j *JSONPayloadSerializer) Serialize(message *Message) (*EncodedMessage, error)
Serialize serializes a payload to JSON
type Message ¶
Message represents a fully deserialized message ready for processing by the application. It contains metadata and a deserialized payload of any type. Message is used by message handlers, filters, and subscribers.
func NewMessage ¶
NewMessage creates a new Message with the given payload
func (*Message) GetPayload ¶
GetPayload retrieves the Payload as type T, handling both value and pointer types
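One way to accept a payload stored either by value or by pointer is a type switch over both forms. The sketch below is hypothetical: the message type, its Payload field name, and the payloadAs helper are illustrative stand-ins, not the package's GetPayload signature (which is not shown here).

```go
package main

import "fmt"

// message is a local stand-in for Message; the Payload field name is an assumption.
type message struct {
	Payload any
}

// payloadAs is a hypothetical helper, not the package's API. It returns the
// payload as T whether it was stored as a T or as a *T.
func payloadAs[T any](m *message) (T, bool) {
	var zero T
	if m == nil {
		return zero, false
	}
	switch p := m.Payload.(type) {
	case T:
		return p, true
	case *T:
		if p == nil {
			return zero, false
		}
		return *p, true
	}
	return zero, false
}

// job is an example payload type.
type job struct{ ID string }

func main() {
	byValue := &message{Payload: job{ID: "a"}}
	byPointer := &message{Payload: &job{ID: "b"}}

	v1, ok1 := payloadAs[job](byValue)
	v2, ok2 := payloadAs[job](byPointer)
	fmt.Println(v1.ID, ok1, v2.ID, ok2)

	// A mismatched type reports false instead of panicking.
	_, ok3 := payloadAs[int](byValue)
	fmt.Println(ok3)
}
```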
func (*Message) WithMetadata ¶
WithMetadata sets the metadata for the message
func (*Message) WithMetadataValue ¶
WithMetadataValue sets a key-value pair in the metadata
type MessageSerializer ¶
type MessageSerializer interface {
	Serialize(*EncodedMessage) ([]byte, error)
	Deserialize([]byte) (*EncodedMessage, error)
}
MessageSerializer handles low-level message serialization. It converts EncodedMessages to and from raw bytes.
type Metadata ¶
Metadata contains metadata about the message
func NewMetadataFromMap ¶
NewMetadataFromMap creates a new Metadata object that shares the given map instead of copying it. Changes to the map will be reflected in the Metadata object, but this is more efficient than NewMetadataFromMapCopy.
func NewMetadataFromMapCopy ¶
NewMetadataFromMapCopy creates a new Metadata object as a deep copy of a map. Changes to the map will not be reflected in the Metadata object.
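The difference between the two constructors can be illustrated with a small stand-in. This sketch assumes Metadata is a map[string]string, which its value-receiver Set method suggests but this page does not state; fromMap and fromMapCopy are hypothetical names.

```go
package main

import "fmt"

// metadata is a local stand-in; it assumes Metadata is a map[string]string.
type metadata map[string]string

// fromMap wraps the caller's map without copying it, so both names
// refer to the same underlying storage.
func fromMap(m map[string]string) metadata { return metadata(m) }

// fromMapCopy copies every entry into a freshly allocated map.
func fromMapCopy(m map[string]string) metadata {
	out := make(metadata, len(m))
	for k, v := range m {
		out[k] = v
	}
	return out
}

func main() {
	src := map[string]string{"k": "v1"}
	shared := fromMap(src)
	copied := fromMapCopy(src)

	// Mutating the source is visible through the shared view only.
	src["k"] = "v2"
	fmt.Println(shared["k"], copied["k"]) // prints "v2 v1"
}
```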
func (Metadata) Get ¶
Get returns the value for a given key, or an empty string if the key doesn't exist
func (Metadata) GetInt ¶
GetInt gets the value as an int, returning 0 if the key doesn't exist or the value isn't a valid int
func (Metadata) GetInt64 ¶
GetInt64 gets the value as an int64, returning 0 if the key doesn't exist or the value isn't a valid int64
func (Metadata) GetTime ¶
GetTime gets the value as a time.Time, returning the zero time if the key doesn't exist or the value isn't a valid time
func (Metadata) GetUint64 ¶ added in v1.6.0
GetUint64 gets the value as a uint64, returning 0 if the key doesn't exist or the value isn't a valid uint64
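The typed getters all follow the same pattern: parse the stored string and fall back to the zero value on any failure. A minimal sketch of that pattern follows, again assuming a map[string]string representation. The RFC 3339 time layout is an assumption; the format the package actually uses for SetTime and GetTime is not documented here.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// metadata is a local stand-in; it assumes Metadata is a map[string]string.
type metadata map[string]string

// getInt returns 0 when the key is missing or the value is not a valid int.
func (m metadata) getInt(key string) int {
	v, err := strconv.Atoi(m[key])
	if err != nil {
		return 0
	}
	return v
}

// getTime returns the zero time when the key is missing or unparsable.
// The RFC 3339 layout is an assumption made for this sketch.
func (m metadata) getTime(key string) time.Time {
	t, err := time.Parse(time.RFC3339Nano, m[key])
	if err != nil {
		return time.Time{}
	}
	return t
}

func main() {
	m := metadata{"retries": "3", "bad": "abc", "sent": "2024-01-02T03:04:05Z"}

	fmt.Println(m.getInt("retries"), m.getInt("bad"), m.getInt("missing"))
	fmt.Println(m.getTime("sent").Year(), m.getTime("bad").IsZero())
}
```

Because a missing key and a stored zero are indistinguishable through these getters, callers that care about the difference would check Has first.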
type PayloadSerializer ¶
type PayloadSerializer interface {
	Serialize(message *Message) (*EncodedMessage, error)
	Deserialize(rawMessage *EncodedMessage, payloadType reflect.Type) (*Message, error)
}
PayloadSerializer handles payload serialization within messages. It converts between typed Message and EncodedMessage formats.
type ProtoMessageSerializer ¶
type ProtoMessageSerializer struct{}
ProtoMessageSerializer handles serialization and deserialization using Protocol Buffers v1
func (*ProtoMessageSerializer) Deserialize ¶
func (p *ProtoMessageSerializer) Deserialize(data []byte) (*EncodedMessage, error)
Deserialize decodes a Protocol Buffers message into an envelope.EncodedMessage
func (*ProtoMessageSerializer) Serialize ¶
func (p *ProtoMessageSerializer) Serialize(msg *EncodedMessage) ([]byte, error)
Serialize encodes an envelope.EncodedMessage into a Protocol Buffers message
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry manages the serialization and deserialization of the Payload field in Message structs. It simplifies payload handling in your application by providing:
1. Type Registration: Allows registering custom payload types with unique names.
2. Serialization Management: Handles serialization and deserialization of payloads using different encoding methods.
3. Type Resolution: Provides a mechanism to resolve between type names and their corresponding Go types.
The Registry adds value to your application through:
- Automatic Payload Handling: Users can set any registered Go struct as the Message.Payload without worrying about serialization. The Registry handles this based on pre-configuration.
- Type Safety: By registering payload types, the system ensures that only known, expected types are used as payloads, reducing runtime errors and improving system reliability.
- Flexibility: Supports multiple serialization formats for payloads, allowing different message types to use the most appropriate format for their needs.
- Centralized Payload Type Management: Provides a single point of configuration for all payload types used in the system, simplifying maintenance and reducing code duplication.
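The two-way name/type mapping described above can be sketched with the reflect package. This is a simplified, hypothetical illustration limited to JSON: typeRegistry and its methods are stand-ins invented for this example and do not reproduce the Registry implementation or its method signatures.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// typeRegistry is a hypothetical stand-in for Registry, limited to JSON.
type typeRegistry struct {
	nameToType map[string]reflect.Type
	typeToName map[reflect.Type]string
}

func newTypeRegistry() *typeRegistry {
	return &typeRegistry{
		nameToType: make(map[string]reflect.Type),
		typeToName: make(map[reflect.Type]string),
	}
}

// register records both directions of the name/type mapping.
func (r *typeRegistry) register(name string, sample any) error {
	t := baseType(sample)
	if t == nil {
		return fmt.Errorf("cannot register nil payload as %q", name)
	}
	if _, exists := r.nameToType[name]; exists {
		return fmt.Errorf("type %q is already registered", name)
	}
	r.nameToType[name] = t
	r.typeToName[t] = name
	return nil
}

// encode looks up the payload's registered name and marshals it to JSON.
func (r *typeRegistry) encode(payload any) (string, []byte, error) {
	name, ok := r.typeToName[baseType(payload)]
	if !ok {
		return "", nil, fmt.Errorf("unsupported payload type %T", payload)
	}
	data, err := json.Marshal(payload)
	return name, data, err
}

// decode resolves the name to a Go type and unmarshals into a new value.
// The result is a pointer to the registered type.
func (r *typeRegistry) decode(name string, data []byte) (any, error) {
	t, ok := r.nameToType[name]
	if !ok {
		return nil, fmt.Errorf("unsupported message type %q", name)
	}
	ptr := reflect.New(t)
	if err := json.Unmarshal(data, ptr.Interface()); err != nil {
		return nil, err
	}
	return ptr.Interface(), nil
}

// baseType returns the value's type, looking through one pointer level.
func baseType(v any) reflect.Type {
	t := reflect.TypeOf(v)
	if t != nil && t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return t
}

// jobRequest is an example payload type.
type jobRequest struct {
	ID string `json:"id"`
}

func main() {
	reg := newTypeRegistry()
	if err := reg.register("JobRequest", jobRequest{}); err != nil {
		panic(err)
	}
	name, data, err := reg.encode(&jobRequest{ID: "j-1"})
	if err != nil {
		panic(err)
	}
	decoded, err := reg.decode(name, data)
	if err != nil {
		panic(err)
	}
	fmt.Println(name, string(data), decoded.(*jobRequest).ID)
}
```

In this sketch the type name travels alongside the encoded bytes, which is the role the KeyMessageType metadata key appears to play in the real envelope.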
func NewRegistry ¶
func NewRegistry() *Registry
NewRegistry creates and initializes a new Registry. It sets up the internal maps and registers the default JSON serializer.
func (*Registry) Deserialize ¶
func (r *Registry) Deserialize(rawMessage *EncodedMessage) (*Message, error)
Deserialize deserializes a raw message using the specified deserializer. It retrieves the correct deserializer, gets the payload type, and performs the deserialization. Usage:
message, err := manager.Deserialize(rawMessage)
func (*Registry) Register ¶
Register adds a new payload type to the registry. It registers both the name-to-type and type-to-name mappings. Usage:
manager.Register("MyCustomType", MyCustomType{})
func (*Registry) Serialize ¶
func (r *Registry) Serialize(message *Message) (*EncodedMessage, error)
Serialize serializes a message using the specified serializer. It handles default encoding, retrieves the correct serializer, and performs the serialization. Usage:
rawMessage, err := manager.Serialize(message)
type SchemaVersion ¶
type SchemaVersion byte
SchemaVersion represents the version of the serialization schema
const (
	SchemaVersionJSONV1     SchemaVersion = 1
	SchemaVersionProtobufV1 SchemaVersion = 2
	DefaultSchemaVersion                  = SchemaVersionJSONV1
)
Version and size constants
func (SchemaVersion) String ¶
func (v SchemaVersion) String() string
String returns a string representation of the schema version
type Serializer ¶
type Serializer struct {
// contains filtered or unexported fields
}
Serializer handles the serialization and deserialization of messages with version information and CRC checks. It wraps the actual message serialization with additional metadata for versioning and integrity checking.
Envelope Structure:
+------------------+---------------+-----------------------------+
| Version (1 byte) | CRC (4 bytes) | Serialized envelope.Message |
+------------------+---------------+-----------------------------+
- Version: Indicates the schema version used for serialization (1 byte)
- CRC: A 32-bit CRC checksum of the serialized message (4 bytes)
- Serialized envelope.Message: The actual message content, serialized by a version-specific serializer
The Serializer adds a version byte and a CRC checksum to each serialized message, allowing for future extensibility, backward compatibility, and data integrity verification.
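The envelope layout can be sketched with the standard library alone. The example below is an illustration under stated assumptions, not the package's implementation: it assumes the CRC-32 IEEE polynomial and big-endian byte order for the checksum, neither of which is specified on this page, and the frame and unframe names are hypothetical.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
)

const (
	versionSize = 1
	crcSize     = 4
	headerSize  = versionSize + crcSize
)

// frame prepends a version byte and a CRC of the body.
// Assumption: CRC-32 (IEEE) stored big-endian.
func frame(version byte, body []byte) []byte {
	out := make([]byte, headerSize+len(body))
	out[0] = version
	binary.BigEndian.PutUint32(out[versionSize:headerSize], crc32.ChecksumIEEE(body))
	copy(out[headerSize:], body)
	return out
}

// unframe validates the header and returns the version and body.
func unframe(data []byte) (byte, []byte, error) {
	if len(data) < headerSize {
		return 0, nil, errors.New("too short")
	}
	version := data[0]
	want := binary.BigEndian.Uint32(data[versionSize:headerSize])
	body := data[headerSize:]
	if crc32.ChecksumIEEE(body) != want {
		return 0, nil, errors.New("CRC check failed")
	}
	return version, body, nil
}

func main() {
	framed := frame(1, []byte(`{"hello":"world"}`))
	version, body, err := unframe(framed)
	fmt.Println(version, string(body), err)

	// Flipping a body byte makes the checksum comparison fail.
	framed[len(framed)-1] ^= 0xFF
	_, _, err = unframe(framed)
	fmt.Println(err)
}
```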
func NewSerializer ¶
func NewSerializer() *Serializer
NewSerializer creates a new Serializer with default serializers
func (*Serializer) Deserialize ¶
func (v *Serializer) Deserialize(data []byte) (*EncodedMessage, error)
Deserialize decodes a byte slice into an envelope.EncodedMessage. It verifies the schema version and CRC checksum before using the appropriate deserializer to decode the message.
func (*Serializer) Serialize ¶
func (v *Serializer) Serialize(msg *EncodedMessage) ([]byte, error)
Serialize encodes an envelope.EncodedMessage into a byte slice, adding version information and a CRC checksum. It uses the serializer corresponding to the current serializationVersion.
func (*Serializer) WithSerializationVersion ¶
func (v *Serializer) WithSerializationVersion(version SchemaVersion) *Serializer
WithSerializationVersion sets the schema version used for serialization. This version will be used for all subsequent Serialize calls. It does not affect the deserialization of messages.
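The reason the write version does not affect deserialization is that the version byte travels with each message, so the reader can pick a decoder per message. The sketch below illustrates that dispatch with two toy codecs (raw bytes and hex); all names here are hypothetical, the CRC is omitted for brevity, and the codecs are not the package's JSON or Protocol Buffers serializers.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
)

// codec is a hypothetical per-version encoder/decoder.
type codec interface {
	encode(body []byte) []byte
	decode(data []byte) ([]byte, error)
}

type rawCodec struct{}

func (rawCodec) encode(body []byte) []byte          { return body }
func (rawCodec) decode(data []byte) ([]byte, error) { return data, nil }

type hexCodec struct{}

func (hexCodec) encode(body []byte) []byte { return []byte(hex.EncodeToString(body)) }
func (hexCodec) decode(data []byte) ([]byte, error) {
	return hex.DecodeString(string(data))
}

// versionedSerializer writes with one version but reads any known version.
type versionedSerializer struct {
	writeVersion byte
	codecs       map[byte]codec
}

func newVersionedSerializer() *versionedSerializer {
	return &versionedSerializer{
		writeVersion: 1,
		codecs:       map[byte]codec{1: rawCodec{}, 2: hexCodec{}},
	}
}

// withWriteVersion changes only the version used by serialize.
func (s *versionedSerializer) withWriteVersion(v byte) *versionedSerializer {
	s.writeVersion = v
	return s
}

func (s *versionedSerializer) serialize(body []byte) ([]byte, error) {
	c, ok := s.codecs[s.writeVersion]
	if !ok {
		return nil, fmt.Errorf("unsupported version %d", s.writeVersion)
	}
	return append([]byte{s.writeVersion}, c.encode(body)...), nil
}

// deserialize dispatches on the version byte stored in the data itself.
func (s *versionedSerializer) deserialize(data []byte) ([]byte, error) {
	if len(data) == 0 {
		return nil, errors.New("data is empty")
	}
	c, ok := s.codecs[data[0]]
	if !ok {
		return nil, fmt.Errorf("unsupported version %d", data[0])
	}
	return c.decode(data[1:])
}

func main() {
	s := newVersionedSerializer()
	v1, _ := s.serialize([]byte("hi"))
	v2, _ := s.withWriteVersion(2).serialize([]byte("hi"))

	// Both messages decode even though the write version is now 2.
	a, _ := s.deserialize(v1)
	b, _ := s.deserialize(v2)
	fmt.Println(string(a), string(b))
}
```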