Versions in this module Expand all Collapse all v2 v2.0.1 Nov 20, 2024 Changes in this version + const AlterOperationSet + const LibrdkafkaLinkInfo — darwin/amd64, linux/amd64, windows/amd64 + const OffsetBeginning + const OffsetEnd + const OffsetInvalid + const OffsetStored + const PartitionAny + const PurgeInFlight + const PurgeNonBlocking + const PurgeQueue + func LibraryVersion() (int, string) + func WriteErrorCodes(f *os.File) + type ACLBinding struct + Host string + Name string + Operation ACLOperation + PermissionType ACLPermissionType + Principal string + ResourcePatternType ResourcePatternType + Type ResourceType + type ACLBindingFilter = ACLBinding + type ACLBindingFilters []ACLBindingFilter + type ACLBindings []ACLBinding + func (a ACLBindings) Len() int + func (a ACLBindings) Less(i, j int) bool + func (a ACLBindings) Swap(i, j int) + type ACLOperation int + const ACLOperationAll + const ACLOperationAlter + const ACLOperationAlterConfigs + const ACLOperationAny + const ACLOperationClusterAction + const ACLOperationCreate + const ACLOperationDelete + const ACLOperationDescribe + const ACLOperationDescribeConfigs + const ACLOperationIdempotentWrite + const ACLOperationRead + const ACLOperationUnknown + const ACLOperationWrite + func ACLOperationFromString(aclOperationString string) (ACLOperation, error) + func (o ACLOperation) String() string + type ACLPermissionType int + const ACLPermissionTypeAllow + const ACLPermissionTypeAny + const ACLPermissionTypeDeny + const ACLPermissionTypeUnknown + func ACLPermissionTypeFromString(aclPermissionTypeString string) (ACLPermissionType, error) + func (o ACLPermissionType) String() string + type AdminClient struct + func NewAdminClient(conf *ConfigMap) (*AdminClient, error) + func NewAdminClientFromConsumer(c *Consumer) (a *AdminClient, err error) + func NewAdminClientFromProducer(p *Producer) (a *AdminClient, err error) + func (a *AdminClient) AlterConfigs(ctx context.Context, resources []ConfigResource, ...) (result []ConfigResourceResult, err error) + func (a *AdminClient) AlterConsumerGroupOffsets(ctx context.Context, groupsPartitions []ConsumerGroupTopicPartitions, ...) (acgor AlterConsumerGroupOffsetsResult, err error) + func (a *AdminClient) AlterUserScramCredentials(ctx context.Context, upsertions []UserScramCredentialUpsertion, ...) (result AlterUserScramCredentialsResult, err error) + func (a *AdminClient) Close() + func (a *AdminClient) ClusterID(ctx context.Context) (clusterID string, err error) + func (a *AdminClient) ControllerID(ctx context.Context) (controllerID int32, err error) + func (a *AdminClient) CreateACLs(ctx context.Context, aclBindings ACLBindings, options ...CreateACLsAdminOption) (result []CreateACLResult, err error) + func (a *AdminClient) CreatePartitions(ctx context.Context, partitions []PartitionsSpecification, ...) (result []TopicResult, err error) + func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, ...) (result []TopicResult, err error) + func (a *AdminClient) DeleteACLs(ctx context.Context, aclBindingFilters ACLBindingFilters, ...) (result []DeleteACLsResult, err error) + func (a *AdminClient) DeleteConsumerGroups(ctx context.Context, groups []string, ...) (result DeleteConsumerGroupsResult, err error) + func (a *AdminClient) DeleteRecords(ctx context.Context, recordsToDelete []TopicPartition, ...) (result DeleteRecordsResults, err error) + func (a *AdminClient) DeleteTopics(ctx context.Context, topics []string, options ...DeleteTopicsAdminOption) (result []TopicResult, err error) + func (a *AdminClient) DescribeACLs(ctx context.Context, aclBindingFilter ACLBindingFilter, ...) (result *DescribeACLsResult, err error) + func (a *AdminClient) DescribeCluster(ctx context.Context, options ...DescribeClusterAdminOption) (result DescribeClusterResult, err error) + func (a *AdminClient) DescribeConfigs(ctx context.Context, resources []ConfigResource, ...) (result []ConfigResourceResult, err error) + func (a *AdminClient) DescribeConsumerGroups(ctx context.Context, groups []string, ...) (result DescribeConsumerGroupsResult, err error) + func (a *AdminClient) DescribeTopics(ctx context.Context, topics TopicCollection, ...) (result DescribeTopicsResult, err error) + func (a *AdminClient) DescribeUserScramCredentials(ctx context.Context, users []string, ...) (result DescribeUserScramCredentialsResult, err error) + func (a *AdminClient) ElectLeaders(ctx context.Context, electLeaderRequest ElectLeadersRequest, ...) (result ElectLeadersResult, err error) + func (a *AdminClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) + func (a *AdminClient) IncrementalAlterConfigs(ctx context.Context, resources []ConfigResource, ...) (result []ConfigResourceResult, err error) + func (a *AdminClient) IsClosed() bool + func (a *AdminClient) ListConsumerGroupOffsets(ctx context.Context, groupsPartitions []ConsumerGroupTopicPartitions, ...) (lcgor ListConsumerGroupOffsetsResult, err error) + func (a *AdminClient) ListConsumerGroups(ctx context.Context, options ...ListConsumerGroupsAdminOption) (result ListConsumerGroupsResult, err error) + func (a *AdminClient) ListOffsets(ctx context.Context, topicPartitionOffsets map[TopicPartition]OffsetSpec, ...) (result ListOffsetsResult, err error) + func (a *AdminClient) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error + func (a *AdminClient) SetOAuthBearerTokenFailure(errstr string) error + func (a *AdminClient) SetSaslCredentials(username, password string) error + func (a *AdminClient) String() string + type AdminOption interface + type AdminOptionIncludeAuthorizedOperations struct + func SetAdminOptionIncludeAuthorizedOperations(val bool) (ao AdminOptionIncludeAuthorizedOperations) + type AdminOptionIsolationLevel struct + func SetAdminIsolationLevel(isolationLevel IsolationLevel) (ao AdminOptionIsolationLevel) + type AdminOptionMatchConsumerGroupStates struct + func SetAdminMatchConsumerGroupStates(val []ConsumerGroupState) (ao AdminOptionMatchConsumerGroupStates) + type AdminOptionMatchConsumerGroupTypes struct + func SetAdminMatchConsumerGroupTypes(val []ConsumerGroupType) (ao AdminOptionMatchConsumerGroupTypes) + type AdminOptionOperationTimeout struct + func SetAdminOperationTimeout(t time.Duration) (ao AdminOptionOperationTimeout) + type AdminOptionRequestTimeout struct + func SetAdminRequestTimeout(t time.Duration) (ao AdminOptionRequestTimeout) + type AdminOptionRequireStableOffsets struct + func SetAdminRequireStableOffsets(val bool) (ao AdminOptionRequireStableOffsets) + type AdminOptionValidateOnly struct + func SetAdminValidateOnly(validateOnly bool) (ao AdminOptionValidateOnly) + type AlterConfigOpType int + const AlterConfigOpTypeAppend + const AlterConfigOpTypeDelete + const AlterConfigOpTypeSet + const AlterConfigOpTypeSubtract + func (o AlterConfigOpType) String() string + type AlterConfigsAdminOption interface + type AlterConsumerGroupOffsetsAdminOption interface + type AlterConsumerGroupOffsetsResult struct + ConsumerGroupsTopicPartitions []ConsumerGroupTopicPartitions + type AlterOperation int + func (o AlterOperation) String() string + type AlterUserScramCredentialsAdminOption interface + type AlterUserScramCredentialsResult struct + Errors map[string]Error + type AssignedPartitions struct + Partitions []TopicPartition + func (e AssignedPartitions) String() string + type BrokerMetadata struct + Host string + ID int32 + Port int + type ConfigEntry struct + IncrementalOperation AlterConfigOpType + Name string + Operation AlterOperation + Value string + func StringMapToConfigEntries(stringMap map[string]string, operation AlterOperation) []ConfigEntry + func StringMapToIncrementalConfigEntries(stringMap map[string]string, operationMap map[string]AlterConfigOpType) []ConfigEntry + func (c ConfigEntry) String() string + type ConfigEntryResult struct + IsDefault bool + IsReadOnly bool + IsSensitive bool + IsSynonym bool + Name string + Source ConfigSource + Synonyms map[string]ConfigEntryResult + Value string + func (c ConfigEntryResult) String() string + type ConfigMap map[string]ConfigValue + func (m ConfigMap) Get(key string, defval ConfigValue) (ConfigValue, error) + func (m ConfigMap) Set(kv string) error + func (m ConfigMap) SetKey(key string, value ConfigValue) error + type ConfigResource struct + Config []ConfigEntry + Name string + Type ResourceType + func (c ConfigResource) String() string + type ConfigResourceResult struct + Config map[string]ConfigEntryResult + Error Error + Name string + Type ResourceType + func (c ConfigResourceResult) String() string + type ConfigSource int + const ConfigSourceDefault + const ConfigSourceDynamicBroker + const ConfigSourceDynamicDefaultBroker + const ConfigSourceDynamicTopic + const ConfigSourceStaticBroker + const ConfigSourceUnknown + func (t ConfigSource) String() string + type ConfigValue interface + type Consumer struct + func NewConsumer(conf *ConfigMap) (*Consumer, error) + func (c *Consumer) Assign(partitions []TopicPartition) (err error) + func (c *Consumer) Assignment() (partitions []TopicPartition, err error) + func (c *Consumer) AssignmentLost() bool + func (c *Consumer) Close() (err error) + func (c *Consumer) Commit() ([]TopicPartition, error) + func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error) + func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error) + func (c *Consumer) Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) + func (c *Consumer) Events() chan Event + func (c *Consumer) GetConsumerGroupMetadata() (*ConsumerGroupMetadata, error) + func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) + func (c *Consumer) GetRebalanceProtocol() string + func (c *Consumer) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error) + func (c *Consumer) IncrementalAssign(partitions []TopicPartition) (err error) + func (c *Consumer) IncrementalUnassign(partitions []TopicPartition) (err error) + func (c *Consumer) IsClosed() bool + func (c *Consumer) Logs() chan LogEvent + func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) + func (c *Consumer) Pause(partitions []TopicPartition) (err error) + func (c *Consumer) Poll(timeoutMs int) (event Event) + func (c *Consumer) Position(partitions []TopicPartition) (offsets []TopicPartition, err error) + func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) + func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error) + func (c *Consumer) Resume(partitions []TopicPartition) (err error) + func (c *Consumer) Seek(partition TopicPartition, ignoredTimeoutMs int) error + func (c *Consumer) SeekPartitions(partitions []TopicPartition) ([]TopicPartition, error) + func (c *Consumer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error + func (c *Consumer) SetOAuthBearerTokenFailure(errstr string) error + func (c *Consumer) SetSaslCredentials(username, password string) error + func (c *Consumer) StoreMessage(m *Message) (storedOffsets []TopicPartition, err error) + func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error) + func (c *Consumer) String() string + func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error + func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error) + func (c *Consumer) Subscription() (topics []string, err error) + func (c *Consumer) Unassign() (err error) + func (c *Consumer) Unsubscribe() (err error) + type ConsumerGroupDescription struct + AuthorizedOperations []ACLOperation + Coordinator Node + Error Error + GroupID string + IsSimpleConsumerGroup bool + Members []MemberDescription + PartitionAssignor string + State ConsumerGroupState + type ConsumerGroupListing struct + GroupID string + IsSimpleConsumerGroup bool + State ConsumerGroupState + Type ConsumerGroupType + type ConsumerGroupMetadata struct + func NewTestConsumerGroupMetadata(groupID string) (*ConsumerGroupMetadata, error) + type ConsumerGroupResult struct + Error Error + Group string + func (g ConsumerGroupResult) String() string + type ConsumerGroupState int + const ConsumerGroupStateCompletingRebalance + const ConsumerGroupStateDead + const ConsumerGroupStateEmpty + const ConsumerGroupStatePreparingRebalance + const ConsumerGroupStateStable + const ConsumerGroupStateUnknown + func ConsumerGroupStateFromString(stateString string) (ConsumerGroupState, error) + func (t ConsumerGroupState) String() string + type ConsumerGroupTopicPartitions struct + Group string + Partitions []TopicPartition + func (gtp ConsumerGroupTopicPartitions) String() string + type ConsumerGroupType int + const ConsumerGroupTypeClassic + const ConsumerGroupTypeConsumer + const ConsumerGroupTypeUnknown + func ConsumerGroupTypeFromString(typeString string) ConsumerGroupType + func (t ConsumerGroupType) String() string + type CreateACLResult struct + Error Error + type CreateACLsAdminOption interface + type CreatePartitionsAdminOption interface + type CreateTopicsAdminOption interface + type DeleteACLsAdminOption interface + type DeleteACLsResult = DescribeACLsResult + type DeleteConsumerGroupsAdminOption interface + type DeleteConsumerGroupsResult struct + ConsumerGroupResults []ConsumerGroupResult + type DeleteRecordsAdminOption interface + type DeleteRecordsResult struct + DeletedRecords *DeletedRecords + TopicPartition TopicPartition + type DeleteRecordsResults struct + DeleteRecordsResults []DeleteRecordsResult + type DeleteTopicsAdminOption interface + type DeletedRecords struct + LowWatermark Offset + type DescribeACLsAdminOption interface + type DescribeACLsResult struct + ACLBindings ACLBindings + Error Error + type DescribeClusterAdminOption interface + type DescribeClusterResult struct + AuthorizedOperations []ACLOperation + ClusterID *string + Controller *Node + Nodes []Node + type DescribeConfigsAdminOption interface + type DescribeConsumerGroupsAdminOption interface + type DescribeConsumerGroupsResult struct + ConsumerGroupDescriptions []ConsumerGroupDescription + type DescribeTopicsAdminOption interface + type DescribeTopicsResult struct + TopicDescriptions []TopicDescription + type DescribeUserScramCredentialsAdminOption interface + type DescribeUserScramCredentialsResult struct + Descriptions map[string]UserScramCredentialsDescription + type ElectLeadersAdminOption interface + type ElectLeadersRequest struct + func NewElectLeadersRequest(electionType ElectionType, partitions []TopicPartition) ElectLeadersRequest + type ElectLeadersResult struct + TopicPartitions []TopicPartition + type ElectionType int + const ElectionTypePreferred + const ElectionTypeUnclean + func ElectionTypeFromString(electionTypeString string) (ElectionType, error) + type Error struct + func NewError(code ErrorCode, str string, fatal bool) (err Error) + func (e Error) Code() ErrorCode + func (e Error) Error() string + func (e Error) IsFatal() bool + func (e Error) IsRetriable() bool + func (e Error) IsTimeout() bool + func (e Error) String() string + func (e Error) TxnRequiresAbort() bool + type ErrorCode int + const ErrAllBrokersDown + const ErrApplication + const ErrAssignPartitions + const ErrAssignmentLost + const ErrAuthentication + const ErrAutoOffsetReset + const ErrBadCompression + const ErrBadMsg + const ErrBrokerNotAvailable + const ErrClusterAuthorizationFailed + const ErrConcurrentTransactions + const ErrConflict + const ErrCoordinatorLoadInProgress + const ErrCoordinatorNotAvailable + const ErrCritSysResource + const ErrDelegationTokenAuthDisabled + const ErrDelegationTokenAuthorizationFailed + const ErrDelegationTokenExpired + const ErrDelegationTokenNotFound + const ErrDelegationTokenOwnerMismatch + const ErrDelegationTokenRequestNotAllowed + const ErrDestroy + const ErrDuplicateResource + const ErrDuplicateSequenceNumber + const ErrElectionNotNeeded + const ErrEligibleLeadersNotAvailable + const ErrExistingSubscription + const ErrFail + const ErrFatal + const ErrFeatureUpdateFailed + const ErrFenced + const ErrFencedInstanceID + const ErrFencedLeaderEpoch + const ErrFencedMemberEpoch + const ErrFetchSessionIDNotFound + const ErrFs + const ErrGaplessGuarantee + const ErrGroupAuthorizationFailed + const ErrGroupIDNotFound + const ErrGroupMaxSizeReached + const ErrGroupSubscribedToTopic + const ErrIllegalGeneration + const ErrIllegalSaslState + const ErrInProgress + const ErrInconsistent + const ErrInconsistentGroupProtocol + const ErrInconsistentVoterSet + const ErrIntr + const ErrInvalidArg + const ErrInvalidCommitOffsetSize + const ErrInvalidConfig + const ErrInvalidDifferentRecord + const ErrInvalidFetchSessionEpoch + const ErrInvalidGroupID + const ErrInvalidMsg + const ErrInvalidMsgSize + const ErrInvalidPartitions + const ErrInvalidPrincipalType + const ErrInvalidProducerEpoch + const ErrInvalidProducerIDMapping + const ErrInvalidRecord + const ErrInvalidReplicaAssignment + const ErrInvalidReplicationFactor + const ErrInvalidRequest + const ErrInvalidRequiredAcks + const ErrInvalidSessionTimeout + const ErrInvalidTimestamp + const ErrInvalidTransactionTimeout + const ErrInvalidTxnState + const ErrInvalidType + const ErrInvalidUpdateVersion + const ErrIsrInsuff + const ErrKafkaStorageError + const ErrKeyDeserialization + const ErrKeySerialization + const ErrLeaderNotAvailable + const ErrListenerNotFound + const ErrLogDirNotFound + const ErrLogTruncation + const ErrMaxPollExceeded + const ErrMemberIDRequired + const ErrMsgSizeTooLarge + const ErrMsgTimedOut + const ErrNetworkException + const ErrNoError + const ErrNoOffset + const ErrNoReassignmentInProgress + const ErrNodeUpdate + const ErrNoent + const ErrNonEmptyGroup + const ErrNoop + const ErrNotConfigured + const ErrNotController + const ErrNotCoordinator + const ErrNotEnoughReplicas + const ErrNotEnoughReplicasAfterAppend + const ErrNotImplemented + const ErrNotLeaderForPartition + const ErrOffsetMetadataTooLarge + const ErrOffsetNotAvailable + const ErrOffsetOutOfRange + const ErrOperationNotAttempted + const ErrOutOfOrderSequenceNumber + const ErrOutdated + const ErrPartial + const ErrPartitionEOF + const ErrPolicyViolation + const ErrPreferredLeaderNotAvailable + const ErrPrevInProgress + const ErrPrincipalDeserializationFailure + const ErrProducerFenced + const ErrPurgeInflight + const ErrPurgeQueue + const ErrQueueFull + const ErrReadOnly + const ErrReassignmentInProgress + const ErrRebalanceInProgress + const ErrRecordListTooLarge + const ErrReplicaNotAvailable + const ErrRequestTimedOut + const ErrResolve + const ErrResourceNotFound + const ErrRetry + const ErrRevokePartitions + const ErrSaslAuthenticationFailed + const ErrSecurityDisabled + const ErrSsl + const ErrStaleBrokerEpoch + const ErrStaleCtrlEpoch + const ErrStaleMemberEpoch + const ErrState + const ErrTelemetryTooLarge + const ErrThrottlingQuotaExceeded + const ErrTimedOut + const ErrTimedOutQueue + const ErrTopicAlreadyExists + const ErrTopicAuthorizationFailed + const ErrTopicDeletionDisabled + const ErrTopicException + const ErrTransactionCoordinatorFenced + const ErrTransactionalIDAuthorizationFailed + const ErrTransport + const ErrUnacceptableCredential + const ErrUnderflow + const ErrUnknown + const ErrUnknownBroker + const ErrUnknownGroup + const ErrUnknownLeaderEpoch + const ErrUnknownMemberID + const ErrUnknownPartition + const ErrUnknownProducerID + const ErrUnknownProtocol + const ErrUnknownSubscriptionID + const ErrUnknownTopic + const ErrUnknownTopicID + const ErrUnknownTopicOrPart + const ErrUnreleasedInstanceID + const ErrUnstableOffsetCommit + const ErrUnsupportedAssignor + const ErrUnsupportedCompressionType + const ErrUnsupportedFeature + const ErrUnsupportedForMessageFormat + const ErrUnsupportedSaslMechanism + const ErrUnsupportedVersion + const ErrValueDeserialization + const ErrValueSerialization + const ErrWaitCache + const ErrWaitCoord + func (c ErrorCode) String() string + type Event interface + String func() string + type Handle interface + IsClosed func() bool + SetOAuthBearerToken func(oauthBearerToken OAuthBearerToken) error + SetOAuthBearerTokenFailure func(errstr string) error + type Header struct + Key string + Value []byte + func (h Header) String() string + type IsolationLevel int + const IsolationLevelReadCommitted + const IsolationLevelReadUncommitted + type ListConsumerGroupOffsetsAdminOption interface + type ListConsumerGroupOffsetsResult struct + ConsumerGroupsTopicPartitions []ConsumerGroupTopicPartitions + type ListConsumerGroupsAdminOption interface + type ListConsumerGroupsResult struct + Errors []error + Valid []ConsumerGroupListing + type ListOffsetsAdminOption interface + type ListOffsetsResult struct + ResultInfos map[TopicPartition]ListOffsetsResultInfo + type ListOffsetsResultInfo struct + Error Error + LeaderEpoch *int32 + Offset Offset + Timestamp int64 + type LogEvent struct + Level int + Message string + Name string + Tag string + Timestamp time.Time + func (logEvent LogEvent) String() string + type MemberAssignment struct + TopicPartitions []TopicPartition + type MemberDescription struct + Assignment MemberAssignment + ClientID string + ConsumerID string + GroupInstanceID string + Host string + type Message struct + Headers []Header + Key []byte + LeaderEpoch *int32 + Opaque interface{} + Timestamp time.Time + TimestampType TimestampType + TopicPartition TopicPartition + Value []byte + func (m *Message) String() string + type Metadata struct + Brokers []BrokerMetadata + OriginatingBroker BrokerMetadata + Topics map[string]TopicMetadata + type MockCluster struct + func NewMockCluster(brokerCount int) (*MockCluster, error) + func (mc *MockCluster) BootstrapServers() string + func (mc *MockCluster) Close() + func (mc *MockCluster) CreateTopic(topic string, partitions, replicationFactor int) error + func (mc *MockCluster) SetBrokerDown(brokerID int) error + func (mc *MockCluster) SetBrokerUp(brokerID int) error + func (mc *MockCluster) SetRoundtripDuration(brokerID int, duration time.Duration) error + type Node struct + Host string + ID int + Port int + Rack *string + func (n Node) String() string + type OAuthBearerToken struct + Expiration time.Time + Extensions map[string]string + Principal string + TokenValue string + type OAuthBearerTokenRefresh struct + Config string + func (o OAuthBearerTokenRefresh) String() string + type Offset int64 + func NewOffset(offset interface{}) (Offset, error) + func OffsetTail(relativeOffset Offset) Offset + func (o *Offset) Set(offset interface{}) error + func (o Offset) String() string + type OffsetSpec int64 + const EarliestOffsetSpec + const LatestOffsetSpec + const MaxTimestampOffsetSpec + func NewOffsetSpecForTimestamp(timestamp int64) OffsetSpec + type OffsetsCommitted struct + Error error + Offsets []TopicPartition + func (o OffsetsCommitted) String() string + type PartitionEOF TopicPartition + func (p PartitionEOF) String() string + type PartitionMetadata struct + Error Error + ID int32 + Isrs []int32 + Leader int32 + Replicas []int32 + type PartitionsSpecification struct + IncreaseTo int + ReplicaAssignment [][]int32 + Topic string + type Producer struct + func NewProducer(conf *ConfigMap) (*Producer, error) + func (p *Producer) AbortTransaction(ctx context.Context) error + func (p *Producer) BeginTransaction() error + func (p *Producer) Close() + func (p *Producer) CommitTransaction(ctx context.Context) error + func (p *Producer) Events() chan Event + func (p *Producer) Flush(timeoutMs int) int + func (p *Producer) GetFatalError() error + func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) + func (p *Producer) InitTransactions(ctx context.Context) error + func (p *Producer) IsClosed() bool + func (p *Producer) Len() int + func (p *Producer) Logs() chan LogEvent + func (p *Producer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) + func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error + func (p *Producer) ProduceChannel() chan *Message + func (p *Producer) Purge(flags int) error + func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) + func (p *Producer) SendOffsetsToTransaction(ctx context.Context, offsets []TopicPartition, ...) error + func (p *Producer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error + func (p *Producer) SetOAuthBearerTokenFailure(errstr string) error + func (p *Producer) SetSaslCredentials(username, password string) error + func (p *Producer) String() string + func (p *Producer) TestFatalError(code ErrorCode, str string) ErrorCode + type RebalanceCb func(*Consumer, Event) error + type ResourcePatternType int + const ResourcePatternTypeAny + const ResourcePatternTypeLiteral + const ResourcePatternTypeMatch + const ResourcePatternTypePrefixed + const ResourcePatternTypeUnknown + func ResourcePatternTypeFromString(patternTypeString string) (ResourcePatternType, error) + func (t ResourcePatternType) String() string + type ResourceType int + const ResourceAny + const ResourceBroker + const ResourceGroup + const ResourceTopic + const ResourceUnknown + func ResourceTypeFromString(typeString string) (ResourceType, error) + func (t ResourceType) String() string + type RevokedPartitions struct + Partitions []TopicPartition + func (e RevokedPartitions) String() string + type ScramCredentialInfo struct + Iterations int + Mechanism ScramMechanism + type ScramMechanism int + const ScramMechanismSHA256 + const ScramMechanismSHA512 + const ScramMechanismUnknown + func ScramMechanismFromString(mechanism string) (ScramMechanism, error) + func (o ScramMechanism) String() string + type Stats struct + func (e Stats) String() string + type TimestampType int + const TimestampCreateTime + const TimestampLogAppendTime + const TimestampNotAvailable + func (t TimestampType) String() string + type TopicCollection struct + func NewTopicCollectionOfTopicNames(names []string) TopicCollection + type TopicDescription struct + AuthorizedOperations []ACLOperation + Error Error + IsInternal bool + Name string + Partitions []TopicPartitionInfo + TopicID UUID + type TopicMetadata struct + Error Error + Partitions []PartitionMetadata + Topic string + type TopicPartition struct + Error error + LeaderEpoch *int32 + Metadata *string + Offset Offset + Partition int32 + Topic *string + func (p TopicPartition) String() string + type TopicPartitionInfo struct + Isr []Node + Leader *Node + Partition int + Replicas []Node + type TopicPartitions []TopicPartition + func (tps TopicPartitions) Len() int + func (tps TopicPartitions) Less(i, j int) bool + func (tps TopicPartitions) Swap(i, j int) + type TopicResult struct + Error Error + Topic string + func (t TopicResult) String() string + type TopicSpecification struct + Config map[string]string + NumPartitions int + ReplicaAssignment [][]int32 + ReplicationFactor int + Topic string + type UUID struct + func (uuid UUID) GetLeastSignificantBits() int64 + func (uuid UUID) GetMostSignificantBits() int64 + func (uuid UUID) String() string + type UserScramCredentialDeletion struct + Mechanism ScramMechanism + User string + type UserScramCredentialUpsertion struct + Password []byte + Salt []byte + ScramCredentialInfo ScramCredentialInfo + User string + type UserScramCredentialsDescription struct + Error Error + ScramCredentialInfos []ScramCredentialInfo + User string