kafka

package
v1.0.0-beta.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 2, 2021 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Overview

Example
package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync/atomic"
	"time"

	"github.com/Shopify/sarama"
	"github.com/boxgo/box/pkg/client/kafka"
)

const (
	// testTopic is the Kafka topic this example both produces to and consumes from.
	testTopic = "wechat_event"
)

// main demonstrates a produce/consume round trip. It builds a Kafka client
// from the "default" configuration, starts consuming partition 0 of testTopic
// from the newest offset, sends one message through a sync producer, waits a
// second, and prints whether the send succeeded and at least one message was
// observed by the consumer.
func main() {
	kfk := kafka.StdConfig("default").Build()

	producer, err := kfk.NewSyncProducer()
	if err != nil {
		panic(err)
	}

	defer func() {
		if err := producer.Close(); err != nil {
			panic(err)
		}
	}()

	consumer, err := kfk.NewConsumer()
	if err != nil {
		panic(err)
	}

	defer func() {
		if err := consumer.Close(); err != nil {
			panic(err)
		}
	}()

	partitionConsumer, err := consumer.ConsumePartition(testTopic, 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}

	// Deferred calls run last-in-first-out, so the partition consumer is
	// closed before its parent consumer is.
	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			panic(err)
		}
	}()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// cnt counts consumed messages; it is shared with the goroutine below,
	// hence the atomic accessors.
	var cnt int32

	go func() {
		for {
			select {
			case _, ok := <-partitionConsumer.Messages():
				if !ok {
					// Channel closed: stop rather than spin on zero values.
					return
				}
				atomic.AddInt32(&cnt, 1)
			case <-signals:
				// A bare break here would only leave the select, not the
				// loop, so return to actually end the goroutine.
				return
			}
		}
	}()

	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: testTopic,
		Value: sarama.StringEncoder("hi"),
	})
	if err != nil {
		panic(err)
	}

	// Give the consumer goroutine time to receive the message just sent.
	time.Sleep(time.Second)

	fmt.Println(offset >= 0, partition == 0, atomic.LoadInt32(&cnt) > 0)
}
Output:

true true true

Index

Examples

Constants

