Documentation ¶
Overview ¶
Package kafkabp provides Apache Kafka library implementations.
It wraps the Shopify/sarama Go client library with Baseplate.go integrations.
Index ¶
- Constants
- Variables
- func AWSAvailabilityZoneRackID() string
- func ConsumeAllPartitionsFunc(partitionID int32) bool
- type ConsumeErrorFunc
- type ConsumeMessageFunc
- type ConsumePartitionFunc
- type ConsumePartitionFuncProvider
- type Consumer
- type ConsumerConfig
- type GroupConsumerHandler
- type RackIDFunc
- type SaramaConfigOverrider
- type SimpleHTTPRackIDConfig
Constants ¶
const (
    OffsetOldest = "oldest"
    OffsetNewest = "newest"
)
Allowed Offset values
const (
    SimpleHTTPRackIDDefaultLimit   = 1024
    SimpleHTTPRackIDDefaultTimeout = 1 * time.Second
)
Default values for SimpleHTTPRackIDConfig.
Variables ¶
var (
    // ErrBrokersEmpty is thrown when the slice of brokers is empty.
    ErrBrokersEmpty = errors.New("kafkabp: Brokers are empty")

    // ErrTopicEmpty is thrown when the topic is empty.
    ErrTopicEmpty = errors.New("kafkabp: Topic is empty")

    // ErrClientIDEmpty is thrown when the client ID is empty.
    ErrClientIDEmpty = errors.New("kafkabp: ClientID is empty")

    // ErrOffsetInvalid is thrown when an invalid offset is specified.
    ErrOffsetInvalid = errors.New("kafkabp: Offset is invalid")

    // ErrNilConsumePartitionFunc is thrown when ConsumePartitionFuncProvider
    // returns a nil ConsumePartitionFunc.
    ErrNilConsumePartitionFunc = errors.New("kafkabp: ConsumePartitionFunc is nil")
)
Functions ¶
func AWSAvailabilityZoneRackID ¶
func AWSAvailabilityZoneRackID() string
AWSAvailabilityZoneRackID is a RackIDFunc implementation that returns the AWS availability zone as the rack id.
It also caches the result globally, so if you have more than one AWSAvailabilityZoneRackID in your process, only the first one actually makes the HTTP request. For example:
    consumer1, err := kafkabp.NewConsumer(kafkabp.ConsumerConfig{
        RackID: kafkabp.AWSAvailabilityZoneRackID,
        Topic:  "topic1",
        // other configs
    })
    consumer2, err := kafkabp.NewConsumer(kafkabp.ConsumerConfig{
        RackID: kafkabp.AWSAvailabilityZoneRackID,
        Topic:  "topic2",
        // other configs
    })
It uses the AWS instance metadata HTTP API with a 1-second overall timeout and a 1024-byte HTTP response read limit.
If an error occurred while retrieving the rack id through the AWS instance metadata API, the same error will be logged at slog's warning level every time AWSAvailabilityZoneRackID is called.
func ConsumeAllPartitionsFunc ¶ added in v0.9.12
func ConsumeAllPartitionsFunc(partitionID int32) bool
ConsumeAllPartitionsFunc is a ConsumePartitionFunc that instructs the topic consumer to consume all partitions.
This function always returns true, causing all partitions to be consumed.
Types ¶
type ConsumeErrorFunc ¶
type ConsumeErrorFunc func(err error)
ConsumeErrorFunc is a function type for consuming consumer errors.
Note that these are usually system level consuming errors (e.g. read from broker failed, etc.), not individual message consuming errors.
In most cases the implementation just needs to log the error and emit a counter, for example:
    consumer.Consume(
        consumeMessageFunc,
        func(err error) {
            log.Errorw(
                context.Background(),
                "kafka consumer error",
                "err", err,
                // additional key value pairs, for example topic info
            )
            // a prometheus counter
            consumerErrorCounter.Inc()
        },
    )
type ConsumeMessageFunc ¶
type ConsumeMessageFunc func(ctx context.Context, msg *sarama.ConsumerMessage)
ConsumeMessageFunc is a function type for consuming consumer messages.
The implementation is expected to handle all consuming errors. For example, if there was anything wrong with handling the message and it needs to be retried, the ConsumeMessageFunc implementation should handle the retry (usually put the message into a retry topic).
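As an illustration, here is a minimal sketch of a ConsumeMessageFunc that sends failed messages to a retry topic. Note that handle and retryProducer are hypothetical stand-ins for application code, not part of this package:

    consumeMessage := func(ctx context.Context, msg *sarama.ConsumerMessage) {
        if err := handle(ctx, msg); err != nil {
            // Swallow the error here: retry by producing to a retry topic
            // (via the hypothetical retryProducer) instead of propagating it.
            if retryErr := retryProducer.Produce(ctx, "my-topic-retry", msg.Value); retryErr != nil {
                log.Errorw(ctx, "failed to enqueue message for retry", "err", retryErr)
            }
        }
    }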
type ConsumePartitionFunc ¶ added in v0.9.12
type ConsumePartitionFunc func(partitionID int32) bool
ConsumePartitionFunc is a function type for the application to specify which partitions of the topic to consume data from.
func ConsumeAllPartitionsFuncProvider ¶ added in v0.9.12
func ConsumeAllPartitionsFuncProvider(numPartitions int) ConsumePartitionFunc
ConsumeAllPartitionsFuncProvider is a ConsumePartitionFuncProvider that always selects all partitions.
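For example, to make a topic consumer explicitly consume every partition (required config fields elided):

    cfg := kafkabp.ConsumerConfig{
        // required fields elided
        ConsumePartitionFuncProvider: kafkabp.ConsumeAllPartitionsFuncProvider,
    }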
type ConsumePartitionFuncProvider ¶ added in v0.9.12
type ConsumePartitionFuncProvider func(numPartitions int) ConsumePartitionFunc
ConsumePartitionFuncProvider is a function type for the application to provide a ConsumePartitionFunc pinned to a specific number of partitions. This allows a ConsumePartitionFunc to be created once per reset. When partition consumers are created, each decides whether its partition is skipped or consumed based on the decision handed out by the same ConsumePartitionFunc instance.
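For example, here is a minimal sketch of a provider following that two-layer design; the even-partition rule is arbitrary and purely illustrative:

    provider := func(numPartitions int) kafkabp.ConsumePartitionFunc {
        // Heavy lifting happens once per reset: build a lookup table.
        selected := make([]bool, numPartitions)
        for i := 0; i < numPartitions; i += 2 {
            selected[i] = true
        }
        // The returned predicate, called once per reset per partition,
        // is just a cheap lookup.
        return func(partitionID int32) bool {
            return selected[partitionID]
        }
    }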
type Consumer ¶
type Consumer interface {
    io.Closer

    Consume(ConsumeMessageFunc, ConsumeErrorFunc) error

    // IsHealthy returns false after Consume returns.
    IsHealthy(ctx context.Context) bool
}
Consumer defines the interface of a consumer struct.
It's also a superset of (implements) baseplate.HealthChecker.
func NewConsumer ¶
func NewConsumer(cfg ConsumerConfig) (Consumer, error)
NewConsumer creates a new Kafka consumer.
It creates one of the two different implementations of Kafka consumer, depending on whether GroupID in config is empty:
- If GroupID is non-empty, it creates a consumer that is part of a consumer group (sharing the same GroupID). The group will guarantee that every message is delivered to one of the consumers in the group exactly once. This is suitable for the traditional exactly-once message queue consumer use cases.
- If GroupID is empty, it creates a consumer that has the whole view of the topic. This implementation of Kafka consumer is suitable for use cases like delivering config/data to services through Kafka.
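A minimal sketch of creating and running a group consumer; the broker address, topic, and IDs are placeholders:

    consumer, err := kafkabp.NewConsumer(kafkabp.ConsumerConfig{
        Brokers:  []string{"127.0.0.1:9090"},
        Topic:    "sample-topic",
        ClientID: "myclient",
        GroupID:  "my-group", // non-empty, so a group consumer is created
        Version:  "2.4.0",    // GroupID requires at least "0.10.2.0"
    })
    if err != nil {
        panic(err) // handle the error appropriately in real code
    }
    defer consumer.Close()

    // Consume blocks until the consumer is closed.
    err = consumer.Consume(
        func(ctx context.Context, msg *sarama.ConsumerMessage) {
            // process msg
        },
        func(err error) {
            // log and count system-level consumer errors
        },
    )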
func NewConsumerWithConfigOverriders ¶ added in v0.9.16
func NewConsumerWithConfigOverriders(cfg ConsumerConfig, overriders ...SaramaConfigOverrider) (Consumer, error)
NewConsumerWithConfigOverriders is provided as an escape hatch for use cases requiring specific sarama config not supported by ConsumerConfig.
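For example, a sketch that overrides one sarama consumer field on top of a ConsumerConfig cfg (assumed populated); the specific field chosen here is purely illustrative:

    consumer, err := kafkabp.NewConsumerWithConfigOverriders(
        cfg,
        func(sc *sarama.Config) {
            // Raise the default fetch size; any *sarama.Config field
            // could be adjusted here.
            sc.Consumer.Fetch.Default = 10 * 1024 * 1024
        },
    )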
type ConsumerConfig ¶
type ConsumerConfig struct {
    // Required. Brokers specifies a slice of broker addresses.
    Brokers []string `yaml:"brokers"`

    // Required. Topic is used to specify the topic to consume.
    Topic string `yaml:"topic"`

    // Required. ClientID is used by Kafka broker to track clients' consuming
    // progresses on the topics.
    //
    // In most cases, every instance is expected to have a unique ClientID.
    // The Kubernetes pod ID is usually a good candidate for this unique ID.
    ClientID string `yaml:"clientID"`

    // Optional. When GroupID is non-empty, a new group consumer will be
    // created instead. Messages from the topic will be consumed by one of the
    // consumers in the group (sharing the same GroupID) exactly once. This is
    // the usual use case of streaming consumers.
    //
    // When GroupID is empty, each consumer will have the whole view of the
    // topic (based on Offset), which is usually for use cases like delivering
    // configs/data through Kafka brokers.
    //
    // When GroupID is non-empty, Version must be at least "0.10.2.0".
    GroupID string `yaml:"groupID"`

    // Optional. Only applicable when GroupID is an empty string.
    // When GroupID is empty, the configuration enables a TopicConsumer to read
    // from all partitions of the given kafka stream.
    // When ConsumePartitionFuncProvider is also specified, the TopicConsumer
    // will only consume partitions that evaluate to true with the predicate
    // returned from ConsumePartitionFuncProvider.
    // If ConsumePartitionFuncProvider is specified, it must return a non-nil
    // predicate of type ConsumePartitionFunc, or the topic consumer will
    // return the error ErrNilConsumePartitionFunc.
    //
    // This function is called once per reset. The returned ConsumePartitionFunc
    // is also called once per reset per partition.
    // The API is designed to be two-layer so that it's possible to shift all
    // heavy lifting to ConsumePartitionFuncProvider to generate a []bool or
    // set of ints, and make the returned ConsumePartitionFunc just do a simple
    // lookup.
    ConsumePartitionFuncProvider ConsumePartitionFuncProvider `yaml:"-"`

    // Optional. The version of the kafka broker this consumer is connected to,
    // in the format of "0.10.2.0" or "2.4.0".
    //
    // When omitted, the sarama library picks the oldest supported version in
    // order to maintain maximum backward compatibility, but some of the newer
    // features might be unavailable. For example, using GroupID requires the
    // version to be at least "0.10.2.0".
    Version string `yaml:"version"`

    // Optional. Defaults to "oldest". Valid values are "oldest" and "newest".
    //
    // Only used when GroupID is empty.
    Offset string `yaml:"offset"`

    // Optional. If non-nil, will be used to log errors. At present, this only
    // pertains to logging errors closing the existing consumer when calling
    // consumer.reset() when GroupID is empty.
    Logger log.Wrapper `yaml:"logger"`

    // Optional. The function to set the rack id for this kafka client.
    // It should match the rack configured on the broker(s).
    //
    // Currently it defaults to no rack id.
    // In the future the default might be changed to AWSAvailabilityZoneRackID.
    //
    // This feature is currently experimental.
    // It might not make any difference on your client,
    // or it might make things worse.
    // You are advised to test before using a non-empty rack id in production.
    RackID RackIDFunc `yaml:"rackID"`
}
ConsumerConfig can be used to configure a kafkabp Consumer.
Can be deserialized from YAML.
Example:
    kafka:
      brokers:
        - 127.0.0.1:9090
        - 127.0.0.2:9090
      topic: sample-topic
      clientID: myclient
      version: 2.4.0
      offset: oldest
func (*ConsumerConfig) NewSaramaConfig ¶
func (cfg *ConsumerConfig) NewSaramaConfig() (*sarama.Config, error)
NewSaramaConfig instantiates a sarama.Config with sane consumer defaults from sarama.NewConfig(), overwritten by values parsed from cfg.
type GroupConsumerHandler ¶
type GroupConsumerHandler struct {
    Callback ConsumeMessageFunc
    Topic    string
}
GroupConsumerHandler implements sarama.ConsumerGroupHandler.
It's exported so that users of this library can write mocks to test their ConsumeMessageFunc implementation.
func (GroupConsumerHandler) Cleanup ¶
func (h GroupConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (GroupConsumerHandler) ConsumeClaim ¶
func (h GroupConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim starts a consumer loop of ConsumerGroupClaim's Messages() chan.
func (GroupConsumerHandler) Setup ¶
func (h GroupConsumerHandler) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim.
type RackIDFunc ¶
type RackIDFunc func() string
RackIDFunc defines a function to provide the kafka rack id to use.
Rack id is not considered a crucial part of kafka client configuration, so this function doesn't have the ability to return any errors. If an error occurs while retrieving the rack id, the implementation should return an empty string and handle the error by itself (logging, panic, etc.).
See the following URL for more info regarding kafka's rack awareness feature: https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment
func FixedRackID ¶
func FixedRackID(id string) RackIDFunc
FixedRackID is a RackIDFunc implementation that returns a fixed rack id.
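For example, to pin the client to a known rack (the id here is a placeholder):

    cfg.RackID = kafkabp.FixedRackID("us-east-1a")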
func SimpleHTTPRackID ¶
func SimpleHTTPRackID(cfg SimpleHTTPRackIDConfig) RackIDFunc
SimpleHTTPRackID is a RackIDFunc implementation that gets the rack id from an HTTP URL.
It's "simple" as in it always treat the HTTP response as plain text (so it shouldn't be used for JSON endpoints), read up to Limit bytes, and trim leading and trailing spaces before returning. If an HTTP error occurred it will be logged using Logger passed in.
func (*RackIDFunc) UnmarshalText ¶
func (r *RackIDFunc) UnmarshalText(text []byte) error
UnmarshalText implements encoding.TextUnmarshaler.
It makes it possible to use RackIDFunc directly in YAML and other config files.
Please note that this currently only supports limited implementations:
- empty: Empty rack id (same as "fixed:"). Please note that this might be changed to "aws" in the future.
- "fixed:id": FixedRackID with given id. A special case of "fixed:" means no rack id.
- "aws": AWSAvailabilityZoneRackID.
- "http://url" or "https://url": SimpleHTTPRackID with log.DefaultWrapper and prometheus counter of kafkabp_http_rack_id_failure_total, default timeout & limit, and given URL.
- anything else: FixedRackID with the given value. For example "foobar" is the same as "fixed:foobar".
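For example, any of the following values (one at a time) would work in a YAML config:

    rackID: aws                    # AWSAvailabilityZoneRackID
    rackID: "fixed:my-rack"        # FixedRackID("my-rack")
    rackID: "http://host/rack-id"  # SimpleHTTPRackID with defaults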
type SaramaConfigOverrider ¶ added in v0.9.16
type SaramaConfigOverrider func(*sarama.Config)
SaramaConfigOverrider provides a way for users to override certain fields in *sarama.Config generated from ConsumerConfig.
type SimpleHTTPRackIDConfig ¶
type SimpleHTTPRackIDConfig struct {
    // URL to fetch rack id from. Required.
    URL string

    // Limit of how many bytes to read from the response.
    //
    // Optional, defaults to SimpleHTTPRackIDDefaultLimit.
    Limit int64

    // HTTP client timeout.
    //
    // Optional, defaults to SimpleHTTPRackIDDefaultTimeout.
    Timeout time.Duration

    // Logger to be used on http errors. Optional.
    Logger log.Wrapper
}
SimpleHTTPRackIDConfig defines the config to be used in SimpleHTTPRackID.