kafka

package
v1.0.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 26, 2022 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Overview

Example
package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync/atomic"
	"time"

	"github.com/boxgo/box/pkg/client/kafka"
)

const (
	testTopic = "test"
)

func main() {
	kfk := kafka.StdConfig("default").Build()

	producer, err := kfk.NewSyncProducer()
	if err != nil {
		panic(err)
	}

	defer func() {
		if err := producer.Close(); err != nil {
			panic(err)
		}
	}()

	consumer, err := kfk.NewConsumer()
	if err != nil {
		panic(err)
	}

	defer func() {
		if err := consumer.Close(); err != nil {
			panic(err)
		}
	}()

	partitionConsumer, err := consumer.ConsumePartition(testTopic, 0, kafka.OffsetNewest)
	if err != nil {
		panic(err)
	}

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	var cnt int32

	go func() {
		for {
			select {
			case <-partitionConsumer.Messages():
				atomic.AddInt32(&cnt, 1)
			case <-signals:
				break
			}
		}
	}()

	partition, offset, err := producer.SendMessage(&kafka.ProducerMessage{
		Topic: testTopic,
		Value: kafka.StringEncoder("hi"),
	})
	if err != nil {
		panic(err)
	}

	time.Sleep(time.Second)

	fmt.Println(offset >= 0, partition == 0, atomic.LoadInt32(&cnt) > 0)
}
Output:

true true true

Index

Examples

Constants

