Versions in this module Expand all Collapse all v0 v0.0.4 Sep 7, 2022 Changes in this version + type PlainCredsCallbackFunc func() (string, string, error) v0.0.2 Aug 30, 2022 v0.0.1 Aug 30, 2022 Changes in this version + const ACLOperationAll + const ACLOperationAlter + const ACLOperationAlterConfigs + const ACLOperationAny + const ACLOperationClusterAction + const ACLOperationCreate + const ACLOperationDelete + const ACLOperationDescribe + const ACLOperationDescribeConfigs + const ACLOperationIdempotentWrite + const ACLOperationRead + const ACLOperationUnknown + const ACLOperationWrite + const ACLPermissionTypeAllow + const ACLPermissionTypeAny + const ACLPermissionTypeDeny + const ACLPermissionTypeUnknown + const AlterOperationSet + const ConfigSourceDefault + const ConfigSourceDynamicBroker + const ConfigSourceDynamicDefaultBroker + const ConfigSourceDynamicTopic + const ConfigSourceStaticBroker + const ConfigSourceUnknown + const LibrdkafkaLinkInfo — darwin/amd64, linux/amd64, windows/amd64 + const OffsetBeginning + const OffsetEnd + const OffsetInvalid + const OffsetStored + const PartitionAny + const PurgeInFlight + const PurgeNonBlocking + const PurgeQueue + const ResourceAny + const ResourceBroker + const ResourceGroup + const ResourcePatternTypeAny + const ResourcePatternTypeLiteral + const ResourcePatternTypeMatch + const ResourcePatternTypePrefixed + const ResourcePatternTypeUnknown + const ResourceTopic + const ResourceUnknown + const TimestampCreateTime + const TimestampLogAppendTime + const TimestampNotAvailable + func LibraryVersion() (int, string) + func WriteErrorCodes(f *os.File) + type ACLBinding struct + Host string + Name string + Operation ACLOperation + PermissionType ACLPermissionType + Principal string + ResourcePatternType ResourcePatternType + Type ResourceType + type ACLBindingFilter = ACLBinding + type ACLBindingFilters []ACLBindingFilter + type ACLBindings []ACLBinding + func (a ACLBindings) Len() int + func (a ACLBindings) Less(i, j int) bool + func (a ACLBindings) Swap(i, j int) + type ACLOperation int + func ACLOperationFromString(aclOperationString string) (ACLOperation, error) + func (o ACLOperation) String() string + type ACLPermissionType int + func ACLPermissionTypeFromString(aclPermissionTypeString string) (ACLPermissionType, error) + func (o ACLPermissionType) String() string + type AdminClient struct + func NewAdminClient(conf *ConfigMap) (*AdminClient, error) + func NewAdminClientFromConsumer(c *Consumer) (a *AdminClient, err error) + func NewAdminClientFromProducer(p *Producer) (a *AdminClient, err error) + func (a *AdminClient) AlterConfigs(ctx context.Context, resources []ConfigResource, ...) (result []ConfigResourceResult, err error) + func (a *AdminClient) Close() + func (a *AdminClient) ClusterID(ctx context.Context) (clusterID string, err error) + func (a *AdminClient) ControllerID(ctx context.Context) (controllerID int32, err error) + func (a *AdminClient) CreateACLs(ctx context.Context, aclBindings ACLBindings, options ...CreateACLsAdminOption) (result []CreateACLResult, err error) + func (a *AdminClient) CreatePartitions(ctx context.Context, partitions []PartitionsSpecification, ...) (result []TopicResult, err error) + func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, ...) (result []TopicResult, err error) + func (a *AdminClient) DeleteACLs(ctx context.Context, aclBindingFilters ACLBindingFilters, ...) (result []DeleteACLsResult, err error) + func (a *AdminClient) DeleteTopics(ctx context.Context, topics []string, options ...DeleteTopicsAdminOption) (result []TopicResult, err error) + func (a *AdminClient) DescribeACLs(ctx context.Context, aclBindingFilter ACLBindingFilter, ...) (result *DescribeACLsResult, err error) + func (a *AdminClient) DescribeConfigs(ctx context.Context, resources []ConfigResource, ...) (result []ConfigResourceResult, err error) + func (a *AdminClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) + func (a *AdminClient) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error + func (a *AdminClient) SetOAuthBearerTokenFailure(errstr string) error + func (a *AdminClient) String() string + type AdminOption interface + type AdminOptionOperationTimeout struct + func SetAdminOperationTimeout(t time.Duration) (ao AdminOptionOperationTimeout) + type AdminOptionRequestTimeout struct + func SetAdminRequestTimeout(t time.Duration) (ao AdminOptionRequestTimeout) + type AdminOptionValidateOnly struct + func SetAdminValidateOnly(validateOnly bool) (ao AdminOptionValidateOnly) + type AlterConfigsAdminOption interface + type AlterOperation int + func (o AlterOperation) String() string + type AssignedPartitions struct + Partitions []TopicPartition + func (e AssignedPartitions) String() string + type BrokerMetadata struct + Host string + ID int32 + Port int + type ConfigEntry struct + Name string + Operation AlterOperation + Value string + func StringMapToConfigEntries(stringMap map[string]string, operation AlterOperation) []ConfigEntry + func (c ConfigEntry) String() string + type ConfigEntryResult struct + IsReadOnly bool + IsSensitive bool + IsSynonym bool + Name string + Source ConfigSource + Synonyms map[string]ConfigEntryResult + Value string + func (c ConfigEntryResult) String() string + type ConfigMap map[string]ConfigValue + func (m ConfigMap) Get(key string, defval ConfigValue) (ConfigValue, error) + func (m ConfigMap) Set(kv string) error + func (m ConfigMap) SetKey(key string, value ConfigValue) error + type ConfigResource struct + Config []ConfigEntry + Name string + Type ResourceType + func (c ConfigResource) String() string + type ConfigResourceResult struct + Config map[string]ConfigEntryResult + Error Error + Name string + Type ResourceType + func (c ConfigResourceResult) String() string + type ConfigSource int + func (t ConfigSource) String() string + type ConfigValue interface + type Consumer struct + func NewConsumer(conf *ConfigMap) (*Consumer, error) + func (c *Consumer) Assign(partitions []TopicPartition) (err error) + func (c *Consumer) Assignment() (partitions []TopicPartition, err error) + func (c *Consumer) AssignmentLost() bool + func (c *Consumer) Close() (err error) + func (c *Consumer) Commit() ([]TopicPartition, error) + func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error) + func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error) + func (c *Consumer) Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) + func (c *Consumer) Events() chan Event + func (c *Consumer) GetConsumerGroupMetadata() (*ConsumerGroupMetadata, error) + func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) + func (c *Consumer) GetRebalanceProtocol() string + func (c *Consumer) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error) + func (c *Consumer) IncrementalAssign(partitions []TopicPartition) (err error) + func (c *Consumer) IncrementalUnassign(partitions []TopicPartition) (err error) + func (c *Consumer) Logs() chan LogEvent + func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) + func (c *Consumer) Pause(partitions []TopicPartition) (err error) + func (c *Consumer) Poll(timeoutMs int) (event Event) + func (c *Consumer) Position(partitions []TopicPartition) (offsets []TopicPartition, err error) + func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) + func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error) + func (c *Consumer) Resume(partitions []TopicPartition) (err error) + func (c *Consumer) Seek(partition TopicPartition, timeoutMs int) error + func (c *Consumer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error + func (c *Consumer) SetOAuthBearerTokenFailure(errstr string) error + func (c *Consumer) StoreMessage(m *Message) (storedOffsets []TopicPartition, err error) + func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error) + func (c *Consumer) String() string + func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error + func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error) + func (c *Consumer) Subscription() (topics []string, err error) + func (c *Consumer) Unassign() (err error) + func (c *Consumer) Unsubscribe() (err error) + type ConsumerGroupMetadata struct + func NewTestConsumerGroupMetadata(groupID string) (*ConsumerGroupMetadata, error) + type CreateACLResult struct + Error Error + type CreateACLsAdminOption interface + type CreatePartitionsAdminOption interface + type CreateTopicsAdminOption interface + type DeleteACLsAdminOption interface + type DeleteACLsResult = DescribeACLsResult + type DeleteTopicsAdminOption interface + type DescribeACLsAdminOption interface + type DescribeACLsResult struct + ACLBindings ACLBindings + Error Error + type DescribeConfigsAdminOption interface + type Error struct + func NewError(code ErrorCode, str string, fatal bool) (err Error) + func (e Error) Code() ErrorCode + func (e Error) Error() string + func (e Error) IsFatal() bool + func (e Error) IsRetriable() bool + func (e Error) String() string + func (e Error) TxnRequiresAbort() bool + type ErrorCode int + const ErrAllBrokersDown + const ErrApplication + const ErrAssignPartitions + const ErrAssignmentLost + const ErrAuthentication + const ErrAutoOffsetReset + const ErrBadCompression + const ErrBadMsg + const ErrBrokerNotAvailable + const ErrClusterAuthorizationFailed + const ErrConcurrentTransactions + const ErrConflict + const ErrCoordinatorLoadInProgress + const ErrCoordinatorNotAvailable + const ErrCritSysResource + const ErrDelegationTokenAuthDisabled + const ErrDelegationTokenAuthorizationFailed + const ErrDelegationTokenExpired + const ErrDelegationTokenNotFound + const ErrDelegationTokenOwnerMismatch + const ErrDelegationTokenRequestNotAllowed + const ErrDestroy + const ErrDuplicateResource + const ErrDuplicateSequenceNumber + const ErrElectionNotNeeded + const ErrEligibleLeadersNotAvailable + const ErrExistingSubscription + const ErrFail + const ErrFatal + const ErrFeatureUpdateFailed + const ErrFenced + const ErrFencedInstanceID + const ErrFencedLeaderEpoch + const ErrFetchSessionIDNotFound + const ErrFs + const ErrGaplessGuarantee + const ErrGroupAuthorizationFailed + const ErrGroupIDNotFound + const ErrGroupMaxSizeReached + const ErrGroupSubscribedToTopic + const ErrIllegalGeneration + const ErrIllegalSaslState + const ErrInProgress + const ErrInconsistent + const ErrInconsistentGroupProtocol + const ErrInconsistentVoterSet + const ErrIntr + const ErrInvalidArg + const ErrInvalidCommitOffsetSize + const ErrInvalidConfig + const ErrInvalidFetchSessionEpoch + const ErrInvalidGroupID + const ErrInvalidMsg + const ErrInvalidMsgSize + const ErrInvalidPartitions + const ErrInvalidPrincipalType + const ErrInvalidProducerEpoch + const ErrInvalidProducerIDMapping + const ErrInvalidRecord + const ErrInvalidReplicaAssignment + const ErrInvalidReplicationFactor + const ErrInvalidRequest + const ErrInvalidRequiredAcks + const ErrInvalidSessionTimeout + const ErrInvalidTimestamp + const ErrInvalidTransactionTimeout + const ErrInvalidTxnState + const ErrInvalidType + const ErrInvalidUpdateVersion + const ErrIsrInsuff + const ErrKafkaStorageError + const ErrKeyDeserialization + const ErrKeySerialization + const ErrLeaderNotAvailable + const ErrListenerNotFound + const ErrLogDirNotFound + const ErrMaxPollExceeded + const ErrMemberIDRequired + const ErrMsgSizeTooLarge + const ErrMsgTimedOut + const ErrNetworkException + const ErrNoError + const ErrNoOffset + const ErrNoReassignmentInProgress + const ErrNodeUpdate + const ErrNoent + const ErrNonEmptyGroup + const ErrNoop + const ErrNotConfigured + const ErrNotController + const ErrNotCoordinator + const ErrNotEnoughReplicas + const ErrNotEnoughReplicasAfterAppend + const ErrNotImplemented + const ErrNotLeaderForPartition + const ErrOffsetMetadataTooLarge + const ErrOffsetNotAvailable + const ErrOffsetOutOfRange + const ErrOperationNotAttempted + const ErrOutOfOrderSequenceNumber + const ErrOutdated + const ErrPartial + const ErrPartitionEOF + const ErrPolicyViolation + const ErrPreferredLeaderNotAvailable + const ErrPrevInProgress + const ErrPrincipalDeserializationFailure + const ErrProducerFenced + const ErrPurgeInflight + const ErrPurgeQueue + const ErrQueueFull + const ErrReadOnly + const ErrReassignmentInProgress + const ErrRebalanceInProgress + const ErrRecordListTooLarge + const ErrReplicaNotAvailable + const ErrRequestTimedOut + const ErrResolve + const ErrResourceNotFound + const ErrRetry + const ErrRevokePartitions + const ErrSaslAuthenticationFailed + const ErrSecurityDisabled + const ErrSsl + const ErrStaleBrokerEpoch + const ErrStaleCtrlEpoch + const ErrState + const ErrThrottlingQuotaExceeded + const ErrTimedOut + const ErrTimedOutQueue + const ErrTopicAlreadyExists + const ErrTopicAuthorizationFailed + const ErrTopicDeletionDisabled + const ErrTopicException + const ErrTransactionCoordinatorFenced + const ErrTransactionalIDAuthorizationFailed + const ErrTransport + const ErrUnacceptableCredential + const ErrUnderflow + const ErrUnknown + const ErrUnknownBroker + const ErrUnknownGroup + const ErrUnknownLeaderEpoch + const ErrUnknownMemberID + const ErrUnknownPartition + const ErrUnknownProducerID + const ErrUnknownProtocol + const ErrUnknownTopic + const ErrUnknownTopicOrPart + const ErrUnstableOffsetCommit + const ErrUnsupportedCompressionType + const ErrUnsupportedFeature + const ErrUnsupportedForMessageFormat + const ErrUnsupportedSaslMechanism + const ErrUnsupportedVersion + const ErrValueDeserialization + const ErrValueSerialization + const ErrWaitCache + const ErrWaitCoord + func (c ErrorCode) String() string + type Event interface + String func() string + type Handle interface + SetOAuthBearerToken func(oauthBearerToken OAuthBearerToken) error + SetOAuthBearerTokenFailure func(errstr string) error + type Header struct + Key string + Value []byte + func (h Header) String() string + type LogEvent struct + Level int + Message string + Name string + Tag string + Timestamp time.Time + func (logEvent LogEvent) String() string + type Message struct + Headers []Header + Key []byte + Opaque interface{} + Timestamp time.Time + TimestampType TimestampType + TopicPartition TopicPartition + Value []byte + func (m *Message) String() string + type Metadata struct + Brokers []BrokerMetadata + OriginatingBroker BrokerMetadata + Topics map[string]TopicMetadata + type MockCluster struct + func NewMockCluster(brokerCount int) (*MockCluster, error) + func (mc *MockCluster) BootstrapServers() string + func (mc *MockCluster) Close() + type OAuthBearerToken struct + Expiration time.Time + Extensions map[string]string + Principal string + TokenValue string + type OAuthBearerTokenRefresh struct + Config string + func (o OAuthBearerTokenRefresh) String() string + type Offset int64 + func NewOffset(offset interface{}) (Offset, error) + func OffsetTail(relativeOffset Offset) Offset + func (o *Offset) Set(offset interface{}) error + func (o Offset) String() string + type OffsetsCommitted struct + Error error + Offsets []TopicPartition + func (o OffsetsCommitted) String() string + type PartitionEOF TopicPartition + func (p PartitionEOF) String() string + type PartitionMetadata struct + Error Error + ID int32 + Isrs []int32 + Leader int32 + Replicas []int32 + type PartitionsSpecification struct + IncreaseTo int + ReplicaAssignment [][]int32 + Topic string + type Producer struct + func NewProducer(conf *ConfigMap) (*Producer, error) + func (p *Producer) AbortTransaction(ctx context.Context) error + func (p *Producer) BeginTransaction() error + func (p *Producer) Close() + func (p *Producer) CommitTransaction(ctx context.Context) error + func (p *Producer) Events() chan Event + func (p *Producer) Flush(timeoutMs int) int + func (p *Producer) GetFatalError() error + func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) + func (p *Producer) InitTransactions(ctx context.Context) error + func (p *Producer) Len() int + func (p *Producer) Logs() chan LogEvent + func (p *Producer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) + func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error + func (p *Producer) ProduceChannel() chan *Message + func (p *Producer) Purge(flags int) error + func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) + func (p *Producer) SendOffsetsToTransaction(ctx context.Context, offsets []TopicPartition, ...) error + func (p *Producer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error + func (p *Producer) SetOAuthBearerTokenFailure(errstr string) error + func (p *Producer) String() string + func (p *Producer) TestFatalError(code ErrorCode, str string) ErrorCode + type RebalanceCb func(*Consumer, Event) error + type ResourcePatternType int + func ResourcePatternTypeFromString(patternTypeString string) (ResourcePatternType, error) + func (t ResourcePatternType) String() string + type ResourceType int + func ResourceTypeFromString(typeString string) (ResourceType, error) + func (t ResourceType) String() string + type RevokedPartitions struct + Partitions []TopicPartition + func (e RevokedPartitions) String() string + type Stats struct + func (e Stats) String() string + type TimestampType int + func (t TimestampType) String() string + type TopicMetadata struct + Error Error + Partitions []PartitionMetadata + Topic string + type TopicPartition struct + Error error + Metadata *string + Offset Offset + Partition int32 + Topic *string + func (p TopicPartition) String() string + type TopicPartitions []TopicPartition + func (tps TopicPartitions) Len() int + func (tps TopicPartitions) Less(i, j int) bool + func (tps TopicPartitions) Swap(i, j int) + type TopicResult struct + Error Error + Topic string + func (t TopicResult) String() string + type TopicSpecification struct + Config map[string]string + NumPartitions int + ReplicaAssignment [][]int32 + ReplicationFactor int + Topic string