kafkactl

command module
v5.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2024 License: Apache-2.0 Imports: 3 Imported by: 0

README

:toc:
:toclevels: 2

= kafkactl

A command-line interface for interaction with Apache Kafka

image:https://github.com/deviceinsight/kafkactl/workflows/Lint%20%2F%20Test%20%2F%20IT/badge.svg?branch=main[Build Status,link=https://github.com/deviceinsight/kafkactl/actions]
| image:https://img.shields.io/badge/command-docs-blue.svg[command docs,link=https://deviceinsight.github.io/kafkactl/]

== Features

* command auto-completion for bash, zsh, fish shell including dynamic completion for e.g. topics or consumer groups.
* support for avro schemas
* Configuration of different contexts
* directly access kafka clusters inside your kubernetes cluster
* support for consuming and producing protobuf-encoded messages

image::https://asciinema.org/a/vmxrTA0h8CAXPnJnSFk5uHKzr.svg[asciicast,link=https://asciinema.org/a/vmxrTA0h8CAXPnJnSFk5uHKzr]

== Installation

You can install the pre-compiled binary or compile from source.

=== Install the pre-compiled binary

*snap*:

[,bash]
----
snap install kafkactl
----

*homebrew*:

[,bash]
----
# install tap repostory once
brew tap deviceinsight/packages
# install kafkactl
brew install deviceinsight/packages/kafkactl
# upgrade kafkactl
brew upgrade deviceinsight/packages/kafkactl
----

*deb/rpm*:

Download the .deb or .rpm from the https://github.com/deviceinsight/kafkactl/releases[releases page] and install with dpkg -i and rpm -i respectively.

*yay (AUR)*

There's a kafkactl https://aur.archlinux.org/packages/kafkactl/[AUR package] available for Arch. Install it with your AUR helper of choice (e.g. https://github.com/Jguer/yay[yay]):

[,bash]
----
yay -S kafkactl
----

*manually*:

Download the pre-compiled binaries from the https://github.com/deviceinsight/kafkactl/releases[releases page] and copy to the desired location.

=== Compiling from source

[,bash]
----
go get -u github.com/deviceinsight/kafkactl
----

*NOTE:* make sure that `kafkactl` is on PATH otherwise auto-completion won't work.

== Configuration

If no config file is found, a default config is generated in `$HOME/.config/kafkactl/config.yml`.
This configuration is suitable to get started with a single node cluster on a local machine.

=== Create a config file

Create `$HOME/.config/kafkactl/config.yml` with a definition of contexts that should be available

[,yaml]
----
contexts:
  default:
    brokers:
      - localhost:9092
  remote-cluster:
    brokers:
      - remote-cluster001:9092
      - remote-cluster002:9092
      - remote-cluster003:9092

    # optional: tls config
    tls:
      enabled: true
      ca: my-ca
      cert: my-cert
      certKey: my-key
      # set insecure to true to ignore all tls verification (defaults to false)
      insecure: false

    # optional: sasl support
    sasl:
      enabled: true
      username: admin
      password: admin
      # optional configure sasl mechanism as plaintext, scram-sha256, scram-sha512 (defaults to plaintext)
      mechanism: scram-sha512

    # optional: access clusters running kubernetes
    kubernetes:
      enabled: false
      binary: kubectl #optional
      kubeConfig: ~/.kube/config #optional
      kubeContext: my-cluster
      namespace: my-namespace
      # optional: docker image to use (the tag of the image will be suffixed by `-scratch` or `-ubuntu` depending on command)
      image: private.registry.com/deviceinsight/kafkactl
      # optional: secret for private docker registry
      imagePullSecret: registry-secret
      # optional: serviceAccount to use for the pod
      serviceAccount: my-service-account
      # optional: keep pod after exit (can be set to true for debugging)
      keepPod: true
      # optional: labels to add to the pod
      labels:
        key: value
      # optional: annotations to add to the pod
      annotations:
        key: value
      # optional: nodeSelector to add to the pod
      nodeSelector:
        key: value
    # optional: clientID config (defaults to kafkactl-{username})
    clientID: my-client-id

    # optional: kafkaVersion (defaults to 2.5.0)
    kafkaVersion: 1.1.1

    # optional: timeout for admin requests (defaults to 3s)
    requestTimeout: 10s

    # optional: avro schema registry
    avro:
      schemaRegistry: localhost:8081
      # optional: configure codec for (de)serialization as standard,avro (defaults to standard)
      # see: https://github.com/deviceinsight/kafkactl/issues/123
      jsonCodec: avro

    # optional: default protobuf messages search paths
    protobuf:
      importPaths:
        - "/usr/include/protobuf"
      protoFiles:
        - "someMessage.proto"
        - "otherMessage.proto"
      protosetFiles:
        - "/usr/include/protoset/other.protoset"

    producer:
      # optional: changes the default partitioner
      partitioner: "hash"

      # optional: changes default required acks in produce request
      # see: https://pkg.go.dev/github.com/IBM/sarama?utm_source=godoc#RequiredAcks
      requiredAcks: "WaitForAll"

      # optional: maximum permitted size of a message (defaults to 1000000)
      maxMessageBytes: 1000000

    consumer:
      # optional: isolationLevel (defaults to ReadCommitted)
      isolationLevel: ReadUncommitted

