Documentation ¶
Index ¶
- Constants
- func Register(req, res Message)
- func WriteRequest(w io.Writer, apiVersion int16, correlationID int32, clientID string, ...) error
- func WriteResponse(w io.Writer, apiVersion int16, correlationID int32, msg Message) error
- type ApiKey
- type Attributes
- type Broker
- type BrokerMessage
- type ByteSequence
- type Cluster
- type Error
- type GroupMessage
- type Header
- type Mapper
- type Message
- func ReadRequest(r io.Reader) (apiVersion int16, correlationID int32, clientID string, msg Message, err error)
- func ReadResponse(r io.Reader, apiKey, apiVersion int16) (correlationID int32, msg Message, err error)
- func Result(r interface{}) (Message, error)
- func RoundTrip(rw io.ReadWriter, apiVersion int16, clientID string, msg Message) (Message, error)
- type Partition
- type PreparedMessage
- type Record
- type RecordSet
- type Reducer
- type Topic
Constants ¶
const (
	ErrCorrupted = Error("corrupted")
	ErrTruncated = Error("truncated")
)
Variables ¶
This section is empty.
Functions ¶
func Register ¶
func Register(req, res Message)
Register is called automatically by sub-packages when they are imported, to install a new pair of request/response message types.
func WriteRequest ¶
Types ¶
type ApiKey ¶
type ApiKey int16
const (
	Produce ApiKey = 0
	Fetch ApiKey = 1
	ListOffsets ApiKey = 2
	Metadata ApiKey = 3
	LeaderAndIsr ApiKey = 4
	StopReplica ApiKey = 5
	UpdateMetadata ApiKey = 6
	ControlledShutdown ApiKey = 7
	OffsetCommit ApiKey = 8
	OffsetFetch ApiKey = 9
	FindCoordinator ApiKey = 10
	JoinGroup ApiKey = 11
	Heartbeat ApiKey = 12
	LeaveGroup ApiKey = 13
	SyncGroup ApiKey = 14
	DescribeGroups ApiKey = 15
	ListGroups ApiKey = 16
	SaslHandshake ApiKey = 17
	ApiVersions ApiKey = 18
	CreateTopics ApiKey = 19
	DeleteTopics ApiKey = 20
	DeleteRecords ApiKey = 21
	InitProducerId ApiKey = 22
	OffsetForLeaderEpoch ApiKey = 23
	AddPartitionsToTxn ApiKey = 24
	AddOffsetsToTxn ApiKey = 25
	EndTxn ApiKey = 26
	WriteTxnMarkers ApiKey = 27
	TxnOffsetCommit ApiKey = 28
	DescribeAcls ApiKey = 29
	CreateAcls ApiKey = 30
	DeleteAcls ApiKey = 31
	DescribeConfigs ApiKey = 32
	AlterConfigs ApiKey = 33
	AlterReplicaLogDirs ApiKey = 34
	DescribeLogDirs ApiKey = 35
	SaslAuthenticate ApiKey = 36
	CreatePartitions ApiKey = 37
	CreateDelegationToken ApiKey = 38
	RenewDelegationToken ApiKey = 39
	ExpireDelegationToken ApiKey = 40
	DescribeDelegationToken ApiKey = 41
	DeleteGroups ApiKey = 42
	ElectLeaders ApiKey = 43
	IncrementalAlterConfigs ApiKey = 44
	AlterPartitionReassignments ApiKey = 45
	ListPartitionReassignments ApiKey = 46
	OffsetDelete ApiKey = 47
)
func (ApiKey) MaxVersion ¶
func (ApiKey) MinVersion ¶
func (ApiKey) SelectVersion ¶
type Attributes ¶
type Attributes int16
Attributes is a bitset representing special attributes set on records.
const (
	Gzip          Attributes = Attributes(compress.Gzip)   // 1
	Snappy        Attributes = Attributes(compress.Snappy) // 2
	Lz4           Attributes = Attributes(compress.Lz4)    // 3
	Zstd          Attributes = Attributes(compress.Zstd)   // 4
	Transactional Attributes = 1 << 4
	ControlBatch  Attributes = 1 << 5
)
func (Attributes) Compression ¶
func (a Attributes) Compression() compress.Compression
func (Attributes) ControlBatch ¶
func (a Attributes) ControlBatch() bool
func (Attributes) Transactional ¶
func (a Attributes) Transactional() bool
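The bitset layout can be illustrated with a small standalone sketch. The type and constant values below are copied from the declarations above; the three-bit compression mask and the helper names (compressionCode, isTransactional, isControlBatch) are assumptions made for illustration and are not taken from the package source.

```go
package main

import "fmt"

// Attributes mirrors the documented type: a bitset stored in an int16.
type Attributes int16

// Constant values copied from the documentation above.
const (
	Gzip          Attributes = 1
	Snappy        Attributes = 2
	Lz4           Attributes = 3
	Zstd          Attributes = 4
	Transactional Attributes = 1 << 4
	ControlBatch  Attributes = 1 << 5
)

// compressionMask is an assumption: the codec is taken to occupy the low
// three bits, which is enough to hold the values 0 through 4.
const compressionMask Attributes = 0x7

// compressionCode returns the codec number stored in the low bits.
func compressionCode(a Attributes) int { return int(a & compressionMask) }

// isTransactional reports whether the Transactional bit is set.
func isTransactional(a Attributes) bool { return a&Transactional != 0 }

// isControlBatch reports whether the ControlBatch bit is set.
func isControlBatch(a Attributes) bool { return a&ControlBatch != 0 }

func main() {
	// A zstd-compressed, transactional batch.
	a := Zstd | Transactional
	fmt.Println(compressionCode(a), isTransactional(a), isControlBatch(a))
}
```

Because the codec is a small number rather than a flag, codecs are not combined with each other; only the flag bits are ORed on top of a single codec value.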
type BrokerMessage ¶
type BrokerMessage interface {
	// Given a representation of the kafka cluster state as argument, returns
	// the broker that the message should be routed to.
	Broker(Cluster) (Broker, error)
}
BrokerMessage is an extension of the Message interface implemented by some request types to customize the broker assignment logic.
type ByteSequence ¶
ByteSequence is an interface implemented by types that represent immutable sequences of bytes.
ByteSequence values are used to abstract the location where record keys and values are read from (e.g. in-memory buffers, network sockets, files).
The Close method should be called to release resources held by the sequence when the program is done with it.
ByteSequence values are generally not safe to use concurrently from multiple goroutines.
func Bytes ¶
func Bytes(b []byte) ByteSequence
Bytes constructs a ByteSequence which exposes the content of b.
func String ¶
func String(s string) ByteSequence
String constructs a ByteSequence which exposes the content of s.
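The read-then-close discipline described above can be sketched as follows. The method set of ByteSequence is not listed on this page, so the byteSequence interface below is a reduced, hypothetical stand-in (a reader with a Close method), and fromBytes and readAll are illustrative helpers rather than package functions.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// byteSequence is a hypothetical, reduced stand-in for ByteSequence.
// The real interface may declare more methods than these.
type byteSequence interface {
	io.Reader
	io.Closer
}

// fromBytes plays the role of Bytes in this sketch: it exposes the
// content of b through the stand-in interface.
func fromBytes(b []byte) byteSequence {
	return io.NopCloser(bytes.NewReader(b))
}

// readAll drains seq and always closes it, so that any resources held
// by the sequence are released once the caller is done with it.
func readAll(seq byteSequence) ([]byte, error) {
	defer seq.Close()
	return io.ReadAll(seq)
}

func main() {
	value, err := readAll(fromBytes([]byte("hello")))
	fmt.Println(string(value), err)
}
```

Since sequences are generally not safe for concurrent use, a helper like readAll is meant to be called from a single goroutine per sequence.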
type Cluster ¶
type Error ¶
type Error string
Error is a string type implementing the error interface and used to declare constants representing recoverable protocol errors.
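A string-based error type allows error values to be declared as constants and compared by value. The sketch below re-declares the type locally to stay self-contained; the parse function, its four-byte threshold, and the isTruncated helper are hypothetical and exist only to produce and check an error.

```go
package main

import (
	"errors"
	"fmt"
)

// Error is re-declared here to keep the sketch self-contained; it mirrors
// the documented string type.
type Error string

// Error implements the error interface.
func (e Error) Error() string { return string(e) }

const (
	ErrCorrupted = Error("corrupted")
	ErrTruncated = Error("truncated")
)

// parse is a hypothetical decoder that reports truncated input by wrapping
// the constant error with extra context.
func parse(b []byte) error {
	if len(b) < 4 {
		return fmt.Errorf("reading header: %w", ErrTruncated)
	}
	return nil
}

// isTruncated reports whether err is, or wraps, ErrTruncated.
func isTruncated(err error) bool { return errors.Is(err, ErrTruncated) }

func main() {
	err := parse([]byte{0x01})
	fmt.Println(err, isTruncated(err), errors.Is(err, ErrCorrupted))
}
```

Because the constants are comparable values, errors.Is matches them even after wrapping, which lets callers decide whether to retry or skip the affected data.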
type GroupMessage ¶
type GroupMessage interface {
	// Returns the group configured on the message.
	Group() string
}
GroupMessage is an extension of the Message interface implemented by some request types to inform the program that they should be routed to a group coordinator.
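Extension interfaces of this kind are typically detected with a type assertion. The sketch below shows one way a program could pick a routing target; the message and groupMessage interfaces are simplified local stand-ins, and the heartbeat and metadata types and the route function are hypothetical.

```go
package main

import "fmt"

// message is a simplified stand-in for the Message interface.
type message interface{ ApiKey() int16 }

// groupMessage is a stand-in for the GroupMessage extension.
type groupMessage interface{ Group() string }

// heartbeat is a hypothetical request that belongs to a consumer group.
type heartbeat struct{ groupID string }

func (heartbeat) ApiKey() int16   { return 12 } // Heartbeat
func (h heartbeat) Group() string { return h.groupID }

// metadata is a hypothetical request with no group affinity.
type metadata struct{}

func (metadata) ApiKey() int16 { return 3 } // Metadata

// route describes where m would be sent: to a group coordinator when the
// message implements the extension, to any broker otherwise.
func route(m message) string {
	if g, ok := m.(groupMessage); ok {
		return "coordinator of group " + g.Group()
	}
	return "any broker"
}

func main() {
	fmt.Println(route(heartbeat{groupID: "billing"}))
	fmt.Println(route(metadata{}))
}
```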
type Mapper ¶
type Mapper interface {
	// For a given cluster layout, returns the list of messages constructed
	// from the receiver for each request that should be sent to the cluster.
	// The second return value is a Reducer which can be used to merge back the
	// results of each request into a single message (or an error).
	Map(Cluster) ([]Message, Reducer, error)
}
Mapper is an interface implemented by messages that can be split into multiple requests and have their results merged back by a Reducer.
type Message ¶
type Message interface {
ApiKey() ApiKey
}
Message is an interface implemented by all request and response types of the kafka protocol.
This interface is used mostly as a safeguard to provide a compile-time check for values passed to functions dealing with kafka message types.
func ReadRequest ¶
func ReadResponse ¶
func Result ¶
Result converts r to a Message or an error, or panics if r could not be converted to either of these types.
type PreparedMessage ¶
type PreparedMessage interface {
	// Prepares the message before being sent to a kafka broker using the API
	// version passed as argument.
	Prepare(apiVersion int16)
}
PreparedMessage is an extension of the Message interface implemented by some request types which may need to run some pre-processing on their state before being sent.
type Record ¶
type Record struct {
	Offset  int64
	Time    time.Time
	Key     ByteSequence
	Value   ByteSequence
	Headers []Header
}
Record values represent single records exchanged in Produce requests and Fetch responses.
type RecordSet ¶
type RecordSet struct {
	// The message version that this record set will be represented as; valid
	// values are 1 or 2.
	Version int8

	// The following fields carry properties used when representing the record
	// batch in version 2.
	Attributes           Attributes
	PartitionLeaderEpoch int32
	BaseOffset           int64
	ProducerID           int64
	ProducerEpoch        int16
	BaseSequence         int32

	// The list of records contained in this set.
	Records []Record
}
RecordSet represents a sequence of records in Produce requests and Fetch responses. All v0, v1, and v2 formats are supported.
func (*RecordSet) ReadFrom ¶
ReadFrom reads the representation of a record set from r into rs, returning the number of bytes consumed from r, and a non-nil error if the record set could not be read.
func (*RecordSet) WriteTo ¶
WriteTo writes the representation of rs into w. The value of rs.Version dictates the format in which the record set is represented.
Note: since this package is only compatible with kafka 0.10 and above, the method never produces messages in version 0. If rs.Version is zero, the method defaults to producing messages in version 1.
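The defaulting rule in the note can be expressed as a small helper. effectiveVersion is a hypothetical name used only for illustration, and the sketch does not address how versions outside the documented set are treated.

```go
package main

import "fmt"

// effectiveVersion is a hypothetical helper that applies the documented
// rule: a zero Version is written as version 1, other values are kept.
func effectiveVersion(version int8) int8 {
	if version == 0 {
		return 1
	}
	return version
}

func main() {
	for _, v := range []int8{0, 1, 2} {
		fmt.Printf("Version=%d is written as version %d\n", v, effectiveVersion(v))
	}
}
```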
type Reducer ¶
type Reducer interface {
	// Given a list of messages and associated results, merge them back into a
	// response (or an error). The results must be either Message or error
	// values, other types should trigger a panic.
	Reduce(messages []Message, results []interface{}) (Message, error)
}
Reducer is an interface implemented by messages which can merge multiple results into one response.
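The interplay between Mapper and Reducer can be sketched with a request that is split per broker and merged back into one response. Everything below is a simplified stand-in: the cluster type (a topic-to-broker map), the listRequest and listResponse types, and the grouping strategy are hypothetical and chosen only to show the split and merge steps.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a simplified stand-in for the Message interface.
type message interface{ ApiKey() int16 }

// cluster is hypothetical: it maps each topic to the id of the broker
// that should receive requests for it.
type cluster map[string]int

type reducer interface {
	Reduce(messages []message, results []interface{}) (message, error)
}

type mapper interface {
	Map(c cluster) ([]message, reducer, error)
}

// listRequest is a hypothetical request covering several topics.
type listRequest struct{ topics []string }

func (listRequest) ApiKey() int16 { return 2 } // ListOffsets

// listResponse is the hypothetical response paired with listRequest.
type listResponse struct{ topics []string }

func (listResponse) ApiKey() int16 { return 2 }

// Map groups the topics by broker and returns one sub-request per broker,
// in order of first appearance. The receiver doubles as the reducer.
func (r listRequest) Map(c cluster) ([]message, reducer, error) {
	byBroker := map[int][]string{}
	var order []int
	for _, t := range r.topics {
		id, ok := c[t]
		if !ok {
			return nil, nil, errors.New("unknown topic: " + t)
		}
		if _, seen := byBroker[id]; !seen {
			order = append(order, id)
		}
		byBroker[id] = append(byBroker[id], t)
	}
	msgs := make([]message, 0, len(order))
	for _, id := range order {
		msgs = append(msgs, listRequest{topics: byBroker[id]})
	}
	return msgs, r, nil
}

// Reduce merges the per-request results into a single response, returns
// the first error encountered, and panics on any other result type.
func (listRequest) Reduce(messages []message, results []interface{}) (message, error) {
	merged := listResponse{}
	for _, res := range results {
		switch v := res.(type) {
		case listResponse:
			merged.topics = append(merged.topics, v.topics...)
		case error:
			return nil, v
		default:
			panic(fmt.Sprintf("unexpected result type %T", res))
		}
	}
	return merged, nil
}

func main() {
	var m mapper = listRequest{topics: []string{"a", "b", "c"}}
	msgs, red, err := m.Map(cluster{"a": 1, "b": 2, "c": 1})
	if err != nil {
		panic(err)
	}
	// Pretend each sub-request was answered by echoing its topics back.
	results := make([]interface{}, len(msgs))
	for i, msg := range msgs {
		results[i] = listResponse{topics: msg.(listRequest).topics}
	}
	res, err := red.Reduce(msgs, results)
	fmt.Println(len(msgs), res, err)
}
```

Returning the reducer from Map keeps the splitting and merging logic together, so the code that dispatches the sub-requests does not need to know how a particular message type is recombined.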