Documentation ¶
Overview ¶
Example ¶
// Example program: publishes one message to testTopic with a sync producer
// while a partition consumer on partition 0 counts the messages it receives,
// then prints whether the send succeeded and at least one message was seen.
package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync/atomic"
	"time"

	"github.com/Shopify/sarama"
	"github.com/boxgo/box/pkg/client/kafka"
)

const (
	// testTopic is the topic the example produces to and consumes from.
	testTopic = "wechat_event"
)

func main() {
	kfk := kafka.StdConfig("default").Build()

	producer, err := kfk.NewSyncProducer()
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := producer.Close(); err != nil {
			panic(err)
		}
	}()

	consumer, err := kfk.NewConsumer()
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			panic(err)
		}
	}()

	// Start at the newest offset so only the message sent below is counted.
	partitionConsumer, err := consumer.ConsumePartition(testTopic, 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}
	// Registered after the consumer's deferred Close, so it runs before it:
	// a partition consumer has to be closed before its parent consumer.
	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			panic(err)
		}
	}()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	var cnt int32
	go func() {
		messages := partitionConsumer.Messages()
		for {
			select {
			case _, ok := <-messages:
				if !ok {
					// Channel closed by partitionConsumer.Close: stop instead
					// of spinning on zero values and inflating the counter.
					return
				}
				atomic.AddInt32(&cnt, 1)
			case <-signals:
				// A bare break would only leave the select and keep looping;
				// return actually ends the goroutine on interrupt.
				return
			}
		}
	}()

	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: testTopic,
		Value: sarama.StringEncoder("hi"),
	})
	if err != nil {
		panic(err)
	}

	// Give the consumer goroutine time to observe the message just sent.
	time.Sleep(time.Second)

	fmt.Println(offset >= 0, partition == 0, atomic.LoadInt32(&cnt) > 0)
}
Output: true true true
Index ¶
Examples ¶
Constants ¶
View Source
// Names of the consumer-group partition assignment strategies, re-exported
// from the sarama package under the same identifiers.
const (
	// RangeBalanceStrategyName identifies strategies that use the range partition assignment strategy
	RangeBalanceStrategyName = sarama.RangeBalanceStrategyName

	// RoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy
	RoundRobinBalanceStrategyName = sarama.RoundRobinBalanceStrategyName

	// StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy
	StickyBalanceStrategyName = sarama.StickyBalanceStrategyName
)
View Source
// SASL mechanism names, handshake versions and extension keys, re-exported
// from the sarama package under the same identifiers.
const (
	// SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
	SASLTypeOAuth = sarama.SASLTypeOAuth
	// SASLTypePlaintext represents the SASL/PLAIN mechanism
	SASLTypePlaintext = sarama.SASLTypePlaintext
	// SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
	SASLTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256
	// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
	SASLTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512
	// SASLTypeGSSAPI re-exports sarama.SASLTypeGSSAPI.
	// NOTE(review): presumably the SASL/GSSAPI (Kerberos) mechanism name — confirm against sarama.
	SASLTypeGSSAPI = sarama.SASLTypeGSSAPI

	// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL auth using opaque packets.
	SASLHandshakeV0 = sarama.SASLHandshakeV0

	// SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL by wrapping tokens with Kafka protocol headers.
	SASLHandshakeV1 = sarama.SASLHandshakeV1

	// SASLExtKeyAuth is the reserved extension key name sent as part of the
	// SASL/OAUTHBEARER initial client response
	SASLExtKeyAuth = sarama.SASLExtKeyAuth
)
View Source
// Special partition offsets, re-exported from the sarama package under the
// same identifiers.
const (
	// OffsetNewest stands for the log head offset, i.e. the offset that will be
	// assigned to the next message that will be produced to the partition. You
	// can send this to a client's GetOffset method to get this offset, or when
	// calling ConsumePartition to start consuming new messages.
	OffsetNewest = sarama.OffsetNewest

	// OffsetOldest stands for the oldest offset available on the broker for a
	// partition. You can send this to a client's GetOffset method to get this
	// offset, or when calling ConsumePartition to start consuming from the
	// oldest offset that is still available on the broker.
	OffsetOldest = sarama.OffsetOldest
)
View Source
// Constants re-exported from the sarama package under the same identifiers.
// The underscore-separated upper-case names deliberately mirror sarama's and
// therefore do not follow Go's MixedCaps convention.
// NOTE(review): judging by the names these relate to GSSAPI/Kerberos
// authentication — confirm the individual meanings against sarama.
const (
	TOK_ID_KRB_AP_REQ   = sarama.TOK_ID_KRB_AP_REQ
	GSS_API_GENERIC_TAG = sarama.GSS_API_GENERIC_TAG
	KRB5_USER_AUTH      = sarama.KRB5_USER_AUTH
	KRB5_KEYTAB_AUTH    = sarama.KRB5_KEYTAB_AUTH
	GSS_API_INITIAL     = sarama.GSS_API_INITIAL
	GSS_API_VERIFY      = sarama.GSS_API_VERIFY
	GSS_API_FINISH      = sarama.GSS_API_FINISH
)
View Source
// Protocol-level special values, re-exported from the sarama package under
// the same identifiers.
const (
	// APIKeySASLAuth is the API key for the SaslAuthenticate Kafka API
	APIKeySASLAuth = sarama.APIKeySASLAuth

	// GroupGenerationUndefined is a special value for the group generation
	// field of Offset Commit Requests that should be used when a consumer
	// group does not rely on Kafka for partition management.
	GroupGenerationUndefined = sarama.GroupGenerationUndefined

	// ReceiveTime is a special value for the timestamp field of Offset Commit
	// Requests which tells the broker to set the timestamp to the time at
	// which the request was received. The timestamp is only used if message
	// version 1 is used, which requires kafka 0.8.2.
	ReceiveTime int64 = sarama.ReceiveTime
)
View Source
// Numeric error codes returned by the Kafka server, re-exported from the
// sarama package under the same identifiers.
const (
	ErrNoError                            = sarama.ErrNoError
	ErrUnknown                            = sarama.ErrUnknown
	ErrOffsetOutOfRange                   = sarama.ErrOffsetOutOfRange
	ErrInvalidMessage                     = sarama.ErrInvalidMessage
	ErrUnknownTopicOrPartition            = sarama.ErrUnknownTopicOrPartition
	ErrInvalidMessageSize                 = sarama.ErrInvalidMessageSize
	ErrLeaderNotAvailable                 = sarama.ErrLeaderNotAvailable
	ErrNotLeaderForPartition              = sarama.ErrNotLeaderForPartition
	ErrRequestTimedOut                    = sarama.ErrRequestTimedOut
	ErrBrokerNotAvailable                 = sarama.ErrBrokerNotAvailable
	ErrReplicaNotAvailable                = sarama.ErrReplicaNotAvailable
	ErrMessageSizeTooLarge                = sarama.ErrMessageSizeTooLarge
	ErrStaleControllerEpochCode           = sarama.ErrStaleControllerEpochCode
	ErrOffsetMetadataTooLarge             = sarama.ErrOffsetMetadataTooLarge
	ErrNetworkException                   = sarama.ErrNetworkException
	ErrOffsetsLoadInProgress              = sarama.ErrOffsetsLoadInProgress
	ErrConsumerCoordinatorNotAvailable    = sarama.ErrConsumerCoordinatorNotAvailable
	ErrNotCoordinatorForConsumer          = sarama.ErrNotCoordinatorForConsumer
	ErrInvalidTopic                       = sarama.ErrInvalidTopic
	ErrMessageSetSizeTooLarge             = sarama.ErrMessageSetSizeTooLarge
	ErrNotEnoughReplicas                  = sarama.ErrNotEnoughReplicas
	ErrNotEnoughReplicasAfterAppend       = sarama.ErrNotEnoughReplicasAfterAppend
	ErrInvalidRequiredAcks                = sarama.ErrInvalidRequiredAcks
	ErrIllegalGeneration                  = sarama.ErrIllegalGeneration
	ErrInconsistentGroupProtocol          = sarama.ErrInconsistentGroupProtocol
	ErrInvalidGroupId                     = sarama.ErrInvalidGroupId
	ErrUnknownMemberId                    = sarama.ErrUnknownMemberId
	ErrInvalidSessionTimeout              = sarama.ErrInvalidSessionTimeout
	ErrRebalanceInProgress                = sarama.ErrRebalanceInProgress
	ErrInvalidCommitOffsetSize            = sarama.ErrInvalidCommitOffsetSize
	ErrTopicAuthorizationFailed           = sarama.ErrTopicAuthorizationFailed
	ErrGroupAuthorizationFailed           = sarama.ErrGroupAuthorizationFailed
	ErrClusterAuthorizationFailed         = sarama.ErrClusterAuthorizationFailed
	ErrInvalidTimestamp                   = sarama.ErrInvalidTimestamp
	ErrUnsupportedSASLMechanism           = sarama.ErrUnsupportedSASLMechanism
	ErrIllegalSASLState                   = sarama.ErrIllegalSASLState
	ErrUnsupportedVersion                 = sarama.ErrUnsupportedVersion
	ErrTopicAlreadyExists                 = sarama.ErrTopicAlreadyExists
	ErrInvalidPartitions                  = sarama.ErrInvalidPartitions
	ErrInvalidReplicationFactor           = sarama.ErrInvalidReplicationFactor
	ErrInvalidReplicaAssignment           = sarama.ErrInvalidReplicaAssignment
	ErrInvalidConfig                      = sarama.ErrInvalidConfig
	ErrNotController                      = sarama.ErrNotController
	ErrInvalidRequest                     = sarama.ErrInvalidRequest
	ErrUnsupportedForMessageFormat        = sarama.ErrUnsupportedForMessageFormat
	ErrPolicyViolation                    = sarama.ErrPolicyViolation
	ErrOutOfOrderSequenceNumber           = sarama.ErrOutOfOrderSequenceNumber
	ErrDuplicateSequenceNumber            = sarama.ErrDuplicateSequenceNumber
	ErrInvalidProducerEpoch               = sarama.ErrInvalidProducerEpoch
	ErrInvalidTxnState                    = sarama.ErrInvalidTxnState
	ErrInvalidProducerIDMapping           = sarama.ErrInvalidProducerIDMapping
	ErrInvalidTransactionTimeout          = sarama.ErrInvalidTransactionTimeout
	ErrConcurrentTransactions             = sarama.ErrConcurrentTransactions
	ErrTransactionCoordinatorFenced       = sarama.ErrTransactionCoordinatorFenced
	ErrTransactionalIDAuthorizationFailed = sarama.ErrTransactionalIDAuthorizationFailed
	ErrSecurityDisabled                   = sarama.ErrSecurityDisabled
	ErrOperationNotAttempted              = sarama.ErrOperationNotAttempted
	ErrKafkaStorageError                  = sarama.ErrKafkaStorageError
	ErrLogDirNotFound                     = sarama.ErrLogDirNotFound
	ErrSASLAuthenticationFailed           = sarama.ErrSASLAuthenticationFailed
	ErrUnknownProducerID                  = sarama.ErrUnknownProducerID
	ErrReassignmentInProgress             = sarama.ErrReassignmentInProgress
	ErrDelegationTokenAuthDisabled        = sarama.ErrDelegationTokenAuthDisabled
	ErrDelegationTokenNotFound            = sarama.ErrDelegationTokenNotFound
	ErrDelegationTokenOwnerMismatch       = sarama.ErrDelegationTokenOwnerMismatch
	ErrDelegationTokenRequestNotAllowed   = sarama.ErrDelegationTokenRequestNotAllowed
	ErrDelegationTokenAuthorizationFailed = sarama.ErrDelegationTokenAuthorizationFailed
	ErrDelegationTokenExpired             = sarama.ErrDelegationTokenExpired
	ErrInvalidPrincipalType               = sarama.ErrInvalidPrincipalType
	ErrNonEmptyGroup                      = sarama.ErrNonEmptyGroup
	ErrGroupIDNotFound                    = sarama.ErrGroupIDNotFound
	ErrFetchSessionIDNotFound             = sarama.ErrFetchSessionIDNotFound
	ErrInvalidFetchSessionEpoch           = sarama.ErrInvalidFetchSessionEpoch
	ErrListenerNotFound                   = sarama.ErrListenerNotFound
	ErrTopicDeletionDisabled              = sarama.ErrTopicDeletionDisabled
	ErrFencedLeaderEpoch                  = sarama.ErrFencedLeaderEpoch
	ErrUnknownLeaderEpoch                 = sarama.ErrUnknownLeaderEpoch
	ErrUnsupportedCompressionType         = sarama.ErrUnsupportedCompressionType
	ErrStaleBrokerEpoch                   = sarama.ErrStaleBrokerEpoch
	ErrOffsetNotAvailable                 = sarama.ErrOffsetNotAvailable
	ErrMemberIdRequired                   = sarama.ErrMemberIdRequired
	ErrPreferredLeaderNotAvailable        = sarama.ErrPreferredLeaderNotAvailable
	ErrGroupMaxSizeReached                = sarama.ErrGroupMaxSizeReached
	ErrFencedInstancedId                  = sarama.ErrFencedInstancedId
)
Numeric error codes returned by the Kafka server.
Variables ¶
View Source
// Request/response size limits, initialized from the sarama variables of the
// same name.
// NOTE(review): these are package-level variables initialized from sarama's,
// so they look like copies taken at package initialization; assigning to
// them presumably does not change the limits sarama itself enforces —
// confirm, and set the sarama variables directly if that is the intent.
var (
	// MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying
	// to send a request larger than this will result in an PacketEncodingError. The default of 100 MiB is aligned
	// with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt
	// to process.
	MaxRequestSize = sarama.MaxRequestSize

	// MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If
	// a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to
	// protect the client from running out of memory. Please note that brokers do not have any natural limit on
	// the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers
	// (see https://issues.apache.org/jira/browse/KAFKA-2063).
	MaxResponseSize = sarama.MaxResponseSize
)
View Source
// Client-side error values, initialized from the sarama variables of the
// same name.
var (
	// ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored
	// or otherwise failed to respond.
	ErrOutOfBrokers = sarama.ErrOutOfBrokers

	// ErrClosedClient is the error returned when a method is called on a client that has been closed.
	ErrClosedClient = sarama.ErrClosedClient

	// ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does
	// not contain the expected information.
	ErrIncompleteResponse = sarama.ErrIncompleteResponse

	// ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index
	// (meaning one outside of the range [0...numPartitions-1]).
	ErrInvalidPartition = sarama.ErrInvalidPartition

	// ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.
	ErrAlreadyConnected = sarama.ErrAlreadyConnected

	// ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
	ErrNotConnected = sarama.ErrNotConnected

	// ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected
	// when requesting messages, since as an optimization the server is allowed to return a partial message at the end
	// of the message set.
	ErrInsufficientData = sarama.ErrInsufficientData

	// ErrShuttingDown is returned when a producer receives a message during shutdown.
	ErrShuttingDown = sarama.ErrShuttingDown

	// ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
	ErrMessageTooLarge = sarama.ErrMessageTooLarge

	// ErrConsumerOffsetNotAdvanced is returned when a partition consumer didn't advance its offset after parsing
	// a RecordBatch.
	ErrConsumerOffsetNotAdvanced = sarama.ErrConsumerOffsetNotAdvanced

	// ErrControllerNotAvailable is returned when server didn't give correct controller id. May be kafka server's version
	// is lower than 0.10.0.0.
	ErrControllerNotAvailable = sarama.ErrControllerNotAvailable

	// ErrNoTopicsToUpdateMetadata is returned when Meta.Full is set to false but no specific topics were found to update
	// the metadata.
	ErrNoTopicsToUpdateMetadata = sarama.ErrNoTopicsToUpdateMetadata
)
Functions ¶
This section is empty.
Types ¶
type AsyncSyncProducer ¶
// AsyncSyncProducer is a named type whose underlying type is
// sarama.AsyncProducer; it is the type returned by Kafka.NewAsyncProducer.
// NOTE(review): despite the "Sync" in its name it wraps the asynchronous
// producer; the name looks like a typo for AsyncProducer but is part of the
// exported API, so it is left unchanged here.
type AsyncSyncProducer sarama.AsyncProducer
type Config ¶
type Config struct { Addrs []string `config:"addrs"` Net Net `config:"net"` Metadata Metadata `config:"metadata"` Producer ProducerConfig `config:"producer"` Consumer ConsumerConfig `config:"consumer"` ClientID string `config:"clientId"` ChannelBufferSize int `config:"channelBufferSize"` Version sarama.KafkaVersion `config:"version"` // contains filtered or unexported fields }
Config is the Kafka client configuration.
type ConsumerConfig ¶
// ConsumerConfig groups the consumer-related settings; it is the type of
// Config.Consumer. Each field carries a `config` struct tag naming its
// configuration key.
// NOTE(review): fields tagged `config:"-"` (RetryBackoffFunc, Interceptors)
// are presumably skipped by the config loader and must be set in code —
// confirm against the loader.
type ConsumerConfig struct {
	GroupSessionTimeout        time.Duration                   `config:"groupSessionTimeout"`
	GroupHeartbeatInterval     time.Duration                   `config:"groupHeartbeatInterval"`
	GroupRebalanceStrategy     sarama.BalanceStrategy          `config:"groupRebalanceStrategy"`
	GroupRebalanceTimeout      time.Duration                   `config:"groupRebalanceTimeout"`
	GroupRebalanceRetryMax     int                             `config:"groupRebalanceRetryMax"`
	GroupRebalanceRetryBackoff time.Duration                   `config:"groupRebalanceRetryBackoff"`
	GroupMemberUserData        []byte                          `config:"groupMemberUserData"`
	RetryBackoff               time.Duration                   `config:"retryBackoff"`
	RetryBackoffFunc           func(retries int) time.Duration `config:"-"`
	FetchMin                   int32                           `config:"fetchMin"`
	FetchMax                   int32                           `config:"fetchMax"`
	FetchDefault               int32                           `config:"fetchDefault"`
	MaxWaitTime                time.Duration                   `config:"maxWaitTime"`
	MaxProcessingTime          time.Duration                   `config:"maxProcessingTime"`
	ReturnErrors               bool                            `config:"returnErrors"`
	OffsetsCommitInterval      time.Duration                   `config:"offsetsCommitInterval"`
	OffsetsInitial             int64                           `config:"offsetsInitial"`
	OffsetsRetention           time.Duration                   `config:"offsetsRetention"`
	OffsetRetryMax             int                             `config:"offsetRetryMax"`
	OffsetAutoCommitEnable     bool                            `config:"offsetAutoCommitEnable"`
	OffsetAutoCommitInterval   time.Duration                   `config:"offsetAutoCommitInterval"`
	IsolationLevel             sarama.IsolationLevel           `config:"isolationLevel"`
	Interceptors               []sarama.ConsumerInterceptor    `config:"-"`
}
type ConsumerGroup ¶
// ConsumerGroup is a named type whose underlying type is
// sarama.ConsumerGroup; it is the type returned by Kafka.NewConsumerGroup.
type ConsumerGroup sarama.ConsumerGroup
type Kafka ¶
type Kafka struct {
// contains filtered or unexported fields
}
func (Kafka) NewAsyncProducer ¶
func (kfk Kafka) NewAsyncProducer() (AsyncSyncProducer, error)
func (Kafka) NewConsumer ¶
func (Kafka) NewConsumerGroup ¶
func (kfk Kafka) NewConsumerGroup(groupID string) (ConsumerGroup, error)
func (Kafka) NewSyncProducer ¶
func (kfk Kafka) NewSyncProducer() (SyncProducer, error)
type Metadata ¶
// Metadata groups the metadata-related settings; it is the type of
// Config.Metadata. Each field carries a `config` struct tag naming its
// configuration key.
// NOTE(review): RetryBackoffFunc is tagged `config:"-"`, so it is presumably
// skipped by the config loader and must be set in code — confirm against the
// loader.
type Metadata struct {
	RetryMax         int                                         `config:"retryMax"`
	RetryBackoff     time.Duration                               `config:"retryBackoff"`
	RetryBackoffFunc func(retries, maxRetries int) time.Duration `config:"-"`
	RefreshFrequency time.Duration                               `config:"refreshFrequency"`
	Full             bool                                        `config:"full"`
	Timeout          time.Duration                               `config:"timeout"`
}
type ProducerConfig ¶
// ProducerConfig groups the producer-related settings; it is the type of
// Config.Producer. Each field carries a `config` struct tag naming its
// configuration key.
// NOTE(review): fields tagged `config:"-"` (Partitioner, RetryBackoffFunc,
// Interceptors) are presumably skipped by the config loader and must be set
// in code — confirm against the loader.
// NOTE(review): FlushMaxMessages is tagged `config:"FlushMaxMessages"` with a
// leading capital, unlike every other key in this struct (lowerCamelCase).
// This looks like a typo, but the key is left as-is because renaming it
// would break existing configuration files.
type ProducerConfig struct {
	MaxMessageBytes  int                                         `config:"maxMessageBytes"`
	RequiredAcks     sarama.RequiredAcks                         `config:"requiredAcks"`
	Timeout          time.Duration                               `config:"timeout"`
	Compression      sarama.CompressionCodec                     `config:"compression"`
	CompressionLevel int                                         `config:"compressionLevel"`
	Partitioner      sarama.PartitionerConstructor               `config:"-"`
	Idempotent       bool                                        `config:"idempotent"`
	ReturnSuccesses  bool                                        `config:"returnSuccesses"`
	ReturnErrors     bool                                        `config:"returnErrors"`
	FlushBytes       int                                         `config:"flushBytes"`
	FlushMessages    int                                         `config:"flushMessages"`
	FlushFrequency   time.Duration                               `config:"flushFrequency"`
	FlushMaxMessages int                                         `config:"FlushMaxMessages"`
	RetryMax         int                                         `config:"retryMax"`
	RetryBackoff     time.Duration                               `config:"retryBackoff"`
	RetryBackoffFunc func(retries, maxRetries int) time.Duration `config:"-"`
	Interceptors     []sarama.ProducerInterceptor                `config:"-"`
}
type SyncProducer ¶
// SyncProducer is a named type whose underlying type is sarama.SyncProducer;
// it is the type returned by Kafka.NewSyncProducer.
type SyncProducer sarama.SyncProducer
Click to show internal directories.
Click to hide internal directories.