View Source
// Consumer-group balance strategy names, re-exported from sarama so callers
// can reference them through this package.
const (
	// RangeBalanceStrategyName identifies strategies that use the range partition assignment strategy
	RangeBalanceStrategyName = sarama.RangeBalanceStrategyName

	// RoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy
	RoundRobinBalanceStrategyName = sarama.RoundRobinBalanceStrategyName

	// StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy
	StickyBalanceStrategyName = sarama.StickyBalanceStrategyName
)
View Source
// SASL mechanism names and handshake versions, re-exported from sarama.
const (
	// SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
	SASLTypeOAuth = sarama.SASLTypeOAuth
	// SASLTypePlaintext represents the SASL/PLAIN mechanism
	SASLTypePlaintext = sarama.SASLTypePlaintext
	// SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
	SASLTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256
	// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
	SASLTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512
	// SASLTypeGSSAPI re-exports sarama.SASLTypeGSSAPI; presumably the
	// SASL/GSSAPI mechanism name — confirm against sarama.
	SASLTypeGSSAPI      = sarama.SASLTypeGSSAPI
	// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL auth using opaque packets.
	SASLHandshakeV0 = sarama.SASLHandshakeV0
	// SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL by wrapping tokens with Kafka protocol headers.
	SASLHandshakeV1 = sarama.SASLHandshakeV1
	// SASLExtKeyAuth is the reserved extension key name sent as part of the
	// SASL/OAUTHBEARER initial client response
	SASLExtKeyAuth = sarama.SASLExtKeyAuth
)
View Source
// Special offset values, re-exported from sarama.
const (
	// OffsetNewest stands for the log head offset, i.e. the offset that will be
	// assigned to the next message that will be produced to the partition. You
	// can send this to a client's GetOffset method to get this offset, or when
	// calling ConsumePartition to start consuming new messages.
	OffsetNewest = sarama.OffsetNewest
	// OffsetOldest stands for the oldest offset available on the broker for a
	// partition. You can send this to a client's GetOffset method to get this
	// offset, or when calling ConsumePartition to start consuming from the
	// oldest offset that is still available on the broker.
	OffsetOldest = sarama.OffsetOldest
)
View Source
// Constants re-exported from sarama under their original names.
// NOTE(review): judging by the names these relate to GSSAPI/Kerberos
// authentication (token IDs, auth types, handshake steps) — confirm against
// sarama before relying on that reading.
const (
	TOK_ID_KRB_AP_REQ   = sarama.TOK_ID_KRB_AP_REQ
	GSS_API_GENERIC_TAG = sarama.GSS_API_GENERIC_TAG
	KRB5_USER_AUTH      = sarama.KRB5_USER_AUTH
	KRB5_KEYTAB_AUTH    = sarama.KRB5_KEYTAB_AUTH
	GSS_API_INITIAL     = sarama.GSS_API_INITIAL
	GSS_API_VERIFY      = sarama.GSS_API_VERIFY
	GSS_API_FINISH      = sarama.GSS_API_FINISH
)
View Source
// Protocol-level special values, re-exported from sarama.
const (
	// APIKeySASLAuth is the API key for the SaslAuthenticate Kafka API
	APIKeySASLAuth = sarama.APIKeySASLAuth

	// GroupGenerationUndefined is a special value for the group generation field of Offset Commit Requests that should be used when a consumer group does not rely on Kafka for partition management.
	GroupGenerationUndefined = sarama.GroupGenerationUndefined

	// ReceiveTime is a special value for the timestamp field of Offset Commit Requests which tells the broker to set the timestamp to the time at which the request was received. The timestamp is only used if message version 1 is used, which requires kafka 0.8.2.
	ReceiveTime int64 = sarama.ReceiveTime
)
View Source
// Numeric error codes returned by the Kafka server, re-exported from sarama
// one-for-one under the same names.
const (
	ErrNoError                            = sarama.ErrNoError
	ErrUnknown                            = sarama.ErrUnknown
	ErrOffsetOutOfRange                   = sarama.ErrOffsetOutOfRange
	ErrInvalidMessage                     = sarama.ErrInvalidMessage
	ErrUnknownTopicOrPartition            = sarama.ErrUnknownTopicOrPartition
	ErrInvalidMessageSize                 = sarama.ErrInvalidMessageSize
	ErrLeaderNotAvailable                 = sarama.ErrLeaderNotAvailable
	ErrNotLeaderForPartition              = sarama.ErrNotLeaderForPartition
	ErrRequestTimedOut                    = sarama.ErrRequestTimedOut
	ErrBrokerNotAvailable                 = sarama.ErrBrokerNotAvailable
	ErrReplicaNotAvailable                = sarama.ErrReplicaNotAvailable
	ErrMessageSizeTooLarge                = sarama.ErrMessageSizeTooLarge
	ErrStaleControllerEpochCode           = sarama.ErrStaleControllerEpochCode
	ErrOffsetMetadataTooLarge             = sarama.ErrOffsetMetadataTooLarge
	ErrNetworkException                   = sarama.ErrNetworkException
	ErrOffsetsLoadInProgress              = sarama.ErrOffsetsLoadInProgress
	ErrConsumerCoordinatorNotAvailable    = sarama.ErrConsumerCoordinatorNotAvailable
	ErrNotCoordinatorForConsumer          = sarama.ErrNotCoordinatorForConsumer
	ErrInvalidTopic                       = sarama.ErrInvalidTopic
	ErrMessageSetSizeTooLarge             = sarama.ErrMessageSetSizeTooLarge
	ErrNotEnoughReplicas                  = sarama.ErrNotEnoughReplicas
	ErrNotEnoughReplicasAfterAppend       = sarama.ErrNotEnoughReplicasAfterAppend
	ErrInvalidRequiredAcks                = sarama.ErrInvalidRequiredAcks
	ErrIllegalGeneration                  = sarama.ErrIllegalGeneration
	ErrInconsistentGroupProtocol          = sarama.ErrInconsistentGroupProtocol
	ErrInvalidGroupId                     = sarama.ErrInvalidGroupId
	ErrUnknownMemberId                    = sarama.ErrUnknownMemberId
	ErrInvalidSessionTimeout              = sarama.ErrInvalidSessionTimeout
	ErrRebalanceInProgress                = sarama.ErrRebalanceInProgress
	ErrInvalidCommitOffsetSize            = sarama.ErrInvalidCommitOffsetSize
	ErrTopicAuthorizationFailed           = sarama.ErrTopicAuthorizationFailed
	ErrGroupAuthorizationFailed           = sarama.ErrGroupAuthorizationFailed
	ErrClusterAuthorizationFailed         = sarama.ErrClusterAuthorizationFailed
	ErrInvalidTimestamp                   = sarama.ErrInvalidTimestamp
	ErrUnsupportedSASLMechanism           = sarama.ErrUnsupportedSASLMechanism
	ErrIllegalSASLState                   = sarama.ErrIllegalSASLState
	ErrUnsupportedVersion                 = sarama.ErrUnsupportedVersion
	ErrTopicAlreadyExists                 = sarama.ErrTopicAlreadyExists
	ErrInvalidPartitions                  = sarama.ErrInvalidPartitions
	ErrInvalidReplicationFactor           = sarama.ErrInvalidReplicationFactor
	ErrInvalidReplicaAssignment           = sarama.ErrInvalidReplicaAssignment
	ErrInvalidConfig                      = sarama.ErrInvalidConfig
	ErrNotController                      = sarama.ErrNotController
	ErrInvalidRequest                     = sarama.ErrInvalidRequest
	ErrUnsupportedForMessageFormat        = sarama.ErrUnsupportedForMessageFormat
	ErrPolicyViolation                    = sarama.ErrPolicyViolation
	ErrOutOfOrderSequenceNumber           = sarama.ErrOutOfOrderSequenceNumber
	ErrDuplicateSequenceNumber            = sarama.ErrDuplicateSequenceNumber
	ErrInvalidProducerEpoch               = sarama.ErrInvalidProducerEpoch
	ErrInvalidTxnState                    = sarama.ErrInvalidTxnState
	ErrInvalidProducerIDMapping           = sarama.ErrInvalidProducerIDMapping
	ErrInvalidTransactionTimeout          = sarama.ErrInvalidTransactionTimeout
	ErrConcurrentTransactions             = sarama.ErrConcurrentTransactions
	ErrTransactionCoordinatorFenced       = sarama.ErrTransactionCoordinatorFenced
	ErrTransactionalIDAuthorizationFailed = sarama.ErrTransactionalIDAuthorizationFailed
	ErrSecurityDisabled                   = sarama.ErrSecurityDisabled
	ErrOperationNotAttempted              = sarama.ErrOperationNotAttempted
	ErrKafkaStorageError                  = sarama.ErrKafkaStorageError
	ErrLogDirNotFound                     = sarama.ErrLogDirNotFound
	ErrSASLAuthenticationFailed           = sarama.ErrSASLAuthenticationFailed
	ErrUnknownProducerID                  = sarama.ErrUnknownProducerID
	ErrReassignmentInProgress             = sarama.ErrReassignmentInProgress
	ErrDelegationTokenAuthDisabled        = sarama.ErrDelegationTokenAuthDisabled
	ErrDelegationTokenNotFound            = sarama.ErrDelegationTokenNotFound
	ErrDelegationTokenOwnerMismatch       = sarama.ErrDelegationTokenOwnerMismatch
	ErrDelegationTokenRequestNotAllowed   = sarama.ErrDelegationTokenRequestNotAllowed
	ErrDelegationTokenAuthorizationFailed = sarama.ErrDelegationTokenAuthorizationFailed
	ErrDelegationTokenExpired             = sarama.ErrDelegationTokenExpired
	ErrInvalidPrincipalType               = sarama.ErrInvalidPrincipalType
	ErrNonEmptyGroup                      = sarama.ErrNonEmptyGroup
	ErrGroupIDNotFound                    = sarama.ErrGroupIDNotFound
	ErrFetchSessionIDNotFound             = sarama.ErrFetchSessionIDNotFound
	ErrInvalidFetchSessionEpoch           = sarama.ErrInvalidFetchSessionEpoch
	ErrListenerNotFound                   = sarama.ErrListenerNotFound
	ErrTopicDeletionDisabled              = sarama.ErrTopicDeletionDisabled
	ErrFencedLeaderEpoch                  = sarama.ErrFencedLeaderEpoch
	ErrUnknownLeaderEpoch                 = sarama.ErrUnknownLeaderEpoch
	ErrUnsupportedCompressionType         = sarama.ErrUnsupportedCompressionType
	ErrStaleBrokerEpoch                   = sarama.ErrStaleBrokerEpoch
	ErrOffsetNotAvailable                 = sarama.ErrOffsetNotAvailable
	ErrMemberIdRequired                   = sarama.ErrMemberIdRequired
	ErrPreferredLeaderNotAvailable        = sarama.ErrPreferredLeaderNotAvailable
	ErrGroupMaxSizeReached                = sarama.ErrGroupMaxSizeReached
	ErrFencedInstancedId                  = sarama.ErrFencedInstancedId
)