View Source
const (
	// RangeBalanceStrategyName identifies strategies that use the range partition assignment strategy
	RangeBalanceStrategyName = sarama.RangeBalanceStrategyName

	// RoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy
	RoundRobinBalanceStrategyName = sarama.RoundRobinBalanceStrategyName

	// StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy
	StickyBalanceStrategyName = sarama.StickyBalanceStrategyName
)
View Source
const (
	// SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
	SASLTypeOAuth = sarama.SASLTypeOAuth
	// SASLTypePlaintext represents the SASL/PLAIN mechanism
	SASLTypePlaintext = sarama.SASLTypePlaintext
	// SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
	SASLTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256
	// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
	SASLTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512
	SASLTypeGSSAPI      = sarama.SASLTypeGSSAPI
	// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL auth using opaque packets.
	SASLHandshakeV0 = sarama.SASLHandshakeV0
	// SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL by wrapping tokens with Kafka protocol headers.
	SASLHandshakeV1 = sarama.SASLHandshakeV1
	// SASLExtKeyAuth is the reserved extension key name sent as part of the
	// SASL/OAUTHBEARER initial client response
	SASLExtKeyAuth = sarama.SASLExtKeyAuth
)
View Source
const (
	// OffsetNewest stands for the log head offset, i.e. the offset that will be
	// assigned to the next message that will be produced to the partition. You
	// can send this to a client's GetOffset method to get this offset, or when
	// calling ConsumePartition to start consuming new messages.
	OffsetNewest = sarama.OffsetNewest
	// OffsetOldest stands for the oldest offset available on the broker for a
	// partition. You can send this to a client's GetOffset method to get this
	// offset, or when calling ConsumePartition to start consuming from the
	// oldest offset that is still available on the broker.
	OffsetOldest = sarama.OffsetOldest
)
View Source
const (
	TOK_ID_KRB_AP_REQ   = sarama.TOK_ID_KRB_AP_REQ
	GSS_API_GENERIC_TAG = sarama.GSS_API_GENERIC_TAG
	KRB5_USER_AUTH      = sarama.KRB5_USER_AUTH
	KRB5_KEYTAB_AUTH    = sarama.KRB5_KEYTAB_AUTH
	GSS_API_INITIAL     = sarama.GSS_API_INITIAL
	GSS_API_VERIFY      = sarama.GSS_API_VERIFY
	GSS_API_FINISH      = sarama.GSS_API_FINISH
)
View Source
const (
	// APIKeySASLAuth is the API key for the SaslAuthenticate Kafka API
	APIKeySASLAuth = sarama.APIKeySASLAuth

	// GroupGenerationUndefined is a special value for the group generation field of Offset Commit Requests that should be used when a consumer group does not rely on Kafka for partition management.
	GroupGenerationUndefined = sarama.GroupGenerationUndefined

	// ReceiveTime is a special value for the timestamp field of Offset Commit Requests which tells the broker to set the timestamp to the time at which the request was received. The timestamp is only used if message version 1 is used, which requires kafka 0.8.2.
	ReceiveTime int64 = sarama.ReceiveTime
)
View Source
const (
	ErrNoError                            = sarama.ErrNoError
	ErrUnknown                            = sarama.ErrUnknown
	ErrOffsetOutOfRange                   = sarama.ErrOffsetOutOfRange
	ErrInvalidMessage                     = sarama.ErrInvalidMessage
	ErrUnknownTopicOrPartition            = sarama.ErrUnknownTopicOrPartition
	ErrInvalidMessageSize                 = sarama.ErrInvalidMessageSize
	ErrLeaderNotAvailable                 = sarama.ErrLeaderNotAvailable
	ErrNotLeaderForPartition              = sarama.ErrNotLeaderForPartition
	ErrRequestTimedOut                    = sarama.ErrRequestTimedOut
	ErrBrokerNotAvailable                 = sarama.ErrBrokerNotAvailable
	ErrReplicaNotAvailable                = sarama.ErrReplicaNotAvailable
	ErrMessageSizeTooLarge                = sarama.ErrMessageSizeTooLarge
	ErrStaleControllerEpochCode           = sarama.ErrStaleControllerEpochCode
	ErrOffsetMetadataTooLarge             = sarama.ErrOffsetMetadataTooLarge
	ErrNetworkException                   = sarama.ErrNetworkException
	ErrOffsetsLoadInProgress              = sarama.ErrOffsetsLoadInProgress
	ErrConsumerCoordinatorNotAvailable    = sarama.ErrConsumerCoordinatorNotAvailable
	ErrNotCoordinatorForConsumer          = sarama.ErrNotCoordinatorForConsumer
	ErrInvalidTopic                       = sarama.ErrInvalidTopic
	ErrMessageSetSizeTooLarge             = sarama.ErrMessageSetSizeTooLarge
	ErrNotEnoughReplicas                  = sarama.ErrNotEnoughReplicas
	ErrNotEnoughReplicasAfterAppend       = sarama.ErrNotEnoughReplicasAfterAppend
	ErrInvalidRequiredAcks                = sarama.ErrInvalidRequiredAcks
	ErrIllegalGeneration                  = sarama.ErrIllegalGeneration
	ErrInconsistentGroupProtocol          = sarama.ErrInconsistentGroupProtocol
	ErrInvalidGroupId                     = sarama.ErrInvalidGroupId
	ErrUnknownMemberId                    = sarama.ErrUnknownMemberId
	ErrInvalidSessionTimeout              = sarama.ErrInvalidSessionTimeout
	ErrRebalanceInProgress                = sarama.ErrRebalanceInProgress
	ErrInvalidCommitOffsetSize            = sarama.ErrInvalidCommitOffsetSize
	ErrTopicAuthorizationFailed           = sarama.ErrTopicAuthorizationFailed
	ErrGroupAuthorizationFailed           = sarama.ErrGroupAuthorizationFailed
	ErrClusterAuthorizationFailed         = sarama.ErrClusterAuthorizationFailed
	ErrInvalidTimestamp                   = sarama.ErrInvalidTimestamp
	ErrUnsupportedSASLMechanism           = sarama.ErrUnsupportedSASLMechanism
	ErrIllegalSASLState                   = sarama.ErrIllegalSASLState
	ErrUnsupportedVersion                 = sarama.ErrUnsupportedVersion
	ErrTopicAlreadyExists                 = sarama.ErrTopicAlreadyExists
	ErrInvalidPartitions                  = sarama.ErrInvalidPartitions
	ErrInvalidReplicationFactor           = sarama.ErrInvalidReplicationFactor
	ErrInvalidReplicaAssignment           = sarama.ErrInvalidReplicaAssignment
	ErrInvalidConfig                      = sarama.ErrInvalidConfig
	ErrNotController                      = sarama.ErrNotController
	ErrInvalidRequest                     = sarama.ErrInvalidRequest
	ErrUnsupportedForMessageFormat        = sarama.ErrUnsupportedForMessageFormat
	ErrPolicyViolation                    = sarama.ErrPolicyViolation
	ErrOutOfOrderSequenceNumber           = sarama.ErrOutOfOrderSequenceNumber
	ErrDuplicateSequenceNumber            = sarama.ErrDuplicateSequenceNumber
	ErrInvalidProducerEpoch               = sarama.ErrInvalidProducerEpoch
	ErrInvalidTxnState                    = sarama.ErrInvalidTxnState
	ErrInvalidProducerIDMapping           = sarama.ErrInvalidProducerIDMapping
	ErrInvalidTransactionTimeout          = sarama.ErrInvalidTransactionTimeout
	ErrConcurrentTransactions             = sarama.ErrConcurrentTransactions
	ErrTransactionCoordinatorFenced       = sarama.ErrTransactionCoordinatorFenced
	ErrTransactionalIDAuthorizationFailed = sarama.ErrTransactionalIDAuthorizationFailed
	ErrSecurityDisabled                   = sarama.ErrSecurityDisabled
	ErrOperationNotAttempted              = sarama.ErrOperationNotAttempted
	ErrKafkaStorageError                  = sarama.ErrKafkaStorageError
	ErrLogDirNotFound                     = sarama.ErrLogDirNotFound
	ErrSASLAuthenticationFailed           = sarama.ErrSASLAuthenticationFailed
	ErrUnknownProducerID                  = sarama.ErrUnknownProducerID
	ErrReassignmentInProgress             = sarama.ErrReassignmentInProgress
	ErrDelegationTokenAuthDisabled        = sarama.ErrDelegationTokenAuthDisabled
	ErrDelegationTokenNotFound            = sarama.ErrDelegationTokenNotFound
	ErrDelegationTokenOwnerMismatch       = sarama.ErrDelegationTokenOwnerMismatch
	ErrDelegationTokenRequestNotAllowed   = sarama.ErrDelegationTokenRequestNotAllowed
	ErrDelegationTokenAuthorizationFailed = sarama.ErrDelegationTokenAuthorizationFailed
	ErrDelegationTokenExpired             = sarama.ErrDelegationTokenExpired
	ErrInvalidPrincipalType               = sarama.ErrInvalidPrincipalType
	ErrNonEmptyGroup                      = sarama.ErrNonEmptyGroup
	ErrGroupIDNotFound                    = sarama.ErrGroupIDNotFound
	ErrFetchSessionIDNotFound             = sarama.ErrFetchSessionIDNotFound
	ErrInvalidFetchSessionEpoch           = sarama.ErrInvalidFetchSessionEpoch
	ErrListenerNotFound                   = sarama.ErrListenerNotFound
	ErrTopicDeletionDisabled              = sarama.ErrTopicDeletionDisabled
	ErrFencedLeaderEpoch                  = sarama.ErrFencedLeaderEpoch
	ErrUnknownLeaderEpoch                 = sarama.ErrUnknownLeaderEpoch
	ErrUnsupportedCompressionType         = sarama.ErrUnsupportedCompressionType
	ErrStaleBrokerEpoch                   = sarama.ErrStaleBrokerEpoch
	ErrOffsetNotAvailable                 = sarama.ErrOffsetNotAvailable
	ErrMemberIdRequired                   = sarama.ErrMemberIdRequired
	ErrPreferredLeaderNotAvailable        = sarama.ErrPreferredLeaderNotAvailable
	ErrGroupMaxSizeReached                = sarama.ErrGroupMaxSizeReached
	ErrFencedInstancedId                  = sarama.ErrFencedInstancedId
)

