Kafka-Pixy

Kafka-Pixy is a local aggregating HTTP proxy to a Kafka messaging cluster. It is designed to hide the complexity of the Kafka client protocol and provide a stupid simple HTTP API that is trivial to implement in any language.

Kafka-Pixy works with Kafka 0.8.2.x or later, because it relies on the Offset Commit/Fetch API released in that version. Although this API was introduced in Kafka 0.8.1.1, the on-the-wire packet format was different back then, and Kafka-Pixy does not support the older format.

Aggregation

Kafka works best when messages are read and written in batches, but from an application's standpoint it is easier to deal with individual message reads and writes. Kafka-Pixy provides a message-based API to clients, but internally it collects messages in batches and submits them the way Kafka likes best. This behavior plays very well with a microservices architecture, where many tiny assorted service instances usually run on one beefy physical host. A Kafka-Pixy instance installed on that host aggregates messages from all the service instances, thereby making efficient use of the network bandwidth.

Locality

Kafka-Pixy is intended to run on the same host as the applications using it. Remember that it provides only a message-based API with no batching, so using it over the network is suboptimal. To encourage local usage, Kafka-Pixy binds only to a Unix domain socket by default. A TCP listener can be enabled, but that is intended mostly for debugging.
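
For instance, a client on the same host can talk to Kafka-Pixy through the default Unix domain socket. A minimal sketch, assuming curl 7.40 or newer (which added --unix-socket support) and an illustrative topic name foo:

# produce one message via the default Unix domain socket;
# the http://localhost part is only used to form the Host header
curl --unix-socket /var/run/kafka-pixy.sock \
     -X POST 'http://localhost/topics/foo/messages?key=bar' \
     -H 'Content-Type: text/plain' \
     -d 'hello'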

HTTP API

Produce

POST /topics/<topic>/messages?key=<key> - submits a message to the topic with name topic, using a hash of key to determine the shard where the message should go. The content type can be either text/plain or application/json.

By default a message is submitted to Kafka asynchronously: the HTTP request completes as soon as the proxy gets the message, and the actual submission to Kafka is performed afterwards. In this case the successful completion of the HTTP request does not guarantee that the message will ever get into Kafka, although the proxy will do its best to make that happen (e.g. retry several times).

If delivery guarantees take priority over request execution time, then synchronous delivery may be requested by specifying the sync parameter (its exact value does not matter). In this case the request blocks until the message is submitted to all in-sync replicas (see the Kafka documentation, section "Availability and Durability Guarantees"). If the returned HTTP status code is anything but 200 OK, then the message has not been submitted to Kafka, and the response body contains the error details.

If key is not specified, then the message is submitted to a random shard. Note that this is not the same as specifying an empty key value: an empty string is a valid key, so all messages with an empty key value go to the same shard.

E.g. if a Kafka-Pixy process has been started with the --tcpAddr=0.0.0.0:8080 argument, then you can test it using curl as follows:

curl -X POST 'localhost:8080/topics/foo/messages?key=bar&sync' \
     -H 'Content-Type: application/json' \
     -d '{"bar": "bazz"}'
Consume

GET /topics/<topic>/messages?group=<group> - consumes a message from the topic with name topic on behalf of the consumer group with name group. If there are no new messages in the topic, the request blocks for up to 3 seconds waiting for one to arrive. If no message is produced during this long poll, the request returns a 408 Request Timeout error; otherwise the response is a JSON document of the following structure:

{
    "key": <base64 encoded key>,
    "value": <base64 encoded message body>,
    "partition": <partition number>,
    "offset": <message offset>
}

e.g.:

{
    "key": "0JzQsNGA0YPRgdGP",
    "value": "0JzQvtGPINC70Y7QsdC40LzQsNGPINC00L7Rh9C10L3RjNC60LA=",
    "partition": 0,
    "offset": 13
}
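
For example, the following long-polls for one message on behalf of consumer group bar (topic and group names are illustrative), then decodes the base64-encoded value; base64 -d is the GNU coreutils spelling, macOS uses -D:

# long-poll for a single message as consumer group "bar"
curl 'localhost:8080/topics/foo/messages?group=bar'

# the key and value fields are base64 encoded; decode e.g. with base64(1)
echo '0JzQvtGPINC70Y7QsdC40LzQsNGPINC00L7Rh9C10L3RjNC60LA=' | base64 -d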
Get Offsets

GET /topics/<topic>/offsets?group=<group> - returns offset information for all partitions of the specified topic, including the next offset to be consumed by the specified consumer group. The structure of the returned JSON document is as follows:

[
  {
    "partition": <partition id>,
    "begin": <oldest offset>,
    "end": <newest offset>,
    "count": <the number of messages in the topic, equals to `end` - `begin`>,
    "offset": <next offset to be consumed by this consumer group>,
    "lag": <equals to `end` - `offset`>,
    "metadata": <arbitrary string committed with the offset, not used by Kafka-Pixy. It is omitted if empty>
  },
  ...
]
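
For example, with the same illustrative topic and group names as above:

# fetch offset and lag information for consumer group "bar" on topic "foo"
curl 'localhost:8080/topics/foo/offsets?group=bar'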
Set Offsets

POST /topics/<topic>/offsets?group=<group> - sets offsets to be consumed from the specified topic by a particular consumer group. The request content should be a list of JSON objects, where each object defines an offset to be set for a particular partition:

[
  {
    "partition": <partition id>,
    "offset": <next offset to be consumed by this consumer group>,
    "metadata": <arbitrary string>
  },
  ...
]
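
For instance, a sketch of rewinding partitions 0 and 1 of topic foo for group bar (all names and offset values are illustrative, and a JSON content type is assumed):

# set the next offset to consume for partitions 0 and 1
curl -X POST 'localhost:8080/topics/foo/offsets?group=bar' \
     -H 'Content-Type: application/json' \
     -d '[{"partition": 0, "offset": 4000}, {"partition": 1, "offset": 5000}]'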

Note that consumption by all members of the consumer group should cease before this call is made. This is necessary because, while consuming, Kafka-Pixy constantly updates partition offsets and does not expect them to be updated by anybody else. It only reads them on group initialization, which happens when a consumer group request arrives after 20 seconds or more of consumer group inactivity across all Kafka-Pixy instances working with the Kafka cluster.

List Consumers

GET /topics/<topic>/consumers[?group=<group>] - returns a list of consumers that are subscribed to the topic along with a list of partitions assigned to each consumer. If group is not specified then information is provided for all consumer groups subscribed to the topic.

e.g.:

curl -XGET localhost:19092/topics/some_queue/consumers

yields:

{
  "integrations": {
    "pixy_jobs1_62065_2015-09-24T22:21:05Z": [0,1,2,3],
    "pixy_jobs2_18075_2015-09-24T22:21:28Z": [4,5,6],
    "pixy_jobs3_336_2015-09-24T22:21:51Z": [7,8,9]
  },
  "logstash-customer": {
    "logstash-customer_logs01-1443116116450-7f54d246-0": [0,1,2],
    "logstash-customer_logs01-1443116116450-7f54d246-1": [3,4,5],
    "logstash-customer_logs01-1443116116450-7f54d246-2": [6,7],
    "logstash-customer_logs01-1443116116450-7f54d246-3": [8,9]
  },
  "logstash-reputation4": {
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-0": [0],
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-1": [1],
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-10": [2],
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-11": [3],
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-12": [4],
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-13": [5],
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-14": [6],
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-15": [7],
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-2": [8],
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-3": [9]
  },
  "test": {
    "pixy_core1_47288_2015-09-24T22:15:36Z": [0,1,2,3,4],
    "pixy_in7_102745_2015-09-24T22:24:14Z": [5,6,7,8,9]
  }
}

Delivery Guarantees

If a Kafka-Pixy instance dies (crashes, gets brutally killed with SIGKILL, the entire host running it goes down, you name it), then some asynchronously produced messages can be lost, but synchronously produced messages never are. Some messages consumed just before the death may be consumed a second time later, either from the restarted Kafka-Pixy instance on the same host or from a Kafka-Pixy instance running on another host.

A message is considered consumed by Kafka-Pixy once it has been successfully sent over the network in an HTTP response body. So if a client application dies before it processes the message, the message is lost.

Command Line

Kafka-Pixy is designed to be very simple to run. It consists of a single executable that can be started just by passing a bunch of command line parameters to it - no configuration file needed. All configuration parameters that Kafka-Pixy accepts are listed below.

Parameter       Description
kafkaPeers      Comma-separated list of Kafka brokers. Note that these are just seed brokers; the rest are discovered automatically. (Default localhost:9092)
zookeeperPeers  Comma-separated list of ZooKeeper nodes followed by an optional chroot. (Default localhost:2181)
unixAddr        Unix domain socket that the primary HTTP API should listen on. (Default /var/run/kafka-pixy.sock)
tcpAddr         TCP address that the secondary HTTP API should listen on. If not specified, Kafka-Pixy won't listen on a TCP socket.
pidFile         Name of the pid file to create. (Default /var/run/kafka-pixy.pid)

Run kafka-pixy -help to list all available command line parameters.
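
For example, a typical invocation might look as follows, assuming the same --flag=value form shown in the produce example above; the broker and ZooKeeper host names are illustrative:

# point Kafka-Pixy at the cluster and enable the debugging TCP listener
kafka-pixy --kafkaPeers=kafka1:9092,kafka2:9092 \
           --zookeeperPeers=zk1:2181 \
           --tcpAddr=0.0.0.0:8080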

License

Kafka-Pixy is under the Apache 2.0 license. See the LICENSE file for details.


Directories

Path Synopsis
Godeps
_workspace/src/camlistore.org/pkg/throttle
Package throttle provides a net.Listener that returns artificially-delayed connections for testing real-world connectivity.
_workspace/src/github.com/davecgh/go-spew/spew
Package spew implements a deep pretty printer for Go data structures to aid in debugging.
_workspace/src/github.com/eapache/go-resiliency/breaker
Package breaker implements the circuit-breaker resiliency pattern for Go.
_workspace/src/github.com/eapache/queue
Package queue provides a fast, ring-buffer queue based on the version suggested by Dariusz Górecki.
_workspace/src/github.com/golang/snappy
Package snappy implements the snappy block-based compression format.
_workspace/src/github.com/gorilla/context
Package context stores values shared during a request lifetime.
_workspace/src/github.com/gorilla/mux
Package gorilla/mux implements a request router and dispatcher.
_workspace/src/github.com/klauspost/crc32
Package crc32 implements the 32-bit cyclic redundancy check, or CRC-32, checksum.
_workspace/src/github.com/mailgun/go-zookeeper/zk
Package zk is a native Go client library for the ZooKeeper orchestration service.
_workspace/src/github.com/mailgun/manners
Package manners provides a wrapper for a standard net/http server that ensures all active HTTP clients have completed their current request before the server shuts down.
_workspace/src/github.com/mailgun/sarama
Package sarama provides client libraries for the Kafka 0.8 protocol.
_workspace/src/github.com/mailgun/sarama/mocks
Package mocks provides mocks that can be used for testing applications that use Sarama.
_workspace/src/gopkg.in/check.v1
Package check is a rich testing extension for Go's testing package.
tools