Numeric error codes returned by the Kafka server.

Variables

View Source
// Size limits initialized from the sarama variables of the same name.
// NOTE(review): these are separate package-level variables set once from
// sarama's values; if the underlying type is a plain value, assigning to them
// here would not change the limits sarama itself enforces — confirm before
// using them as tuning knobs.
var (
	// MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying
	// to send a request larger than this will result in an PacketEncodingError. The default of 100 MiB is aligned
	// with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt
	// to process.
	MaxRequestSize = sarama.MaxRequestSize

	// MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If
	// a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to
	// protect the client from running out of memory. Please note that brokers do not have any natural limit on
	// the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers
	// (see https://issues.apache.org/jira/browse/KAFKA-2063).
	MaxResponseSize = sarama.MaxResponseSize
)
View Source
// Client-side error values, re-exported from the sarama variables of the same
// name.
var (
	// ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored
	// or otherwise failed to respond.
	ErrOutOfBrokers = sarama.ErrOutOfBrokers

	// ErrClosedClient is the error returned when a method is called on a client that has been closed.
	ErrClosedClient = sarama.ErrClosedClient

	// ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does
	// not contain the expected information.
	ErrIncompleteResponse = sarama.ErrIncompleteResponse

	// ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index
	// (meaning one outside of the range [0...numPartitions-1]).
	ErrInvalidPartition = sarama.ErrInvalidPartition

	// ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.
	ErrAlreadyConnected = sarama.ErrAlreadyConnected

	// ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
	ErrNotConnected = sarama.ErrNotConnected

	// ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected
	// when requesting messages, since as an optimization the server is allowed to return a partial message at the end
	// of the message set.
	ErrInsufficientData = sarama.ErrInsufficientData

	// ErrShuttingDown is returned when a producer receives a message during shutdown.
	ErrShuttingDown = sarama.ErrShuttingDown

	// ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
	ErrMessageTooLarge = sarama.ErrMessageTooLarge

	// ErrConsumerOffsetNotAdvanced is returned when a partition consumer didn't advance its offset after parsing
	// a RecordBatch.
	ErrConsumerOffsetNotAdvanced = sarama.ErrConsumerOffsetNotAdvanced

	// ErrControllerNotAvailable is returned when server didn't give correct controller id. May be kafka server's version
	// is lower than 0.10.0.0.
	ErrControllerNotAvailable = sarama.ErrControllerNotAvailable

	// ErrNoTopicsToUpdateMetadata is returned when Meta.Full is set to false but no specific topics were found to update
	// the metadata.
	ErrNoTopicsToUpdateMetadata = sarama.ErrNoTopicsToUpdateMetadata
)

