Documentation ¶
Overview ¶
Package binding defines interfaces for protocol bindings.
NOTE: Most applications that emit or consume events should use the ../client package, which provides a simpler API to the underlying binding.
The interfaces in this package provide extra encoding and protocol information to allow efficient forwarding and end-to-end reliable delivery between a Receiver and a Sender belonging to different bindings. This is useful for intermediary applications that route or forward events, but not necessary for most "endpoint" applications that emit or consume events.
Protocol Bindings ¶
A protocol binding usually implements a Message, a Sender and a Receiver, a StructuredWriter and a BinaryWriter (depending on the encodings the protocol supports), and a Write[ProtocolMessage] method.
Read and write events ¶
The core of this package is the binding.Message interface. Through binding.MessageReader, it defines how to read a protocol-specific message carrying an encoded event in structured mode or binary mode. The entity that receives a protocol-specific data structure representing a message (e.g. an HttpRequest) encapsulates it in a binding.Message implementation using a NewMessage method (e.g. http.NewMessage). The entity that wants to send the binding.Message back on the wire then translates it to the protocol-specific data structure (e.g. a Kafka ProducerMessage), using the BinaryWriter and StructuredWriter specific to that protocol. Binding implementations expose their writers through a specific Write[ProtocolMessage] function (e.g. kafka.EncodeProducerMessage) in order to simplify the encoding process.
The encoding process can be customized to mutate the final result with binding.Transformer. Several transformers are provided directly by the binding/transformer package.
Usually a binding.Message implementation can be encoded only once, because the encoding process drains the message itself. In order to consume a message several times, the binding/buffering package provides several APIs to buffer the Message.
A message can be converted to an event.Event using the binding.ToEvent() function. An event.Event can be used as a Message by converting it to binding.EventMessage.
In order to simplify the encoding process for each protocol, this package provides several utility functions such as binding.Write and binding.DirectWrite. The binding.Write function tries to preserve the structured/binary encoding of the message, in order to be as efficient as possible.
Messages can also be wrapped to change their behaviour and to bind their lifecycle, as binding.FinishMessage does. Every Message wrapper implements the MessageWrapper interface.
Sender and Receiver ¶
A Receiver receives protocol-specific messages and wraps them into binding.Message implementations.
A Sender converts arbitrary Message implementations to a protocol-specific form using the protocol specific Write method and sends them.
Message and ExactlyOnceMessage provide methods to allow acknowledgments to propagate when a reliable message is forwarded from a Receiver to a Sender. QoS 0 (unreliable), 1 (at-least-once) and 2 (exactly-once) are supported.
Transport ¶
A binding implementation providing Sender and Receiver implementations can be used as a Transport through the BindingTransport adapter.
Index ¶
- Variables
- func GetOrDefaultFromCtx(ctx context.Context, key interface{}, def interface{}) interface{}
- func ToEvent(ctx context.Context, message MessageReader, transformers ...Transformer) (*event.Event, error)
- func ToEvents(ctx context.Context, message MessageReader, body io.Reader) ([]event.Event, error)
- func UseFormatForEvent(ctx context.Context, f format.Format) context.Context
- func WithForceBinary(ctx context.Context) context.Context
- func WithForceStructured(ctx context.Context) context.Context
- func WithPreferredEventEncoding(ctx context.Context, enc Encoding) context.Context
- func WithSkipDirectBinaryEncoding(ctx context.Context, skip bool) context.Context
- func WithSkipDirectStructuredEncoding(ctx context.Context, skip bool) context.Context
- type BinaryWriter
- type Encoding
- type EventMessage
- func (*EventMessage) Finish(error) error
- func (m *EventMessage) GetAttribute(k spec.Kind) (spec.Attribute, interface{})
- func (m *EventMessage) GetExtension(name string) interface{}
- func (m *EventMessage) ReadBinary(ctx context.Context, b BinaryWriter) (err error)
- func (m *EventMessage) ReadEncoding() Encoding
- func (m *EventMessage) ReadStructured(ctx context.Context, builder StructuredWriter) error
- type ExactlyOnceMessage
- type Message
- type MessageContext
- type MessageMetadataReader
- type MessageMetadataWriter
- type MessageReader
- type MessageWrapper
- type StructuredWriter
- type Transformer
- type TransformerFunc
- type Transformers
Constants ¶
This section is empty.
Variables ¶
var ErrCannotConvertToEvent = errors.New("cannot convert message to event")
ErrCannotConvertToEvent is a generic error when a conversion of a Message to an Event fails
var ErrCannotConvertToEvents = errors.New("cannot convert message to batched events")
ErrCannotConvertToEvents is a generic error when a conversion of a Message to a Batched Event fails
var ErrNotBinary = errors.New("message is not in binary mode")
ErrNotBinary is returned by MessageReader.ReadBinary for non-binary messages.
var ErrNotStructured = errors.New("message is not in structured mode")
ErrNotStructured is returned by MessageReader.ReadStructured for non-structured messages.
var ErrUnknownEncoding = errors.New("unknown Message encoding")
ErrUnknownEncoding specifies that the Message is not an event or it is encoded with an unknown encoding
Functions ¶
func GetOrDefaultFromCtx ¶
GetOrDefaultFromCtx gets a configuration value from the provided context
func ToEvent ¶
func ToEvent(ctx context.Context, message MessageReader, transformers ...Transformer) (*event.Event, error)
ToEvent translates a Message with a valid Structured or Binary representation to an Event. This function returns the Event generated from the Message, or an error that describes why the conversion failed. transformers can be nil, and this function guarantees that they are invoked only once during the encoding process.
func ToEvents ¶ added in v2.14.0
ToEvents translates a Batch Message and corresponding Reader data to a slice of Events. This function returns the Events generated from the body data, or an error that points to the conversion issue.
func UseFormatForEvent ¶
UseFormatForEvent configures which format to use when marshalling the event to structured mode
func WithForceBinary ¶
WithForceBinary forces binary encoding during the encoding process
func WithForceStructured ¶
WithForceStructured forces structured encoding during the encoding process
func WithPreferredEventEncoding ¶
WithPreferredEventEncoding defines the preferred encoding from event to message during the encoding process
func WithSkipDirectBinaryEncoding ¶
WithSkipDirectBinaryEncoding skips direct binary to binary encoding during the encoding process
Types ¶
type BinaryWriter ¶
type BinaryWriter interface {
	MessageMetadataWriter

	// Method invoked at the beginning of the visit. Useful to perform initial memory allocations
	Start(ctx context.Context) error

	// SetData receives an io.Reader for the data attribute.
	// io.Reader is not invoked when the data attribute is empty
	SetData(data io.Reader) error

	// End method is invoked only after the whole encoding process ends successfully.
	// If it fails, it's never invoked. It can be used to finalize the message.
	End(ctx context.Context) error
}
BinaryWriter is used to visit a binary Message and generate a new representation.
Protocols that support binary encoding should implement this interface to provide direct binary to binary encoding and event to binary encoding.
Start() and End() methods must be invoked by the caller of Message.ReadBinary() every time the BinaryWriter implementation is used to visit a Message.
type Encoding ¶
type Encoding int
Encoding enum specifies the type of encodings supported by binding interfaces
const (
	// Binary encoding as specified in https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#message
	EncodingBinary Encoding = iota
	// Structured encoding as specified in https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#message
	EncodingStructured
	// Message is an instance of EventMessage or it contains EventMessage nested (through MessageWrapper)
	EncodingEvent
	// When the encoding is unknown (which means that the message is a non-event)
	EncodingUnknown
	// EncodingBatch is an instance of JSON Batched Events
	EncodingBatch
)
func DirectWrite ¶
func DirectWrite( ctx context.Context, message MessageReader, structuredWriter StructuredWriter, binaryWriter BinaryWriter, transformers ...Transformer, ) (Encoding, error)
DirectWrite invokes the encoders. structuredWriter and binaryWriter could be nil if the protocol doesn't support it. transformers can be nil and this function guarantees that they are invoked only once during the encoding process. This function MUST be invoked only if message.ReadEncoding() == EncodingBinary or message.ReadEncoding() == EncodingStructured
Returns:
* EncodingStructured, nil if message is correctly encoded in structured encoding
* EncodingBinary, nil if message is correctly encoded in binary encoding
* EncodingStructured, err if message was structured but error happened during the encoding
* EncodingBinary, err if message was binary but error happened during the encoding
* EncodingUnknown, ErrUnknownEncoding if message is not a structured or a binary Message
func Write ¶
func Write( ctx context.Context, message MessageReader, structuredWriter StructuredWriter, binaryWriter BinaryWriter, transformers ...Transformer, ) (Encoding, error)
Write executes the full algorithm to encode a Message using transformers:
1. It first tries direct encoding using DirectWrite
2. If no direct encoding is possible, it uses ToEvent to generate an Event representation
3. From the Event, the message is encoded back to the provided structured or binary encoders
You can tweak the encoding process using the context decorators WithForceStructured, WithForceBinary, etc. transformers can be nil, and this function guarantees that they are invoked only once during the encoding process.
Returns:
* EncodingStructured, nil if message is correctly encoded in structured encoding
* EncodingBinary, nil if message is correctly encoded in binary encoding
* EncodingUnknown, ErrUnknownEncoding if message.ReadEncoding() == EncodingUnknown
* _, err if error happened during the encoding
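The three steps can be sketched with a simplified, standard-library-only model. Everything in it is a hypothetical stand-in: message, sink, directWrite and write are defined here and are not the SDK's types or functions (the real ones take a context, writers and transformers). Preferring binary over structured in step 3 is an assumption of this sketch, not a statement about the SDK's default.

```go
package main

import (
	"errors"
	"fmt"
)

type encoding int

const (
	encodingBinary encoding = iota
	encodingStructured
	encodingEvent
	encodingUnknown
)

var errUnknownEncoding = errors.New("unknown Message encoding")

// message is a stand-in that only knows its encoding and a payload.
type message struct {
	enc     encoding
	payload string
}

// sink stands in for the pair of writers a protocol offers; a false flag
// models a nil writer. log records which path was taken.
type sink struct {
	supportsStructured bool
	supportsBinary     bool
	log                []string
}

// directWrite forwards the message without decoding it into an event.
func directWrite(m message, s *sink) (encoding, error) {
	switch {
	case m.enc == encodingStructured && s.supportsStructured:
		s.log = append(s.log, "direct structured: "+m.payload)
		return encodingStructured, nil
	case m.enc == encodingBinary && s.supportsBinary:
		s.log = append(s.log, "direct binary: "+m.payload)
		return encodingBinary, nil
	}
	return encodingUnknown, errUnknownEncoding
}

// write follows the three documented steps.
func write(m message, s *sink) (encoding, error) {
	if m.enc == encodingUnknown {
		return encodingUnknown, errUnknownEncoding
	}
	// Step 1: try direct encoding.
	if m.enc != encodingEvent {
		if enc, err := directWrite(m, s); err == nil {
			return enc, nil
		}
	}
	// Step 2: decode into an event representation (modelled as a string).
	ev := "event(" + m.payload + ")"
	// Step 3: encode the event with whichever writer is available.
	switch {
	case s.supportsBinary:
		s.log = append(s.log, "event to binary: "+ev)
		return encodingBinary, nil
	case s.supportsStructured:
		s.log = append(s.log, "event to structured: "+ev)
		return encodingStructured, nil
	}
	return encodingUnknown, errUnknownEncoding
}

func main() {
	// A structured message sent to a binary-only protocol cannot be forwarded
	// directly, so it goes through the event representation.
	binaryOnly := &sink{supportsBinary: true}
	enc, err := write(message{enc: encodingStructured, payload: "order"}, binaryOnly)
	fmt.Println(enc == encodingBinary, err, binaryOnly.log)
}
```

When the sink supports the message's own encoding, the same call takes the direct path and the event representation is never built, which is the efficiency the documentation refers to.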
type EventMessage ¶
EventMessage type-converts an event.Event object to implement Message. This allows local event.Event objects to be sent directly via Sender.Send()
s.Send(ctx, binding.EventMessage(e))
When an event is wrapped into an EventMessage, the original event may be mutated. If you need to use the Event again after wrapping it into an EventMessage, copy it before wrapping.
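The aliasing risk described above comes from how a Go pointer conversion works: the converted value shares memory with the original. The sketch below uses hypothetical local types (event, eventMessage, clone and stamp are all defined here and are not part of the SDK) to show why copying before wrapping keeps the original untouched.

```go
package main

import "fmt"

// event is a hypothetical stand-in for an event with mutable extensions.
type event struct {
	ID         string
	Extensions map[string]string
}

// clone returns a deep copy so that later mutations do not leak back.
func (e event) clone() event {
	out := event{ID: e.ID, Extensions: make(map[string]string, len(e.Extensions))}
	for k, v := range e.Extensions {
		out.Extensions[k] = v
	}
	return out
}

// eventMessage is a defined type over event, so *event converts to
// *eventMessage without copying any data.
type eventMessage event

// stamp is a hypothetical mutation, such as one applied while encoding.
func (m *eventMessage) stamp(hop string) { m.Extensions["hop"] = hop }

func main() {
	original := event{ID: "1", Extensions: map[string]string{}}

	copied := original.clone()
	msg := (*eventMessage)(&copied) // pointer conversion: msg aliases copied
	msg.stamp("gateway")

	// The copy was mutated, the original was not.
	fmt.Println(len(original.Extensions), copied.Extensions["hop"])
}
```

Converting &original directly instead of &copied would make the stamp visible on original, which is the situation the note above warns about.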
func (*EventMessage) Finish ¶
func (*EventMessage) Finish(error) error
func (*EventMessage) GetAttribute ¶
func (m *EventMessage) GetAttribute(k spec.Kind) (spec.Attribute, interface{})
func (*EventMessage) GetExtension ¶
func (m *EventMessage) GetExtension(name string) interface{}
func (*EventMessage) ReadBinary ¶
func (m *EventMessage) ReadBinary(ctx context.Context, b BinaryWriter) (err error)
func (*EventMessage) ReadEncoding ¶
func (m *EventMessage) ReadEncoding() Encoding
func (*EventMessage) ReadStructured ¶
func (m *EventMessage) ReadStructured(ctx context.Context, builder StructuredWriter) error
type ExactlyOnceMessage ¶
type ExactlyOnceMessage interface {
	Message

	// Received is called by a forwarding QoS2 Sender when it gets
	// acknowledgment of receipt (e.g. AMQP 'accept' or MQTT PUBREC)
	//
	// The receiver must call settle(nil) when it gets the ack-of-ack
	// (e.g. AMQP 'settle' or MQTT PUBCOMP) or settle(err) if the
	// transfer fails.
	//
	// Finally the Sender calls Finish() to indicate the message can be
	// discarded.
	//
	// If sending fails, or if the sender does not support QoS 2, then
	// Finish() may be called without any call to Received()
	Received(settle func(error))
}
ExactlyOnceMessage is implemented by received Messages that support QoS 2. Only transports that support QoS 2 need to implement or use this interface.
type Message ¶
type Message interface {
	MessageReader

	// Finish *must* be called when message from a Receiver can be forgotten by
	// the receiver. A QoS 1 sender should not call Finish() until it gets an acknowledgment of
	// receipt on the underlying transport. For QoS 2 see ExactlyOnceMessage.
	//
	// Note that, depending on the Message implementation, forgetting to Finish the message
	// could produce memory/resources leaks!
	//
	// Passing a non-nil err indicates sending or processing failed.
	// A non-nil return indicates that the message was not accepted
	// by the receiver's peer.
	Finish(error) error
}
Message is the interface to a binding-specific message containing an event.
Reliable Delivery ¶
There are 3 reliable qualities of service for messages:
0/at-most-once/unreliable: messages can be dropped silently.
1/at-least-once: messages are not dropped without signaling an error to the sender, but they may be duplicated in the event of a re-send.
2/exactly-once: messages are never dropped (without error) or duplicated, as long as both sending and receiving ends maintain some binding-specific delivery state. Whether this is persisted depends on the configuration of the binding implementations.
The Message interface supports QoS 0 and 1; the ExactlyOnceMessage interface supports QoS 2.
Message includes the MessageReader interface to read messages. Every binding.Message implementation *must* specify if the message can be accessed one or more times.
When a Message can be forgotten by the entity who produced the message, Message.Finish() *must* be invoked.
func UnwrapMessage ¶
func WithFinish ¶
WithFinish returns a wrapper for m that calls finish() and m.Finish() in its Finish(). Allows code to be notified when a message is Finished.
type MessageContext ¶ added in v2.4.0
type MessageContext interface {
	// Get the context associated with this message
	Context() context.Context
}
MessageContext interface exposes the internal context that a message might contain. Only some Message implementations implement this interface.
type MessageMetadataReader ¶
type MessageMetadataReader interface {
	// GetAttribute returns:
	//
	// * attribute, value: if the message contains an attribute of that attribute kind
	// * attribute, nil: if the message spec version supports the attribute kind, but doesn't have any value
	// * nil, nil: if the message spec version doesn't support the attribute kind
	GetAttribute(attributeKind spec.Kind) (spec.Attribute, interface{})

	// GetExtension returns the value of that extension, if any.
	GetExtension(name string) interface{}
}
MessageMetadataReader defines how to read metadata from a binary/event message
If a message implementing MessageReader is encoded as binary (MessageReader.ReadEncoding() == EncodingBinary) or it's an EventMessage, then it's safe to assume that it also implements this interface
type MessageMetadataWriter ¶
type MessageMetadataWriter interface {
	// Set a standard attribute.
	//
	// The value can either be the correct golang type for the attribute, or a canonical
	// string encoding, or nil. If value is nil, then the attribute should be deleted.
	// See package types to perform the needed conversions.
	SetAttribute(attribute spec.Attribute, value interface{}) error

	// Set an extension attribute.
	//
	// The value can either be the correct golang type for the attribute, or a canonical
	// string encoding, or nil. If value is nil, then the extension should be deleted.
	// See package types to perform the needed conversions.
	SetExtension(name string, value interface{}) error
}
MessageMetadataWriter is used to set metadata when a binary Message is visited.
type MessageReader ¶
type MessageReader interface {
	// Return the type of the message Encoding.
	// The encoding should be preferably computed when the message is constructed.
	ReadEncoding() Encoding

	// ReadStructured transfers a structured-mode event to a StructuredWriter.
	// It must return ErrNotStructured if message is not in structured mode.
	//
	// Returns a different err if something wrong happened while trying to read the structured event.
	// In this case, the caller must Finish the message with appropriate error.
	//
	// This allows Senders to avoid re-encoding messages that are
	// already in suitable structured form.
	ReadStructured(context.Context, StructuredWriter) error

	// ReadBinary transfers a binary-mode event to a BinaryWriter.
	// It must return ErrNotBinary if message is not in binary mode.
	//
	// The implementation of ReadBinary must not control the lifecycle with BinaryWriter.Start() and BinaryWriter.End(),
	// because the caller must control the lifecycle.
	//
	// Returns a different err if something wrong happened while trying to read the binary event.
	// In this case, the caller must Finish the message with appropriate error.
	//
	// This allows Senders to avoid re-encoding messages that are
	// already in suitable binary form.
	ReadBinary(context.Context, BinaryWriter) error
}
MessageReader defines the read-related portion of the Message interface.
The ReadStructured and ReadBinary methods allow an optimized encoding of a Message to a specific data structure.
If MessageReader.ReadEncoding() can be equal to EncodingBinary, then the implementation of MessageReader MUST also implement MessageMetadataReader.
A Sender should try each method of interest and fall back to binding.ToEvent() if none are supported. An out of the box algorithm is provided for writing a message: binding.Write().
type MessageWrapper ¶
type MessageWrapper interface {
	Message
	MessageMetadataReader

	// Method to get the wrapped message
	GetWrappedMessage() Message
}
MessageWrapper interface is used to walk through a decorated Message and unwrap it.
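One plausible way to walk a chain of decorators is a loop that keeps asking for the wrapped message until it reaches a value that is not a wrapper. The sketch below uses hypothetical local stand-ins (message, messageWrapper, leaf, layer and unwrap are defined here and are not the SDK's types), and the stand-in wrapper interface omits the metadata methods for brevity.

```go
package main

import "fmt"

// message is a stand-in keeping only the Finish part of Message.
type message interface {
	Finish(error) error
}

// messageWrapper is a stand-in for MessageWrapper.
type messageWrapper interface {
	message
	GetWrappedMessage() message
}

// leaf is a hypothetical protocol message that wraps nothing.
type leaf struct{ name string }

func (l *leaf) Finish(error) error { return nil }

// layer is a hypothetical decorator around another message.
type layer struct{ inner message }

func (l *layer) Finish(err error) error     { return l.inner.Finish(err) }
func (l *layer) GetWrappedMessage() message { return l.inner }

// unwrap follows GetWrappedMessage until it reaches a non-wrapper.
func unwrap(m message) message {
	for {
		w, ok := m.(messageWrapper)
		if !ok {
			return m
		}
		m = w.GetWrappedMessage()
	}
}

func main() {
	core := &leaf{name: "http request"}
	wrapped := &layer{inner: &layer{inner: core}}
	fmt.Println(unwrap(wrapped) == message(core)) // prints "true"
}
```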
type StructuredWriter ¶
type StructuredWriter interface {
	// Event receives an io.Reader for the whole event.
	SetStructuredEvent(ctx context.Context, format format.Format, event io.Reader) error
}
StructuredWriter is used to visit a structured Message and generate a new representation.
Protocols that support structured encoding should implement this interface to provide direct structured to structured encoding and event to structured encoding.
type Transformer ¶
type Transformer interface {
Transform(MessageMetadataReader, MessageMetadataWriter) error
}
Transformer is an interface that implements a transformation process while transferring the event from the Message implementation to the provided encoder
When a write function (binding.Write, binding.ToEvent, buffering.CopyMessage, etc.) takes Transformer(s) as parameter, it eventually converts the message to a form which correctly implements MessageMetadataReader, in order to guarantee that transformation is applied
type TransformerFunc ¶
type TransformerFunc func(MessageMetadataReader, MessageMetadataWriter) error
TransformerFunc is a function type that implements Transformer, so a plain function can be used wherever a Transformer is expected.
func (TransformerFunc) Transform ¶
func (t TransformerFunc) Transform(r MessageMetadataReader, w MessageMetadataWriter) error
type Transformers ¶
type Transformers []Transformer
Transformers is a utility type that runs several Transformer values as a single Transformer.
func (Transformers) Transform ¶
func (t Transformers) Transform(r MessageMetadataReader, w MessageMetadataWriter) error
Directories ¶
Path | Synopsis |
---|---|
buffering | Package buffering provides APIs for buffered messages. |
format | Package format formats structured events. |
spec | Package spec provides spec-version metadata. |
test | Package test provides utilities to test binding implementations and transformers. |
transformer | Package transformer provides methods for creating event message transformers. |