Numeric error codes returned by the Kafka server.

Variables

View Source
var (
	// MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying
	// to send a request larger than this will result in an PacketEncodingError. The default of 100 MiB is aligned
	// with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt
	// to process.
	MaxRequestSize = sarama.MaxRequestSize

	// MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If
	// a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to
	// protect the client from running out of memory. Please note that brokers do not have any natural limit on
	// the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers
	// (see https://issues.apache.org/jira/browse/KAFKA-2063).
	MaxResponseSize = sarama.MaxResponseSize
)
View Source
var (
	// ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored
	// or otherwise failed to respond.
	ErrOutOfBrokers = sarama.ErrOutOfBrokers

	// ErrClosedClient is the error returned when a method is called on a client that has been closed.
	ErrClosedClient = sarama.ErrClosedClient

	// ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does
	// not contain the expected information.
	ErrIncompleteResponse = sarama.ErrIncompleteResponse

	// ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index
	// (meaning one outside of the range [0...numPartitions-1]).
	ErrInvalidPartition = sarama.ErrInvalidPartition

	// ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.
	ErrAlreadyConnected = sarama.ErrAlreadyConnected

	// ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
	ErrNotConnected = sarama.ErrNotConnected

	// ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected
	// when requesting messages, since as an optimization the server is allowed to return a partial message at the end
	// of the message set.
	ErrInsufficientData = sarama.ErrInsufficientData

	// ErrShuttingDown is returned when a producer receives a message during shutdown.
	ErrShuttingDown = sarama.ErrShuttingDown

	// ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
	ErrMessageTooLarge = sarama.ErrMessageTooLarge

	// ErrConsumerOffsetNotAdvanced is returned when a partition consumer didn't advance its offset after parsing
	// a RecordBatch.
	ErrConsumerOffsetNotAdvanced = sarama.ErrConsumerOffsetNotAdvanced

	// ErrControllerNotAvailable is returned when server didn't give correct controller id. May be kafka server's version
	// is lower than 0.10.0.0.
	ErrControllerNotAvailable = sarama.ErrControllerNotAvailable

	// ErrNoTopicsToUpdateMetadata is returned when Meta.Full is set to false but no specific topics were found to update
	// the metadata.
	ErrNoTopicsToUpdateMetadata = sarama.ErrNoTopicsToUpdateMetadata
)

Functions

This section is empty.

Types

type AbortedTransaction

type AbortedTransaction = sarama.AbortedTransaction

type AccessToken

type AccessToken = sarama.AccessToken

type AccessTokenProvider

type AccessTokenProvider = sarama.AccessTokenProvider

type Acl

type Acl = sarama.Acl

type AclCreation

type AclCreation = sarama.AclCreation

type AclCreationResponse

type AclCreationResponse = sarama.AclCreationResponse

type AclFilter

type AclFilter = sarama.AclFilter

type AclOperation

type AclOperation = sarama.AclOperation

type AclPermissionType

type AclPermissionType = sarama.AclPermissionType

type AclResourcePatternType

type AclResourcePatternType = sarama.AclResourcePatternType

type AclResourceType

type AclResourceType = sarama.AclResourceType

type AddOffsetsToTxnRequest

type AddOffsetsToTxnRequest = sarama.AddOffsetsToTxnRequest

type AddOffsetsToTxnResponse

type AddOffsetsToTxnResponse = sarama.AddOffsetsToTxnResponse

type AddPartitionsToTxnRequest

type AddPartitionsToTxnRequest = sarama.AddPartitionsToTxnRequest

type AddPartitionsToTxnResponse

type AddPartitionsToTxnResponse = sarama.AddPartitionsToTxnResponse

type AlterConfigsRequest

type AlterConfigsRequest = sarama.AlterConfigsRequest

type AlterConfigsResource

type AlterConfigsResource = sarama.AlterConfigsResource

type AlterConfigsResourceResponse

type AlterConfigsResourceResponse = sarama.AlterConfigsResourceResponse

type AlterConfigsResponse

type AlterConfigsResponse = sarama.AlterConfigsResponse

type AlterPartitionReassignmentsRequest

type AlterPartitionReassignmentsRequest = sarama.AlterPartitionReassignmentsRequest

type AlterPartitionReassignmentsResponse

type AlterPartitionReassignmentsResponse = sarama.AlterPartitionReassignmentsResponse

type ApiVersionsRequest

type ApiVersionsRequest = sarama.ApiVersionsRequest

type ApiVersionsResponse

type ApiVersionsResponse = sarama.ApiVersionsResponse

type ApiVersionsResponseBlock

type ApiVersionsResponseBlock = sarama.ApiVersionsResponseBlock

type AsyncProducer

type AsyncProducer = sarama.AsyncProducer

