Documentation ¶
Overview ¶
Package kadm provides a helper Kafka admin client around a *kgo.Client.
This package is meant to cover the common use cases for dropping into an "admin" like interface for Kafka. As with any admin client, this package must make opinionated decisions on what to provide and what to hide. The underlying Kafka protocol gives more detailed information in responses, or allows more fine tuning in requests, but most of the time, these details are unnecessary.
By virtue of making opinionated decisions, this package cannot satisfy every need for requests and responses. If you need more control than this admin client provides, you can use the kmsg package directly.
This package contains a lot of types, but the main two types type to know are Client and ShardErrors. Every other type is used for inputs or outputs to methods on the client.
The Client type is a simple small wrapper around a *kgo.Client that exists solely to namespace methods. The ShardErrors type is a bit more complicated. When issuing requests, under the hood some of these requests actually need to be mapped to brokers and split, issuing different pieces of the input request to different brokers. The *kgo.Client handles this all internally, but (if using RequestSharded as directed), returns each response to each of these split requests individually. Each response can fail or be successful. This package goes one step further and merges these failures into one meta failure, ShardErrors. Any function that returns ShardErrors is documented as such, and if a function returns a non-nil ShardErrors, it is possible that the returned data is actually valid and usable. If you care to, you can log / react to the partial failures and continue using the partial successful result. This is in contrast to other clients, which either require to to request individual brokers directly, or they completely hide individual failures, or they completely fail on any individual failure.
For methods that list or describe things, this package often completely fails responses on auth failures. If you use a method that accepts two topics, one that you are authorized to and one that you are not, you will not receive a partial successful response. Instead, you will receive an AuthError. Methods that do *not* fail on auth errors are explicitly documented as such.
Users may often find it easy to work with lists of topics or partitions. Rather than needing to build deeply nested maps directly, this package has a few helper types that are worth knowing:
TopicsList - a slice of topics and their partitions TopicsSet - a set of topics, each containing a set of partitions Partitions - a slice of partitions OffsetsList - a slice of offsets Offsets - a map of offsets
These types are meant to be easy to build and use, and can be used as the starting point for other types.
Index ¶
- func StringPtr(s string) *string
- type ACLBuilder
- func (b *ACLBuilder) Allow(principals ...string) *ACLBuilder
- func (b *ACLBuilder) AllowHosts(hosts ...string) *ACLBuilder
- func (b *ACLBuilder) AnyResource(name ...string) *ACLBuilder
- func (b *ACLBuilder) Clusters() *ACLBuilder
- func (b *ACLBuilder) DelegationTokens(t ...string) *ACLBuilder
- func (b *ACLBuilder) Deny(principals ...string) *ACLBuilder
- func (b *ACLBuilder) DenyHosts(hosts ...string) *ACLBuilder
- func (b *ACLBuilder) Groups(g ...string) *ACLBuilder
- func (b *ACLBuilder) HasAnyFilter() bool
- func (b *ACLBuilder) HasHosts() bool
- func (b *ACLBuilder) HasPrincipals() bool
- func (b *ACLBuilder) HasResource() bool
- func (b *ACLBuilder) MaybeAllow(principals ...string) *ACLBuilder
- func (b *ACLBuilder) MaybeAllowHosts(hosts ...string) *ACLBuilder
- func (b *ACLBuilder) MaybeClusters(c bool) *ACLBuilder
- func (b *ACLBuilder) MaybeDelegationTokens(t ...string) *ACLBuilder
- func (b *ACLBuilder) MaybeDeny(principals ...string) *ACLBuilder
- func (b *ACLBuilder) MaybeDenyHosts(hosts ...string) *ACLBuilder
- func (b *ACLBuilder) MaybeGroups(g ...string) *ACLBuilder
- func (b *ACLBuilder) MaybeOperations(operations ...ACLOperation) *ACLBuilder
- func (b *ACLBuilder) MaybeTopics(t ...string) *ACLBuilder
- func (b *ACLBuilder) MaybeTransactionalIDs(x ...string) *ACLBuilder
- func (b *ACLBuilder) Operations(operations ...ACLOperation) *ACLBuilder
- func (b *ACLBuilder) PrefixUser()
- func (b *ACLBuilder) PrefixUserExcept(except ...string)
- func (b *ACLBuilder) ResourcePatternType(pattern ACLPattern) *ACLBuilder
- func (b *ACLBuilder) Topics(t ...string) *ACLBuilder
- func (b *ACLBuilder) TransactionalIDs(x ...string) *ACLBuilder
- func (b *ACLBuilder) ValidateCreate() error
- func (b *ACLBuilder) ValidateDelete() error
- func (b *ACLBuilder) ValidateDescribe() error
- func (b *ACLBuilder) ValidateFilter() error
- type ACLOperation
- type ACLPattern
- type AlterAllReplicaLogDirsResponses
- type AlterClientQuotaEntry
- type AlterClientQuotaOp
- type AlterConfig
- type AlterConfigsResponse
- type AlterConfigsResponses
- type AlterPartitionAssignmentsReq
- type AlterPartitionAssignmentsResponse
- type AlterPartitionAssignmentsResponses
- type AlterReplicaLogDirsReq
- type AlterReplicaLogDirsResponse
- type AlterReplicaLogDirsResponses
- type AlteredClientQuota
- type AlteredClientQuotas
- type AlteredUserSCRAM
- type AlteredUserSCRAMs
- func (as AlteredUserSCRAMs) AllFailed() bool
- func (as AlteredUserSCRAMs) Each(fn func(AlteredUserSCRAM))
- func (as AlteredUserSCRAMs) EachError(fn func(AlteredUserSCRAM))
- func (as AlteredUserSCRAMs) Error() error
- func (as AlteredUserSCRAMs) Ok() bool
- func (as AlteredUserSCRAMs) Sorted() []AlteredUserSCRAM
- type AuthError
- type BrokerApiVersions
- func (v *BrokerApiVersions) EachKeySorted(fn func(key, min, max int16))
- func (v *BrokerApiVersions) KeyMaxVersion(key int16) (max int16, exists bool)
- func (v *BrokerApiVersions) KeyMinVersion(key int16) (min int16, exists bool)
- func (v *BrokerApiVersions) KeyVersions(key int16) (min, max int16, exists bool)
- func (v *BrokerApiVersions) Raw() *kmsg.ApiVersionsResponse
- func (v *BrokerApiVersions) VersionGuess(opt ...kversion.VersionGuessOpt) string
- type BrokerDetail
- type BrokerDetails
- type BrokersApiVersions
- type Client
- func (cl *Client) AlterAllReplicaLogDirs(ctx context.Context, alter AlterReplicaLogDirsReq) (AlterAllReplicaLogDirsResponses, error)
- func (cl *Client) AlterBrokerConfigs(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error)
- func (cl *Client) AlterBrokerReplicaLogDirs(ctx context.Context, broker int32, alter AlterReplicaLogDirsReq) (AlterReplicaLogDirsResponses, error)
- func (cl *Client) AlterClientQuotas(ctx context.Context, entries []AlterClientQuotaEntry) (AlteredClientQuotas, error)
- func (cl *Client) AlterPartitionAssignments(ctx context.Context, req AlterPartitionAssignmentsReq) (AlterPartitionAssignmentsResponses, error)
- func (cl *Client) AlterTopicConfigs(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error)
- func (cl *Client) AlterUserSCRAMs(ctx context.Context, del []DeleteSCRAM, upsert []UpsertSCRAM) (AlteredUserSCRAMs, error)
- func (cl *Client) ApiVersions(ctx context.Context) (BrokersApiVersions, error)
- func (cl *Client) BrokerMetadata(ctx context.Context) (Metadata, error)
- func (cl *Client) Close()
- func (cl *Client) CommitAllOffsets(ctx context.Context, group string, os Offsets) error
- func (cl *Client) CommitOffsets(ctx context.Context, group string, os Offsets) (OffsetResponses, error)
- func (cl *Client) CreateACLs(ctx context.Context, b *ACLBuilder) (CreateACLsResults, error)
- func (cl *Client) CreateDelegationToken(ctx context.Context, d CreateDelegationToken) (DelegationToken, error)
- func (cl *Client) CreatePartitions(ctx context.Context, add int, topics ...string) (CreatePartitionsResponses, error)
- func (cl *Client) CreateTopics(ctx context.Context, partitions int32, replicationFactor int16, ...) (CreateTopicResponses, error)
- func (cl *Client) DeleteACLs(ctx context.Context, b *ACLBuilder) (DeleteACLsResults, error)
- func (cl *Client) DeleteGroups(ctx context.Context, groups ...string) (DeleteGroupResponses, error)
- func (cl *Client) DeleteOffsets(ctx context.Context, group string, s TopicsSet) (DeleteOffsetsResponses, error)
- func (cl *Client) DeleteRecords(ctx context.Context, os Offsets) (DeleteRecordsResponses, error)
- func (cl *Client) DeleteTopics(ctx context.Context, topics ...string) (DeleteTopicResponses, error)
- func (cl *Client) DescribeACLs(ctx context.Context, b *ACLBuilder) (DescribeACLsResults, error)
- func (cl *Client) DescribeAllLogDirs(ctx context.Context, s TopicsSet) (DescribedAllLogDirs, error)
- func (cl *Client) DescribeBrokerConfigs(ctx context.Context, brokers ...int32) (ResourceConfigs, error)
- func (cl *Client) DescribeBrokerLogDirs(ctx context.Context, broker int32, s TopicsSet) (DescribedLogDirs, error)
- func (cl *Client) DescribeClientQuotas(ctx context.Context, strict bool, ...) (DescribedClientQuotas, error)
- func (cl *Client) DescribeDelegationTokens(ctx context.Context, owners ...Principal) (DelegationTokens, error)
- func (cl *Client) DescribeGroups(ctx context.Context, groups ...string) (DescribedGroups, error)
- func (cl *Client) DescribeProducers(ctx context.Context, s TopicsSet) (DescribedProducersTopics, error)
- func (cl *Client) DescribeTopicConfigs(ctx context.Context, topics ...string) (ResourceConfigs, error)
- func (cl *Client) DescribeTransactions(ctx context.Context, txnIDs ...string) (DescribedTransactions, error)
- func (cl *Client) DescribeUserSCRAMs(ctx context.Context, users ...string) (DescribedUserSCRAMs, error)
- func (cl *Client) ElectLeaders(ctx context.Context, how ElectLeadersHow, s TopicsSet) (ElectLeadersResults, error)
- func (cl *Client) ExpireDelegationToken(ctx context.Context, hmac []byte, expiry time.Duration) (expiryTimestamp time.Time, err error)
- func (cl *Client) FetchManyOffsets(ctx context.Context, groups ...string) FetchOffsetsResponses
- func (cl *Client) FetchOffsets(ctx context.Context, group string) (OffsetResponses, error)
- func (cl *Client) FetchOffsetsForTopics(ctx context.Context, group string, topics ...string) (OffsetResponses, error)
- func (cl *Client) FindGroupCoordinators(ctx context.Context, groups ...string) FindCoordinatorResponses
- func (cl *Client) FindTxnCoordinators(ctx context.Context, txnIDs ...string) FindCoordinatorResponses
- func (cl *Client) LeaveGroup(ctx context.Context, b *LeaveGroupBuilder) (LeaveGroupResponses, error)
- func (cl *Client) ListBrokers(ctx context.Context) (BrokerDetails, error)
- func (cl *Client) ListCommittedOffsets(ctx context.Context, topics ...string) (ListedOffsets, error)
- func (cl *Client) ListEndOffsets(ctx context.Context, topics ...string) (ListedOffsets, error)
- func (cl *Client) ListGroups(ctx context.Context, filterStates ...string) (ListedGroups, error)
- func (cl *Client) ListOffsetsAfterMilli(ctx context.Context, millisecond int64, topics ...string) (ListedOffsets, error)
- func (cl *Client) ListPartitionReassignments(ctx context.Context, s TopicsSet) (ListPartitionReassignmentsResponses, error)
- func (cl *Client) ListStartOffsets(ctx context.Context, topics ...string) (ListedOffsets, error)
- func (cl *Client) ListTopics(ctx context.Context, topics ...string) (TopicDetails, error)
- func (cl *Client) ListTopicsWithInternal(ctx context.Context, topics ...string) (TopicDetails, error)
- func (cl *Client) ListTransactions(ctx context.Context, producerIDs []int64, filterStates []string) (ListedTransactions, error)
- func (cl *Client) Metadata(ctx context.Context, topics ...string) (Metadata, error)
- func (cl *Client) OffetForLeaderEpoch(ctx context.Context, r OffsetForLeaderEpochRequest) (OffsetsForLeaderEpochs, error)
- func (cl *Client) RenewDelegationToken(ctx context.Context, hmac []byte, renewTime time.Duration) (expiryTimestamp time.Time, err error)
- func (cl *Client) SetTimeoutMillis(millis int32)
- func (cl *Client) UpdatePartitions(ctx context.Context, set int, topics ...string) (CreatePartitionsResponses, error)
- func (cl *Client) ValidateAlterBrokerConfigs(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error)
- func (cl *Client) ValidateAlterClientQuotas(ctx context.Context, entries []AlterClientQuotaEntry) (AlteredClientQuotas, error)
- func (cl *Client) ValidateAlterTopicConfigs(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error)
- func (cl *Client) ValidateCreatePartitions(ctx context.Context, add int, topics ...string) (CreatePartitionsResponses, error)
- func (cl *Client) ValidateCreateTopics(ctx context.Context, partitions int32, replicationFactor int16, ...) (CreateTopicResponses, error)
- func (cl *Client) ValidateUpdatePartitions(ctx context.Context, set int, topics ...string) (CreatePartitionsResponses, error)
- func (cl *Client) WriteTxnMarkers(ctx context.Context, markers ...TxnMarkers) (TxnMarkersResponses, error)
- type ClientQuotaEntity
- type ClientQuotaEntityComponent
- type ClientQuotaValue
- type ClientQuotaValues
- type Config
- type ConfigSynonym
- type CreateACLsResult
- type CreateACLsResults
- type CreateDelegationToken
- type CreatePartitionsResponse
- type CreatePartitionsResponses
- type CreateTopicResponse
- type CreateTopicResponses
- type CredInfo
- type DelegationToken
- type DelegationTokens
- type DeleteACLsResult
- type DeleteACLsResults
- type DeleteGroupResponse
- type DeleteGroupResponses
- type DeleteOffsetsResponses
- type DeleteRecordsResponse
- type DeleteRecordsResponses
- func (ds DeleteRecordsResponses) Each(fn func(DeleteRecordsResponse))
- func (ds DeleteRecordsResponses) Lookup(t string, p int32) (DeleteRecordsResponse, bool)
- func (rs DeleteRecordsResponses) On(topic string, partition int32, fn func(*DeleteRecordsResponse) error) (DeleteRecordsResponse, error)
- func (rs DeleteRecordsResponses) Sorted() []DeleteRecordsResponse
- type DeleteSCRAM
- type DeleteTopicResponse
- type DeleteTopicResponses
- type DeletedACL
- type DeletedACLs
- type DescribeACLsResult
- type DescribeACLsResults
- type DescribeClientQuotaComponent
- type DescribedACL
- type DescribedACLs
- type DescribedAllLogDirs
- type DescribedClientQuota
- type DescribedClientQuotas
- type DescribedGroup
- type DescribedGroupMember
- type DescribedGroups
- type DescribedLogDir
- type DescribedLogDirPartition
- type DescribedLogDirTopics
- func (ds DescribedLogDirTopics) Each(fn func(p DescribedLogDirPartition))
- func (ds DescribedLogDirTopics) Lookup(t string, p int32) (DescribedLogDirPartition, bool)
- func (ds DescribedLogDirTopics) Size() int64
- func (ds DescribedLogDirTopics) Sorted() []DescribedLogDirPartition
- func (ds DescribedLogDirTopics) SortedBySize() []DescribedLogDirPartition
- type DescribedLogDirs
- func (ds DescribedLogDirs) Each(fn func(DescribedLogDir))
- func (ds DescribedLogDirs) EachError(fn func(DescribedLogDir))
- func (ds DescribedLogDirs) EachPartition(fn func(d DescribedLogDirPartition))
- func (ds DescribedLogDirs) Error() error
- func (ds DescribedLogDirs) LargestPartitionBySize() (DescribedLogDirPartition, bool)
- func (ds DescribedLogDirs) Lookup(d, t string, p int32) (DescribedLogDirPartition, bool)
- func (ds DescribedLogDirs) LookupPartition(t string, p int32) (DescribedLogDirPartition, bool)
- func (ds DescribedLogDirs) Ok() bool
- func (ds DescribedLogDirs) Size() int64
- func (ds DescribedLogDirs) SmallestPartitionBySize() (DescribedLogDirPartition, bool)
- func (ds DescribedLogDirs) Sorted() []DescribedLogDir
- func (ds DescribedLogDirs) SortedBySize() []DescribedLogDir
- func (ds DescribedLogDirs) SortedPartitions() []DescribedLogDirPartition
- func (ds DescribedLogDirs) SortedPartitionsBySize() []DescribedLogDirPartition
- type DescribedProducer
- type DescribedProducers
- type DescribedProducersPartition
- type DescribedProducersPartitions
- func (ds DescribedProducersPartitions) Each(fn func(DescribedProducersPartition))
- func (ds DescribedProducersPartitions) EachProducer(fn func(DescribedProducer))
- func (ds DescribedProducersPartitions) Sorted() []DescribedProducersPartition
- func (ds DescribedProducersPartitions) SortedProducers() []DescribedProducer
- type DescribedProducersTopic
- type DescribedProducersTopics
- func (ds DescribedProducersTopics) Each(fn func(DescribedProducersTopic))
- func (ds DescribedProducersTopics) EachPartition(fn func(DescribedProducersPartition))
- func (ds DescribedProducersTopics) EachProducer(fn func(DescribedProducer))
- func (ds DescribedProducersTopics) Sorted() []DescribedProducersTopic
- func (ds DescribedProducersTopics) SortedPartitions() []DescribedProducersPartition
- func (ds DescribedProducersTopics) SortedProducers() []DescribedProducer
- type DescribedTransaction
- type DescribedTransactions
- func (ds DescribedTransactions) Each(fn func(DescribedTransaction))
- func (rs DescribedTransactions) On(txnID string, fn func(*DescribedTransaction) error) (DescribedTransaction, error)
- func (ds DescribedTransactions) Sorted() []DescribedTransaction
- func (ds DescribedTransactions) TransactionalIDs() []string
- type DescribedUserSCRAM
- type DescribedUserSCRAMs
- func (ds DescribedUserSCRAMs) AllFailed() bool
- func (ds DescribedUserSCRAMs) Each(fn func(DescribedUserSCRAM))
- func (ds DescribedUserSCRAMs) EachError(fn func(DescribedUserSCRAM))
- func (ds DescribedUserSCRAMs) Error() error
- func (ds DescribedUserSCRAMs) Ok() bool
- func (ds DescribedUserSCRAMs) Sorted() []DescribedUserSCRAM
- type ElectLeadersHow
- type ElectLeadersResult
- type ElectLeadersResults
- type FetchOffsetsResponse
- type FetchOffsetsResponses
- func (rs FetchOffsetsResponses) AllFailed() bool
- func (rs FetchOffsetsResponses) CommittedPartitions() TopicsSet
- func (rs FetchOffsetsResponses) EachError(fn func(FetchOffsetsResponse))
- func (rs FetchOffsetsResponses) On(group string, fn func(*FetchOffsetsResponse) error) (FetchOffsetsResponse, error)
- type FindCoordinatorResponse
- type FindCoordinatorResponses
- func (rs FindCoordinatorResponses) AllFailed() bool
- func (rs FindCoordinatorResponses) Each(fn func(FindCoordinatorResponse))
- func (rs FindCoordinatorResponses) EachError(fn func(FindCoordinatorResponse))
- func (rs FindCoordinatorResponses) Error() error
- func (rs FindCoordinatorResponses) Ok() bool
- func (rs FindCoordinatorResponses) Sorted() []FindCoordinatorResponse
- type GroupLag
- type GroupMemberAssignment
- type GroupMemberLag
- type GroupMemberMetadata
- type GroupTopicsLag
- type IncrementalOp
- type LeaveGroupBuilder
- type LeaveGroupResponse
- type LeaveGroupResponses
- type ListPartitionReassignmentsResponse
- type ListPartitionReassignmentsResponses
- type ListedGroup
- type ListedGroups
- type ListedOffset
- type ListedOffsets
- type ListedTransaction
- type ListedTransactions
- type Metadata
- type Offset
- type OffsetForLeaderEpoch
- type OffsetForLeaderEpochRequest
- type OffsetResponse
- type OffsetResponses
- func (os *OffsetResponses) Add(o OffsetResponse)
- func (os OffsetResponses) DeleteFunc(fn func(OffsetResponse) bool)
- func (os OffsetResponses) Each(fn func(OffsetResponse))
- func (os OffsetResponses) EachError(fn func(o OffsetResponse))
- func (os OffsetResponses) Error() error
- func (os OffsetResponses) KOffsets() map[string]map[int32]kgo.Offset
- func (os OffsetResponses) Keep(o Offsets)
- func (os OffsetResponses) KeepFunc(fn func(OffsetResponse) bool)
- func (os OffsetResponses) Lookup(t string, p int32) (OffsetResponse, bool)
- func (os OffsetResponses) Offsets() Offsets
- func (os OffsetResponses) Ok() bool
- func (os OffsetResponses) Partitions() TopicsSet
- func (os OffsetResponses) Sorted() []OffsetResponse
- type Offsets
- func (os *Offsets) Add(o Offset)
- func (os *Offsets) AddOffset(t string, p int32, o int64, leaderEpoch int32)
- func (os Offsets) Delete(t string, p int32)
- func (os Offsets) DeleteFunc(fn func(o Offset) bool)
- func (os Offsets) Each(fn func(Offset))
- func (os Offsets) KOffsets() map[string]map[int32]kgo.Offset
- func (os Offsets) KeepFunc(fn func(o Offset) bool)
- func (os Offsets) Lookup(t string, p int32) (Offset, bool)
- func (os Offsets) Sorted() []Offset
- func (os Offsets) TopicsSet() TopicsSet
- type OffsetsForLeaderEpochs
- type OffsetsList
- type Partition
- type PartitionDetail
- type PartitionDetails
- type Partitions
- type Principal
- type QuotasMatchType
- type ResourceConfig
- type ResourceConfigs
- type ScramMechanism
- type ShardError
- type ShardErrors
- type TopicDetail
- type TopicDetails
- func (ds TopicDetails) EachError(fn func(TopicDetail))
- func (ds TopicDetails) EachPartition(fn func(PartitionDetail))
- func (ds TopicDetails) FilterInternal()
- func (ds TopicDetails) Has(topic string) bool
- func (ds TopicDetails) Names() []string
- func (ds TopicDetails) Sorted() []TopicDetail
- func (ds TopicDetails) TopicsList() TopicsList
- func (ds TopicDetails) TopicsSet() TopicsSet
- type TopicID
- type TopicLag
- type TopicPartitions
- type TopicsList
- type TopicsSet
- func (s *TopicsSet) Add(t string, ps ...int32)
- func (s TopicsSet) Delete(t string, ps ...int32)
- func (s TopicsSet) Each(fn func(t string, p int32))
- func (s TopicsSet) EachPartitions(fn func(t string, ps []int32))
- func (s TopicsSet) EmptyTopics() []string
- func (s TopicsSet) IntoList() TopicsList
- func (s TopicsSet) Lookup(t string, p int32) bool
- func (s TopicsSet) Merge(other TopicsSet)
- func (s TopicsSet) Sorted() TopicsList
- func (s TopicsSet) Topics() []string
- type TxnMarkers
- type TxnMarkersPartitionResponse
- type TxnMarkersPartitionResponses
- type TxnMarkersResponse
- type TxnMarkersResponses
- func (ms TxnMarkersResponses) Each(fn func(TxnMarkersResponse))
- func (ms TxnMarkersResponses) EachPartition(fn func(TxnMarkersPartitionResponse))
- func (ms TxnMarkersResponses) EachTopic(fn func(TxnMarkersTopicResponse))
- func (ms TxnMarkersResponses) Sorted() []TxnMarkersResponse
- func (ms TxnMarkersResponses) SortedPartitions() []TxnMarkersPartitionResponse
- func (ms TxnMarkersResponses) SortedTopics() []TxnMarkersTopicResponse
- type TxnMarkersTopicResponse
- type TxnMarkersTopicResponses
- func (ts TxnMarkersTopicResponses) Each(fn func(TxnMarkersTopicResponse))
- func (ts TxnMarkersTopicResponses) EachPartition(fn func(TxnMarkersPartitionResponse))
- func (ts TxnMarkersTopicResponses) Sorted() []TxnMarkersTopicResponse
- func (ts TxnMarkersTopicResponses) SortedPartitions() []TxnMarkersPartitionResponse
- type UpsertSCRAM
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ACLBuilder ¶
type ACLBuilder struct {
// contains filtered or unexported fields
}
ACLBuilder is a builder that is used for batch creating / listing / deleting ACLS.
An ACL consists of five components:
- the user (principal)
- the host the user runs on
- what resource to access (topic name, group id, etc.)
- the operation (read, write)
- whether to allow or deny the above
This builder allows for adding the above five components in batches and then creating, listing, or deleting a batch of ACLs in one go. This builder merges the fifth component (allowing or denying) into allowing principals and hosts and denying principals and hosts. The builder must always have an Allow or Deny. For creating, the host is optional and defaults to the wildcard * that allows or denies all hosts. For listing / deleting, the host is also required (specifying no hosts matches all hosts, but you must specify this).
Building works on a multiplying factor: every user, every host, every resource, and every operation is combined (principals * hosts * resources * operations).
With the Kafka simple authorizer (and most reimplementations), all principals are required to have the "User:" prefix. The PrefixUserExcept function can be used to easily add the "User:" prefix if missing.
The full set of operations and which requests require what operations is described in a large doc comment on the ACLOperation type.
Lastly, resources to access / deny access to can be created / matched based on literal (exact) names, or on prefix names, or more. See the ACLPattern docs for more information.
func (*ACLBuilder) Allow ¶
func (b *ACLBuilder) Allow(principals ...string) *ACLBuilder
Allow sets the principals to add allow permissions for. For listing and deleting, you must also use AllowHosts.
This returns the input pointer.
For creating, if this is not paired with AllowHosts, the user will have access to all hosts (the wildcard *).
For listing & deleting, if the principals are empty, this matches any user.
func (*ACLBuilder) AllowHosts ¶
func (b *ACLBuilder) AllowHosts(hosts ...string) *ACLBuilder
AllowHosts sets the hosts to add allow permissions for. If using this, you must also use Allow.
This returns the input pointer.
For creating, if this is empty, the user will have access to all hosts (the wildcard *) and this function is actually not necessary.
For listing & deleting, if the hosts are empty, this matches any host.
func (*ACLBuilder) AnyResource ¶
func (b *ACLBuilder) AnyResource(name ...string) *ACLBuilder
AnyResource lists & deletes ACLs of any type matching the given names (pending other filters). If no names are given, this matches all names.
This returns the input pointer.
This function does nothing for creating.
func (*ACLBuilder) Clusters ¶
func (b *ACLBuilder) Clusters() *ACLBuilder
Clusters lists/deletes/creates ACLs of resource type "cluster".
This returns the input pointer.
There is only one type of cluster in Kafka, "kafka-cluster". Opting in to listing or deleting by cluster inherently matches all ACLS of resource type cluster. For creating, this function allows for creating cluster ACLs.
func (*ACLBuilder) DelegationTokens ¶
func (b *ACLBuilder) DelegationTokens(t ...string) *ACLBuilder
DelegationTokens lists/deletes/creates ACLs of resource type "delegation_token" for the given delegation tokens.
This returns the input pointer.
For listing or deleting, if this is provided no tokens, all "delegation_token" resource type ACLs are matched. For creating, if no tokens are provided, this function does nothing.
func (*ACLBuilder) Deny ¶
func (b *ACLBuilder) Deny(principals ...string) *ACLBuilder
Deny sets the principals to add deny permissions for. For listing and deleting, you must also use DenyHosts.
This returns the input pointer.
For creating, if this is not paired with DenyHosts, the user will be denied access to all hosts (the wildcard *).
For listing & deleting, if the principals are empty, this matches any user.
func (*ACLBuilder) DenyHosts ¶
func (b *ACLBuilder) DenyHosts(hosts ...string) *ACLBuilder
DenyHosts sets the hosts to add deny permissions for. If using this, you must also use Deny.
This returns the input pointer.
For creating, if this is empty, the user will be denied access to all hosts (the wildcard *) and this function is actually not necessary.
For listing & deleting, if the hosts are empty, this matches any host.
func (*ACLBuilder) Groups ¶
func (b *ACLBuilder) Groups(g ...string) *ACLBuilder
Groups lists/deletes/creates ACLs of resource type "group" for the given groups.
This returns the input pointer.
For listing or deleting, if this is provided no groups, all "group" resource type ACLs are matched. For creating, if no groups are provided, this function does nothing.
func (*ACLBuilder) HasAnyFilter ¶
func (b *ACLBuilder) HasAnyFilter() bool
HasAnyFilter returns whether any field in this builder is opted into "any", meaning a wide glob. This would be if you used Topics with no topics, and so on. This function can be used to detect if you accidentally opted into a non-specific ACL.
The evaluated fields are: resources, principals/hosts, a single OpAny operation, and an Any pattern.
func (*ACLBuilder) HasHosts ¶
func (b *ACLBuilder) HasHosts() bool
HasHosts returns if any allow or deny hosts have been set, or if their "any" field is true.
func (*ACLBuilder) HasPrincipals ¶
func (b *ACLBuilder) HasPrincipals() bool
HasPrincipals returns if any allow or deny principals have been set, or if their "any" field is true.
func (*ACLBuilder) HasResource ¶
func (b *ACLBuilder) HasResource() bool
HasResource returns true if the builder has a non-empty resource (topic, group, ...), or if any resource has "any" set to true.
func (*ACLBuilder) MaybeAllow ¶
func (b *ACLBuilder) MaybeAllow(principals ...string) *ACLBuilder
MaybeAllow is the same as Allow, but does not match all allowed principals if none are provided.
func (*ACLBuilder) MaybeAllowHosts ¶
func (b *ACLBuilder) MaybeAllowHosts(hosts ...string) *ACLBuilder
MaybeAllowHosts is the same as AllowHosts, but does not match all allowed hosts if none are provided.
func (*ACLBuilder) MaybeClusters ¶
func (b *ACLBuilder) MaybeClusters(c bool) *ACLBuilder
MaybeClusters is the same as Clusters, but only matches clusters if c is true.
func (*ACLBuilder) MaybeDelegationTokens ¶
func (b *ACLBuilder) MaybeDelegationTokens(t ...string) *ACLBuilder
MaybeDelegationTokens is the same as DelegationTokens, but does not match all tokens if none are provided.
func (*ACLBuilder) MaybeDeny ¶
func (b *ACLBuilder) MaybeDeny(principals ...string) *ACLBuilder
MaybeDeny is the same as Deny, but does not match all denied principals if none are provided.
func (*ACLBuilder) MaybeDenyHosts ¶
func (b *ACLBuilder) MaybeDenyHosts(hosts ...string) *ACLBuilder
MaybeDenyHosts is the same as DenyHosts, but does not match all denied hosts if none are provided.
func (*ACLBuilder) MaybeGroups ¶
func (b *ACLBuilder) MaybeGroups(g ...string) *ACLBuilder
MaybeGroups is the same as Groups, but does not match all groups if none are provided.
func (*ACLBuilder) MaybeOperations ¶
func (b *ACLBuilder) MaybeOperations(operations ...ACLOperation) *ACLBuilder
MaybeOperations is the same as Operations, but does not match all operations if none are provided.
func (*ACLBuilder) MaybeTopics ¶
func (b *ACLBuilder) MaybeTopics(t ...string) *ACLBuilder
MaybeTopics is the same as Topics, but does not match all topics if none are provided.
func (*ACLBuilder) MaybeTransactionalIDs ¶
func (b *ACLBuilder) MaybeTransactionalIDs(x ...string) *ACLBuilder
MaybeTransactionalIDs is the same as TransactionalIDs, but does not match all transactional ID's if none are provided.
func (*ACLBuilder) Operations ¶
func (b *ACLBuilder) Operations(operations ...ACLOperation) *ACLBuilder
Operations sets operations to allow or deny. Passing no operations defaults to OpAny.
This returns the input pointer.
For creating, OpAny returns an error, for it is strictly used for filters (listing & deleting).
func (*ACLBuilder) PrefixUser ¶
func (b *ACLBuilder) PrefixUser()
PrefixUser prefixes all allowed and denied principals with "User:".
func (*ACLBuilder) PrefixUserExcept ¶
func (b *ACLBuilder) PrefixUserExcept(except ...string)
PrefixUserExcept prefixes all allowed and denied principals with "User:", unless they have any of the given except prefixes.
func (*ACLBuilder) ResourcePatternType ¶
func (b *ACLBuilder) ResourcePatternType(pattern ACLPattern) *ACLBuilder
ResourcePatternType sets the pattern type to use when creating or filtering ACL resource names, overriding the default of LITERAL.
This returns the input pointer.
For creating, only LITERAL and PREFIXED are supported.
func (*ACLBuilder) Topics ¶
func (b *ACLBuilder) Topics(t ...string) *ACLBuilder
Topics lists/deletes/creates ACLs of resource type "topic" for the given topics.
This returns the input pointer.
For listing or deleting, if this is provided no topics, all "topic" resource type ACLs are matched. For creating, if no topics are provided, this function does nothing.
func (*ACLBuilder) TransactionalIDs ¶
func (b *ACLBuilder) TransactionalIDs(x ...string) *ACLBuilder
TransactionalIDs lists/deletes/creates ACLs of resource type "transactional_id" for the given transactional IDs.
This returns the input pointer.
For listing or deleting, if this is provided no IDs, all "transactional_id" resource type ACLs matched. For creating, if no IDs are provided, this function does nothing.
func (*ACLBuilder) ValidateCreate ¶
func (b *ACLBuilder) ValidateCreate() error
ValidateCreate returns an error if the builder is invalid for creating ACLs.
func (*ACLBuilder) ValidateDelete ¶
func (b *ACLBuilder) ValidateDelete() error
ValidateDelete is an alias for ValidateFilter.
func (*ACLBuilder) ValidateDescribe ¶
func (b *ACLBuilder) ValidateDescribe() error
ValidateDescribe is an alias for ValidateFilter.
func (*ACLBuilder) ValidateFilter ¶
func (b *ACLBuilder) ValidateFilter() error
ValidateFilter returns an error if the builder is invalid for deleting or describing ACLs (which both operate on a filter basis).
type ACLOperation ¶
type ACLOperation = kmsg.ACLOperation
ACLOperation is a type alias for kmsg.ACLOperation, which is an enum containing all Kafka ACL operations and has helper functions.
Kafka requests require the following operations (broker <=> broker ACLs elided):
PRODUCING/CONSUMING =================== Produce WRITE on TOPIC for topics WRITE on TRANSACTIONAL_ID for txn id (if transactionally producing) Fetch READ on TOPIC for topics ListOffsets DESCRIBE on TOPIC for topics Metadata DESCRIBE on TOPIC for topics CREATE on CLUSTER for kafka-cluster (if automatically creating new topics) CREATE on TOPIC for topics (if automatically creating new topics) OffsetForLeaderEpoch DESCRIBE on TOPIC for topics GROUPS ====== FindCoordinator DESCRIBE on GROUP for group (if finding group coordinator) DESCRIBE on TRANSACTIONAL_ID for id (if finding transactiona coordinator) OffsetCommit READ on GROUP for group READ on TOPIC for topics OffsetFetch DESCRIBE on GROUP for group DESCRIBE on TOPIC for topics OffsetDelete DELETE on GROUP For group READ on TOPIC for topics JoinGroup READ on GROUP for group Heartbeat READ on GROUP for group LeaveGroup READ on GROUP for group SyncGroup READ on GROUP for group DescribeGroup DESCRIBE on GROUP for groups ListGroups DESCRIBE on GROUP for groups or, DESCRIBE on CLUSTER for kafka-cluster DeleteGroups DELETE on GROUP for groups TRANSACTIONS (including FindCoordinator above) ============ InitProducerID WRITE on TRANSACTIONAL_ID for id, if using transactions or, IDEMPOTENT_WRITE on CLUSTER for kafka-cluster, if pre Kafka 3.0 or, WRITE on TOPIC for any topic, if Kafka 3.0+ AddPartitionsToTxn WRITE on TRANSACTIONAL_ID for id WRITE on TOPIC for topics AddOffsetsToTxn WRITE on TRANSACTIONAL_ID for id READ on GROUP for group EndTxn WRITE on TRANSACTIONAL_ID for id TxnOffsetCommit WRITE on TRANSACTIONAL_ID for id READ on GROUP for group READ on TOPIC for topics TOPIC ADMIN =========== CreateTopics CREATE on CLUSTER for kafka-cluster CREATE on TOPIC for topics DESCRIBE_CONFIGS on TOPIC for topics, for returning topic configs on create CreatePartitions ALTER on TOPIC for topics DeleteTopics DELETE on TOPIC for topics DESCRIBE on TOPIC for topics, if deleting by topic id (in addition to prior ACL) DeleteRecords DELETE on TOPIC for topics CONFIG ADMIN ============ DescribeConfigs DESCRIBE_CONFIGS on CLUSTER for kafka-cluster, for broker or broker-logger describing DESCRIBE_CONFIGS on TOPIC for topics, for topic describing AlterConfigs ALTER_CONFIGS on CLUSTER for kafka-cluster, for broker altering ALTER_CONFIGS on TOPIC for topics, for topic altering IncrementalAlterConfigs ALTER_CONFIGS on CLUSTER for kafka-cluster, for broker or broker-logger altering ALTER_CONFIGS on TOPIC for topics, for topic altering MISC ADMIN ========== AlterReplicaLogDirs ALTER on CLUSTER for kafka-cluster DescribeLogDirs DESCRIBE on CLUSTER for kafka-cluster AlterPartitionAssignments ALTER on CLUSTER for kafka-cluster ListPartitionReassignments DESCRIBE on CLUSTER for kafka-cluster DescribeDelegationTokens DESCRIBE on DELEGATION_TOKEN for id ElectLeaders ALTER on CLUSTER for kafka-cluster DescribeClientQuotas DESCRIBE_CONFIGS on CLUSTER for kafka-cluster AlterClientQuotas ALTER_CONFIGS on CLUSTER for kafka-cluster DescribeUserScramCredentials DESCRIBE on CLUSTER for kafka-cluster AlterUserScramCredentials ALTER on CLUSTER for kafka-cluster UpdateFeatures ALTER on CLUSTER for kafka-cluster DescribeCluster DESCRIBE on CLUSTER for kafka-cluster DescribeProducerIDs READ on TOPIC for topics DescribeTransactions DESCRIBE on TRANSACTIONAL_ID for ids DESCRIBE on TOPIC for topics ListTransactions DESCRIBE on TRANSACTIONAL_ID for ids
const ( // OpUnknown is returned for unknown operations. OpUnknown ACLOperation = kmsg.ACLOperationUnknown // OpAny, used for listing and deleting, matches any operation. OpAny ACLOperation = kmsg.ACLOperationAny // OpAll is a shortcut for allowing / denying all operations. OpAll ACLOperation = kmsg.ACLOperationAll // OpRead is the READ operation. OpRead ACLOperation = kmsg.ACLOperationRead // OpWrite is the WRITE operation. OpWrite ACLOperation = kmsg.ACLOperationWrite // OpCreate is the CREATE operation. OpCreate ACLOperation = kmsg.ACLOperationCreate // OpDelete is the DELETE operation. OpDelete ACLOperation = kmsg.ACLOperationDelete // OpAlter is the ALTER operation. OpAlter ACLOperation = kmsg.ACLOperationAlter // OpDescribe is the DESCRIBE operation. OpDescribe ACLOperation = kmsg.ACLOperationDescribe // OpClusterAction is the CLUSTER_ACTION operation. This operation is // used for any broker<=>broker communication and is not needed by // clients. OpClusterAction ACLOperation = kmsg.ACLOperationClusterAction // OpDescribeConfigs is the DESCRIBE_CONFIGS operation. OpDescribeConfigs ACLOperation = kmsg.ACLOperationDescribeConfigs // OpAlterConfigs is the ALTER_CONFIGS operation. OpAlterConfigs ACLOperation = kmsg.ACLOperationAlterConfigs // OpIdempotentWrite is the IDEMPOTENT_WRITE operation. As of Kafka // 3.0+, this has been deprecated and replaced by the ability to WRITE // on any topic. OpIdempotentWrite ACLOperation = kmsg.ACLOperationIdempotentWrite )
type ACLPattern ¶
type ACLPattern = kmsg.ACLResourcePatternType
ACLPattern is a type alias for kmsg.ACLResourcePatternType, which is an enum containing all Kafka ACL resource pattern options.
Creating/listing/deleting ACLs works on a resource name basis: every ACL created has a name, and every ACL filtered for listing / deleting matches by name. The name by default is "literal", meaning created ACLs will have the exact name, and matched ACLs must match completely.
Prefixed names allow for creating an ACL that matches any prefix: principals foo-bar and foo-baz both have the prefix "foo-", meaning a READ on TOPIC for User:foo- with prefix pattern will allow both of those principals to read the topic.
Any and match are used for listing and deleting. Any will match any name, be it literal or prefix or a wildcard name. There is no need for specifying topics, groups, etc. when using any resource pattern.
Alternatively, match requires a name, but it matches any literal name (exact match), any prefix, and any wildcard.
const ( // ACLPatternUnknown is returned for unknown patterns. ACLPatternUnknown ACLPattern = kmsg.ACLResourcePatternTypeUnknown // ACLPatternAny is the ANY resource pattern. ACLPatternAny ACLPattern = kmsg.ACLResourcePatternTypeAny // ACLPatternMatch is the MATCH resource pattern. ACLPatternMatch ACLPattern = kmsg.ACLResourcePatternTypeMatch // ACLPatternLiteral is the LITERAL resource pattern, the default. ACLPatternLiteral ACLPattern = kmsg.ACLResourcePatternTypeLiteral // ACLPatternPrefixed is the PREFIXED resource pattern. ACLPatternPrefixed ACLPattern = kmsg.ACLResourcePatternTypePrefixed )
type AlterAllReplicaLogDirsResponses ¶
type AlterAllReplicaLogDirsResponses map[int32]AlterReplicaLogDirsResponses
AlterAllReplicaLogDirsResponses contains per-broker responses to altered partition directories.
func (AlterAllReplicaLogDirsResponses) Each ¶
func (rs AlterAllReplicaLogDirsResponses) Each(fn func(AlterReplicaLogDirsResponse))
Each calls fn for every response.
func (AlterAllReplicaLogDirsResponses) Sorted ¶
func (rs AlterAllReplicaLogDirsResponses) Sorted() []AlterReplicaLogDirsResponse
Sorted returns the responses sorted by broker, topic, and partition.
type AlterClientQuotaEntry ¶
type AlterClientQuotaEntry struct { Entity ClientQuotaEntity // Entity is the entity to alter quotas for. Ops []AlterClientQuotaOp // Ops are quotas to set or remove. }
AlterClientQuotaEntry pairs an entity with quotas to set or remove.
type AlterClientQuotaOp ¶
type AlterClientQuotaOp struct { Key string // Key is the quota configuration key to set or remove. Value float64 // Value is the quota configuration value to set or remove. Remove bool // Remove, if true, removes this quota rather than sets it. }
AlterClientQuotaOp sets or remove a client quota.
type AlterConfig ¶
type AlterConfig struct { Op IncrementalOp // Op is the incremental alter operation to perform. Name string // Name is the name of the config to alter. Value *string // Value is the value to use when altering, if any. }
AlterConfig is an individual key/value operation to perform when incrementally altering configs.
This package includes a StringPtr function to aid in building config values.
type AlterConfigsResponse ¶
type AlterConfigsResponse struct { Name string // Name is the name of this resource (topic name or broker number). Err error // Err is non-nil if the config could not be altered. }
AlteredConfigsResponse contains the response for an individual alteration.
type AlterConfigsResponses ¶
type AlterConfigsResponses []AlterConfigsResponse
AlterConfigsResponses contains responses for many alterations.
func (AlterConfigsResponses) On ¶
func (rs AlterConfigsResponses) On(name string, fn func(*AlterConfigsResponse) error) (AlterConfigsResponse, error)
On calls fn for the response name if it exists, returning the response and the error returned from fn. If fn is nil, this simply returns the response.
The fn is given a copy of the response. This function returns the copy as well; any modifications within fn are modifications on the returned copy.
If the resource does not exist, this returns kerr.UnknownTopicOrPartition.
type AlterPartitionAssignmentsReq ¶
AlterPartitionAssignmentsReq is the input for a request to alter partition assignments. The keys are topics and partitions, and the final slice corresponds to brokers that replicas will be assigneed to. If the brokers for a given partition are null, the request will *cancel* any active reassignment for that partition.
func (*AlterPartitionAssignmentsReq) Assign ¶
func (r *AlterPartitionAssignmentsReq) Assign(t string, p int32, brokers []int32)
Assign specifies brokers that a partition should be placed on. Using null for the brokers cancels a pending reassignment of the parititon.
func (*AlterPartitionAssignmentsReq) CancelAssign ¶
func (r *AlterPartitionAssignmentsReq) CancelAssign(t string, p int32)
CancelAssign cancels a reassignment of the given partition.
type AlterPartitionAssignmentsResponse ¶
type AlterPartitionAssignmentsResponse struct { Topic string // Topic is the topic that was assigned. Partition int32 // Partition is the partition that was assigned. Err error // Err is non-nil if this assignment errored. ErrMessage string // ErrMessage is an optional additional message on error. }
AlterPartitionAssignmentsResponse contains a response for an individual partition that was assigned.
type AlterPartitionAssignmentsResponses ¶
type AlterPartitionAssignmentsResponses map[string]map[int32]AlterPartitionAssignmentsResponse
AlterPartitionAssignmentsResponses contains responses to all partitions in an alter assignment request.
func (AlterPartitionAssignmentsResponses) Each ¶
func (rs AlterPartitionAssignmentsResponses) Each(fn func(AlterPartitionAssignmentsResponse))
Each calls fn for every response.
func (AlterPartitionAssignmentsResponses) Sorted ¶
func (rs AlterPartitionAssignmentsResponses) Sorted() []AlterPartitionAssignmentsResponse
Sorted returns the responses sorted by topic and partition.
type AlterReplicaLogDirsReq ¶
AlterReplicaLogDirsReq is the input for a request to alter replica log directories. The key is the directory that all topics and partitions in the topic set will move to.
func (*AlterReplicaLogDirsReq) Add ¶
func (r *AlterReplicaLogDirsReq) Add(d string, s TopicsSet)
Add merges the input topic set into the given directory.
type AlterReplicaLogDirsResponse ¶
type AlterReplicaLogDirsResponse struct { Broker int32 // Broker is the broker this response came from. Dir string // Dir is the directory this partition was requested to be moved to. Topic string // Topic is the topic for this partition. Partition int32 // Partition is the partition that was moved. Err error // Err is non-nil if this move had an error. }
AlterReplicaLogDirsResponse contains a the response for an individual altered partition directory.
func (AlterReplicaLogDirsResponse) Less ¶
func (a AlterReplicaLogDirsResponse) Less(other AlterReplicaLogDirsResponse) bool
Less returns if the response is less than the other by broker, dir, topic, and partition.
type AlterReplicaLogDirsResponses ¶
type AlterReplicaLogDirsResponses map[string]map[int32]AlterReplicaLogDirsResponse
AlterReplicaLogDirsResponses contains responses to altered partition directories for a single broker.
func (AlterReplicaLogDirsResponses) Each ¶
func (rs AlterReplicaLogDirsResponses) Each(fn func(AlterReplicaLogDirsResponse))
Each calls fn for every response.
func (AlterReplicaLogDirsResponses) Sorted ¶
func (rs AlterReplicaLogDirsResponses) Sorted() []AlterReplicaLogDirsResponse
Sorted returns the responses sorted by topic and partition.
type AlteredClientQuota ¶
type AlteredClientQuota struct { Entity ClientQuotaEntity // Entity is the entity this result is for. Err error // Err is non-nil if the alter operation on this entity failed. ErrMessage string // ErrMessage is an optional additional message on error. }
AlteredClientQuota is the result for a single entity that was altered.
type AlteredClientQuotas ¶
type AlteredClientQuotas []AlteredClientQuota
AlteredClientQuotas contains results for all altered entities.
type AlteredUserSCRAM ¶
type AlteredUserSCRAM struct { User string // User is the username that was altered. Err error // Err is any error encountered when altering the user. ErrMessage string // ErrMessage a potential extra message describing any error. }
AlteredUserSCRAM is the result of an alter operation.
type AlteredUserSCRAMs ¶
type AlteredUserSCRAMs map[string]AlteredUserSCRAM
AlteredUserSCRAMs contains altered user SCRAM credentials keyed by user.
func (AlteredUserSCRAMs) AllFailed ¶
func (as AlteredUserSCRAMs) AllFailed() bool
AllFailed returns whether all altered user credentials are errored.
func (AlteredUserSCRAMs) Each ¶
func (as AlteredUserSCRAMs) Each(fn func(AlteredUserSCRAM))
Each calls fn for every altered user.
func (AlteredUserSCRAMs) EachError ¶
func (as AlteredUserSCRAMs) EachError(fn func(AlteredUserSCRAM))
EachError calls fn for every altered user that has a non-nil error.
func (AlteredUserSCRAMs) Error ¶
func (as AlteredUserSCRAMs) Error() error
Error iterates over all altered users and returns the first error encountered, if any.
func (AlteredUserSCRAMs) Ok ¶
func (as AlteredUserSCRAMs) Ok() bool
Ok returns true if there are no errors. This is a shortcut for rs.Error() == nil.
func (AlteredUserSCRAMs) Sorted ¶
func (as AlteredUserSCRAMs) Sorted() []AlteredUserSCRAM
Sorted returns the altered user credentials ordered by user.
type AuthError ¶
type AuthError struct {
Err error // Err is the inner *kerr.Error authorization error.
}
AuthError can be returned from requests for resources that you are not authorized for.
type BrokerApiVersions ¶
type BrokerApiVersions struct { NodeID int32 // NodeID is the node this API versions response is for. Err error // Err is non-nil if the API versions request failed. // contains filtered or unexported fields }
BrokerApiVersions contains the API versions for a single broker.
func (*BrokerApiVersions) EachKeySorted ¶
func (v *BrokerApiVersions) EachKeySorted(fn func(key, min, max int16))
EachKeySorted calls fn for every API key in the broker response, from the smallest API key to the largest.
func (*BrokerApiVersions) KeyMaxVersion ¶
func (v *BrokerApiVersions) KeyMaxVersion(key int16) (max int16, exists bool)
KeyVersions returns the broker's max version for an API key and whether this broker supports the request.
func (*BrokerApiVersions) KeyMinVersion ¶
func (v *BrokerApiVersions) KeyMinVersion(key int16) (min int16, exists bool)
KeyVersions returns the broker's min version for an API key and whether this broker supports the request.
func (*BrokerApiVersions) KeyVersions ¶
func (v *BrokerApiVersions) KeyVersions(key int16) (min, max int16, exists bool)
KeyVersions returns the broker's min and max version for an API key and whether this broker supports the request.
func (*BrokerApiVersions) Raw ¶
func (v *BrokerApiVersions) Raw() *kmsg.ApiVersionsResponse
Raw returns the raw API versions response.
func (*BrokerApiVersions) VersionGuess ¶
func (v *BrokerApiVersions) VersionGuess(opt ...kversion.VersionGuessOpt) string
VersionGuess returns the best guess of Kafka that this broker is. This is a shorcut for:
kversion.FromApiVersionsResponse(v.Raw()).VersionGuess(opt...)
Check the kversion.VersionGuess API docs for more details.
type BrokerDetail ¶
type BrokerDetail = kgo.BrokerMetadata
BrokerDetail is a type alias for kgo.BrokerMetadata.
type BrokerDetails ¶
type BrokerDetails []BrokerDetail
BrokerDetails contains the details for many brokers.
func (BrokerDetails) NodeIDs ¶
func (ds BrokerDetails) NodeIDs() []int32
NodeIDs returns the IDs of all nodes.
type BrokersApiVersions ¶
type BrokersApiVersions map[int32]BrokerApiVersions
BrokerApiVersions contains API versions for all brokers that are reachable from a metadata response.
func (BrokersApiVersions) Each ¶
func (vs BrokersApiVersions) Each(fn func(BrokerApiVersions))
Each calls fn for every broker response.
func (BrokersApiVersions) Sorted ¶
func (vs BrokersApiVersions) Sorted() []BrokerApiVersions
Sorted returns all broker responses sorted by node ID.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is an admin client.
This is a simple wrapper around a *kgo.Client to provide helper admin methods.
func NewOptClient ¶
NewOptClient returns a new client directly from kgo options. This is a wrapper around creating a new *kgo.Client and then creating an admin client.
func (*Client) AlterAllReplicaLogDirs ¶
func (cl *Client) AlterAllReplicaLogDirs(ctx context.Context, alter AlterReplicaLogDirsReq) (AlterAllReplicaLogDirsResponses, error)
AlterAllReplicaLogDirs alters the log directories for the input topic partitions, moving each partition to the requested directory. This function moves all replicas on any broker.
This may return *ShardErrors.
func (*Client) AlterBrokerConfigs ¶
func (cl *Client) AlterBrokerConfigs(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error)
AlterBrokerConfigs incrementally alters broker configuration values. If brokers are specified, this updates each specific broker. If no brokers are specified, this updates whole-cluster broker configuration values.
This method requires talking to a cluster that supports IncrementalAlterConfigs (officially introduced in Kafka v2.3, but many broker reimplementations support this request even if they do not support all other requests from Kafka v2.3).
This admin client does not support the original AlterConfigs request. The original request is problematic: any existing dynamic configurations that were not specified in the AlterConfigs request itself would be lost.
This may return *ShardErrors. You may consider checking ValidateAlterBrokerConfigs before using this method.
func (*Client) AlterBrokerReplicaLogDirs ¶
func (cl *Client) AlterBrokerReplicaLogDirs(ctx context.Context, broker int32, alter AlterReplicaLogDirsReq) (AlterReplicaLogDirsResponses, error)
AlterBrokerReplicaLogDirs alters the log directories for the input topic on the given broker, moving each partition to the requested directory.
func (*Client) AlterClientQuotas ¶
func (cl *Client) AlterClientQuotas(ctx context.Context, entries []AlterClientQuotaEntry) (AlteredClientQuotas, error)
AlterClientQuotas alters quotas for the input entries. You may consider checking ValidateAlterClientQuotas before using this method.
func (*Client) AlterPartitionAssignments ¶
func (cl *Client) AlterPartitionAssignments(ctx context.Context, req AlterPartitionAssignmentsReq) (AlterPartitionAssignmentsResponses, error)
AlterPartitionAssignments alters partition assignments for the requested partitions, returning an error if the response could not be issued or if you do not have permissions.
func (*Client) AlterTopicConfigs ¶
func (cl *Client) AlterTopicConfigs(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error)
AlterTopicConfigs incrementally alters topic configuration values.
This method requires talking to a cluster that supports IncrementalAlterConfigs (officially introduced in Kafka v2.3, but many broker reimplementations support this request even if they do not support all other requests from Kafka v2.3).
This admin client does not support the original AlterConfigs request. The original request is problematic: any existing dynamic configurations that were not specified in the AlterConfigs request itself would be lost.
This may return *ShardErrors. You may consider checking ValidateAlterTopicConfigs before using this method.
func (*Client) AlterUserSCRAMs ¶
func (cl *Client) AlterUserSCRAMs(ctx context.Context, del []DeleteSCRAM, upsert []UpsertSCRAM) (AlteredUserSCRAMs, error)
AlterUserSCRAMs deletes, updates, or creates (inserts) user SCRAM credentials. Note that a username can only appear once across both upserts and deletes. This modifies elements of the upsert slice that need to have a salted password generated.
func (*Client) ApiVersions ¶
func (cl *Client) ApiVersions(ctx context.Context) (BrokersApiVersions, error)
ApiVersions queries every broker in a metadata response for their API versions. This returns an error only if the metadata request fails.
func (*Client) BrokerMetadata ¶
BrokerMetadata issues a metadata request and returns it, and does not ask for any topics.
This returns an error if the request fails to be issued, or an *AuthErr.
func (*Client) CommitAllOffsets ¶
CommitAllOffsets is identical to CommitOffsets, but returns an error if the offset commit was successful, but some offset within the commit failed to be committed.
This is a shortcut function provided to avoid checking two errors, but you must be careful with this if partially successful commits can be a problem for you.
func (*Client) CommitOffsets ¶
func (cl *Client) CommitOffsets(ctx context.Context, group string, os Offsets) (OffsetResponses, error)
CommitOffsets issues an offset commit request for the input offsets.
This function can be used to manually commit offsets when directly consuming partitions outside of an actual consumer group. For example, if you assign partitions manually, but want still use Kafka to checkpoint what you have consumed, you can manually issue an offset commit request with this method.
This does not return on authorization failures, instead, authorization failures are included in the responses.
func (*Client) CreateACLs ¶
func (cl *Client) CreateACLs(ctx context.Context, b *ACLBuilder) (CreateACLsResults, error)
CreateACLs creates a batch of ACLs using the ACL builder, validating the input before issuing the CreateACLs request.
If the input is invalid, or if the response fails, or if the response does not contain as many ACLs as we issued in our create request, this returns an error.
func (*Client) CreateDelegationToken ¶
func (cl *Client) CreateDelegationToken(ctx context.Context, d CreateDelegationToken) (DelegationToken, error)
CreateDelegationToken creates a delegation token, which is a scoped SCRAM-SHA-256 username and password.
Creating delegation tokens allows for an (ideally) quicker and easier method of enabling authorization for a wide array of clients. Rather than having to manage many passwords external to Kafka, you only need to manage a few accounts and use those to create delegation tokens per client.
Note that delegation tokens inherit the same ACLs as the user creating the token. Thus, if you want to properly scope ACLs, you should not create delegation tokens with admin accounts.
This can return *AuthError.
func (*Client) CreatePartitions ¶
func (cl *Client) CreatePartitions(ctx context.Context, add int, topics ...string) (CreatePartitionsResponses, error)
CreatePartitions issues a create partitions request for the given topics, adding "add" partitions to each topic. This request lets Kafka choose where the new partitions should be.
This does not return an error on authorization failures for the create partitions request itself, instead, authorization failures are included in the responses. Before adding partitions, this request must issue a metadata request to learn the current count of partitions. If that fails, this returns the metadata request error. If you already know the final amount of partitions you want, you can use UpdatePartitions to set the count directly (rather than adding to the current count). You may consider checking ValidateCreatePartitions before using this method.
func (*Client) CreateTopics ¶
func (cl *Client) CreateTopics( ctx context.Context, partitions int32, replicationFactor int16, configs map[string]*string, topics ...string, ) (CreateTopicResponses, error)
CreateTopics issues a create topics request with the given partitions, replication factor, and (optional) configs for every topic. Under the hood, this uses the default 15s request timeout and lets Kafka choose where to place partitions.
This package includes a StringPtr function to aid in building config values.
This does not return an error on authorization failures, instead, authorization failures are included in the responses. This only returns an error if the request fails to be issued. You may consider checking ValidateCreateTopics before using this method.
func (*Client) DeleteACLs ¶
func (cl *Client) DeleteACLs(ctx context.Context, b *ACLBuilder) (DeleteACLsResults, error)
DeleteACLs deletes a batch of ACLs using the ACL builder, validating the input before issuing the DeleteACLs request.
If the input is invalid, or if the response fails, or if the response does not contain as many ACL results as we issued in our delete request, this returns an error.
Deleting ACLs works on a filter basis: a single filter can match many ACLs. For example, deleting with operation ANY matches any operation. For safety / verification purposes, you an DescribeACLs with the same builder first to see what would be deleted.
func (*Client) DeleteGroups ¶
DeleteGroups deletes all groups specified.
The purpose of this request is to allow operators a way to delete groups after Kafka 1.1, which removed RetentionTimeMillis from offset commits. See KIP-229 for more details.
This may return *ShardErrors. This does not return on authorization failures, instead, authorization failures are included in the responses.
func (*Client) DeleteOffsets ¶
func (cl *Client) DeleteOffsets(ctx context.Context, group string, s TopicsSet) (DeleteOffsetsResponses, error)
DeleteOffsets deletes offsets for the given group.
Originally, offset commits were persisted in Kafka for some retention time. This posed problematic for infrequently committing consumers, so the retention time concept was removed in Kafka v2.1 in favor of deleting offsets for a group only when the group became empty. However, if a group stops consuming from a topic, then the offsets will persist and lag monitoring for the group will notice an ever increasing amount of lag for these no-longer-consumed topics. Thus, Kafka v2.4 introduced an OffsetDelete request to allow admins to manually delete offsets for no longer consumed topics.
This method requires talking to Kafka v2.4+. This returns an *AuthErr if the user is not authorized to delete offsets in the group at all. This does not return on per-topic authorization failures, instead, per-topic authorization failures are included in the responses.
func (*Client) DeleteRecords ¶
DeleteRecords issues a delete records request for the given offsets. Per offset, only the Offset field needs to be set.
To delete records, Kafka sets the LogStartOffset for partitions to the requested offset. All segments whose max partition is before the requested offset are deleted, and any records within the segment before the requested offset can no longer be read.
This does not return an error on authorization failures, instead, authorization failures are included in the responses.
This may return *ShardErrors.
func (*Client) DeleteTopics ¶
DeleteTopics issues a delete topics request for the given topic names with a 15s timeout.
This does not return an error on authorization failures, instead, authorization failures are included in the responses. This only returns an error if the request fails to be issued.
func (*Client) DescribeACLs ¶
func (cl *Client) DescribeACLs(ctx context.Context, b *ACLBuilder) (DescribeACLsResults, error)
DescribeACLs describes a batch of ACLs using the ACL builder, validating the input before issuing DescribeACLs requests.
If the input is invalid, or if any response fails, this returns an error.
Listing ACLs works on a filter basis: a single filter can match many ACLs. For example, describing with operation ANY matches any operation. Under the hood, this method issues one describe request per filter, because describing ACLs does not work on a batch basis (unlike creating & deleting). The return of this function can be used to see what would be deleted given the same builder input.
func (*Client) DescribeAllLogDirs ¶
DescribeAllLogDirs describes the log directores for every input topic partition on every broker. If the input set is nil, this describes all log directories.
This may return *ShardErrors.
func (*Client) DescribeBrokerConfigs ¶
func (cl *Client) DescribeBrokerConfigs( ctx context.Context, brokers ...int32, ) (ResourceConfigs, error)
DescribeBrokerConfigs returns configuration for the requested brokers. If no brokers are requested, a single request is issued and any broker in the cluster replies with the cluster-level dynamic config values.
This may return *ShardErrors.
func (*Client) DescribeBrokerLogDirs ¶
func (cl *Client) DescribeBrokerLogDirs(ctx context.Context, broker int32, s TopicsSet) (DescribedLogDirs, error)
DescribeBrokerLogDirs describes the log directories for the input topic partitions on the given broker. If the input set is nil, this describes all log directories.
func (*Client) DescribeClientQuotas ¶
func (cl *Client) DescribeClientQuotas(ctx context.Context, strict bool, entityComponents []DescribeClientQuotaComponent) (DescribedClientQuotas, error)
DescribeClientQuotas describes client quotas. If strict is true, the response includes only the requested components.
func (*Client) DescribeDelegationTokens ¶
func (cl *Client) DescribeDelegationTokens(ctx context.Context, owners ...Principal) (DelegationTokens, error)
DescribeDelegationTokens describes delegation tokens. This returns either all delegation tokens, or returns only tokens with owners in the requested owners list.
This can return *AuthError.
func (*Client) DescribeGroups ¶
DescribeGroups describes either all groups specified, or all groups in the cluster if none are specified.
This may return *ShardErrors.
If no groups are specified and this method first lists groups, and list groups returns a *ShardErrors, this function describes all successfully listed groups and appends the list shard errors to any describe shard errors.
If only one group is described, there will be at most one request issued, and there is no need to deeply inspect the error.
func (*Client) DescribeProducers ¶
func (cl *Client) DescribeProducers(ctx context.Context, s TopicsSet) (DescribedProducersTopics, error)
DescribeProducers describes all producers that are transactionally producing to the requested topic set. This request can be used to detect hanging transactions or other transaction related problems. If the input set is empty, this requests data for all partitions.
This may return *ShardErrors or *AuthError.
func (*Client) DescribeTopicConfigs ¶
func (cl *Client) DescribeTopicConfigs( ctx context.Context, topics ...string, ) (ResourceConfigs, error)
DescribeTopicConfigs returns the configuration for the requested topics.
This may return *ShardErrors.
func (*Client) DescribeTransactions ¶
func (cl *Client) DescribeTransactions(ctx context.Context, txnIDs ...string) (DescribedTransactions, error)
DescribeTransactions describes either all transactional IDs specified, or all transactional IDs in the cluster if none are specified.
This may return *ShardErrors or *AuthError.
If no transactional IDs are specified and this method first lists transactional IDs, and listing IDs returns a *ShardErrors, this function describes all successfully listed IDs and appends the list shard errors to any describe shard errors.
If only one ID is described, there will be at most one request issued and there is no need to deeply inspect the error.
func (*Client) DescribeUserSCRAMs ¶
func (cl *Client) DescribeUserSCRAMs(ctx context.Context, users ...string) (DescribedUserSCRAMs, error)
DescribeUserSCRAMs returns a small bit of information about all users in the input request that have SCRAM passwords configured. No users requests all users.
func (*Client) ElectLeaders ¶
func (cl *Client) ElectLeaders(ctx context.Context, how ElectLeadersHow, s TopicsSet) (ElectLeadersResults, error)
ElectLeaders elects leaders for partitions. This request was added in Kafka 2.2 to replace the previously-ZooKeeper-only option of triggering leader elections. See KIP-183 for more details.
Kafka 2.4 introduced the ability to use unclean leader election. If you use unclean leader election on a Kafka 2.2 or 2.3 cluster, the client will instead fall back to preferred replica (clean) leader election. You can check the result's How function (or field) to see.
If s is nil, this will elect leaders for all partitions.
This will return *AuthError if you do not have ALTER on CLUSTER for kafka-cluster.
func (*Client) ExpireDelegationToken ¶
func (cl *Client) ExpireDelegationToken(ctx context.Context, hmac []byte, expiry time.Duration) (expiryTimestamp time.Time, err error)
ExpireDelegationToken changes a delegation token's expiry timestamp and returns the new expiry timestamp, which is min(now+expiry, maxTimestamp). This request can be used to force tokens to expire quickly, or to give tokens a grace period before expiry. Using an expiry of -1 expires the token immediately.
This can return *AuthError.
func (*Client) FetchManyOffsets ¶
func (cl *Client) FetchManyOffsets(ctx context.Context, groups ...string) FetchOffsetsResponses
FetchManyOffsets issues a fetch offsets requests for each group specified.
This function is a batch version of FetchOffsets. FetchOffsets and CommitOffsets are important to provide as simple APIs for users that manage group offsets outside of a consumer group. Each individual group may have an auth error.
func (*Client) FetchOffsets ¶
FetchOffsets issues an offset fetch requests for all topics and partitions in the group. Because Kafka returns only partitions you are authorized to fetch, this only returns an auth error if you are not authorized to describe the group at all.
This method requires talking to Kafka v0.11+.
func (*Client) FetchOffsetsForTopics ¶
func (cl *Client) FetchOffsetsForTopics(ctx context.Context, group string, topics ...string) (OffsetResponses, error)
FetchOffsetsForTopics is a helper function that returns the currently committed offsets for the given group, as well as default -1 offsets for any topic/partition that does not yet have a commit.
If any partition fetched or listed has an error, this function returns an error. The returned offset responses are ready to be used or converted directly to pure offsets with `Into`, and again into kgo offsets with another `Into`.
func (*Client) FindGroupCoordinators ¶
func (cl *Client) FindGroupCoordinators(ctx context.Context, groups ...string) FindCoordinatorResponses
FindGroupCoordinators returns the coordinator for all requested group names.
This may return *ShardErrors or *AuthError.
func (*Client) FindTxnCoordinators ¶
func (cl *Client) FindTxnCoordinators(ctx context.Context, txnIDs ...string) FindCoordinatorResponses
FindTxnCoordinators returns the coordinator for all requested transactional IDs.
This may return *ShardErrors or *AuthError.
func (*Client) LeaveGroup ¶
func (cl *Client) LeaveGroup(ctx context.Context, b *LeaveGroupBuilder) (LeaveGroupResponses, error)
LeaveGroup causes instance IDs to leave a group.
This function allows manually removing members using instance IDs from a group, which allows for fast scale down / host replacement (see KIP-345 for more detail). This returns an *AuthErr if the use is not authorized to remove members from groups.
func (*Client) ListBrokers ¶
func (cl *Client) ListBrokers(ctx context.Context) (BrokerDetails, error)
ListBrokers issues a metadata request and returns BrokerDetails. This returns an error if the request fails to be issued, or an *AuthError.
func (*Client) ListCommittedOffsets ¶
func (cl *Client) ListCommittedOffsets(ctx context.Context, topics ...string) (ListedOffsets, error)
ListCommittedOffsets returns newest committed offsets for each partition in each requested topic. A committed offset may be slightly less than the latest offset. In Kafka terms, committed means the last stable offset, and newest means the high watermark. Record offsets in active, uncommitted transactions will not be returned. If no topics are specified, all topics are listed.
This may return *ShardErrors.
func (*Client) ListEndOffsets ¶
ListEndOffsets returns the end (newest) offsets for each partition in each requested topic. In Kafka terms, this returns high watermarks. If no topics are specified, all topics are listed.
This may return *ShardErrors.
func (*Client) ListGroups ¶
ListGroups returns all groups in the cluster. If you are talking to Kafka 2.6+, filter states can be used to return groups only in the requested states. By default, this returns all groups. In almost all cases, DescribeGroups is more useful.
This may return *ShardErrors.
func (*Client) ListOffsetsAfterMilli ¶
func (cl *Client) ListOffsetsAfterMilli(ctx context.Context, millisecond int64, topics ...string) (ListedOffsets, error)
ListOffsetsAfterMilli returns the first offsets after the requested millisecond timestamp. Unlike listing start/end/committed offsets, offsets returned from this function also include the timestamp of the offset. If no topics are specified, all topics are listed. If a partition has no offsets after the requested millisecond, the offset will be the current end offset.
This may return *ShardErrors.
func (*Client) ListPartitionReassignments ¶
func (cl *Client) ListPartitionReassignments(ctx context.Context, s TopicsSet) (ListPartitionReassignmentsResponses, error)
ListPartitionReassignments lists the state of any active reassignments for all requested partitions, returning an error if the response could not be issued or if you do not have permissions.
func (*Client) ListStartOffsets ¶
ListStartOffsets returns the start (oldest) offsets for each partition in each requested topic. In Kafka terms, this returns the log start offset. If no topics are specified, all topics are listed.
This may return *ShardErrors.
func (*Client) ListTopics ¶
ListTopics issues a metadata request and returns TopicDetails. Specific topics to describe can be passed as additional arguments. If no topics are specified, all topics are requested. Internal topics are not returned unless specifically requested. To see all topics including internal topics, use ListTopicsWithInternal.
This returns an error if the request fails to be issued, or an *AuthError.
func (*Client) ListTopicsWithInternal ¶
func (cl *Client) ListTopicsWithInternal( ctx context.Context, topics ...string, ) (TopicDetails, error)
ListTopicsWithInternal is the same as ListTopics, but does not filter internal topics before returning.
func (*Client) ListTransactions ¶
func (cl *Client) ListTransactions(ctx context.Context, producerIDs []int64, filterStates []string) (ListedTransactions, error)
ListTransactions returns all transactions and their states in the cluster. Filter states can be used to return transactions only in the requested states. By default, this returns all transactions you have DESCRIBE access to. Producer IDs can be specified to filter for transactions from the given producer.
This may return *ShardErrors or *AuthError.
func (*Client) Metadata ¶
Metadata issues a metadata request and returns it. Specific topics to describe can be passed as additional arguments. If no topics are specified, all topics are requested.
This returns an error if the request fails to be issued, or an *AuthErr.
func (*Client) OffetForLeaderEpoch ¶
func (cl *Client) OffetForLeaderEpoch(ctx context.Context, r OffsetForLeaderEpochRequest) (OffsetsForLeaderEpochs, error)
OffsetForLeaderEpoch requests end offsets for the requested leader epoch in partitions in the request. This is a relatively advanced and client internal request, for more details, see the doc comments on the OffsetForLeaderEpoch type.
This may return *ShardErrors or *AuthError.
func (*Client) RenewDelegationToken ¶
func (cl *Client) RenewDelegationToken(ctx context.Context, hmac []byte, renewTime time.Duration) (expiryTimestamp time.Time, err error)
RenewDelegationToken renews a delegation token that has not yet hit its max timestamp and returns the new expiry timestamp.
This can return *AuthError.
func (*Client) SetTimeoutMillis ¶
SetTimeoutMillis sets the timeout to use for requests that have a timeout, overriding the default of 15,000 (15s).
Not all requests have timeouts. Most requests are expected to return immediately or are expected to deliberately hang. The following requests have timeout fields:
Produce CreateTopics DeleteTopics DeleteRecords CreatePartitions ElectLeaders AlterPartitionAssignments ListPartitionReassignments UpdateFeatures
Not all requests above are supported in the admin API.
func (*Client) UpdatePartitions ¶
func (cl *Client) UpdatePartitions(ctx context.Context, set int, topics ...string) (CreatePartitionsResponses, error)
UpdatePartitions issues a create partitions request for the given topics, setting the final partition count to "set" for each topic. This request lets Kafka choose where the new partitions should be.
This does not return an error on authorization failures for the create partitions request itself, instead, authorization failures are included in the responses. Unlike CreatePartitions, this request uses your "set" value to set the new final count of partitions. "set" must be equal to or larger than the current count of partitions in the topic. All topics will have the same final count of partitions (unlike CreatePartitions, which allows you to add a specific count of partitions to topics that have a different amount of current partitions). You may consider checking ValidateUpdatePartitions before using this method.
func (*Client) ValidateAlterBrokerConfigs ¶
func (cl *Client) ValidateAlterBrokerConfigs(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error)
ValidateAlterBrokerConfigs validates an incremental alter config for the given brokers with the config alterations.
This returns exactly what AlterBrokerConfigs returns, but does not actually alter configurations.
func (*Client) ValidateAlterClientQuotas ¶
func (cl *Client) ValidateAlterClientQuotas(ctx context.Context, entries []AlterClientQuotaEntry) (AlteredClientQuotas, error)
ValidateAlterClientQuotas validates an alter client quota request. This returns exactly what AlterClientQuotas returns, but does not actually alter quotas.
func (*Client) ValidateAlterTopicConfigs ¶
func (cl *Client) ValidateAlterTopicConfigs(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error)
ValidateAlterTopicConfigs validates an incremental alter config for the given topics with the config alterations.
This returns exactly what AlterTopicConfigs returns, but does not actually alter configurations.
func (*Client) ValidateCreatePartitions ¶
func (cl *Client) ValidateCreatePartitions(ctx context.Context, add int, topics ...string) (CreatePartitionsResponses, error)
ValidateCreatePartitions validates a create partitions request for adding "add" partitions to the given topics.
This uses the same logic as CreatePartitions, but with the request's ValidateOnly field set to true. The response is the same response you would receive from CreatePartitions, but no partitions are actually added.
func (*Client) ValidateCreateTopics ¶
func (cl *Client) ValidateCreateTopics( ctx context.Context, partitions int32, replicationFactor int16, configs map[string]*string, topics ...string, ) (CreateTopicResponses, error)
ValidateCreateTopics validates a create topics request with the given partitions, replication factor, and (optional) configs for every topic.
This package includes a StringPtr function to aid in building config values.
This uses the same logic as CreateTopics, but with the request's ValidateOnly field set to true. The response is the same response you would receive from CreateTopics, but no topics are actually created.
func (*Client) ValidateUpdatePartitions ¶
func (cl *Client) ValidateUpdatePartitions(ctx context.Context, set int, topics ...string) (CreatePartitionsResponses, error)
ValidateUpdatePartitions validates a create partitions request for setting the partition count on the given topics to "set".
This uses the same logic as UpdatePartitions, but with the request's ValidateOnly field set to true. The response is the same response you would receive from UpdatePartitions, but no partitions are actually added.
func (*Client) WriteTxnMarkers ¶
func (cl *Client) WriteTxnMarkers(ctx context.Context, markers ...TxnMarkers) (TxnMarkersResponses, error)
WriteTxnMarkers writes transaction markers to brokers. This is an advanced admin way to close out open transactions. See KIP-664 for more details.
This may return *ShardErrors or *AuthError.
type ClientQuotaEntity ¶
type ClientQuotaEntity []ClientQuotaEntityComponent
ClientQuotaEntity contains the components that make up a single entity.
func (ClientQuotaEntity) String ¶
func (ds ClientQuotaEntity) String() string
String returns {key=value, key=value}, joining all entities with a ", " and wrapping in braces.
type ClientQuotaEntityComponent ¶
type ClientQuotaEntityComponent struct { Type string // Type is the entity type ("user", "client-id", "ip"). Name *string // Name is the entity name, or null if the default. }
ClientQuotaEntityComponent is a quota entity component.
func (ClientQuotaEntityComponent) String ¶
func (d ClientQuotaEntityComponent) String() string
String returns key=value, or key=<default> if value is nil.
type ClientQuotaValue ¶
type ClientQuotaValue struct { Key string // Key is the quota configuration key. Value float64 // Value is the quota configuration value. }
ClientQuotaValue is a quota name and value.
func (ClientQuotaValue) String ¶
func (d ClientQuotaValue) String() string
String returns key=value.
type ClientQuotaValues ¶
type ClientQuotaValues []ClientQuotaValue
ClientQuotaValues contains all client quota values.
type Config ¶
type Config struct { Key string // Key is the config name. Value *string // Value is the config value, if any. Sensitive bool // Sensitive is if this config is sensitive (if so, Value is nil). Source kmsg.ConfigSource // Source is where this config is defined from. // Synonyms contains fallback key/value pairs for this same // configuration key in order or preference. That is, if a config entry // is both dynamically defined and has a default value as well, the top // level config will be the dynamic value, while the synonym will be // the default. Synonyms []ConfigSynonym }
Config is a configuration for a resource (topic, broker)
func (*Config) MaybeValue ¶
MaybeValue returns the config's value if it is non-nil, otherwise an empty string.
type ConfigSynonym ¶
type ConfigSynonym struct { Key string // Key is the fallback config name. Value *string // Value is the fallback config value, if any (sensitive is elided). Source kmsg.ConfigSource // Source is where this config synonym is defined from. }
ConfigSynonym is a fallback value for a config.
type CreateACLsResult ¶
type CreateACLsResult struct { Principal string Host string Type kmsg.ACLResourceType // Type is the type of resource this is. Name string // Name is the name of the resource allowed / denied. Pattern ACLPattern // Pattern is the name pattern. Operation ACLOperation // Operation is the operation allowed / denied. Permission kmsg.ACLPermissionType // Permission is whether this is allowed / denied. Err error // Err is the error for this ACL creation. }
CreateACLsResult is a result for an individual ACL creation.
type CreateACLsResults ¶
type CreateACLsResults []CreateACLsResult
CreateACLsResults contains all results to created ACLs.
type CreateDelegationToken ¶
type CreateDelegationToken struct { // Owner overrides the owner of the token from the principal issuing // the request to the principal in this field. This allows a superuser // to create tokens without requiring individual user credentials, and // for a superuser to run clients on behalf of another user. These // fields require Kafka 3.3+; see KIP-373 for more details. Owner *Principal // Renewers is a list of principals that can renew the delegation // token in addition to the owner of the token. This list does not // include the owner. Renewers []Principal // MaxLifetime is how long the delegation token is valid for. // If -1, the default is the server's delegation.token.max.lifetime.ms, // which is by default 7d. MaxLifetime time.Duration }
CreateDelegationToken is a create delegation token request, allowing you to create scoped tokens with the same ACLs as the creator. This allows you to more easily manage authorization for a wide array of clients. All delegation tokens use SCRAM-SHA-256 SASL for authorization.
type CreatePartitionsResponse ¶
type CreatePartitionsResponse struct { Topic string // Topic is the topic this response is for. Err error // Err is non-nil if partitions were unable to be added to this topic. }
CreatePartitionsResponse contains the response for an individual topic from a create partitions request.
type CreatePartitionsResponses ¶
type CreatePartitionsResponses map[string]CreatePartitionsResponse
CreatePartitionsResponses contains per-topic responses for a create partitions request.
func (CreatePartitionsResponses) On ¶
func (rs CreatePartitionsResponses) On(topic string, fn func(*CreatePartitionsResponse) error) (CreatePartitionsResponse, error)
On calls fn for the response topic if it exists, returning the response and the error returned from fn. If fn is nil, this simply returns the response.
The fn is given a copy of the response. This function returns the copy as well; any modifications within fn are modifications on the returned copy.
If the topic does not exist, this returns kerr.UnknownTopicOrPartition.
func (CreatePartitionsResponses) Sorted ¶
func (rs CreatePartitionsResponses) Sorted() []CreatePartitionsResponse
Sorted returns all create partitions responses sorted by topic.
type CreateTopicResponse ¶
type CreateTopicResponse struct { Topic string // Topic is the topic that was created. ID TopicID // ID is the topic ID for this topic, if talking to Kafka v2.8+. Err error // Err is any error preventing this topic from being created. }
CreateTopicResponse contains the response for an individual created topic.
type CreateTopicResponses ¶
type CreateTopicResponses map[string]CreateTopicResponse
CreateTopicRepsonses contains per-topic responses for created topics.
func (CreateTopicResponses) On ¶
func (rs CreateTopicResponses) On(topic string, fn func(*CreateTopicResponse) error) (CreateTopicResponse, error)
On calls fn for the response topic if it exists, returning the response and the error returned from fn. If fn is nil, this simply returns the response.
The fn is given a copy of the response. This function returns the copy as well; any modifications within fn are modifications on the returned copy.
If the topic does not exist, this returns kerr.UnknownTopicOrPartition.
func (CreateTopicResponses) Sorted ¶
func (rs CreateTopicResponses) Sorted() []CreateTopicResponse
Sorted returns all create topic responses sorted first by topic ID, then by topic name.
type CredInfo ¶
type CredInfo struct { // Mechanism is the SCRAM mechanism a password exists for. This is 0 // for UNKNOWN, 1 for SCRAM-SHA-256, and 2 for SCRAM-SHA-512. Mechanism ScramMechanism // Iterations is the number of SCRAM iterations for this password. Iterations int32 }
CredInfo contains the SCRAM mechanism and iterations for a password.
type DelegationToken ¶
type DelegationToken struct { // Owner is the owner of the delegation token. Owner Principal // TokenRequesterPrincipal is the principal of the creator of the // token. This exists for v3+, where you can override the owner. // For prior than v3, this is just the Owner. TokenRequesterPrincipal Principal // IssueTimestamp is timestamp the delegation token creation request // is received within the broker. IssueTimestamp time.Time // ExpiryTimestamp is the timestamp the delegation token will expire. // This field is: // min(MaxTimestamp, IssueTimestamp+delegation.token.expiry.time.ms) // where the default expiry is 24hr. ExpiryTimestamp time.Time // MaxTimestamp is the timestamp past which the delegation token cannot // be renewed. This is either the requested MaxLifetime, or the // broker's delegation.token.max.lifetime.ms which is 7d by default. MaxTimestamp time.Time // TokenID is the username of this token for use in authorization. TokenID string // HMAC is the password of this token for use for in authorization. HMAC []byte // Renewers is the list of principals that can renew this token in // addition to the owner (which always can). Renewers []Principal }
DelegationToken contains information about a delegation token.
type DelegationTokens ¶
type DelegationTokens []DelegationToken
DelegationTokens contains a list of delegation tokens.
type DeleteACLsResult ¶
type DeleteACLsResult struct { Principal *string // Principal is the optional user that was used in this filter. Host *string // Host is the optional host that was used in this filter. Type kmsg.ACLResourceType // Type is the type of resource used for this filter. Name *string // Name is the name of the resource used for this filter. Pattern ACLPattern // Pattern is the name pattern used for this filter. Operation ACLOperation // Operation is the operation used for this filter. Permission kmsg.ACLPermissionType // Permission is permission used for this filter. Deleted DeletedACLs // Deleted contains all ACLs this delete filter matched. Err error // Err is non-nil if this filter has an error. }
DeleteACLsResult contains the input used for a delete ACL filter, and the deletes that the filter matched or the error for this filter.
All fields but Deleted and Err are set from the request input. The response sets either Deleted (potentially to nothing if the filter matched nothing) or Err.
type DeleteACLsResults ¶
type DeleteACLsResults []DeleteACLsResult
DeleteACLsResults contains all results to deleted ACLs.
type DeleteGroupResponse ¶
type DeleteGroupResponse struct { Group string // Group is the group this response is for. Err error // Err is non-nil if the group failed to be deleted. }
DeleteGroupResponse contains the response for an individual deleted group.
type DeleteGroupResponses ¶
type DeleteGroupResponses map[string]DeleteGroupResponse
DeleteGroupResponses contains per-group responses to deleted groups.
func (DeleteGroupResponses) On ¶
func (rs DeleteGroupResponses) On(group string, fn func(*DeleteGroupResponse) error) (DeleteGroupResponse, error)
On calls fn for the response group if it exists, returning the response and the error returned from fn. If fn is nil, this simply returns the group.
The fn is given a copy of the response. This function returns the copy as well; any modifications within fn are modifications on the returned copy.
If the group does not exist, this returns kerr.GroupIDNotFound.
func (DeleteGroupResponses) Sorted ¶
func (ds DeleteGroupResponses) Sorted() []DeleteGroupResponse
Sorted returns all deleted group responses sorted by group name.
type DeleteOffsetsResponses ¶
DeleteOffsetsResponses contains the per topic, per partition errors. If an offset deletion for a partition was successful, the error will be nil.
type DeleteRecordsResponse ¶
type DeleteRecordsResponse struct { Topic string // Topic is the topic this response is for. Partition int32 // Partition is the partition this response is for. LowWatermark int64 // LowWatermark is the new earliest / start offset for this partition if the request was successful. Err error // Err is any error preventing the delete records request from being successful for this partition. }
DeleteRecordsResponse contains the response for an individual partition from a delete records request.
type DeleteRecordsResponses ¶
type DeleteRecordsResponses map[string]map[int32]DeleteRecordsResponse
DeleteRecordsResponses contains per-partition responses to a delete records request.
func (DeleteRecordsResponses) Each ¶
func (ds DeleteRecordsResponses) Each(fn func(DeleteRecordsResponse))
Each calls fn for every delete records response.
func (DeleteRecordsResponses) Lookup ¶
func (ds DeleteRecordsResponses) Lookup(t string, p int32) (DeleteRecordsResponse, bool)
Lookup returns the response at t and p and whether it exists.
func (DeleteRecordsResponses) On ¶
func (rs DeleteRecordsResponses) On(topic string, partition int32, fn func(*DeleteRecordsResponse) error) (DeleteRecordsResponse, error)
On calls fn for the response topic/partition if it exists, returning the response and the error returned from fn. If fn is nil, this simply returns the response.
The fn is given a copy of the response. This function returns the copy as well; any modifications within fn are modifications on the returned copy.
If the topic or partition does not exist, this returns kerr.UnknownTopicOrPartition.
func (DeleteRecordsResponses) Sorted ¶
func (rs DeleteRecordsResponses) Sorted() []DeleteRecordsResponse
Sorted returns all delete records responses sorted first by topic, then by partition.
type DeleteSCRAM ¶
type DeleteSCRAM struct { User string // User is the username to match for deletion. Mechanism ScramMechanism // Mechanism is the mechanism to match to delete a password for. }
DeleteSCRAM deletes a password with the given mechanism for the user.
type DeleteTopicResponse ¶
type DeleteTopicResponse struct { Topic string // Topic is the topic that was deleted, if not using topic IDs. ID TopicID // ID is the topic ID for this topic, if talking to Kafka v2.8+ and using topic IDs. Err error // Err is any error preventing this topic from being deleted. }
DeleteTopicResponse contains the response for an individual deleted topic.
type DeleteTopicResponses ¶
type DeleteTopicResponses map[string]DeleteTopicResponse
DeleteTopicResponses contains per-topic responses for deleted topics.
func (DeleteTopicResponses) On ¶
func (rs DeleteTopicResponses) On(topic string, fn func(*DeleteTopicResponse) error) (DeleteTopicResponse, error)
On calls fn for the response topic if it exists, returning the response and the error returned from fn. If fn is nil, this simply returns the response.
The fn is given a copy of the response. This function returns the copy as well; any modifications within fn are modifications on the returned copy.
If the topic does not exist, this returns kerr.UnknownTopicOrPartition.
func (DeleteTopicResponses) Sorted ¶
func (rs DeleteTopicResponses) Sorted() []DeleteTopicResponse
Sorted returns all delete topic responses sorted first by topic ID, then by topic name.
type DeletedACL ¶
type DeletedACL struct { Principal string // Principal is this deleted ACL's principal. Host string // Host is this deleted ACL's host. Type kmsg.ACLResourceType // Type is this deleted ACL's resource type. Name string // Name is this deleted ACL's resource name. Pattern ACLPattern // Pattern is this deleted ACL's resource name pattern. Operation ACLOperation // Operation is this deleted ACL's operation. Permission kmsg.ACLPermissionType // Permission this deleted ACLs permission. Err error // Err is non-nil if this match has an error. }
DeletedACL an ACL that was deleted.
type DeletedACLs ¶
type DeletedACLs []DeletedACL
DeletedACLs contains ACLs that were deleted from a single delete filter.
type DescribeACLsResult ¶
type DescribeACLsResult struct { Principal *string // Principal is the optional user that was used in this filter. Host *string // Host is the optional host that was used in this filter. Type kmsg.ACLResourceType // Type is the type of resource used for this filter. Name *string // Name is the name of the resource used for this filter. Pattern ACLPattern // Pattern is the name pattern used for this filter. Operation ACLOperation // Operation is the operation used for this filter. Permission kmsg.ACLPermissionType // Permission is permission used for this filter. Described DescribedACLs // Described contains all ACLs this describe filter matched. Err error // Err is non-nil if this filter has an error. }
DescribeACLsResults contains the input used for a describe ACL filter, and the describes that the filter matched or the error for this filter.
All fields but Described and Err are set from the request input. The response sets either Described (potentially to nothing if the filter matched nothing) or Err.
type DescribeACLsResults ¶
type DescribeACLsResults []DescribeACLsResult
DescribeACLsResults contains all results to described ACLs.
type DescribeClientQuotaComponent ¶
type DescribeClientQuotaComponent struct { Type string // Type is the type of entity component to describe ("user", "client-id", "ip"). MatchName *string // MatchName is the name to match again; this is only needed when MatchType is 0 (exact). MatchType QuotasMatchType // MatchType is how to match an entity. }
DescribeClientQuotaComponent is an input entity component to describing client quotas: we define the type of quota ("client-id", "user"), how to match, and the match name if needed.
type DescribedACL ¶
type DescribedACL struct { Principal string // Principal is this described ACL's principal. Host string // Host is this described ACL's host. Type kmsg.ACLResourceType // Type is this described ACL's resource type. Name string // Name is this described ACL's resource name. Pattern ACLPattern // Pattern is this described ACL's resource name pattern. Operation ACLOperation // Operation is this described ACL's operation. Permission kmsg.ACLPermissionType // Permission this described ACLs permission. }
DescribedACL is an ACL that was described.
type DescribedACLs ¶
type DescribedACLs []DescribedACL
DescribedACLs contains ACLs that were described from a single describe filter.
type DescribedAllLogDirs ¶
type DescribedAllLogDirs map[int32]DescribedLogDirs
DescribedAllLogDirs contains per-broker responses to described log directories.
func (DescribedAllLogDirs) Each ¶
func (ds DescribedAllLogDirs) Each(fn func(DescribedLogDir))
Each calls fn for every described log dir in all responses.
func (DescribedAllLogDirs) Sorted ¶
func (ds DescribedAllLogDirs) Sorted() []DescribedLogDir
Sorted returns each log directory sorted by broker, then by directory.
type DescribedClientQuota ¶
type DescribedClientQuota struct { Entity ClientQuotaEntity // Entity is the entity of this described client quota. Values ClientQuotaValues // Values contains the quota valies for this entity. }
DescribedClientQuota contains a described quota. A single quota is made up of multiple entities and multiple values, for example, "user=foo" is one component of the entity, and "client-id=bar" is another.
type DescribedClientQuotas ¶
type DescribedClientQuotas []DescribedClientQuota
DescribedClientQuota contains client quotas that were described.
type DescribedGroup ¶
type DescribedGroup struct { Group string // Group is the name of the described group. Coordinator BrokerDetail // Coordinator is the coordinator broker for this group. State string // State is the state this group is in (Empty, Dead, Stable, etc.). ProtocolType string // ProtocolType is the type of protocol the group is using, "consumer" for normal consumers, "connect" for Kafka connect. Protocol string // Protocol is the partition assignor strategy this group is using. Members []DescribedGroupMember // Members contains the members of this group sorted first by InstanceID, or if nil, by MemberID. Err error // Err is non-nil if the group could not be described. }
DescribedGroup contains data from a describe groups response for a single group.
func (*DescribedGroup) AssignedPartitions ¶
func (d *DescribedGroup) AssignedPartitions() TopicsSet
AssignedPartitions returns the set of unique topics and partitions that are assigned across all members in this group.
This function is only relevant if the group is of type "consumer".
type DescribedGroupMember ¶
type DescribedGroupMember struct { MemberID string // MemberID is the Kafka assigned member ID of this group member. InstanceID *string // InstanceID is a potential user assigned instance ID of this group member (KIP-345). ClientID string // ClientID is the Kafka client given ClientID of this group member. ClientHost string // ClientHost is the host this member is running on. Join GroupMemberMetadata // Join is what this member sent in its join group request; what it wants to consume. Assigned GroupMemberAssignment // Assigned is what this member was assigned to consume by the leader. }
DescribedGroupMember is the detail of an individual group member as returned by a describe groups response.
type DescribedGroups ¶
type DescribedGroups map[string]DescribedGroup
DescribedGroups contains data for multiple groups from a describe groups response.
func (DescribedGroups) AssignedPartitions ¶
func (ds DescribedGroups) AssignedPartitions() TopicsSet
AssignedPartitions returns the set of unique topics and partitions that are assigned across all members in all groups. This is the all-group analogue to DescribedGroup.AssignedPartitions.
This function is only relevant for groups of type "consumer".
func (DescribedGroups) Names ¶
func (ds DescribedGroups) Names() []string
Topics returns a sorted list of all group names.
func (DescribedGroups) On ¶
func (rs DescribedGroups) On(group string, fn func(*DescribedGroup) error) (DescribedGroup, error)
On calls fn for the group if it exists, returning the group and the error returned from fn. If fn is nil, this simply returns the group.
The fn is given a shallow copy of the group. This function returns the copy as well; any modifications within fn are modifications on the returned copy. Modifications on a described group's inner fields are persisted to the original map (because slices are pointers).
If the group does not exist, this returns kerr.GroupIDNotFound.
func (DescribedGroups) Sorted ¶
func (ds DescribedGroups) Sorted() []DescribedGroup
Sorted returns all groups sorted by group name.
type DescribedLogDir ¶
type DescribedLogDir struct { Broker int32 // Broker is the broker being described. Dir string // Dir is the described directory. Topics DescribedLogDirTopics // Partitions are the partitions in this directory. Err error // Err is non-nil if this directory could not be described. }
DescribedLogDir is a described log directory.
func (DescribedLogDir) Size ¶
func (ds DescribedLogDir) Size() int64
Size returns the total size of all partitions in this directory. This is a shortcut for .Topics.Size().
type DescribedLogDirPartition ¶
type DescribedLogDirPartition struct { Broker int32 // Broker is the broker this partition is on. Dir string // Dir is the directory this partition lives in. Topic string // Topic is the topic for this partition. Partition int32 // Partition is this partition. Size int64 // Size is the total size of the log segments of this partition, in bytes. // OffsetLag is how far behind the log end offset this partition is. // The math is: // // if IsFuture { // logEndOffset - futureLogEndOffset // } else { // max(highWaterMark - logEndOffset) // } // OffsetLag int64 // IsFuture is true if this replica was created by an // AlterReplicaLogDirsRequest and will replace the current log of the // replica in the future. IsFuture bool }
DescribedLogDirPartition is the information for a single partitions described log directory.
func (DescribedLogDirPartition) Less ¶
func (p DescribedLogDirPartition) Less(other DescribedLogDirPartition) bool
Less returns if one dir partition is less than the other, by dir, topic, partition, and size.
func (DescribedLogDirPartition) LessBySize ¶
func (p DescribedLogDirPartition) LessBySize(other DescribedLogDirPartition) bool
LessBySize returns if one dir partition is less than the other by size, otherwise by normal Less semantics.
type DescribedLogDirTopics ¶
type DescribedLogDirTopics map[string]map[int32]DescribedLogDirPartition
DescribedLogDirTopics contains per-partition described log directories.
func (DescribedLogDirTopics) Each ¶
func (ds DescribedLogDirTopics) Each(fn func(p DescribedLogDirPartition))
Each calls fn for every partition.
func (DescribedLogDirTopics) Lookup ¶
func (ds DescribedLogDirTopics) Lookup(t string, p int32) (DescribedLogDirPartition, bool)
Lookup returns the described partition if it exists.
func (DescribedLogDirTopics) Size ¶
func (ds DescribedLogDirTopics) Size() int64
Size returns the total size of all partitions in this directory.
func (DescribedLogDirTopics) Sorted ¶
func (ds DescribedLogDirTopics) Sorted() []DescribedLogDirPartition
Sorted returns all partitions sorted by topic then partition.
func (DescribedLogDirTopics) SortedBySize ¶
func (ds DescribedLogDirTopics) SortedBySize() []DescribedLogDirPartition
SortedBySize returns all partitions sorted by smallest size to largest. If partitions are of equal size, the sorting is topic then partition.
type DescribedLogDirs ¶
type DescribedLogDirs map[string]DescribedLogDir
DescribedLogDirs contains per-directory responses to described log directories for a single broker.
func (DescribedLogDirs) Each ¶
func (ds DescribedLogDirs) Each(fn func(DescribedLogDir))
Each calls fn for each log directory.
func (DescribedLogDirs) EachError ¶
func (ds DescribedLogDirs) EachError(fn func(DescribedLogDir))
EachError calls fn for every directory that has a non-nil error.
func (DescribedLogDirs) EachPartition ¶
func (ds DescribedLogDirs) EachPartition(fn func(d DescribedLogDirPartition))
Each calls fn for each partition in any directory.
func (DescribedLogDirs) Error ¶
func (ds DescribedLogDirs) Error() error
Error iterates over all directories and returns the first error encounted, if any. This can be used to check if describing was entirely successful or not.
func (DescribedLogDirs) LargestPartitionBySize ¶
func (ds DescribedLogDirs) LargestPartitionBySize() (DescribedLogDirPartition, bool)
LargestPartitionBySize returns the largest partition by directory size, or no partition if there are no partitions.
func (DescribedLogDirs) Lookup ¶
func (ds DescribedLogDirs) Lookup(d, t string, p int32) (DescribedLogDirPartition, bool)
Lookup returns the described partition if it exists.
func (DescribedLogDirs) LookupPartition ¶
func (ds DescribedLogDirs) LookupPartition(t string, p int32) (DescribedLogDirPartition, bool)
LookupPartition returns the described partition if it exists in any directory. Brokers should only have one replica of a partition, so this should always find at most one partition.
func (DescribedLogDirs) Ok ¶
func (ds DescribedLogDirs) Ok() bool
Ok returns true if there are no errors. This is a shortcut for ds.Error() == nil.
func (DescribedLogDirs) Size ¶
func (ds DescribedLogDirs) Size() int64
Size returns the total size of all directories.
func (DescribedLogDirs) SmallestPartitionBySize ¶
func (ds DescribedLogDirs) SmallestPartitionBySize() (DescribedLogDirPartition, bool)
SmallestPartitionBySize returns the smallest partition by directory size, or no partition if there are no partitions.
func (DescribedLogDirs) Sorted ¶
func (ds DescribedLogDirs) Sorted() []DescribedLogDir
Sorted returns all directories sorted by dir.
func (DescribedLogDirs) SortedBySize ¶
func (ds DescribedLogDirs) SortedBySize() []DescribedLogDir
SortedBySize returns all directories sorted from smallest total directory size to largest.
func (DescribedLogDirs) SortedPartitions ¶
func (ds DescribedLogDirs) SortedPartitions() []DescribedLogDirPartition
SortedPartitions returns all partitions sorted by dir, then topic, then partition.
func (DescribedLogDirs) SortedPartitionsBySize ¶
func (ds DescribedLogDirs) SortedPartitionsBySize() []DescribedLogDirPartition
SortedPartitionsBySize returns all partitions across all directories sorted by smallest to largest, falling back to by broker, dir, topic, and partition.
type DescribedProducer ¶
type DescribedProducer struct { Leader int32 // Leader is the leader broker for this topic / partition. Topic string // Topic is the topic being produced to. Partition int32 // Partition is the partition being produced to. ProducerID int64 // ProducerID is the producer ID that produced. ProducerEpoch int16 // ProducerEpoch is the epoch that produced. LastSequence int32 // LastSequence is the last sequence number the producer produced. LastTimestamp int64 // LastTimestamp is the last time this producer produced. CoordinatorEpoch int32 // CoordinatorEpoch is the epoch of the transactional coordinator for the last produce. CurrentTxnStartOffset int64 // CurrentTxnStartOffset is the first offset in the transaction. }
DescribedProducer contains the state of a transactional producer's last produce.
func (*DescribedProducer) Less ¶
func (l *DescribedProducer) Less(r *DescribedProducer) bool
Less returns whether the left described producer is less than the right, in order of:
- Topic
- Partition
- ProducerID
- ProducerEpoch
- LastTimestamp
- LastSequence
type DescribedProducers ¶
type DescribedProducers map[int64]DescribedProducer
DescribedProducers maps producer IDs to the full described producer.
func (DescribedProducers) Each ¶
func (ds DescribedProducers) Each(fn func(DescribedProducer))
Each calls fn for each described producer.
func (DescribedProducers) Sorted ¶
func (ds DescribedProducers) Sorted() []DescribedProducer
Sorted returns the described producers sorted by topic, partition, and producer ID.
type DescribedProducersPartition ¶
type DescribedProducersPartition struct { Leader int32 // Leader is the leader broker for this topic / partition. Topic string // Topic is the topic whose producer's were described. Partition int32 // Partition is the partition whose producer's were described. ActiveProducers DescribedProducers // ActiveProducers are producer's actively transactionally producing to this partition. Err error // Err is non-nil if describing this partition failed. }
DescribedProducersPartition is a partition whose producer's were described.
type DescribedProducersPartitions ¶
type DescribedProducersPartitions map[int32]DescribedProducersPartition
DescribedProducersPartitions contains partitions whose producer's were described.
func (DescribedProducersPartitions) Each ¶
func (ds DescribedProducersPartitions) Each(fn func(DescribedProducersPartition))
Each calls fn for each partition.
func (DescribedProducersPartitions) EachProducer ¶
func (ds DescribedProducersPartitions) EachProducer(fn func(DescribedProducer))
EachProducer calls fn for each producer in all partitions.
func (DescribedProducersPartitions) Sorted ¶
func (ds DescribedProducersPartitions) Sorted() []DescribedProducersPartition
Sorted returns the described partitions sorted by topic and partition.
func (DescribedProducersPartitions) SortedProducers ¶
func (ds DescribedProducersPartitions) SortedProducers() []DescribedProducer
SortedProducer returns all producers sorted first by partition, then by producer ID.
type DescribedProducersTopic ¶
type DescribedProducersTopic struct { Topic string // Topic is the topic whose producer's were described. Partitions DescribedProducersPartitions // Partitions are partitions whose producer's were described. }
DescribedProducersTopic contains topic partitions whose producer's were described.
type DescribedProducersTopics ¶
type DescribedProducersTopics map[string]DescribedProducersTopic
DescribedProducersTopics contains topics whose producer's were described.
func (DescribedProducersTopics) Each ¶
func (ds DescribedProducersTopics) Each(fn func(DescribedProducersTopic))
Each calls fn for every topic.
func (DescribedProducersTopics) EachPartition ¶
func (ds DescribedProducersTopics) EachPartition(fn func(DescribedProducersPartition))
EachPartitions calls fn for all topic partitions.
func (DescribedProducersTopics) EachProducer ¶
func (ds DescribedProducersTopics) EachProducer(fn func(DescribedProducer))
EachProducer calls fn for each producer in all topics and partitions.
func (DescribedProducersTopics) Sorted ¶
func (ds DescribedProducersTopics) Sorted() []DescribedProducersTopic
Sorted returns the described topics sorted by topic.
func (DescribedProducersTopics) SortedPartitions ¶
func (ds DescribedProducersTopics) SortedPartitions() []DescribedProducersPartition
Sorted returns the described partitions sorted by topic and partition.
func (DescribedProducersTopics) SortedProducers ¶
func (ds DescribedProducersTopics) SortedProducers() []DescribedProducer
SortedProducer returns all producers sorted first by partition, then by producer ID.
type DescribedTransaction ¶
type DescribedTransaction struct { Coordinator int32 // Coordinator is the coordinator broker for this transactional ID. TxnID string // TxnID is the name of this transactional ID. State string // State is the state this transaction is in (Empty, Ongoing, PrepareCommit, PrepareAbort, CompleteCommit, CompleteAbort, Dead, PrepareEpochFence). TimeoutMillis int32 // TimeoutMillis is the timeout of this transaction in milliseconds. StartTimestamp int64 // StartTimestamp is millisecond when this transaction started. ProducerID int64 // ProducerID is the ID in use by the transactional ID. ProducerEpoch int16 // ProducerEpoch is the epoch associated with the produce rID. // Topics is the set of partitions in the transaction, if active. When // preparing to commit or abort, this includes only partitions which do // not have markers. This does not include topics the user is not // authorized to describe. Topics TopicsSet Err error // Err is non-nil if the transaction could not be described. }
DescribedTransaction contains data from a describe transactions response for a single transactional ID.
type DescribedTransactions ¶
type DescribedTransactions map[string]DescribedTransaction
DescribedTransactions contains information from a describe transactions response.
func (DescribedTransactions) Each ¶
func (ds DescribedTransactions) Each(fn func(DescribedTransaction))
Each calls fn for each described transaction.
func (DescribedTransactions) On ¶
func (rs DescribedTransactions) On(txnID string, fn func(*DescribedTransaction) error) (DescribedTransaction, error)
On calls fn for the transactional ID if it exists, returning the transaction and the error returned from fn. If fn is nil, this simply returns the transaction.
The fn is given a shallow copy of the transaction. This function returns the copy as well; any modifications within fn are modifications on the returned copy. Modifications on a described transaction's inner fields are persisted to the original map (because slices are pointers).
If the transaction does not exist, this returns kerr.TransactionalIDNotFound.
func (DescribedTransactions) Sorted ¶
func (ds DescribedTransactions) Sorted() []DescribedTransaction
Sorted returns all described transactions sorted by transactional ID.
func (DescribedTransactions) TransactionalIDs ¶
func (ds DescribedTransactions) TransactionalIDs() []string
TransactionalIDs returns a sorted list of all transactional IDs.
type DescribedUserSCRAM ¶
type DescribedUserSCRAM struct { User string // User is the user this described user credential is for. CredInfos []CredInfo // CredInfos contains SCRAM mechanisms the user has passwords for. Err error // Err is any error encountered when describing the user. ErrMessage string // ErrMessage a potential extra message describing any error. }
DescribedUserSCRAM contains a user, the SCRAM mechanisms that the user has passwords for, and if describing the user SCRAM credentials errored.
type DescribedUserSCRAMs ¶
type DescribedUserSCRAMs map[string]DescribedUserSCRAM
DescribedUserSCRAMs contains described user SCRAM credentials keyed by user.
func (DescribedUserSCRAMs) AllFailed ¶
func (ds DescribedUserSCRAMs) AllFailed() bool
AllFailed returns whether all described user credentials are errored.
func (DescribedUserSCRAMs) Each ¶
func (ds DescribedUserSCRAMs) Each(fn func(DescribedUserSCRAM))
Each calls fn for every described user.
func (DescribedUserSCRAMs) EachError ¶
func (ds DescribedUserSCRAMs) EachError(fn func(DescribedUserSCRAM))
EachError calls fn for every described user that has a non-nil error.
func (DescribedUserSCRAMs) Error ¶
func (ds DescribedUserSCRAMs) Error() error
Error iterates over all described users and returns the first error encountered, if any.
func (DescribedUserSCRAMs) Ok ¶
func (ds DescribedUserSCRAMs) Ok() bool
Ok returns true if there are no errors. This is a shortcut for rs.Error() == nil.
func (DescribedUserSCRAMs) Sorted ¶
func (ds DescribedUserSCRAMs) Sorted() []DescribedUserSCRAM
Sorted returns the described user credentials ordered by user.
type ElectLeadersHow ¶
type ElectLeadersHow int8
ElectLeadersHow is how partition leaders should be elected.
const ( // ElectPreferredReplica elects the preferred replica for a partition. ElectPreferredReplica ElectLeadersHow = 0 // ElectLiveReplica elects the first life replica if there are no // in-sync replicas (i.e., this is unclean leader election). ElectLiveReplica ElectLeadersHow = 1 )
type ElectLeadersResult ¶
type ElectLeadersResult struct { Topic string // Topic is the topic this result is for. Partition int32 // Partition is the partition this result is for. How ElectLeadersHow // How is the type of election that was performed. Err error // Err is non-nil if electing this partition's leader failed, such as the partition not existing or the preferred leader is not available and you used ElectPreferredReplica. ErrMessage string // ErrMessage a potential extra message describing any error. }
ElectLeadersResult is the result for a single partition in an elect leaders request.
type ElectLeadersResults ¶
type ElectLeadersResults map[string]map[int32]ElectLeadersResult
ElectLeadersResults contains per-topic, per-partition results for an elect leaders request.
type FetchOffsetsResponse ¶
type FetchOffsetsResponse struct { Group string // Group is the offsets these fetches correspond to. Fetched OffsetResponses // Fetched contains offsets fetched for this group, if any. Err error // Err contains any error preventing offsets from being fetched. }
FetchOffsetsResponse contains a fetch offsets response for a single group.
func (FetchOffsetsResponse) CommittedPartitions ¶
func (r FetchOffsetsResponse) CommittedPartitions() TopicsSet
CommittedPartitions returns the set of unique topics and partitions that have been committed to in this group.
type FetchOffsetsResponses ¶
type FetchOffsetsResponses map[string]FetchOffsetsResponse
FetchOFfsetsResponses contains responses for many fetch offsets requests.
func (FetchOffsetsResponses) AllFailed ¶
func (rs FetchOffsetsResponses) AllFailed() bool
AllFailed returns whether all fetch offsets requests failed.
func (FetchOffsetsResponses) CommittedPartitions ¶
func (rs FetchOffsetsResponses) CommittedPartitions() TopicsSet
CommittedPartitions returns the set of unique topics and partitions that have been committed to across all members in all responses. This is the all-group analogue to FetchOffsetsResponse.CommittedPartitions.
func (FetchOffsetsResponses) EachError ¶
func (rs FetchOffsetsResponses) EachError(fn func(FetchOffsetsResponse))
EachError calls fn for every response that as a non-nil error.
func (FetchOffsetsResponses) On ¶
func (rs FetchOffsetsResponses) On(group string, fn func(*FetchOffsetsResponse) error) (FetchOffsetsResponse, error)
On calls fn for the response group if it exists, returning the response and the error returned from fn. If fn is nil, this simply returns the group.
The fn is given a copy of the response. This function returns the copy as well; any modifications within fn are modifications on the returned copy.
If the group does not exist, this returns kerr.GroupIDNotFound.
type FindCoordinatorResponse ¶
type FindCoordinatorResponse struct { Name string // Name is the coordinator key this response is for. NodeID int32 // NodeID is the node ID of the coordinator for this key. Host string // Host is the host of the coordinator for this key. Port int32 // Port is the port of the coordinator for this key. Err error // Err is any error encountered when requesting the coordinator. ErrMessage string // ErrMessage a potential extra message describing any error. }
FindCoordinatorResponse contains information for the coordinator for a group or transactional ID.
type FindCoordinatorResponses ¶
type FindCoordinatorResponses map[string]FindCoordinatorResponse
FindCoordinatorResponses contains responses to finding coordinators for groups or transactions.
func (FindCoordinatorResponses) AllFailed ¶
func (rs FindCoordinatorResponses) AllFailed() bool
AllFailed returns whether all responses are errored.
func (FindCoordinatorResponses) Each ¶
func (rs FindCoordinatorResponses) Each(fn func(FindCoordinatorResponse))
Each calls fn for every response.
func (FindCoordinatorResponses) EachError ¶
func (rs FindCoordinatorResponses) EachError(fn func(FindCoordinatorResponse))
EachError calls fn for every response that has a non-nil error.
func (FindCoordinatorResponses) Error ¶
func (rs FindCoordinatorResponses) Error() error
Error iterates over all responses and returns the first error encountered, if any.
func (FindCoordinatorResponses) Ok ¶
func (rs FindCoordinatorResponses) Ok() bool
Ok returns true if there are no errors. This is a shortcut for rs.Error() == nil.
func (FindCoordinatorResponses) Sorted ¶
func (rs FindCoordinatorResponses) Sorted() []FindCoordinatorResponse
Sorted returns all coordinator responses sorted by name.
type GroupLag ¶
type GroupLag map[string]map[int32]GroupMemberLag
GroupLag is the per-topic, per-partition lag of members in a group.
func CalculateGroupLag ¶
func CalculateGroupLag( group DescribedGroup, commit OffsetResponses, endOffsets ListedOffsets, ) GroupLag
CalculateGroupLag returns the per-partition lag of all members in a group. The input to this method is the returns from the following methods (make sure to check shard errors):
// Note that FetchOffsets exists to fetch only one group's offsets, // but some of the code below slightly changes. groups := DescribeGroups(ctx, group) commits := FetchManyOffsets(ctx, group) var endOffsets ListedOffsets listPartitions := described.AssignedPartitions() listPartitions.Merge(commits.CommittedPartitions() if topics := listPartitions.Topics(); len(topics) > 0 { endOffsets = ListEndOffsets(ctx, listPartitions.Topics()) } for _, group := range groups { lag := CalculateGroupLag(group, commits[group.Group].Fetched, endOffsets) }
If assigned partitions are missing in the listed end offsets, the partition will have an error indicating it is missing. A missing topic or partition in the commits is assumed to be nothing committing yet.
func (GroupLag) Lookup ¶
func (l GroupLag) Lookup(t string, p int32) (GroupMemberLag, bool)
Lookup returns the lag at t and p and whether it exists.
func (GroupLag) Sorted ¶
func (l GroupLag) Sorted() []GroupMemberLag
Sorted returns the per-topic, per-partition lag by member sorted in order by topic then partition.
func (GroupLag) TotalByTopic ¶
func (l GroupLag) TotalByTopic() GroupTopicsLag
TotalByTopic returns the total lag for each topic.
type GroupMemberAssignment ¶
type GroupMemberAssignment struct {
// contains filtered or unexported fields
}
GroupMemberAssignment is the assignment that a leader sent / a member received in a SyncGroup request. This can have one of three types:
*kmsg.ConsumerMemberAssignment, if the group's ProtocolType is "consumer" *kmsg.ConnectMemberAssignment, if the group's ProtocolType is "connect" []byte, if the group's ProtocolType is unknown
func (GroupMemberAssignment) AsConnect ¶
func (m GroupMemberAssignment) AsConnect() (*kmsg.ConnectMemberAssignment, bool)
AsConnect returns the assignment as ConnectMemberAssignment if possible.
func (GroupMemberAssignment) AsConsumer ¶
func (m GroupMemberAssignment) AsConsumer() (*kmsg.ConsumerMemberAssignment, bool)
AsConsumer returns the assignment as a ConsumerMemberAssignment if possible.
func (GroupMemberAssignment) Raw ¶
func (m GroupMemberAssignment) Raw() ([]byte, bool)
Raw returns the assignment as a raw byte slice, if it is neither of consumer type nor connect type.
type GroupMemberLag ¶
type GroupMemberLag struct { // Member is a reference to the group member consuming this partition. // If the group is in state Empty, the member will be nil. Member *DescribedGroupMember Commit Offset // Commit is this member's current offset commit. End ListedOffset // EndOffset is a reference to the end offset of this partition. Lag int64 // Lag is how far behind this member is, or -1 if there is a commit error or list offset error. Err error // Err is either the commit error, or the list end offsets error, or nil. }
GroupMemberLag is the lag between a group member's current offset commit and the current end offset.
If either the offset commits have load errors, or the listed end offsets have load errors, the Lag field will be -1 and the Err field will be set (to the first of either the commit error, or else the list error).
If the group is in the Empty state, lag is calculated for all partitions in a topic, but the member is nil. The calculate function assumes that any assigned topic is meant to be entirely consumed. If the group is Empty and topics could not be listed, some partitions may be missing.
func (*GroupMemberLag) IsEmpty ¶
func (g *GroupMemberLag) IsEmpty() bool
IsEmpty returns if the this lag is for a group in the Empty state.
type GroupMemberMetadata ¶
type GroupMemberMetadata struct {
// contains filtered or unexported fields
}
GroupMemberMetadata is the metadata that a client sent in a JoinGroup request. This can have one of three types:
*kmsg.ConsumerMemberMetadata, if the group's ProtocolType is "consumer" *kmsg.ConnectMemberMetadata, if the group's ProtocolType is "connect" []byte, if the group's ProtocolType is unknown
func (GroupMemberMetadata) AsConnect ¶
func (m GroupMemberMetadata) AsConnect() (*kmsg.ConnectMemberMetadata, bool)
AsConnect returns the metadata as ConnectMemberMetadata if possible.
func (GroupMemberMetadata) AsConsumer ¶
func (m GroupMemberMetadata) AsConsumer() (*kmsg.ConsumerMemberMetadata, bool)
AsConsumer returns the metadata as a ConsumerMemberMetadata if possible.
func (GroupMemberMetadata) Raw ¶
func (m GroupMemberMetadata) Raw() ([]byte, bool)
Raw returns the metadata as a raw byte slice, if it is neither of consumer type nor connect type.
type GroupTopicsLag ¶
GroupTopicsLag is the total lag per topic within a group.
func (GroupTopicsLag) Sorted ¶
func (l GroupTopicsLag) Sorted() []TopicLag
Sorted returns the per-topic lag, sorted by topic.
type IncrementalOp ¶
type IncrementalOp int8
IncrementalOp is a typed int8 that is used for incrementally updating configuration keys for topics and brokers.
const ( // SetConfig is an incremental operation to set an individual config // key. SetConfig IncrementalOp = iota // DeleteConfig is an incremental operation to delete an individual // config key. DeleteConfig // AppendConfig is an incremental operation to append a value to a // config key that is a list type. AppendConfig // SubtractConfig is an incremental operation to remove a value from a // config key that is a list type. SubtractConfig )
type LeaveGroupBuilder ¶
type LeaveGroupBuilder struct {
// contains filtered or unexported fields
}
LeaveGroupBuilder helps build a leave group request, rather than having a function signature (string, string, ...string).
All functions on this type accept and return the same pointer, allowing for easy build-and-use usage.
func LeaveGroup ¶
func LeaveGroup(group string) *LeaveGroupBuilder
LeaveGroup returns a LeaveGroupBuilder for the input group.
func (*LeaveGroupBuilder) InstanceIDs ¶
func (b *LeaveGroupBuilder) InstanceIDs(ids ...string) *LeaveGroupBuilder
InstanceIDs are members to remove from a group.
func (*LeaveGroupBuilder) Reason ¶
func (b *LeaveGroupBuilder) Reason(reason string) *LeaveGroupBuilder
Reason attaches a reason to all members in the leave group request. This requires Kafka 3.2+.
type LeaveGroupResponse ¶
type LeaveGroupResponse struct { Group string // Group is the group that was left. InstanceID string // InstanceID is the instance ID that left the group. MemberID string // MemberID is the member ID that left the group. Err error // Err is non-nil if this member did not exist. }
LeaveGroupResponse contains the response for an individual instance ID that left a group.
type LeaveGroupResponses ¶
type LeaveGroupResponses map[string]LeaveGroupResponse
LeaveGroupResponses contains responses for each member of a leave group request. The map key is the instance ID that was removed from the group.
func (LeaveGroupResponses) Each ¶
func (ls LeaveGroupResponses) Each(fn func(l LeaveGroupResponse))
Each calls fn for every removed member.
func (LeaveGroupResponses) EachError ¶
func (ls LeaveGroupResponses) EachError(fn func(l LeaveGroupResponse))
EachError calls fn for every removed member that has a non-nil error.
func (LeaveGroupResponses) Error ¶
func (ls LeaveGroupResponses) Error() error
Error iterates over all removed members and returns the first error encountered, if any.
func (LeaveGroupResponses) Ok ¶
func (ls LeaveGroupResponses) Ok() bool
Ok returns true if there are no errors. This is a shortcut for ls.Error() == nil.
func (LeaveGroupResponses) Sorted ¶
func (ls LeaveGroupResponses) Sorted() []LeaveGroupResponse
Sorted returns all removed group members by instance ID.
type ListPartitionReassignmentsResponse ¶
type ListPartitionReassignmentsResponse struct { Topic string // Topic is the topic that was listed. Partition int32 // Partition is the partition that was listed. Replicas []int32 // Replicas are the partition's current replicas. AddingReplicas []int32 // AddingReplicas are replicas currently being added to the partition. RemovingReplicas []int32 // RemovingReplicas are replicas currently being removed from the partition. }
ListPartitionReassignmentsResponse contains a response for an individual partition that was listed.
type ListPartitionReassignmentsResponses ¶
type ListPartitionReassignmentsResponses map[string]map[int32]ListPartitionReassignmentsResponse
ListPartitionReassignmentsResponses contains responses to all partitions in a list reassignment request.
func (ListPartitionReassignmentsResponses) Each ¶
func (rs ListPartitionReassignmentsResponses) Each(fn func(ListPartitionReassignmentsResponse))
Each calls fn for every response.
func (ListPartitionReassignmentsResponses) Sorted ¶
func (rs ListPartitionReassignmentsResponses) Sorted() []ListPartitionReassignmentsResponse
Sorted returns the responses sorted by topic and partition.
type ListedGroup ¶
type ListedGroup struct { Coordinator int32 // Coordinator is the node ID of the coordinator for this group. Group string // Group is the name of this group. ProtocolType string // ProtocolType is the type of protocol the group is using, "consumer" for normal consumers, "connect" for Kafka connect. State string // State is the state this group is in (Empty, Dead, Stable, etc.; only if talking to Kafka 2.6+). }
ListedGroup contains data from a list groups response for a single group.
type ListedGroups ¶
type ListedGroups map[string]ListedGroup
ListedGroups contains information from a list groups response.
func (ListedGroups) Groups ¶
func (ls ListedGroups) Groups() []string
Groups returns a sorted list of all group names.
func (ListedGroups) Sorted ¶
func (ls ListedGroups) Sorted() []ListedGroup
Sorted returns all groups sorted by group name.
type ListedOffset ¶
type ListedOffset struct { Topic string // Topic is the topic this offset is for. Partition int32 // Partition is the partition this offset is for. Timestamp int64 // Timestamp is the millisecond of the offset if listing after a time, otherwise -1. Offset int64 // Offset is the record offset, or -1 if one could not be found. LeaderEpoch int32 // LeaderEpoch is the leader epoch at this offset, if any, otherwise -1. Err error // Err is non-nil if the partition has a load error. }
ListedOffset contains record offset information.
type ListedOffsets ¶
type ListedOffsets map[string]map[int32]ListedOffset
ListedOffsets contains per-partition record offset information that is returned from any of the List.*Offsets functions.
func (ListedOffsets) Each ¶
func (l ListedOffsets) Each(fn func(ListedOffset))
Each calls fn for each listed offset.
func (ListedOffsets) Error ¶
func (l ListedOffsets) Error() error
Error iterates over all offsets and returns the first error encountered, if any. This can be to check if a listing was entirely successful or not.
Note that offset listing can be partially successful. For example, some offsets could succeed to be listed, while other could fail (maybe one partition is offline). If this is something you need to worry about, you may need to check all offsets manually.
func (ListedOffsets) KOffsets ¶
func (l ListedOffsets) KOffsets() map[string]map[int32]kgo.Offset
KOffsets returns these listed offsets as a kgo offset map.
func (ListedOffsets) Lookup ¶
func (l ListedOffsets) Lookup(t string, p int32) (ListedOffset, bool)
Lookup returns the offset at t and p and whether it exists.
func (ListedOffsets) Offsets ¶
func (l ListedOffsets) Offsets() Offsets
Offsets returns these listed offsets as offsets.
type ListedTransaction ¶
type ListedTransaction struct { Coordinator int32 // Coordinator the coordinator broker for this transactional ID. TxnID string // TxnID is the name of this transactional ID. ProducerID int64 // ProducerID is the producer ID for this transaction. State string // State is the state this transaction is in (Empty, Ongoing, PrepareCommit, PrepareAbort, CompleteCommit, CompleteAbort, Dead, PrepareEpochFence). }
ListedTransaction contains data from a list transactions response for a single transactional ID.
type ListedTransactions ¶
type ListedTransactions map[string]ListedTransaction
ListedTransactions contains information from a list transactions response.
func (ListedTransactions) Each ¶
func (ls ListedTransactions) Each(fn func(ListedTransaction))
Each calls fn for each listed transaction.
func (ListedTransactions) Sorted ¶
func (ls ListedTransactions) Sorted() []ListedTransaction
Sorted returns all transactions sorted by transactional ID.
func (ListedTransactions) TransactionalIDs ¶
func (ls ListedTransactions) TransactionalIDs() []string
TransactionalIDs returns a sorted list of all transactional IDs.
type Metadata ¶
type Metadata struct { Cluster string // Cluster is the cluster name, if any. Controller int32 // Controller is the node ID of the controller broker, if available, otherwise -1. Brokers BrokerDetails // Brokers contains broker details, sorted by default. Topics TopicDetails // Topics contains topic details. }
Metadata is the data from a metadata response.
type Offset ¶
type Offset struct { Topic string Partition int32 At int64 // Offset is the partition to set. LeaderEpoch int32 // LeaderEpoch is the broker leader epoch of the record at this offset. Metadata string // Metadata, if non-empty, is used for offset commits. }
Offset is an offset for a topic.
type OffsetForLeaderEpoch ¶
type OffsetForLeaderEpoch struct { NodeID int32 // NodeID is the node that is the leader of this topic / partition. Topic string // Topic is the topic this leader epoch response is for. Partition int32 // Partition is the partition this leader epoch response is for. // LeaderEpoch is either // // 1) -1, if the requested LeaderEpoch is unknown. // // 2) Less than the requested LeaderEpoch, if the requested LeaderEpoch // exists but has no records in it. For example, epoch 1 had end offset // 37, then epoch 2 and 3 had no records: if you request LeaderEpoch 3, // this will return LeaderEpoch 1 with EndOffset 37. // // 3) Equal to the requested LeaderEpoch, if the requested LeaderEpoch // is equal to or less than the current epoch for the partition. LeaderEpoch int32 // EndOffset is either // // 1) The LogEndOffset, if the broker has the same LeaderEpoch as the // request. // // 2) the beginning offset of the next LeaderEpoch, if the broker has a // higher LeaderEpoch. // // The second option allows the user to detect data loss: if the // consumer consumed past the EndOffset that is returned, then the // consumer should reset to the returned offset and the consumer knows // that everything from the returned offset to the requested offset was // lost. EndOffset int64 // Err is non-nil if this partition had a response error. Err error }
OffsetForLeaderEpoch contains a response for a single partition in an OffsetForLeaderEpoch request.
type OffsetForLeaderEpochRequest ¶
OffsetForLeaderEpochRequest contains topics, partitions, and leader epochs to request offsets for in an OffsetForLeaderEpoch.
func (*OffsetForLeaderEpochRequest) Add ¶
func (l *OffsetForLeaderEpochRequest) Add(topic string, partition, leaderEpoch int32)
Add adds a topic, partition, and leader epoch to the request.
type OffsetResponse ¶
OffsetResponse contains the response for an individual offset for offset methods.
type OffsetResponses ¶
type OffsetResponses map[string]map[int32]OffsetResponse
OffsetResponses contains per-partition responses to offset methods.
func (*OffsetResponses) Add ¶
func (os *OffsetResponses) Add(o OffsetResponse)
Add adds an offset for a given topic/partition to this OffsetResponses map (even if it exists).
func (OffsetResponses) DeleteFunc ¶
func (os OffsetResponses) DeleteFunc(fn func(OffsetResponse) bool)
DeleteFunc deletes any offset for which fn returns true.
func (OffsetResponses) Each ¶
func (os OffsetResponses) Each(fn func(OffsetResponse))
Each calls fn for every offset.
func (OffsetResponses) EachError ¶
func (os OffsetResponses) EachError(fn func(o OffsetResponse))
EachError calls fn for every offset that as a non-nil error.
func (OffsetResponses) Error ¶
func (os OffsetResponses) Error() error
Error iterates over all offsets and returns the first error encountered, if any. This can be used to check if an operation was entirely successful or not.
Note that offset operations can be partially successful. For example, some offsets could succeed in an offset commit while others fail (maybe one topic does not exist for some reason, or you are not authorized for one topic). If this is something you need to worry about, you may need to check all offsets manually.
func (OffsetResponses) KOffsets ¶
func (os OffsetResponses) KOffsets() map[string]map[int32]kgo.Offset
KOffsets returns these offset responses as a kgo offset map.
func (OffsetResponses) Keep ¶
func (os OffsetResponses) Keep(o Offsets)
Keep filters the responses to only keep the input offsets.
func (OffsetResponses) KeepFunc ¶
func (os OffsetResponses) KeepFunc(fn func(OffsetResponse) bool)
DeleteFunc keeps only the offsets for which fn returns true.
func (OffsetResponses) Lookup ¶
func (os OffsetResponses) Lookup(t string, p int32) (OffsetResponse, bool)
Lookup returns the offset at t and p and whether it exists.
func (OffsetResponses) Offsets ¶
func (os OffsetResponses) Offsets() Offsets
Offsets returns these offset responses as offsets.
func (OffsetResponses) Ok ¶
func (os OffsetResponses) Ok() bool
Ok returns true if there are no errors. This is a shortcut for os.Error() == nil.
func (OffsetResponses) Partitions ¶
func (os OffsetResponses) Partitions() TopicsSet
Partitions returns the set of unique topics and partitions in these offsets.
func (OffsetResponses) Sorted ¶
func (os OffsetResponses) Sorted() []OffsetResponse
Sorted returns the responses sorted by topic and partition.
type Offsets ¶
Offsets wraps many offsets and is the type used for offset functions.
func OffsetsFromFetches ¶
OffsetsFromFetches returns Offsets for the final record in any partition in the fetches. This is a helper to enable committing an entire returned batch.
This function looks at only the last record per partition, assuming that the last record is the highest offset (which is the behavior returned by kgo's Poll functions). The returned offsets are one past the offset contained in the records.
func OffsetsFromRecords ¶
OffsetsFromRecords returns offsets for all given records, using the highest offset per partition. The returned offsets are one past the offset contained in the records.
func (*Offsets) Add ¶
Add adds an offset for a given topic/partition to this Offsets map.
If the partition already exists, the offset is only added if:
- the new leader epoch is higher than the old, or
- the leader epochs equal, and the new offset is higher than the old
If you would like to add offsets forcefully no matter what, use the Delete method before this.
func (*Offsets) AddOffset ¶
AddOffset is a helper to add an offset for a given topic and partition. The leader epoch field must be -1 if you do not know the leader epoch or if you do not have an offset yet.
func (Offsets) DeleteFunc ¶
DeleteFunc calls fn for every offset, deleting the offset if fn returns true.
func (Offsets) KeepFunc ¶
KeepFunc calls fn for every offset, keeping the offset if fn returns true.
type OffsetsForLeaderEpochs ¶
type OffsetsForLeaderEpochs map[string]map[int32]OffsetForLeaderEpoch
OffsetsForLeaderEpochs contains responses for partitions in a OffsetForLeaderEpochRequest.
type OffsetsList ¶
type OffsetsList []Offset
OffsetsList wraps many offsets and is a helper for building Offsets.
func (OffsetsList) KOffsets ¶
func (l OffsetsList) KOffsets() map[string]map[int32]kgo.Offset
KOffsets returns this list as a kgo offset map.
func (OffsetsList) Offsets ¶
func (l OffsetsList) Offsets() Offsets
Offsets returns this list as the non-list Offsets. All fields in each Offset must be set properly.
type Partition ¶
type Partition struct { Topic string // Topic is the topic for this partition. Partition int32 // Partition is this partition's number. }
Partition is a partition for a topic.
type PartitionDetail ¶
type PartitionDetail struct { Topic string // Topic is the topic this partition belongs to. Partition int32 // Partition is the partition number these details are for. Leader int32 // Leader is the broker leader, if there is one, otherwise -1. LeaderEpoch int32 // LeaderEpoch is the leader's current epoch. Replicas []int32 // Replicas is the list of replicas. ISR []int32 // ISR is the list of in sync replicas. OfflineReplicas []int32 // OfflineReplicas is the list of offline replicas. Err error // Err is non-nil if the partition currently has a load error. }
PartitionDetail is the detail of a partition as returned by a metadata response. If the partition fails to load / has an error, then only the partition number itself and the Err fields will be set.
type PartitionDetails ¶
type PartitionDetails map[int32]PartitionDetail
PartitionDetails contains details for partitions as returned by a metadata response.
func (PartitionDetails) NumReplicas ¶
func (ds PartitionDetails) NumReplicas() int
NumReplicas returns the number of replicas for these partitions
It is assumed that all partitions have the same number of replicas, so this simply returns the number of replicas in the first encountered partition.
func (PartitionDetails) Numbers ¶
func (ds PartitionDetails) Numbers() []int32
Numbers returns a sorted list of all partition numbers.
func (PartitionDetails) Sorted ¶
func (ds PartitionDetails) Sorted() []PartitionDetail
Sorted returns the partitions in sorted order.
type Partitions ¶
type Partitions []Partition
Partitions wraps many partitions.
func (Partitions) TopicsList ¶
func (ps Partitions) TopicsList() TopicsList
TopicsList returns these partitions as sorted TopicsList.
func (Partitions) TopicsSet ¶
func (ps Partitions) TopicsSet() TopicsSet
TopicsSet returns these partitions as TopicsSet.
type Principal ¶
type Principal struct { Type string // Type is the type of a principal owner or renewer. If empty, this defaults to "User". Name string // Name is the name of a principal owner or renewer. }
Principal is a principal that owns or renews a delegation token. This is the same as an ACL's principal, but rather than being a single string, the type and name are split into two fields.
type QuotasMatchType ¶
type QuotasMatchType = kmsg.QuotasMatchType
QuotasMatchType specifies how to match a described client quota entity.
0 means to match the name exactly: user=foo will only match components of entity type "user" and entity name "foo".
1 means to match the default of the name: entity type "user" with a default match will return the default quotas for user entities.
2 means to match any name: entity type "user" with any matching will return both names and defaults.
type ResourceConfig ¶
type ResourceConfig struct { Name string // Name is the name of this resource. Configs []Config // Configs are the configs for this topic. Err error // Err is any error preventing configs from loading (likely, an unknown topic). }
ResourceConfig contains the configuration values for a resource (topic, broker, broker logger).
type ResourceConfigs ¶
type ResourceConfigs []ResourceConfig
ResourceConfigs contains the configuration values for many resources.
func (ResourceConfigs) On ¶
func (rs ResourceConfigs) On(name string, fn func(*ResourceConfig) error) (ResourceConfig, error)
On calls fn for the response config if it exists, returning the config and the error returned from fn. If fn is nil, this simply returns the config.
The fn is given a copy of the config. This function returns the copy as well; any modifications within fn are modifications on the returned copy.
If the resource does not exist, this returns kerr.UnknownTopicOrPartition.
type ScramMechanism ¶
type ScramMechanism int8
ScramMechanism is a SCRAM mechanism.
const ( // ScramSha256 represents the SCRAM-SHA-256 mechanism. ScramSha256 ScramMechanism = 1 // ScramSha512 represents the SCRAM-SHA-512 mechanism. ScramSha512 ScramMechanism = 2 )
func (ScramMechanism) String ¶
func (s ScramMechanism) String() string
String returns either SCRAM-SHA-256, SCRAM-SHA-512, or UNKNOWN.
type ShardError ¶
type ShardError struct { Req kmsg.Request // Req is a piece of the original request. Err error // Err is the error that resulted in this request failing. // Broker, if non-nil, is the broker this request was meant to be // issued to. If the NodeID is -1, then this piece of the request // failed before being mapped to a broker. Broker BrokerDetail }
ShardError is a piece of a request that failed. See ShardErrors for more detail.
type ShardErrors ¶
type ShardErrors struct { Name string // Name is the name of the request these shard errors are for. AllFailed bool // AllFailed indicates if the original request was entirely unsuccessful. Errs []ShardError // Errs contains all individual shard errors. }
ShardErrors contains each individual error shard of a request.
Under the hood, some requests to Kafka need to be mapped to brokers, split, and sent to many brokers. The kgo.Client handles this all internally, but returns the individual pieces that were requested as "shards". Internally, each of these pieces can also fail, and they can all fail uniquely.
The kadm package takes one further step and hides the failing pieces into one meta error, the ShardErrors. Methods in this package that can return this meta error are documented; if desired, you can use errors.As to check and unwrap any ShardErrors return.
If a request returns ShardErrors, it is possible that some aspects of the request were still successful. You can check ShardErrors.AllFailed as a shortcut for whether any of the response is usable or not.
func (*ShardErrors) Error ¶
func (e *ShardErrors) Error() string
Error returns an error indicating the name of the request that failed, the number of separate errors, and the first error.
type TopicDetail ¶
type TopicDetail struct { Topic string // Topic is the topic these details are for. ID TopicID // TopicID is the topic's ID, or all 0 if the broker does not support IDs. IsInternal bool // IsInternal is whether the topic is an internal topic. Partitions PartitionDetails // Partitions contains details about the topic's partitions. Err error // Err is non-nil if the topic could not be loaded. }
TopicDetail is the detail of a topic as returned by a metadata response. If the topic fails to load / has an error, then there will be no partitions.
type TopicDetails ¶
type TopicDetails map[string]TopicDetail
TopicDetails contains details for topics as returned by a metadata response.
func (TopicDetails) EachError ¶
func (ds TopicDetails) EachError(fn func(TopicDetail))
EachError calls fn for each topic that could not be loaded.
func (TopicDetails) EachPartition ¶
func (ds TopicDetails) EachPartition(fn func(PartitionDetail))
EachPartition calls fn for every partition in all topics.
func (TopicDetails) FilterInternal ¶
func (ds TopicDetails) FilterInternal()
FilterInternal deletes any internal topics from this set of topic details.
func (TopicDetails) Has ¶
func (ds TopicDetails) Has(topic string) bool
Has returns whether the topic details has the given topic and, if so, that the topic's load error is not an unknown topic error.
func (TopicDetails) Names ¶
func (ds TopicDetails) Names() []string
Topics returns a sorted list of all topic names.
func (TopicDetails) Sorted ¶
func (ds TopicDetails) Sorted() []TopicDetail
Sorted returns all topics in sorted order.
func (TopicDetails) TopicsList ¶
func (ds TopicDetails) TopicsList() TopicsList
TopicsList returns the topics and partitions as a list.
func (TopicDetails) TopicsSet ¶
func (ds TopicDetails) TopicsSet() TopicsSet
TopicsSet returns the topics and partitions as a set.
type TopicID ¶
type TopicID [16]byte
TopicID is the 16 byte underlying topic ID.
func (TopicID) MarshalJSON ¶
MarshalJSON returns the topic ID encoded as quoted base64.
type TopicPartitions ¶
TopicPartitions is a topic and partitions.
type TopicsList ¶
type TopicsList []TopicPartitions
TopicsList is a list of topics and partitions.
func (TopicsList) Each ¶
func (l TopicsList) Each(fn func(t string, p int32))
Each calls fn for each topic / partition in the topics list.
func (TopicsList) EachPartitions ¶
func (l TopicsList) EachPartitions(fn func(t string, ps []int32))
EachPartitions calls fn for each topic and its partitions in the topics list.
func (TopicsList) EmptyTopics ¶
func (l TopicsList) EmptyTopics() []string
EmptyTopics returns all topics with no partitions.
func (TopicsList) IntoSet ¶
func (l TopicsList) IntoSet() TopicsSet
IntoSet returns this list as a set.
func (TopicsList) Topics ¶
func (l TopicsList) Topics() []string
Topics returns all topics in this set in sorted order.
type TopicsSet ¶
TopicsSet is a set of topics and, per topic, a set of partitions.
All methods provided for TopicsSet are safe to use on a nil (default) set.
func (*TopicsSet) Add ¶
Add adds partitions for a topic to the topics set. If no partitions are added, this still creates the topic.
func (TopicsSet) Delete ¶
Delete removes partitions from a topic from the topics set. If the topic ends up with no partitions, the topic is removed from the set.
func (TopicsSet) EachPartitions ¶
EachPartitions calls fn for each topic and its partitions in the topics set.
func (TopicsSet) EmptyTopics ¶
EmptyTopics returns all topics with no partitions.
func (TopicsSet) IntoList ¶
func (s TopicsSet) IntoList() TopicsList
IntoList returns this set as a list.
func (TopicsSet) Sorted ¶
func (s TopicsSet) Sorted() TopicsList
Sorted returns this set as a list in topic-sorted order, with each topic having sorted partitions.
type TxnMarkers ¶
type TxnMarkers struct { ProducerID int64 // ProducerID is the ID to write markers for. ProducerEpoch int16 // ProducerEpoch is the epoch to write markers for. Commit bool // Commit is true if we are committing, false if we are aborting. CoordinatorEpoch int32 // CoordinatorEpoch is the epoch of the transactional coordinator we are writing to; this is used for fencing. Topics TopicsSet // Topics are topics and partitions to write markers for. }
TxnMarkers marks the end of a partition: the producer ID / epoch doing the writing, whether this is a commit, the coordinator epoch of the broker we are writing to (for fencing), and the topics and partitions that we are writing this abort or commit for.
This is a very low level admin request and should likely be built from data in a DescribeProducers response. See KIP-664 if you are trying to use this.
type TxnMarkersPartitionResponse ¶
type TxnMarkersPartitionResponse struct { NodeID int32 // NodeID is the node that this marker was written to. ProducerID int64 // ProducerID corresponds to the PID in the write marker request. Topic string // Topic is the topic being responded to. Partition int32 // Partition is the partition being responded to. Err error // Err is non-nil if the WriteTxnMarkers request for this pid/topic/partition failed. }
TxnMarkersPartitionResponse is a response to a topic's partition within a single marker written.
type TxnMarkersPartitionResponses ¶
type TxnMarkersPartitionResponses map[int32]TxnMarkersPartitionResponse
TxnMarkersPartitionResponses contains per-partition responses to a WriteTxnMarkers request.
func (TxnMarkersPartitionResponses) Each ¶
func (ps TxnMarkersPartitionResponses) Each(fn func(TxnMarkersPartitionResponse))
Each calls fn for each partition.
func (TxnMarkersPartitionResponses) Sorted ¶
func (ps TxnMarkersPartitionResponses) Sorted() []TxnMarkersPartitionResponse
Sorted returns all partitions sorted by partition.
type TxnMarkersResponse ¶
type TxnMarkersResponse struct { ProducerID int64 // ProducerID corresponds to the PID in the write marker request. Topics TxnMarkersTopicResponses // Topics contains the topics that markers were written for, for this ProducerID. }
TxnMarkersResponse is a response for a single marker written.
type TxnMarkersResponses ¶
type TxnMarkersResponses map[int64]TxnMarkersResponse
TxnMarkersResponse contains per-partition-ID responses to a WriteTxnMarkers request.
func (TxnMarkersResponses) Each ¶
func (ms TxnMarkersResponses) Each(fn func(TxnMarkersResponse))
Each calls fn for each marker response.
func (TxnMarkersResponses) EachPartition ¶
func (ms TxnMarkersResponses) EachPartition(fn func(TxnMarkersPartitionResponse))
EachPartition calls fn for every partition in all topics in all marker responses.
func (TxnMarkersResponses) EachTopic ¶
func (ms TxnMarkersResponses) EachTopic(fn func(TxnMarkersTopicResponse))
EachTopic calls fn for every topic in all marker responses.
func (TxnMarkersResponses) Sorted ¶
func (ms TxnMarkersResponses) Sorted() []TxnMarkersResponse
Sorted returns all markers sorted by producer ID.
func (TxnMarkersResponses) SortedPartitions ¶
func (ms TxnMarkersResponses) SortedPartitions() []TxnMarkersPartitionResponse
SortedPartitions returns all marker topic partitions sorted by producer ID then topic then partition.
func (TxnMarkersResponses) SortedTopics ¶
func (ms TxnMarkersResponses) SortedTopics() []TxnMarkersTopicResponse
SortedTopics returns all marker topics sorted by producer ID then topic.
type TxnMarkersTopicResponse ¶
type TxnMarkersTopicResponse struct { ProducerID int64 // ProducerID corresponds to the PID in the write marker request. Topic string // Topic is the topic being responded to. Partitions TxnMarkersPartitionResponses // Partitions are the responses for partitions in this marker. }
TxnMarkersTopicResponse is a response to a topic within a single marker written.
type TxnMarkersTopicResponses ¶
type TxnMarkersTopicResponses map[string]TxnMarkersTopicResponse
TxnMarkersTopicResponses contains per-topic responses to a WriteTxnMarkers request.
func (TxnMarkersTopicResponses) Each ¶
func (ts TxnMarkersTopicResponses) Each(fn func(TxnMarkersTopicResponse))
Each calls fn for each topic.
func (TxnMarkersTopicResponses) EachPartition ¶
func (ts TxnMarkersTopicResponses) EachPartition(fn func(TxnMarkersPartitionResponse))
EachPartition calls fn for every partition in all topics.
func (TxnMarkersTopicResponses) Sorted ¶
func (ts TxnMarkersTopicResponses) Sorted() []TxnMarkersTopicResponse
Sorted returns all topics sorted by topic.
func (TxnMarkersTopicResponses) SortedPartitions ¶
func (ts TxnMarkersTopicResponses) SortedPartitions() []TxnMarkersPartitionResponse
SortedPartitions returns all topics sorted by topic then partition.
type UpsertSCRAM ¶
type UpsertSCRAM struct { User string // User is the username to use. Mechanism ScramMechanism // Mechanism is the mechanism to use. Iterations int32 // Iterations is the SCRAM iterations to use; must be between 4096 and 16384. Password string // Password is the password to salt and convert to a salted password. Requires Salt and SaltedPassword to be empty. Salt []byte // Salt must be paired with SaltedPassword and requires Password to be empty. SaltedPassword []byte // SaltedPassword must be paired with Salt and requires Password to be empty. }
UpsertSCRAM either updates or creates (inserts) a new password for a user. There are two ways to specify a password: either with the Password field directly, or by specifying both Salt and SaltedPassword. If you specify just a password, this package generates a 24 byte salt and uses pbkdf2 to create the salted password.