Documentation ¶
Index ¶
- Constants
- Variables
- func WriteHeader(w *bytes.Buffer, header *BinaryHeader) error
- func WriteString(w *bytes.Buffer, value string) error
- type BinaryHeader
- type BinaryRequest
- type BinaryResponse
- type ErrorCode
- type ErrorResponse
- type Flags
- type OpCode
- type ProduceRequest
- type ProduceRequestPart
- type StreamId
Constants ¶
const ResponseBodyMaxLength = 512
Only error responses carry a body, so this leaves 511 bytes for the error message.
Variables ¶
var Endianness = binary.BigEndian
var HeaderSize = binarySize(BinaryHeader{})
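The unexported `binarySize` helper is not shown on this page. A plausible sketch, assuming it behaves like `binary.Size` from the standard library, is given below. The widths of `StreamId` and `OpCode` are assumptions made only so the example compiles; the real header size depends on their actual underlying types.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

type Flags uint8

// Assumed widths: the page does not show these type definitions.
type StreamId uint16
type OpCode uint8

type BinaryHeader struct {
	Version    uint8
	Flags      Flags
	StreamId   StreamId
	Op         OpCode
	BodyLength uint32
	Crc        uint32
}

// binarySize is a hypothetical stand-in for the package's unexported helper.
// binary.Size sums the encoded sizes of the fields, with no padding.
func binarySize(v interface{}) int {
	return binary.Size(v)
}

func main() {
	// With the assumed widths: 1 + 1 + 2 + 1 + 4 + 4 = 13 bytes.
	fmt.Println(binarySize(BinaryHeader{}))
}
```

Unlike `unsafe.Sizeof`, this measures the encoded size rather than the in-memory size, so struct alignment padding is not counted.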
Functions ¶
func WriteHeader ¶
func WriteHeader(w *bytes.Buffer, header *BinaryHeader) error
Types ¶
type BinaryHeader ¶
type BinaryHeader struct {
	Version    uint8
	Flags      Flags
	StreamId   StreamId
	Op         OpCode
	BodyLength uint32
	Crc        uint32
}
BinaryHeader is the header for producer messages. The order of its fields defines the serialization format.
type BinaryRequest ¶
type BinaryRequest interface {
	Marshal(w *bytes.Buffer, header *BinaryHeader) error
	ResponseChannels() []chan<- BinaryResponse
	StreamId() StreamId
}
func NewProduceRequest ¶
func NewProduceRequest(streamId StreamId, parts []*ProduceRequestPart) BinaryRequest
type BinaryResponse ¶
type BinaryResponse interface {
	Op() OpCode
}
func NewClientErrorResponse ¶
func NewClientErrorResponse(message string) BinaryResponse
func NewEmptyResponse ¶
func NewEmptyResponse(op OpCode) BinaryResponse
Represents a response without a body.
type ErrorResponse ¶
func (*ErrorResponse) Op ¶
func (r *ErrorResponse) Op() OpCode
func (*ErrorResponse) ToError ¶
func (r *ErrorResponse) ToError() error
type Flags ¶
type Flags uint8
const (
	WithTimestamp Flags = 0b00000001
)
Flags use fixed numeric values (not iota), so that reordering the declarations cannot silently change the protocol.
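As a brief illustration of working with such bit flags, the sketch below sets and tests `WithTimestamp` using bitwise operators. The `has` method is a hypothetical helper added for the example and is not part of the documented API.

```go
package main

import "fmt"

type Flags uint8

const (
	// The explicit literal keeps the wire value stable even if more
	// constants are later added above or below this one.
	WithTimestamp Flags = 0b00000001
)

// has (hypothetical helper) reports whether every bit in mask is set in f.
func (f Flags) has(mask Flags) bool {
	return f&mask == mask
}

func main() {
	var f Flags
	fmt.Println(f.has(WithTimestamp)) // false

	f |= WithTimestamp // set the flag
	fmt.Println(f.has(WithTimestamp)) // true

	f &^= WithTimestamp // clear the flag
	fmt.Println(f.has(WithTimestamp)) // false
}
```

With `iota`, inserting a new constant in the middle of the block would shift the values of everything after it; fixed literals avoid that failure mode.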
type ProduceRequest ¶
type ProduceRequest struct {
	// contains filtered or unexported fields
}
func (*ProduceRequest) Marshal ¶
func (r *ProduceRequest) Marshal(w *bytes.Buffer, header *BinaryHeader) error
func (*ProduceRequest) ResponseChannels ¶
func (r *ProduceRequest) ResponseChannels() []chan<- BinaryResponse
func (*ProduceRequest) StreamId ¶
func (r *ProduceRequest) StreamId() StreamId
type ProduceRequestPart ¶
type ProduceRequestPart struct {
	Topic        string
	Message      FixedLengthReader
	PartitionKey string
	Response     chan BinaryResponse
}
Represents a part of a potential produce request.
func NewProduceRequestPart ¶
func NewProduceRequestPart(
	topic string,
	message FixedLengthReader,
	partitionKey string,
) *ProduceRequestPart
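Each part carries a bidirectional `Response` channel, while `ResponseChannels` returns send-only channels. The sketch below shows one way those two shapes could relate, using stand-in types only: `part`, `emptyResponse`, and `responseChannels` are hypothetical, the `OpCode` width is assumed, and nothing here reflects how the package is actually implemented.

```go
package main

import "fmt"

type OpCode uint8 // assumed width

type BinaryResponse interface {
	Op() OpCode
}

// emptyResponse is a hypothetical body-less response.
type emptyResponse struct{ op OpCode }

func (r emptyResponse) Op() OpCode { return r.op }

// part mirrors only the Response field of ProduceRequestPart.
type part struct {
	Response chan BinaryResponse
}

// responseChannels narrows each part's channel to send-only, so the code
// delivering responses cannot accidentally receive from them.
func responseChannels(parts []*part) []chan<- BinaryResponse {
	channels := make([]chan<- BinaryResponse, 0, len(parts))
	for _, p := range parts {
		channels = append(channels, p.Response)
	}
	return channels
}

func main() {
	parts := []*part{
		{Response: make(chan BinaryResponse, 1)},
		{Response: make(chan BinaryResponse, 1)},
	}
	// The sending side only sees send-only channels.
	for _, ch := range responseChannels(parts) {
		ch <- emptyResponse{op: 1}
	}
	// Each caller still reads from its own part.
	for i, p := range parts {
		res := <-p.Response
		fmt.Printf("part %d got op %d\n", i, res.Op())
	}
}
```

The channels are buffered with capacity 1 in this sketch so the sender does not block if a caller has not started receiving yet.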