Documentation ¶
Overview ¶
Package kafka_go is an abstraction over the popular Kafka client sarama (https://github.com/Shopify/sarama). Although sarama provides APIs that are good enough to integrate with a Kafka cluster, it lacks simplicity and demands a fair bit of domain knowledge even for standard use cases. The end user has to maintain fail safety and reclaim partitions after re-balancing or similar scenarios, and the API does not feel very intuitive to first-time Kafka users. kafka_go tries to solve these problems with easy-to-understand APIs that let you start consuming from a Kafka cluster with significantly less domain knowledge and complete fail safety.
Note: this package implements at-least-once semantics of message consumption; users will have to maintain idempotency on their own.
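For instance, a handler can deduplicate on the topic/partition/offset triple before acting on a message. A minimal sketch (the handler type is hypothetical, and the seen map stands in for a durable idempotency store):

type dedupingHandler struct {
	seen map[string]bool // stand-in for a durable idempotency store
}

func (h *dedupingHandler) Handle(ctx context.Context, msg *SubscriberMessage) bool {
	key := fmt.Sprintf("%s/%d/%d", msg.Topic, msg.Partition, msg.Offset)
	if h.seen[key] {
		return true // duplicate delivery, just let the offset be committed
	}
	// ... actual business logic ...
	h.seen[key] = true
	return true
}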
Index ¶
- Variables
- func DefaultKafkaProducerParam(brokers []string) *kafkaProducerParam
- func NewKafkaConsumer(params *KafkaConsumerParam) (*kafkaConsumer, error)
- func NewKafkaProducer(params *kafkaProducerParam) (*kafkaProducer, error)
- func NewKafkaProducerQuick(brokers []string) (*kafkaProducer, error)
- type AcknowledgmentType
- type CompressionType
- type Consumer
- type ConsumerInterceptor
- type ConsumerMiddleware
- type KafkaConsumerParam
- type OffsetType
- type Producer
- type PublisherMessage
- type SubscriberMessage
- type TopicHandler
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// Logger is the kafka-go package single logger instance. It logs the most important
	// events during the process of message consumption and production. This logger
	// can be overridden to completely stop any sort of logs from being printed on standard output.
	// example:
	// Logger = log.New(ioutil.Discard, "[kafka-go]", log.LstdFlags)
	// above will discard all the logs printed by kafka-go logger
	Logger = log.New(os.Stdout, "[kafka-go]", log.LstdFlags)
)
Functions ¶
func DefaultKafkaProducerParam ¶
func DefaultKafkaProducerParam(brokers []string) *kafkaProducerParam
DefaultKafkaProducerParam is the only way to create a kafkaProducerParam instance to be used for producer instance creation. This has been restricted to keep users from setting bad default values while configuring a producer. Users interested in changing the params should first create a param instance using this method and then change the specific param keys as needed.
example:
params := DefaultKafkaProducerParam(brokers)
params.LogError = false
func NewKafkaConsumer ¶
func NewKafkaConsumer(params *KafkaConsumerParam) (*kafkaConsumer, error)
NewKafkaConsumer creates a new kafkaConsumer instance used for consuming from kafka cluster. Needs KafkaConsumerParam to instantiate the connection.
example:
consumer, err := NewKafkaConsumer(&KafkaConsumerParam{
	Brokers:       []string{"localhost:9091", "localhost:9092", "localhost:9093"},
	GroupID:       "test-cg",
	OffsetInitial: OtNewest,
	Topics:        []string{"test-topic"},
	Handlers: map[string]TopicHandler{
		"test-topic": &testTopicHandler{},
	},
})
if err != nil {
	// handle error
}
// start the blocking consumer process
consumer.Start(context.Background())
Refer to the test example for better understanding ¶
Note: a consumer instance is not goroutine safe and should be used once (in one goroutine) to start the consumer. If multiple such consumers are required, even for the same broker set and consumer group, consider creating a new one using this function, as sketched below. However, using the same instance to start the consumer multiple times won't be fatal.
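A minimal sketch of running two independent consumers on the same consumer group, each created via NewKafkaConsumer and started in its own goroutine (broker, group, topic, and handler names are placeholders):

for i := 0; i < 2; i++ {
	c, err := NewKafkaConsumer(&KafkaConsumerParam{
		Brokers:  []string{"localhost:9092"},
		GroupID:  "test-cg",
		Topics:   []string{"test-topic"},
		Handlers: map[string]TopicHandler{"test-topic": &testTopicHandler{}},
	})
	if err != nil {
		// handle error
		continue
	}
	go c.Start(context.Background()) // each instance gets its own goroutine
}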
func NewKafkaProducer ¶
func NewKafkaProducer(params *kafkaProducerParam) (*kafkaProducer, error)
NewKafkaProducer creates a kafkaProducer instance used for producing messages to a kafka cluster. The input kafkaProducerParam should be created using DefaultKafkaProducerParam, and the required params should then be changed from there. It is not allowed to construct kafkaProducerParam directly. This is enforced to prevent users from setting unexpected default values.
If the user is only interested in providing broker addresses and is not willing to change any default param values, NewKafkaProducerQuick can be used instead; it only takes the brokers as input.
example:
params := DefaultKafkaProducerParam([]string{"localhost:9091", "localhost:9092", "localhost:9093"})
params.Retry = 5
params.Acknowledge = AtAll
producer, err := NewKafkaProducer(params)
func NewKafkaProducerQuick ¶
func NewKafkaProducerQuick(brokers []string) (*kafkaProducer, error)

NewKafkaProducerQuick is a quicker way to instantiate a producer instance by just providing the broker addresses. It will take care of the other params and set them to their best default values. In case users want a producer better configured for their use case, they should rather use NewKafkaProducer with custom kafkaProducerParam key values, where DefaultKafkaProducerParam is the only way to create the kafkaProducerParam before changing any of the associated params.
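A minimal usage sketch, assuming a single local broker:

producer, err := NewKafkaProducerQuick([]string{"localhost:9092"})
if err != nil {
	// handle error
}
defer producer.Close()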
Types ¶
type AcknowledgmentType ¶
type AcknowledgmentType int16
AcknowledgmentType specifies the kind of ack the end user should expect while publishing a message to the cluster.
const (
	// Acknowledge from all in-sync replicas
	AtAll AcknowledgmentType = iota - 1
	// No acknowledgement required
	AtNone
	// Acknowledge only from master broker
	AtLocal
)
type CompressionType ¶
type CompressionType int8
CompressionType specifies the type of message compression before publishing messages to the cluster
const (
	// No compression
	CtNone CompressionType = iota
	// GZIP compression
	CtGzip
	// Snappy compression
	CtSnappy
	// LZ4 compression
	CtLz4
	// Z-standard compression
	CtZstd
)
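The producer param key that carries this value is not shown in this documentation; assuming the param struct exposes a Compression field (a hypothetical name, to be checked against kafkaProducerParam), configuring it would look like:

params := DefaultKafkaProducerParam([]string{"localhost:9092"})
params.Compression = CtSnappy // hypothetical field name; check kafkaProducerParam for the actual key
producer, err := NewKafkaProducer(params)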
type Consumer ¶
type Consumer interface {
	// Start should trigger the actual message consumption process. It should be blocking in nature
	// to avoid killing the process prematurely.
	Start(ctx context.Context)
	// Stop should trigger the closure of the consumption process. It should cancel the context to
	// release resources and take care of possible leaks.
	Stop()
}
Consumer is the functionality exposed to the end customer to interact with their consumer instance. The interface is implemented by kafkaConsumer in the package. The interface is generic enough to be used with any other pubsub service.
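A minimal sketch of a graceful shutdown flow built on this interface, assuming a consumer created as in the NewKafkaConsumer example (the signal wiring is illustrative):

consumer, err := NewKafkaConsumer(params) // params as in the NewKafkaConsumer example
if err != nil {
	// handle error
}

stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
go func() {
	<-stop
	consumer.Stop() // unblocks Start and releases resources
}()

// blocks until Stop is called
consumer.Start(context.Background())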
type ConsumerInterceptor ¶
type ConsumerInterceptor func(ctx context.Context, msg *SubscriberMessage, handler func(context.Context, *SubscriberMessage) bool) bool
ConsumerInterceptor is a construct similar to Around advice in AOP. An interceptor not only gets to touch the message or execute something before the message is passed to the handler, but also gets to do the needful after the handler returns.
B : task before handler
A : task after handler
Interceptor : IC
msg => IC_0 => {B_0 -> IC_1 => {{B_1 -> .... ->IC_n => {..{B_n -> [msg_handler] -> A_n}..} -> .... -> A_1}} -> A_0}
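For example, a timing interceptor can wrap the handler chain to measure how long message handling takes (a sketch; the metric sink here is just the package Logger):

func timingInterceptor(ctx context.Context, msg *SubscriberMessage,
	handler func(context.Context, *SubscriberMessage) bool) bool {
	start := time.Now() // B: runs before the handler
	ok := handler(ctx, msg)
	// A: runs after the handler returns
	Logger.Printf("handled %s/%d/%d in %v, success=%v",
		msg.Topic, msg.Partition, msg.Offset, time.Since(start), ok)
	return ok
}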
type ConsumerMiddleware ¶
type ConsumerMiddleware func(ctx context.Context, msg *SubscriberMessage)
ConsumerMiddleware is a functional middleware used to touch the message before it gets passed to the actual message handler. It is similar to Before advice in AOP. Middleware can also set some sort of message state that can be retrieved and used later at the time of message handling. It can be thought of as a pre-handler across all topics and can be used to decorate the message before passing it to the handler.
see SubscriberMessage.Meta
Middleware : MW
msg => MW_0 => MW_1 => ...... => MW_n => [msg_handler]
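For example, a middleware might stamp a receive timestamp into the message meta for the handler to read later (requires MessageMeta to be enabled in KafkaConsumerParam; the meta key is arbitrary):

func stampReceiveTime(ctx context.Context, msg *SubscriberMessage) {
	if msg.Meta != nil { // Meta is attached only when MessageMeta is true
		msg.Meta["receivedAt"] = time.Now()
	}
}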
type KafkaConsumerParam ¶
type KafkaConsumerParam struct {
	// Brokers in the kafka cluster
	Brokers []string
	// Consumer group id of this consumer group
	GroupID string
	// List of topics to start listening from
	Topics []string
	// Topic to handler map to consume messages from a topic
	Handlers map[string]TopicHandler
	// [Optional]
	// Topic to its fallback handler map.
	// If the main handler returns false, it will try the fallback handler.
	// It will commit the offset no matter whether the fallback handler returns true or false.
	// default - "no fallback"
	Fallbacks map[string]TopicHandler
	// [Optional]
	// List of middleware to be triggered post claim of every message and before the actual
	// message handling. Middleware will be triggered in increasing order of index.
	// default - "no Middleware"
	Middleware []ConsumerMiddleware
	// [Optional]
	// List of interceptors. Like middleware they trigger post claim of every message, but unlike
	// middleware an interceptor is available after the actual handler returns. Interceptors are
	// triggered in a layered manner, the lower index being the outer layer and vice versa. This is
	// similar to a recursive call: the one called first will return last.
	// default - "noOpInterceptor"
	Interceptor []ConsumerInterceptor
	// [Optional]
	// Attach a meta map with every claimed message before passing it to the actual handler. Can be
	// used to persist state during the lifecycle of a claimed message. Middleware or interceptors
	// can also use this meta to store variables across the chain.
	// default - "false"
	MessageMeta bool
	// [Optional]
	// Client identity for logging purposes
	ClientID string
	// [Optional]
	// The initial offset to use if no offset was previously committed.
	// Should be OtNewest or OtOldest. default - OtNewest.
	OffsetInitial OffsetType
	// [Optional]
	// kafka cluster version, e.g. "2.2.1". default - "2.3.0"
	// supports versions from "0.8.x" to "2.3.x"
	Version string
}
KafkaConsumerParam is the input expected from the user to start a consumer session with the kafka cluster.
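Putting the optional pieces together, a consumer param with a fallback handler, middleware, and an interceptor might look like this (handler type names are placeholders; stampReceiveTime and timingInterceptor are the sketches from above):

params := &KafkaConsumerParam{
	Brokers: []string{"localhost:9092"},
	GroupID: "test-cg",
	Topics:  []string{"test-topic"},
	Handlers: map[string]TopicHandler{
		"test-topic": &testTopicHandler{},
	},
	Fallbacks: map[string]TopicHandler{
		"test-topic": &testFallbackHandler{}, // tried only when the main handler returns false
	},
	Middleware:  []ConsumerMiddleware{stampReceiveTime},
	Interceptor: []ConsumerInterceptor{timingInterceptor},
	MessageMeta: true, // needed for stampReceiveTime to find a Meta map
}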
type OffsetType ¶
type OffsetType string
OffsetType specifies the strategy to determine the starting offset of a consumer group when there is no previously committed offset for that consumer group in the cluster. The offset type setting will be ignored altogether if the client finds any existing committed offset in the cluster while registering its consumer process.
const (
	// Set offset to the offset of the next message to appear in the partition
	OtNewest OffsetType = "newest"
	// Set offset to the offset of the oldest available message present in the partition
	OtOldest OffsetType = "oldest"
)
type Producer ¶
type Producer interface {
	// PublishSync sends a message to the pubsub cluster synchronously. A call to this function is
	// blocking and returns only after the publish is done or results in an error. Meta contains
	// the publish-related meta info.
	PublishSync(message *PublisherMessage) (meta map[string]interface{}, err error)
	// PublishSyncBulk sends messages to the pubsub cluster synchronously, all at once. A call to
	// this function is blocking and returns only after the publish attempt is done for all the
	// messages. Returns an error if the bulk publish is partially successful.
	PublishSyncBulk(messages []*PublisherMessage) error
	// PublishAsync sends a message to the pubsub cluster asynchronously. A call to this function
	// is non-blocking and returns immediately.
	PublishAsync(message *PublisherMessage)
	// PublishAsyncBulk sends messages in bulk to the pubsub cluster asynchronously. A call to this
	// function is non-blocking and returns immediately.
	PublishAsyncBulk(messages []*PublisherMessage)
	// Close triggers the closure of the associated producer client to avoid any leaks
	Close()
}
Producer is the functionality exposed to the end customer to interact with the producer instance. The interface is implemented by kafkaProducer in the package.
Example ¶
param := DefaultKafkaProducerParam([]string{"broker-1", "broker-2", "broker-3"})
param.LogError = false
producer, err := NewKafkaProducer(param)
if err != nil {
	Logger.Fatalf("Error creating producer client, %v", err)
}
data, _ := json.Marshal("test message")
meta, err := producer.PublishSync(&PublisherMessage{
	Topic: "test-topic",
	Key:   "test-key",
	Data:  data,
})
if err != nil {
	Logger.Printf("Error producing message to kafka cluster, %v", err)
} else {
	Logger.Printf("Message published, meta %v", meta)
}
Output:
type PublisherMessage ¶
type PublisherMessage struct {
	// Topic on which message is to be published
	Topic string
	// Partition key to decide partition
	Key string
	// Actual data to be published
	Data []byte
}
A PublisherMessage instance should be used to publish data on a topic.
type SubscriberMessage ¶
type SubscriberMessage struct {
	// topic of the message
	Topic string
	// partition within the topic
	Partition int32
	// offset within the partition
	Offset int64
	// partition key bytes
	Key []byte
	// actual message bytes
	Value []byte
	// any state to carry with message
	Meta map[string]interface{}
}
A SubscriberMessage instance will be received by the configured topic handler. It contains the data required in standard use cases.
type TopicHandler ¶
type TopicHandler interface {
	// Handle gets the actual message to be handled. The business logic for a given
	// message should ideally be implemented here.
	Handle(ctx context.Context, message *SubscriberMessage) bool
}
TopicHandler should be implemented by the user to consume messages from a topic. A SubscriberMessage received from a topic is forwarded to one such handler to take care of the required business logic, as sketched below.
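A minimal handler sketch, matching the testTopicHandler referenced in the NewKafkaConsumer example (the struct name and body are illustrative):

type testTopicHandler struct{}

func (t *testTopicHandler) Handle(ctx context.Context, msg *SubscriberMessage) bool {
	Logger.Printf("claimed message %s at %s/%d/%d", string(msg.Value), msg.Topic, msg.Partition, msg.Offset)
	// return true to mark the message handled and its offset ready for commit;
	// returning false triggers the fallback handler for the topic, if one is configured
	return true
}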