Versions in this module Expand all Collapse all v0 v0.4.48 Mar 13, 2024 v0.4.47 Mar 13, 2024 Changes in this version + const FirstOffset + const LastOffset + const SeekAbsolute + const SeekCurrent + const SeekDontCheck + const SeekEnd + const SeekStart + var DefaultClientID string + var DefaultDialer = &Dialer + var ErrGenerationEnded = errors.New("consumer group generation has ended") + var ErrGroupClosed = errors.New("consumer group is closed") + func Marshal(v interface{}) ([]byte, error) + func ReadAll(b Bytes) ([]byte, error) + func TCP(address ...string) net.Addr + func Unmarshal(b []byte, v interface{}) error + type ACLDescription struct + Host string + Operation ACLOperationType + PermissionType ACLPermissionType + Principal string + type ACLEntry struct + Host string + Operation ACLOperationType + PermissionType ACLPermissionType + Principal string + ResourceName string + ResourcePatternType PatternType + ResourceType ResourceType + type ACLFilter struct + HostFilter string + Operation ACLOperationType + PermissionType ACLPermissionType + PrincipalFilter string + ResourceNameFilter string + ResourcePatternTypeFilter PatternType + ResourceTypeFilter ResourceType + type ACLOperationType int8 + const ACLOperationTypeAll + const ACLOperationTypeAlter + const ACLOperationTypeAlterConfigs + const ACLOperationTypeAny + const ACLOperationTypeClusterAction + const ACLOperationTypeCreate + const ACLOperationTypeDelete + const ACLOperationTypeDescribe + const ACLOperationTypeDescribeConfigs + const ACLOperationTypeIdempotentWrite + const ACLOperationTypeRead + const ACLOperationTypeUnknown + const ACLOperationTypeWrite + func (aot *ACLOperationType) UnmarshalText(text []byte) error + func (aot ACLOperationType) MarshalText() ([]byte, error) + func (aot ACLOperationType) String() string + type ACLPermissionType int8 + const ACLPermissionTypeAllow + const ACLPermissionTypeAny + const ACLPermissionTypeDeny + const ACLPermissionTypeUnknown + func (apt *ACLPermissionType) UnmarshalText(text []byte) error + func (apt ACLPermissionType) MarshalText() ([]byte, error) + func (apt ACLPermissionType) String() string + type ACLResource struct + ACLs []ACLDescription + PatternType PatternType + ResourceName string + ResourceType ResourceType + type AddOffsetsToTxnRequest struct + Addr net.Addr + GroupID string + ProducerEpoch int + ProducerID int + TransactionalID string + type AddOffsetsToTxnResponse struct + Error error + Throttle time.Duration + type AddPartitionToTxn struct + Partition int + type AddPartitionToTxnPartition struct + Error error + Partition int + type AddPartitionsToTxnRequest struct + Addr net.Addr + ProducerEpoch int + ProducerID int + Topics map[string][]AddPartitionToTxn + TransactionalID string + type AddPartitionsToTxnResponse struct + Throttle time.Duration + Topics map[string][]AddPartitionToTxnPartition + type AlterClientQuotaEntity struct + EntityName string + EntityType string + type AlterClientQuotaEntry struct + Entities []AlterClientQuotaEntity + Ops []AlterClientQuotaOps + type AlterClientQuotaOps struct + Key string + Remove bool + Value float64 + type AlterClientQuotaResponseQuotas struct + Entities []AlterClientQuotaEntity + Error error + type AlterClientQuotasRequest struct + Addr net.Addr + Entries []AlterClientQuotaEntry + ValidateOnly bool + type AlterClientQuotasResponse struct + Entries []AlterClientQuotaResponseQuotas + Throttle time.Duration + type AlterConfigRequestConfig struct + Name string + Value string + type AlterConfigRequestResource struct + Configs []AlterConfigRequestConfig + ResourceName string + ResourceType ResourceType + type AlterConfigsRequest struct + Addr net.Addr + Resources []AlterConfigRequestResource + ValidateOnly bool + type AlterConfigsResponse struct + Errors map[AlterConfigsResponseResource]error + Throttle time.Duration + type AlterConfigsResponseResource struct + Name string + Type int8 + type AlterPartitionReassignmentsRequest struct + Addr net.Addr + Assignments []AlterPartitionReassignmentsRequestAssignment + Timeout time.Duration + Topic string + type AlterPartitionReassignmentsRequestAssignment struct + BrokerIDs []int + PartitionID int + Topic string + type AlterPartitionReassignmentsResponse struct + Error error + PartitionResults []AlterPartitionReassignmentsResponsePartitionResult + type AlterPartitionReassignmentsResponsePartitionResult struct + Error error + PartitionID int + Topic string + type AlterUserScramCredentialsRequest struct + Addr net.Addr + Deletions []UserScramCredentialsDeletion + Upsertions []UserScramCredentialsUpsertion + type AlterUserScramCredentialsResponse struct + Results []AlterUserScramCredentialsResponseUser + Throttle time.Duration + type AlterUserScramCredentialsResponseUser struct + Error error + User string + type ApiVersion struct + ApiKey int16 + MaxVersion int16 + MinVersion int16 + func (v ApiVersion) Format(w fmt.State, r rune) + type ApiVersionsRequest struct + Addr net.Addr + type ApiVersionsResponse struct + ApiKeys []ApiVersionsResponseApiKey + Error error + type ApiVersionsResponseApiKey struct + ApiKey int + ApiName string + MaxVersion int + MinVersion int + type Balancer interface + Balance func(msg Message, partitions ...int) (partition int) + type BalancerFunc func(Message, ...int) int + func (f BalancerFunc) Balance(msg Message, partitions ...int) int + type Batch struct + func (batch *Batch) Close() error + func (batch *Batch) Err() error + func (batch *Batch) HighWaterMark() int64 + func (batch *Batch) Offset() int64 + func (batch *Batch) Partition() int + func (batch *Batch) Read(b []byte) (int, error) + func (batch *Batch) ReadMessage() (Message, error) + func (batch *Batch) Throttle() time.Duration + type Broker struct + Host string + ID int + Port int + Rack string + type BrokerResolver interface + LookupBrokerIPAddr func(ctx context.Context, broker Broker) ([]net.IPAddr, error) + func NewBrokerResolver(r *net.Resolver) BrokerResolver + type Bytes = protocol.Bytes + func NewBytes(b []byte) Bytes + type CRC32Balancer struct + Consistent bool + func (b CRC32Balancer) Balance(msg Message, partitions ...int) (partition int) + type Client struct + Addr net.Addr + Timeout time.Duration + Transport RoundTripper + func (c *Client) AddOffsetsToTxn(ctx context.Context, req *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) + func (c *Client) AddPartitionsToTxn(ctx context.Context, req *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) + func (c *Client) AlterClientQuotas(ctx context.Context, req *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error) + func (c *Client) AlterConfigs(ctx context.Context, req *AlterConfigsRequest) (*AlterConfigsResponse, error) + func (c *Client) AlterPartitionReassignments(ctx context.Context, req *AlterPartitionReassignmentsRequest) (*AlterPartitionReassignmentsResponse, error) + func (c *Client) AlterUserScramCredentials(ctx context.Context, req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error) + func (c *Client) ApiVersions(ctx context.Context, req *ApiVersionsRequest) (*ApiVersionsResponse, error) + func (c *Client) ConsumerOffsets(ctx context.Context, tg TopicAndGroup) (map[int]int64, error) + func (c *Client) CreateACLs(ctx context.Context, req *CreateACLsRequest) (*CreateACLsResponse, error) + func (c *Client) CreatePartitions(ctx context.Context, req *CreatePartitionsRequest) (*CreatePartitionsResponse, error) + func (c *Client) CreateTopics(ctx context.Context, req *CreateTopicsRequest) (*CreateTopicsResponse, error) + func (c *Client) DeleteACLs(ctx context.Context, req *DeleteACLsRequest) (*DeleteACLsResponse, error) + func (c *Client) DeleteGroups(ctx context.Context, req *DeleteGroupsRequest) (*DeleteGroupsResponse, error) + func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*DeleteTopicsResponse, error) + func (c *Client) DescribeACLs(ctx context.Context, req *DescribeACLsRequest) (*DescribeACLsResponse, error) + func (c *Client) DescribeClientQuotas(ctx context.Context, req *DescribeClientQuotasRequest) (*DescribeClientQuotasResponse, error) + func (c *Client) DescribeConfigs(ctx context.Context, req *DescribeConfigsRequest) (*DescribeConfigsResponse, error) + func (c *Client) DescribeGroups(ctx context.Context, req *DescribeGroupsRequest) (*DescribeGroupsResponse, error) + func (c *Client) DescribeUserScramCredentials(ctx context.Context, req *DescribeUserScramCredentialsRequest) (*DescribeUserScramCredentialsResponse, error) + func (c *Client) ElectLeaders(ctx context.Context, req *ElectLeadersRequest) (*ElectLeadersResponse, error) + func (c *Client) EndTxn(ctx context.Context, req *EndTxnRequest) (*EndTxnResponse, error) + func (c *Client) Fetch(ctx context.Context, req *FetchRequest) (*FetchResponse, error) + func (c *Client) FindCoordinator(ctx context.Context, req *FindCoordinatorRequest) (*FindCoordinatorResponse, error) + func (c *Client) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error) + func (c *Client) IncrementalAlterConfigs(ctx context.Context, req *IncrementalAlterConfigsRequest) (*IncrementalAlterConfigsResponse, error) + func (c *Client) InitProducerID(ctx context.Context, req *InitProducerIDRequest) (*InitProducerIDResponse, error) + func (c *Client) JoinGroup(ctx context.Context, req *JoinGroupRequest) (*JoinGroupResponse, error) + func (c *Client) LeaveGroup(ctx context.Context, req *LeaveGroupRequest) (*LeaveGroupResponse, error) + func (c *Client) ListGroups(ctx context.Context, req *ListGroupsRequest) (*ListGroupsResponse, error) + func (c *Client) ListOffsets(ctx context.Context, req *ListOffsetsRequest) (*ListOffsetsResponse, error) + func (c *Client) ListPartitionReassignments(ctx context.Context, req *ListPartitionReassignmentsRequest) (*ListPartitionReassignmentsResponse, error) + func (c *Client) Metadata(ctx context.Context, req *MetadataRequest) (*MetadataResponse, error) + func (c *Client) OffsetCommit(ctx context.Context, req *OffsetCommitRequest) (*OffsetCommitResponse, error) + func (c *Client) OffsetDelete(ctx context.Context, req *OffsetDeleteRequest) (*OffsetDeleteResponse, error) + func (c *Client) OffsetFetch(ctx context.Context, req *OffsetFetchRequest) (*OffsetFetchResponse, error) + func (c *Client) Produce(ctx context.Context, req *ProduceRequest) (*ProduceResponse, error) + func (c *Client) RawProduce(ctx context.Context, req *RawProduceRequest) (*ProduceResponse, error) + func (c *Client) SyncGroup(ctx context.Context, req *SyncGroupRequest) (*SyncGroupResponse, error) + func (c *Client) TxnOffsetCommit(ctx context.Context, req *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) + type Compression = compress.Compression + const Gzip + const Lz4 + const Snappy + const Zstd + type CompressionCodec = compress.Codec + type ConfigEntry struct + ConfigName string + ConfigValue string + type ConfigOperation int8 + const ConfigOperationAppend + const ConfigOperationDelete + const ConfigOperationSet + const ConfigOperationSubtract + type Conn struct + func Dial(network string, address string) (*Conn, error) + func DialContext(ctx context.Context, network string, address string) (*Conn, error) + func DialLeader(ctx context.Context, network string, address string, topic string, ...) (*Conn, error) + func DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error) + func NewConn(conn net.Conn, topic string, partition int) *Conn + func NewConnWith(conn net.Conn, config ConnConfig) *Conn + func (c *Conn) ApiVersions() ([]ApiVersion, error) + func (c *Conn) Broker() Broker + func (c *Conn) Brokers() ([]Broker, error) + func (c *Conn) Close() error + func (c *Conn) Controller() (broker Broker, err error) + func (c *Conn) CreateTopics(topics ...TopicConfig) error + func (c *Conn) DeleteTopics(topics ...string) error + func (c *Conn) LocalAddr() net.Addr + func (c *Conn) Offset() (offset int64, whence int) + func (c *Conn) Read(b []byte) (int, error) + func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch + func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch + func (c *Conn) ReadFirstOffset() (int64, error) + func (c *Conn) ReadLastOffset() (int64, error) + func (c *Conn) ReadMessage(maxBytes int) (Message, error) + func (c *Conn) ReadOffset(t time.Time) (int64, error) + func (c *Conn) ReadOffsets() (first, last int64, err error) + func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error) + func (c *Conn) RemoteAddr() net.Addr + func (c *Conn) Seek(offset int64, whence int) (int64, error) + func (c *Conn) SetDeadline(t time.Time) error + func (c *Conn) SetReadDeadline(t time.Time) error + func (c *Conn) SetRequiredAcks(n int) error + func (c *Conn) SetWriteDeadline(t time.Time) error + func (c *Conn) Write(b []byte) (int, error) + func (c *Conn) WriteCompressedMessages(codec CompressionCodec, msgs ...Message) (nbytes int, err error) + func (c *Conn) WriteCompressedMessagesAt(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error) + func (c *Conn) WriteMessages(msgs ...Message) (int, error) + type ConnConfig struct + Broker int + ClientID string + Partition int + Rack string + Topic string + TransactionalID string + type ConsumerGroup struct + func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error) + func (cg *ConsumerGroup) Close() error + func (cg *ConsumerGroup) Next(ctx context.Context) (*Generation, error) + type ConsumerGroupConfig struct + Brokers []string + Dialer *Dialer + ErrorLogger Logger + GroupBalancers []GroupBalancer + HeartbeatInterval time.Duration + ID string + JoinGroupBackoff time.Duration + Logger Logger + PartitionWatchInterval time.Duration + RebalanceTimeout time.Duration + RetentionTime time.Duration + SessionTimeout time.Duration + StartOffset int64 + Timeout time.Duration + Topics []string + WatchPartitionChanges bool + func (config *ConsumerGroupConfig) Validate() error + type CoordinatorKeyType int8 + const CoordinatorKeyTypeConsumer + const CoordinatorKeyTypeTransaction + type CreateACLsRequest struct + ACLs []ACLEntry + Addr net.Addr + type CreateACLsResponse struct + Errors []error + Throttle time.Duration + type CreatePartitionsRequest struct + Addr net.Addr + Topics []TopicPartitionsConfig + ValidateOnly bool + type CreatePartitionsResponse struct + Errors map[string]error + Throttle time.Duration + type CreateTopicsRequest struct + Addr net.Addr + Topics []TopicConfig + ValidateOnly bool + type CreateTopicsResponse struct + Errors map[string]error + Throttle time.Duration + type DeleteACLsFilter struct + HostFilter string + Operation ACLOperationType + PermissionType ACLPermissionType + PrincipalFilter string + ResourceNameFilter string + ResourcePatternTypeFilter PatternType + ResourceTypeFilter ResourceType + type DeleteACLsMatchingACLs struct + Error error + Host string + Operation ACLOperationType + PermissionType ACLPermissionType + Principal string + ResourceName string + ResourcePatternType PatternType + ResourceType ResourceType + type DeleteACLsRequest struct + Addr net.Addr + Filters []DeleteACLsFilter + type DeleteACLsResponse struct + Results []DeleteACLsResult + Throttle time.Duration + type DeleteACLsResult struct + Error error + MatchingACLs []DeleteACLsMatchingACLs + type DeleteGroupsRequest struct + Addr net.Addr + GroupIDs []string + type DeleteGroupsResponse struct + Errors map[string]error + Throttle time.Duration + type DeleteTopicsRequest struct + Addr net.Addr + Topics []string + type DeleteTopicsResponse struct + Errors map[string]error + Throttle time.Duration + type DescribeACLsRequest struct + Addr net.Addr + Filter ACLFilter + type DescribeACLsResponse struct + Error error + Resources []ACLResource + Throttle time.Duration + type DescribeClientQuotasEntity struct + EntityName string + EntityType string + type DescribeClientQuotasRequest struct + Addr net.Addr + Components []DescribeClientQuotasRequestComponent + Strict bool + type DescribeClientQuotasRequestComponent struct + EntityType string + Match string + MatchType int8 + type DescribeClientQuotasResponse struct + Entries []DescribeClientQuotasResponseQuotas + Error error + Throttle time.Duration + type DescribeClientQuotasResponseQuotas struct + Entities []DescribeClientQuotasEntity + Values []DescribeClientQuotasValue + type DescribeClientQuotasValue struct + Key string + Value float64 + type DescribeConfigRequestResource struct + ConfigNames []string + ResourceName string + ResourceType ResourceType + type DescribeConfigResponseConfigEntry struct + ConfigDocumentation string + ConfigName string + ConfigSource int8 + ConfigSynonyms []DescribeConfigResponseConfigSynonym + ConfigType int8 + ConfigValue string + IsDefault bool + IsSensitive bool + ReadOnly bool + type DescribeConfigResponseConfigSynonym struct + ConfigName string + ConfigSource int8 + ConfigValue string + type DescribeConfigResponseResource struct + ConfigEntries []DescribeConfigResponseConfigEntry + Error error + ResourceName string + ResourceType int8 + type DescribeConfigsRequest struct + Addr net.Addr + IncludeDocumentation bool + IncludeSynonyms bool + Resources []DescribeConfigRequestResource + type DescribeConfigsResponse struct + Resources []DescribeConfigResponseResource + Throttle time.Duration + type DescribeGroupsRequest struct + Addr net.Addr + GroupIDs []string + type DescribeGroupsResponse struct + Groups []DescribeGroupsResponseGroup + type DescribeGroupsResponseAssignments struct + Topics []GroupMemberTopic + UserData []byte + Version int + type DescribeGroupsResponseGroup struct + Error error + GroupID string + GroupState string + Members []DescribeGroupsResponseMember + type DescribeGroupsResponseMember struct + ClientHost string + ClientID string + MemberAssignments DescribeGroupsResponseAssignments + MemberID string + MemberMetadata DescribeGroupsResponseMemberMetadata + type DescribeGroupsResponseMemberMetadata struct + OwnedPartitions []DescribeGroupsResponseMemberMetadataOwnedPartition + Topics []string + UserData []byte + Version int + type DescribeGroupsResponseMemberMetadataOwnedPartition struct + Partitions []int + Topic string + type DescribeUserScramCredentialsCredentialInfo struct + Iterations int + Mechanism ScramMechanism + type DescribeUserScramCredentialsRequest struct + Addr net.Addr + Users []UserScramCredentialsUser + type DescribeUserScramCredentialsResponse struct + Error error + Results []DescribeUserScramCredentialsResponseResult + Throttle time.Duration + type DescribeUserScramCredentialsResponseResult struct + CredentialInfos []DescribeUserScramCredentialsCredentialInfo + Error error + User string + type Dialer struct + ClientID string + Deadline time.Time + DialFunc func(ctx context.Context, network string, address string) (net.Conn, error) + DualStack bool + FallbackDelay time.Duration + KeepAlive time.Duration + LocalAddr net.Addr + Resolver Resolver + SASLMechanism sasl.Mechanism + TLS *tls.Config + Timeout time.Duration + TransactionalID string + func (d *Dialer) Dial(network string, address string) (*Conn, error) + func (d *Dialer) DialContext(ctx context.Context, network string, address string) (*Conn, error) + func (d *Dialer) DialLeader(ctx context.Context, network string, address string, topic string, ...) (*Conn, error) + func (d *Dialer) DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error) + func (d *Dialer) LookupLeader(ctx context.Context, network string, address string, topic string, ...) (Broker, error) + func (d *Dialer) LookupPartition(ctx context.Context, network string, address string, topic string, ...) (Partition, error) + func (d *Dialer) LookupPartitions(ctx context.Context, network string, address string, topic string) ([]Partition, error) + type DurationStats struct + Avg time.Duration + Count int64 + Max time.Duration + Min time.Duration + Sum time.Duration + type ElectLeadersRequest struct + Addr net.Addr + Partitions []int + Timeout time.Duration + Topic string + type ElectLeadersResponse struct + Error error + PartitionResults []ElectLeadersResponsePartitionResult + type ElectLeadersResponsePartitionResult struct + Error error + Partition int + type EndTxnRequest struct + Addr net.Addr + Committed bool + ProducerEpoch int + ProducerID int + TransactionalID string + type EndTxnResponse struct + Error error + Throttle time.Duration + type Error int + const BrokerAuthorizationFailed + const BrokerIDNotRegistered + const BrokerNotAvailable + const ClusterAuthorizationFailed + const ConcurrentTransactions + const DelegationTokenAuthDisabled + const DelegationTokenAuthorizationFailed + const DelegationTokenExpired + const DelegationTokenNotFound + const DelegationTokenOwnerMismatch + const DelegationTokenRequestNotAllowed + const DuplicateBrokerRegistration + const DuplicateResource + const DuplicateSequenceNumber + const ElectionNotNeeded + const EligibleLeadersNotAvailable + const FeatureUpdateFailed + const FencedInstanceID + const FencedLeaderEpoch + const FetchSessionIDNotFound + const FetchSessionTopicIDError + const GroupAuthorizationFailed + const GroupCoordinatorNotAvailable + const GroupIdNotFound + const GroupLoadInProgress + const GroupMaxSizeReached + const GroupSubscribedToTopic + const IllegalGeneration + const IllegalSASLState + const InconsistentClusterID + const InconsistentGroupProtocol + const InconsistentTopicID + const InconsistentVoterSet + const InvalidCommitOffsetSize + const InvalidConfiguration + const InvalidFetchSessionEpoch + const InvalidGroupId + const InvalidMessage + const InvalidMessageSize + const InvalidPartitionNumber + const InvalidPrincipalType + const InvalidProducerEpoch + const InvalidProducerIDMapping + const InvalidRecord + const InvalidReplicaAssignment + const InvalidReplicationFactor + const InvalidRequest + const InvalidRequiredAcks + const InvalidSessionTimeout + const InvalidTimestamp + const InvalidTopic + const InvalidTransactionState + const InvalidTransactionTimeout + const InvalidUpdateVersion + const KafkaStorageError + const LeaderNotAvailable + const ListenerNotFound + const LogDirNotFound + const MemberIDRequired + const MessageSizeTooLarge + const NetworkException + const NoReassignmentInProgress + const NonEmptyGroup + const NotController + const NotCoordinatorForGroup + const NotEnoughReplicas + const NotEnoughReplicasAfterAppend + const NotLeaderForPartition + const OffsetMetadataTooLarge + const OffsetNotAvailable + const OffsetOutOfRange + const OutOfOrderSequenceNumber + const PolicyViolation + const PositionOutOfRange + const PreferredLeaderNotAvailable + const PrincipalDeserializationFailure + const ProducerFenced + const ReassignmentInProgress + const RebalanceInProgress + const RecordListTooLarge + const ReplicaNotAvailable + const RequestTimedOut + const ResourceNotFound + const SASLAuthenticationFailed + const SecurityDisabled + const SnapshotNotFound + const StaleBrokerEpoch + const StaleControllerEpoch + const ThrottlingQuotaExceeded + const TopicAlreadyExists + const TopicAuthorizationFailed + const TopicDeletionDisabled + const TransactionCoordinatorFenced + const TransactionalIDAuthorizationFailed + const TransactionalIDNotFound + const UnacceptableCredential + const Unknown + const UnknownLeaderEpoch + const UnknownMemberId + const UnknownProducerId + const UnknownTopicID + const UnknownTopicOrPartition + const UnstableOffsetCommit + const UnsupportedCompressionType + const UnsupportedForMessageFormat + const UnsupportedSASLMechanism + const UnsupportedVersion + func (e Error) Description() string + func (e Error) Error() string + func (e Error) Temporary() bool + func (e Error) Timeout() bool + func (e Error) Title() string + type FetchRequest struct + Addr net.Addr + IsolationLevel IsolationLevel + MaxBytes int64 + MaxWait time.Duration + MinBytes int64 + Offset int64 + Partition int + Topic string + type FetchResponse struct + Error error + HighWatermark int64 + LastStableOffset int64 + LogStartOffset int64 + Partition int + Records RecordReader + Throttle time.Duration + Topic string + type FindCoordinatorRequest struct + Addr net.Addr + Key string + KeyType CoordinatorKeyType + type FindCoordinatorResponse struct + Coordinator *FindCoordinatorResponseCoordinator + Error error + Throttle time.Duration + type FindCoordinatorResponseCoordinator struct + Host string + NodeID int + Port int + type Generation struct + Assignments map[string][]PartitionAssignment + GroupID string + ID int32 + MemberID string + func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error + func (g *Generation) Start(fn func(ctx context.Context)) + type GroupBalancer interface + AssignGroups func(members []GroupMember, partitions []Partition) GroupMemberAssignments + ProtocolName func() string + UserData func() ([]byte, error) + type GroupMember struct + ID string + Topics []string + UserData []byte + type GroupMemberAssignments map[string]map[string][]int + type GroupMemberTopic struct + Partitions []int + Topic string + type GroupProtocol struct + Metadata GroupProtocolSubscription + Name string + type GroupProtocolAssignment struct + AssignedPartitions map[string][]int + UserData []byte + type GroupProtocolSubscription struct + OwnedPartitions map[string][]int + Topics []string + UserData []byte + type Hash struct + Hasher hash.Hash32 + func (h *Hash) Balance(msg Message, partitions ...int) int + type Header = protocol.Header + type HeartbeatRequest struct + Addr net.Addr + GenerationID int32 + GroupID string + GroupInstanceID string + MemberID string + type HeartbeatResponse struct + Error error + Throttle time.Duration + type IncrementalAlterConfigsRequest struct + Addr net.Addr + Resources []IncrementalAlterConfigsRequestResource + ValidateOnly bool + type IncrementalAlterConfigsRequestConfig struct + ConfigOperation ConfigOperation + Name string + Value string + type IncrementalAlterConfigsRequestResource struct + Configs []IncrementalAlterConfigsRequestConfig + ResourceName string + ResourceType ResourceType + type IncrementalAlterConfigsResponse struct + Resources []IncrementalAlterConfigsResponseResource + type IncrementalAlterConfigsResponseResource struct + Error error + ResourceName string + ResourceType ResourceType + type InitProducerIDRequest struct + Addr net.Addr + ProducerEpoch int + ProducerID int + TransactionTimeoutMs int + TransactionalID string + type InitProducerIDResponse struct + Error error + Producer *ProducerSession + Throttle time.Duration + type IsolationLevel int8 + const ReadCommitted + const ReadUncommitted + type JoinGroupRequest struct + Addr net.Addr + GroupID string + GroupInstanceID string + MemberID string + ProtocolType string + Protocols []GroupProtocol + RebalanceTimeout time.Duration + SessionTimeout time.Duration + type JoinGroupResponse struct + Error error + GenerationID int + LeaderID string + MemberID string + Members []JoinGroupResponseMember + ProtocolName string + ProtocolType string + Throttle time.Duration + type JoinGroupResponseMember struct + GroupInstanceID string + ID string + Metadata GroupProtocolSubscription + type LeastBytes struct + func (lb *LeastBytes) Balance(msg Message, partitions ...int) int + type LeaveGroupRequest struct + Addr net.Addr + GroupID string + Members []LeaveGroupRequestMember + type LeaveGroupRequestMember struct + GroupInstanceID string + ID string + type LeaveGroupResponse struct + Error error + Members []LeaveGroupResponseMember + Throttle time.Duration + type LeaveGroupResponseMember struct + Error error + GroupInstanceID string + ID string + type ListGroupsRequest struct + Addr net.Addr + type ListGroupsResponse struct + Error error + Groups []ListGroupsResponseGroup + type ListGroupsResponseGroup struct + Coordinator int + GroupID string + type ListOffsetsRequest struct + Addr net.Addr + IsolationLevel IsolationLevel + Topics map[string][]OffsetRequest + type ListOffsetsResponse struct + Throttle time.Duration + Topics map[string][]PartitionOffsets + type ListPartitionReassignmentsRequest struct + Addr net.Addr + Timeout time.Duration + Topics map[string]ListPartitionReassignmentsRequestTopic + type ListPartitionReassignmentsRequestTopic struct + PartitionIndexes []int + type ListPartitionReassignmentsResponse struct + Error error + Topics map[string]ListPartitionReassignmentsResponseTopic + type ListPartitionReassignmentsResponsePartition struct + AddingReplicas []int + PartitionIndex int + RemovingReplicas []int + Replicas []int + type ListPartitionReassignmentsResponseTopic struct + Partitions []ListPartitionReassignmentsResponsePartition + type Logger interface + Printf func(string, ...interface{}) + type LoggerFunc func(string, ...interface{}) + func (f LoggerFunc) Printf(msg string, args ...interface{}) + type Message struct + Headers []Header + HighWaterMark int64 + Key []byte + Offset int64 + Partition int + Time time.Time + Topic string + Value []byte + WriterData interface{} + type MessageTooLargeError struct + Message Message + Remaining []Message + func (e MessageTooLargeError) Error() string + type MetadataRequest struct + Addr net.Addr + Topics []string + type MetadataResponse struct + Brokers []Broker + ClusterID string + Controller Broker + Throttle time.Duration + Topics []Topic + type Murmur2Balancer struct + Consistent bool + func (b Murmur2Balancer) Balance(msg Message, partitions ...int) (partition int) + type OffsetCommit struct + Metadata string + Offset int64 + Partition int + type OffsetCommitPartition struct + Error error + Partition int + type OffsetCommitRequest struct + Addr net.Addr + GenerationID int + GroupID string + InstanceID string + MemberID string + Topics map[string][]OffsetCommit + type OffsetCommitResponse struct + Throttle time.Duration + Topics map[string][]OffsetCommitPartition + type OffsetDelete struct + Partition int + Topic string + type OffsetDeletePartition struct + Error error + Partition int + type OffsetDeleteRequest struct + Addr net.Addr + GroupID string + Topics map[string][]int + type OffsetDeleteResponse struct + Error error + Throttle time.Duration + Topics map[string][]OffsetDeletePartition + type OffsetFetchPartition struct + CommittedOffset int64 + Error error + Metadata string + Partition int + type OffsetFetchRequest struct + Addr net.Addr + GroupID string + Topics map[string][]int + type OffsetFetchResponse struct + Error error + Throttle time.Duration + Topics map[string][]OffsetFetchPartition + type OffsetRequest struct + Partition int + Timestamp int64 + func FirstOffsetOf(partition int) OffsetRequest + func LastOffsetOf(partition int) OffsetRequest + func TimeOffsetOf(partition int, at time.Time) OffsetRequest + type Partition struct + Error error + ID int + Isr []Broker + Leader Broker + OfflineReplicas []Broker + Replicas []Broker + Topic string + func LookupPartition(ctx context.Context, network string, address string, topic string, ...) (Partition, error) + func LookupPartitions(ctx context.Context, network string, address string, topic string) ([]Partition, error) + type PartitionAssignment struct + ID int + Offset int64 + type PartitionOffsets struct + Error error + FirstOffset int64 + LastOffset int64 + Offsets map[int64]time.Time + Partition int + type PatternType int8 + const PatternTypeAny + const PatternTypeLiteral + const PatternTypeMatch + const PatternTypePrefixed + const PatternTypeUnknown + func (pt *PatternType) UnmarshalText(text []byte) error + func (pt PatternType) MarshalText() ([]byte, error) + func (pt PatternType) String() string + type ProduceRequest struct + Addr net.Addr + Compression Compression + MessageVersion int + Partition int + Records RecordReader + RequiredAcks RequiredAcks + Topic string + TransactionalID string + type ProduceResponse struct + BaseOffset int64 + Error error + LogAppendTime time.Time + LogStartOffset int64 + RecordErrors map[int]error + Throttle time.Duration + type ProducerSession struct + ProducerEpoch int + ProducerID int + type RackAffinityGroupBalancer struct + Rack string + func (r RackAffinityGroupBalancer) AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments + func (r RackAffinityGroupBalancer) ProtocolName() string + func (r RackAffinityGroupBalancer) UserData() ([]byte, error) + type RangeGroupBalancer struct + func (r RangeGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments + func (r RangeGroupBalancer) ProtocolName() string + func (r RangeGroupBalancer) UserData() ([]byte, error) + type RawProduceRequest struct + Addr net.Addr + MessageVersion int + Partition int + RawRecords protocol.RawRecordSet + RequiredAcks RequiredAcks + Topic string + TransactionalID string + type ReadBatchConfig struct + IsolationLevel IsolationLevel + MaxBytes int + MaxWait time.Duration + MinBytes int + type Reader struct + func NewReader(config ReaderConfig) *Reader + func (r *Reader) Close() error + func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error + func (r *Reader) Config() ReaderConfig + func (r *Reader) FetchMessage(ctx context.Context) (Message, error) + func (r *Reader) Lag() int64 + func (r *Reader) Offset() int64 + func (r *Reader) ReadLag(ctx context.Context) (lag int64, err error) + func (r *Reader) ReadMessage(ctx context.Context) (Message, error) + func (r *Reader) SetOffset(offset int64) error + func (r *Reader) SetOffsetAt(ctx context.Context, t time.Time) error + func (r *Reader) Stats() ReaderStats + type ReaderConfig struct + Brokers []string + CommitInterval time.Duration + Dialer *Dialer + ErrorLogger Logger + GroupBalancers []GroupBalancer + GroupID string + GroupTopics []string + HeartbeatInterval time.Duration + IsolationLevel IsolationLevel + JoinGroupBackoff time.Duration + Logger Logger + MaxAttempts int + MaxBytes int + MaxWait time.Duration + MinBytes int + OffsetOutOfRangeError bool + Partition int + PartitionWatchInterval time.Duration + QueueCapacity int + ReadBackoffMax time.Duration + ReadBackoffMin time.Duration + ReadBatchTimeout time.Duration + ReadLagInterval time.Duration + RebalanceTimeout time.Duration + RetentionTime time.Duration + SessionTimeout time.Duration + StartOffset int64 + Topic string + WatchPartitionChanges bool + func (config *ReaderConfig) Validate() error + type ReaderStats struct + Bytes int64 + ClientID string + DeprecatedFetchesWithTypo int64 + DialTime DurationStats + Dials int64 + Errors int64 + FetchBytes SummaryStats + FetchSize SummaryStats + Fetches int64 + Lag int64 + MaxBytes int64 + MaxWait time.Duration + Messages int64 + MinBytes int64 + Offset int64 + Partition string + QueueCapacity int64 + QueueLength int64 + ReadTime DurationStats + Rebalances int64 + Timeouts int64 + Topic string + WaitTime DurationStats + type Record = protocol.Record + type RecordReader = protocol.RecordReader + func NewRecordReader(records ...Record) RecordReader + type ReferenceHash struct + Hasher hash.Hash32 + func (h *ReferenceHash) Balance(msg Message, partitions ...int) int + type ReplicaAssignment struct + Partition int + Replicas []int + type Request = protocol.Message + type RequiredAcks int + const RequireAll + const RequireNone + const RequireOne + func (acks *RequiredAcks) UnmarshalText(b []byte) error + func (acks RequiredAcks) MarshalText() ([]byte, error) + func (acks RequiredAcks) String() string + type Resolver interface + LookupHost func(ctx context.Context, host string) (addrs []string, err error) + type ResourceType int8 + const ResourceTypeAny + const ResourceTypeBroker + const ResourceTypeCluster + const ResourceTypeDelegationToken + const ResourceTypeGroup + const ResourceTypeTopic + const ResourceTypeTransactionalID + const ResourceTypeUnknown + func (rt *ResourceType) UnmarshalText(text []byte) error + func (rt ResourceType) MarshalText() ([]byte, error) + func (rt ResourceType) String() string + type Response = protocol.Message + type RoundRobin struct + ChunkSize int + func (rr *RoundRobin) Balance(msg Message, partitions ...int) int + type RoundRobinGroupBalancer struct + func (r RoundRobinGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments + func (r RoundRobinGroupBalancer) ProtocolName() string + func (r RoundRobinGroupBalancer) UserData() ([]byte, error) + type RoundTripper interface + RoundTrip func(context.Context, net.Addr, Request) (Response, error) + var DefaultTransport RoundTripper = &Transport{ ... } + type ScramMechanism int8 + const ScramMechanismSha256 + const ScramMechanismSha512 + const ScramMechanismUnknown + type SummaryStats struct + Avg int64 + Count int64 + Max int64 + Min int64 + Sum int64 + type SyncGroupRequest struct + Addr net.Addr + Assignments []SyncGroupRequestAssignment + GenerationID int + GroupID string + GroupInstanceID string + MemberID string + ProtocolName string + ProtocolType string + type SyncGroupRequestAssignment struct + Assignment GroupProtocolAssignment + MemberID string + type SyncGroupResponse struct + Assignment GroupProtocolAssignment + Error error + ProtocolName string + ProtocolType string + Throttle time.Duration + type Topic struct + Error error + Internal bool + Name string + Partitions []Partition + type TopicAndGroup struct + GroupId string + Topic string + type TopicConfig struct + ConfigEntries []ConfigEntry + NumPartitions int + ReplicaAssignments []ReplicaAssignment + ReplicationFactor int + Topic string + type TopicPartitionAssignment struct + BrokerIDs []int32 + type TopicPartitionsConfig struct + Count int32 + Name string + TopicPartitionAssignments []TopicPartitionAssignment + type Transport struct + ClientID string + Context context.Context + Dial func(context.Context, string, string) (net.Conn, error) + DialTimeout time.Duration + IdleTimeout time.Duration + MetadataTTL time.Duration + MetadataTopics []string + Resolver BrokerResolver + SASL sasl.Mechanism + TLS *tls.Config + func (t *Transport) CloseIdleConnections() + func (t *Transport) RoundTrip(ctx context.Context, addr net.Addr, req Request) (Response, error) + type TxnOffsetCommit struct + Metadata string + Offset int64 + Partition int + type TxnOffsetCommitPartition struct + Error error + Partition int + type TxnOffsetCommitRequest struct + Addr net.Addr + GenerationID int + GroupID string + GroupInstanceID string + MemberID string + ProducerEpoch int + ProducerID int + Topics map[string][]TxnOffsetCommit + TransactionalID string + type TxnOffsetCommitResponse struct + Throttle time.Duration + Topics map[string][]TxnOffsetCommitPartition + type UserScramCredentialsDeletion struct + Mechanism ScramMechanism + Name string + type UserScramCredentialsUpsertion struct + Iterations int + Mechanism ScramMechanism + Name string + Salt []byte + SaltedPassword []byte + type UserScramCredentialsUser struct + Name string + type Version int16 + func (n Version) Marshal(v interface{}) ([]byte, error) + func (n Version) Unmarshal(b []byte, v interface{}) error + type WriteErrors []error + func (err WriteErrors) Count() int + func (err WriteErrors) Error() string + type Writer struct + Addr net.Addr + AllowAutoTopicCreation bool + Async bool + Balancer Balancer + BatchBytes int64 + BatchSize int + BatchTimeout time.Duration + Completion func(messages []Message, err error) + Compression Compression + ErrorLogger Logger + Logger Logger + MaxAttempts int + ReadTimeout time.Duration + RequiredAcks RequiredAcks + Topic string + Transport RoundTripper + WriteBackoffMax time.Duration + WriteBackoffMin time.Duration + WriteTimeout time.Duration + func NewWriter(config WriterConfig) *Writer + func (w *Writer) Close() error + func (w *Writer) Stats() WriterStats + func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error + type WriterConfig struct + Async bool + Balancer Balancer + BatchBytes int + BatchSize int + BatchTimeout time.Duration + Brokers []string + Dialer *Dialer + ErrorLogger Logger + IdleConnTimeout time.Duration + Logger Logger + MaxAttempts int + QueueCapacity int + ReadTimeout time.Duration + RebalanceInterval time.Duration + RequiredAcks int + Topic string + WriteTimeout time.Duration + func (config *WriterConfig) Validate() error + type WriterStats struct + Async bool + BatchBytes SummaryStats + BatchQueueTime DurationStats + BatchSize SummaryStats + BatchTime DurationStats + BatchTimeout time.Duration + Bytes int64 + ClientID string + DialTime DurationStats + Dials int64 + Errors int64 + MaxAttempts int64 + MaxBatchSize int64 + Messages int64 + QueueCapacity int64 + QueueLength int64 + ReadTimeout time.Duration + RebalanceInterval time.Duration + Rebalances int64 + RequiredAcks int64 + Retries int64 + Topic string + WaitTime DurationStats + WriteBackoffMax time.Duration + WriteBackoffMin time.Duration + WriteTime DurationStats + WriteTimeout time.Duration + Writes int64