Functions

This section is empty.

Types

type AsyncSyncProducer

// AsyncSyncProducer is a named type whose underlying type is
// sarama.AsyncProducer; it is the type returned by Kafka.NewAsyncProducer.
type AsyncSyncProducer sarama.AsyncProducer

type Config

// Config holds the settings from which Build constructs a Kafka instance.
// Each exported field carries a `config` struct tag naming its configuration
// key (presumably read by the surrounding config loader — confirm).
type Config struct {
	Addrs             []string            `config:"addrs"` // presumably the broker addresses — confirm
	Net               Net                 `config:"net"`
	Metadata          Metadata            `config:"metadata"`
	Producer          ProducerConfig      `config:"producer"`
	Consumer          ConsumerConfig      `config:"consumer"`
	ClientID          string              `config:"clientId"`
	ChannelBufferSize int                 `config:"channelBufferSize"`
	Version           sarama.KafkaVersion `config:"version"`
	// contains filtered or unexported fields
}

Config is the configuration.

func DefaultConfig

func DefaultConfig(key string) *Config

DefaultConfig returns the default configuration.

func StdConfig

func StdConfig(key string, optionFunc ...OptionFunc) *Config

StdConfig returns the standard configuration.

func (*Config) Build

func (c *Config) Build() *Kafka

