constructor-kafka - Constructor wrapper for sarama consumers and producers
Quick Start
package main

import (
	"context"

	"github.com/Shopify/sarama"
	"github.com/stackopsd/config"
	"github.com/stackopsd/constructor"
	ckafka "github.com/stackopsd/constructor-kafka"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	syncProducerConst := ckafka.NewSyncProducerConstructor()
	asyncProducerConst := ckafka.NewAsyncProducerConstructor()
	consumerGroupConst := ckafka.NewConsumerGroupConstructor()

	syncProducer := new(sarama.SyncProducer)
	asyncProducer := new(sarama.AsyncProducer)
	consumerGroup := new(sarama.ConsumerGroup)

	// Create a loader from a source of configuration data.
	syncProducerLoader := config.NewFileLoader("/path/to/some/config.json_or_yaml")
	asyncProducerLoader := config.NewFileLoader("/path/to/some/config.json_or_yaml")
	consumerGroupLoader := config.NewFileLoader("/path/to/some/config.json_or_yaml")

	if err := constructor.New(ctx, syncProducerLoader, &syncProducerConst, syncProducer); err != nil {
		panic(err)
	}
	if err := constructor.New(ctx, asyncProducerLoader, &asyncProducerConst, asyncProducer); err != nil {
		panic(err)
	}
	if err := constructor.New(ctx, consumerGroupLoader, &consumerGroupConst, consumerGroup); err != nil {
		panic(err)
	}

	// Publish and consume messages using the sarama clients.
}
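Because sarama.SyncProducer, sarama.AsyncProducer, and sarama.ConsumerGroup are interface types, each new(...) call in the quick start yields a pointer to a nil interface value, and that pointer has to be dereferenced before any method can be called on the constructed client. The following is a minimal, standard-library-only sketch of that pattern. Sender, memorySender, and build are hypothetical stand-ins invented for illustration; they are not part of sarama or of this package.

```go
package main

import "fmt"

// Sender is a hypothetical stand-in for an interface type such as
// sarama.SyncProducer.
type Sender interface {
	Send(msg string) error
}

// memorySender is a hypothetical implementation that records messages.
type memorySender struct {
	sent []string
}

func (m *memorySender) Send(msg string) error {
	m.sent = append(m.sent, msg)
	return nil
}

// build is a hypothetical stand-in for a constructor call: it stores a
// ready-to-use implementation in the interface value that dst points to.
func build(dst *Sender) error {
	*dst = &memorySender{}
	return nil
}

func main() {
	sender := new(Sender) // *Sender pointing at a nil interface value
	fmt.Println("before build, nil:", *sender == nil)

	if err := build(sender); err != nil {
		panic(err)
	}

	// Dereference the pointer to reach the populated interface value.
	if err := (*sender).Send("hello"); err != nil {
		panic(err)
	}
	fmt.Println("after build, nil:", *sender == nil)
}
```

Under the same assumption, the sarama clients from the quick start would be used as (*syncProducer), (*asyncProducer), and (*consumerGroup).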
Example Configuration
More configuration options are expected over time. For now, this package
covers a fairly minimal subset of the options required to get a producer or
consumer running. Extended options will need to be added before production
use. For example, there is currently no way to install a metrics collector or
fine-tune the byte limits of consumer operations.
If loading from a file, the following represents all the possible options for
a sync producer:
client_id: "myService"
channel_buffer_size: 256 # Size of success/error channel buffers
version: "2.3.0"
addresses: # Addresses of initial brokers
- kafka:9092
- kafka2:9092
partitioner: "hashing" # One of random, hashing, roundrobin, or manual
compression: "none" # One of gzip, lz4, none, snappy, or zstd
compression_level: -1000 # Compression strategy specific setting.
required_acks: "local" # One of none, local, all.
ack_timeout: "10s" # Max time to wait when required_acks is all.
max_message_bytes: 1000000
net:
max_open_requests: 5 # Number of requests on a connection before blocking.
dial_timeout: "30s" # How long to wait for the initial connection.
read_timeout: "30s" # How long to wait for a response.
write_timeout: "30s" # How long to wait for a transmit.
keep_alive: "0s" # Network keep-alive time. 0 for disabled.
tls_enabled: true # Optional TLS support
tls: # All TLS options provided by https://github.com/stackopsd/constructor-tls
insecure_skip_verify: true
server_name: "my-server"
client_auth: "require_and_verify_client_cert"
rand:
choice: "crypto" # Can be zero if needed for testing.
certificates: # Set of certificates to present when authenticating.
bytes:
- cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
key: "<FULL PEM ENCODED KEY AS A STRING>"
- cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
key: "<FULL PEM ENCODED KEY AS A STRING>"
files:
- cert: "/path/to/pem/encoded/cert"
key: "/path/to/pem/encoded/key"
- cert: "/path/to/pem/encoded/cert"
key: "/path/to/pem/encoded/key"
root_cas: # Set of root authorities to use when verifying a server certificate.
bytes:
- cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
key: "<FULL PEM ENCODED KEY AS A STRING>"
- cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
key: "<FULL PEM ENCODED KEY AS A STRING>"
files:
- cert: "/path/to/pem/encoded/cert"
key: "/path/to/pem/encoded/key"
- cert: "/path/to/pem/encoded/cert"
key: "/path/to/pem/encoded/key"
client_cas: # Set of root authorities to use when verifying a client certificate.
bytes:
- cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
key: "<FULL PEM ENCODED KEY AS A STRING>"
- cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
key: "<FULL PEM ENCODED KEY AS A STRING>"
files:
- cert: "/path/to/pem/encoded/cert"
key: "/path/to/pem/encoded/key"
- cert: "/path/to/pem/encoded/cert"
key: "/path/to/pem/encoded/key"
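Several producer options accept only a fixed set of values, as listed in the comments above. A configuration file can be sanity-checked against those lists before it is handed to the loader. The sketch below uses only the standard library; checkOption and the allowed table are hypothetical helpers written for this example, and the validation this package performs itself may differ (for instance in case sensitivity).

```go
package main

import (
	"fmt"
	"strings"
)

// allowed lists the values named in the comments of the example
// configuration. It is an illustration, not the package's own table.
var allowed = map[string][]string{
	"partitioner":   {"random", "hashing", "roundrobin", "manual"},
	"compression":   {"gzip", "lz4", "none", "snappy", "zstd"},
	"required_acks": {"none", "local", "all"},
}

// checkOption returns an error when value is not one of the documented
// choices for key, or when key has no documented choices.
func checkOption(key, value string) error {
	choices, ok := allowed[key]
	if !ok {
		return fmt.Errorf("unknown option %q", key)
	}
	for _, c := range choices {
		if c == value {
			return nil
		}
	}
	return fmt.Errorf("%s: %q is not one of %s", key, value, strings.Join(choices, ", "))
}

func main() {
	settings := map[string]string{
		"partitioner":   "hashing",
		"compression":   "none",
		"required_acks": "quorum", // deliberately invalid
	}
	for key, value := range settings {
		if err := checkOption(key, value); err != nil {
			fmt.Println("invalid:", err)
		}
	}
}
```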
The following are all options for the async producer:
client_id: "myService"
channel_buffer_size: 256 # Size of success/error channel buffers
version: "2.3.0"
addresses: # Addresses of initial brokers
- kafka:9092
- kafka2:9092
partitioner: "hashing" # One of random, hashing, roundrobin, or manual
compression: "none" # One of gzip, lz4, none, snappy, or zstd
compression_level: -1000 # Compression strategy specific setting.
required_acks: "local" # One of none, local, all.
ack_timeout: "10s" # Max time to wait when required_acks is all.
max_message_bytes: 1000000
return_sucesses: false # Enable the success channel on the producer.
return_errors: false # Enable the error channel on the producer.
net:
max_open_requests: 5 # Number of requests on a connection before blocking.
dial_timeout: "30s" # How long to wait for the initial connection.
read_timeout: "30s" # How long to wait for a response.
write_timeout: "30s" # How long to wait for a transmit.
keep_alive: "0s" # Network keep-alive time. 0 for disabled.
tls_enabled: true # Optional TLS support
tls: # All TLS options provided by https://github.com/stackopsd/constructor-tls
insecure_skip_verify: true
server_name: "my-server"
client_auth: "require_and_verify_client_cert"
rand:
choice: "crypto" # Can be zero if needed for testing.
certificates: # Set of certificates to present when authenticating.
bytes:
- cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
key: "<FULL PEM ENCODED KEY AS A STRING>"
- cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
key: "<FULL PEM ENCODED KEY AS A STRING>"
files:
- cert: "/path/to/pem/encoded/cert"
key: "/path/to/pem/encoded/key"
- cert: "/path/to/pem/encoded/cert"
key: "/path/to/pem/encoded/key"
root_cas: # Set of root authorities to use when verifying a server certificate.
bytes:
- cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
key: "<FULL PEM ENCODED KEY AS A STRING>"
- cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
key: "<FULL PEM ENCODED KEY AS A STRING>"
files:
- cert: "/path/to/pem/encoded/cert"
key: "/path/to/pem/encoded/key"
- cert: "/path/to/pem/encoded/cert"
key: "/path/to/pem/encoded/key"
client_cas: # Set of root authorities to use when verifying a client certificate.
bytes:
- cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
key: "<FULL PEM ENCODED KEY AS A STRING>"
- cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
key: "<FULL PEM ENCODED KEY AS A STRING>"
files:
- cert: "/path/to/pem/encoded/cert"
key: "/path/to/pem/encoded/key"
- cert: "/path/to/pem/encoded/cert"
key: "/path/to/pem/encoded/key"
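Assuming return_sucesses and return_errors map onto sarama's Producer.Return.Successes and Producer.Return.Errors settings, enabling either one obliges the application to read from the matching channel; sarama's documentation warns that an async producer can deadlock when an enabled channel is never drained. The sketch below shows one way to drain two channels concurrently. It uses plain Go channels as hypothetical stand-ins for the producer's Successes() and Errors() channels, and drain is a helper invented for this example.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// drain reads both channels until they are closed and returns how many
// values arrived on each. Each channel gets its own goroutine so that a
// quiet channel never blocks reads from the other.
func drain(successes <-chan string, errs <-chan error) (okCount, errCount int) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for range successes {
			okCount++
		}
	}()
	go func() {
		defer wg.Done()
		for range errs {
			errCount++
		}
	}()
	wg.Wait()
	return okCount, errCount
}

func main() {
	// Buffered stand-ins for the producer's result channels.
	successes := make(chan string, 2)
	errs := make(chan error, 1)

	successes <- "msg-1"
	successes <- "msg-2"
	errs <- errors.New("delivery failed")
	close(successes)
	close(errs)

	delivered, failed := drain(successes, errs)
	fmt.Println("delivered:", delivered, "failed:", failed)
}
```

With a real producer the draining goroutines would be started before the first message is sent and would run until the producer is closed.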
The following are all options for the consumer group:
client_id: "myService"
channel_buffer_size: 256 # Size of success/error channel buffers
version: "2.3.0"
addresses: # Addresses of initial brokers
- kafka:9092
- kafka2:9092
session_timeout: "10s" # Time in which the broker must receive a heartbeat.
heartbeat_interval: "3s" # Frequency of sending heartbeats to the broker.
rebalance_strategy: "range" # One of range, roundrobin, or sticky
rebalance_timeout: "60s" # Time allowed for the node to join after a rebalance.
return_errors: false # Enable the error channel on the consumer.
group_name: "myConsumerGroup" # Unique consumer group name.
net:
max_open_requests: 5 # Number of requests on a connection before blocking.
dial_timeout: "30s" # How long to wait for the initial connection.
read_timeout: "30s" # How long to wait for a response.
write_timeout: "30s" # How long to wait for a transmit.
keep_alive: "0s" # Network keep-alive time. 0 for disabled.
tls_enabled: true # Optional TLS support
tls: # All TLS options provided by https://github.com/stackopsd/constructor-tls
insecure_skip_verify: true
server_name: "my-server"
client_auth: "require_and_verify_client_cert"
rand:
choice: "crypto" # Can be zero if needed for testing.
certificates: # Set of certificates to present when authenticating.
bytes:
- cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
key: "<FULL PEM ENCODED KEY AS A STRING>"
- cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
key: "<FULL PEM ENCODED KEY AS A STRING>"
files:
- cert: "/path/to/pem/encoded/cert"
key: "/path/to/pem/encoded/key"
- cert: "/path/to/pem/encoded/cert"
key: "/path/to/pem/encoded/key"
root_cas: # Set of root authorities to use when verifying a server certificate.
bytes:
- cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
key: "<FULL PEM ENCODED KEY AS A STRING>"
- cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
key: "<FULL PEM ENCODED KEY AS A STRING>"
files:
- cert: "/path/to/pem/encoded/cert"
key: "/path/to/pem/encoded/key"
- cert: "/path/to/pem/encoded/cert"
key: "/path/to/pem/encoded/key"
client_cas: # Set of root authorities to use when verifying a client certificate.
bytes:
- cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
key: "<FULL PEM ENCODED KEY AS A STRING>"
- cert: "<FULL PEM ENCODED CERTIFICATE AS A STRING>"
key: "<FULL PEM ENCODED KEY AS A STRING>"
files:
- cert: "/path/to/pem/encoded/cert"
key: "/path/to/pem/encoded/key"
- cert: "/path/to/pem/encoded/cert"
key: "/path/to/pem/encoded/key"
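The timeout values above look like Go duration strings, and session_timeout and heartbeat_interval are related: Kafka's consumer documentation recommends a heartbeat interval no higher than one third of the session timeout. The sketch below checks both points using only the standard library. It assumes the values are parsed in the format accepted by time.ParseDuration, which this README does not state explicitly, and checkTimeouts is a hypothetical helper written for this example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// checkTimeouts parses both duration strings and verifies that the
// heartbeat interval is positive and at most one third of the session
// timeout.
func checkTimeouts(sessionTimeout, heartbeatInterval string) error {
	session, err := time.ParseDuration(sessionTimeout)
	if err != nil {
		return fmt.Errorf("session_timeout: %w", err)
	}
	heartbeat, err := time.ParseDuration(heartbeatInterval)
	if err != nil {
		return fmt.Errorf("heartbeat_interval: %w", err)
	}
	if session <= 0 || heartbeat <= 0 {
		return errors.New("session_timeout and heartbeat_interval must be positive")
	}
	if heartbeat*3 > session {
		return fmt.Errorf("heartbeat_interval %s exceeds one third of session_timeout %s", heartbeat, session)
	}
	return nil
}

func main() {
	// The values from the example configuration pass the check.
	fmt.Println(checkTimeouts("10s", "3s"))
	// A heartbeat that is too slow for the session timeout does not.
	fmt.Println(checkTimeouts("10s", "5s"))
}
```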
License
Copyright 2019 Kevin Conway
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.