Documentation ¶
Index ¶
- Constants
- Variables
- type Broker
- type BrokerConsumer
- func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec)
- func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, error)
- func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, error)
- func (consumer *BrokerConsumer) GetOffset() uint64
- func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, error)
- type BrokerPublisher
- type GzipPayloadCodec
- type Message
- func Decode(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, []Message)
- func DecodeWithDefaultCodecs(packet []byte) (uint32, []Message)
- func NewCompressedMessage(payload []byte) *Message
- func NewCompressedMessages(messages ...*Message) *Message
- func NewMessage(payload []byte) *Message
- func NewMessageWithCodec(payload []byte, codec PayloadCodec) *Message
- type MessageHandlerFunc
- type NoCompressionPayloadCodec
- type PayloadCodec
- type RequestType
- type Timing
Constants ¶
const ( // Compression Support uses '1' - https://cwiki.apache.org/confluence/display/KAFKA/Compression MAGIC_DEFAULT = 1 // magic + compression + chksum NO_LEN_HEADER_SIZE = 1 + 1 + 4 )
const ( NO_COMPRESSION_ID = 0 GZIP_COMPRESSION_ID = 1 )
const ( REQUEST_PRODUCE RequestType = 0 REQUEST_FETCH = 1 REQUEST_MULTIFETCH = 2 REQUEST_MULTIPRODUCE = 3 REQUEST_OFFSETS = 4 )
Request Types
const (
NETWORK = "tcp"
)
Variables ¶
var DefaultCodecs = []PayloadCodec{ new(NoCompressionPayloadCodec), new(GzipPayloadCodec), }
var DefaultCodecsMap = codecsMap(DefaultCodecs)
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
func (*Broker) EncodeConsumeRequest ¶
<Request Header><OFFSET: uint64><MAX SIZE: uint32>
func (*Broker) EncodeOffsetRequest ¶
<Request Header><TIME: uint64><MAX NUMBER of OFFSETS: uint32>
func (*Broker) EncodePublishRequest ¶
<Request Header><MESSAGE SET SIZE: uint32><MESSAGE SETS>
func (*Broker) EncodeRequestHeader ¶
func (b *Broker) EncodeRequestHeader(requestType RequestType) *bytes.Buffer
Request Header: <REQUEST_SIZE: uint32><REQUEST_TYPE: uint16><TOPIC SIZE: uint16><TOPIC: bytes><PARTITION: uint32>
type BrokerConsumer ¶
type BrokerConsumer struct {
// contains filtered or unexported fields
}
func NewBrokerConsumer ¶
func NewBrokerConsumer(hostname string, topic string, partition int, offset uint64, maxSize uint32) *BrokerConsumer
Create a new broker consumer. hostname - host and optionally port, delimited by ':'; topic - topic to consume; partition - partition to consume from; offset - offset to start consuming from; maxSize - maximum size (in bytes) of the message to consume (this should be at least as big as the biggest message to be published).
func NewBrokerOffsetConsumer ¶
func NewBrokerOffsetConsumer(hostname string, topic string, partition int) *BrokerConsumer
Simplified consumer that defaults the offset and maxSize to 0. hostname - host and optionally port, delimited by ':'; topic - topic to consume; partition - partition to consume from.
func (*BrokerConsumer) AddCodecs ¶
func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec)
Add custom payload codecs for consumer decoding. payloadCodecs - a slice of PayloadCodec implementations.
func (*BrokerConsumer) Consume ¶
func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, error)
func (*BrokerConsumer) ConsumeOnChannel ¶
func (*BrokerConsumer) GetOffset ¶
func (consumer *BrokerConsumer) GetOffset() uint64
Get the current offset for a broker.
func (*BrokerConsumer) GetOffsets ¶
func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, error)
Get a list of valid offsets (up to maxNumOffsets) before the given time, where time is in milliseconds (-1 means from the latest offset available, -2 means from the smallest offset available). The result is a list of offsets, in descending order.
type BrokerPublisher ¶
type BrokerPublisher struct {
// contains filtered or unexported fields
}
func NewBrokerPublisher ¶
func NewBrokerPublisher(hostname string, topic string, partition int) *BrokerPublisher
func (*BrokerPublisher) BatchPublish ¶
func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, error)
type GzipPayloadCodec ¶
type GzipPayloadCodec struct { }
func (*GzipPayloadCodec) Decode ¶
func (codec *GzipPayloadCodec) Decode(data []byte) []byte
func (*GzipPayloadCodec) Encode ¶
func (codec *GzipPayloadCodec) Encode(data []byte) []byte
func (*GzipPayloadCodec) Id ¶
func (codec *GzipPayloadCodec) Id() byte
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
func Decode ¶
func Decode(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, []Message)
func DecodeWithDefaultCodecs ¶
func NewCompressedMessage ¶
Create a Message using the default compression method (gzip)
func NewCompressedMessages ¶
func NewMessage ¶
Default is to create a message with no compression.
func NewMessageWithCodec ¶
func NewMessageWithCodec(payload []byte, codec PayloadCodec) *Message
func (*Message) Encode ¶
MESSAGE SET: <MESSAGE LENGTH: uint32><MAGIC: 1 byte><COMPRESSION: 1 byte><CHECKSUM: uint32><MESSAGE PAYLOAD: bytes>
func (*Message) PayloadString ¶
type MessageHandlerFunc ¶
type MessageHandlerFunc func(msg *Message)
type NoCompressionPayloadCodec ¶
type NoCompressionPayloadCodec struct { }
func (*NoCompressionPayloadCodec) Decode ¶
func (codec *NoCompressionPayloadCodec) Decode(data []byte) []byte
func (*NoCompressionPayloadCodec) Encode ¶
func (codec *NoCompressionPayloadCodec) Encode(data []byte) []byte
func (*NoCompressionPayloadCodec) Id ¶
func (codec *NoCompressionPayloadCodec) Id() byte
type PayloadCodec ¶
type RequestType ¶
type RequestType uint16