Versions in this module Expand all Collapse all v0 v0.4.36 Sep 6, 2022 v0.4.35 Aug 22, 2022 Changes in this version + func Marshal(version int16, value interface{}) ([]byte, error) + func ReadAll(b Bytes) ([]byte, error) + func Register(req, res Message) + func Unmarshal(data []byte, version int16, value interface{}) error + func WriteRequest(w io.Writer, apiVersion int16, correlationID int32, clientID string, ...) error + func WriteResponse(w io.Writer, apiVersion int16, correlationID int32, msg Message) error + type ApiKey int16 + const AddOffsetsToTxn + const AddPartitionsToTxn + const AlterClientQuotas + const AlterConfigs + const AlterPartitionReassignments + const AlterReplicaLogDirs + const ApiVersions + const ControlledShutdown + const CreateAcls + const CreateDelegationToken + const CreatePartitions + const CreateTopics + const DeleteAcls + const DeleteGroups + const DeleteRecords + const DeleteTopics + const DescribeAcls + const DescribeClientQuotas + const DescribeConfigs + const DescribeDelegationToken + const DescribeGroups + const DescribeLogDirs + const ElectLeaders + const EndTxn + const ExpireDelegationToken + const Fetch + const FindCoordinator + const Heartbeat + const IncrementalAlterConfigs + const InitProducerId + const JoinGroup + const LeaderAndIsr + const LeaveGroup + const ListGroups + const ListOffsets + const ListPartitionReassignments + const Metadata + const OffsetCommit + const OffsetDelete + const OffsetFetch + const OffsetForLeaderEpoch + const Produce + const RenewDelegationToken + const SaslAuthenticate + const SaslHandshake + const StopReplica + const SyncGroup + const TxnOffsetCommit + const UpdateMetadata + const WriteTxnMarkers + func (k ApiKey) MaxVersion() int16 + func (k ApiKey) MinVersion() int16 + func (k ApiKey) SelectVersion(minVersion, maxVersion int16) int16 + func (k ApiKey) String() string + type Attributes int16 + const Control + const Gzip + const Lz4 + const Snappy + const Transactional + const Zstd + func (a Attributes) Compression() compress.Compression + func (a Attributes) Control() bool + func (a Attributes) String() string + func (a Attributes) Transactional() bool + type Broker struct + Host string + ID int32 + Port int32 + Rack string + func (b Broker) Format(w fmt.State, v rune) + func (b Broker) String() string + type BrokerMessage interface + Broker func(Cluster) (Broker, error) + type Bytes interface + Len func() int + func NewBytes(b []byte) Bytes + type Cluster struct + Brokers map[int32]Broker + ClusterID string + Controller int32 + Topics map[string]Topic + func (c Cluster) BrokerIDs() []int32 + func (c Cluster) Format(w fmt.State, _ rune) + func (c Cluster) IsZero() bool + func (c Cluster) TopicNames() []string + type Conn struct + func NewConn(conn net.Conn, clientID string) *Conn + func (c *Conn) Close() error + func (c *Conn) Discard(n int) (int, error) + func (c *Conn) LocalAddr() net.Addr + func (c *Conn) Peek(n int) ([]byte, error) + func (c *Conn) Read(b []byte) (int, error) + func (c *Conn) RemoteAddr() net.Addr + func (c *Conn) RoundTrip(msg Message) (Message, error) + func (c *Conn) SetDeadline(t time.Time) error + func (c *Conn) SetReadDeadline(t time.Time) error + func (c *Conn) SetVersions(versions map[ApiKey]int16) + func (c *Conn) SetWriteDeadline(t time.Time) error + func (c *Conn) String() string + func (c *Conn) Write(b []byte) (int, error) + type ControlBatch struct + Attributes Attributes + BaseOffset int64 + BaseSequence int32 + PartitionLeaderEpoch int32 + ProducerEpoch int16 + ProducerID int64 + Records RecordReader + func NewControlBatch(records ...ControlRecord) *ControlBatch + func (c *ControlBatch) Offset() int64 + func (c *ControlBatch) ReadControlRecord() (*ControlRecord, error) + func (c *ControlBatch) ReadRecord() (*Record, error) + func (c *ControlBatch) Version() int + type ControlRecord struct + Data []byte + Headers []Header + Offset int64 + Time time.Time + Type int16 + Version int16 + func ReadControlRecord(r *Record) (*ControlRecord, error) + func (cr *ControlRecord) Key() Bytes + func (cr *ControlRecord) Record() Record + func (cr *ControlRecord) Value() Bytes + type Error string + const ErrNoLeader + const ErrNoPartition + const ErrNoRecord + const ErrNoReset + const ErrNoTopic + func Errorf(msg string, args ...interface{}) Error + func (e Error) Error() string + type GroupMessage interface + Group func() string + type Header struct + Key string + Value []byte + type Merger interface + Merge func(messages []Message, results []interface{}) (Message, error) + type Message interface + ApiKey func() ApiKey + func ReadRequest(r io.Reader) (apiVersion int16, correlationID int32, clientID string, msg Message, err error) + func ReadResponse(r io.Reader, apiKey ApiKey, apiVersion int16) (correlationID int32, msg Message, err error) + func Result(r interface{}) (Message, error) + func RoundTrip(rw io.ReadWriter, apiVersion int16, correlationID int32, clientID string, ...) (Message, error) + type MessageSet struct + Attributes Attributes + BaseOffset int64 + Records RecordReader + func (m *MessageSet) Offset() int64 + func (m *MessageSet) ReadRecord() (*Record, error) + func (m *MessageSet) Version() int + type Partition struct + Error int16 + ID int32 + ISR []int32 + Leader int32 + Offline []int32 + Replicas []int32 + type PreparedMessage interface + Prepare func(apiVersion int16) + type RawExchanger interface + RawExchange func(rw io.ReadWriter) (Message, error) + Required func(versions map[ApiKey]int16) bool + type Record struct + Headers []Header + Key Bytes + Offset int64 + Time time.Time + Value Bytes + type RecordBatch struct + Attributes Attributes + BaseOffset int64 + BaseSequence int32 + PartitionLeaderEpoch int32 + ProducerEpoch int16 + ProducerID int64 + Records RecordReader + func (r *RecordBatch) Offset() int64 + func (r *RecordBatch) ReadRecord() (*Record, error) + func (r *RecordBatch) Version() int + type RecordReader interface + ReadRecord func() (*Record, error) + func MultiRecordReader(batches ...RecordReader) RecordReader + func NewRecordReader(records ...Record) RecordReader + type RecordSet struct + Attributes Attributes + Records RecordReader + Version int8 + func (rs *RecordSet) ReadFrom(r io.Reader) (int64, error) + func (rs *RecordSet) WriteTo(w io.Writer) (int64, error) + type RecordStream struct + Records []RecordReader + func (s *RecordStream) ReadRecord() (*Record, error) + type Splitter interface + Split func(Cluster) ([]Message, Merger, error) + type Topic struct + Error int16 + Name string + Partitions map[int32]Partition + type TopicError struct + Err error + Topic string + func NewErrNoTopic(topic string) *TopicError + func NewTopicError(topic string, err error) *TopicError + func (e *TopicError) Error() string + func (e *TopicError) Unwrap() error + type TopicPartitionError struct + Err error + Partition int32 + Topic string + func NewErrNoLeader(topic string, partition int32) *TopicPartitionError + func NewErrNoPartition(topic string, partition int32) *TopicPartitionError + func NewTopicPartitionError(topic string, partition int32, err error) *TopicPartitionError + func (e *TopicPartitionError) Error() string + func (e *TopicPartitionError) Unwrap() error + type TransactionalMessage interface + Transaction func() string