Documentation ¶
Overview ¶
Package frame provides the ability to encode and decode to and from Pulsar's custom binary protocol. The protocol is a light wrapper around protobuf messages.
Index ¶
- Constants
- type AsyncResp
- type CmdSender
- type Dispatcher
- func (f *Dispatcher) NotifyGlobal(frame Frame) error
- func (f *Dispatcher) NotifyProdSeqIDs(producerID, sequenceID uint64, frame Frame) error
- func (f *Dispatcher) NotifyReqID(requestID uint64, frame Frame) error
- func (f *Dispatcher) RegisterGlobal() (Response <-chan Frame, cancel func(), err error)
- func (f *Dispatcher) RegisterProdSeqIDs(producerID, sequenceID uint64) (Response <-chan Frame, cancel func(), err error)
- func (f *Dispatcher) RegisterReqID(requestID uint64) (Response <-chan Frame, cancel func(), err error)
- type Frame
- type MockSender
- type ProdSeqKey
Constants ¶
const MaxFrameSize = 5 * 1024 * 1024 // 5mb
MaxFrameSize is defined by the Pulsar spec with a single sentence: "The maximum allowable size of a single frame is 5 MB."
https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Framing-5l6bym
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AsyncResp ¶
type AsyncResp struct {
	Resp chan<- Frame
	Done <-chan struct{}
}
AsyncResp manages the state between a request and its response. Requestors wait on the `Resp` channel for the response frame corresponding to their request. If they are no longer interested in the response (for example, after a timeout), the `Done` channel is closed, signaling to the response side that the response is no longer expected or needed.
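The handoff described above can be sketched as a `select` over the two channels. This is a minimal, self-contained illustration, not the package's implementation: the `Frame` stand-in and the `deliver` helper are hypothetical names introduced here.

```go
package main

import "fmt"

// Frame is a simplified stand-in for the package's Frame type (hypothetical).
type Frame struct{ Name string }

// AsyncResp mirrors the struct documented above.
type AsyncResp struct {
	Resp chan<- Frame
	Done <-chan struct{}
}

// deliver is a hypothetical helper for the response side. It hands the
// frame to the requestor, or returns false if the requestor has already
// given up by closing Done.
func deliver(a AsyncResp, f Frame) bool {
	select {
	case a.Resp <- f:
		return true
	case <-a.Done:
		return false
	}
}

func main() {
	// Case 1: the requestor is still waiting (buffered so the send succeeds).
	resp := make(chan Frame, 1)
	done := make(chan struct{})
	fmt.Println(deliver(AsyncResp{Resp: resp, Done: done}, Frame{Name: "PONG"}))
	fmt.Println((<-resp).Name)

	// Case 2: the requestor timed out and closed Done; nobody reads Resp.
	abandoned := make(chan Frame)
	gone := make(chan struct{})
	close(gone)
	fmt.Println(deliver(AsyncResp{Resp: abandoned, Done: gone}, Frame{Name: "PONG"}))
}
```

Selecting on `Done` is what keeps the response side from blocking forever on a requestor that has stopped listening.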
type CmdSender ¶
type CmdSender interface {
	SendSimpleCmd(cmd api.BaseCommand) error
	SendPayloadCmd(cmd api.BaseCommand, metadata api.MessageMetadata, payload []byte) error
	Closed() <-chan struct{} // closed unblocks when the connection has been closed
}
CmdSender is an interface that is capable of sending commands to Pulsar. It allows abstraction of a core.
type Dispatcher ¶
type Dispatcher struct {
	// Connected and Pong responses have no requestID,
	// therefore a single channel is used as their
	// respective FrameDispatcher. If the channel is
	// nil, there's no outstanding request.
	GlobalMu sync.Mutex // protects following
	Global   *AsyncResp

	// All responses that are correlated by their
	// requestID
	ReqIDMu sync.Mutex // protects following
	ReqIDs  map[uint64]AsyncResp

	// All responses that are correlated by their
	// (producerID, sequenceID) tuple
	ProdSeqIDsMu sync.Mutex // protects following
	ProdSeqIDs   map[ProdSeqKey]AsyncResp
}
Dispatcher is responsible for handling the request/response state of outstanding requests. It allows users of this type to present a synchronous interface to an asynchronous process.
func NewFrameDispatcher ¶
func NewFrameDispatcher() *Dispatcher
NewFrameDispatcher returns an instantiated Dispatcher.
func (*Dispatcher) NotifyGlobal ¶
func (f *Dispatcher) NotifyGlobal(frame Frame) error
NotifyGlobal should be called with response frames that have no identifying id (Pong, Connected).
func (*Dispatcher) NotifyProdSeqIDs ¶
func (f *Dispatcher) NotifyProdSeqIDs(producerID, sequenceID uint64, frame Frame) error
NotifyProdSeqIDs should be called with response frames that have (producerID, sequenceID) id tuples to correlate them to their requests.
func (*Dispatcher) NotifyReqID ¶
func (f *Dispatcher) NotifyReqID(requestID uint64, frame Frame) error
NotifyReqID should be called with response frames that have a requestID to correlate them to their requests.
func (*Dispatcher) RegisterGlobal ¶
func (f *Dispatcher) RegisterGlobal() (Response <-chan Frame, cancel func(), err error)
RegisterGlobal is used to wait for responses that have no identifying id (Pong, Connected responses). Only one outstanding global request is allowed at a time. Callers should always call cancel, especially when they are no longer interested in the response.
func (*Dispatcher) RegisterProdSeqIDs ¶
func (f *Dispatcher) RegisterProdSeqIDs(producerID, sequenceID uint64) (Response <-chan Frame, cancel func(), err error)
RegisterProdSeqIDs is used to wait for responses that have (producerID, sequenceID) id tuples to correlate them to their request. Callers should always call cancel, especially when they are no longer interested in the response. It is an error to have multiple outstanding requests with the same id tuple.
func (*Dispatcher) RegisterReqID ¶
func (f *Dispatcher) RegisterReqID(requestID uint64) (Response <-chan Frame, cancel func(), err error)
RegisterReqID is used to wait for responses that have a requestID to correlate them to their request. Callers should always call cancel, especially when they are no longer interested in the response. It is an error to have multiple outstanding requests with the same id.
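The register/notify/cancel cycle can be illustrated with a reduced stand-in that tracks request IDs only. This sketch is an assumption about how such a dispatcher might behave, not the package's source: `miniDispatcher`, `newMiniDispatcher`, the simplified `Frame`, and the error messages are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Frame is a simplified stand-in for the package's Frame type (hypothetical).
type Frame struct{ Name string }

type asyncResp struct {
	resp chan<- Frame
	done <-chan struct{}
}

// miniDispatcher is a hypothetical, reduced dispatcher keyed by requestID.
type miniDispatcher struct {
	mu     sync.Mutex // protects reqIDs
	reqIDs map[uint64]asyncResp
}

func newMiniDispatcher() *miniDispatcher {
	return &miniDispatcher{reqIDs: make(map[uint64]asyncResp)}
}

// RegisterReqID records interest in the response for id. It fails if a
// request with the same id is already outstanding.
func (d *miniDispatcher) RegisterReqID(id uint64) (<-chan Frame, func(), error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.reqIDs[id]; ok {
		return nil, nil, errors.New("request already outstanding for id")
	}
	resp := make(chan Frame)
	done := make(chan struct{})
	var once sync.Once
	cancel := func() {
		once.Do(func() {
			d.mu.Lock()
			// Only remove the entry if it still belongs to this registration.
			if cur, ok := d.reqIDs[id]; ok && cur.done == done {
				delete(d.reqIDs, id)
			}
			d.mu.Unlock()
			close(done)
		})
	}
	d.reqIDs[id] = asyncResp{resp: resp, done: done}
	return resp, cancel, nil
}

// NotifyReqID routes a response frame to whoever registered id.
func (d *miniDispatcher) NotifyReqID(id uint64, f Frame) error {
	d.mu.Lock()
	a, ok := d.reqIDs[id]
	delete(d.reqIDs, id)
	d.mu.Unlock()
	if !ok {
		return errors.New("no outstanding request for id")
	}
	select {
	case a.resp <- f:
		return nil
	case <-a.done:
		return errors.New("requestor is no longer waiting")
	}
}

func main() {
	d := newMiniDispatcher()
	resp, cancel, err := d.RegisterReqID(7)
	if err != nil {
		panic(err)
	}
	defer cancel() // always cancel, even after a successful response

	// In a real client the read loop would call NotifyReqID.
	go d.NotifyReqID(7, Frame{Name: "SUCCESS"})
	fmt.Println((<-resp).Name) // prints "SUCCESS"
}
```

Calling cancel unconditionally (here via `defer`) is what lets the notify side give up cleanly when the caller has stopped waiting.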
type Frame ¶
type Frame struct {
	// BaseCmd is a required field
	BaseCmd *api.BaseCommand

	// The following fields are optional.
	// If present, the frame is a "Payload"
	// command, as opposed to a "Simple" command
	// if there's only the BaseCmd.
	Metadata *api.MessageMetadata
	Payload  []byte
}
Frame represents a Pulsar message frame. It can be used to encode and decode messages to and from the Pulsar binary wire format.
The binary protocol is outlined here: https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/ However, the Java source should be considered the canonical reference for the format.
All sizes are passed as 4-byte unsigned big endian integers.
"Simple" command frame format:
+------------------------------------------------------------------------+
| totalSize (uint32) | commandSize (uint32) | message (protobuf encoded) |
| 4 bytes            | 4 bytes              | var length                 |
|====================|======================|============================|
| size of everything | size of the message  |                            |
| following these 4  |                      |                            |
| bytes              |                      |                            |
+------------------------------------------------------------------------+
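As a rough sketch of the "Simple" layout, the framing can be written with `encoding/binary`. The helpers `encodeSimple` and `decodeSimple` are hypothetical names, and the message is treated as opaque, already protobuf-encoded bytes rather than a real `api.BaseCommand`.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// encodeSimple frames msg, which is assumed to be an already
// protobuf-encoded command (treated as opaque bytes here).
func encodeSimple(msg []byte) []byte {
	out := make([]byte, 8+len(msg))
	// totalSize: everything following these 4 bytes (commandSize + message).
	binary.BigEndian.PutUint32(out[0:4], uint32(4+len(msg)))
	// commandSize: size of the message.
	binary.BigEndian.PutUint32(out[4:8], uint32(len(msg)))
	copy(out[8:], msg)
	return out
}

// decodeSimple returns the message bytes of a complete "Simple" frame.
func decodeSimple(b []byte) ([]byte, error) {
	if len(b) < 8 {
		return nil, errors.New("frame too short")
	}
	total := binary.BigEndian.Uint32(b[0:4])
	cmdSize := binary.BigEndian.Uint32(b[4:8])
	if uint64(total)+4 != uint64(len(b)) || cmdSize != total-4 {
		return nil, errors.New("size fields do not match frame length")
	}
	return b[8:], nil
}

func main() {
	frame := encodeSimple([]byte{0xaa, 0xbb, 0xcc})
	fmt.Printf("% x\n", frame) // prints "00 00 00 07 00 00 00 03 aa bb cc"

	msg, err := decodeSimple(frame)
	fmt.Println(len(msg), err) // prints "3 <nil>"
}
```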
"Payload" command frame format (It has the same 3 fields as a "simple" command, plus the following):
+-------------------------------------------------------------------------------------------------------------------------------------------------+
| "Simple" fields | magicNumber (0x0e01) | checksum (CRC32-C) | metadataSize (uint32) | metadata (protobuf encoded) | payload (bytes)             |
| var length      | 2 bytes              | 4 bytes            | 4 bytes               | var length                  | totalSize - (SUM others)    |
|=================|======================|====================|=======================|=============================|=============================|
|                 | OPTIONAL If present, | OPTIONAL Checksum  | size of the metadata  |                             | Any sequence of bytes,      |
|                 | indicates following  | of the following   |                       |                             | possibly compressed and     |
|                 | 4 bytes are checksum | bytes              |                       |                             | or encrypted (see metadata) |
+-------------------------------------------------------------------------------------------------------------------------------------------------+
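The fields that follow the "Simple" fields can be sketched as below. This assumes the checksum covers every byte after it (metadataSize, metadata, and payload), which is how "checksum of the following bytes" is read here. `payloadSection` and `verifyPayloadSection` are hypothetical helpers, and metadata is treated as opaque, already-encoded bytes.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

const magicNumber = 0x0e01

// CRC32-C uses the Castagnoli polynomial.
var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// payloadSection builds magicNumber + checksum + metadataSize + metadata +
// payload. Assumption: the checksum covers all bytes that follow it.
func payloadSection(metadata, payload []byte) []byte {
	// Bytes covered by the checksum.
	body := make([]byte, 4, 4+len(metadata)+len(payload))
	binary.BigEndian.PutUint32(body, uint32(len(metadata)))
	body = append(body, metadata...)
	body = append(body, payload...)

	out := make([]byte, 6, 6+len(body))
	binary.BigEndian.PutUint16(out[0:2], magicNumber)
	binary.BigEndian.PutUint32(out[2:6], crc32.Checksum(body, castagnoli))
	return append(out, body...)
}

// verifyPayloadSection checks the magic number and recomputes the checksum.
func verifyPayloadSection(section []byte) bool {
	if len(section) < 10 || binary.BigEndian.Uint16(section[0:2]) != magicNumber {
		return false
	}
	want := binary.BigEndian.Uint32(section[2:6])
	return crc32.Checksum(section[6:], castagnoli) == want
}

func main() {
	section := payloadSection([]byte{0x01, 0x02}, []byte("hi"))
	fmt.Println(len(section), verifyPayloadSection(section)) // prints "14 true"

	section[len(section)-1] ^= 0xff // corrupt the last payload byte
	fmt.Println(verifyPayloadSection(section)) // prints "false"
}
```

In a full frame these bytes would be appended after the "Simple" fields, and totalSize would have to account for them as well.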
func (*Frame) Decode ¶
Decode the pulsar binary protocol from r into the receiver frame. Returns any errors encountered.
type MockSender ¶
MockSender implements the CmdSender interface.
func (*MockSender) Closed ¶
func (m *MockSender) Closed() <-chan struct{}
func (*MockSender) GetFrames ¶
func (m *MockSender) GetFrames() []Frame
func (*MockSender) SendPayloadCmd ¶
func (m *MockSender) SendPayloadCmd(cmd api.BaseCommand, metadata api.MessageMetadata, payload []byte) error
func (*MockSender) SendSimpleCmd ¶
func (m *MockSender) SendSimpleCmd(cmd api.BaseCommand) error
type ProdSeqKey ¶
ProdSeqKey is a composite lookup key for the dispatchers that use producerID and sequenceID to correlate responses, which are the SendReceipt and SendError responses.