Versions in this module Expand all Collapse all v0 v0.1.6 Oct 29, 2024 v0.1.5 Oct 16, 2024 v0.1.4 Oct 16, 2024 v0.1.3 Oct 16, 2024 v0.1.2 Oct 16, 2024 Changes in this version + func Marshal(version int16, value interface{}) ([]byte, error) + func ReadAll(b Bytes) ([]byte, error) + func Register(req, res Message) + func RegisterOverride(req, res Message, key OverrideTypeKey) + func Unmarshal(data []byte, version int16, value interface{}) error + func WriteRequest(w io.Writer, apiVersion int16, correlationID int32, clientID string, ...) error + func WriteResponse(w io.Writer, apiVersion int16, correlationID int32, msg Message) error + type ApiKey int16 + const AddOffsetsToTxn + const AddPartitionsToTxn + const AlterClientQuotas + const AlterConfigs + const AlterPartitionReassignments + const AlterReplicaLogDirs + const AlterUserScramCredentials + const ApiVersions + const ControlledShutdown + const CreateAcls + const CreateDelegationToken + const CreatePartitions + const CreateTopics + const DeleteAcls + const DeleteGroups + const DeleteRecords + const DeleteTopics + const DescribeAcls + const DescribeClientQuotas + const DescribeConfigs + const DescribeDelegationToken + const DescribeGroups + const DescribeLogDirs + const DescribeUserScramCredentials + const ElectLeaders + const EndTxn + const ExpireDelegationToken + const Fetch + const FindCoordinator + const Heartbeat + const IncrementalAlterConfigs + const InitProducerId + const JoinGroup + const LeaderAndIsr + const LeaveGroup + const ListGroups + const ListOffsets + const ListPartitionReassignments + const Metadata + const OffsetCommit + const OffsetDelete + const OffsetFetch + const OffsetForLeaderEpoch + const Produce + const RenewDelegationToken + const SaslAuthenticate + const SaslHandshake + const StopReplica + const SyncGroup + const TxnOffsetCommit + const UpdateMetadata + const WriteTxnMarkers + func (k ApiKey) MaxVersion() int16 + func (k ApiKey) MinVersion() int16 + func (k ApiKey) SelectVersion(minVersion, maxVersion int16) int16 + func (k ApiKey) String() string + type Attributes int16 + const Control + const Gzip + const Lz4 + const Snappy + const Transactional + const Zstd + func (a Attributes) Compression() compress.Compression + func (a Attributes) Control() bool + func (a Attributes) String() string + func (a Attributes) Transactional() bool + type Broker struct + Host string + ID int32 + Port int32 + Rack string + func (b Broker) Format(w fmt.State, v rune) + func (b Broker) String() string + type BrokerMessage interface + Broker func(Cluster) (Broker, error) + type Bytes interface + Len func() int + func NewBytes(b []byte) Bytes + type Cluster struct + Brokers map[int32]Broker + ClusterID string + Controller int32 + Topics map[string]Topic + func (c Cluster) BrokerIDs() []int32 + func (c Cluster) Format(w fmt.State, _ rune) + func (c Cluster) IsZero() bool + func (c Cluster) TopicNames() []string + type Conn struct + func NewConn(conn net.Conn, clientID string) *Conn + func (c *Conn) Close() error + func (c *Conn) Discard(n int) (int, error) + func (c *Conn) LocalAddr() net.Addr + func (c *Conn) Peek(n int) ([]byte, error) + func (c *Conn) Read(b []byte) (int, error) + func (c *Conn) RemoteAddr() net.Addr + func (c *Conn) RoundTrip(msg Message) (Message, error) + func (c *Conn) SetDeadline(t time.Time) error + func (c *Conn) SetReadDeadline(t time.Time) error + func (c *Conn) SetVersions(versions map[ApiKey]int16) + func (c *Conn) SetWriteDeadline(t time.Time) error + func (c *Conn) String() string + func (c *Conn) Write(b []byte) (int, error) + type ControlBatch struct + Attributes Attributes + BaseOffset int64 + BaseSequence int32 + PartitionLeaderEpoch int32 + ProducerEpoch int16 + ProducerID int64 + Records RecordReader + func NewControlBatch(records ...ControlRecord) *ControlBatch + func (c *ControlBatch) Offset() int64 + func (c *ControlBatch) ReadControlRecord() (*ControlRecord, error) + func (c *ControlBatch) ReadRecord() (*Record, error) + func (c *ControlBatch) Version() int + type ControlRecord struct + Data []byte + Headers []Header + Offset int64 + Time time.Time + Type int16 + Version int16 + func ReadControlRecord(r *Record) (*ControlRecord, error) + func (cr *ControlRecord) Key() Bytes + func (cr *ControlRecord) Record() Record + func (cr *ControlRecord) Value() Bytes + type Error string + const ErrNoLeader + const ErrNoPartition + const ErrNoRecord + const ErrNoReset + const ErrNoTopic + func Errorf(msg string, args ...interface{}) Error + func (e Error) Error() string + type GroupMessage interface + Group func() string + type Header struct + Key string + Value []byte + type Merger interface + Merge func(messages []Message, results []interface{}) (Message, error) + type Message interface + ApiKey func() ApiKey + func ReadRequest(r io.Reader) (apiVersion int16, correlationID int32, clientID string, msg Message, err error) + func ReadResponse(r io.Reader, apiKey ApiKey, apiVersion int16) (correlationID int32, msg Message, err error) + func Result(r interface{}) (Message, error) + func RoundTrip(rw io.ReadWriter, apiVersion int16, correlationID int32, clientID string, ...) (Message, error) + type MessageSet struct + Attributes Attributes + BaseOffset int64 + Records RecordReader + func (m *MessageSet) Offset() int64 + func (m *MessageSet) ReadRecord() (*Record, error) + func (m *MessageSet) Version() int + type OverrideTypeKey int16 + const RawProduceOverride + type OverrideTypeMessage interface + TypeKey func() OverrideTypeKey + type Partition struct + Error int16 + ID int32 + ISR []int32 + Leader int32 + Offline []int32 + Replicas []int32 + type PreparedMessage interface + Prepare func(apiVersion int16) + type RawExchanger interface + RawExchange func(rw io.ReadWriter) (Message, error) + Required func(versions map[ApiKey]int16) bool + type RawRecordSet struct + Reader io.Reader + func (rrs *RawRecordSet) ReadFrom(r io.Reader) (int64, error) + func (rrs *RawRecordSet) WriteTo(w io.Writer) (int64, error) + type Record struct + Headers []Header + Key Bytes + Offset int64 + Time time.Time + Value Bytes + type RecordBatch struct + Attributes Attributes + BaseOffset int64 + BaseSequence int32 + PartitionLeaderEpoch int32 + ProducerEpoch int16 + ProducerID int64 + Records RecordReader + func (r *RecordBatch) Offset() int64 + func (r *RecordBatch) ReadRecord() (*Record, error) + func (r *RecordBatch) Version() int + type RecordReader interface + ReadRecord func() (*Record, error) + func MultiRecordReader(batches ...RecordReader) RecordReader + func NewRecordReader(records ...Record) RecordReader + type RecordSet struct + Attributes Attributes + Records RecordReader + Version int8 + func (rs *RecordSet) ReadFrom(r io.Reader) (int64, error) + func (rs *RecordSet) WriteTo(w io.Writer) (int64, error) + type RecordStream struct + Records []RecordReader + func (s *RecordStream) ReadRecord() (*Record, error) + type Splitter interface + Split func(Cluster) ([]Message, Merger, error) + type Topic struct + Error int16 + Name string + Partitions map[int32]Partition + type TopicError struct + Err error + Topic string + func NewErrNoTopic(topic string) *TopicError + func NewTopicError(topic string, err error) *TopicError + func (e *TopicError) Error() string + func (e *TopicError) Unwrap() error + type TopicPartitionError struct + Err error + Partition int32 + Topic string + func NewErrNoLeader(topic string, partition int32) *TopicPartitionError + func NewErrNoPartition(topic string, partition int32) *TopicPartitionError + func NewTopicPartitionError(topic string, partition int32, err error) *TopicPartitionError + func (e *TopicPartitionError) Error() string + func (e *TopicPartitionError) Unwrap() error + type TransactionalMessage interface + Transaction func() string