Versions in this module Expand all Collapse all v1 v1.6.2 Dec 5, 2022 v1.6.1 Dec 5, 2022 Changes in this version + func StringPtr(s string) *string + type ACLBuilder struct + func NewACLs() *ACLBuilder + func (b *ACLBuilder) Allow(principals ...string) *ACLBuilder + func (b *ACLBuilder) AllowHosts(hosts ...string) *ACLBuilder + func (b *ACLBuilder) AnyResource(name ...string) *ACLBuilder + func (b *ACLBuilder) Clusters() *ACLBuilder + func (b *ACLBuilder) DelegationTokens(t ...string) *ACLBuilder + func (b *ACLBuilder) Deny(principals ...string) *ACLBuilder + func (b *ACLBuilder) DenyHosts(hosts ...string) *ACLBuilder + func (b *ACLBuilder) Groups(g ...string) *ACLBuilder + func (b *ACLBuilder) HasAnyFilter() bool + func (b *ACLBuilder) HasHosts() bool + func (b *ACLBuilder) HasPrincipals() bool + func (b *ACLBuilder) HasResource() bool + func (b *ACLBuilder) MaybeAllow(principals ...string) *ACLBuilder + func (b *ACLBuilder) MaybeAllowHosts(hosts ...string) *ACLBuilder + func (b *ACLBuilder) MaybeClusters(c bool) *ACLBuilder + func (b *ACLBuilder) MaybeDelegationTokens(t ...string) *ACLBuilder + func (b *ACLBuilder) MaybeDeny(principals ...string) *ACLBuilder + func (b *ACLBuilder) MaybeDenyHosts(hosts ...string) *ACLBuilder + func (b *ACLBuilder) MaybeGroups(g ...string) *ACLBuilder + func (b *ACLBuilder) MaybeOperations(operations ...ACLOperation) *ACLBuilder + func (b *ACLBuilder) MaybeTopics(t ...string) *ACLBuilder + func (b *ACLBuilder) MaybeTransactionalIDs(x ...string) *ACLBuilder + func (b *ACLBuilder) Operations(operations ...ACLOperation) *ACLBuilder + func (b *ACLBuilder) PrefixUser() + func (b *ACLBuilder) PrefixUserExcept(except ...string) + func (b *ACLBuilder) ResourcePatternType(pattern ACLPattern) *ACLBuilder + func (b *ACLBuilder) Topics(t ...string) *ACLBuilder + func (b *ACLBuilder) TransactionalIDs(x ...string) *ACLBuilder + func (b *ACLBuilder) ValidateCreate() error + func (b *ACLBuilder) ValidateDelete() error + func (b *ACLBuilder) 
ValidateDescribe() error + func (b *ACLBuilder) ValidateFilter() error + type ACLOperation = kmsg.ACLOperation + const OpAll + const OpAlter + const OpAlterConfigs + const OpAny + const OpClusterAction + const OpCreate + const OpDelete + const OpDescribe + const OpDescribeConfigs + const OpIdempotentWrite + const OpRead + const OpUnknown + const OpWrite + type ACLPattern = kmsg.ACLResourcePatternType + const ACLPatternAny + const ACLPatternLiteral + const ACLPatternMatch + const ACLPatternPrefixed + const ACLPatternUnknown + type AlterAllReplicaLogDirsResponses map[int32]AlterReplicaLogDirsResponses + func (rs AlterAllReplicaLogDirsResponses) Each(fn func(AlterReplicaLogDirsResponse)) + func (rs AlterAllReplicaLogDirsResponses) Sorted() []AlterReplicaLogDirsResponse + type AlterClientQuotaEntry struct + Entity ClientQuotaEntity + Ops []AlterClientQuotaOp + type AlterClientQuotaOp struct + Key string + Remove bool + Value float64 + type AlterConfig struct + Name string + Op IncrementalOp + Value *string + type AlterConfigsResponse struct + Err error + Name string + type AlterConfigsResponses []AlterConfigsResponse + func (rs AlterConfigsResponses) On(name string, fn func(*AlterConfigsResponse) error) (AlterConfigsResponse, error) + type AlterPartitionAssignmentsReq map[string]map[int32][]int32 + func (r *AlterPartitionAssignmentsReq) Assign(t string, p int32, brokers []int32) + func (r *AlterPartitionAssignmentsReq) CancelAssign(t string, p int32) + type AlterPartitionAssignmentsResponse struct + Err error + ErrMessage string + Partition int32 + Topic string + type AlterPartitionAssignmentsResponses map[string]map[int32]AlterPartitionAssignmentsResponse + func (rs AlterPartitionAssignmentsResponses) Each(fn func(AlterPartitionAssignmentsResponse)) + func (rs AlterPartitionAssignmentsResponses) Sorted() []AlterPartitionAssignmentsResponse + type AlterReplicaLogDirsReq map[string]TopicsSet + func (r *AlterReplicaLogDirsReq) Add(d string, s TopicsSet) + type 
AlterReplicaLogDirsResponse struct + Broker int32 + Dir string + Err error + Partition int32 + Topic string + func (a AlterReplicaLogDirsResponse) Less(other AlterReplicaLogDirsResponse) bool + type AlterReplicaLogDirsResponses map[string]map[int32]AlterReplicaLogDirsResponse + func (rs AlterReplicaLogDirsResponses) Each(fn func(AlterReplicaLogDirsResponse)) + func (rs AlterReplicaLogDirsResponses) Sorted() []AlterReplicaLogDirsResponse + type AlteredClientQuota struct + Entity ClientQuotaEntity + Err error + ErrMessage string + type AlteredClientQuotas []AlteredClientQuota + type AlteredUserSCRAM struct + Err error + ErrMessage string + User string + type AlteredUserSCRAMs map[string]AlteredUserSCRAM + func (as AlteredUserSCRAMs) AllFailed() bool + func (as AlteredUserSCRAMs) Each(fn func(AlteredUserSCRAM)) + func (as AlteredUserSCRAMs) EachError(fn func(AlteredUserSCRAM)) + func (as AlteredUserSCRAMs) Error() error + func (as AlteredUserSCRAMs) Ok() bool + func (as AlteredUserSCRAMs) Sorted() []AlteredUserSCRAM + type AuthError struct + Err error + func (a *AuthError) Error() string + func (a *AuthError) Is(err error) bool + func (a *AuthError) Unwrap() error + type BrokerApiVersions struct + Err error + NodeID int32 + func (v *BrokerApiVersions) EachKeySorted(fn func(key, min, max int16)) + func (v *BrokerApiVersions) KeyMaxVersion(key int16) (max int16, exists bool) + func (v *BrokerApiVersions) KeyMinVersion(key int16) (min int16, exists bool) + func (v *BrokerApiVersions) KeyVersions(key int16) (min, max int16, exists bool) + func (v *BrokerApiVersions) Raw() *kmsg.ApiVersionsResponse + func (v *BrokerApiVersions) VersionGuess(opt ...kversion.VersionGuessOpt) string + type BrokerDetail = kgo.BrokerMetadata + type BrokerDetails []BrokerDetail + func (ds BrokerDetails) NodeIDs() []int32 + type BrokersApiVersions map[int32]BrokerApiVersions + func (vs BrokersApiVersions) Each(fn func(BrokerApiVersions)) + func (vs BrokersApiVersions) Sorted() []BrokerApiVersions 
+ type Client struct + func NewClient(cl *kgo.Client) *Client + func NewOptClient(opts ...kgo.Opt) (*Client, error) + func (cl *Client) AlterAllReplicaLogDirs(ctx context.Context, alter AlterReplicaLogDirsReq) (AlterAllReplicaLogDirsResponses, error) + func (cl *Client) AlterBrokerConfigs(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error) + func (cl *Client) AlterBrokerReplicaLogDirs(ctx context.Context, broker int32, alter AlterReplicaLogDirsReq) (AlterReplicaLogDirsResponses, error) + func (cl *Client) AlterClientQuotas(ctx context.Context, entries []AlterClientQuotaEntry) (AlteredClientQuotas, error) + func (cl *Client) AlterPartitionAssignments(ctx context.Context, req AlterPartitionAssignmentsReq) (AlterPartitionAssignmentsResponses, error) + func (cl *Client) AlterTopicConfigs(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error) + func (cl *Client) AlterUserSCRAMs(ctx context.Context, del []DeleteSCRAM, upsert []UpsertSCRAM) (AlteredUserSCRAMs, error) + func (cl *Client) ApiVersions(ctx context.Context) (BrokersApiVersions, error) + func (cl *Client) BrokerMetadata(ctx context.Context) (Metadata, error) + func (cl *Client) Close() + func (cl *Client) CommitAllOffsets(ctx context.Context, group string, os Offsets) error + func (cl *Client) CommitOffsets(ctx context.Context, group string, os Offsets) (OffsetResponses, error) + func (cl *Client) CreateACLs(ctx context.Context, b *ACLBuilder) (CreateACLsResults, error) + func (cl *Client) CreateDelegationToken(ctx context.Context, d CreateDelegationToken) (DelegationToken, error) + func (cl *Client) CreatePartitions(ctx context.Context, add int, topics ...string) (CreatePartitionsResponses, error) + func (cl *Client) CreateTopics(ctx context.Context, partitions int32, replicationFactor int16, ...) 
(CreateTopicResponses, error) + func (cl *Client) DeleteACLs(ctx context.Context, b *ACLBuilder) (DeleteACLsResults, error) + func (cl *Client) DeleteGroups(ctx context.Context, groups ...string) (DeleteGroupResponses, error) + func (cl *Client) DeleteOffsets(ctx context.Context, group string, s TopicsSet) (DeleteOffsetsResponses, error) + func (cl *Client) DeleteRecords(ctx context.Context, os Offsets) (DeleteRecordsResponses, error) + func (cl *Client) DeleteTopics(ctx context.Context, topics ...string) (DeleteTopicResponses, error) + func (cl *Client) DescribeACLs(ctx context.Context, b *ACLBuilder) (DescribeACLsResults, error) + func (cl *Client) DescribeAllLogDirs(ctx context.Context, s TopicsSet) (DescribedAllLogDirs, error) + func (cl *Client) DescribeBrokerConfigs(ctx context.Context, brokers ...int32) (ResourceConfigs, error) + func (cl *Client) DescribeBrokerLogDirs(ctx context.Context, broker int32, s TopicsSet) (DescribedLogDirs, error) + func (cl *Client) DescribeClientQuotas(ctx context.Context, strict bool, ...) 
(DescribedClientQuotas, error) + func (cl *Client) DescribeDelegationTokens(ctx context.Context, owners ...Principal) (DelegationTokens, error) + func (cl *Client) DescribeGroups(ctx context.Context, groups ...string) (DescribedGroups, error) + func (cl *Client) DescribeProducers(ctx context.Context, s TopicsSet) (DescribedProducersTopics, error) + func (cl *Client) DescribeTopicConfigs(ctx context.Context, topics ...string) (ResourceConfigs, error) + func (cl *Client) DescribeTransactions(ctx context.Context, txnIDs ...string) (DescribedTransactions, error) + func (cl *Client) DescribeUserSCRAMs(ctx context.Context, users ...string) (DescribedUserSCRAMs, error) + func (cl *Client) ElectLeaders(ctx context.Context, how ElectLeadersHow, s TopicsSet) (ElectLeadersResults, error) + func (cl *Client) ExpireDelegationToken(ctx context.Context, hmac []byte, expiry time.Duration) (expiryTimestamp time.Time, err error) + func (cl *Client) FetchManyOffsets(ctx context.Context, groups ...string) FetchOffsetsResponses + func (cl *Client) FetchOffsets(ctx context.Context, group string) (OffsetResponses, error) + func (cl *Client) FetchOffsetsForTopics(ctx context.Context, group string, topics ...string) (OffsetResponses, error) + func (cl *Client) FindGroupCoordinators(ctx context.Context, groups ...string) FindCoordinatorResponses + func (cl *Client) FindTxnCoordinators(ctx context.Context, txnIDs ...string) FindCoordinatorResponses + func (cl *Client) LeaveGroup(ctx context.Context, b *LeaveGroupBuilder) (LeaveGroupResponses, error) + func (cl *Client) ListBrokers(ctx context.Context) (BrokerDetails, error) + func (cl *Client) ListCommittedOffsets(ctx context.Context, topics ...string) (ListedOffsets, error) + func (cl *Client) ListEndOffsets(ctx context.Context, topics ...string) (ListedOffsets, error) + func (cl *Client) ListGroups(ctx context.Context, filterStates ...string) (ListedGroups, error) + func (cl *Client) ListOffsetsAfterMilli(ctx context.Context, millisecond 
int64, topics ...string) (ListedOffsets, error) + func (cl *Client) ListPartitionReassignments(ctx context.Context, s TopicsSet) (ListPartitionReassignmentsResponses, error) + func (cl *Client) ListStartOffsets(ctx context.Context, topics ...string) (ListedOffsets, error) + func (cl *Client) ListTopics(ctx context.Context, topics ...string) (TopicDetails, error) + func (cl *Client) ListTopicsWithInternal(ctx context.Context, topics ...string) (TopicDetails, error) + func (cl *Client) ListTransactions(ctx context.Context, producerIDs []int64, filterStates []string) (ListedTransactions, error) + func (cl *Client) Metadata(ctx context.Context, topics ...string) (Metadata, error) + func (cl *Client) OffetForLeaderEpoch(ctx context.Context, r OffsetForLeaderEpochRequest) (OffsetsForLeaderEpochs, error) + func (cl *Client) RenewDelegationToken(ctx context.Context, hmac []byte, renewTime time.Duration) (expiryTimestamp time.Time, err error) + func (cl *Client) SetTimeoutMillis(millis int32) + func (cl *Client) UpdatePartitions(ctx context.Context, set int, topics ...string) (CreatePartitionsResponses, error) + func (cl *Client) ValidateAlterBrokerConfigs(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error) + func (cl *Client) ValidateAlterClientQuotas(ctx context.Context, entries []AlterClientQuotaEntry) (AlteredClientQuotas, error) + func (cl *Client) ValidateAlterTopicConfigs(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error) + func (cl *Client) ValidateCreatePartitions(ctx context.Context, add int, topics ...string) (CreatePartitionsResponses, error) + func (cl *Client) ValidateCreateTopics(ctx context.Context, partitions int32, replicationFactor int16, ...) 
(CreateTopicResponses, error) + func (cl *Client) ValidateUpdatePartitions(ctx context.Context, set int, topics ...string) (CreatePartitionsResponses, error) + func (cl *Client) WriteTxnMarkers(ctx context.Context, markers ...TxnMarkers) (TxnMarkersResponses, error) + type ClientQuotaEntity []ClientQuotaEntityComponent + func (ds ClientQuotaEntity) String() string + type ClientQuotaEntityComponent struct + Name *string + Type string + func (d ClientQuotaEntityComponent) String() string + type ClientQuotaValue struct + Key string + Value float64 + func (d ClientQuotaValue) String() string + type ClientQuotaValues []ClientQuotaValue + type Config struct + Key string + Sensitive bool + Source kmsg.ConfigSource + Synonyms []ConfigSynonym + Value *string + func (c *Config) MaybeValue() string + type ConfigSynonym struct + Key string + Source kmsg.ConfigSource + Value *string + type CreateACLsResult struct + Err error + Host string + Name string + Operation ACLOperation + Pattern ACLPattern + Permission kmsg.ACLPermissionType + Principal string + Type kmsg.ACLResourceType + type CreateACLsResults []CreateACLsResult + type CreateDelegationToken struct + MaxLifetime time.Duration + Owner *Principal + Renewers []Principal + type CreatePartitionsResponse struct + Err error + Topic string + type CreatePartitionsResponses map[string]CreatePartitionsResponse + func (rs CreatePartitionsResponses) On(topic string, fn func(*CreatePartitionsResponse) error) (CreatePartitionsResponse, error) + func (rs CreatePartitionsResponses) Sorted() []CreatePartitionsResponse + type CreateTopicResponse struct + Err error + ID TopicID + Topic string + type CreateTopicResponses map[string]CreateTopicResponse + func (rs CreateTopicResponses) On(topic string, fn func(*CreateTopicResponse) error) (CreateTopicResponse, error) + func (rs CreateTopicResponses) Sorted() []CreateTopicResponse + type CredInfo struct + Iterations int32 + Mechanism ScramMechanism + func (c CredInfo) String() string + type 
DelegationToken struct + ExpiryTimestamp time.Time + HMAC []byte + IssueTimestamp time.Time + MaxTimestamp time.Time + Owner Principal + Renewers []Principal + TokenID string + TokenRequesterPrincipal Principal + type DelegationTokens []DelegationToken + type DeleteACLsResult struct + Deleted DeletedACLs + Err error + Host *string + Name *string + Operation ACLOperation + Pattern ACLPattern + Permission kmsg.ACLPermissionType + Principal *string + Type kmsg.ACLResourceType + type DeleteACLsResults []DeleteACLsResult + type DeleteGroupResponse struct + Err error + Group string + type DeleteGroupResponses map[string]DeleteGroupResponse + func (ds DeleteGroupResponses) Sorted() []DeleteGroupResponse + func (rs DeleteGroupResponses) On(group string, fn func(*DeleteGroupResponse) error) (DeleteGroupResponse, error) + type DeleteOffsetsResponses map[string]map[int32]error + func (ds DeleteOffsetsResponses) EachError(fn func(string, int32, error)) + func (ds DeleteOffsetsResponses) Lookup(t string, p int32) (error, bool) + type DeleteRecordsResponse struct + Err error + LowWatermark int64 + Partition int32 + Topic string + type DeleteRecordsResponses map[string]map[int32]DeleteRecordsResponse + func (ds DeleteRecordsResponses) Each(fn func(DeleteRecordsResponse)) + func (ds DeleteRecordsResponses) Lookup(t string, p int32) (DeleteRecordsResponse, bool) + func (rs DeleteRecordsResponses) On(topic string, partition int32, fn func(*DeleteRecordsResponse) error) (DeleteRecordsResponse, error) + func (rs DeleteRecordsResponses) Sorted() []DeleteRecordsResponse + type DeleteSCRAM struct + Mechanism ScramMechanism + User string + type DeleteTopicResponse struct + Err error + ID TopicID + Topic string + type DeleteTopicResponses map[string]DeleteTopicResponse + func (rs DeleteTopicResponses) On(topic string, fn func(*DeleteTopicResponse) error) (DeleteTopicResponse, error) + func (rs DeleteTopicResponses) Sorted() []DeleteTopicResponse + type DeletedACL struct + Err error + Host 
string + Name string + Operation ACLOperation + Pattern ACLPattern + Permission kmsg.ACLPermissionType + Principal string + Type kmsg.ACLResourceType + type DeletedACLs []DeletedACL + type DescribeACLsResult struct + Described DescribedACLs + Err error + Host *string + Name *string + Operation ACLOperation + Pattern ACLPattern + Permission kmsg.ACLPermissionType + Principal *string + Type kmsg.ACLResourceType + type DescribeACLsResults []DescribeACLsResult + type DescribeClientQuotaComponent struct + MatchName *string + MatchType QuotasMatchType + Type string + type DescribedACL struct + Host string + Name string + Operation ACLOperation + Pattern ACLPattern + Permission kmsg.ACLPermissionType + Principal string + Type kmsg.ACLResourceType + type DescribedACLs []DescribedACL + type DescribedAllLogDirs map[int32]DescribedLogDirs + func (ds DescribedAllLogDirs) Each(fn func(DescribedLogDir)) + func (ds DescribedAllLogDirs) Sorted() []DescribedLogDir + type DescribedClientQuota struct + Entity ClientQuotaEntity + Values ClientQuotaValues + type DescribedClientQuotas []DescribedClientQuota + type DescribedGroup struct + Coordinator BrokerDetail + Err error + Group string + Members []DescribedGroupMember + Protocol string + ProtocolType string + State string + func (d *DescribedGroup) AssignedPartitions() TopicsSet + type DescribedGroupMember struct + Assigned GroupMemberAssignment + ClientHost string + ClientID string + InstanceID *string + Join GroupMemberMetadata + MemberID string + type DescribedGroups map[string]DescribedGroup + func (ds DescribedGroups) AssignedPartitions() TopicsSet + func (ds DescribedGroups) Names() []string + func (ds DescribedGroups) Sorted() []DescribedGroup + func (rs DescribedGroups) On(group string, fn func(*DescribedGroup) error) (DescribedGroup, error) + type DescribedLogDir struct + Broker int32 + Dir string + Err error + Topics DescribedLogDirTopics + func (ds DescribedLogDir) Size() int64 + type DescribedLogDirPartition struct + 
Broker int32 + Dir string + IsFuture bool + OffsetLag int64 + Partition int32 + Size int64 + Topic string + func (p DescribedLogDirPartition) Less(other DescribedLogDirPartition) bool + func (p DescribedLogDirPartition) LessBySize(other DescribedLogDirPartition) bool + type DescribedLogDirTopics map[string]map[int32]DescribedLogDirPartition + func (ds DescribedLogDirTopics) Each(fn func(p DescribedLogDirPartition)) + func (ds DescribedLogDirTopics) Lookup(t string, p int32) (DescribedLogDirPartition, bool) + func (ds DescribedLogDirTopics) Size() int64 + func (ds DescribedLogDirTopics) Sorted() []DescribedLogDirPartition + func (ds DescribedLogDirTopics) SortedBySize() []DescribedLogDirPartition + type DescribedLogDirs map[string]DescribedLogDir + func (ds DescribedLogDirs) Each(fn func(DescribedLogDir)) + func (ds DescribedLogDirs) EachError(fn func(DescribedLogDir)) + func (ds DescribedLogDirs) EachPartition(fn func(d DescribedLogDirPartition)) + func (ds DescribedLogDirs) Error() error + func (ds DescribedLogDirs) LargestPartitionBySize() (DescribedLogDirPartition, bool) + func (ds DescribedLogDirs) Lookup(d, t string, p int32) (DescribedLogDirPartition, bool) + func (ds DescribedLogDirs) LookupPartition(t string, p int32) (DescribedLogDirPartition, bool) + func (ds DescribedLogDirs) Ok() bool + func (ds DescribedLogDirs) Size() int64 + func (ds DescribedLogDirs) SmallestPartitionBySize() (DescribedLogDirPartition, bool) + func (ds DescribedLogDirs) Sorted() []DescribedLogDir + func (ds DescribedLogDirs) SortedBySize() []DescribedLogDir + func (ds DescribedLogDirs) SortedPartitions() []DescribedLogDirPartition + func (ds DescribedLogDirs) SortedPartitionsBySize() []DescribedLogDirPartition + type DescribedProducer struct + CoordinatorEpoch int32 + CurrentTxnStartOffset int64 + LastSequence int32 + LastTimestamp int64 + Leader int32 + Partition int32 + ProducerEpoch int16 + ProducerID int64 + Topic string + func (l *DescribedProducer) Less(r *DescribedProducer) 
bool + type DescribedProducers map[int64]DescribedProducer + func (ds DescribedProducers) Each(fn func(DescribedProducer)) + func (ds DescribedProducers) Sorted() []DescribedProducer + type DescribedProducersPartition struct + ActiveProducers DescribedProducers + Err error + Leader int32 + Partition int32 + Topic string + type DescribedProducersPartitions map[int32]DescribedProducersPartition + func (ds DescribedProducersPartitions) Each(fn func(DescribedProducersPartition)) + func (ds DescribedProducersPartitions) EachProducer(fn func(DescribedProducer)) + func (ds DescribedProducersPartitions) Sorted() []DescribedProducersPartition + func (ds DescribedProducersPartitions) SortedProducers() []DescribedProducer + type DescribedProducersTopic struct + Partitions DescribedProducersPartitions + Topic string + type DescribedProducersTopics map[string]DescribedProducersTopic + func (ds DescribedProducersTopics) Each(fn func(DescribedProducersTopic)) + func (ds DescribedProducersTopics) EachPartition(fn func(DescribedProducersPartition)) + func (ds DescribedProducersTopics) EachProducer(fn func(DescribedProducer)) + func (ds DescribedProducersTopics) Sorted() []DescribedProducersTopic + func (ds DescribedProducersTopics) SortedPartitions() []DescribedProducersPartition + func (ds DescribedProducersTopics) SortedProducers() []DescribedProducer + type DescribedTransaction struct + Coordinator int32 + Err error + ProducerEpoch int16 + ProducerID int64 + StartTimestamp int64 + State string + TimeoutMillis int32 + Topics TopicsSet + TxnID string + type DescribedTransactions map[string]DescribedTransaction + func (ds DescribedTransactions) Each(fn func(DescribedTransaction)) + func (ds DescribedTransactions) Sorted() []DescribedTransaction + func (ds DescribedTransactions) TransactionalIDs() []string + func (rs DescribedTransactions) On(txnID string, fn func(*DescribedTransaction) error) (DescribedTransaction, error) + type DescribedUserSCRAM struct + CredInfos []CredInfo + 
Err error + ErrMessage string + User string + type DescribedUserSCRAMs map[string]DescribedUserSCRAM + func (ds DescribedUserSCRAMs) AllFailed() bool + func (ds DescribedUserSCRAMs) Each(fn func(DescribedUserSCRAM)) + func (ds DescribedUserSCRAMs) EachError(fn func(DescribedUserSCRAM)) + func (ds DescribedUserSCRAMs) Error() error + func (ds DescribedUserSCRAMs) Ok() bool + func (ds DescribedUserSCRAMs) Sorted() []DescribedUserSCRAM + type ElectLeadersHow int8 + const ElectLiveReplica + const ElectPreferredReplica + type ElectLeadersResult struct + Err error + ErrMessage string + How ElectLeadersHow + Partition int32 + Topic string + type ElectLeadersResults map[string]map[int32]ElectLeadersResult + type FetchOffsetsResponse struct + Err error + Fetched OffsetResponses + Group string + func (r FetchOffsetsResponse) CommittedPartitions() TopicsSet + type FetchOffsetsResponses map[string]FetchOffsetsResponse + func (rs FetchOffsetsResponses) AllFailed() bool + func (rs FetchOffsetsResponses) CommittedPartitions() TopicsSet + func (rs FetchOffsetsResponses) EachError(fn func(FetchOffsetsResponse)) + func (rs FetchOffsetsResponses) On(group string, fn func(*FetchOffsetsResponse) error) (FetchOffsetsResponse, error) + type FindCoordinatorResponse struct + Err error + ErrMessage string + Host string + Name string + NodeID int32 + Port int32 + type FindCoordinatorResponses map[string]FindCoordinatorResponse + func (rs FindCoordinatorResponses) AllFailed() bool + func (rs FindCoordinatorResponses) Each(fn func(FindCoordinatorResponse)) + func (rs FindCoordinatorResponses) EachError(fn func(FindCoordinatorResponse)) + func (rs FindCoordinatorResponses) Error() error + func (rs FindCoordinatorResponses) Ok() bool + func (rs FindCoordinatorResponses) Sorted() []FindCoordinatorResponse + type GroupLag map[string]map[int32]GroupMemberLag + func CalculateGroupLag(group DescribedGroup, commit OffsetResponses, endOffsets ListedOffsets) GroupLag + func (l GroupLag) IsEmpty() bool + 
func (l GroupLag) Lookup(t string, p int32) (GroupMemberLag, bool) + func (l GroupLag) Sorted() []GroupMemberLag + func (l GroupLag) Total() int64 + func (l GroupLag) TotalByTopic() GroupTopicsLag + type GroupMemberAssignment struct + func (m GroupMemberAssignment) AsConnect() (*kmsg.ConnectMemberAssignment, bool) + func (m GroupMemberAssignment) AsConsumer() (*kmsg.ConsumerMemberAssignment, bool) + func (m GroupMemberAssignment) Raw() ([]byte, bool) + type GroupMemberLag struct + Commit Offset + End ListedOffset + Err error + Lag int64 + Member *DescribedGroupMember + func (g *GroupMemberLag) IsEmpty() bool + type GroupMemberMetadata struct + func (m GroupMemberMetadata) AsConnect() (*kmsg.ConnectMemberMetadata, bool) + func (m GroupMemberMetadata) AsConsumer() (*kmsg.ConsumerMemberMetadata, bool) + func (m GroupMemberMetadata) Raw() ([]byte, bool) + type GroupTopicsLag map[string]TopicLag + func (l GroupTopicsLag) Sorted() []TopicLag + type IncrementalOp int8 + const AppendConfig + const DeleteConfig + const SetConfig + const SubtractConfig + type LeaveGroupBuilder struct + func LeaveGroup(group string) *LeaveGroupBuilder + func (b *LeaveGroupBuilder) InstanceIDs(ids ...string) *LeaveGroupBuilder + func (b *LeaveGroupBuilder) Reason(reason string) *LeaveGroupBuilder + type LeaveGroupResponse struct + Err error + Group string + InstanceID string + MemberID string + type LeaveGroupResponses map[string]LeaveGroupResponse + func (ls LeaveGroupResponses) Each(fn func(l LeaveGroupResponse)) + func (ls LeaveGroupResponses) EachError(fn func(l LeaveGroupResponse)) + func (ls LeaveGroupResponses) Error() error + func (ls LeaveGroupResponses) Ok() bool + func (ls LeaveGroupResponses) Sorted() []LeaveGroupResponse + type ListPartitionReassignmentsResponse struct + AddingReplicas []int32 + Partition int32 + RemovingReplicas []int32 + Replicas []int32 + Topic string + type ListPartitionReassignmentsResponses map[string]map[int32]ListPartitionReassignmentsResponse + func (rs 
ListPartitionReassignmentsResponses) Each(fn func(ListPartitionReassignmentsResponse)) + func (rs ListPartitionReassignmentsResponses) Sorted() []ListPartitionReassignmentsResponse + type ListedGroup struct + Coordinator int32 + Group string + ProtocolType string + State string + type ListedGroups map[string]ListedGroup + func (ls ListedGroups) Groups() []string + func (ls ListedGroups) Sorted() []ListedGroup + type ListedOffset struct + Err error + LeaderEpoch int32 + Offset int64 + Partition int32 + Timestamp int64 + Topic string + type ListedOffsets map[string]map[int32]ListedOffset + func (l ListedOffsets) Each(fn func(ListedOffset)) + func (l ListedOffsets) Error() error + func (l ListedOffsets) KOffsets() map[string]map[int32]kgo.Offset + func (l ListedOffsets) Lookup(t string, p int32) (ListedOffset, bool) + func (l ListedOffsets) Offsets() Offsets + type ListedTransaction struct + Coordinator int32 + ProducerID int64 + State string + TxnID string + type ListedTransactions map[string]ListedTransaction + func (ls ListedTransactions) Each(fn func(ListedTransaction)) + func (ls ListedTransactions) Sorted() []ListedTransaction + func (ls ListedTransactions) TransactionalIDs() []string + type Metadata struct + Brokers BrokerDetails + Cluster string + Controller int32 + Topics TopicDetails + type Offset struct + At int64 + LeaderEpoch int32 + Metadata string + Partition int32 + Topic string + type OffsetForLeaderEpoch struct + EndOffset int64 + Err error + LeaderEpoch int32 + NodeID int32 + Partition int32 + Topic string + type OffsetForLeaderEpochRequest map[string]map[int32]int32 + func (l *OffsetForLeaderEpochRequest) Add(topic string, partition, leaderEpoch int32) + type OffsetResponse struct + Err error + type OffsetResponses map[string]map[int32]OffsetResponse + func (os *OffsetResponses) Add(o OffsetResponse) + func (os OffsetResponses) DeleteFunc(fn func(OffsetResponse) bool) + func (os OffsetResponses) Each(fn func(OffsetResponse)) + func (os 
OffsetResponses) EachError(fn func(o OffsetResponse)) + func (os OffsetResponses) Error() error + func (os OffsetResponses) KOffsets() map[string]map[int32]kgo.Offset + func (os OffsetResponses) Keep(o Offsets) + func (os OffsetResponses) KeepFunc(fn func(OffsetResponse) bool) + func (os OffsetResponses) Lookup(t string, p int32) (OffsetResponse, bool) + func (os OffsetResponses) Offsets() Offsets + func (os OffsetResponses) Ok() bool + func (os OffsetResponses) Partitions() TopicsSet + func (os OffsetResponses) Sorted() []OffsetResponse + type Offsets map[string]map[int32]Offset + func OffsetsFromFetches(fs kgo.Fetches) Offsets + func OffsetsFromRecords(rs ...kgo.Record) Offsets + func (os *Offsets) Add(o Offset) + func (os *Offsets) AddOffset(t string, p int32, o int64, leaderEpoch int32) + func (os Offsets) Delete(t string, p int32) + func (os Offsets) DeleteFunc(fn func(o Offset) bool) + func (os Offsets) Each(fn func(Offset)) + func (os Offsets) KOffsets() map[string]map[int32]kgo.Offset + func (os Offsets) KeepFunc(fn func(o Offset) bool) + func (os Offsets) Lookup(t string, p int32) (Offset, bool) + func (os Offsets) Sorted() []Offset + func (os Offsets) TopicsSet() TopicsSet + type OffsetsForLeaderEpochs map[string]map[int32]OffsetForLeaderEpoch + type OffsetsList []Offset + func (l OffsetsList) KOffsets() map[string]map[int32]kgo.Offset + func (l OffsetsList) Offsets() Offsets + type Partition struct + Partition int32 + Topic string + type PartitionDetail struct + Err error + ISR []int32 + Leader int32 + LeaderEpoch int32 + OfflineReplicas []int32 + Partition int32 + Replicas []int32 + Topic string + type PartitionDetails map[int32]PartitionDetail + func (ds PartitionDetails) NumReplicas() int + func (ds PartitionDetails) Numbers() []int32 + func (ds PartitionDetails) Sorted() []PartitionDetail + type Partitions []Partition + func (ps Partitions) TopicsList() TopicsList + func (ps Partitions) TopicsSet() TopicsSet + type Principal struct + Name string + 
Type string + type QuotasMatchType = kmsg.QuotasMatchType + type ResourceConfig struct + Configs []Config + Err error + Name string + type ResourceConfigs []ResourceConfig + func (rs ResourceConfigs) On(name string, fn func(*ResourceConfig) error) (ResourceConfig, error) + type ScramMechanism int8 + const ScramSha256 + const ScramSha512 + func (s ScramMechanism) String() string + type ShardError struct + Broker BrokerDetail + Err error + Req kmsg.Request + type ShardErrors struct + AllFailed bool + Errs []ShardError + Name string + func (e *ShardErrors) Error() string + type TopicDetail struct + Err error + ID TopicID + IsInternal bool + Partitions PartitionDetails + Topic string + type TopicDetails map[string]TopicDetail + func (ds TopicDetails) EachError(fn func(TopicDetail)) + func (ds TopicDetails) EachPartition(fn func(PartitionDetail)) + func (ds TopicDetails) FilterInternal() + func (ds TopicDetails) Has(topic string) bool + func (ds TopicDetails) Names() []string + func (ds TopicDetails) Sorted() []TopicDetail + func (ds TopicDetails) TopicsList() TopicsList + func (ds TopicDetails) TopicsSet() TopicsSet + type TopicID [16]byte + func (t TopicID) Less(other TopicID) bool + func (t TopicID) MarshalJSON() ([]byte, error) + func (t TopicID) String() string + type TopicLag struct + Lag int64 + Topic string + type TopicPartitions struct + Partitions []int32 + Topic string + type TopicsList []TopicPartitions + func (l TopicsList) Each(fn func(t string, p int32)) + func (l TopicsList) EachPartitions(fn func(t string, ps []int32)) + func (l TopicsList) EmptyTopics() []string + func (l TopicsList) IntoSet() TopicsSet + func (l TopicsList) Topics() []string + type TopicsSet map[string]map[int32]struct{} + func (s *TopicsSet) Add(t string, ps ...int32) + func (s TopicsSet) Delete(t string, ps ...int32) + func (s TopicsSet) Each(fn func(t string, p int32)) + func (s TopicsSet) EachPartitions(fn func(t string, ps []int32)) + func (s TopicsSet) EmptyTopics() []string + 
func (s TopicsSet) IntoList() TopicsList + func (s TopicsSet) Lookup(t string, p int32) bool + func (s TopicsSet) Merge(other TopicsSet) + func (s TopicsSet) Sorted() TopicsList + func (s TopicsSet) Topics() []string + type TxnMarkers struct + Commit bool + CoordinatorEpoch int32 + ProducerEpoch int16 + ProducerID int64 + Topics TopicsSet + type TxnMarkersPartitionResponse struct + Err error + NodeID int32 + Partition int32 + ProducerID int64 + Topic string + type TxnMarkersPartitionResponses map[int32]TxnMarkersPartitionResponse + func (ps TxnMarkersPartitionResponses) Each(fn func(TxnMarkersPartitionResponse)) + func (ps TxnMarkersPartitionResponses) Sorted() []TxnMarkersPartitionResponse + type TxnMarkersResponse struct + ProducerID int64 + Topics TxnMarkersTopicResponses + type TxnMarkersResponses map[int64]TxnMarkersResponse + func (ms TxnMarkersResponses) Each(fn func(TxnMarkersResponse)) + func (ms TxnMarkersResponses) EachPartition(fn func(TxnMarkersPartitionResponse)) + func (ms TxnMarkersResponses) EachTopic(fn func(TxnMarkersTopicResponse)) + func (ms TxnMarkersResponses) Sorted() []TxnMarkersResponse + func (ms TxnMarkersResponses) SortedPartitions() []TxnMarkersPartitionResponse + func (ms TxnMarkersResponses) SortedTopics() []TxnMarkersTopicResponse + type TxnMarkersTopicResponse struct + Partitions TxnMarkersPartitionResponses + ProducerID int64 + Topic string + type TxnMarkersTopicResponses map[string]TxnMarkersTopicResponse + func (ts TxnMarkersTopicResponses) Each(fn func(TxnMarkersTopicResponse)) + func (ts TxnMarkersTopicResponses) EachPartition(fn func(TxnMarkersPartitionResponse)) + func (ts TxnMarkersTopicResponses) Sorted() []TxnMarkersTopicResponse + func (ts TxnMarkersTopicResponses) SortedPartitions() []TxnMarkersPartitionResponse + type UpsertSCRAM struct + Iterations int32 + Mechanism ScramMechanism + Password string + Salt []byte + SaltedPassword []byte + User string