kafka

package module
Published: Oct 22, 2021 License: MIT Imports: 18 Imported by: 32

README

dp-kafka

Kafka client wrapper that uses channels to abstract Kafka consumers and producers. This library is built on top of Sarama.

Configuration

By default, the library assumes plaintext connections, unless the configuration argument has a non-nil SecurityConfig field.
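For illustration, here is a minimal sketch of both modes. The broker address, topic and PEM variables are placeholders; GetSecurityConfig is documented further down this page.

    // Leaving SecurityConfig nil keeps the default plaintext connection.
    pConfig := &kafka.ProducerConfig{
        BrokerAddrs: []string{"localhost:9092"},
        Topic:       "content-published",
    }

    // Setting a non-nil SecurityConfig switches the connection to TLS.
    // caCerts, clientCert and clientKey are placeholder PEM strings here.
    pConfig.SecurityConfig = kafka.GetSecurityConfig(caCerts, clientCert, clientKey, false)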

Setup app to use TLS

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed service that enables you to build and run applications that use Apache Kafka to process streaming data.

As of 2021, our apps have migrated from running our own Kafka to Amazon MSK, which provides the control-plane operations (such as creating, updating, and deleting clusters) and lets you use the Apache Kafka data-plane operations (such as producing and consuming data).

To use TLS, please do the following:

1. Add kafka topics to manifest

First of all, we need to update the manifest of the app to contain the kafka topics which the app uses. This is done as follows:

  1. Create feature branch for the upcoming changes to the manifest of the app in dp-configs
  2. In the manifest of the app, add the relevant kafka topics at the end of the file as follows:
kafka:
  topics:
    - name: `topic name 1` (e.g. `content-published`)
      subnets: [ `web` or `publishing` or both `web, publishing` ]
      access: [ `read` or `write` ]
    - name: `topic name 2` 
      subnets: [ `web` or `publishing` or both `web, publishing` ]
      access: [ `read` or `write` ]

More details of the kafka section can be found here

An example of adding kafka topics to the manifest can be found here

  3. Review and merge these changes to continue with the next step

2. Create client certificate for the app - Run key-admin script

Next, we need to create a client certificate for the app so that it can connect and authenticate using TLS and its client certificate. This can be achieved by running the key-admin script.

Notes:

  1. The key-admin script checks the manifests of the apps in dp-configs to see whether a client certificate needs to be created for the app (by checking whether any kafka topics are mentioned in the manifest). Therefore, please make sure that your local machine is on the master branch for dp-configs which contains all your changes from the previous step.
  2. Remember to do Step 4 of the Client Certificate README to inject the relevant certificate details into the app's secrets (using the --secrets argument), unless the app is being migrated to AWS MSK, in which case this step is done later
  3. Please remember to come back to this README after completing this task to continue with the process.

Follow the steps explained in the Client Certificate README to run key-admin

3. Apply kafka topics to AWS MSK - Run topic-manager script

Next, we need to create the kafka topics - used by the app - on AWS MSK. This can be achieved by running the topic-manager script. Running the script informs AWS MSK of:

  • any new topics (topic manager creates them)
  • new clients/apps (i.e. certs) using the service:
    • topic manager authorises the clients/apps (i.e. certs) to gain the right access (read and/or write) to its topics
    • renewed certs do not need a re-run of topic manager

Notes:

  1. The topic-manager script checks the manifests of the apps in dp-configs to see if any changes to kafka topics (adding or deleting topics) need to be applied to AWS MSK by checking the kafka topics mentioned in the manifest. Therefore, please make sure that your local machine is on the master branch for dp-configs which contains all your changes from the previous step.
  2. Please remember to come back to this README after completing this task to continue with the process.

Follow the steps explained in the Kafka Setup Tools README to run topic-manager

4. Add configs in the app to use AWS MSK

Once the components (in previous steps) have been set up for the app to use TLS, the app itself needs to be updated to use TLS. To achieve this, please do the following:

  1. Update the app README.md to include kafka configurations to connect to AWS MSK using TLS

    | Environment variable         | Default                                   | Description
    | ---------------------------- | ----------------------------------------- | -----------
    | KAFKA_ADDR                   | localhost:9092                            | The kafka broker addresses (can be comma separated)
    | KAFKA_VERSION                | "1.0.2"                                   | The kafka version that this service expects to connect to
    | KAFKA_SEC_PROTO              | _unset_                                   | if set to `TLS`, kafka connections will use TLS [[1]](#notes_1)
    | KAFKA_SEC_CA_CERTS           | _unset_                                   | CA cert chain for the server cert [[1]](#notes_1)
    | KAFKA_SEC_CLIENT_KEY         | _unset_                                   | PEM for the client key [[1]](#notes_1)
    | KAFKA_SEC_CLIENT_CERT        | _unset_                                   | PEM for the client certificate [[1]](#notes_1)
    | KAFKA_SEC_SKIP_VERIFY        | false                                     | ignores server certificate issues if `true` [[1]](#notes_1)
    
    **Notes:**
    
        1. <a name="notes_1">For more info, see the [kafka TLS examples documentation](https://github.com/ONSdigital/dp-kafka/tree/main/examples#tls)</a>
    
  2. Add the configurations to config.go

    // KafkaTLSProtocolFlag informs service to use TLS protocol for kafka
    const KafkaTLSProtocolFlag = "TLS"

    type Config struct {
        // TO-REMOVE: this struct already contains other configs in the app; update it with the following organised layout
        KafkaConfig KafkaConfig
    }

    // KafkaConfig contains the config required to connect to Kafka
    type KafkaConfig struct {
        Brokers       []string `envconfig:"KAFKA_ADDR"            json:"-"`
        Version       string   `envconfig:"KAFKA_VERSION"`
        SecProtocol   string   `envconfig:"KAFKA_SEC_PROTO"`
        SecCACerts    string   `envconfig:"KAFKA_SEC_CA_CERTS"`
        SecClientKey  string   `envconfig:"KAFKA_SEC_CLIENT_KEY"  json:"-"`
        SecClientCert string   `envconfig:"KAFKA_SEC_CLIENT_CERT"`
        SecSkipVerify bool     `envconfig:"KAFKA_SEC_SKIP_VERIFY"`
    }

    // TO-REMOVE: set the default values of the kafka configs in Get() as follows
    KafkaConfig: KafkaConfig{
        Brokers:       []string{"localhost:9092"},
        Version:       "1.0.2",
        SecProtocol:   "",
        SecCACerts:    "",
        SecClientCert: "",
        SecClientKey:  "",
        SecSkipVerify: false,
    },
    
  3. Update the test to check config default values in config_test.go accordingly

    So(cfg.KafkaConfig.Brokers, ShouldResemble, []string{"localhost:9092"})
    So(cfg.KafkaConfig.Version, ShouldEqual, "1.0.2")
    So(cfg.KafkaConfig.SecProtocol, ShouldEqual, "")
    So(cfg.KafkaConfig.SecCACerts, ShouldEqual, "")
    So(cfg.KafkaConfig.SecClientCert, ShouldEqual, "")
    So(cfg.KafkaConfig.SecClientKey, ShouldEqual, "")
    So(cfg.KafkaConfig.SecSkipVerify, ShouldBeFalse)
    

    An example of adding kafka configs to the app can be found here

  4. Continue to the next step in the Creation of Kafka Producer and/or Consumer

Creation

Kafka producers and consumers can be created with constructors that accept the required configuration. You may get the channels struct using producer.Channels() and consumer.Channels() respectively.

For optional config values, the Sarama default config values will be used unless an override is provided. If a compulsory value is not provided, the config validation will fail, and the error will be returned by the constructor function.

The constructor tries to initialise the producer/consumer by creating the underlying Sarama client, but failing to do so will not result in an error: an uninitialised consumer/producer will be returned instead.

Producer creation

The producer constructor accepts a configuration. The configuration needs to provide at least Brokers and ProducedTopic. You may also provide overrides for default Sarama configs, like KafkaVersion or MaxMessageBytes, and/or a SecurityConfig as described in the previous section.

    // Create Producer with config
    pConfig := &kafka.ProducerConfig{
        BrokerAddrs:     kafkaConfig.Brokers,       // compulsory
        Topic:           kafkaConfig.ProducedTopic, // compulsory
        KafkaVersion:    &kafkaConfig.Version,
        MaxMessageBytes: &kafkaConfig.MaxBytes,
    }
    if kafkaConfig.SecProtocol == config.KafkaTLSProtocolFlag {
        pConfig.SecurityConfig = kafka.GetSecurityConfig(
            kafkaConfig.SecCACerts,
            kafkaConfig.SecClientCert,
            kafkaConfig.SecClientKey,
            kafkaConfig.SecSkipVerify,
        )
    }
    producer, err := kafka.NewProducer(ctx, pConfig)

ConsumerGroup creation

The consumerGroup constructor accepts a configuration. The configuration needs to provide at least Brokers, Topic and GroupName. You may also provide overrides for default Sarama configs, like KafkaVersion or MaxMessageBytes, and/or a SecurityConfig as described in the previous section.

For concurrent consumers, you can override the NumWorkers config (default 1), which determines the number of go-routines that will handle messages.

For batch consumers, you can override BatchSize (default 1) and BatchWaitTime (default 200 ms), which determine the maximum number of kafka messages stored in a batch and the maximum time to wait until the batch is processed, respectively.

The constructor will create an Upstream channel with a buffer size that is the maximum of BatchSize and NumWorkers.

    // Create ConsumerGroup with config
    cgConfig := &kafka.ConsumerGroupConfig{
        BrokerAddrs:  kafkaConfig.Brokers,       // compulsory
        Topic:        kafkaConfig.ConsumedTopic, // compulsory
        GroupName:    kafkaConfig.ConsumedGroup, // compulsory
        KafkaVersion: &kafkaConfig.Version,
        NumWorkers:   &kafkaConfig.KafkaParallelMessages,
    }
    if kafkaConfig.SecProtocol == config.KafkaTLSProtocolFlag {
        cgConfig.SecurityConfig = kafka.GetSecurityConfig(
            kafkaConfig.SecCACerts,
            kafkaConfig.SecClientCert,
            kafkaConfig.SecClientKey,
            kafkaConfig.SecSkipVerify,
        )
    }
    cg, err := kafka.NewConsumerGroup(ctx, cgConfig)

Life-cycle

(Consumer state machine diagram)

Initialisation

If the producer/consumer can establish a connection with the Kafka cluster, it will be initialised at creation time, which is usually the case. But it might not be able to do so, for example if the kafka cluster is not running. If a producer/consumer is not initialised, it cannot contact the kafka broker, and it cannot send or receive any message. Any attempt to send a message in this state will result in an error being sent to the Errors channel.

An uninitialised producer/consumer will try to initialise later, asynchronously, in a retry loop following an exponential backoff strategy. You may also try to initialise it by calling Initialise(). In any case, when the initialisation succeeds, the initialisation loop will exit, and the producer/consumer will go to its next state.

You can check if a producer/consumer is initialised by calling IsInitialised() or wait for it to be initialised by waiting for the 'Initialised' channel to be closed, like so:

    // wait in a parallel go-routine
    go func() {
        <-channels.Initialised
        doStuff()
    }()
    // block until kafka is initialised
    <-channels.Initialised
    doStuff()

Waiting for this channel is a convenient hook, but not a necessary requirement.

Note that initialised consumers will be in 'Stopped' state until Start is called.
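For example, a typical startup sequence might look like the following sketch, assuming a consumer created as shown in the Creation section and the log package used elsewhere in this README:

    // log any errors sent to the Errors channel from a background go-routine
    consumer.LogErrors(ctx)

    // initialised consumers start in 'Stopped' state, so start consuming explicitly
    if err := consumer.Start(); err != nil {
        log.Error(ctx, "failed to start kafka consumer", err)
    }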

Message production

Messages are sent to Kafka by sending them to a producer Output channel, as byte arrays:

    producer.Channels().Output <- []byte(msg)

You may obtain the byte array by marshalling an event with its avro schema.
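Alternatively, the producer's Send method marshals an event with the provided avro schema and sends it for you. A sketch, where the event struct, its avro tag and myEventSchema are illustrative placeholders:

    // MyEvent is a placeholder event; its avro tags must match the schema definition
    type MyEvent struct {
        Message string `avro:"message"`
    }

    // Send marshals the event with the schema and puts the bytes on the Output channel
    if err := producer.Send(myEventSchema, &MyEvent{Message: "hello"}); err != nil {
        log.Error(ctx, "failed to send event", err)
    }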

Message consumption using handlers

This is the recommended approach.

You can register one handler in order to Consume messages in a managed way. There are two types of handlers available: single message and batch, but only one handler can be registered per consumer.

Single message

You may register a single-message handler like so:

    // Create Handler struct with all required dependencies
    type Handler struct {}

    // Implement the Handler func
    func (h *Handler) Handle(ctx context.Context, workerID int, msg kafka.Message) error {
        log.Info(ctx, "message processed", log.Data{"worker_id": workerID})
        return nil
    }

    ...

    // register the handler to your kafka consumer
    consumer.RegisterHandler(ctx, handler.Handle)

By default, messages are consumed sequentially, but you can provide a NumWorkers value greater than one to consume messages concurrently. One go-routine will be created for each worker.

Note that the maximum number of messages consumed concurrently is the number of partitions assigned to this consumer group. If NumWorkers is greater, concurrency will be capped at the number of partitions.
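As a sketch, NumWorkers can be overridden when creating the consumer group (field names as in ConsumerGroupConfig; the value is illustrative):

    numWorkers := 4 // up to 4 messages handled concurrently (capped by assigned partitions)
    cgConfig := &kafka.ConsumerGroupConfig{
        BrokerAddrs: kafkaConfig.Brokers,
        Topic:       kafkaConfig.ConsumedTopic,
        GroupName:   kafkaConfig.ConsumedGroup,
        NumWorkers:  &numWorkers,
    }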

Batch of messages

You may register a batch-message handler like so:

    // Create Handler struct with all required dependencies
    type Handler struct {}

    // Implement the Handler func
    func (h *Handler) Handle(ctx context.Context, batch []kafka.Message) error {
        log.Info(ctx, "batch processed", log.Data{"size": len(batch)})
        return nil
    }

    // register the batch handler to your kafka consumer
    svc.consumer.RegisterBatchHandler(ctx, handler.Handle)

Incoming messages are accumulated until a total of BatchSize messages is reached, or until BatchWaitTime has elapsed. Then the Handler will be called with the batch of messages. No messages are committed until the handler finishes its execution. Batch handling is sequential: NumWorkers is ignored.
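As a sketch, the batch parameters can be overridden when creating the consumer group (the values here are illustrative):

    batchSize := 50                         // handler called once 50 messages have accumulated...
    batchWaitTime := 500 * time.Millisecond // ...or after 500ms, whichever happens first
    cgConfig := &kafka.ConsumerGroupConfig{
        BrokerAddrs:   kafkaConfig.Brokers,
        Topic:         kafkaConfig.ConsumedTopic,
        GroupName:     kafkaConfig.ConsumedGroup,
        BatchSize:     &batchSize,
        BatchWaitTime: &batchWaitTime,
    }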

Message consumption using Upstream channel directly

This approach is NOT recommended. Only use it if you need to implement your own message processing strategy that is different from the default handlers.

Messages can be consumed by creating an infinite consumption loop. Once you are ready to consume another message, you need to call Release(), and once a message has finished being processed you need to call Commit(). You may call CommitAndRelease() to do both at the same time:

    // consumer loop
    func consume(upstream chan kafka.Message) {
        for {
            msg := <-upstream
            doStuff(msg)
            msg.CommitAndRelease()
        }
    }

You may create a single go-routine to consume messages sequentially, or multiple parallel go-routines (workers) to consume them concurrently:

    // single consume go-routine
    go consume(channels.Upstream)
    // multiple workers to consume messages in parallel
    for w := 1; w <= kafkaConfig.ParallelMessages; w++ {
        go consume(channels.Upstream)
    }

You can consume up to as many messages in parallel as there are partitions assigned to your consumer; more info in the deep dive section below.

Message consumption - Sarama Kafka sessions

(Kafka concurrency diagram)

Sarama creates one go-routine per partition assigned to the consumer, for the topic being consumed.

For example, if we have a topic with 60 partitions and we have 2 instances of a service that consumes that topic running at the same time, kafka will assign 30 partitions to each one.

Then Sarama will create 30 parallel go-routines, which this library uses in order to send messages to the upstream channel. Each go-routine waits for the message to finish being processed by waiting for the message-specific upstreamDone channel to be closed, like so:

    channels.Upstream <- msg
    <-msg.upstreamDone

The consumer can consume messages from the Upstream channel in parallel, up to the maximum number of partitions that Sarama allocated to the consumer. In the example above, that would be a maximum of 30 messages in parallel.

Each Sarama consumption go routine exists only during a particular session. Sessions are periodically destroyed and created by Sarama, according to Kafka events like a cluster re-balance (where the number of partitions assigned to a consumer may change). It is important that messages are released as soon as possible when this happens. The default message consumption timeout is 10 seconds in this scenario (determined by config.Consumer.Group.Session.Timeout).

When a session finishes, we call Consume() again, which tries to establish a new session. If an error occurs trying to establish a new session, it will be retried following an exponential backoff strategy.

Start/Stop consumer

A consumer can be stopped by calling Stop():

  • If the consumer was not initialised, this will set the initial state to 'stopped', so the consumer will do nothing until it is started.
  • If the consumer is Starting/Consuming, the sarama handler will be notified that it needs to stop: it will stop processing any in-flight message, it will not consume any new messages, and it will terminate the session.

Stop() is asynchronous: if the consumer is consuming, it will return as soon as the control go-routine is notified, without waiting for it to finish. If the caller needs to block until the consumer has completely stopped consuming, then you can call StopAndWait(), which is synchronous.

A consumer can be started by calling Start():

  • If the consumer was not initialised, this will set the initial state to 'starting', so the consumer will start consuming straight away after being initialised.
  • If the consumer is Stopping/Stopped, the idle loop will be notified that it needs to start consuming.

Closing

Producers can be closed by calling the Close method.

For graceful handling when closing consumers, it is advised to call StopAndWait() before Close. This allows in-flight messages to be completed and committed, so that they do not get replayed once the application restarts.

The Closer channel is used to signal to all the loops that they need to exit because the consumer is being closed.

After successfully closing a producer or consumer, the corresponding Closed channel is closed.
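A graceful shutdown might therefore look like the following sketch (the 5-second timeout is illustrative):

    // stop consuming and wait for in-flight messages to be committed
    consumer.StopAndWait()

    // close with a bounded context so Close cannot block indefinitely
    closeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := consumer.Close(closeCtx); err != nil {
        log.Error(closeCtx, "failed to close kafka consumer", err)
    }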

Health-check

The health status of a consumer or producer can be obtained by calling the Checker method, which updates the provided CheckState structure with the relevant information:

    err := cli.Checker(ctx, checkState)

  • If a broker cannot be reached, the Status is set to CRITICAL.
  • If all brokers can be reached, but a broker does not provide the expected topic metadata, the Status is set to WARNING.
  • If all brokers can be reached and return the expected topic metadata, we try to initialise the consumer/producer. If it was already initialised, or the initialisation is successful, the Status is set to OK.
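For example, a check could be run manually like the following sketch (NewCheckState and the Status accessor are assumed to come from dp-healthcheck):

    checkState := healthcheck.NewCheckState(kafka.ServiceName)
    if err := consumer.Checker(ctx, checkState); err != nil {
        log.Error(ctx, "kafka checker failed", err)
    }
    log.Info(ctx, "kafka health", log.Data{"status": checkState.Status()})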

Health subscription

The consumer group implements dp-healthcheck's Subscriber interface. You can subscribe the kafka consumer to a set of dp-healthcheck Checks. The consumer will then start consuming when an 'OK' status is reported and stop when a 'WARNING' or 'CRITICAL' status is reported.

Assuming you have a healthcheck hc, you can subscribe a kafka consumer like so:

    hc.Subscribe(consumer, check1, check2, check3)

Warnings:

  • A kafka consumer may be subscribed to multiple checkers, but it must be subscribed to only one healthcheck library instance.

  • Once the consumer is subscribed, calls to Start/Stop must not be manually performed, as they would interfere with the managed calls on health change events.

Examples

See the examples below for some typical usages of this library.

Testing

Some mocks are provided, so that you can test your code interactions with this library. More details here.

Documentation

Index

Constants

const (
	Errors       = "Errors"
	Initialised  = "Initialised"
	Consume      = "Consume"
	Closer       = "Closer"
	Closed       = "Closed"
	Upstream     = "Upstream"
	UpstreamDone = "UpstreamDone"
	Output       = "Output"
)

channel names

const (
	OffsetNewest = sarama.OffsetNewest
	OffsetOldest = sarama.OffsetOldest
)

Common constants

const (
	MsgHealthyProducer      = "kafka producer is healthy"
	MsgHealthyConsumerGroup = "kafka consumer group is healthy"
)
const ServiceName = "Kafka"

ServiceName is the name of this service: Kafka.

Variables

var ConsumeErrRetryPeriod = 250 * time.Millisecond

ConsumeErrRetryPeriod is the initial time period between consumer retries on error (for consumer groups)

var ErrTLSCannotLoadCACerts = errors.New("cannot load CA Certs")

ErrTLSCannotLoadCACerts is returned when the certs file cannot be loaded

var InitRetryPeriod = 250 * time.Millisecond

InitRetryPeriod is the initial time period between initialisation retries (for producers and consumer groups)

var MaxRetryInterval = 31 * time.Second

MaxRetryInterval is the maximum time between retries (plus or minus a random amount)

Functions

func GetPrincipal

func GetPrincipal(app, subnet, domain string) string

func GetRetryTime

func GetRetryTime(attempt int, retryTime time.Duration) time.Duration

GetRetryTime will return a duration based on the attempt and initial retry time. It uses the algorithm `2^n` where `n` is the attempt number (double the previous) plus a randomization factor of ±25% of the initial retry time (so that the server isn't being hit at the same time by many clients)
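A usage sketch, pairing GetRetryTime with the package's InitRetryPeriod (the retry loop and doSomething are illustrative placeholders):

    for attempt := 1; attempt <= 5; attempt++ {
        if err := doSomething(); err == nil {
            break
        }
        // each attempt roughly doubles the previous delay, with a ±25% jitter
        time.Sleep(kafka.GetRetryTime(attempt, kafka.InitRetryPeriod))
    }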

func Healthcheck

func Healthcheck(ctx context.Context, brokers []SaramaBroker, topic string, cfg *sarama.Config) error

Healthcheck implements the common healthcheck logic for kafka producers and consumers, by contacting the provided brokers and asking for topic metadata. Possible errors: - ErrBrokersNotReachable if a broker cannot be contacted. - ErrInvalidBrokers if topic metadata is not returned by a broker.

func NewAdmin

func NewAdmin(brokerAddrs []string, adminCfg *AdminConfig) (sarama.ClusterAdmin, error)

NewAdmin creates an admin-based client

func SetMaxMessageSize

func SetMaxMessageSize(maxSize int32)

SetMaxMessageSize sets the Sarama MaxRequestSize and MaxResponseSize values to the provided maxSize

func SetMaxRetryInterval

func SetMaxRetryInterval(maxPause time.Duration)

SetMaxRetryInterval sets MaxRetryInterval to its duration argument

func UnwrapLogData

func UnwrapLogData(err error) log.Data

UnwrapLogData recursively unwraps logData from an error

func WaitWithTimeout

func WaitWithTimeout(ctx context.Context, wg *sync.WaitGroup) bool

WaitWithTimeout blocks until all go-routines tracked by a WaitGroup are done, or until the timeout defined in a context expires. It returns true only if the context timeout expired
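A usage sketch, assuming a WaitGroup tracking worker go-routines:

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        // do some work
    }()

    waitCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    if timedOut := kafka.WaitWithTimeout(waitCtx, &wg); timedOut {
        log.Info(waitCtx, "timed out waiting for workers to finish")
    }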

Types

type Acls

type Acls []*sarama.AclCreation

func (Acls) Apply

func (t Acls) Apply(adm sarama.ClusterAdmin) error

type AdminConfig

type AdminConfig struct {
	KafkaVersion   *string
	KeepAlive      *time.Duration
	RetryBackoff   *time.Duration
	RetryMax       *int
	SecurityConfig *SecurityConfig
}

AdminConfig exposes the optional configurable parameters for an admin client to overwrite default Sarama config values. Any value that is not provided will use the default Sarama config value.

func (*AdminConfig) Get

func (a *AdminConfig) Get() (*sarama.Config, error)

Get creates a default sarama config and overwrites any values provided in pConfig

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch handles adding raw messages to a batch of ObservationExtracted events.

func NewBatch

func NewBatch(batchSize int) *Batch

NewBatch returns a new batch instance of the given size.

func (*Batch) Add

func (batch *Batch) Add(ctx context.Context, message Message)

Add a message to the batch.

func (*Batch) Clear

func (batch *Batch) Clear()

Clear will reset the batch to contain no messages

func (*Batch) Commit

func (batch *Batch) Commit()

Commit is called when the batch has been processed. The last message has been released already, so at this point we just need to commit

func (*Batch) IsEmpty

func (batch *Batch) IsEmpty() bool

IsEmpty returns true if the batch has no messages in it.

func (*Batch) IsFull

func (batch *Batch) IsFull() bool

IsFull returns true if the batch is full based on the configured maxSize.

func (*Batch) Size

func (batch *Batch) Size() int

Size returns the number of messages currently in the batch.

type BatchHandler

type BatchHandler func(ctx context.Context, batch []Message) error

BatchHandler represents a handler for processing a batch of kafka messages. This method will be called by only one go-routine at a time.

type Commiter

type Commiter interface {
	Commit() bool
}

Commiter represents an error type that defines a bool method to enable or disable a message/batch to be committed.

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

ConsumerGroup is a Kafka consumer group instance.

func NewConsumerGroup

func NewConsumerGroup(ctx context.Context, cgConfig *ConsumerGroupConfig) (*ConsumerGroup, error)

NewConsumerGroup creates a new consumer group with the provided parameters

func (*ConsumerGroup) Channels

func (cg *ConsumerGroup) Channels() *ConsumerGroupChannels

Channels returns the ConsumerGroup channels for this consumer group

func (*ConsumerGroup) Checker

func (cg *ConsumerGroup) Checker(ctx context.Context, state *healthcheck.CheckState) error

Checker checks health of Kafka consumer-group and updates the provided CheckState accordingly

func (*ConsumerGroup) Close

func (cg *ConsumerGroup) Close(ctx context.Context) (err error)

Close safely closes the consumer and releases all resources. Pass in a context with a timeout or deadline. Passing a nil context will provide no timeout, but this is not recommended.

func (*ConsumerGroup) Initialise

func (cg *ConsumerGroup) Initialise(ctx context.Context) error

Initialise creates a new Sarama ConsumerGroup and the consumer/error loops, only if it was not already initialised.

func (*ConsumerGroup) IsInitialised

func (cg *ConsumerGroup) IsInitialised() bool

IsInitialised returns true only if Sarama ConsumerGroup has been correctly initialised.

func (*ConsumerGroup) LogErrors

func (cg *ConsumerGroup) LogErrors(ctx context.Context)

LogErrors creates a go-routine that waits on Errors channel and logs any error received. It exits on Closer channel closed.

func (*ConsumerGroup) OnHealthUpdate

func (cg *ConsumerGroup) OnHealthUpdate(status string)

OnHealthUpdate implements the healthcheck Subscriber interface so that the kafka consumer can be notified of state changes. This method is intended to be used for managed start/stop's only, and should not be called manually. WARNING: Having more than one notifier calling this method will result in unexpected behavior. - On Health status OK: start consuming - On Warning or Critical: stop consuming

func (*ConsumerGroup) RegisterBatchHandler

func (cg *ConsumerGroup) RegisterBatchHandler(ctx context.Context, batchHandler BatchHandler) error

func (*ConsumerGroup) RegisterHandler

func (cg *ConsumerGroup) RegisterHandler(ctx context.Context, h Handler) error

func (*ConsumerGroup) Start

func (cg *ConsumerGroup) Start() error

Start has different effects depending on the state: - Initialising: the consumer will try to start consuming straight away once it's initialised - Starting/Consuming: no change will happen - Stopping/Stopped: the consumer will start consuming - Closing: an error will be returned

func (*ConsumerGroup) State

func (cg *ConsumerGroup) State() string

State returns the state of the consumer group

func (*ConsumerGroup) Stop

func (cg *ConsumerGroup) Stop()

Stop has different effects depending on the state: - Initialising: the consumer will remain in the Stopped state once initialised, without consuming messages - Starting/Consuming: the consumer will stop consuming messages - Stopping/Stopped: no change will happen - Closing: an error will be returned This method does not wait until the consumerGroup reaches the stopped state, it only triggers the stopping action.

func (*ConsumerGroup) StopAndWait

func (cg *ConsumerGroup) StopAndWait()

StopAndWait has different effects depending on the state: - Initialising: the consumer will remain in the Stopped state once initialised, without consuming messages - Starting/Consuming: the consumer will stop consuming messages - Stopping/Stopped: no change will happen - Closing: an error will be returned This method waits until the consumerGroup reaches the stopped state if it was starting/consuming.

type ConsumerGroupChannels

type ConsumerGroupChannels struct {
	Upstream    chan Message
	Errors      chan error
	Initialised chan struct{}
	Consume     chan bool
	Closer      chan struct{}
	Closed      chan struct{}
}

ConsumerGroupChannels represents the channels used by ConsumerGroup.

func CreateConsumerGroupChannels

func CreateConsumerGroupChannels(bufferSize int) *ConsumerGroupChannels

CreateConsumerGroupChannels initialises a ConsumerGroupChannels with new channels. You can provide the buffer size to determine the number of messages that will be buffered in the upstream channel (to receive messages)

func (*ConsumerGroupChannels) Validate

func (consumerChannels *ConsumerGroupChannels) Validate() error

Validate returns an Error with a list of missing channels if any consumer channel is nil

type ConsumerGroupConfig

type ConsumerGroupConfig struct {
	// Sarama config overrides
	KafkaVersion          *string
	KeepAlive             *time.Duration
	RetryBackoff          *time.Duration
	RetryBackoffFunc      *func(retries int) time.Duration
	Offset                *int64
	SecurityConfig        *SecurityConfig
	MessageConsumeTimeout *time.Duration

	// dp-kafka specific config overrides
	NumWorkers    *int
	BatchSize     *int
	BatchWaitTime *time.Duration
	Topic         string
	GroupName     string
	BrokerAddrs   []string
}

ConsumerGroupConfig exposes the configurable parameters for a consumer group to overwrite default Sarama config values and any other default config values set by dp-kafka. Any value that is not provided will use the default Sarama config value, or the default dp-kafka value. The only 3 compulsory values are: - Topic - GroupName - BrokerAddrs

func (*ConsumerGroupConfig) Get

func (c *ConsumerGroupConfig) Get() (*sarama.Config, error)

Get creates a default sarama config for a consumer-group and overwrites any values provided in cgConfig. If any required value is not provided or any override is invalid, an error will be returned

func (*ConsumerGroupConfig) Validate

func (c *ConsumerGroupConfig) Validate() error

Validate that compulsory values are provided in config

type ErrBrokersNotReachable

type ErrBrokersNotReachable struct {
	Addrs []string
}

ErrBrokersNotReachable is an Error type for 'Broker Not reachable' with a list of unreachable addresses

func (*ErrBrokersNotReachable) Error

func (e *ErrBrokersNotReachable) Error() string

Error returns the error message with a list of unreachable addresses

type ErrInvalidBrokers

type ErrInvalidBrokers struct {
	Addrs []string
}

ErrInvalidBrokers is an Error type for 'Invalid topic info' with a list of invalid broker addresses

func (*ErrInvalidBrokers) Error

func (e *ErrInvalidBrokers) Error() string

Error returns the error message with a list of broker addresses that returned unexpected responses

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error is the handler package's error type. It is not meant to be compared as a type; information should be extracted via the interfaces it implements with callback functions. It is not guaranteed to remain exported, so it shouldn't be treated as such.

func NewError

func NewError(err error, logData map[string]interface{}) *Error

NewError creates a new Error

func (*Error) Error

func (e *Error) Error() string

Error implements the Go standard error interface

func (*Error) LogData

func (e *Error) LogData() map[string]interface{}

LogData implements the DataLogger interface which allows you extract embedded log.Data from an error

func (*Error) Unwrap

func (e *Error) Unwrap() error

Unwrap returns the wrapped error

type Handler

type Handler func(ctx context.Context, workerID int, msg Message) error

Handler represents a handler for processing a single kafka message. IMPORTANT: if maxConsumers > 1 then this method needs to be thread safe.

type IConsumerGroup

type IConsumerGroup interface {
	Channels() *ConsumerGroupChannels
	State() string
	RegisterHandler(ctx context.Context, h Handler) error
	RegisterBatchHandler(ctx context.Context, batchHandler BatchHandler) error
	Checker(ctx context.Context, state *healthcheck.CheckState) error
	IsInitialised() bool
	Initialise(ctx context.Context) error
	OnHealthUpdate(status string)
	Start() error
	Stop()
	StopAndWait()
	LogErrors(ctx context.Context)
	Close(ctx context.Context) error
}

IConsumerGroup is an interface representing a Kafka Consumer Group, as implemented in dp-kafka/consumer

type IProducer

type IProducer interface {
	Channels() *ProducerChannels
	Checker(ctx context.Context, state *healthcheck.CheckState) error
	LogErrors(ctx context.Context)
	IsInitialised() bool
	Initialise(ctx context.Context) error
	Send(schema *avro.Schema, event interface{}) error
	Close(ctx context.Context) (err error)
}

IProducer is an interface representing a Kafka Producer, as implemented in dp-kafka/producer

type Message

type Message interface {

	// GetData returns the message contents.
	GetData() []byte

	// Mark marks the message as consumed, but doesn't commit the offset to the backend
	Mark()

	// Commit marks the message as consumed and commits its offset to the backend
	Commit()

	// Release closes the UpstreamDone channel for this message
	Release()

	// CommitAndRelease marks a message as consumed, commits it and closes the UpstreamDone channel
	CommitAndRelease()

	// Offset returns the message offset
	Offset() int64

	// UpstreamDone returns the upstreamDone channel. Closing this channel notifies that the message has been consumed
	UpstreamDone() chan struct{}
}

Message represents a single kafka message.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer is a producer of Kafka messages

func NewProducer

func NewProducer(ctx context.Context, pConfig *ProducerConfig) (producer *Producer, err error)

NewProducer returns a new producer instance using the provided config. The rest of the config is set to defaults. If any channel parameter is nil, an error will be returned.

func (*Producer) Channels

func (p *Producer) Channels() *ProducerChannels

Channels returns the Producer channels for this producer

func (*Producer) Checker

func (p *Producer) Checker(ctx context.Context, state *healthcheck.CheckState) error

Checker checks health of Kafka producer and updates the provided CheckState accordingly

func (*Producer) Close

func (p *Producer) Close(ctx context.Context) (err error)

Close safely closes the producer and releases all resources. Pass in a context with a timeout or deadline. Passing a nil context will provide no timeout, and this is not recommended.

func (*Producer) Initialise

func (p *Producer) Initialise(ctx context.Context) error

Initialise creates a new Sarama AsyncProducer and the channel redirection, only if it was not already initialised.

func (*Producer) IsInitialised

func (p *Producer) IsInitialised() bool

IsInitialised returns true only if Sarama producer has been correctly initialised.

func (*Producer) LogErrors

func (p *Producer) LogErrors(ctx context.Context)

LogErrors creates a go-routine that waits on Errors channel and logs any error received. It exits on Closer channel closed.

func (*Producer) Send

func (p *Producer) Send(schema *avro.Schema, event interface{}) error

Send marshals the provided event with the provided schema, and sends it to kafka

type ProducerChannels

type ProducerChannels struct {
	Output      chan []byte
	Errors      chan error
	Initialised chan struct{}
	Closer      chan struct{}
	Closed      chan struct{}
}

ProducerChannels represents the channels used by Producer.

func CreateProducerChannels

func CreateProducerChannels() *ProducerChannels

CreateProducerChannels initialises a ProducerChannels with new channels.

func (*ProducerChannels) Validate

func (producerChannels *ProducerChannels) Validate() error

Validate returns an error with a list of missing channels if any producer channel is nil

type ProducerConfig

type ProducerConfig struct {
	// Sarama config overrides
	KafkaVersion     *string
	MaxMessageBytes  *int
	RetryMax         *int
	KeepAlive        *time.Duration
	RetryBackoff     *time.Duration
	RetryBackoffFunc *func(retries, maxRetries int) time.Duration
	SecurityConfig   *SecurityConfig

	// dp-kafka specific config
	Topic       string
	BrokerAddrs []string
}

ProducerConfig exposes the optional configurable parameters for a producer to overwrite default Sarama config values. Any value that is not provided will use the default Sarama config value.

func (*ProducerConfig) Get

func (p *ProducerConfig) Get() (*sarama.Config, error)

Get creates a default sarama config and overwrites any values provided in pConfig

func (*ProducerConfig) Validate

func (p *ProducerConfig) Validate() error

Validate that compulsory values are provided in config

type SaramaAsyncProducer

type SaramaAsyncProducer = sarama.AsyncProducer

SaramaAsyncProducer is a wrapper around sarama.AsyncProducer

type SaramaBroker

type SaramaBroker interface {
	Addr() string
	Connected() (bool, error)
	Open(conf *sarama.Config) error
	GetMetadata(request *sarama.MetadataRequest) (*sarama.MetadataResponse, error)
	Close() error
}

type SaramaConsumerGroup

type SaramaConsumerGroup = sarama.ConsumerGroup

SaramaConsumerGroup is a wrapper around sarama.ConsumerGroup

type SaramaConsumerGroupClaim

type SaramaConsumerGroupClaim = sarama.ConsumerGroupClaim

SaramaConsumerGroupClaim is a wrapper around sarama.ConsumerGroupClaim

type SaramaConsumerGroupSession

type SaramaConsumerGroupSession = sarama.ConsumerGroupSession

SaramaConsumerGroupSession is a wrapper around sarama.ConsumerGroupSession

type SaramaMessage

type SaramaMessage struct {
	// contains filtered or unexported fields
}

SaramaMessage represents a Sarama specific Kafka message

func NewSaramaMessage

func NewSaramaMessage(m *sarama.ConsumerMessage, s sarama.ConsumerGroupSession, ud chan struct{}) *SaramaMessage

func (SaramaMessage) Commit

func (M SaramaMessage) Commit()

Commit marks the message as consumed, and then commits the offset to the backend

func (SaramaMessage) CommitAndRelease

func (M SaramaMessage) CommitAndRelease()

CommitAndRelease marks the message as consumed, commits the offset to the backend and releases the UpstreamDone channel

func (SaramaMessage) GetData

func (M SaramaMessage) GetData() []byte

GetData returns the message contents.

func (SaramaMessage) Mark

func (M SaramaMessage) Mark()

Mark marks the message as consumed, but doesn't commit the offset to the backend

func (SaramaMessage) Offset

func (M SaramaMessage) Offset() int64

Offset returns the message offset

func (SaramaMessage) Release

func (M SaramaMessage) Release()

Release closes the UpstreamDone channel, but doesn't mark the message or commit the offset

func (SaramaMessage) UpstreamDone

func (M SaramaMessage) UpstreamDone() chan struct{}

UpstreamDone returns the upstreamDone channel. Closing this channel notifies that the message has been consumed (same effect as calling Release)

type SecurityConfig

type SecurityConfig struct {
	RootCACerts        string
	ClientCert         string
	ClientKey          string
	InsecureSkipVerify bool
}

SecurityConfig is common to producers and consumer configs, above

func GetSecurityConfig

func GetSecurityConfig(caCerts, clientCert, clientKey string, skipVerify bool) *SecurityConfig

type State

type State int
const (
	Initialising State = iota
	Stopped
	Starting
	Consuming
	Stopping
	Closing
)

func (State) String

func (s State) String() string

type StateMachine

type StateMachine struct {
	// contains filtered or unexported fields
}

func NewConsumerStateMachine

func NewConsumerStateMachine(st State) *StateMachine

func (*StateMachine) Get

func (sm *StateMachine) Get() State

Get returns the current state

func (*StateMachine) Set

func (sm *StateMachine) Set(newState State)

Set sets the state machine to the provided state value

func (*StateMachine) SetIf

func (sm *StateMachine) SetIf(allowed []State, newState State) error

SetIf sets the state machine to the provided state value only if the current state is one of the values provided in the list

func (*StateMachine) String

func (sm *StateMachine) String() string

String returns the string representation of the current state

type TopicAuth

type TopicAuth struct {
	App        string
	Subnets    []string
	Topic      string
	Operations []sarama.AclOperation
	Hosts      []string
}

func (TopicAuth) GetAcls

func (t TopicAuth) GetAcls(domain string) Acls

type TopicAuthList

type TopicAuthList struct {
	Domain  string
	Brokers []string
	Acls    []TopicAuth
}

func (TopicAuthList) Apply

func (t TopicAuthList) Apply(adm sarama.ClusterAdmin) error

Directories

Path        Synopsis
avro        Package avro provides a user functionality to return the avro encoding of s.
examples