type BalanceStrategy

type BalanceStrategy = sarama.BalanceStrategy

type BalanceStrategyPlan

type BalanceStrategyPlan = sarama.BalanceStrategyPlan

type Broker

type Broker = sarama.Broker

type ByteEncoder

type ByteEncoder = sarama.ByteEncoder

type Client

type Client = sarama.Client

type ClusterAdmin

type ClusterAdmin = sarama.ClusterAdmin

type CompressionCodec

type CompressionCodec = sarama.CompressionCodec

type Config

type Config struct {
	Addrs             []string            `config:"addrs"`
	Net               Net                 `config:"net"`
	Metadata          Metadata            `config:"metadata"`
	Producer          ProducerConfig      `config:"producer"`
	Consumer          ConsumerConfig      `config:"consumer"`
	ClientID          string              `config:"clientId"`
	ChannelBufferSize int                 `config:"channelBufferSize"`
	Version           sarama.KafkaVersion `config:"version"`
	// contains filtered or unexported fields
}

Config 配置

func DefaultConfig

func DefaultConfig(key string) *Config

DefaultConfig 默认配置

func StdConfig

func StdConfig(key string, optionFunc ...OptionFunc) *Config

StdConfig 标准配置

func (*Config) Build

func (c *Config) Build() *Kafka

Build 构建实例

func (*Config) Path

func (c *Config) Path() string

Path 实例配置目录

type ConfigEntry

type ConfigEntry = sarama.ConfigEntry

type ConfigKafka

type ConfigKafka = sarama.Config

type ConfigResource

type ConfigResource = sarama.ConfigResource

type ConfigResourceType

type ConfigResourceType = sarama.ConfigResourceType

type ConfigSource

type ConfigSource = sarama.ConfigSource

type ConfigSynonym

type ConfigSynonym = sarama.ConfigSynonym

type ConfigurationError

type ConfigurationError = sarama.ConfigurationError

type Consumer

type Consumer = sarama.Consumer

type ConsumerConfig

type ConsumerConfig struct {
	GroupSessionTimeout        time.Duration                   `config:"groupSessionTimeout"`
	GroupHeartbeatInterval     time.Duration                   `config:"groupHeartbeatInterval"`
	GroupRebalanceStrategy     sarama.BalanceStrategy          `config:"groupRebalanceStrategy" json:"-"`
	GroupRebalanceTimeout      time.Duration                   `config:"groupRebalanceTimeout"`
	GroupRebalanceRetryMax     int                             `config:"groupRebalanceRetryMax"`
	GroupRebalanceRetryBackoff time.Duration                   `config:"groupRebalanceRetryBackoff"`
	GroupMemberUserData        []byte                          `config:"groupMemberUserData"`
	RetryBackoff               time.Duration                   `config:"retryBackoff"`
	RetryBackoffFunc           func(retries int) time.Duration `config:"-" json:"-"`
	FetchMin                   int32                           `config:"fetchMin"`
	FetchMax                   int32                           `config:"fetchMax"`
	FetchDefault               int32                           `config:"fetchDefault"`
	MaxWaitTime                time.Duration                   `config:"maxWaitTime"`
	MaxProcessingTime          time.Duration                   `config:"maxProcessingTime"`
	ReturnErrors               bool                            `config:"returnErrors"`
	OffsetsCommitInterval      time.Duration                   `config:"offsetsCommitInterval"`
	OffsetsInitial             int64                           `config:"offsetsInitial"`
	OffsetsRetention           time.Duration                   `config:"offsetsRetention"`
	OffsetRetryMax             int                             `config:"offsetRetryMax"`
	OffsetAutoCommitEnable     bool                            `config:"offsetAutoCommitEnable"`
	OffsetAutoCommitInterval   time.Duration                   `config:"offsetAutoCommitInterval"`
	IsolationLevel             sarama.IsolationLevel           `config:"isolationLevel"`
	Interceptors               []sarama.ConsumerInterceptor    `config:"-" json:"-"`
}

type ConsumerError

type ConsumerError = sarama.ConsumerError

type ConsumerErrors

type ConsumerErrors = sarama.ConsumerErrors

type ConsumerGroup

type ConsumerGroup = sarama.ConsumerGroup

type ConsumerGroupClaim

type ConsumerGroupClaim = sarama.ConsumerGroupClaim

type ConsumerGroupHandler

type ConsumerGroupHandler = sarama.ConsumerGroupHandler

type ConsumerGroupMemberAssignment

type ConsumerGroupMemberAssignment = sarama.ConsumerGroupMemberAssignment

type ConsumerGroupMemberMetadata

type ConsumerGroupMemberMetadata = sarama.ConsumerGroupMemberMetadata

type ConsumerGroupSession

type ConsumerGroupSession = sarama.ConsumerGroupSession

type ConsumerInterceptor

type ConsumerInterceptor = sarama.ConsumerInterceptor

type ConsumerMessage

type ConsumerMessage = sarama.ConsumerMessage

type ConsumerMetadataRequest

type ConsumerMetadataRequest = sarama.ConsumerMetadataRequest

type ConsumerMetadataResponse

type ConsumerMetadataResponse = sarama.ConsumerMetadataResponse

type ControlRecord

type ControlRecord = sarama.ControlRecord

type ControlRecordType

type ControlRecordType = sarama.ControlRecordType

type CoordinatorType

type CoordinatorType = sarama.CoordinatorType

type CreateAclsRequest

type CreateAclsRequest = sarama.CreateAclsRequest

type CreateAclsResponse

