Documentation ¶
Index ¶
- func Marshal(version int16, value interface{}) ([]byte, error)
- func ReadAll(b Bytes) ([]byte, error)
- func Register(req, res Message)
- func RegisterOverride(req, res Message, key OverrideTypeKey)
- func Unmarshal(data []byte, version int16, value interface{}) error
- func WriteRequest(w io.Writer, apiVersion int16, correlationID int32, clientID string, ...) error
- func WriteResponse(w io.Writer, apiVersion int16, correlationID int32, msg Message) error
- type ApiKey
- type Attributes
- type Broker
- type BrokerMessage
- type Bytes
- type Cluster
- type Conn
- func (c *Conn) Close() error
- func (c *Conn) Discard(n int) (int, error)
- func (c *Conn) LocalAddr() net.Addr
- func (c *Conn) Peek(n int) ([]byte, error)
- func (c *Conn) Read(b []byte) (int, error)
- func (c *Conn) RemoteAddr() net.Addr
- func (c *Conn) RoundTrip(msg Message) (Message, error)
- func (c *Conn) SetDeadline(t time.Time) error
- func (c *Conn) SetReadDeadline(t time.Time) error
- func (c *Conn) SetVersions(versions map[ApiKey]int16)
- func (c *Conn) SetWriteDeadline(t time.Time) error
- func (c *Conn) String() string
- func (c *Conn) Write(b []byte) (int, error)
- type ControlBatch
- type ControlRecord
- type Error
- type GroupMessage
- type Header
- type Merger
- type Message
- func ReadRequest(r io.Reader) (apiVersion int16, correlationID int32, clientID string, msg Message, err error)
- func ReadResponse(r io.Reader, apiKey ApiKey, apiVersion int16) (correlationID int32, msg Message, err error)
- func Result(r interface{}) (Message, error)
- func RoundTrip(rw io.ReadWriter, apiVersion int16, correlationID int32, clientID string, ...) (Message, error)
- type MessageSet
- type OverrideTypeKey
- type OverrideTypeMessage
- type Partition
- type PreparedMessage
- type RawExchanger
- type RawRecordSet
- type Record
- type RecordBatch
- type RecordReader
- type RecordSet
- type RecordStream
- type Splitter
- type Topic
- type TopicError
- type TopicPartitionError
- type TransactionalMessage
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ReadAll ¶
ReadAll is similar to ioutil.ReadAll, but it takes advantage of knowing the length of b to minimize the memory footprint.
The function returns a nil slice if b is nil.
func Register ¶
func Register(req, res Message)
Register is automatically called by sub-packages are imported to install a new pair of request/response message types.
func RegisterOverride ¶
func RegisterOverride(req, res Message, key OverrideTypeKey)
func WriteRequest ¶
Types ¶
type ApiKey ¶
type ApiKey int16
const ( Produce ApiKey = 0 Fetch ApiKey = 1 ListOffsets ApiKey = 2 Metadata ApiKey = 3 LeaderAndIsr ApiKey = 4 StopReplica ApiKey = 5 UpdateMetadata ApiKey = 6 ControlledShutdown ApiKey = 7 OffsetCommit ApiKey = 8 OffsetFetch ApiKey = 9 FindCoordinator ApiKey = 10 JoinGroup ApiKey = 11 Heartbeat ApiKey = 12 LeaveGroup ApiKey = 13 SyncGroup ApiKey = 14 DescribeGroups ApiKey = 15 ListGroups ApiKey = 16 SaslHandshake ApiKey = 17 ApiVersions ApiKey = 18 CreateTopics ApiKey = 19 DeleteTopics ApiKey = 20 DeleteRecords ApiKey = 21 InitProducerId ApiKey = 22 OffsetForLeaderEpoch ApiKey = 23 AddPartitionsToTxn ApiKey = 24 AddOffsetsToTxn ApiKey = 25 EndTxn ApiKey = 26 WriteTxnMarkers ApiKey = 27 TxnOffsetCommit ApiKey = 28 DescribeAcls ApiKey = 29 CreateAcls ApiKey = 30 DeleteAcls ApiKey = 31 DescribeConfigs ApiKey = 32 AlterConfigs ApiKey = 33 AlterReplicaLogDirs ApiKey = 34 DescribeLogDirs ApiKey = 35 SaslAuthenticate ApiKey = 36 CreatePartitions ApiKey = 37 CreateDelegationToken ApiKey = 38 RenewDelegationToken ApiKey = 39 ExpireDelegationToken ApiKey = 40 DescribeDelegationToken ApiKey = 41 DeleteGroups ApiKey = 42 ElectLeaders ApiKey = 43 IncrementalAlterConfigs ApiKey = 44 AlterPartitionReassignments ApiKey = 45 ListPartitionReassignments ApiKey = 46 OffsetDelete ApiKey = 47 DescribeClientQuotas ApiKey = 48 AlterClientQuotas ApiKey = 49 DescribeUserScramCredentials ApiKey = 50 AlterUserScramCredentials ApiKey = 51 )
func (ApiKey) MaxVersion ¶
func (ApiKey) MinVersion ¶
func (ApiKey) SelectVersion ¶
type Attributes ¶
type Attributes int16
Attributes is a bitset representing special attributes set on records.
const ( Gzip Attributes = Attributes(compress.Gzip) // 1 Snappy Attributes = Attributes(compress.Snappy) // 2 Lz4 Attributes = Attributes(compress.Lz4) // 3 Zstd Attributes = Attributes(compress.Zstd) // 4 Transactional Attributes = 1 << 4 Control Attributes = 1 << 5 )
func (Attributes) Compression ¶
func (a Attributes) Compression() compress.Compression
func (Attributes) Control ¶
func (a Attributes) Control() bool
func (Attributes) String ¶
func (a Attributes) String() string
func (Attributes) Transactional ¶
func (a Attributes) Transactional() bool
type BrokerMessage ¶
type BrokerMessage interface { // Given a representation of the kafka cluster state as argument, returns // the broker that the message should be routed to. Broker(Cluster) (Broker, error) }
BrokerMessage is an extension of the Message interface implemented by some request types to customize the broker assignment logic.
type Bytes ¶
type Bytes interface { io.ReadCloser // Returns the number of bytes remaining to be read from the payload. Len() int }
Bytes is an interface implemented by types that represent immutable sequences of bytes.
Bytes values are used to abstract the location where record keys and values are read from (e.g. in-memory buffers, network sockets, files).
The Close method should be called to release resources held by the object when the program is done with it.
Bytes values are generally not safe to use concurrently from multiple goroutines.
type Cluster ¶
type Cluster struct { ClusterID string Controller int32 Brokers map[int32]Broker Topics map[string]Topic }
func (Cluster) TopicNames ¶
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
func (*Conn) RemoteAddr ¶
func (*Conn) SetVersions ¶
type ControlBatch ¶
type ControlBatch struct { Attributes Attributes PartitionLeaderEpoch int32 BaseOffset int64 ProducerID int64 ProducerEpoch int16 BaseSequence int32 Records RecordReader }
ControlBatch is an implementation of the RecordReader interface representing control batches returned by kafka brokers.
func NewControlBatch ¶
func NewControlBatch(records ...ControlRecord) *ControlBatch
NewControlBatch constructs a control batch from the list of records passed as arguments.
func (*ControlBatch) Offset ¶
func (c *ControlBatch) Offset() int64
func (*ControlBatch) ReadControlRecord ¶
func (c *ControlBatch) ReadControlRecord() (*ControlRecord, error)
func (*ControlBatch) ReadRecord ¶
func (c *ControlBatch) ReadRecord() (*Record, error)
func (*ControlBatch) Version ¶
func (c *ControlBatch) Version() int
type ControlRecord ¶
type ControlRecord struct { Offset int64 Time time.Time Version int16 Type int16 Data []byte Headers []Header }
ControlRecord represents a record read from a control batch.
func ReadControlRecord ¶
func ReadControlRecord(r *Record) (*ControlRecord, error)
func (*ControlRecord) Key ¶
func (cr *ControlRecord) Key() Bytes
func (*ControlRecord) Record ¶
func (cr *ControlRecord) Record() Record
func (*ControlRecord) Value ¶
func (cr *ControlRecord) Value() Bytes
type Error ¶
type Error string
Error represents client-side protocol errors.
const ( // ErrNoTopic is returned when a request needs to be sent to a specific. ErrNoTopic Error = "topic not found" // ErrNoPartition is returned when a request needs to be sent to a specific // partition, but the client did not find it in the cluster metadata. ErrNoPartition Error = "topic partition not found" // ErrNoLeader is returned when a request needs to be sent to a partition // leader, but the client could not determine what the leader was at this // time. ErrNoLeader Error = "topic partition has no leader" // ErrNoRecord is returned when attempting to write a message containing an // empty record set (which kafka forbids). // // We handle this case client-side because kafka will close the connection // that it received an empty produce request on, causing all concurrent // requests to be aborted. ErrNoRecord Error = "record set contains no records" // ErrNoReset is returned by ResetRecordReader when the record reader does // not support being reset. ErrNoReset Error = "record sequence does not support reset" )
type GroupMessage ¶
type GroupMessage interface { // Returns the group configured on the message. Group() string }
GroupMessage is an extension of the Message interface implemented by some request types to inform the program that they should be routed to a group coordinator.
type Merger ¶
type Merger interface { // Given a list of message and associated results, merge them back into a // response (or an error). The results must be either Message or error // values, other types should trigger a panic. Merge(messages []Message, results []interface{}) (Message, error) }
Merger is an interface implemented by messages which can merge multiple results into one response.
type Message ¶
type Message interface {
ApiKey() ApiKey
}
Message is an interface implemented by all request and response types of the kafka protocol.
This interface is used mostly as a safe-guard to provide a compile-time check for values passed to functions dealing kafka message types.
func ReadRequest ¶
func ReadResponse ¶
type MessageSet ¶
type MessageSet struct { Attributes Attributes BaseOffset int64 Records RecordReader }
MessageSet is an implementation of the RecordReader interface representing regular message sets (v1).
func (*MessageSet) Offset ¶
func (m *MessageSet) Offset() int64
func (*MessageSet) ReadRecord ¶
func (m *MessageSet) ReadRecord() (*Record, error)
func (*MessageSet) Version ¶
func (m *MessageSet) Version() int
type OverrideTypeMessage ¶
type OverrideTypeMessage interface {
TypeKey() OverrideTypeKey
}
OverrideTypeMessage is an interface implemented by messages that want to override the standard request/response types for a given API.
type PreparedMessage ¶
type PreparedMessage interface { // Prepares the message before being sent to a kafka broker using the API // version passed as argument. Prepare(apiVersion int16) }
PreparedMessage is an extension of the Message interface implemented by some request types which may need to run some pre-processing on their state before being sent.
type RawExchanger ¶
type RawExchanger interface { // Required should return true when a RawExchange is needed. // The passed in versions are the negotiated versions for the connection // performing the request. Required(versions map[ApiKey]int16) bool // RawExchange is given the raw connection to the broker and the Message // is responsible for writing itself to the connection as well as reading // the response. RawExchange(rw io.ReadWriter) (Message, error) }
RawExchanger is an extention to the Message interface to allow messages to control the request response cycle for the message. This is currently only used to facilitate v0 SASL Authenticate requests being written in a non-standard fashion when the SASL Handshake was done at v0 but not when done at v1.
type RawRecordSet ¶
type RawRecordSet struct { // Reader exposes the raw sequence of record set bytes. Reader io.Reader }
RawRecordSet represents a record set for a RawProduce request. The record set is represented as a raw sequence of pre-encoded record set bytes.
func (*RawRecordSet) ReadFrom ¶
func (rrs *RawRecordSet) ReadFrom(r io.Reader) (int64, error)
ReadFrom reads the representation of a record set from r into rrs. It re-uses the existing RecordSet.ReadFrom implementation to first read/decode data into a RecordSet, then writes/encodes the RecordSet to a buffer referenced by the RawRecordSet.
Note: re-using the RecordSet.ReadFrom implementation makes this suboptimal from a performance standpoint as it require an extra copy of the record bytes. Holding off on optimizing, as this code path is only invoked in tests.
type Record ¶
type Record struct { // The offset at which the record exists in a topic partition. This value // is ignored in produce requests. Offset int64 // Returns the time of the record. This value may be omitted in produce // requests to let kafka set the time when it saves the record. Time time.Time // Returns a byte sequence containing the key of this record. The returned // sequence may be nil to indicate that the record has no key. If the record // is part of a RecordSet, the content of the key must remain valid at least // until the record set is closed (or until the key is closed). Key Bytes // Returns a byte sequence containing the value of this record. The returned // sequence may be nil to indicate that the record has no value. If the // record is part of a RecordSet, the content of the value must remain valid // at least until the record set is closed (or until the value is closed). Value Bytes // Returns the list of headers associated with this record. The returned // slice may be reused across calls, the program should use it as an // immutable value. Headers []Header }
Record is an interface representing a single kafka record.
Record values are not safe to use concurrently from multiple goroutines.
type RecordBatch ¶
type RecordBatch struct { Attributes Attributes PartitionLeaderEpoch int32 BaseOffset int64 ProducerID int64 ProducerEpoch int16 BaseSequence int32 Records RecordReader }
RecordBatch is an implementation of the RecordReader interface representing regular record batches (v2).
func (*RecordBatch) Offset ¶
func (r *RecordBatch) Offset() int64
func (*RecordBatch) ReadRecord ¶
func (r *RecordBatch) ReadRecord() (*Record, error)
func (*RecordBatch) Version ¶
func (r *RecordBatch) Version() int
type RecordReader ¶
type RecordReader interface { // Returns the next record in the set, or io.EOF if the end of the sequence // has been reached. // // The returned Record is guaranteed to be valid until the next call to // ReadRecord. If the program needs to retain the Record value it must make // a copy. ReadRecord() (*Record, error) }
RecordReader is an interface representing a sequence of records. Record sets are used in both produce and fetch requests to represent the sequence of records that are sent to or receive from kafka brokers.
RecordSet values are not safe to use concurrently from multiple goroutines.
func MultiRecordReader ¶
func MultiRecordReader(batches ...RecordReader) RecordReader
MultiRecordReader merges multiple record batches into one.
func NewRecordReader ¶
func NewRecordReader(records ...Record) RecordReader
NewRecordReader constructs a reader exposing the records passed as arguments.
type RecordSet ¶
type RecordSet struct { // The message version that this record set will be represented as, valid // values are 1, or 2. // // When reading, this is the value of the highest version used in the // batches that compose the record set. // // When writing, this value dictates the format that the records will be // encoded in. Version int8 // Attributes set on the record set. // // When reading, the attributes are the combination of all attributes in // the batches that compose the record set. // // When writing, the attributes apply to the whole sequence of records in // the set. Attributes Attributes // A reader exposing the sequence of records. // // When reading a RecordSet from an io.Reader, the Records field will be a // *RecordStream. If the program needs to access the details of each batch // that compose the stream, it may use type assertions to access the // underlying types of each batch. Records RecordReader }
RecordSet represents a sequence of records in Produce requests and Fetch responses. All v0, v1, and v2 formats are supported.
func (*RecordSet) ReadFrom ¶
ReadFrom reads the representation of a record set from r into rs, returning the number of bytes consumed from r, and an non-nil error if the record set could not be read.
func (*RecordSet) WriteTo ¶
WriteTo writes the representation of rs into w. The value of rs.Version dictates which format that the record set will be represented as.
The error will be ErrNoRecord if rs contained no records.
Note: since this package is only compatible with kafka 0.10 and above, the method never produces messages in version 0. If rs.Version is zero, the method defaults to producing messages in version 1.
type RecordStream ¶
type RecordStream struct { Records []RecordReader // contains filtered or unexported fields }
RecordStream is an implementation of the RecordReader interface which combines multiple underlying RecordReader and only expose records that are not from control batches.
func (*RecordStream) ReadRecord ¶
func (s *RecordStream) ReadRecord() (*Record, error)
type Splitter ¶
type Splitter interface { // For a given cluster layout, returns the list of messages constructed // from the receiver for each requests that should be sent to the cluster. // The second return value is a Merger which can be used to merge back the // results of each request into a single message (or an error). Split(Cluster) ([]Message, Merger, error) }
Splitter is an interface implemented by messages that can be split into multiple requests and have their results merged back by a Merger.
type TopicError ¶
func NewErrNoTopic ¶
func NewErrNoTopic(topic string) *TopicError
func NewTopicError ¶
func NewTopicError(topic string, err error) *TopicError
func (*TopicError) Error ¶
func (e *TopicError) Error() string
func (*TopicError) Unwrap ¶
func (e *TopicError) Unwrap() error
type TopicPartitionError ¶
func NewErrNoLeader ¶
func NewErrNoLeader(topic string, partition int32) *TopicPartitionError
func NewErrNoPartition ¶
func NewErrNoPartition(topic string, partition int32) *TopicPartitionError
func NewTopicPartitionError ¶
func NewTopicPartitionError(topic string, partition int32, err error) *TopicPartitionError
func (*TopicPartitionError) Error ¶
func (e *TopicPartitionError) Error() string
func (*TopicPartitionError) Unwrap ¶
func (e *TopicPartitionError) Unwrap() error
type TransactionalMessage ¶
type TransactionalMessage interface { // Returns the transactional id configured on the message. Transaction() string }
TransactionalMessage is an extension of the Message interface implemented by some request types to inform the program that they should be routed to a transaction coordinator.