kafka

Version: v3.7.1
Warning

This package is not in the latest version of its module.

Published: Oct 25, 2022 License: MIT Imports: 23 Imported by: 32

README

dp-kafka

Kafka client wrapper using channels to abstract Kafka consumers and producers. This library is built on top of Sarama.

Configuration

By default, the library assumes plaintext connections, unless the configuration argument has a non-nil SecurityConfig field.

Setup app to use TLS

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed service that enables you to build and run applications that use Apache Kafka to process streaming data.

As of 2021, our apps have migrated from running our own Kafka to using Amazon MSK, which provides the control-plane operations (such as creating, updating and deleting clusters) and lets you use the Apache Kafka data-plane operations (such as producing and consuming data).

To use TLS, please do the following:

1. Add kafka topics to manifest

First of all, we need to update the manifest of the app to contain the kafka topics which the app uses. This is done as follows:

  1. Create a feature branch in dp-configs for the upcoming changes to the app's manifest

  2. In the manifest of the app, add the relevant kafka topics at the end of the file as follows:

kafka:
  topics:
    - name: `topic name 1` (e.g. `content-published`)
      subnets: [ `web` or `publishing` or both `web, publishing` ]
      access: [ `read` or `write` ]
    - name: `topic name 2` 
      subnets: [ `web` or `publishing` or both `web, publishing` ]
      access: [ `read` or `write` ]

More details about the kafka section of the manifest can be found here

An example of adding kafka topics to the manifest can be found here

  3. Review and merge these changes to continue with the next step
2. Create client certificate for the app - Run key-admin script

Next, we need to create a client certificate for the app so that it can connect and authenticate using TLS and its client certificate. This can be achieved by running the key-admin script.

Notes:

  1. The key-admin script checks the manifests of the apps in dp-configs to see whether a client certificate needs to be created for the app (by checking whether any kafka topics are mentioned in the manifest). Therefore, please make sure that your local copy of dp-configs is on the master branch and contains all your changes from the previous step.
  2. Remember to do Step 4 of the Client Certificate README to inject the relevant certificate details into the app's secrets (using the --secrets argument), unless the app is being migrated to AWS MSK, in which case this step is done later.
  3. Please remember to come back to this README after completing this task to continue with the process.

Follow the steps explained in the Client Certificate README to run key-admin

3. Apply kafka topics to AWS MSK - Run topic-manager script

Next, we need to create the kafka topics used by the app on AWS MSK. This can be achieved by running the topic-manager script. Running the script informs AWS MSK of:

  • any new topics (topic manager creates them)
  • new clients/apps (i.e. certs) using the service:
    • topic manager authorises the clients/apps (i.e. certs) to gain the right access (read and/or write) to its topics
    • renewed certs do not need a re-run of topic manager

Notes:

  1. The topic-manager script checks the manifests of the apps in dp-configs to see if any changes to kafka topics (adding or deleting topics) need to be applied to AWS MSK, by checking the kafka topics mentioned in the manifest. Therefore, please make sure that your local copy of dp-configs is on the master branch and contains all your changes from step 1.
  2. Please remember to come back to this README after completing this task to continue with the process.

Follow the steps explained in the Kafka Setup Tools README to run topic-manager

4. Add configs in the app to use AWS MSK

Once the components from the previous steps have been set up for the app to use TLS, the app itself needs to be updated to use TLS. To achieve this, please do the following:

  1. Update the app README.md to include kafka configurations to connect to AWS MSK using TLS

    | Environment variable         | Default                                   | Description
    | ---------------------------- | ----------------------------------------- | -----------
    | KAFKA_ADDR                   | localhost:39092                           | A list of kafka brokers (TLS-ready)
    | KAFKA_VERSION                | "1.0.2"                                   | The version of (TLS-ready) Kafka being used
    | KAFKA_SEC_PROTO              | _unset_                                   | if set to `TLS`, kafka connections will use TLS ([ref-1])
    | KAFKA_SEC_CA_CERTS           | _unset_                                   | CA cert chain for the server cert ([ref-1])
    | KAFKA_SEC_CLIENT_KEY         | _unset_                                   | PEM for the client key ([ref-1])
    | KAFKA_SEC_CLIENT_CERT        | _unset_                                   | PEM for the client certificate ([ref-1])
    | KAFKA_SEC_SKIP_VERIFY        | false                                     | ignores server certificate issues if `true` ([ref-1])
    
    
     [ref-1]:  https://github.com/ONSdigital/dp-kafka/tree/main/examples#tls 'kafka TLS examples documentation'
    
  2. Add the configurations to config.go

    // KafkaTLSProtocolFlag informs service to use TLS protocol for kafka
    const KafkaTLSProtocolFlag = "TLS"

    type Config struct {
        // TO-REMOVE: this struct already contains other configs in the app but update this struct accordingly with the following organised layout
        KafkaConfig KafkaConfig
    }

    // KafkaConfig contains the config required to connect to Kafka
    type KafkaConfig struct {
        Brokers       []string `envconfig:"KAFKA_ADDR"            json:"-"`
        Version       string   `envconfig:"KAFKA_VERSION"`
        SecProtocol   string   `envconfig:"KAFKA_SEC_PROTO"`
        SecCACerts    string   `envconfig:"KAFKA_SEC_CA_CERTS"`
        SecClientKey  string   `envconfig:"KAFKA_SEC_CLIENT_KEY"  json:"-"`
        SecClientCert string   `envconfig:"KAFKA_SEC_CLIENT_CERT"`
        SecSkipVerify bool     `envconfig:"KAFKA_SEC_SKIP_VERIFY"`
    }

    // TO-REMOVE: set the default values of the kafka configs in the Get() as the following
    KafkaConfig: KafkaConfig{
        Brokers:       []string{"localhost:9092"},
        Version:       "1.0.2",
        SecProtocol:   "",
        SecCACerts:    "",
        SecClientCert: "",
        SecClientKey:  "",
        SecSkipVerify: false,
    },
    
  3. Update the test to check config default values in config_test.go accordingly

    So(cfg.KafkaConfig.Brokers, ShouldResemble, []string{"localhost:9092"})
    So(cfg.KafkaConfig.Version, ShouldEqual, "1.0.2")
    So(cfg.KafkaConfig.SecProtocol, ShouldEqual, "")
    So(cfg.KafkaConfig.SecCACerts, ShouldEqual, "")
    So(cfg.KafkaConfig.SecClientCert, ShouldEqual, "")
    So(cfg.KafkaConfig.SecClientKey, ShouldEqual, "")
    So(cfg.KafkaConfig.SecSkipVerify, ShouldBeFalse)
    

    An example of adding kafka configs to the app can be found here

  4. Continue to the next section, Creation, to create the Kafka producer and/or consumer.

Creation

Kafka producers and consumers can be created with constructors that accept the required configuration. You may get the channels struct using producer.Channels() and consumer.Channels() respectively.

For optional config values, the Sarama default config values will be used unless an override is provided. If a compulsory value is not provided, the config validation will fail, and the error will be returned by the constructor function.

The constructor tries to initialise the producer/consumer by creating the underlying Sarama client, but failing to do so will not result in an error: an uninitialised consumer/producer will be returned instead.

Producer creation

The producer constructor accepts a configuration. The configuration needs to provide at least BrokerAddrs and Topic. You may also provide overrides for default Sarama configs, like KafkaVersion or MaxMessageBytes, and/or a SecurityConfig as described in the previous section.

    // Create Producer with config
    pConfig := &kafka.ProducerConfig{
        BrokerAddrs:     kafkaConfig.Brokers,       // compulsory
        Topic:           kafkaConfig.ProducedTopic, // compulsory
        KafkaVersion:    &kafkaConfig.Version,
        MaxMessageBytes: &kafkaConfig.MaxBytes,
    }
    if kafkaConfig.SecProtocol == config.KafkaTLSProtocolFlag {
        pConfig.SecurityConfig = kafka.GetSecurityConfig(
            kafkaConfig.SecCACerts,
            kafkaConfig.SecClientCert,
            kafkaConfig.SecClientKey,
            kafkaConfig.SecSkipVerify,
        )
    }
    producer, err := kafka.NewProducer(ctx, pConfig)
    if err != nil {
        return err
    }

ConsumerGroup creation

The consumerGroup constructor accepts a configuration. The configuration needs to provide at least BrokerAddrs, Topic and GroupName. You may also provide overrides for default Sarama configs, like KafkaVersion or MaxMessageBytes, and/or a SecurityConfig as described in the previous section.

For concurrent consumers, you can override NumWorkers (default 1), which determines the number of go-routines that will handle messages.

For batch consumers, you can override BatchSize (default 1) and BatchWaitTime (default 200 ms), which determine, respectively, the maximum number of kafka messages stored in a batch and the maximum time to wait before the batch is processed.

The constructor will create an Upstream channel with a buffer size equal to the maximum of BatchSize and NumWorkers.

    // Create ConsumerGroup with config
    cgConfig := &kafka.ConsumerGroupConfig{
        BrokerAddrs:  kafkaConfig.Brokers,       // compulsory
        Topic:        kafkaConfig.ConsumedTopic, // compulsory
        GroupName:    kafkaConfig.ConsumedGroup, // compulsory
        KafkaVersion: &kafkaConfig.Version,
        NumWorkers:   &kafkaConfig.KafkaParallelMessages,
    }
    if kafkaConfig.SecProtocol == config.KafkaTLSProtocolFlag {
        cgConfig.SecurityConfig = kafka.GetSecurityConfig(
            kafkaConfig.SecCACerts,
            kafkaConfig.SecClientCert,
            kafkaConfig.SecClientKey,
            kafkaConfig.SecSkipVerify,
        )
    }
    cg, err := kafka.NewConsumerGroup(ctx, cgConfig)
    if err != nil {
        return err
    }

Life-cycle

[Figure: ConsumerStateMachine]

Initialisation

If the producer/consumer can establish a connection with the Kafka cluster, it will be initialised at creation time, which is usually the case. But it might not be able to do so, for example if the kafka cluster is not running. If a producer/consumer is not initialised, it cannot contact the kafka broker, and it cannot send or receive any message. Any attempt to send a message in this state will result in an error being sent to the Errors channel.

An uninitialised producer/consumer will try to initialise later, asynchronously, in a retry loop following an exponential backoff strategy. You may also try to initialise it by calling Initialise(). In either case, when the initialisation succeeds, the initialisation loop will exit, and the producer/consumer will go to its next state.

You can check if a producer/consumer is initialised by calling IsInitialised() or wait for it to be initialised by waiting for the 'Initialised' channel to be closed, like so:

    // wait in a parallel go-routine
    go func() {
        <-channels.Initialised
        doStuff()
    }()
    // block until kafka is initialised
    <-channels.Initialised
    doStuff()

Waiting for this channel is a convenient hook, but not a necessary requirement.

Note that initialised consumers will be in 'Stopped' state until Start is called.

Message production

Messages can be safely sent to Kafka by using the producer Send function, which marshals the provided event with the provided schema and then sends the data to the Output channel. An error will be returned if marshal fails or the Output channel is closed:

    err := producer.Send(schema, event)
    ...

Alternatively, you may send the byte array directly to the Output channel, like so:

    producer.Channels().Output <- []byte(msg)

Message consumption using handlers

This is the recommended approach.

You can register one handler in order to consume messages in a managed way. There are two types of handlers available, single message and batch, but only one handler can be registered per consumer.

Single message

You may register a single-message handler like so:

    // Create Handler struct with all required dependencies
    type Handler struct {}

    // Implement the Handler func
    func (h *Handler) Handle(ctx context.Context, workerID int, msg kafka.Message) error {
        log.Info(ctx, "message processed", log.Data{"worker_id": workerID})
        return nil
    }

    ...

    // register the handler to your kafka consumer
    handler := &Handler{}
    if err := consumer.RegisterHandler(ctx, handler.Handle); err != nil {
        return err
    }

By default, messages are consumed sequentially, but you can provide a NumWorkers value greater than one to consume messages concurrently. One go-routine will be created for each worker.

Note that the number of messages consumed concurrently is limited by the number of partitions assigned to this consumer: if NumWorkers is greater than that, concurrency will be capped at the number of assigned partitions.

Batch of messages

You may register a batch-message handler like so:

    // Create Handler struct with all required dependencies
    type Handler struct {}

    // Implement the Handler func
    func (h *Handler) Handle(ctx context.Context, batch []kafka.Message) error {
        log.Info(ctx, "batch processed", log.Data{"size": len(batch)})
        return nil
    }

    // register the batch handler to your kafka consumer
    handler := &Handler{}
    if err := consumer.RegisterBatchHandler(ctx, handler.Handle); err != nil {
        return err
    }

Incoming messages are accumulated until either BatchSize messages have been received or BatchWaitTime has elapsed, whichever happens first. The handler is then called with the batch of messages. No messages will be committed until the handler finishes its execution. Batch handling is sequential: NumWorkers is ignored.

Message consumption using Upstream channel directly

This approach is NOT recommended. Only use it if you need to implement your own message processing strategy, different from the default handlers.

Messages can be consumed by creating an infinite consumption loop. Once you are ready to consume another message, you need to call Release(), and once a message has finished being processed you need to call Commit(). You may call CommitAndRelease() to do both at the same time:

    // consumer loop: exits when the upstream channel is closed
    func consume(upstream chan kafka.Message) {
        for msg := range upstream {
            doStuff(msg)
            msg.CommitAndRelease()
        }
    }

You may create a single go-routine to consume messages sequentially, or multiple parallel go-routines (workers) to consume them concurrently:

    // either: a single consume go-routine (sequential consumption)
    go consume(channels.Upstream)

    // or: multiple workers to consume messages in parallel
    for w := 1; w <= kafkaConfig.ParallelMessages; w++ {
        go consume(channels.Upstream)
    }

You can consume up to as many messages in parallel as there are partitions assigned to your consumer; see the 'Message consumption - Sarama Kafka sessions' section below for more information.

Message consumption - Sarama Kafka sessions

[Figure: KafkaConcurrency]

Sarama creates as many go-routines as partitions are assigned to the consumer, for the topic being consumed.

For example, if we have a topic with 60 partitions and we have 2 instances of a service that consumes that topic running at the same time, kafka will assign 30 partitions to each one.

Then Sarama will create 30 parallel go-routines, which this library uses in order to send messages to the upstream channel. Each go-routine waits for the message to finish being processed by waiting for the message-specific upstreamDone channel to be closed, like so:

    channels.Upstream <- msg
    <-msg.upstreamDone

The consumer can consume messages from the Upstream channel in parallel, up to the maximum number of partitions that Sarama allocated to the consumer. In the example above, that would be a maximum of 30 messages in parallel.

Each Sarama consumption go-routine exists only during a particular session. Sessions are periodically destroyed and created by Sarama, according to Kafka events like a cluster re-balance (where the number of partitions assigned to a consumer may change). It is important that messages are released as soon as possible when this happens. The default message consumption timeout is 10 seconds in this scenario (determined by config.Consumer.Group.Session.Timeout).

When a session finishes, we call Consume() again, which tries to establish a new session. If an error occurs trying to establish a new session, it will be retried following an exponential backoff strategy.
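The hand-off described in this section can be sketched with one go-routine per assigned partition, each sending a message together with its own done channel and blocking until the consumer closes it. The `message` type and its `done` field are hypothetical stand-ins for the library's Message and upstreamDone, and `partitionsPerInstance` assumes partitions are split evenly, as in the 60-partition example above.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for the library's Message; done plays
// the role of upstreamDone and is closed once the message is released.
type message struct {
	partition int
	offset    int
	done      chan struct{}
}

// partitionsPerInstance assumes Kafka splits partitions evenly across instances.
func partitionsPerInstance(partitions, instances int) int {
	return partitions / instances
}

// consumePartitions starts one go-routine per assigned partition. Each sends
// perPartition messages to upstream, blocking on done after every send.
func consumePartitions(assigned, perPartition int, upstream chan<- *message) *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	for p := 0; p < assigned; p++ {
		wg.Add(1)
		go func(partition int) {
			defer wg.Done()
			for offset := 0; offset < perPartition; offset++ {
				msg := &message{partition: partition, offset: offset, done: make(chan struct{})}
				upstream <- msg
				<-msg.done // at most one in-flight message per partition
			}
		}(p)
	}
	return wg
}

func main() {
	assigned := partitionsPerInstance(60, 2)
	upstream := make(chan *message)
	wg := consumePartitions(assigned, 2, upstream)

	// Consumer side: process a message, then release it by closing done.
	processed := make(chan int)
	go func() {
		count := 0
		for msg := range upstream {
			count++
			close(msg.done)
		}
		processed <- count
	}()

	wg.Wait()
	close(upstream)
	fmt.Println(assigned, <-processed) // 30 60
}
```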

Start/Stop consumer

A consumer can be stopped by calling Stop():

  • If the consumer was not initialised, this will set the initial state to 'stopped', so the consumer will do nothing until it is started.
  • If the consumer is Starting/Consuming, the sarama handler will be notified that it needs to stop: it will stop processing any in-flight message, it will not consume any new message and it will terminate the session.

Stop() is asynchronous: if the consumer is consuming, it will return as soon as the control go-routine is notified, without waiting for it to finish. If the caller needs to block until the consumer has completely stopped consuming, then you can call StopAndWait(), which is synchronous.

A consumer can be started by calling Start():

  • If the consumer was not initialised, this will set the initial state to 'starting', so the consumer will start consuming straight away after being initialised.
  • If the consumer is Stopping/Stopped, the idle loop will be notified that it needs to start consuming.
Waiting for a state to be reached

If your code needs to perform some action only after a specific state has been reached, you may use the corresponding State channel. Each state channel is closed every time its state is reached, and reopened when the state is left. In order to access the channels in a thread-safe manner, they have been wrapped with a RW-Mutex, and a Wait() func has been implemented.

For example, the following code will only be executed once the Consuming state has been reached. If the consumer is already in Consuming state, the code will not block:

    ...
    // Wait for consuming state
    consumer.Channels().State.Consuming.Wait()
    doStuffThatRequiresConsuming()
    ...

A function called StateWait has been implemented for simplicity, which does the same. You can call it like so:

    ...
    // Wait for consuming state
    consumer.StateWait(kafka.Consuming)
    doStuffThatRequiresConsuming()
    ...
Waiting for a state in a select statement

If you need to wait until a state is reached or some other condition happens, you may use a select statement with the corresponding state channel.

If you need to do this, you will need to acquire the read lock on the state channel to prevent any race condition, releasing it as soon as your code is no longer waiting on the channel.

For example:

    m := consumer.Channels().State.Consuming.RWMutex()
    m.RLock()

    delay := time.NewTimer(someTime)
    select {
    case <-delay.C:
        m.RUnlock()
        ...
    case <-consumer.Channels().State.Consuming.Channel():
        m.RUnlock()
        delay.Stop() // release the 'delay' timer, as is done in the code base
        ...
    }
    ...

WARNING: Make sure you release the lock as soon as possible. Blocking it will prevent the kafka consumer state machine from transitioning to the state again in the future!

Closing

Producers can be closed by calling the Close method.

For graceful handling of closing consumers, it is advised to call the StopAndWait() method prior to the Close method. This allows in-flight messages to complete and be committed, so that they do not get replayed once the application restarts.

The Closer channel is used to signal to all the loops that they need to exit because the consumer is being closed.

After successfully closing a producer or consumer, the corresponding Closed channel is closed.

Headers

The headers are key-value pairs that are transparently passed by Kafka between producers and consumers. By default, the predefined traceid header will be added to every kafka producer message. There is also the option to add custom headers to kafka messages by doing the following:

    pConfig := &kafka.ProducerConfig{
        BrokerAddrs:     cfg.Brokers,       // compulsory
        Topic:           cfg.ProducedTopic, // compulsory
        MaxMessageBytes: &cfg.KafkaMaxBytes,
    }
    producer, err := kafka.NewProducer(ctx, pConfig)
    if err != nil {
        return err
    }
    producer.AddHeader(key, value)

Consumers can then retrieve these headers using the GetHeader method, as follows:

    // consumer loop
    func consume(upstream chan kafka.Message) {
        for msg := range upstream {
            value := msg.GetHeader(key)
            doStuff(msg, value)
            msg.CommitAndRelease()
        }
    }

Health-check

The health status of a consumer or producer can be obtained by calling the Checker method, which updates the provided CheckState structure with the relevant information:

    err := cli.Checker(ctx, checkState)

  • If a broker cannot be reached, the Status is set to CRITICAL.
  • If all brokers can be reached, but a broker does not provide the expected topic metadata, the Status is set to WARNING.
  • If all brokers can be reached and return the expected topic metadata, we try to initialise the consumer/producer. If it was already initialised, or the initialisation is successful, the Status is set to OK.

Health subscription

The consumer group implements dp-healthcheck's Subscriber interface. You can subscribe the kafka consumer to a set of dp-healthcheck Checks. Then the consumer will start consuming when the 'OK' status is reported, and it will stop when 'WARNING' or 'CRITICAL' statuses are reported.

Assuming you have a healthcheck hc, you can subscribe a kafka consumer like so:

    hc.Subscribe(consumer, check1, check2, check3)

Warnings:

  • A kafka consumer may be subscribed to multiple checkers, but it must be subscribed to only one healthcheck library instance.

  • Once the consumer is subscribed, calls to Start/Stop must not be manually performed, as they would interfere with the managed calls on health change events.
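The subscription behaviour described above amounts to mapping each reported status to a Start or Stop call. The sketch assumes the status strings "OK", "WARNING" and "CRITICAL", and uses a hypothetical `startStopper` interface and fake consumer so it runs without dp-kafka or dp-healthcheck; ignoring any other status is also an assumption. It is not the library's OnHealthUpdate implementation.

```go
package main

import "fmt"

// Assumed status values reported by the health check library.
const (
	StatusOK       = "OK"
	StatusWarning  = "WARNING"
	StatusCritical = "CRITICAL"
)

// startStopper is a hypothetical subset of the consumer group's methods.
type startStopper interface {
	Start() error
	Stop() error
}

// onHealthUpdate starts consuming on OK and stops on WARNING or CRITICAL;
// any other status is ignored.
func onHealthUpdate(c startStopper, status string) error {
	switch status {
	case StatusOK:
		return c.Start()
	case StatusWarning, StatusCritical:
		return c.Stop()
	}
	return nil
}

// fakeConsumer records whether it is consuming.
type fakeConsumer struct{ consuming bool }

func (f *fakeConsumer) Start() error {
	f.consuming = true
	return nil
}

func (f *fakeConsumer) Stop() error {
	f.consuming = false
	return nil
}

func main() {
	c := &fakeConsumer{}
	for _, s := range []string{StatusOK, StatusWarning, StatusOK, StatusCritical} {
		if err := onHealthUpdate(c, s); err != nil {
			fmt.Println("error:", err)
		}
		fmt.Println(s, "-> consuming:", c.consuming)
	}
}
```

This also shows why manual Start/Stop calls conflict with a subscription: the next health update overrides whatever state they set.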

Examples

See the examples below for some typical usages of this library.

Testing

Some mocks are provided, so that you can test your code interactions with this library. More details here.

Documentation


Constants

const (
	Errors      = "Errors"
	Initialised = "Initialised"
	Consume     = "Consume"
	Closer      = "Closer"
	Closed      = "Closed"
	Upstream    = "Upstream"
	Output      = "Output"
)

channel names

const (
	OffsetNewest = sarama.OffsetNewest
	OffsetOldest = sarama.OffsetOldest
)

Consumer config constants

const ErrorChanBufferSize = 20
const MsgHealthyConsumerGroup = "kafka consumer group is healthy"

MsgHealthyConsumerGroup Check message returned when Kafka consumer group is healthy.

const MsgHealthyProducer = "kafka producer is healthy"

MsgHealthyProducer Check message returned when Kafka producer is healthy.

const ServiceName = "Kafka"

ServiceName is the name of this service: Kafka.

const (
	TraceIDHeaderKey = string(request.RequestIdKey)
)

Variables

var ErrTLSCannotLoadCACerts = errors.New("cannot load CA Certs")

ErrTLSCannotLoadCACerts is returned when the certs file cannot be loaded

Functions

func GetPrincipal

func GetPrincipal(app, subnet, domain string) string

func GetRetryTime

func GetRetryTime(attempt int, retryTime, maxRetryTime time.Duration) time.Duration

GetRetryTime returns a duration based on the attempt number and the initial retry time. It uses the algorithm `2^n`, where `n` is the attempt number taken modulo a maximum value, so that we don't get values greater than 'maxRetryTime'. A randomization factor of ±25% is applied to the initial retry time so that the server isn't hit at the same time by many clients. The first attempt is assumed to be number 1; any negative or 0 value is treated as 1.

func NewAdmin

func NewAdmin(brokerAddrs []string, adminCfg *AdminConfig) (sarama.ClusterAdmin, error)

NewAdmin creates an admin-based client

func SafeClose

func SafeClose(ch chan struct{}) (justClosed bool)

SafeClose closes a struct{} channel and ignores the panic if the channel was already closed

func SafeCloseBool

func SafeCloseBool(ch chan bool) (justClosed bool)

SafeCloseBool closes a bool channel and ignores the panic if the channel was already closed

func SafeCloseBytes

func SafeCloseBytes(ch chan []byte) (justClosed bool)

SafeCloseBytes closes a byte array channel and ignores the panic if the channel was already closed

func SafeCloseErr

func SafeCloseErr(ch chan error) (justClosed bool)

SafeCloseErr closes an error channel and ignores the panic if the channel was already closed

func SafeCloseMessage

func SafeCloseMessage(ch chan Message) (justClosed bool)

SafeCloseMessage closes a Message channel and ignores the panic if the channel was already closed
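The SafeClose family relies on recovering from the panic that closing an already-closed channel raises. A minimal generic sketch (Go 1.18+) follows; `safeClose` is a hypothetical name, since the library exposes one function per channel type instead.

```go
package main

import "fmt"

// safeClose closes ch and reports whether this call closed it. Closing a nil
// or already-closed channel panics, so the panic is recovered and false is returned.
func safeClose[T any](ch chan T) (justClosed bool) {
	defer func() {
		if recover() != nil {
			justClosed = false
		}
	}()
	close(ch)
	return true
}

func main() {
	ch := make(chan struct{})
	fmt.Println(safeClose(ch)) // true
	fmt.Println(safeClose(ch)) // false
}
```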

func SafeSendBool

func SafeSendBool(ch chan bool, val bool) (err error)

SafeSendBool sends a provided bool value to the provided bool chan and returns an error instead of panicking if the channel is closed

func SafeSendBytes

func SafeSendBytes(ch chan []byte, val []byte) (err error)

SafeSendBytes sends a provided byte array value to the provided byte array chan and returns an error instead of panicking if the channel is closed

func SafeSendErr

func SafeSendErr(ch chan error, val error) (err error)

SafeSendErr sends a provided error value to the provided error chan and returns an error instead of panicking if the channel is closed

func SafeSendProducerMessage

func SafeSendProducerMessage(ch chan<- *sarama.ProducerMessage, val *sarama.ProducerMessage) (err error)

SafeSendProducerMessage sends a provided ProducerMessage value to the provided ProducerMessage chan and returns an error instead of panicking if the channel is closed

func SetMaxMessageSize

func SetMaxMessageSize(maxSize int32)

SetMaxMessageSize sets the Sarama MaxRequestSize and MaxResponseSize values to the provided maxSize

func UnwrapLogData

func UnwrapLogData(err error) log.Data

UnwrapLogData recursively unwraps logData from an error. This allows an error to be wrapped with log.Data at each level of the call stack, and then extracted and combined here as a single log.Data entry. This allows us to log errors only once but maintain the context provided by log.Data at each level.

func WaitWithTimeout

func WaitWithTimeout(wg *sync.WaitGroup, tout time.Duration) bool

WaitWithTimeout blocks until all go-routines tracked by a WaitGroup are done or a timeout expires. It returns true if the timeout expired, or false if the waitgroup finished before the timeout.

Types

type Acls

type Acls []*sarama.AclCreation

func (Acls) Apply

func (t Acls) Apply(adm sarama.ClusterAdmin) error

type AdminConfig

type AdminConfig struct {
	KafkaVersion   *string
	KeepAlive      *time.Duration
	RetryBackoff   *time.Duration
	RetryMax       *int
	SecurityConfig *SecurityConfig
}

AdminConfig exposes the optional configurable parameters for an admin client to overwrite default Sarama config values. Any value that is not provided will use the default Sarama config value.

func (*AdminConfig) Get

func (a *AdminConfig) Get() (*sarama.Config, error)

Get creates a default sarama config and overwrites any values provided in the AdminConfig

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch handles adding raw messages to a batch of ObservationExtracted events.

func NewBatch

func NewBatch(batchSize int) *Batch

NewBatch returns a new batch instance of the given size.

func (*Batch) Add

func (batch *Batch) Add(message Message)

Add a message to the batch.

func (*Batch) Clear

func (batch *Batch) Clear()

Clear resets the batch to contain no messages.

func (*Batch) Commit

func (batch *Batch) Commit()

Commit is called when the batch has been processed. The last message has already been released, so at this point we just need to commit.

func (*Batch) IsEmpty

func (batch *Batch) IsEmpty() bool

IsEmpty returns true if the batch has no messages in it.

func (*Batch) IsFull

func (batch *Batch) IsFull() bool

IsFull returns true if the batch is full based on the configured maxSize.

func (*Batch) Size

func (batch *Batch) Size() int

Size returns the number of messages currently in the batch.

type BatchHandler

type BatchHandler func(ctx context.Context, batch []Message) error

BatchHandler represents a handler for processing a batch of kafka messages. This method will be called by only one go-routine at a time.

type Commiter

type Commiter interface {
	Commit() bool
}

Commiter represents an error type that defines a bool method to enable or disable a message/batch to be committed.

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

ConsumerGroup is a Kafka consumer group instance.

func NewConsumerGroup

func NewConsumerGroup(ctx context.Context, cgConfig *ConsumerGroupConfig) (*ConsumerGroup, error)

NewConsumerGroup creates a new consumer group with the provided parameters

func (*ConsumerGroup) Channels

func (cg *ConsumerGroup) Channels() *ConsumerGroupChannels

Channels returns the ConsumerGroup channels for this consumer group

func (*ConsumerGroup) Checker

func (cg *ConsumerGroup) Checker(ctx context.Context, state *healthcheck.CheckState) error

Checker checks health of Kafka consumer-group and updates the provided CheckState accordingly

func (*ConsumerGroup) Close

func (cg *ConsumerGroup) Close(ctx context.Context, optFuncs ...OptFunc) (err error)

Close safely closes the consumer and releases all resources. Pass in a context with a timeout or deadline.

func (*ConsumerGroup) Initialise

func (cg *ConsumerGroup) Initialise(ctx context.Context) error

Initialise creates a new Sarama ConsumerGroup and the consumer/error loops, only if it was not already initialised.

func (*ConsumerGroup) IsInitialised

func (cg *ConsumerGroup) IsInitialised() bool

IsInitialised returns true only if Sarama ConsumerGroup has been correctly initialised.

func (*ConsumerGroup) LogErrors

func (cg *ConsumerGroup) LogErrors(ctx context.Context)

LogErrors creates a go-routine that waits on Errors channel and logs any error received. It exits on Closer channel closed.

func (*ConsumerGroup) OnHealthUpdate

func (cg *ConsumerGroup) OnHealthUpdate(status string)

OnHealthUpdate implements the healthcheck Subscriber interface so that the kafka consumer can be notified of state changes. This method is intended to be used for managed start/stop calls only, and should not be called manually. WARNING: Having more than one notifier calling this method will result in unexpected behavior.

  • On Health status OK: start consuming
  • On Warning or Critical: stop consuming

func (*ConsumerGroup) RegisterBatchHandler

func (cg *ConsumerGroup) RegisterBatchHandler(ctx context.Context, batchHandler BatchHandler) error

func (*ConsumerGroup) RegisterHandler

func (cg *ConsumerGroup) RegisterHandler(ctx context.Context, h Handler) error

func (*ConsumerGroup) Start

func (cg *ConsumerGroup) Start() error

Start has different effects depending on the state:

  - Initialising: the consumer will try to start consuming straight away once it's initialised
  - Starting/Consuming: no change will happen
  - Stopping/Stopped: the consumer will start consuming
  - Closing: an error will be returned
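The rules listed for Start can be expressed as a switch over the states. `consumerModel`, `startOnInit` and `errClosing` are hypothetical names in a deliberately simplified model; in particular, moving straight from Stopping to Starting is an assumption, and dp-kafka's real consumer coordinates these transitions through its own state machine and channels.

```go
package main

import "errors"

// State mirrors the consumer-group states listed in this package.
type State int

const (
	Initialising State = iota
	Stopped
	Starting
	Consuming
	Stopping
	Closing
)

// errClosing is a hypothetical error returned when Start is called while closing.
var errClosing = errors.New("cannot start: consumer group is closing")

// consumerModel is a hypothetical, simplified model of a consumer group,
// used only to illustrate the documented effects of Start.
type consumerModel struct {
	state       State
	startOnInit bool // whether to start consuming as soon as initialisation finishes
}

// Start applies the documented rule for each state.
func (c *consumerModel) Start() error {
	switch c.state {
	case Initialising:
		c.startOnInit = true // consume straight away once initialised
	case Starting, Consuming:
		// already starting or consuming: no change
	case Stopping, Stopped:
		c.state = Starting // begin consuming again
	case Closing:
		return errClosing
	}
	return nil
}
```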

func (*ConsumerGroup) State

func (cg *ConsumerGroup) State() State

State returns the state of the consumer group

func (*ConsumerGroup) StateWait added in v3.1.0

func (cg *ConsumerGroup) StateWait(state State)

StateWait blocks the calling thread until the provided state is reached

func (*ConsumerGroup) Stop

func (cg *ConsumerGroup) Stop() error

Stop has different effects depending on the state:

  - Initialising: the consumer will remain in the Stopped state once initialised, without consuming messages
  - Starting/Consuming: the consumer will stop consuming
  - Stopping/Stopped: no change will happen
  - Closing: an error will be returned

This method does not wait until the consumer group reaches the Stopped state; it only triggers the stopping action. Any error while trying to trigger the stop will be returned.

func (*ConsumerGroup) StopAndWait

func (cg *ConsumerGroup) StopAndWait() error

StopAndWait has different effects depending on the state:

  - Initialising: the consumer will remain in the Stopped state once initialised, without consuming messages
  - Starting/Consuming: the consumer will stop consuming
  - Stopping/Stopped: no change will happen
  - Closing: an error will be returned

This method waits until the consumer group reaches the Stopped state if it was Starting or Consuming. Any error while trying to trigger the stop will be returned, and the calling goroutine will not be blocked.
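The difference between Stop (trigger only) and StopAndWait (trigger, then block until Stopped) can be sketched with a channel that is closed when the Stopped state is reached. Everything here is hypothetical: `consumerModel`, `finishStopping` and the default of starting once initialised are assumptions, the background goroutine only simulates the real consume loop, and the model is one-shot (it does not support restarting and stopping again).

```go
package main

import (
	"errors"
	"sync"
)

// State mirrors the consumer-group states listed in this package.
type State int

const (
	Initialising State = iota
	Stopped
	Starting
	Consuming
	Stopping
	Closing
)

var errClosing = errors.New("cannot stop: consumer group is closing")

// consumerModel is a hypothetical, one-shot model of Stop and StopAndWait.
type consumerModel struct {
	mu          sync.Mutex
	state       State
	startOnInit bool          // whether to consume once initialised
	stopped     chan struct{} // closed once the Stopped state is reached
}

// newConsumerModel assumes the consumer would start once initialised unless stopped.
func newConsumerModel(s State) *consumerModel {
	return &consumerModel{state: s, startOnInit: true, stopped: make(chan struct{})}
}

// State returns the current state in a concurrency-safe way.
func (c *consumerModel) State() State {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.state
}

// stop triggers the stop and reports whether there is a transition to wait for.
func (c *consumerModel) stop() (bool, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	switch c.state {
	case Initialising:
		c.startOnInit = false // remain Stopped once initialised
	case Starting, Consuming:
		c.state = Stopping
		go c.finishStopping() // simulates the consume loop winding down
		return true, nil
	case Closing:
		return false, errClosing
	}
	return false, nil // Stopping/Stopped: no change
}

// finishStopping simulates the background loop reaching the Stopped state.
func (c *consumerModel) finishStopping() {
	c.mu.Lock()
	c.state = Stopped
	c.mu.Unlock()
	close(c.stopped)
}

// Stop triggers the stop without waiting for it to complete.
func (c *consumerModel) Stop() error {
	_, err := c.stop()
	return err
}

// StopAndWait blocks until Stopped is reached if the consumer was starting or consuming.
func (c *consumerModel) StopAndWait() error {
	wait, err := c.stop()
	if err != nil || !wait {
		return err
	}
	<-c.stopped
	return nil
}
```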

type ConsumerGroupChannels

type ConsumerGroupChannels struct {
	Upstream    chan Message
	Errors      chan error
	Initialised chan struct{}
	Consume     chan bool
	Closer      chan struct{}
	Closed      chan struct{}
	State       *ConsumerStateChannels
}

ConsumerGroupChannels represents the channels used by ConsumerGroup.

func CreateConsumerGroupChannels

func CreateConsumerGroupChannels(upstreamBufferSize, errorsBufferSize int) *ConsumerGroupChannels

CreateConsumerGroupChannels initialises a ConsumerGroupChannels with new channels. upstreamBufferSize sets how many messages can be buffered in the Upstream channel (used to receive messages), and errorsBufferSize does the same for the Errors channel. The State channels are not initialised until the state machine is created.

func (*ConsumerGroupChannels) Validate

func (consumerChannels *ConsumerGroupChannels) Validate() error

Validate returns an Error with a list of missing channels if any consumer channel is nil
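Creating buffered channels and validating that none is nil can be sketched as below. The `channels` struct is a hypothetical, trimmed-down stand-in for ConsumerGroupChannels (only three channels, and Message simplified to []byte), and the error text is illustrative rather than dp-kafka's.

```go
package main

import (
	"fmt"
	"strings"
)

// channels is a hypothetical, trimmed-down stand-in for ConsumerGroupChannels.
type channels struct {
	Upstream chan []byte
	Errors   chan error
	Closer   chan struct{}
}

// createChannels mirrors the idea of CreateConsumerGroupChannels: the two
// sizes set the buffer capacity of Upstream and Errors.
func createChannels(upstreamBufferSize, errorsBufferSize int) *channels {
	return &channels{
		Upstream: make(chan []byte, upstreamBufferSize),
		Errors:   make(chan error, errorsBufferSize),
		Closer:   make(chan struct{}),
	}
}

// validate returns an error naming every nil channel, or nil if all are set.
func (c *channels) validate() error {
	var missing []string
	if c.Upstream == nil {
		missing = append(missing, "Upstream")
	}
	if c.Errors == nil {
		missing = append(missing, "Errors")
	}
	if c.Closer == nil {
		missing = append(missing, "Closer")
	}
	if len(missing) > 0 {
		return fmt.Errorf("missing channels: %s", strings.Join(missing, ", "))
	}
	return nil
}
```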

type ConsumerGroupConfig

type ConsumerGroupConfig struct {
	// Sarama config overrides
	KafkaVersion          *string
	KeepAlive             *time.Duration
	RetryBackoff          *time.Duration
	RetryBackoffFunc      *func(retries int) time.Duration
	Offset                *int64
	SecurityConfig        *SecurityConfig
	MessageConsumeTimeout *time.Duration

	// dp-kafka specific config overrides
	NumWorkers        *int
	BatchSize         *int
	BatchWaitTime     *time.Duration
	MinRetryPeriod    *time.Duration
	MaxRetryPeriod    *time.Duration
	MinBrokersHealthy *int
	Topic             string
	GroupName         string
	BrokerAddrs       []string
}

ConsumerGroupConfig exposes the configurable parameters for a consumer group, so that you can override the default Sarama config values and the default values set by dp-kafka. Any value that is not provided will use the default Sarama config value, or the default dp-kafka value. The only 3 compulsory values are:

  - Topic
  - GroupName
  - BrokerAddrs
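The split between compulsory values and optional pointer overrides (a nil pointer meaning "use the default") can be sketched as follows. `consumerConfig`, `validate` and `valueOr` are hypothetical names for illustration; dp-kafka's own Validate and Get may check more and return different messages.

```go
package main

import (
	"errors"
	"time"
)

// consumerConfig is a hypothetical subset of ConsumerGroupConfig: nil pointer
// fields mean "use the default", plain fields are compulsory.
type consumerConfig struct {
	NumWorkers    *int
	BatchWaitTime *time.Duration
	Topic         string
	GroupName     string
	BrokerAddrs   []string
}

// validate checks only the three compulsory values.
func (c *consumerConfig) validate() error {
	switch {
	case c.Topic == "":
		return errors.New("topic is compulsory")
	case c.GroupName == "":
		return errors.New("group name is compulsory")
	case len(c.BrokerAddrs) == 0:
		return errors.New("at least one broker address is compulsory")
	}
	return nil
}

// valueOr returns the override when one was provided, otherwise the default.
func valueOr[T any](override *T, def T) T {
	if override != nil {
		return *override
	}
	return def
}
```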

func (*ConsumerGroupConfig) Get

func (c *ConsumerGroupConfig) Get() (*sarama.Config, error)

Get creates a default sarama config for a consumer-group and overwrites any values provided in cgConfig. If any required value is not provided or any override is invalid, an error will be returned

func (*ConsumerGroupConfig) Validate

func (c *ConsumerGroupConfig) Validate() error

Validate checks that the compulsory values are provided in the config.

type ConsumerStateChannels

type ConsumerStateChannels struct {
	Initialising *StateChan
	Stopped      *StateChan
	Starting     *StateChan
	Consuming    *StateChan
	Stopping     *StateChan
	Closing      *StateChan
}

ConsumerStateChannels represents the channels that are used to notify of consumer-group state changes

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error is the handler package's error type. It is not meant to be compared as a type; instead, information should be extracted through the methods of the interfaces it implements, such as LogData and Unwrap. It is not guaranteed to remain exported, so it shouldn't be treated as such.

func NewError

func NewError(err error, logData map[string]interface{}) *Error

NewError creates a new Error

func (*Error) Error

func (e *Error) Error() string

Error implements the Go standard error interface

func (*Error) LogData

func (e *Error) LogData() map[string]interface{}

LogData implements the DataLogger interface, which allows you to extract embedded log.Data from an error.

func (*Error) Unwrap

func (e *Error) Unwrap() error

Unwrap returns the wrapped error

type Handler

type Handler func(ctx context.Context, workerID int, msg Message) error

Handler represents a handler for processing a single Kafka message. IMPORTANT: if NumWorkers > 1, this function needs to be thread safe, because it may be called concurrently by several workers.
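When several workers call the same handler, any state the handler shares must be guarded. The sketch below uses a mutex; `message`, `rawMessage`, `handler`, `byteCounter` and `runWorkers` are hypothetical stand-ins (the handler type only mirrors the shape of Handler), and the worker pool is a simplified picture of NumWorkers > 1, not dp-kafka's dispatch logic.

```go
package main

import (
	"context"
	"sync"
)

// message is a hypothetical stand-in for this package's Message interface.
type message interface {
	GetData() []byte
}

type rawMessage []byte

func (m rawMessage) GetData() []byte { return m }

// handler has the same shape as this package's Handler type.
type handler func(ctx context.Context, workerID int, msg message) error

// byteCounter holds state shared by all workers, so it guards it with a mutex.
type byteCounter struct {
	mu    sync.Mutex
	total int
}

func (c *byteCounter) handle(ctx context.Context, workerID int, msg message) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.total += len(msg.GetData())
	return nil
}

// runWorkers feeds msgs to n concurrent workers that all call the same handler.
func runWorkers(n int, h handler, msgs []message) {
	ch := make(chan message)
	var wg sync.WaitGroup
	for id := 0; id < n; id++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for m := range ch {
				_ = h(context.Background(), workerID, m)
			}
		}(id)
	}
	for _, m := range msgs {
		ch <- m
	}
	close(ch)
	wg.Wait()
}
```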

type HealthInfo

type HealthInfo struct {
	Reachable bool
	HasTopic  bool
}

HealthInfo contains the health information for one broker

type HealthInfoMap

type HealthInfoMap struct {
	// contains filtered or unexported fields
}

HealthInfoMap contains the health information for a set of brokers with a common topic expected to be available in all of them

func Healthcheck

func Healthcheck(ctx context.Context, brokers []interfaces.SaramaBroker, topic string, cfg *sarama.Config) HealthInfoMap

Healthcheck validates all the provided brokers for the provided topic. It returns a HealthInfoMap containing all the information.

func (*HealthInfoMap) Set

func (h *HealthInfoMap) Set(broker interfaces.SaramaBroker, healthInfo HealthInfo)

Set creates or overrides a HealthInfo value for the provided broker

func (*HealthInfoMap) UpdateStatus

func (h *HealthInfoMap) UpdateStatus(state *health.CheckState, minHealthyThreshold int, msgHealthy string) error

UpdateStatus updates the provided health.CheckState with the current state, according to the provided minimum number of healthy brokers required for the group to be considered healthy. If the health status is OK, the provided msgHealthy will be used as the status message.
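The threshold rule can be sketched as a count over per-broker health. This assumes a broker counts as healthy when it is both reachable and has the topic; `healthInfo` and `isGroupHealthy` are hypothetical names, and the sketch only returns healthy or not, whereas the real UpdateStatus writes a full health status (and may distinguish more levels) into the CheckState.

```go
package main

// healthInfo mirrors the HealthInfo struct above.
type healthInfo struct {
	Reachable bool
	HasTopic  bool
}

// isGroupHealthy counts brokers that are reachable and have the topic, and
// reports whether at least minHealthy of them are healthy.
func isGroupHealthy(brokers map[string]healthInfo, minHealthy int) (healthy int, ok bool) {
	for _, info := range brokers {
		if info.Reachable && info.HasTopic {
			healthy++
		}
	}
	return healthy, healthy >= minHealthy
}
```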

type IConsumerGroup

type IConsumerGroup interface {
	Channels() *ConsumerGroupChannels
	State() State
	StateWait(state State)
	RegisterHandler(ctx context.Context, h Handler) error
	RegisterBatchHandler(ctx context.Context, batchHandler BatchHandler) error
	Checker(ctx context.Context, state *healthcheck.CheckState) error
	IsInitialised() bool
	Initialise(ctx context.Context) error
	OnHealthUpdate(status string)
	Start() error
	Stop() error
	StopAndWait() error
	LogErrors(ctx context.Context)
	Close(ctx context.Context, optFuncs ...OptFunc) error
}

IConsumerGroup is an interface representing a Kafka Consumer Group, as implemented in dp-kafka/consumer

type IProducer

type IProducer interface {
	Channels() *ProducerChannels
	Checker(ctx context.Context, state *healthcheck.CheckState) error
	LogErrors(ctx context.Context)
	IsInitialised() bool
	Initialise(ctx context.Context) error
	Send(schema *avro.Schema, event interface{}) error
	Close(ctx context.Context) (err error)
	AddHeader(key, value string)
}

IProducer is an interface representing a Kafka Producer, as implemented in dp-kafka/producer

type Message

type Message interfaces.Message

type OptFunc added in v3.7.0

type OptFunc func()

OptFunc is an optional function that is run after the Upstream channel is closed and before the consumer's Closer is called. For example, during a graceful shutdown you may have received messages that are not yet processed: if you release each message as soon as you receive it from Upstream and add it to a batch, its offset is only committed back once the batch is processed. If that batch is not processed during the graceful shutdown, this can create a lag in the consumer group.

Optional functions handle this case. During a graceful shutdown you can pass, for the case above, a function that processes the pending batch and commits its messages back to the consumer group. Because the function runs after the Upstream channel is closed, no new messages are received; because it runs before the consumer is closed, the remaining messages can still be processed and committed back to the consumer group.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer is a producer of Kafka messages

func NewProducer

func NewProducer(ctx context.Context, pConfig *ProducerConfig) (producer *Producer, err error)

NewProducer returns a new producer instance using the provided config. Any value not provided in the config is set to its default, and an error is returned if the config is not valid.

func (*Producer) AddHeader added in v3.4.0

func (p *Producer) AddHeader(key, value string)

func (*Producer) Channels

func (p *Producer) Channels() *ProducerChannels

Channels returns the Producer channels for this producer

func (*Producer) Checker

func (p *Producer) Checker(ctx context.Context, state *healthcheck.CheckState) error

Checker checks health of Kafka producer and updates the provided CheckState accordingly

func (*Producer) Close

func (p *Producer) Close(ctx context.Context) (err error)

Close safely closes the producer and releases all resources. Pass in a context with a timeout or deadline.

func (*Producer) Initialise

func (p *Producer) Initialise(ctx context.Context) error

Initialise creates a new Sarama AsyncProducer and the channel redirection, only if it was not already initialised.

func (*Producer) IsInitialised

func (p *Producer) IsInitialised() bool

IsInitialised returns true only if Sarama producer has been correctly initialised.

func (*Producer) LogErrors

func (p *Producer) LogErrors(ctx context.Context)

LogErrors creates a goroutine that waits on the Errors channel and logs any error received. It exits when the Closer channel is closed.

func (*Producer) Send

func (p *Producer) Send(schema *avro.Schema, event interface{}) error

Send marshals the provided event with the provided schema, and sends it to kafka

type ProducerChannels

type ProducerChannels struct {
	Output      chan []byte
	Errors      chan error
	Initialised chan struct{}
	Closer      chan struct{}
	Closed      chan struct{}
}

ProducerChannels represents the channels used by Producer.

func CreateProducerChannels

func CreateProducerChannels() *ProducerChannels

CreateProducerChannels initialises a ProducerChannels with new channels.

func (*ProducerChannels) Validate

func (producerChannels *ProducerChannels) Validate() error

Validate returns an error with a list of missing channels if any producer channel is nil

type ProducerConfig

type ProducerConfig struct {
	// Sarama config overrides
	KafkaVersion      *string
	MaxMessageBytes   *int
	RetryMax          *int
	KeepAlive         *time.Duration
	RetryBackoff      *time.Duration
	RetryBackoffFunc  *func(retries, maxRetries int) time.Duration
	SecurityConfig    *SecurityConfig
	MinBrokersHealthy *int

	// dp-kafka specific config
	Topic          string
	BrokerAddrs    []string
	MinRetryPeriod *time.Duration
	MaxRetryPeriod *time.Duration
}

ProducerConfig exposes the optional configurable parameters for a producer to overwrite default Sarama config values. Any value that is not provided will use the default Sarama config value.

func (*ProducerConfig) Get

func (p *ProducerConfig) Get() (*sarama.Config, error)

Get creates a default sarama config and overwrites any values provided in pConfig

func (*ProducerConfig) Validate

func (p *ProducerConfig) Validate() error

Validate checks that the compulsory values are provided in the config.

type SaramaMessage

type SaramaMessage struct {
	// contains filtered or unexported fields
}

SaramaMessage represents a Sarama specific Kafka message

func NewSaramaMessage

func NewSaramaMessage(m *sarama.ConsumerMessage, s sarama.ConsumerGroupSession, ud chan struct{}) *SaramaMessage

func (SaramaMessage) Commit

func (m SaramaMessage) Commit()

Commit marks the message as consumed, and then commits the offset to the backend

func (SaramaMessage) CommitAndRelease

func (m SaramaMessage) CommitAndRelease()

CommitAndRelease marks the message as consumed, commits the offset to the backend and releases the UpstreamDone channel

func (SaramaMessage) Context added in v3.4.0

func (M SaramaMessage) Context() context.Context

Context returns a context with traceid.

func (SaramaMessage) GetData

func (m SaramaMessage) GetData() []byte

GetData returns the message contents.

func (SaramaMessage) GetHeader added in v3.4.0

func (M SaramaMessage) GetHeader(key string) string

GetHeader takes a header key and returns its value if the key exists in the message headers.

func (SaramaMessage) Mark

func (m SaramaMessage) Mark()

Mark marks the message as consumed, but doesn't commit the offset to the backend

func (SaramaMessage) Offset

func (m SaramaMessage) Offset() int64

Offset returns the message offset

func (SaramaMessage) Release

func (m SaramaMessage) Release()

Release closes the UpstreamDone channel, but doesn't mark the message or commit the offset

func (SaramaMessage) UpstreamDone

func (m SaramaMessage) UpstreamDone() chan struct{}

UpstreamDone returns the upstreamDone channel. Closing this channel notifies that the message has been consumed (same effect as calling Release)
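The differences between Mark, Commit, Release and CommitAndRelease can be summarised with a small model that records what each call does. `fakeMessage` and `released` are hypothetical; the model does not talk to Kafka, and guarding Release with sync.Once (so a second call does not panic on a double close) is an assumption, not documented dp-kafka behaviour.

```go
package main

import "sync"

// fakeMessage is a hypothetical model of the SaramaMessage methods above; it
// records what each call does instead of talking to Kafka.
type fakeMessage struct {
	marked, committed bool
	upstreamDone      chan struct{}
	once              sync.Once
}

func newFakeMessage() *fakeMessage {
	return &fakeMessage{upstreamDone: make(chan struct{})}
}

// Mark marks the message as consumed without committing.
func (m *fakeMessage) Mark() { m.marked = true }

// Commit marks the message and commits its offset.
func (m *fakeMessage) Commit() {
	m.marked = true
	m.committed = true
}

// Release closes upstreamDone without marking or committing.
func (m *fakeMessage) Release() { m.once.Do(func() { close(m.upstreamDone) }) }

// CommitAndRelease commits and then releases.
func (m *fakeMessage) CommitAndRelease() {
	m.Commit()
	m.Release()
}

// released reports whether upstreamDone has been closed, without blocking.
func (m *fakeMessage) released() bool {
	select {
	case <-m.upstreamDone:
		return true
	default:
		return false
	}
}
```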

type SecurityConfig

type SecurityConfig struct {
	RootCACerts        string
	ClientCert         string
	ClientKey          string
	InsecureSkipVerify bool
}

SecurityConfig is common to the producer and consumer-group configs (ProducerConfig and ConsumerGroupConfig above).

func GetSecurityConfig

func GetSecurityConfig(caCerts, clientCert, clientKey string, skipVerify bool) *SecurityConfig

type State

type State int
const (
	Initialising State = iota
	Stopped
	Starting
	Consuming
	Stopping
	Closing
)

func (State) String

func (s State) String() string

type StateChan added in v3.1.0

type StateChan struct {
	// contains filtered or unexported fields
}

StateChan provides a concurrency-safe channel for state machines, representing one state.

func NewStateChan added in v3.1.0

func NewStateChan() *StateChan

NewStateChan creates a new StateChan with a new struct channel and read-write mutex

func (*StateChan) Channel added in v3.1.0

func (sc *StateChan) Channel() chan struct{}

Channel returns the channel wrapped by this StateChan struct in a concurrency-safe manner. Note: if you intend to wait on this channel while a concurrent sc.leave() call may happen, you will need to acquire a read lock on RWMutex() first.

func (*StateChan) RWMutex added in v3.1.0

func (sc *StateChan) RWMutex() *sync.RWMutex

RWMutex returns the read-write mutex, so that callers can use it to prevent possible race conditions, if needed

func (*StateChan) Wait added in v3.1.0

func (sc *StateChan) Wait()

Wait blocks the calling thread until the state is reached (channel is closed)
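The StateChan idea (a channel that is closed while a state is active and replaced with a fresh, open channel when the state is left) can be sketched as below. `stateChan`, `enter`, `leave` and `wait` are hypothetical names; this is a simplified model guided by the descriptions above, not dp-kafka's implementation.

```go
package main

import "sync"

// stateChan is a hypothetical sketch of StateChan for a single state.
type stateChan struct {
	mu sync.RWMutex
	ch chan struct{}
}

func newStateChan() *stateChan {
	return &stateChan{ch: make(chan struct{})}
}

// enter closes the channel, releasing every goroutine blocked in wait.
func (sc *stateChan) enter() {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	select {
	case <-sc.ch: // already closed: nothing to do
	default:
		close(sc.ch)
	}
}

// leave swaps in a new open channel so that later waiters block again.
func (sc *stateChan) leave() {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	select {
	case <-sc.ch:
		sc.ch = make(chan struct{})
	default: // not in this state: keep the open channel
	}
}

// wait copies the channel under the read lock, then blocks until it is closed.
func (sc *stateChan) wait() {
	sc.mu.RLock()
	ch := sc.ch
	sc.mu.RUnlock()
	<-ch
}
```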

type StateMachine

type StateMachine struct {
	// contains filtered or unexported fields
}

func NewConsumerStateMachine

func NewConsumerStateMachine() *StateMachine

func (*StateMachine) Get

func (sm *StateMachine) Get() State

Get returns the current state

func (*StateMachine) GetChan added in v3.1.0

func (sm *StateMachine) GetChan(s State) *StateChan

GetChan returns the StateChan pointer corresponding to the provided state

func (*StateMachine) Set

func (sm *StateMachine) Set(newState State)

Set sets the state machine to the provided state value

func (*StateMachine) SetIf

func (sm *StateMachine) SetIf(allowed []State, newState State) error

SetIf sets the state machine to the provided state value only if the current state is one of the values provided in the list
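A conditional transition like SetIf can be sketched as a mutex-protected check against an allow-list. `stateMachine` is a hypothetical stand-in, the String names simply echo the constant names (the exact strings dp-kafka returns may differ), and the error text is illustrative.

```go
package main

import (
	"fmt"
	"sync"
)

// State mirrors the consumer-group states listed in this package.
type State int

const (
	Initialising State = iota
	Stopped
	Starting
	Consuming
	Stopping
	Closing
)

// String returns the constant name, or State(n) for unknown values.
func (s State) String() string {
	names := [...]string{"Initialising", "Stopped", "Starting", "Consuming", "Stopping", "Closing"}
	if s < 0 || int(s) >= len(names) {
		return fmt.Sprintf("State(%d)", int(s))
	}
	return names[s]
}

// stateMachine is a hypothetical sketch of StateMachine.SetIf.
type stateMachine struct {
	mu    sync.Mutex
	state State
}

// SetIf moves to newState only if the current state is in allowed.
func (sm *stateMachine) SetIf(allowed []State, newState State) error {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	for _, s := range allowed {
		if sm.state == s {
			sm.state = newState
			return nil
		}
	}
	return fmt.Errorf("state %s is not one of the allowed states %v", sm.state, allowed)
}
```

Holding the lock for both the check and the update keeps the transition atomic, so two goroutines cannot both pass the check and then race on the write.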

func (*StateMachine) String

func (sm *StateMachine) String() string

String returns the string representation of the current state

type TopicAuth

type TopicAuth struct {
	App        string
	Subnets    []string
	Topic      string
	Operations []sarama.AclOperation
	Hosts      []string
}

func (TopicAuth) GetAcls

func (t TopicAuth) GetAcls(domain string) Acls

type TopicAuthList

type TopicAuthList struct {
	Domain  string
	Brokers []string
	Acls    []TopicAuth
}

func (TopicAuthList) Apply

func (t TopicAuthList) Apply(adm sarama.ClusterAdmin) error

Directories

Path Synopsis
Package avro provides a user functionality to return the avro encoding of s.
examples