type CreateAclsResponse = sarama.CreateAclsResponse

type CreatePartitionsRequest

type CreatePartitionsRequest = sarama.CreatePartitionsRequest

type CreatePartitionsResponse

type CreatePartitionsResponse = sarama.CreatePartitionsResponse

type CreateTopicsRequest

type CreateTopicsRequest = sarama.CreateTopicsRequest

type CreateTopicsResponse

type CreateTopicsResponse = sarama.CreateTopicsResponse

type DeleteAclsRequest

type DeleteAclsRequest = sarama.DeleteAclsRequest

type DeleteAclsResponse

type DeleteAclsResponse = sarama.DeleteAclsResponse

type DeleteGroupsRequest

type DeleteGroupsRequest = sarama.DeleteGroupsRequest

type DeleteGroupsResponse

type DeleteGroupsResponse = sarama.DeleteGroupsResponse

type DeleteRecordsRequest

type DeleteRecordsRequest = sarama.DeleteRecordsRequest

type DeleteRecordsRequestTopic

type DeleteRecordsRequestTopic = sarama.DeleteRecordsRequestTopic

type DeleteRecordsResponse

type DeleteRecordsResponse = sarama.DeleteRecordsResponse

type DeleteRecordsResponsePartition

type DeleteRecordsResponsePartition = sarama.DeleteRecordsResponsePartition

type DeleteRecordsResponseTopic

type DeleteRecordsResponseTopic = sarama.DeleteRecordsResponseTopic

type DeleteTopicsRequest

type DeleteTopicsRequest = sarama.DeleteTopicsRequest

type DeleteTopicsResponse

type DeleteTopicsResponse = sarama.DeleteTopicsResponse

type DescribeAclsRequest

type DescribeAclsRequest = sarama.DescribeAclsRequest

type DescribeAclsResponse

type DescribeAclsResponse = sarama.DescribeAclsResponse

type DescribeConfigsRequest

type DescribeConfigsRequest = sarama.DescribeConfigsRequest

type DescribeConfigsResponse

type DescribeConfigsResponse = sarama.DescribeConfigsResponse

type DescribeGroupsRequest

type DescribeGroupsRequest = sarama.DescribeGroupsRequest

type DescribeGroupsResponse

type DescribeGroupsResponse = sarama.DescribeGroupsResponse

type DescribeLogDirsRequest

type DescribeLogDirsRequest = sarama.DescribeLogDirsRequest

type DescribeLogDirsRequestTopic

type DescribeLogDirsRequestTopic = sarama.DescribeLogDirsRequestTopic

type DescribeLogDirsResponse

type DescribeLogDirsResponse = sarama.DescribeLogDirsResponse

type DescribeLogDirsResponseDirMetadata

type DescribeLogDirsResponseDirMetadata = sarama.DescribeLogDirsResponseDirMetadata

type DescribeLogDirsResponsePartition

type DescribeLogDirsResponsePartition = sarama.DescribeLogDirsResponsePartition

type DescribeLogDirsResponseTopic

type DescribeLogDirsResponseTopic = sarama.DescribeLogDirsResponseTopic

type DynamicConsistencyPartitioner

type DynamicConsistencyPartitioner = sarama.DynamicConsistencyPartitioner

type Encoder

type Encoder = sarama.Encoder

type EndTxnRequest

type EndTxnRequest = sarama.EndTxnRequest

type EndTxnResponse

type EndTxnResponse = sarama.EndTxnResponse

type ErrDeleteRecords

type ErrDeleteRecords = sarama.ErrDeleteRecords

type ErrReassignPartitions

type ErrReassignPartitions = sarama.ErrReassignPartitions

type FetchRequest

type FetchRequest = sarama.FetchRequest

type FetchResponse

type FetchResponse = sarama.FetchResponse

type FetchResponseBlock

type FetchResponseBlock = sarama.FetchResponseBlock

type FilterResponse

type FilterResponse = sarama.FilterResponse

type FindCoordinatorRequest

type FindCoordinatorRequest = sarama.FindCoordinatorRequest

type FindCoordinatorResponse

type FindCoordinatorResponse = sarama.FindCoordinatorResponse

type GSSAPIConfig

type GSSAPIConfig = sarama.GSSAPIConfig

type GSSAPIKerberosAuth

type GSSAPIKerberosAuth = sarama.GSSAPIKerberosAuth

type GSSApiHandlerFunc

type GSSApiHandlerFunc = sarama.GSSApiHandlerFunc

type GroupDescription

type GroupDescription = sarama.GroupDescription

type GroupMemberDescription

type GroupMemberDescription = sarama.GroupMemberDescription

type GroupProtocol

type GroupProtocol = sarama.GroupProtocol

type HashPartitionerOption

type HashPartitionerOption = sarama.HashPartitionerOption

type HeartbeatRequest

type HeartbeatRequest = sarama.HeartbeatRequest

type HeartbeatResponse

type HeartbeatResponse = sarama.HeartbeatResponse

type InitProducerIDRequest

type InitProducerIDRequest = sarama.InitProducerIDRequest

type InitProducerIDResponse

type InitProducerIDResponse = sarama.InitProducerIDResponse

type IsolationLevel

type IsolationLevel = sarama.IsolationLevel

type JoinGroupRequest

type JoinGroupRequest = sarama.JoinGroupRequest

type JoinGroupResponse

type JoinGroupResponse = sarama.JoinGroupResponse

type KError

type KError = sarama.KError

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

func (Kafka) Brokers

func (kfk Kafka) Brokers() []*Broker

