Documentation ¶
Index ¶
- Constants
- Variables
- type Broker
- type BrokerConsumer
- func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec)
- func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc, stop <-chan struct{}) (int, error)
- func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan struct{}) (int, error)
- func (consumer *BrokerConsumer) GetOffset() uint64
- func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, error)
- type BrokerPublisher
- type GzipPayloadCodec
- type Message
- func Decode(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, []Message, error)
- func DecodeWithDefaultCodecs(packet []byte) (uint32, []Message, error)
- func NewCompressedMessage(payload []byte) *Message
- func NewCompressedMessages(messages ...*Message) *Message
- func NewMessage(payload []byte) *Message
- func NewMessageWithCodec(payload []byte, codec PayloadCodec) *Message
- type MessageHandlerFunc
- type NoCompressionPayloadCodec
- type PayloadCodec
- type RequestType
- type SnappyPayloadCodec
- type Timing
Constants ¶
const (
	// MAGIC_DEFAULT is the default value for the Kafka wire format.
	// Compression Support uses '1' - https://cwiki.apache.org/confluence/display/KAFKA/Compression
	MAGIC_DEFAULT = 1

	// NO_LEN_HEADER_SIZE is the length of the header after the 4 bytes representing the length:
	// magic + compression + chksum
	NO_LEN_HEADER_SIZE = 1 + 1 + 4
)
const (
	NO_COMPRESSION_ID     = 0
	GZIP_COMPRESSION_ID   = 1
	SNAPPY_COMPRESSION_ID = 2
)

Compression flags
const (
	REQUEST_PRODUCE      RequestType = 0
	REQUEST_FETCH                    = 1
	REQUEST_MULTIFETCH               = 2
	REQUEST_MULTIPRODUCE             = 3
	REQUEST_OFFSETS                  = 4

	OFFSET_LATEST   int64 = -1
	OFFSET_EARLIEST int64 = -2
)

Request Types
const (
// NETWORK constant for supported network protocol
NETWORK = "tcp"
)
Variables ¶
var (
	ErrMalformedPacket       = fmt.Errorf("kafka message malformed, expecting at least 4 bytes")
	ErrIncompletePacket      = fmt.Errorf("kafka message incomplete, expecting larger packet")
	ErrIncompleteInnerPacket = fmt.Errorf("incomplete kafka message within a compressed message")
	ErrInvalidMagic          = fmt.Errorf("incorrect magic value")
	ErrChecksumMismatch      = fmt.Errorf("checksum mismatch on kafka message")
)

Error values
var DefaultCodecs = []PayloadCodec{
	new(NoCompressionPayloadCodec),
	new(GzipPayloadCodec),
	new(SnappyPayloadCodec),
}
DefaultCodecs is a list of codecs supported and packaged with the library itself
var DefaultCodecsMap = codecsMap(DefaultCodecs)
DefaultCodecsMap is a map[id]Codec representation of the supported codecs
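As a hedged illustration, the following sketch decodes a raw packet with the packaged codecs. The import path is an assumption about where this package lives; per the descriptions here, DecodeWithDefaultCodecs(packet) performs the same decode with the default codec list.

package main

import (
	"fmt"
	"log"

	kafka "github.com/jdamick/kafka" // assumed import path
)

func main() {
	var packet []byte // raw bytes as read from a broker (placeholder)

	// Decode picks a codec out of the map using the packet's compression byte.
	n, msgs, err := kafka.Decode(packet, kafka.DefaultCodecsMap)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("decoded %d message(s), count returned: %d\n", len(msgs), n)
}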
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker contains the generic kafka parameters for a broker
func (*Broker) EncodeConsumeRequest ¶
EncodeConsumeRequest encodes a fetch request into kafka's wire format:
<Request Header><OFFSET: uint64><MAX SIZE: uint32>
func (*Broker) EncodeOffsetRequest ¶
EncodeOffsetRequest encodes an offset request into kafka's wire format:
<Request Header><TIME: uint64><MAX NUMBER of OFFSETS: uint32>
func (*Broker) EncodePublishRequest ¶
EncodePublishRequest encodes a publish request into kafka's wire format:
<Request Header><MESSAGE SET SIZE: uint32><MESSAGE SETS>
func (*Broker) EncodeRequestHeader ¶
func (b *Broker) EncodeRequestHeader(requestType RequestType) *bytes.Buffer
EncodeRequestHeader marshals a request into kafka's wire format.
Request Header: <REQUEST_SIZE: uint32><REQUEST_TYPE: uint16><TOPIC SIZE: uint16><TOPIC: bytes><PARTITION: uint32>
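To make the field order concrete, here is a standalone sketch of that header layout using encoding/binary. Big-endian byte order and the zero REQUEST_SIZE placeholder are assumptions about the wire format; this mirrors the documented layout rather than the library's actual implementation.

import (
	"bytes"
	"encoding/binary"
)

// encodeHeaderSketch writes the documented header fields in order.
func encodeHeaderSketch(requestType uint16, topic string, partition uint32) *bytes.Buffer {
	buf := new(bytes.Buffer)
	binary.Write(buf, binary.BigEndian, uint32(0))          // REQUEST_SIZE: backfilled once the full request is built (assumed)
	binary.Write(buf, binary.BigEndian, requestType)        // REQUEST_TYPE
	binary.Write(buf, binary.BigEndian, uint16(len(topic))) // TOPIC SIZE
	buf.WriteString(topic)                                  // TOPIC
	binary.Write(buf, binary.BigEndian, partition)          // PARTITION
	return buf
}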
type BrokerConsumer ¶
type BrokerConsumer struct {
// contains filtered or unexported fields
}
BrokerConsumer holds a Kafka broker instance and the consumer settings
func NewBrokerConsumer ¶
func NewBrokerConsumer(hostname string, topic string, partition int, offset uint64, maxSize uint32) *BrokerConsumer
NewBrokerConsumer creates a new broker consumer:
* hostname - host and optionally port, delimited by ':'
* topic to consume
* partition to consume from
* offset to start consuming from
* maxSize (in bytes) of the message to consume (this should be at least as big as the biggest message to be published)
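For example (a sketch; the address, topic, and buffer size are hypothetical, with the package imported as kafka as in the earlier sketch):

consumer := kafka.NewBrokerConsumer(
	"localhost:9092", // hostname and port, delimited by ':'
	"events",         // topic (hypothetical)
	0,                // partition to consume from
	0,                // offset to start at
	1<<20,            // maxSize: 1 MiB, at least as large as the biggest published message
)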
func NewBrokerOffsetConsumer ¶
func NewBrokerOffsetConsumer(hostname string, topic string, partition int) *BrokerConsumer
NewBrokerOffsetConsumer creates a simplified consumer that defaults the offset and maxSize to 0:
* hostname - host and optionally port, delimited by ':'
* topic to consume
* partition to consume from
func (*BrokerConsumer) AddCodecs ¶
func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec)
AddCodecs is a utility method to add custom payload codecs for consumer decoding.
payloadCodecs - an array of PayloadCodec implementations
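For example, registering a hypothetical custom codec alongside the defaults (see the MyCodec sketch under PayloadCodec below):

consumer.AddCodecs([]kafka.PayloadCodec{new(MyCodec)})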
func (*BrokerConsumer) Consume ¶
func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc, stop <-chan struct{}) (int, error)
Consume makes a single fetch request and sends the messages in the message set to a handler function
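A minimal usage sketch, assuming a consumer built as above:

stop := make(chan struct{})
n, err := consumer.Consume(func(msg *kafka.Message) {
	fmt.Println(msg.PayloadString()) // handle each message in the fetched set
}, stop)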
func (*BrokerConsumer) ConsumeOnChannel ¶
func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan struct{}) (int, error)
ConsumeOnChannel fetches messages from kafka and enqueues them in a channel
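A usage sketch; treating close(quit) as the shutdown signal is an assumption about the quit channel's semantics:

msgChan := make(chan *kafka.Message)
quit := make(chan struct{})

go func() {
	for msg := range msgChan {
		fmt.Println(msg.PayloadString())
	}
}()

// Poll roughly once per second; close(quit) elsewhere to stop (assumed semantics).
n, err := consumer.ConsumeOnChannel(msgChan, 1000, quit)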
func (*BrokerConsumer) GetOffset ¶
func (consumer *BrokerConsumer) GetOffset() uint64
GetOffset returns the current offset for a broker.
func (*BrokerConsumer) GetOffsets ¶
func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, error)
GetOffsets returns a list of valid offsets (up to maxNumOffsets) before the given time, where time is in milliseconds (OFFSET_LATEST = -1 for the latest available offset, OFFSET_EARLIEST = -2 for the smallest available offset). The result is a list of offsets, in descending order.
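For example, to ask for up to 10 offsets counting back from the head of the log:

// Offsets come back newest-first.
offsets, err := consumer.GetOffsets(kafka.OFFSET_LATEST, 10)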
type BrokerPublisher ¶
type BrokerPublisher struct {
// contains filtered or unexported fields
}
BrokerPublisher holds a Kafka broker instance and the publisher settings
func NewBrokerPublisher ¶
func NewBrokerPublisher(hostname string, topic string, partition int) *BrokerPublisher
NewBrokerPublisher returns a new broker instance for a publisher
func (*BrokerPublisher) BatchPublish ¶
func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, error)
BatchPublish writes a batch of messages to the kafka broker
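A sketch covering publisher construction and a batch publish (address and topic are hypothetical):

publisher := kafka.NewBrokerPublisher("localhost:9092", "events", 0)
n, err := publisher.BatchPublish(
	kafka.NewMessage([]byte("first")),
	kafka.NewMessage([]byte("second")),
)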
func (*BrokerPublisher) ProduceFromChannel ¶
func (b *BrokerPublisher) ProduceFromChannel(msgChan chan *Message, quit chan struct{}) (int, error)
ProduceFromChannel reads messages from the given channel and publishes them to the Kafka broker
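A usage sketch; as with ConsumeOnChannel, treating close(quit) as the stop signal is an assumption:

msgChan := make(chan *kafka.Message)
quit := make(chan struct{})

go func() {
	msgChan <- kafka.NewMessage([]byte("hello"))
	close(quit) // assumed stop signal for the produce loop
}()

n, err := publisher.ProduceFromChannel(msgChan, quit)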
type GzipPayloadCodec ¶
type GzipPayloadCodec struct{}
GzipPayloadCodec - Gzip Codec
func (*GzipPayloadCodec) Decode ¶
func (codec *GzipPayloadCodec) Decode(data []byte) []byte
Decode decodes the message with GZip compression
func (*GzipPayloadCodec) Encode ¶
func (codec *GzipPayloadCodec) Encode(data []byte) []byte
Encode encodes the message with GZip compression
func (*GzipPayloadCodec) ID ¶
func (codec *GzipPayloadCodec) ID() byte
ID returns the 1-byte id of the codec
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message wraps the message headers and the payload
func DecodeWithDefaultCodecs ¶
DecodeWithDefaultCodecs decodes the message(s) with the default codecs list
func NewCompressedMessage ¶
NewCompressedMessage creates a Message using the default compression method (gzip)
func NewCompressedMessages ¶
NewCompressedMessages encodes a batch of Messages using the default compression method (gzip)
func NewMessage ¶
NewMessage creates a message with no compression
func NewMessageWithCodec ¶
func NewMessageWithCodec(payload []byte, codec PayloadCodec) *Message
NewMessageWithCodec creates a new Message instance, with the payload encoded with the given codec
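For example, to build a Snappy-compressed message instead of using the gzip default:

msg := kafka.NewMessageWithCodec([]byte("payload"), new(kafka.SnappyPayloadCodec))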
func (*Message) Encode ¶
Encode marshals the Message object into kafka's wire format.
MESSAGE SET: <MESSAGE LENGTH: uint32><MAGIC: 1 byte><COMPRESSION: 1 byte><CHECKSUM: uint32><MESSAGE PAYLOAD: bytes>
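For reference, a standalone sketch of that layout for an uncompressed message, built from the constants above. The checksum algorithm (CRC32 IEEE over the payload) is an assumption; the library's Encode may differ in detail.

import (
	"bytes"
	"encoding/binary"
	"hash/crc32"

	kafka "github.com/jdamick/kafka" // assumed import path
)

func encodeMessageSketch(payload []byte) []byte {
	buf := new(bytes.Buffer)
	// MESSAGE LENGTH counts magic + compression + checksum + payload (NO_LEN_HEADER_SIZE = 6).
	binary.Write(buf, binary.BigEndian, uint32(kafka.NO_LEN_HEADER_SIZE+len(payload)))
	buf.WriteByte(kafka.MAGIC_DEFAULT)                               // MAGIC
	buf.WriteByte(kafka.NO_COMPRESSION_ID)                           // COMPRESSION
	binary.Write(buf, binary.BigEndian, crc32.ChecksumIEEE(payload)) // CHECKSUM (assumed CRC32 IEEE)
	buf.Write(payload)                                               // MESSAGE PAYLOAD
	return buf.Bytes()
}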
func (*Message) PayloadString ¶
PayloadString returns the actual payload of the message, as string
type MessageHandlerFunc ¶
type MessageHandlerFunc func(msg *Message)
MessageHandlerFunc defines the interface for message handlers accepted by Consume()
type NoCompressionPayloadCodec ¶
type NoCompressionPayloadCodec struct{}
NoCompressionPayloadCodec - No compression codec, noop
func (*NoCompressionPayloadCodec) Decode ¶
func (codec *NoCompressionPayloadCodec) Decode(data []byte) []byte
Decode decodes the message without any compression (no-op)
func (*NoCompressionPayloadCodec) Encode ¶
func (codec *NoCompressionPayloadCodec) Encode(data []byte) []byte
Encode encodes the message without any compression (no-op)
func (*NoCompressionPayloadCodec) ID ¶
func (codec *NoCompressionPayloadCodec) ID() byte
ID returns the 1-byte id of the codec
type PayloadCodec ¶
type PayloadCodec interface {
	// ID returns the 1-byte id of the codec
	ID() byte

	// Encode is the encoder interface for compression implementation
	Encode(data []byte) []byte

	// Decode is the decoder interface for decompression implementation
	Decode(data []byte) []byte
}
PayloadCodec defines an interface for the Codecs supported by the Kafka library
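A minimal sketch of a custom implementation. MyCodec, its no-op transforms, and the id value 42 are all hypothetical; ids 0-2 are already taken by the packaged codecs.

// MyCodec is a hypothetical pass-through codec; a real implementation
// would compress in Encode and decompress in Decode.
type MyCodec struct{}

// ID returns an id that does not collide with the packaged codecs.
func (c *MyCodec) ID() byte { return 42 }

// Encode would compress data; here it is a no-op.
func (c *MyCodec) Encode(data []byte) []byte { return data }

// Decode would decompress data; here it is a no-op.
func (c *MyCodec) Decode(data []byte) []byte { return data }

Register it on a consumer with AddCodecs (shown earlier), and pass new(MyCodec) to NewMessageWithCodec on the producing side.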
type SnappyPayloadCodec ¶
type SnappyPayloadCodec struct{}
SnappyPayloadCodec - Snappy Codec
func (*SnappyPayloadCodec) Decode ¶
func (codec *SnappyPayloadCodec) Decode(data []byte) []byte
Decode decodes the message with Snappy compression
func (*SnappyPayloadCodec) Encode ¶
func (codec *SnappyPayloadCodec) Encode(data []byte) []byte
Encode encodes the message with Snappy compression
func (*SnappyPayloadCodec) ID ¶
func (codec *SnappyPayloadCodec) ID() byte
ID returns the 1-byte id of the codec