Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type App ¶ added in v0.12.0
type App struct {
	// TCP address that gRPC API server should listen on.
	GRPCAddr string `yaml:"grpc_addr"`

	// TCP address that HTTP API server should listen on.
	TCPAddr string `yaml:"tcp_addr"`

	// Unix domain socket address that HTTP API server should listen on.
	// Listening on a unix domain socket is disabled by default.
	UnixAddr string `yaml:"unix_addr"`

	// An arbitrary number of proxies to different Kafka/ZooKeeper clusters can
	// be configured. Each proxy configuration is identified by a cluster name.
	Proxies map[string]*Proxy `yaml:"proxies"`

	// Default cluster is the one to be used in API calls that do not start with
	// prefix `/clusters/<cluster>`. If it is not explicitly provided, then the
	// one mentioned in the `Proxies` section first is assumed.
	DefaultCluster string `yaml:"default_cluster"`

	// TLS is the application TLS configuration
	TLS `yaml:"tls"`

	// Logging config
	Logging []LoggerCfg
}
App defines Kafka-Pixy application configuration. It mirrors the structure of the JSON configuration file.
func DefaultApp ¶ added in v0.12.0
DefaultApp returns default application configuration where default proxy has the specified cluster.
func FromYAML ¶ added in v0.12.0
FromYAML parses configuration from a YAML string and performs basic validation of parameters.
func FromYAMLFile ¶ added in v0.12.0
FromYAMLFile parses configuration from a YAML file and performs basic validation of parameters.
func (*App) GRPCSecurityOpts ¶ added in v0.17.0
func (a *App) GRPCSecurityOpts() ([]grpc.ServerOption, error)
GRPCSecurityOpts returns an array (possibly empty) with gRPC security configuration if properly configured.
type Compression ¶ added in v0.14.0
type Compression sarama.CompressionCodec
func (*Compression) UnmarshalText ¶ added in v0.14.0
func (c *Compression) UnmarshalText(text []byte) error
type KafkaVersion ¶ added in v0.14.0
type KafkaVersion struct {
// contains filtered or unexported fields
}
func (*KafkaVersion) IsAtLeast ¶ added in v0.14.0
func (kv *KafkaVersion) IsAtLeast(v sarama.KafkaVersion) bool
func (*KafkaVersion) Set ¶ added in v0.14.0
func (kv *KafkaVersion) Set(v sarama.KafkaVersion)
func (*KafkaVersion) UnmarshalText ¶ added in v0.14.0
func (kv *KafkaVersion) UnmarshalText(text []byte) error
type LoggerCfg ¶ added in v0.18.0
type LoggerCfg struct {
	// Name defines a logger to be used. It can be one of: console, syslog, or
	// udplog.
	Name string `json:"name"`

	// Severity indicates the minimum severity a logger will be logging messages at.
	Severity string `json:"severity"`

	// Logger parameters
	Params map[string]string `json:"params"`
}
LoggerCfg represents a configuration of an individual logger.
type PartitionerConstructor ¶ added in v0.16.0
type PartitionerConstructor string
func (PartitionerConstructor) ToPartitionerConstructor ¶ added in v0.16.0
func (pc PartitionerConstructor) ToPartitionerConstructor() (sarama.PartitionerConstructor, error)
type Proxy ¶ added in v0.12.0
type Proxy struct {
	// Unique ID that identifies a Kafka-Pixy instance in both ZooKeeper and
	// Kafka. It is automatically generated by default and it is recommended to
	// leave it like that.
	ClientID string `yaml:"client_id"`

	Kafka struct {
		// List of seed Kafka peers that Kafka-Pixy should access to resolve
		// the Kafka cluster topology.
		SeedPeers []string `yaml:"seed_peers"`

		// Version of the Kafka cluster. Supported versions are 0.10.2.1 - 2.0.0
		Version KafkaVersion

		// Optionally use TLS when connecting to Kafka. This must be enabled
		// for following options to be used.
		TLSEnabled bool `yaml:"tls"`

		// The path to the CA certificate (PEM)
		CACertFile string `yaml:"ca_certificate_file"`

		// The path to the Client Certificate (PEM)
		ClientCertFile string `yaml:"client_certificate_file"`

		// The path to the Client Key (PEM)
		ClientCertKeyFile string `yaml:"client_key_file"`

		// From the tls package:
		// InsecureSkipVerify controls whether a client verifies the
		// server's certificate chain and host name.
		// If InsecureSkipVerify is true, TLS accepts any certificate
		// presented by the server and any host name in that certificate.
		// In this mode, TLS is susceptible to man-in-the-middle attacks.
		// This should be used only for testing.
		InsecureSkipVerify bool `yaml:"insecure"`
	} `yaml:"kafka"`

	ZooKeeper struct {
		// List of seed ZooKeeper peers that Kafka-Pixy should access to
		// resolve the ZooKeeper cluster topology.
		SeedPeers []string `yaml:"seed_peers"`

		// A root directory in ZooKeeper to store consumers data.
		Chroot string `yaml:"chroot"`

		// ZooKeeper session timeout has to be a minimum of 2 times the
		// tickTime (as set in the server configuration) and a maximum of 20
		// times the tickTime. The default ZooKeeper tickTime is 2 seconds.
		//
		// See http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkSessions
		SessionTimeout time.Duration `yaml:"session_timeout"`
	} `yaml:"zoo_keeper"`

	// Networking timeouts. These all pass through to sarama's `config.Net`
	// field.
	Net struct {
		// How long to wait for the initial connection.
		DialTimeout time.Duration `yaml:"dial_timeout"`

		// How long to wait for a response.
		ReadTimeout time.Duration `yaml:"read_timeout"`

		// How long to wait for a transmit.
		WriteTimeout time.Duration `yaml:"write_timeout"`

		// SASL support for SASL PLAIN
		SASL struct {
			// Whether or not to use SASL authentication when connecting to the broker (defaults to false).
			Enable bool `yaml:"enable"`

			// Whether or not to send the Kafka SASL handshake first if enabled
			// (defaults to true). You should only set this to false if you're using
			// a non-Kafka SASL proxy.
			Handshake bool `yaml:"handshake"`

			// User is the authentication identity (authcid) to present for
			// SASL/PLAIN
			User string `yaml:"user"`

			// Password for SASL/PLAIN authentication
			Password string `yaml:"password"`
		} `yaml:"sasl"`
	} `yaml:"net"`

	Producer struct {
		// Size of all buffered channels created by the producer module.
		ChannelBufferSize int `yaml:"channel_buffer_size"`

		// Size of maximum message in bytes
		MaxMessageBytes int `yaml:"max_message_bytes"`

		// The type of compression to use on messages.
		Compression Compression `yaml:"compression"`

		// The best-effort number of bytes needed to trigger a flush.
		FlushBytes int `yaml:"flush_bytes"`

		// The best-effort frequency of flushes.
		FlushFrequency time.Duration `yaml:"flush_frequency"`

		// How long to wait for the cluster to settle between retries.
		RetryBackoff time.Duration `yaml:"retry_backoff"`

		// The total number of times to retry sending a message.
		RetryMax int `yaml:"retry_max"`

		// The level of acknowledgement reliability needed from the broker.
		RequiredAcks RequiredAcks `yaml:"required_acks"`

		// Period of time that Kafka-Pixy should keep trying to submit buffered
		// messages to Kafka. It is recommended to make it large enough to survive
		// a ZooKeeper leader election in your setup.
		ShutdownTimeout time.Duration `yaml:"shutdown_timeout"`

		// How to assign incoming messages to a Kafka partition. Defaults to
		// using a hash of the specified message key, or random if the key is
		// unspecified.
		Partitioner PartitionerConstructor `yaml:"partitioner"`

		// The timeout to specify on individual produce requests to the broker.
		// The broker will wait for replication to complete up to this duration
		// before returning an error.
		Timeout time.Duration `yaml:"timeout"`
	} `yaml:"producer"`

	Consumer struct {
		// If set, Kafka-Pixy will not configure a consumer, and any attempts to
		// call the consumer APIs will return an error.
		Disabled bool `yaml:"disabled"`

		// Period of time that Kafka-Pixy should wait for an acknowledgement
		// before retrying.
		AckTimeout time.Duration `yaml:"ack_timeout"`

		// Size of all buffered channels created by the consumer module.
		ChannelBufferSize int `yaml:"channel_buffer_size"`

		// The number of bytes of messages to attempt to fetch for each
		// topic-partition in each fetch request. These bytes will be read into
		// memory for each partition, so this helps control the memory used by
		// the consumer. The fetch request size must be at least as large as
		// the maximum message size the server allows or else it is possible
		// for the producer to send messages larger than the consumer can fetch.
		FetchMaxBytes int `yaml:"fetch_max_bytes"`

		// The maximum amount of time the server will block before answering
		// the fetch request if there isn't data immediately available.
		FetchMaxWait time.Duration `yaml:"fetch_max_wait"`

		// Consume request will wait at most this long for a message from a
		// topic to become available before expiring.
		LongPollingTimeout time.Duration `yaml:"long_polling_timeout"`

		// The maximum number of unacknowledged messages allowed for a
		// particular group-topic-partition at a time. When this number is
		// reached subsequent consume requests will return long polling timeout
		// errors, until some of the pending messages are acknowledged.
		MaxPendingMessages int `yaml:"max_pending_messages"`

		// The maximum number of retries Kafka-Pixy will make to offer an
		// unack message. Messages that exceeded the number of retries are
		// discarded by Kafka-Pixy and acknowledged in Kafka. Zero retries
		// means that messages will be offered just once.
		//
		// If you want Kafka-Pixy to retry indefinitely, then set this
		// parameter to -1.
		MaxRetries int `yaml:"max_retries"`

		// How frequently to commit offsets to Kafka.
		OffsetsCommitInterval time.Duration `yaml:"offsets_commit_interval"`

		// Kafka-Pixy should wait this long after it gets notification that a
		// consumer joined/left a consumer group it is a member of before
		// rebalancing.
		RebalanceDelay time.Duration `yaml:"rebalance_delay"`

		// If a request to a Kafka-Pixy fails for any reason, then it should
		// wait this long before retrying.
		RetryBackoff time.Duration `yaml:"retry_backoff"`

		// Period of time that Kafka-Pixy should keep subscription to
		// a topic by a group in absence of requests from the consumer group.
		SubscriptionTimeout time.Duration `yaml:"subscription_timeout"`
	} `yaml:"consumer"`
}
Proxy defines configuration of a proxy to a particular Kafka/ZooKeeper cluster.
func DefaultProxy ¶ added in v0.12.0
func DefaultProxy() *Proxy
DefaultProxy returns configuration used by default.
func (*Proxy) SaramaClientCfg ¶ added in v0.14.0
func (*Proxy) SaramaProducerCfg ¶ added in v0.14.0
SaramaProducerCfg returns a config for sarama producer.
type RequiredAcks ¶ added in v0.14.0
type RequiredAcks sarama.RequiredAcks
func (*RequiredAcks) UnmarshalText ¶ added in v0.14.0
func (ra *RequiredAcks) UnmarshalText(text []byte) error