Brokers returns the current set of active brokers as retrieved from cluster metadata.

func (Kafka) Close

func (kfk Kafka) Close() error

Close shuts down all broker connections managed by this client. It is required to call this function before a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers using a client before you close the client.

func (Kafka) Closed

func (kfk Kafka) Closed() bool

Closed returns true if the client has already had Close called on it

func (Kafka) Config

func (kfk Kafka) Config() *ConfigKafka

Config returns the Config struct of the client. This struct should not be altered after it has been created.

func (Kafka) Controller

func (kfk Kafka) Controller() (*Broker, error)

Controller returns the cluster controller broker. It will return a locally cached value if it's available. You can call RefreshController to update the cached value. Requires Kafka 0.10 or higher.

func (Kafka) Coordinator

func (kfk Kafka) Coordinator(consumerGroup string) (*Broker, error)

Coordinator returns the coordinating broker for a consumer group. It will return a locally cached value if it's available. You can call RefreshCoordinator to update the cached value. This function only works on Kafka 0.8.2 and higher.

func (Kafka) GetOffset

func (kfk Kafka) GetOffset(topic string, partitionID int32, time int64) (int64, error)

GetOffset queries the cluster to get the most recent available offset at the given time (in milliseconds) on the topic/partition combination. Time should be OffsetOldest for the earliest available offset, OffsetNewest for the offset of the message that will be produced next, or a time.

func (Kafka) InSyncReplicas

func (kfk Kafka) InSyncReplicas(topic string, partitionID int32) ([]int32, error)

InSyncReplicas returns the set of all in-sync replica IDs for the given partition. In-sync replicas are replicas which are fully caught up with the partition leader.

func (Kafka) InitProducerID

func (kfk Kafka) InitProducerID() (*InitProducerIDResponse, error)

InitProducerID retrieves information required for Idempotent Producer

func (Kafka) Leader

func (kfk Kafka) Leader(topic string, partitionID int32) (*Broker, error)

Leader returns the broker object that is the leader of the current topic/partition, as determined by querying the cluster metadata.

func (Kafka) NewAsyncProducer

func (kfk Kafka) NewAsyncProducer() (AsyncProducer, error)

func (Kafka) NewConsumer

func (kfk Kafka) NewConsumer() (Consumer, error)

func (Kafka) NewConsumerGroup

func (kfk Kafka) NewConsumerGroup(groupID string) (ConsumerGroup, error)

func (Kafka) NewSyncProducer

func (kfk Kafka) NewSyncProducer() (SyncProducer, error)

func (Kafka) OfflineReplicas

func (kfk Kafka) OfflineReplicas(topic string, partitionID int32) ([]int32, error)

OfflineReplicas returns the set of all offline replica IDs for the given partition. Offline replicas are replicas which are offline

func (Kafka) Partitions

func (kfk Kafka) Partitions(topic string) ([]int32, error)

Partitions returns the sorted list of all partition IDs for the given topic.

func (Kafka) RefreshBrokers

func (kfk Kafka) RefreshBrokers(addrs []string) error

RefreshBrokers takes a list of addresses to be used as seed brokers. Existing broker connections are closed and the updated list of seed brokers will be used for the next metadata fetch.

func (Kafka) RefreshController

func (kfk Kafka) RefreshController() (*Broker, error)

RefreshController retrieves the cluster controller from fresh metadata and stores it in the local cache. Requires Kafka 0.10 or higher.

func (Kafka) RefreshCoordinator

func (kfk Kafka) RefreshCoordinator(consumerGroup string) error

RefreshCoordinator retrieves the coordinator for a consumer group and stores it in local cache. This function only works on Kafka 0.8.2 and higher.

func (Kafka) RefreshMetadata

func (kfk Kafka) RefreshMetadata(topics ...string) error

RefreshMetadata takes a list of topics and queries the cluster to refresh the available metadata for those topics. If no topics are provided, it will refresh metadata for all topics.

func (Kafka) Replicas

func (kfk Kafka) Replicas(topic string, partitionID int32) ([]int32, error)

Replicas returns the set of all replica IDs for the given partition.

func (Kafka) Topics

func (kfk Kafka) Topics() ([]string, error)

Topics returns the set of available topics as retrieved from cluster metadata.

func (Kafka) WritablePartitions

func (kfk Kafka) WritablePartitions(topic string) ([]int32, error)

WritablePartitions returns the sorted list of all writable partition IDs for the given topic, where "writable" means "having a valid leader accepting writes".

type KafkaGSSAPIHandler

type KafkaGSSAPIHandler = sarama.KafkaGSSAPIHandler

type KafkaVersion

type KafkaVersion = sarama.KafkaVersion

type KerberosClient

type KerberosClient = sarama.KerberosClient

type KerberosGoKrb5Client

type KerberosGoKrb5Client = sarama.KerberosGoKrb5Client

type LeaveGroupRequest

type LeaveGroupRequest = sarama.LeaveGroupRequest

type LeaveGroupResponse

type LeaveGroupResponse = sarama.LeaveGroupResponse

type ListGroupsRequest

type ListGroupsRequest = sarama.ListGroupsRequest

type ListGroupsResponse

type ListGroupsResponse = sarama.ListGroupsResponse

type ListPartitionReassignmentsRequest

type ListPartitionReassignmentsRequest = sarama.ListPartitionReassignmentsRequest

type ListPartitionReassignmentsResponse

type ListPartitionReassignmentsResponse = sarama.ListPartitionReassignmentsResponse

type MatchingAcl

type MatchingAcl = sarama.MatchingAcl

type Message

type Message = sarama.Message

type MessageBlock

type MessageBlock = sarama.MessageBlock

type MessageSet

type MessageSet = sarama.MessageSet

type Metadata

type Metadata struct {
	RetryMax         int                                         `config:"retryMax"`
	RetryBackoff     time.Duration                               `config:"retryBackoff"`
	RetryBackoffFunc func(retries, maxRetries int) time.Duration `config:"-" json:"-"`
	RefreshFrequency time.Duration                               `config:"refreshFrequency"`
	Full             bool                                        `config:"full"`
	Timeout          time.Duration                               `config:"timeout"`
}

type MetadataRequest

type MetadataRequest = sarama.MetadataRequest

type MetadataResponse

type MetadataResponse = sarama.MetadataResponse

type MockAlterConfigsResponse

type MockAlterConfigsResponse = sarama.MockAlterConfigsResponse

type MockAlterConfigsResponseWithErrorCode

type MockAlterConfigsResponseWithErrorCode = sarama.MockAlterConfigsResponseWithErrorCode

type MockBroker

type MockBroker = sarama.MockBroker

type MockConsumerMetadataResponse

type MockConsumerMetadataResponse = sarama.MockConsumerMetadataResponse

type MockCreateAclsResponse

type MockCreateAclsResponse = sarama.MockCreateAclsResponse

type MockCreatePartitionsResponse

type MockCreatePartitionsResponse = sarama.MockCreatePartitionsResponse

type MockCreateTopicsResponse

type MockCreateTopicsResponse = sarama.MockCreateTopicsResponse

type MockDeleteAclsResponse

type MockDeleteAclsResponse = sarama.MockDeleteAclsResponse

type MockDeleteGroupsResponse

type MockDeleteGroupsResponse = sarama.MockDeleteGroupsResponse

type MockDeleteRecordsResponse

type MockDeleteRecordsResponse = sarama.MockDeleteRecordsResponse

type MockDeleteTopicsResponse

type MockDeleteTopicsResponse = sarama.MockDeleteTopicsResponse

type MockDescribeConfigsResponse

type MockDescribeConfigsResponse = sarama.MockDescribeConfigsResponse

type MockDescribeGroupsResponse

type MockDescribeGroupsResponse = sarama.MockDescribeGroupsResponse

type MockDescribeLogDirsResponse

type MockDescribeLogDirsResponse = sarama.MockDescribeLogDirsResponse

type MockFetchResponse

type MockFetchResponse = sarama.MockFetchResponse

type MockFindCoordinatorResponse

type MockFindCoordinatorResponse = sarama.MockFindCoordinatorResponse

type MockHeartbeatResponse

type MockHeartbeatResponse = sarama.MockHeartbeatResponse

type MockJoinGroupResponse

type MockJoinGroupResponse = sarama.MockJoinGroupResponse

type MockKerberosClient

type MockKerberosClient = sarama.MockKerberosClient

type MockLeaveGroupResponse

type MockLeaveGroupResponse = sarama.MockLeaveGroupResponse

type MockListAclsResponse

type MockListAclsResponse = sarama.MockListAclsResponse

type MockListGroupsResponse

type MockListGroupsResponse = sarama.MockListGroupsResponse

type MockListPartitionReassignmentsResponse

type MockListPartitionReassignmentsResponse = sarama.MockListPartitionReassignmentsResponse

type MockMetadataResponse

type MockMetadataResponse = sarama.MockMetadataResponse

type MockOffsetCommitResponse

type MockOffsetCommitResponse = sarama.MockOffsetCommitResponse

type MockOffsetFetchResponse

type MockOffsetFetchResponse = sarama.MockOffsetFetchResponse

type MockOffsetResponse

type MockOffsetResponse = sarama.MockOffsetResponse

type MockProduceResponse

type MockProduceResponse = sarama.MockProduceResponse

type MockResponse

type MockResponse = sarama.MockResponse

type MockSaslAuthenticateResponse

type MockSaslAuthenticateResponse = sarama.MockSaslAuthenticateResponse

type MockSaslHandshakeResponse

type MockSaslHandshakeResponse = sarama.MockSaslHandshakeResponse

type MockSequence

type MockSequence = sarama.MockSequence

type MockSyncGroupResponse

type MockSyncGroupResponse = sarama.MockSyncGroupResponse

type MockWrapper

type MockWrapper = sarama.MockWrapper

type MultiError

type MultiError = sarama.MultiError

type Net

type Net struct {
	MaxOpenRequests int           `config:"maxOpenRequests"`
	DialTimeout     time.Duration `config:"dialTimeout"`
	ReadTimeout     time.Duration `config:"readTimeout"`
	WriteTimeout    time.Duration `config:"writeTimeout"`
	KeepAlive       time.Duration `config:"keepAlive"`
}

type OffsetCommitRequest

type OffsetCommitRequest = sarama.OffsetCommitRequest

type OffsetCommitResponse

type OffsetCommitResponse = sarama.OffsetCommitResponse

type OffsetFetchRequest

type OffsetFetchRequest = sarama.OffsetFetchRequest

type OffsetFetchResponse

type OffsetFetchResponse = sarama.OffsetFetchResponse

type OffsetFetchResponseBlock

type OffsetFetchResponseBlock = sarama.OffsetFetchResponseBlock

type OffsetManager

type OffsetManager = sarama.OffsetManager

type OffsetRequest

type OffsetRequest = sarama.OffsetRequest

type OffsetResponse

type OffsetResponse = sarama.OffsetResponse

type OffsetResponseBlock