Build builds the instance.

func (*Config) Path

func (c *Config) Path() string

Path returns the instance's configuration directory.

type Consumer

// Consumer is a named type whose underlying type is sarama.Consumer; it is
// the type returned by Kafka.NewConsumer.
type Consumer sarama.Consumer

type ConsumerConfig

// ConsumerConfig groups the consumer-side settings embedded in Config as its
// Consumer field. Fields tagged `config:"-"` hold function or interface
// values (presumably excluded from config loading and set in code — confirm).
type ConsumerConfig struct {
	GroupSessionTimeout        time.Duration                   `config:"groupSessionTimeout"`
	GroupHeartbeatInterval     time.Duration                   `config:"groupHeartbeatInterval"`
	GroupRebalanceStrategy     sarama.BalanceStrategy          `config:"groupRebalanceStrategy"`
	GroupRebalanceTimeout      time.Duration                   `config:"groupRebalanceTimeout"`
	GroupRebalanceRetryMax     int                             `config:"groupRebalanceRetryMax"`
	GroupRebalanceRetryBackoff time.Duration                   `config:"groupRebalanceRetryBackoff"`
	GroupMemberUserData        []byte                          `config:"groupMemberUserData"`
	RetryBackoff               time.Duration                   `config:"retryBackoff"`
	RetryBackoffFunc           func(retries int) time.Duration `config:"-"`
	FetchMin                   int32                           `config:"fetchMin"`
	FetchMax                   int32                           `config:"fetchMax"`
	FetchDefault               int32                           `config:"fetchDefault"`
	MaxWaitTime                time.Duration                   `config:"maxWaitTime"`
	MaxProcessingTime          time.Duration                   `config:"maxProcessingTime"`
	ReturnErrors               bool                            `config:"returnErrors"`
	OffsetsCommitInterval      time.Duration                   `config:"offsetsCommitInterval"`
	OffsetsInitial             int64                           `config:"offsetsInitial"`
	OffsetsRetention           time.Duration                   `config:"offsetsRetention"`
	OffsetRetryMax             int                             `config:"offsetRetryMax"`
	OffsetAutoCommitEnable     bool                            `config:"offsetAutoCommitEnable"`
	OffsetAutoCommitInterval   time.Duration                   `config:"offsetAutoCommitInterval"`
	IsolationLevel             sarama.IsolationLevel           `config:"isolationLevel"`
	Interceptors               []sarama.ConsumerInterceptor    `config:"-"`
}

type ConsumerGroup

