Versions in this module Expand all Collapse all v0 v0.2.3 May 3, 2019 Changes in this version + const CompressionNoneCode + const FirstOffset + const LastOffset + const SeekAbsolute + const SeekCurrent + const SeekEnd + const SeekStart + var DefaultClientID string + var DefaultDialer = &Dialer + func RegisterCompressionCodec(codec func() CompressionCodec) + type ApiVersion struct + ApiKey int16 + MaxVersion int16 + MinVersion int16 + type Balancer interface + Balance func(msg Message, partitions ...int) (partition int) + type BalancerFunc func(Message, ...int) int + func (f BalancerFunc) Balance(msg Message, partitions ...int) int + type Batch struct + func (batch *Batch) Close() error + func (batch *Batch) HighWaterMark() int64 + func (batch *Batch) Offset() int64 + func (batch *Batch) Read(b []byte) (int, error) + func (batch *Batch) ReadMessage() (Message, error) + func (batch *Batch) Throttle() time.Duration + type Broker struct + Host string + ID int + Port int + Rack string + type CompressionCodec interface + Code func() int8 + Decode func(src []byte) ([]byte, error) + Encode func(src []byte) ([]byte, error) + type ConfigEntry struct + ConfigName string + ConfigValue string + type Conn struct + func Dial(network string, address string) (*Conn, error) + func DialContext(ctx context.Context, network string, address string) (*Conn, error) + func DialLeader(ctx context.Context, network string, address string, topic string, ...) (*Conn, error) + func DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error) + func NewConn(conn net.Conn, topic string, partition int) *Conn + func NewConnWith(conn net.Conn, config ConnConfig) *Conn + func (c *Conn) ApiVersions() ([]ApiVersion, error) + func (c *Conn) Brokers() ([]Broker, error) + func (c *Conn) Close() error + func (c *Conn) Controller() (broker Broker, err error) + func (c *Conn) CreateTopics(topics ...TopicConfig) error + func (c *Conn) DeleteTopics(topics ...string) error + func (c *Conn) LocalAddr() net.Addr + func (c *Conn) Offset() (offset int64, whence int) + func (c *Conn) Read(b []byte) (int, error) + func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch + func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch + func (c *Conn) ReadFirstOffset() (int64, error) + func (c *Conn) ReadLastOffset() (int64, error) + func (c *Conn) ReadMessage(maxBytes int) (Message, error) + func (c *Conn) ReadOffset(t time.Time) (int64, error) + func (c *Conn) ReadOffsets() (first, last int64, err error) + func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error) + func (c *Conn) RemoteAddr() net.Addr + func (c *Conn) Seek(offset int64, whence int) (int64, error) + func (c *Conn) SetDeadline(t time.Time) error + func (c *Conn) SetReadDeadline(t time.Time) error + func (c *Conn) SetRequiredAcks(n int) error + func (c *Conn) SetWriteDeadline(t time.Time) error + func (c *Conn) Write(b []byte) (int, error) + func (c *Conn) WriteCompressedMessages(codec CompressionCodec, msgs ...Message) (nbytes int, err error) + func (c *Conn) WriteCompressedMessagesAt(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error) + func (c *Conn) WriteMessages(msgs ...Message) (int, error) + type ConnConfig struct + ClientID string + Partition int + Topic string + TransactionalID string + type Dialer struct + ClientID string + Deadline time.Time + DualStack bool + FallbackDelay time.Duration + KeepAlive time.Duration + LocalAddr net.Addr + Resolver Resolver + SASLMechanism sasl.Mechanism + TLS *tls.Config + Timeout time.Duration + TransactionalID string + func (d *Dialer) Dial(network string, address string) (*Conn, error) + func (d *Dialer) DialContext(ctx context.Context, network string, address string) (*Conn, error) + func (d *Dialer) DialLeader(ctx context.Context, network string, address string, topic string, ...) (*Conn, error) + func (d *Dialer) DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error) + func (d *Dialer) LookupLeader(ctx context.Context, network string, address string, topic string, ...) (Broker, error) + func (d *Dialer) LookupPartition(ctx context.Context, network string, address string, topic string, ...) (Partition, error) + func (d *Dialer) LookupPartitions(ctx context.Context, network string, address string, topic string) ([]Partition, error) + type DurationStats struct + Avg time.Duration + Max time.Duration + Min time.Duration + type Error int + const BrokerAuthorizationFailed + const BrokerNotAvailable + const ClusterAuthorizationFailed + const ConcurrentTransactions + const DelegationTokenAuthDisabled + const DelegationTokenAuthorizationFailed + const DelegationTokenExpired + const DelegationTokenNotFound + const DelegationTokenOwnerMismatch + const DelegationTokenRequestNotAllowed + const DuplicateSequenceNumber + const FencedLeaderEpoch + const FetchSessionIDNotFound + const GroupAuthorizationFailed + const GroupCoordinatorNotAvailable + const GroupIdNotFound + const GroupLoadInProgress + const IllegalGeneration + const IllegalSASLState + const InconsistentGroupProtocol + const InvalidCommitOffsetSize + const InvalidConfiguration + const InvalidFetchSessionEpoch + const InvalidGroupId + const InvalidMessage + const InvalidMessageSize + const InvalidPartitionNumber + const InvalidPrincipalType + const InvalidProducerEpoch + const InvalidProducerIDMapping + const InvalidReplicaAssignment + const InvalidReplicationFactor + const InvalidRequest + const InvalidRequiredAcks + const InvalidSessionTimeout + const InvalidTimestamp + const InvalidTopic + const InvalidTransactionState + const InvalidTransactionTimeout + const KafkaStorageError + const LeaderNotAvailable + const ListenerNotFound + const LogDirNotFound + const MessageSizeTooLarge + const NonEmptyGroup + const NotController + const NotCoordinatorForGroup + const NotEnoughReplicas + const NotEnoughReplicasAfterAppend + const NotLeaderForPartition + const OffsetMetadataTooLarge + const OffsetOutOfRange + const OutOfOrderSequenceNumber + const PolicyViolation + const ReassignmentInProgress + const RebalanceInProgress + const RecordListTooLarge + const ReplicaNotAvailable + const RequestTimedOut + const SASLAuthenticationFailed + const SecurityDisabled + const StaleControllerEpoch + const TopicAlreadyExists + const TopicAuthorizationFailed + const TopicDeletionDisabled + const TransactionCoordinatorFenced + const TransactionalIDAuthorizationFailed + const Unknown + const UnknownLeaderEpoch + const UnknownMemberId + const UnknownProducerId + const UnknownTopicOrPartition + const UnsupportedCompressionType + const UnsupportedForMessageFormat + const UnsupportedSASLMechanism + const UnsupportedVersion + func (e Error) Description() string + func (e Error) Error() string + func (e Error) Temporary() bool + func (e Error) Timeout() bool + func (e Error) Title() string + type GroupBalancer interface + AssignGroups func(members []GroupMember, partitions []Partition) GroupMemberAssignments + ProtocolName func() string + UserData func() ([]byte, error) + type GroupMember struct + ID string + Topics []string + UserData []byte + type GroupMemberAssignments map[string]map[string][]int + type Hash struct + Hasher hash.Hash32 + func (h *Hash) Balance(msg Message, partitions ...int) (partition int) + type Header struct + Key string + Value []byte + type IsolationLevel int8 + const ReadCommitted + const ReadUncommitted + type LeastBytes struct + func (lb *LeastBytes) Balance(msg Message, partitions ...int) int + type ListGroupsResponseGroupV1 struct + GroupID string + ProtocolType string + type Message struct + Headers []Header + Key []byte + Offset int64 + Partition int + Time time.Time + Topic string + Value []byte + type Partition struct + ID int + Isr []Broker + Leader Broker + Replicas []Broker + Topic string + func LookupPartition(ctx context.Context, network string, address string, topic string, ...) (Partition, error) + func LookupPartitions(ctx context.Context, network string, address string, topic string) ([]Partition, error) + type RangeGroupBalancer struct + func (r RangeGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments + func (r RangeGroupBalancer) ProtocolName() string + func (r RangeGroupBalancer) UserData() ([]byte, error) + type ReadBatchConfig struct + IsolationLevel IsolationLevel + MaxBytes int + MinBytes int + type Reader struct + func NewReader(config ReaderConfig) *Reader + func (r *Reader) Close() error + func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error + func (r *Reader) Config() ReaderConfig + func (r *Reader) FetchMessage(ctx context.Context) (Message, error) + func (r *Reader) Lag() int64 + func (r *Reader) Offset() int64 + func (r *Reader) ReadLag(ctx context.Context) (lag int64, err error) + func (r *Reader) ReadMessage(ctx context.Context) (Message, error) + func (r *Reader) SetOffset(offset int64) error + func (r *Reader) SetOffsetAt(ctx context.Context, t time.Time) error + func (r *Reader) Stats() ReaderStats + type ReaderConfig struct + Brokers []string + CommitInterval time.Duration + Dialer *Dialer + ErrorLogger *log.Logger + GroupBalancers []GroupBalancer + GroupID string + HeartbeatInterval time.Duration + IsolationLevel IsolationLevel + Logger *log.Logger + MaxAttempts int + MaxBytes int + MaxWait time.Duration + MinBytes int + Partition int + PartitionWatchInterval time.Duration + QueueCapacity int + ReadLagInterval time.Duration + RebalanceTimeout time.Duration + RetentionTime time.Duration + SessionTimeout time.Duration + Topic string + WatchPartitionChanges bool + func (config *ReaderConfig) Validate() error + type ReaderStats struct + Bytes int64 + ClientID string + DeprecatedFetchesWithTypo int64 + DialTime DurationStats + Dials int64 + Errors int64 + FetchBytes SummaryStats + FetchSize SummaryStats + Fetches int64 + Lag int64 + MaxBytes int64 + MaxWait time.Duration + Messages int64 + MinBytes int64 + Offset int64 + Partition string + QueueCapacity int64 + QueueLength int64 + ReadTime DurationStats + Rebalances int64 + Timeouts int64 + Topic string + WaitTime DurationStats + type ReplicaAssignment struct + Partition int + Replicas int + type Resolver interface + LookupHost func(ctx context.Context, host string) (addrs []string, err error) + type RoundRobin struct + func (rr *RoundRobin) Balance(msg Message, partitions ...int) int + type RoundRobinGroupBalancer struct + func (r RoundRobinGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments + func (r RoundRobinGroupBalancer) ProtocolName() string + func (r RoundRobinGroupBalancer) UserData() ([]byte, error) + type SummaryStats struct + Avg int64 + Max int64 + Min int64 + type TopicConfig struct + ConfigEntries []ConfigEntry + NumPartitions int + ReplicaAssignments []ReplicaAssignment + ReplicationFactor int + Topic string + type Writer struct + func NewWriter(config WriterConfig) *Writer + func (w *Writer) Close() (err error) + func (w *Writer) Stats() WriterStats + func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error + type WriterConfig struct + Async bool + Balancer Balancer + BatchSize int + BatchTimeout time.Duration + Brokers []string + Dialer *Dialer + ErrorLogger *log.Logger + Logger *log.Logger + MaxAttempts int + QueueCapacity int + ReadTimeout time.Duration + RebalanceInterval time.Duration + RequiredAcks int + Topic string + WriteTimeout time.Duration + func (config *WriterConfig) Validate() error + type WriterStats struct + Async bool + BatchSize SummaryStats + BatchTimeout time.Duration + Bytes int64 + ClientID string + DialTime DurationStats + Dials int64 + Errors int64 + MaxAttempts int64 + MaxBatchSize int64 + Messages int64 + QueueCapacity int64 + QueueLength int64 + ReadTimeout time.Duration + RebalanceInterval time.Duration + Rebalances int64 + RequiredAcks int64 + Retries SummaryStats + Topic string + WaitTime DurationStats + WriteTime DurationStats + WriteTimeout time.Duration + Writes int64