type OffsetResponseBlock = sarama.OffsetResponseBlock

type OptionFunc

type OptionFunc func(*Config)

OptionFunc 选项信息

type PacketDecodingError

type PacketDecodingError = sarama.PacketDecodingError

type PacketEncodingError

type PacketEncodingError = sarama.PacketEncodingError

type PartitionConsumer

type PartitionConsumer = sarama.PartitionConsumer

type PartitionError

type PartitionError = sarama.PartitionError

type PartitionMetadata

type PartitionMetadata = sarama.PartitionMetadata

type PartitionOffsetManager

type PartitionOffsetManager = sarama.PartitionOffsetManager

type PartitionOffsetMetadata

type PartitionOffsetMetadata = sarama.PartitionOffsetMetadata

type PartitionReplicaReassignmentsStatus

type PartitionReplicaReassignmentsStatus = sarama.PartitionReplicaReassignmentsStatus

type Partitioner

type Partitioner = sarama.Partitioner

type PartitionerConstructor

type PartitionerConstructor = sarama.PartitionerConstructor

type ProduceRequest

type ProduceRequest = sarama.ProduceRequest

type ProduceResponse

type ProduceResponse = sarama.ProduceResponse

type ProduceResponseBlock

type ProduceResponseBlock = sarama.ProduceResponseBlock

type ProducerConfig

type ProducerConfig struct {
	MaxMessageBytes  int                                         `config:"maxMessageBytes"`
	RequiredAcks     sarama.RequiredAcks                         `config:"requiredAcks"`
	Timeout          time.Duration                               `config:"timeout"`
	Compression      sarama.CompressionCodec                     `config:"compression"`
	CompressionLevel int                                         `config:"compressionLevel"`
	Partitioner      sarama.PartitionerConstructor               `config:"-" json:"-"`
	Idempotent       bool                                        `config:"idempotent"`
	ReturnSuccesses  bool                                        `config:"returnSuccesses"`
	ReturnErrors     bool                                        `config:"returnErrors"`
	FlushBytes       int                                         `config:"flushBytes"`
	FlushMessages    int                                         `config:"flushMessages"`
	FlushFrequency   time.Duration                               `config:"flushFrequency"`
	FlushMaxMessages int                                         `config:"FlushMaxMessages"`
	RetryMax         int                                         `config:"retryMax"`
	RetryBackoff     time.Duration                               `config:"retryBackoff"`
	RetryBackoffFunc func(retries, maxRetries int) time.Duration `config:"-" json:"-"`
	Interceptors     []sarama.ProducerInterceptor                `config:"-" json:"-"`
}

type ProducerError

type ProducerError = sarama.ProducerError

type ProducerErrors

type ProducerErrors = sarama.ProducerErrors

type ProducerInterceptor

type ProducerInterceptor = sarama.ProducerInterceptor

type ProducerMessage

type ProducerMessage = sarama.ProducerMessage

type Record

type Record = sarama.Record

type RecordBatch

type RecordBatch = sarama.RecordBatch

type RecordHeader

type RecordHeader = sarama.RecordHeader

type Records

type Records = sarama.Records

type RequestNotifierFunc

type RequestNotifierFunc = sarama.RequestNotifierFunc

type RequestResponse

type RequestResponse = sarama.RequestResponse

type RequiredAcks

type RequiredAcks = sarama.RequiredAcks

type Resource

type Resource = sarama.Resource

type ResourceAcls

type ResourceAcls = sarama.ResourceAcls

type ResourceResponse

type ResourceResponse = sarama.ResourceResponse

type SASLMechanism

type SASLMechanism = sarama.SASLMechanism

type SCRAMClient

type SCRAMClient = sarama.SCRAMClient

type SaslAuthenticateRequest

type SaslAuthenticateRequest = sarama.SaslAuthenticateRequest

type SaslAuthenticateResponse

type SaslAuthenticateResponse = sarama.SaslAuthenticateResponse

type SaslHandshakeRequest

type SaslHandshakeRequest = sarama.SaslHandshakeRequest

type SaslHandshakeResponse

type SaslHandshakeResponse = sarama.SaslHandshakeResponse

type StdLogger

type StdLogger = sarama.StdLogger

type StickyAssignorUserData

type StickyAssignorUserData = sarama.StickyAssignorUserData

type StickyAssignorUserDataV0

type StickyAssignorUserDataV0 = sarama.StickyAssignorUserDataV0

type StickyAssignorUserDataV1

type StickyAssignorUserDataV1 = sarama.StickyAssignorUserDataV1

type StringEncoder

type StringEncoder = sarama.StringEncoder

type SyncGroupRequest

type SyncGroupRequest = sarama.SyncGroupRequest

type SyncGroupResponse

type SyncGroupResponse = sarama.SyncGroupResponse

type SyncProducer

type SyncProducer = sarama.SyncProducer

type TestReporter

type TestReporter = sarama.TestReporter

type Timestamp

type Timestamp = sarama.Timestamp

type TopicDetail

type TopicDetail = sarama.TopicDetail

type TopicError

type TopicError = sarama.TopicError

type TopicMetadata

type TopicMetadata = sarama.TopicMetadata

type TopicPartition

type TopicPartition = sarama.TopicPartition

type TopicPartitionError

type TopicPartitionError = sarama.TopicPartitionError

type TxnOffsetCommitRequest

type TxnOffsetCommitRequest = sarama.TxnOffsetCommitRequest

type TxnOffsetCommitResponse

type TxnOffsetCommitResponse = sarama.TxnOffsetCommitResponse

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL