Documentation ¶
Overview ¶
Package libkafka is a low level golang library for producing to and consuming from Kafka 1.0+. It has no external dependencies. It is not modeled on the Java client. All API calls are synchronous and all code executes in the calling goroutine.
Project Scope ¶
The library focuses on non transactional production and consumption. It implements single partition Producer and Consumer. Multi partition producers and consumers are built on top of this library (example: https://github.com/mkocikowski/kafkaclient).
Get Started ¶
Read the documentation for the "batch" and "client" packages.
Design Decisions ¶
1. Focus on record batches. Kafka protocol Produce and Fetch API calls operate on sets of record batches. Record batch is the unit at which messages are produced and fetched. It also is the unit at which data is partitioned and compressed. In libkafka producers and consumers operate on batches of records. Building and parsing of record batches is separate from Producing and Fetching. Record batch compression and decompression implementations are provided by the library user.
2. Synchronous single-partition calls. Kafka wire protocol is asynchronous: on a single connection there can be multiple requests awaiting response from the Kafka broker. In addition, many API calls (such as Produce and Fetch) can combine data for multiple topics and partitions in a single call. Libkafka maintains a separate connection for every topic-partition and calls on that connection are synchronous, and each call is for only one topic-partition. That makes call handling (and failure) logic simpler.
3. Wide use of reflection. All API calls (requests and responses) are defined as structs and marshaled using reflection. This is not a performance problem, because API calls are not frequent. Marshaling and unmarshaling of individual records within record batches (which has big performance impact) is done without using reflection.
4. Limited use of data hiding. The library is not intended to be child proof. Most internal structures are exposed to make debugging and metrics collection easier.
Index ¶
Constants ¶
const ( ERR_UNKNOWN_SERVER_ERROR = -1 ERR_NONE = 0 ERR_OFFSET_OUT_OF_RANGE = 1 ERR_CORRUPT_MESSAGE = 2 // retriable: True ERR_UNKNOWN_TOPIC_OR_PARTITION = 3 // retriable: True ERR_INVALID_FETCH_SIZE = 4 ERR_LEADER_NOT_AVAILABLE = 5 // retriable: True ERR_NOT_LEADER_FOR_PARTITION = 6 // retriable: True ERR_REQUEST_TIMED_OUT = 7 // retriable: True ERR_BROKER_NOT_AVAILABLE = 8 ERR_REPLICA_NOT_AVAILABLE = 9 ERR_MESSAGE_TOO_LARGE = 10 ERR_STALE_CONTROLLER_EPOCH = 11 ERR_OFFSET_METADATA_TOO_LARGE = 12 ERR_NETWORK_EXCEPTION = 13 // retriable: True ERR_COORDINATOR_LOAD_IN_PROGRESS = 14 // retriable: True ERR_COORDINATOR_NOT_AVAILABLE = 15 // retriable: True ERR_NOT_COORDINATOR = 16 // retriable: True ERR_INVALID_TOPIC_EXCEPTION = 17 ERR_RECORD_LIST_TOO_LARGE = 18 ERR_NOT_ENOUGH_REPLICAS = 19 // retriable: True ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20 // retriable: True ERR_INVALID_REQUIRED_ACKS = 21 ERR_ILLEGAL_GENERATION = 22 ERR_INCONSISTENT_GROUP_PROTOCOL = 23 ERR_INVALID_GROUP_ID = 24 ERR_UNKNOWN_MEMBER_ID = 25 ERR_INVALID_SESSION_TIMEOUT = 26 ERR_REBALANCE_IN_PROGRESS = 27 ERR_INVALID_COMMIT_OFFSET_SIZE = 28 ERR_TOPIC_AUTHORIZATION_FAILED = 29 ERR_GROUP_AUTHORIZATION_FAILED = 30 ERR_CLUSTER_AUTHORIZATION_FAILED = 31 ERR_INVALID_TIMESTAMP = 32 ERR_UNSUPPORTED_SASL_MECHANISM = 33 ERR_ILLEGAL_SASL_STATE = 34 ERR_UNSUPPORTED_VERSION = 35 ERR_TOPIC_ALREADY_EXISTS = 36 ERR_INVALID_PARTITIONS = 37 ERR_INVALID_REPLICATION_FACTOR = 38 ERR_INVALID_REPLICA_ASSIGNMENT = 39 ERR_INVALID_CONFIG = 40 ERR_NOT_CONTROLLER = 41 // retriable: True ERR_INVALID_REQUEST = 42 ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT = 43 ERR_POLICY_VIOLATION = 44 ERR_OUT_OF_ORDER_SEQUENCE_NUMBER = 45 ERR_DUPLICATE_SEQUENCE_NUMBER = 46 ERR_INVALID_PRODUCER_EPOCH = 47 ERR_INVALID_TXN_STATE = 48 ERR_INVALID_PRODUCER_ID_MAPPING = 49 ERR_INVALID_TRANSACTION_TIMEOUT = 50 ERR_CONCURRENT_TRANSACTIONS = 51 ERR_TRANSACTION_COORDINATOR_FENCED = 52 ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53 ERR_SECURITY_DISABLED = 54 ERR_OPERATION_NOT_ATTEMPTED = 55 ERR_KAFKA_STORAGE_ERROR = 56 // retriable: True ERR_LOG_DIR_NOT_FOUND = 57 ERR_SASL_AUTHENTICATION_FAILED = 58 ERR_UNKNOWN_PRODUCER_ID = 59 ERR_REASSIGNMENT_IN_PROGRESS = 60 ERR_DELEGATION_TOKEN_AUTH_DISABLED = 61 ERR_DELEGATION_TOKEN_NOT_FOUND = 62 ERR_DELEGATION_TOKEN_OWNER_MISMATCH = 63 ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED = 64 ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED = 65 ERR_DELEGATION_TOKEN_EXPIRED = 66 ERR_INVALID_PRINCIPAL_TYPE = 67 ERR_NON_EMPTY_GROUP = 68 ERR_GROUP_ID_NOT_FOUND = 69 ERR_FETCH_SESSION_ID_NOT_FOUND = 70 // retriable: True ERR_INVALID_FETCH_SESSION_EPOCH = 71 // retriable: True ERR_LISTENER_NOT_FOUND = 72 // retriable: True ERR_TOPIC_DELETION_DISABLED = 73 ERR_FENCED_LEADER_EPOCH = 74 // retriable: True ERR_UNKNOWN_LEADER_EPOCH = 75 // retriable: True ERR_UNSUPPORTED_COMPRESSION_TYPE = 76 ERR_STALE_BROKER_EPOCH = 77 ERR_OFFSET_NOT_AVAILABLE = 78 // retriable: True ERR_MEMBER_ID_REQUIRED = 79 ERR_PREFERRED_LEADER_NOT_AVAILABLE = 80 // retriable: True ERR_GROUP_MAX_SIZE_REACHED = 81 )
Variables ¶
var ( // DialTimeout value is used in net.DialTimeout calls to connect to // kafka brokers (partition leaders, group coordinators, bootstrap // hosts). DialTimeout = 5 * time.Second // RequestTimeout used for setting deadlines while communicating via // TCP. Any single api call (request-response) can not take longer than // RequestTimeout. Set it to zero to prevent setting connection // deadlines. MaxWaitTimeMs for fetch requests should not be greater // than RequestTimeout. RequestTimeout = 60 * time.Second // ConnectionTTL specifies the max time a partition-client connection // to a broker will stay open (connection will be closed and re-opened // on first request after the TTL). The TTL counts from the time // connection was opened, not when it was last used. Default value of 0 // means "ignore this setting" (connections will stay open "forever"). ConnectionTTL time.Duration = 0 )
Changing timeouts is not safe for concurrent use. If you want to change them, do it once, right at the beginning.
Functions ¶
This section is empty.
Types ¶
type Compressor ¶ added in v0.0.5
type Compressor = batch.Compressor
type Decompressor ¶ added in v0.0.5
type Decompressor = batch.Decompressor
Directories ¶
Path | Synopsis |
---|---|
Package api defines Kafka protocol requests and responses.
|
Package api defines Kafka protocol requests and responses. |
Package batch implements functions for building, marshaling, and unmarshaling Kafka record batches.
|
Package batch implements functions for building, marshaling, and unmarshaling Kafka record batches. |
Package client has code for making api calls to brokers.
|
Package client has code for making api calls to brokers. |
fetcher
Package fetcher implements a single partition Kafka fetcher.
|
Package fetcher implements a single partition Kafka fetcher. |
producer
Package producer implements a single partition Kafka producer.
|
Package producer implements a single partition Kafka producer. |
Package record implements functions for marshaling and unmarshaling individual Kafka records.
|
Package record implements functions for marshaling and unmarshaling individual Kafka records. |
Package varint implements varint and ZigZag encoding and decoding.
|
Package varint implements varint and ZigZag encoding and decoding. |
Package wire implements functions for marshaling and unmarshaling Kafka requests and responses.
|
Package wire implements functions for marshaling and unmarshaling Kafka requests and responses. |