Versions in this module Expand all Collapse all v0 v0.4.3 May 8, 2020 v0.4.2 May 8, 2020 Changes in this version + const SeekDontCheck + var ErrGenerationEnded = errors.New("consumer group generation has ended") + var ErrGroupClosed = errors.New("consumer group is closed") + func Addr(network, address string) net.Addr + func TCP(address string) net.Addr + func TLS(address string) net.Addr + type ApiVersion struct + ApiKey int16 + MaxVersion int16 + MinVersion int16 + func (v ApiVersion) Format(w fmt.State, r rune) type Batch + func (batch *Batch) Err() error + type Bytes = protocol.ByteSequence + type CRC32Balancer struct + Consistent bool + func (b CRC32Balancer) Balance(msg Message, partitions ...int) (partition int) + type Client struct + Addr net.Addr + Timeout time.Duration + Transport RoundTripper + func (c *Client) ConsumerOffsets(ctx context.Context, tg TopicAndGroup) (map[int]int64, error) + func (c *Client) CreateTopics(ctx context.Context, req *CreateTopicsRequest) (*CreateTopicsResponse, error) + func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*DeleteTopicsResponse, error) + func (c *Client) Fetch(ctx context.Context, req *FetchRequest) (*FetchResponse, error) + func (c *Client) ListOffsets(ctx context.Context, req *ListOffsetsRequest) (*ListOffsetsResponse, error) + func (c *Client) Metadata(ctx context.Context, req *MetadataRequest) (*MetadataResponse, error) + func (c *Client) OffsetFetch(ctx context.Context, req *OffsetFetchRequest) (*OffsetFetchResponse, error) + func (c *Client) Produce(ctx context.Context, req *ProduceRequest) (*ProduceResponse, error) type ConfigEntry + IsSensitive bool + ReadOnly bool type Conn + func (c *Conn) ApiVersions() ([]ApiVersion, error) + func (c *Conn) Brokers() ([]Broker, error) + func (c *Conn) Controller() (broker Broker, err error) + func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch + func (c *Conn) WriteCompressedMessagesAt(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error) type ConnConfig + TransactionalID string + type ConsumerGroup struct + func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error) + func (cg *ConsumerGroup) Close() error + func (cg *ConsumerGroup) Next(ctx context.Context) (*Generation, error) + type ConsumerGroupConfig struct + Brokers []string + Dialer *Dialer + ErrorLogger Logger + GroupBalancers []GroupBalancer + HeartbeatInterval time.Duration + ID string + JoinGroupBackoff time.Duration + Logger Logger + PartitionWatchInterval time.Duration + RebalanceTimeout time.Duration + RetentionTime time.Duration + SessionTimeout time.Duration + StartOffset int64 + Topics []string + WatchPartitionChanges bool + func (config *ConsumerGroupConfig) Validate() error + type CreateTopicStatus struct + ConfigEntries []ConfigEntry + Error error + NumPartitions int + ReplicationFactor int + Topic string + type CreateTopicsRequest struct + Addr net.Addr + Topics []TopicConfig + ValidateOnly bool + type CreateTopicsResponse struct + Throttle time.Duration + Topics []CreateTopicStatus + type DeleteTopicsRequest struct + Addr net.Addr + Topics []string + type DeleteTopicsResponse struct + Errors map[string]error + Throttle time.Duration type Dialer + SASLMechanism sasl.Mechanism + TransactionalID string type Error + const DelegationTokenAuthDisabled + const DelegationTokenAuthorizationFailed + const DelegationTokenExpired + const DelegationTokenNotFound + const DelegationTokenOwnerMismatch + const DelegationTokenRequestNotAllowed + const FencedInstanceID + const FencedLeaderEpoch + const FetchSessionIDNotFound + const GroupIdNotFound + const GroupMaxSizeReached + const InvalidFetchSessionEpoch + const InvalidPrincipalType + const KafkaStorageError + const ListenerNotFound + const LogDirNotFound + const MemberIDRequired + const NetworkException + const NonEmptyGroup + const OffsetNotAvailable + const PreferredLeaderNotAvailable + const ReassignmentInProgress + const SASLAuthenticationFailed + const StaleBrokerEpoch + const TopicDeletionDisabled + const UnknownLeaderEpoch + const UnknownProducerId + const UnsupportedCompressionType + type FetchRequest struct + Addr net.Addr + IsolationLevel IsolationLevel + MaxBytes int64 + MaxWait time.Duration + MinBytes int64 + Offset int64 + Partition int + Topic string + type FetchResponse struct + ControlBatch bool + Error error + HighWatermark int64 + LastStableOffset int64 + LogStartOffset int64 + Partition int + Records RecordSet + Throttle time.Duration + Topic string + Transactional bool + type Generation struct + Assignments map[string][]PartitionAssignment + GroupID string + ID int32 + MemberID string + func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error + func (g *Generation) Start(fn func(ctx context.Context)) + type Header = protocol.Header + type IsolationLevel int8 + const ReadCommitted + const ReadUncommitted + type ListOffsetsRequest struct + Addr net.Addr + IsolationLevel IsolationLevel + Topics map[string][]OffsetRequest + type ListOffsetsResponse struct + Throttle time.Duration + Topics map[string][]PartitionOffsets + type Logger interface + Printf func(string, ...interface{}) + type LoggerFunc func(string, ...interface{}) + func (f LoggerFunc) Printf(msg string, args ...interface{}) type Message + Headers []Header + type MessageTooLargeError struct + Message Message + Remaining []Message + func (e MessageTooLargeError) Error() string + type MetadataRequest struct + Addr net.Addr + Topics []string + type MetadataResponse struct + Brokers []Broker + ClusterID string + Controller Broker + Throttle time.Duration + Topics []Topic + type Murmur2Balancer struct + Consistent bool + func (b Murmur2Balancer) Balance(msg Message, partitions ...int) (partition int) + type OffsetFetchPartition struct + CommittedOffset int64 + Error error + Metadata string + Partition int + type OffsetFetchRequest struct + Addr net.Addr + GroupID string + RequireStable bool + Topics map[string][]int + type OffsetFetchResponse struct + Error error + Throttle time.Duration + Topics map[string][]OffsetFetchPartition + type OffsetRequest struct + Partition int + Timestamp int64 + func FirstOffsetOf(partition int) OffsetRequest + func LastOffsetOf(partition int) OffsetRequest + func TimeOffsetOf(partition int, at time.Time) OffsetRequest type Partition + Error error + type PartitionAssignment struct + ID int + Offset int64 + type PartitionOffsets struct + Error error + FirstOffset int64 + LastOffset int64 + Offsets map[int64]time.Time + Partition int + type ProduceRequest struct + Addr net.Addr + Compression compress.Compression + MessageVersion int + Partition int + Records RecordSet + RequiredAcks int + Topic string + TransactionalID string + type ProduceResponse struct + BaseOffset int64 + Error error + LogAppendTime time.Time + LogStartOffset int64 + RecordErrors map[int]error + Throttle time.Duration + type RackAffinityGroupBalancer struct + Rack string + func (r RackAffinityGroupBalancer) AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments + func (r RackAffinityGroupBalancer) ProtocolName() string + func (r RackAffinityGroupBalancer) UserData() ([]byte, error) + type ReadBatchConfig struct + IsolationLevel IsolationLevel + MaxBytes int + MinBytes int type Reader + func (r *Reader) SetOffsetAt(ctx context.Context, t time.Time) error type ReaderConfig + IsolationLevel IsolationLevel + JoinGroupBackoff time.Duration + MaxAttempts int + PartitionWatchInterval time.Duration + ReadBackoffMax time.Duration + ReadBackoffMin time.Duration + StartOffset int64 + WatchPartitionChanges bool + func (config *ReaderConfig) Validate() error + type Record interface + Headers func() []Header + Key func() Bytes + Offset func() int64 + Time func() time.Time + Value func() Bytes + func NewRecord(offset int64, time time.Time, key, value []byte, headers ...Header) Record + type RecordSet interface + Next func() Record + func NewRecordSet(records ...Record) RecordSet + type RoundTripper interface + RoundTrip func(context.Context, net.Addr, protocol.Message) (protocol.Message, error) + var DefaultTransport RoundTripper = &Transport{ ... } + type Topic struct + Error error + Internal bool + Name string + Partitions []Partition + type TopicAndGroup struct + GroupId string + Topic string + type Transport struct + ClientID string + Dial func(context.Context, string, string) (net.Conn, error) + DialTimeout time.Duration + IdleTimeout time.Duration + MetadataTTL time.Duration + ReadTimeout time.Duration + SASL sasl.Mechanism + TLS *tls.Config + func (t *Transport) CloseIdleConnections() + func (t *Transport) RoundTrip(ctx context.Context, addr net.Addr, req protocol.Message) (protocol.Message, error) type WriterConfig + BatchBytes int + IdleConnTimeout time.Duration + func (config *WriterConfig) Validate() error type WriterStats + BatchBytes SummaryStats v0.2.2 Nov 27, 2018 Changes in this version + const CompressionNoneCode + const DefaultCompressionLevel + const FirstOffset + const LastOffset + const SeekAbsolute + const SeekCurrent + const SeekEnd + const SeekStart + var DefaultClientID string + var DefaultDialer = &Dialer + func RegisterCompressionCodec(codec func() CompressionCodec) + type Balancer interface + Balance func(msg Message, partitions ...int) (partition int) + type BalancerFunc func(Message, ...int) int + func (f BalancerFunc) Balance(msg Message, partitions ...int) int + type Batch struct + func (batch *Batch) Close() error + func (batch *Batch) HighWaterMark() int64 + func (batch *Batch) Offset() int64 + func (batch *Batch) Read(b []byte) (int, error) + func (batch *Batch) ReadMessage() (Message, error) + func (batch *Batch) Throttle() time.Duration + type Broker struct + Host string + ID int + Port int + Rack string + type CompressionCodec interface + Code func() int8 + Decode func(src []byte) ([]byte, error) + Encode func(src []byte) ([]byte, error) + type ConfigEntry struct + ConfigName string + ConfigValue string + type Conn struct + func Dial(network string, address string) (*Conn, error) + func DialContext(ctx context.Context, network string, address string) (*Conn, error) + func DialLeader(ctx context.Context, network string, address string, topic string, ...) (*Conn, error) + func DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error) + func NewConn(conn net.Conn, topic string, partition int) *Conn + func NewConnWith(conn net.Conn, config ConnConfig) *Conn + func (c *Conn) Close() error + func (c *Conn) CreateTopics(topics ...TopicConfig) error + func (c *Conn) DeleteTopics(topics ...string) error + func (c *Conn) LocalAddr() net.Addr + func (c *Conn) Offset() (offset int64, whence int) + func (c *Conn) Read(b []byte) (int, error) + func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch + func (c *Conn) ReadFirstOffset() (int64, error) + func (c *Conn) ReadLastOffset() (int64, error) + func (c *Conn) ReadMessage(maxBytes int) (Message, error) + func (c *Conn) ReadOffset(t time.Time) (int64, error) + func (c *Conn) ReadOffsets() (first, last int64, err error) + func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error) + func (c *Conn) RemoteAddr() net.Addr + func (c *Conn) Seek(offset int64, whence int) (int64, error) + func (c *Conn) SetDeadline(t time.Time) error + func (c *Conn) SetReadDeadline(t time.Time) error + func (c *Conn) SetRequiredAcks(n int) error + func (c *Conn) SetWriteDeadline(t time.Time) error + func (c *Conn) Write(b []byte) (int, error) + func (c *Conn) WriteCompressedMessages(codec CompressionCodec, msgs ...Message) (int, error) + func (c *Conn) WriteMessages(msgs ...Message) (int, error) + type ConnConfig struct + ClientID string + Partition int + Topic string + type Dialer struct + ClientID string + Deadline time.Time + DualStack bool + FallbackDelay time.Duration + KeepAlive time.Duration + LocalAddr net.Addr + Resolver Resolver + TLS *tls.Config + Timeout time.Duration + func (d *Dialer) Dial(network string, address string) (*Conn, error) + func (d *Dialer) DialContext(ctx context.Context, network string, address string) (*Conn, error) + func (d *Dialer) DialLeader(ctx context.Context, network string, address string, topic string, ...) (*Conn, error) + func (d *Dialer) DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error) + func (d *Dialer) LookupLeader(ctx context.Context, network string, address string, topic string, ...) (Broker, error) + func (d *Dialer) LookupPartition(ctx context.Context, network string, address string, topic string, ...) (Partition, error) + func (d *Dialer) LookupPartitions(ctx context.Context, network string, address string, topic string) ([]Partition, error) + type DurationStats struct + Avg time.Duration + Max time.Duration + Min time.Duration + type Error int + const BrokerAuthorizationFailed + const BrokerNotAvailable + const ClusterAuthorizationFailed + const ConcurrentTransactions + const DuplicateSequenceNumber + const GroupAuthorizationFailed + const GroupCoordinatorNotAvailable + const GroupLoadInProgress + const IllegalGeneration + const IllegalSASLState + const InconsistentGroupProtocol + const InvalidCommitOffsetSize + const InvalidConfiguration + const InvalidGroupId + const InvalidMessage + const InvalidMessageSize + const InvalidPartitionNumber + const InvalidProducerEpoch + const InvalidProducerIDMapping + const InvalidReplicaAssignment + const InvalidReplicationFactor + const InvalidRequest + const InvalidRequiredAcks + const InvalidSessionTimeout + const InvalidTimestamp + const InvalidTopic + const InvalidTransactionState + const InvalidTransactionTimeout + const LeaderNotAvailable + const MessageSizeTooLarge + const NotController + const NotCoordinatorForGroup + const NotEnoughReplicas + const NotEnoughReplicasAfterAppend + const NotLeaderForPartition + const OffsetMetadataTooLarge + const OffsetOutOfRange + const OutOfOrderSequenceNumber + const PolicyViolation + const RebalanceInProgress + const RecordListTooLarge + const ReplicaNotAvailable + const RequestTimedOut + const SecurityDisabled + const StaleControllerEpoch + const TopicAlreadyExists + const TopicAuthorizationFailed + const TransactionCoordinatorFenced + const TransactionalIDAuthorizationFailed + const Unknown + const UnknownMemberId + const UnknownTopicOrPartition + const UnsupportedForMessageFormat + const UnsupportedSASLMechanism + const UnsupportedVersion + func (e Error) Description() string + func (e Error) Error() string + func (e Error) Temporary() bool + func (e Error) Timeout() bool + func (e Error) Title() string + type GroupBalancer interface + AssignGroups func(members []GroupMember, partitions []Partition) GroupMemberAssignments + ProtocolName func() string + UserData func() ([]byte, error) + type GroupMember struct + ID string + Topics []string + UserData []byte + type GroupMemberAssignments map[string]map[string][]int + type Hash struct + Hasher hash.Hash32 + func (h *Hash) Balance(msg Message, partitions ...int) (partition int) + type LeastBytes struct + func (lb *LeastBytes) Balance(msg Message, partitions ...int) int + type ListGroupsResponseGroupV1 struct + GroupID string + ProtocolType string + type Message struct + Key []byte + Offset int64 + Partition int + Time time.Time + Topic string + Value []byte + type Partition struct + ID int + Isr []Broker + Leader Broker + Replicas []Broker + Topic string + func LookupPartition(ctx context.Context, network string, address string, topic string, ...) (Partition, error) + func LookupPartitions(ctx context.Context, network string, address string, topic string) ([]Partition, error) + type RangeGroupBalancer struct + func (r RangeGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments + func (r RangeGroupBalancer) ProtocolName() string + func (r RangeGroupBalancer) UserData() ([]byte, error) + type Reader struct + func NewReader(config ReaderConfig) *Reader + func (r *Reader) Close() error + func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error + func (r *Reader) Config() ReaderConfig + func (r *Reader) FetchMessage(ctx context.Context) (Message, error) + func (r *Reader) Lag() int64 + func (r *Reader) Offset() int64 + func (r *Reader) ReadLag(ctx context.Context) (lag int64, err error) + func (r *Reader) ReadMessage(ctx context.Context) (Message, error) + func (r *Reader) SetOffset(offset int64) error + func (r *Reader) Stats() ReaderStats + type ReaderConfig struct + Brokers []string + CommitInterval time.Duration + Dialer *Dialer + ErrorLogger *log.Logger + GroupBalancers []GroupBalancer + GroupID string + HeartbeatInterval time.Duration + Logger *log.Logger + MaxBytes int + MaxWait time.Duration + MinBytes int + Partition int + QueueCapacity int + ReadLagInterval time.Duration + RebalanceTimeout time.Duration + RetentionTime time.Duration + SessionTimeout time.Duration + Topic string + type ReaderStats struct + Bytes int64 + ClientID string + DeprecatedFetchesWithTypo int64 + DialTime DurationStats + Dials int64 + Errors int64 + FetchBytes SummaryStats + FetchSize SummaryStats + Fetches int64 + Lag int64 + MaxBytes int64 + MaxWait time.Duration + Messages int64 + MinBytes int64 + Offset int64 + Partition string + QueueCapacity int64 + QueueLength int64 + ReadTime DurationStats + Rebalances int64 + Timeouts int64 + Topic string + WaitTime DurationStats + type ReplicaAssignment struct + Partition int + Replicas int + type Resolver interface + LookupHost func(ctx context.Context, host string) (addrs []string, err error) + type RoundRobin struct + func (rr *RoundRobin) Balance(msg Message, partitions ...int) int + type RoundRobinGroupBalancer struct + func (r RoundRobinGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments + func (r RoundRobinGroupBalancer) ProtocolName() string + func (r RoundRobinGroupBalancer) UserData() ([]byte, error) + type SummaryStats struct + Avg int64 + Max int64 + Min int64 + type TopicConfig struct + ConfigEntries []ConfigEntry + NumPartitions int + ReplicaAssignments []ReplicaAssignment + ReplicationFactor int + Topic string + type Writer struct + func NewWriter(config WriterConfig) *Writer + func (w *Writer) Close() (err error) + func (w *Writer) Stats() WriterStats + func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error + type WriterConfig struct + Async bool + Balancer Balancer + BatchSize int + BatchTimeout time.Duration + Brokers []string + Dialer *Dialer + ErrorLogger *log.Logger + Logger *log.Logger + MaxAttempts int + QueueCapacity int + ReadTimeout time.Duration + RebalanceInterval time.Duration + RequiredAcks int + Topic string + WriteTimeout time.Duration + type WriterStats struct + Async bool + BatchSize SummaryStats + BatchTimeout time.Duration + Bytes int64 + ClientID string + DialTime DurationStats + Dials int64 + Errors int64 + MaxAttempts int64 + MaxBatchSize int64 + Messages int64 + QueueCapacity int64 + QueueLength int64 + ReadTimeout time.Duration + RebalanceInterval time.Duration + Rebalances int64 + RequiredAcks int64 + Retries SummaryStats + Topic string + WaitTime DurationStats + WriteTime DurationStats + WriteTimeout time.Duration + Writes int64