// ConsumerGroup is a named type whose underlying type is
// sarama.ConsumerGroup; it is the type returned by Kafka.NewConsumerGroup.
type ConsumerGroup sarama.ConsumerGroup

type Kafka

// Kafka is the instance produced by Config.Build. Its New* methods create
// sync/async producers, consumers and consumer groups.
type Kafka struct {
	// contains filtered or unexported fields
}

func (Kafka) NewAsyncProducer

func (kfk Kafka) NewAsyncProducer() (AsyncSyncProducer, error)

func (Kafka) NewConsumer

func (kfk Kafka) NewConsumer() (Consumer, error)

func (Kafka) NewConsumerGroup

func (kfk Kafka) NewConsumerGroup(groupID string) (ConsumerGroup, error)

func (Kafka) NewSyncProducer

func (kfk Kafka) NewSyncProducer() (SyncProducer, error)

type Metadata

// Metadata groups the metadata-related settings embedded in Config as its
// Metadata field. RetryBackoffFunc is tagged `config:"-"` (presumably set in
// code rather than loaded from configuration — confirm).
type Metadata struct {
	RetryMax         int                                         `config:"retryMax"`
	RetryBackoff     time.Duration                               `config:"retryBackoff"`
	RetryBackoffFunc func(retries, maxRetries int) time.Duration `config:"-"`
	RefreshFrequency time.Duration                               `config:"refreshFrequency"`
	Full             bool                                        `config:"full"`
	Timeout          time.Duration                               `config:"timeout"`
}

type Net

// Net groups the network-level settings (request concurrency, timeouts and
// keep-alive) embedded in Config as its Net field.
type Net struct {
	MaxOpenRequests int           `config:"maxOpenRequests"`
	DialTimeout     time.Duration `config:"dialTimeout"`
	ReadTimeout     time.Duration `config:"readTimeout"`
	WriteTimeout    time.Duration `config:"writeTimeout"`
	KeepAlive       time.Duration `config:"keepAlive"`
}

type OptionFunc

// OptionFunc is an option that receives a *Config; StdConfig accepts any
// number of them (presumably to adjust the Config it returns — confirm).
type OptionFunc func(*Config)

OptionFunc carries option information.

type ProducerConfig

// ProducerConfig groups the producer-side settings embedded in Config as its
// Producer field. Fields tagged `config:"-"` hold function or interface
// values (presumably excluded from config loading and set in code — confirm).
type ProducerConfig struct {
	MaxMessageBytes  int                                         `config:"maxMessageBytes"`
	RequiredAcks     sarama.RequiredAcks                         `config:"requiredAcks"`
	Timeout          time.Duration                               `config:"timeout"`
	Compression      sarama.CompressionCodec                     `config:"compression"`
	CompressionLevel int                                         `config:"compressionLevel"`
	Partitioner      sarama.PartitionerConstructor               `config:"-"`
	Idempotent       bool                                        `config:"idempotent"`
	ReturnSuccesses  bool                                        `config:"returnSuccesses"`
	ReturnErrors     bool                                        `config:"returnErrors"`
	FlushBytes       int                                         `config:"flushBytes"`
	FlushMessages    int                                         `config:"flushMessages"`
	FlushFrequency   time.Duration                               `config:"flushFrequency"`
	// NOTE(review): this tag starts with an upper-case letter, unlike every
	// other key in this struct; it is left as-is because it is the key read
	// at runtime — confirm whether the config loader is case-sensitive.
	FlushMaxMessages int                                         `config:"FlushMaxMessages"`
	RetryMax         int                                         `config:"retryMax"`
	RetryBackoff     time.Duration                               `config:"retryBackoff"`
	RetryBackoffFunc func(retries, maxRetries int) time.Duration `config:"-"`
	Interceptors     []sarama.ProducerInterceptor                `config:"-"`
}

type SyncProducer

// SyncProducer is a named type whose underlying type is sarama.SyncProducer;
// it is the type returned by Kafka.NewSyncProducer.
type SyncProducer sarama.SyncProducer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL