Versions in this module Expand all Collapse all v0 v0.1.0 Jul 16, 2018 Changes in this version + const CompressionNoneCode + const DefaultCompressionLevel + const SeekAbsolute + const SeekCurrent + const SeekEnd + const SeekStart + var DefaultClientID string + var DefaultDialer = &Dialer + func RegisterCompressionCodec(codec func() CompressionCodec) + type Balancer interface + Balance func(msg Message, partitions ...int) (partition int) + type BalancerFunc func(Message, ...int) int + func (f BalancerFunc) Balance(msg Message, partitions ...int) int + type Batch struct + func (batch *Batch) Close() error + func (batch *Batch) HighWaterMark() int64 + func (batch *Batch) Offset() int64 + func (batch *Batch) Read(b []byte) (int, error) + func (batch *Batch) ReadMessage() (Message, error) + func (batch *Batch) Throttle() time.Duration + type Broker struct + Host string + ID int + Port int + type CompressionCodec interface + Code func() int8 + Decode func(dst, src []byte) (int, error) + Encode func(dst, src []byte) (int, error) + type ConfigEntry struct + ConfigName string + ConfigValue string + type Conn struct + func Dial(network string, address string) (*Conn, error) + func DialContext(ctx context.Context, network string, address string) (*Conn, error) + func DialLeader(ctx context.Context, network string, address string, topic string, ...) (*Conn, error) + func NewConn(conn net.Conn, topic string, partition int) *Conn + func NewConnWith(conn net.Conn, config ConnConfig) *Conn + func (c *Conn) Close() error + func (c *Conn) CreateTopics(topics ...TopicConfig) error + func (c *Conn) DeleteTopics(topics ...string) error + func (c *Conn) LocalAddr() net.Addr + func (c *Conn) Offset() (offset int64, whence int) + func (c *Conn) Read(b []byte) (int, error) + func (c *Conn) ReadBatch(minBytes int, maxBytes int) *Batch + func (c *Conn) ReadFirstOffset() (int64, error) + func (c *Conn) ReadLastOffset() (int64, error) + func (c *Conn) ReadMessage(maxBytes int) (Message, error) + func (c *Conn) ReadOffset(t time.Time) (int64, error) + func (c *Conn) ReadOffsets() (first int64, last int64, err error) + func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error) + func (c *Conn) RemoteAddr() net.Addr + func (c *Conn) Seek(offset int64, whence int) (int64, error) + func (c *Conn) SetDeadline(t time.Time) error + func (c *Conn) SetReadDeadline(t time.Time) error + func (c *Conn) SetRequiredAcks(n int) error + func (c *Conn) SetWriteDeadline(t time.Time) error + func (c *Conn) Write(b []byte) (int, error) + func (c *Conn) WriteMessages(msgs ...Message) (int, error) + type ConnConfig struct + ClientID string + Partition int + Topic string + type Dialer struct + ClientID string + Deadline time.Time + DualStack bool + FallbackDelay time.Duration + KeepAlive time.Duration + LocalAddr net.Addr + Resolver Resolver + TLS *tls.Config + Timeout time.Duration + func (d *Dialer) Dial(network string, address string) (*Conn, error) + func (d *Dialer) DialContext(ctx context.Context, network string, address string) (*Conn, error) + func (d *Dialer) DialLeader(ctx context.Context, network string, address string, topic string, ...) (*Conn, error) + func (d *Dialer) LookupLeader(ctx context.Context, network string, address string, topic string, ...) (Broker, error) + func (d *Dialer) LookupPartitions(ctx context.Context, network string, address string, topic string) ([]Partition, error) + type DurationStats struct + Avg time.Duration + Max time.Duration + Min time.Duration + type Error int + const BrokerAuthorizationFailed + const BrokerNotAvailable + const ClusterAuthorizationFailed + const ConcurrentTransactions + const DuplicateSequenceNumber + const GroupAuthorizationFailed + const GroupCoordinatorNotAvailable + const GroupLoadInProgress + const IllegalGeneration + const IllegalSASLState + const InconsistentGroupProtocol + const InvalidCommitOffsetSize + const InvalidConfiguration + const InvalidGroupId + const InvalidMessage + const InvalidMessageSize + const InvalidPartitionNumber + const InvalidProducerEpoch + const InvalidProducerIDMapping + const InvalidReplicaAssignment + const InvalidReplicationFactor + const InvalidRequest + const InvalidRequiredAcks + const InvalidSessionTimeout + const InvalidTimestamp + const InvalidTopic + const InvalidTransactionState + const InvalidTransactionTimeout + const LeaderNotAvailable + const MessageSizeTooLarge + const NotController + const NotCoordinatorForGroup + const NotEnoughReplicas + const NotEnoughReplicasAfterAppend + const NotLeaderForPartition + const OffsetMetadataTooLarge + const OffsetOutOfRange + const OutOfOrderSequenceNumber + const PolicyViolation + const RebalanceInProgress + const RecordListTooLarge + const ReplicaNotAvailable + const RequestTimedOut + const SecurityDisabled + const StaleControllerEpoch + const TopicAlreadyExists + const TopicAuthorizationFailed + const TransactionCoordinatorFenced + const TransactionalIDAuthorizationFailed + const Unknown + const UnknownMemberId + const UnknownTopicOrPartition + const UnsupportedForMessageFormat + const UnsupportedSASLMechanism + const UnsupportedVersion + func (e Error) Description() string + func (e Error) Error() string + func (e Error) Temporary() bool + func (e Error) Timeout() bool + func (e Error) Title() string + type Hash struct + Hasher hash.Hash32 + func (h *Hash) Balance(msg Message, partitions ...int) (partition int) + type LeastBytes struct + func (lb *LeastBytes) Balance(msg Message, partitions ...int) int + type ListGroupsResponseGroupV1 struct + GroupID string + ProtocolType string + type Message struct + Key []byte + Offset int64 + Partition int + Time time.Time + Topic string + Value []byte + type Partition struct + ID int + Isr []Broker + Leader Broker + Replicas []Broker + Topic string + type Reader struct + func NewReader(config ReaderConfig) *Reader + func (r *Reader) Close() error + func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error + func (r *Reader) Config() ReaderConfig + func (r *Reader) FetchMessage(ctx context.Context) (Message, error) + func (r *Reader) Lag() int64 + func (r *Reader) Offset() int64 + func (r *Reader) ReadLag(ctx context.Context) (lag int64, err error) + func (r *Reader) ReadMessage(ctx context.Context) (Message, error) + func (r *Reader) SetOffset(offset int64) error + func (r *Reader) Stats() ReaderStats + type ReaderConfig struct + Brokers []string + CommitInterval time.Duration + Dialer *Dialer + ErrorLogger *log.Logger + GroupID string + HeartbeatInterval time.Duration + Logger *log.Logger + MaxBytes int + MaxWait time.Duration + MinBytes int + Partition int + QueueCapacity int + ReadLagInterval time.Duration + RebalanceTimeout time.Duration + RetentionTime time.Duration + SessionTimeout time.Duration + Topic string + type ReaderStats struct + Bytes int64 + ClientID string + DialTime DurationStats + Dials int64 + Errors int64 + FetchBytes SummaryStats + FetchSize SummaryStats + Fetches int64 + Lag int64 + MaxBytes int64 + MaxWait time.Duration + Messages int64 + MinBytes int64 + Offset int64 + Partition string + QueueCapacity int64 + QueueLength int64 + ReadTime DurationStats + Rebalances int64 + Timeouts int64 + Topic string + WaitTime DurationStats + type ReplicaAssignment struct + Partition int + Replicas int + type Resolver interface + LookupHost func(ctx context.Context, host string) (addrs []string, err error) + type RoundRobin struct + func (rr *RoundRobin) Balance(msg Message, partitions ...int) int + type SummaryStats struct + Avg int64 + Max int64 + Min int64 + type TopicConfig struct + ConfigEntries []ConfigEntry + NumPartitions int + ReplicaAssignments []ReplicaAssignment + ReplicationFactor int + Topic string + type Writer struct + func NewWriter(config WriterConfig) *Writer + func (w *Writer) Close() (err error) + func (w *Writer) Stats() WriterStats + func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error + type WriterConfig struct + Async bool + Balancer Balancer + BatchSize int + BatchTimeout time.Duration + Brokers []string + Dialer *Dialer + MaxAttempts int + QueueCapacity int + ReadTimeout time.Duration + RebalanceInterval time.Duration + RequiredAcks int + Topic string + WriteTimeout time.Duration + type WriterStats struct + Async bool + BatchSize SummaryStats + BatchTimeout time.Duration + Bytes int64 + ClientID string + DialTime DurationStats + Dials int64 + Errors int64 + MaxAttempts int64 + MaxBatchSize int64 + Messages int64 + QueueCapacity int64 + QueueLength int64 + ReadTimeout time.Duration + RebalanceInterval time.Duration + Rebalances int64 + RequiredAcks int64 + Retries SummaryStats + Topic string + WaitTime DurationStats + WriteTime DurationStats + WriteTimeout time.Duration + Writes int64