# optional for project config files
current-context: default
----

[#_config_file_read_order]
The config file location is resolved by

. checking for a provided commandline argument: `--config-file=$PATH_TO_CONFIG`
. evaluating the environment variable: `export KAFKA_CTL_CONFIG=$PATH_TO_CONFIG`
. checking for a project config file in the working directory (see <<_project_config_files>>)
. as default the config file is looked up from one of the following locations:
 ** `$HOME/.config/kafkactl/config.yml`
 ** `$HOME/.kafkactl/config.yml`
 ** `$SNAP_REAL_HOME/.kafkactl/config.yml`
 ** `$SNAP_DATA/kafkactl/config.yml`
 ** `/etc/kafkactl/config.yml`

[#_project_config_files]
==== Project config files

In addition to the config file locations above, _kafkactl_ allows to create a config file on project level.
A project config file is meant to be placed at the root level of a git repo and declares the kafka configuration
for this repository/project.

In order to identify the config file as belonging to _kafkactl_ the following names can be used:

* `kafkactl.yml`
* `.kafkactl.yml`

During initialization _kafkactl_ starts from the current working directory and recursively looks for a project level
config file. The recursive lookup ends at the boundary of a git repository (i.e. if a `.git` folder is found).
This way, _kafkactl_ can be used conveniently anywhere in the git repository.

Additionally, project config files have a special feature to use them read-only. Topically, if you configure more than
one context in a config file, and you switch the context with `kafkactl config use-context xy` this will lead to a write
operation on the config file to save the _current context_.

In order to avoid this for project config files, one can just omit the `current-context` parameter from the config file.
In this case _kafkactl_ will delegate read and write operations for the _current context_ to the next configuration file
according to <<_config_file_read_order, the config file read order>>.


=== Auto completion

==== bash

*NOTE:* if you installed via snap, bash completion should work automatically.

----
source <(kafkactl completion bash)
----

To load completions for each session, execute once:
Linux:

----
kafkactl completion bash > /etc/bash_completion.d/kafkactl
----

MacOS:

----
kafkactl completion bash > /usr/local/etc/bash_completion.d/kafkactl
----

==== zsh

If shell completion is not already enabled in your environment,
you will need to enable it. You can execute the following once:

----
echo "autoload -U compinit; compinit" >> ~/.zshrc
----

To load completions for each session, execute once:

----
kafkactl completion zsh > "${fpath[1]}/_kafkactl"
----

You will need to start a new shell for this setup to take effect.

==== Fish

----
kafkactl completion fish | source
----

To load completions for each session, execute once:

----
kafkactl completion fish > ~/.config/fish/completions/kafkactl.fish
----

== Documentation

The documentation for all available commands can be found here:

image::https://img.shields.io/badge/command-docs-blue.svg[command docs,link=https://deviceinsight.github.io/kafkactl/]

== Running in docker

Assuming your Kafka brokers are accessible under `kafka1:9092` and `kafka2:9092`, you can list topics by running:

[,bash]
----
docker run --env BROKERS="kafka1:9092 kafka2:9092" deviceinsight/kafkactl:latest get topics
----

If a more elaborate config is needed, you can mount it as a volume:

[,bash]
----
docker run -v /absolute/path/to/config.yml:/etc/kafkactl/config.yml deviceinsight/kafkactl get topics
----

== Running in Kubernetes

____
:construction: This feature is still experimental.
____

If your kafka cluster is not directly accessible from your machine, but it is accessible from a kubernetes cluster
which in turn is accessible via `kubectl` from your machine you can configure kubernetes support:

[,$yaml]
----
contexts:
  kafka-cluster:
    brokers:
      - broker1:9092
      - broker2:9092
    kubernetes:
      enabled: true
      binary: kubectl #optional
      kubeContext: k8s-cluster
      namespace: k8s-namespace
----

Instead of directly talking to kafka brokers a kafkactl docker image is deployed as a pod into the kubernetes
cluster, and the defined namespace. Standard-Input and Standard-Output are then wired between the pod and your shell
running kafkactl.

There are two options:

. You can run `kafkactl attach` with your kubernetes cluster configured. This will use `kubectl run` to create a pod
in the configured kubeContext/namespace which runs an image of kafkactl and gives you a `bash` into the container.
Standard-in is piped to the pod and standard-out, standard-err directly to your shell. You even get auto-completion.
. You can run any other kafkactl command with your kubernetes cluster configured. Instead of directly
querying the cluster a pod is deployed, and input/output are wired between pod and your shell.

The names of the brokers have to match the service names used to access kafka in your cluster. A command like this should
give you this information:

[,bash]
----
kubectl get svc | grep kafka
----

____
:bulb: The first option takes a bit longer to start up since an Ubuntu based docker image is used in order to have
a bash available. The second option uses a docker image build from scratch and should therefore be quicker.
Which option is more suitable, will depend on your use-case.
____

____
:warning: currently _kafkactl_ must *NOT* be installed via _snap_ in order for the kubernetes feature to work. The snap runs in a sandbox and is therefore unable to access the `kubectl` binary.
____

== Configuration via environment variables

Every key in the `config.yml` can be overwritten via environment variables. The corresponding environment variable
for a key can be found by applying the following rules:

. replace `.` by `_`
. replace `-` by `_`
. write the key name in ALL CAPS

e.g. the key `contexts.default.tls.certKey` has the corresponding environment variable `CONTEXTS_DEFAULT_TLS_CERTKEY`.

*NOTE:* an array variable can be written using whitespace as delimiter. For example `BROKERS` can be provided as
`BROKERS="broker1:9092 broker2:9092 broker3:9092"`.

If environment variables for the `default` context should be set, the prefix `CONTEXTS_DEFAULT_` can be omitted.
So, instead of `CONTEXTS_DEFAULT_TLS_CERTKEY` one can also set `TLS_CERTKEY`.
See *root_test.go* for more examples.

== Examples

=== Consuming messages

Consuming messages from a topic can be done with:

[,bash]
----
kafkactl consume my-topic
----

In order to consume starting from the oldest offset use:

[,bash]
----
kafkactl consume my-topic --from-beginning
----

The following example prints message `key` and `timestamp` as well as `partition` and `offset` in `yaml` format:

[,bash]
----
kafkactl consume my-topic --print-keys --print-timestamps -o yaml
----

To print partition in default output format use:

[,bash]
----
kafkactl consume my-topic --print-partitions
----

Headers of kafka messages can be printed with the parameter `--print-headers` e.g.:

[,bash]
----
kafkactl consume my-topic --print-headers -o yaml
----

If one is only interested in the last `n` messages this can be achieved by `--tail` e.g.:

[,bash]
----
kafkactl consume my-topic --tail=5
----

The consumer can be stopped when the latest offset is reached using `--exit` parameter e.g.:

[,bash]
----
kafkactl consume my-topic --from-beginning --exit
----

The consumer can compute the offset it starts from using a timestamp:

[,bash]
----
kafkactl consume my-topic --from-timestamp 1384216367189
kafkactl consume my-topic --from-timestamp 2014-04-26T17:24:37.123Z
kafkactl consume my-topic --from-timestamp 2014-04-26T17:24:37.123
kafkactl consume my-topic --from-timestamp 2009-08-12T22:15:09Z
kafkactl consume my-topic --from-timestamp 2017-07-19T03:21:51
kafkactl consume my-topic --from-timestamp 2013-04-01T22:43
kafkactl consume my-topic --from-timestamp 2014-04-26
----

The `from-timestamp` parameter supports different timestamp formats. It can either be a number representing the epoch milliseconds
or a string with a timestamp in one of the https://github.com/deviceinsight/kafkactl/blob/main/util/util.go#L10[supported date formats].

*NOTE:* `--from-timestamp` is not designed to schedule the beginning of consumer's consumption. The offset corresponding to the timestamp is computed at the beginning of the process. So if you set it to a date in the future, the consumer will start from the latest offset.

The consumer can be stopped when the offset corresponding to a particular timestamp is reached:

[,bash]
----
kafkactl consume my-topic --from-timestamp 2017-07-19T03:30:00 --to-timestamp 2017-07-19T04:30:00
----

The `to-timestamp` parameter supports the same formats as `from-timestamp`.

*NOTE:* `--to-timestamp` is not designed to schedule the end of consumer's consumption. The offset corresponding to the timestamp is computed at the begininng of the process. So if you set it to a date in the future, the consumer will stop at the current latest offset.

The following example prints keys in hex and values in base64:

[,bash]
----
kafkactl consume my-topic --print-keys --key-encoding=hex --value-encoding=base64
----

The consumer can convert protobuf messages to JSON in keys (optional) and values:

[,bash]
----
kafkactl consume my-topic --value-proto-type MyTopicValue --key-proto-type MyTopicKey --proto-file kafkamsg.proto
----

To join a consumer group and consume messages as a member of the group:

[,bash]
----
kafkactl consume my-topic --group my-consumer-group
----

If you want to limit the number of messages that will be read, specify `--max-messages`:

[,bash]
----
kafkactl consume my-topic --max-messages 2
----

=== Producing messages

Producing messages can be done in multiple ways. If we want to produce a message with `key='my-key'`,
`value='my-value'` to the topic `my-topic` this can be achieved with one of the following commands:

[,bash]
----
echo "my-key#my-value" | kafkactl produce my-topic --separator=#
echo "my-value" | kafkactl produce my-topic --key=my-key
kafkactl produce my-topic --key=my-key --value=my-value
----

If we have a file containing messages where each line contains `key` and `value` separated by `#`, the file can be
used as input to produce messages to topic `my-topic`:

[,bash]
----
cat myfile | kafkactl produce my-topic --separator=#
----

The same can be accomplished without piping the file to stdin with the `--file` parameter:

[,bash]
----
kafkactl produce my-topic --separator=# --file=myfile
----

If the messages in the input file need to be split by a different delimiter than `\n` a custom line separator can be provided:

[,bash]
----
kafkactl produce my-topic --separator=# --lineSeparator=|| --file=myfile
----

*NOTE:* if the file was generated with `kafkactl consume --print-keys --print-timestamps my-topic` the produce
command is able to detect the message timestamp in the input and will ignore it.

It is also possible to produce messages in json format:

[,bash]
----
# each line in myfile.json is expected to contain a json object with fields key, value
kafkactl produce my-topic --file=myfile.json --input-format=json
cat myfile.json | kafkactl produce my-topic --input-format=json
----

the number of messages produced per second can be controlled with the `--rate` parameter:

[,bash]
----
cat myfile | kafkactl produce my-topic --separator=# --rate=200
----

It is also possible to specify the partition to insert the message:

[,bash]
----
kafkactl produce my-topic --key=my-key --value=my-value --partition=2
----

Additionally, a different partitioning scheme can be used. When a `key` is provided the default partitioner
uses the `hash` of the `key` to assign a partition. So the same `key` will end up in the same partition:

[,bash]
----
# the following 3 messages will all be inserted to the same partition
kafkactl produce my-topic --key=my-key --value=my-value
kafkactl produce my-topic --key=my-key --value=my-value
kafkactl produce my-topic --key=my-key --value=my-value

# the following 3 messages will probably be inserted to different partitions
kafkactl produce my-topic --key=my-key --value=my-value --partitioner=random
kafkactl produce my-topic --key=my-key --value=my-value --partitioner=random
kafkactl produce my-topic --key=my-key --value=my-value --partitioner=random
----

Message headers can also be written:

[,bash]
----
kafkactl produce my-topic --key=my-key --value=my-value --header key1:value1 --header key2:value\:2
----

The following example writes the key from base64 and value from hex:

[,bash]
----
kafkactl produce my-topic --key=dGVzdC1rZXk= --key-encoding=base64 --value=0000000000000000 --value-encoding=hex
----

You can control how many replica acknowledgements are needed for a response:

[,bash]
----
kafkactl produce my-topic --key=my-key --value=my-value --required-acks=WaitForAll
----

Producing null values (tombstone record) is also possible:

[,bash]
----
 kafkactl produce my-topic --null-value
----

Producing protobuf message converted from JSON:

[,bash]
----
kafkactl produce my-topic --key='{"keyField":123}' --key-proto-type MyKeyMessage --value='{"valueField":"value"}' --value-proto-type MyValueMessage --proto-file kafkamsg.proto
----

A more complex protobuf message converted from a multi-line JSON string can be produced using a file input with custom separators. 

For example, if you have the following protobuf definition (`complex.proto`):

[,protobuf]
----
syntax = "proto3";

import "google/protobuf/timestamp.proto";

message ComplexMessage {
  CustomerInfo customer_info = 1;
  DeviceInfo device_info = 2;
}

message CustomerInfo {
  string customer_id = 1;
  string name = 2;
}

message DeviceInfo {
  string serial = 1;
  google.protobuf.Timestamp last_update  = 2;
}
----

And you have the following file (`complex-msg.txt`) that contains the key and value of the message:

[,text]
----
msg-key##
{
    "customer_info": {
        "customer_id": "12345",
        "name": "Bob"
    },
    "device_info": {
        "serial": "abcde",
        "last_update": "2024-03-02T07:01:02.000Z"
    }
}
+++
----

The command to produce the protobuf message using sample protobuf definition and input file would be:

[,bash]
----
kafkactl produce my-topic --value-proto-type=ComplexMessage --proto-file=complex.proto --lineSeparator='+++' --separator='##' --file=complex-msg.txt
----

=== Avro support

In order to enable avro support you just have to add the schema registry to your configuration:

[,$yaml]
----
contexts:
  localhost:
    avro:
      schemaRegistry: localhost:8081
----

==== Producing to an avro topic

`kafkactl` will lookup the topic in the schema registry in order to determine if key or value needs to be avro encoded.
If producing with the latest `schemaVersion` is sufficient, no additional configuration is needed an `kafkactl` handles
this automatically.

If however one needs to produce an older `schemaVersion` this can be achieved by providing the parameters `keySchemaVersion`, `valueSchemaVersion`.

===== Example

[,bash]
----
# create a topic
kafkactl create topic avro_topic
# add a schema for the topic value
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", \"name\": \"LongList\", \"fields\" : [{\"name\": \"next\", \"type\": [\"null\", \"LongList\"], \"default\": null}]}"}' \
http://localhost:8081/subjects/avro_topic-value/versions
# produce a message
kafkactl produce avro_topic --value {\"next\":{\"LongList\":{}}}
# consume the message
kafkactl consume avro_topic --from-beginning --print-schema -o yaml
----

==== Consuming from an avro topic

As for producing `kafkactl` will also lookup the topic in the schema registry to determine if key or value needs to be
decoded with an avro schema.

The `consume` command handles this automatically and no configuration is needed.

An additional parameter `print-schema` can be provided to display the schema used for decoding.

=== Protobuf support

`kafkactl` can consume and produce protobuf-encoded messages. In order to enable protobuf serialization/deserialization
you should add flag `--value-proto-type` and optionally `--key-proto-type` (if keys encoded in protobuf format)
with type name. Protobuf-encoded messages are mapped with https://developers.google.com/protocol-buffers/docs/proto3#json[pbjson].

`kafkactl` will search messages in following order:

. Protoset files specified in `--protoset-file` flag
. Protoset files specified in `context.protobuf.protosetFiles` config value
. Proto files specified in `--proto-file` flag
. Proto files specified in `context.protobuf.protoFiles` config value

Proto files may require some dependencies in `import` sections. To specify additional lookup paths use
`--proto-import-path` flag or `context.protobuf.importPaths` config value.

If provided message types was not found `kafkactl` will return error.

Note that if you want to use raw proto files `protoc` installation don't need to be installed.

Also note that protoset files must be compiled with included imports:

[,bash]
----
protoc -o kafkamsg.protoset --include_imports kafkamsg.proto
----

==== Example

Assume you have following proto schema in `kafkamsg.proto`:

[,protobuf]
----
syntax = "proto3";

import "google/protobuf/timestamp.proto";

message TopicMessage {
  google.protobuf.Timestamp produced_at = 1;
  int64 num = 2;
}

message TopicKey {
  float fvalue = 1;
}
----

"well-known" `google/protobuf` types are included so no additional proto files needed.

To produce message run

[,bash]
----
kafkactl produce <topic> --key '{"fvalue":1.2}' --key-proto-type TopicKey --value '{"producedAt":"2021-12-01T14:10:12Z","num":"1"}' --value-proto-type TopicValue --proto-file kafkamsg.proto
----

or with protoset

[,bash]
----
kafkactl produce <topic> --key '{"fvalue":1.2}' --key-proto-type TopicKey --value '{"producedAt":"2021-12-01T14:10:12Z","num":"1"}' --value-proto-type TopicValue --protoset-file kafkamsg.protoset
----

To consume messages run

[,bash]
----
kafkactl consume <topic> --key-proto-type TopicKey --value-proto-type TopicValue --proto-file kafkamsg.proto
----

or with protoset

[,bash]
----
kafkactl consume <topic> --key-proto-type TopicKey --value-proto-type TopicValue --protoset-file kafkamsg.protoset
----

=== Altering topics

Using the `alter topic` command allows you to change the partition count, replication factor and topic-level
configurations of an existing topic.

The partition count can be increased with:

[,bash]
----
kafkactl alter topic my-topic --partitions 32
----

The replication factor can be altered with:

[,bash]
----
kafkactl alter topic my-topic --replication-factor 2
----

____
:information_source: when altering replication factor, kafkactl tries to keep the number of replicas assigned to each
broker balanced. If you need more control over the assigned replicas use `alter partition` directly.
____

The topic configs can be edited by supplying key value pairs as follows:

[,bash]
----
kafkactl alter topic my-topic --config retention.ms=3600000 --config cleanup.policy=compact
----

____
:bulb: use the flag `--validate-only` to perform a dry-run without actually modifying the topic
____

=== Altering partitions

The assigned replicas of a partition can directly be altered with:

[,bash]
----
# set brokers 102,103 as replicas for partition 3 of topic my-topic
kafkactl alter topic my-topic 3 -r 102,103
----

=== Clone topic

New topic may be created from existing topic as follows:

[,bash]
----
kafkactl clone topic source-topic target-topic
----

Source topic must exist, target topic must not exist.
`kafkactl` clones partitions count, replication factor and config entries.

=== Consumer groups

In order to get a list of consumer groups the `get consumer-groups` command can be used:

[,bash]
----
# all available consumer groups
kafkactl get consumer-groups
# only consumer groups for a single topic
kafkactl get consumer-groups --topic my-topic
# using command alias
kafkactl get cg
----

To get detailed information about the consumer group use `describe consumer-group`. If the parameter `--partitions`
is provided details will be printed for each partition otherwise the partitions are aggregated to the clients.

[,bash]
----
# describe a consumer group
kafkactl describe consumer-group my-group
# show partition details only for partitions with lag
kafkactl describe consumer-group my-group --only-with-lag
# show details only for a single topic
kafkactl describe consumer-group my-group --topic my-topic
# using command alias
kafkactl describe cg my-group
----

=== Delete Records from a topics

Command to be used to delete records from partition, which have an offset smaller than the provided offset.

[,bash]
----
# delete records with offset < 123 from partition 0 and offset < 456 from partition 1
kafkactl delete records my-topic --offset 0=123 --offset 1=456
----

=== Create consumer groups

A consumer-group can be created as follows:

[,bash]
----
# create group with offset for all partitions set to oldest
kafkactl create consumer-group my-group --topic my-topic --oldest
# create group with offset for all partitions set to newest
kafkactl create consumer-group my-group --topic my-topic --newest
# create group with offset for a single partition set to specific offset
kafkactl create consumer-group my-group --topic my-topic --partition 5 --offset 100
# create group for multiple topics with offset for all partitions set to oldest
kafkactl create consumer-group my-group --topic my-topic-a --topic my-topic-b --oldest
----

=== Clone consumer group

A consumer group may be created as clone of another consumer group as follows:

[,bash]
----
kafkactl clone consumer-group source-group target-group
----

Source group must exist and have committed offsets. Target group must not exist or don't have committed offsets.
`kafkactl` clones topic assignment and partition offsets.

=== Reset consumer group offsets

in order to ensure the reset does what it is expected, per default only
the results are printed without actually executing it. Use the additional parameter `--execute` to perform the reset.

[,bash]
----
# reset offset of for all partitions to oldest offset
kafkactl reset offset my-group --topic my-topic --oldest
# reset offset of for all partitions to newest offset
kafkactl reset offset my-group --topic my-topic --newest
# reset offset for a single partition to specific offset
kafkactl reset offset my-group --topic my-topic --partition 5 --offset 100
# reset offset to newest for all topics in the group
kafkactl reset offset my-group --all-topics --newest
# reset offset of for all partitions on multiple topics to oldest offset
kafkactl reset offset my-group --topic my-topic-a --topic my-topic-b --oldest
# reset offset to offset at a given timestamp(epoch)/datetime
kafkactl reset offset my-group --topic my-topic-a --to-datetime 2014-04-26T17:24:37.123Z
# reset offset to offset at a given timestamp(epoch)/datetime
kafkactl reset offset my-group --topic my-topic-a --to-datetime 1697726906352
----

=== Delete consumer group offsets

In order to delete a consumer group offset use `delete offset`

[,bash]
----
# delete offset for all partitions of topic my-topic
kafkactl delete offset my-group --topic my-topic
# delete offset for partition 1 of topic my-topic
kafkactl delete offset my-group --topic my-topic --partition 1
----

=== Delete consumer groups

In order to delete a consumer group or a list of consumer groups use `delete consumer-group`

[,bash]
----
# delete consumer group my-group
kafkactl delete consumer-group my-group
----

=== ACL Management

Available ACL operations are documented https://docs.confluent.io/platform/current/kafka/authorization.html#operations[here].

==== Create a new ACL

[,bash]
----
# create an acl that allows topic read for a user 'consumer'
kafkactl create acl --topic my-topic --operation read --principal User:consumer --allow
# create an acl that denies topic write for a user 'consumer' coming from a specific host
kafkactl create acl --topic my-topic --operation write --host 1.2.3.4 --principal User:consumer --deny
# allow multiple operations
kafkactl create acl --topic my-topic --operation read --operation describe --principal User:consumer --allow
# allow on all topics with prefix common prefix
kafkactl create acl --topic my-prefix --pattern prefixed --operation read --principal User:consumer --allow
----

==== List ACLs

[,bash]
----
# list all acl
kafkactl get acl
# list all acl (alias command)
kafkactl get access-control-list
# filter only topic resources
kafkactl get acl --topics
# filter only consumer group resources with operation read
kafkactl get acl --groups --operation read
----

==== Delete ACLs

[,bash]
----
# delete all topic read acls
kafkactl delete acl --topics --operation read --pattern any
# delete all topic acls for any operation
kafkactl delete acl --topics --operation any --pattern any
# delete all cluster acls for any operation
kafkactl delete acl --cluster --operation any --pattern any
# delete all consumer-group acls with operation describe, patternType prefixed and permissionType allow
kafkactl delete acl --groups --operation describe --pattern prefixed --allow
----

=== Getting Brokers

To get the list of brokers of a kafka cluster use `get brokers`

[,bash]
----
# get the list of brokers
kafkactl get brokers
----

=== Describe Broker

To view configs for a single broker use `describe broker`

[,bash]
----
# describe broker
kafkactl describe broker 1
----

== Development

In order to see linter errors before commit, add the following pre-commit hook:

[,bash]
----
pip install --user pre-commit
pre-commit install
----

=== Pull requests

[,shell]
----
# checkout locally
PULL_REQUEST_ID=123
LOCAL_BRANCH_NAME=feature/abc
git fetch origin pull/${PULL_REQUEST_ID}/head:${LOCAL_BRANCH_NAME}
git checkout ${LOCAL_BRANCH_NAME}

# push to PR
NAME=username
REMOTE_BRANCH_NAME=abc
git remote add $NAME git@github.com:$NAME/kafkactl.git
git push $NAME ${LOCAL_BRANCH_NAME}:${REMOTE_BRANCH_NAME}
----

Documentation

The Go Gopher

There is no documentation for this package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL