Documentation ¶
Overview ¶
Package sarama provides client libraries for the Kafka 0.8 protocol. The Client, Producer and Consumer objects are the core of the high-level API. The Broker and Request/Response objects permit more precise control.
The Request/Response objects and properties are mostly undocumented, as they line up exactly with the protocol fields documented by Kafka at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
Index ¶
- Constants
- Variables
- type Broker
- func (b *Broker) Addr() string
- func (b *Broker) Close() (err error)
- func (b *Broker) CommitOffset(clientID string, request *OffsetCommitRequest) (*OffsetCommitResponse, error)
- func (b *Broker) Connected() (bool, error)
- func (b *Broker) Fetch(clientID string, request *FetchRequest) (*FetchResponse, error)
- func (b *Broker) FetchOffset(clientID string, request *OffsetFetchRequest) (*OffsetFetchResponse, error)
- func (b *Broker) GetAvailableOffsets(clientID string, request *OffsetRequest) (*OffsetResponse, error)
- func (b *Broker) GetConsumerMetadata(clientID string, request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error)
- func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*MetadataResponse, error)
- func (b *Broker) ID() int32
- func (b *Broker) Open(conf *BrokerConfig) error
- func (b *Broker) Produce(clientID string, request *ProduceRequest) (*ProduceResponse, error)
- type BrokerConfig
- type ByteEncoder
- type Client
- func (client *Client) Close() error
- func (client *Client) Closed() bool
- func (client *Client) GetOffset(topic string, partitionID int32, where OffsetTime) (int64, error)
- func (client *Client) Leader(topic string, partitionID int32) (*Broker, error)
- func (client *Client) Partitions(topic string) ([]int32, error)
- func (client *Client) RefreshAllMetadata() error
- func (client *Client) RefreshTopicMetadata(topics ...string) error
- func (client *Client) Replicas(topic string, partitionID int32) ([]int32, error)
- func (client *Client) ReplicasInSync(topic string, partitionID int32) ([]int32, error)
- func (client *Client) Topics() ([]string, error)
- func (client *Client) WritablePartitions(topic string) ([]int32, error)
- type ClientConfig
- type CompressionCodec
- type ConfigurationError
- type ConstantPartitioner
- type Consumer
- type ConsumerConfig
- type ConsumerError
- type ConsumerErrors
- type ConsumerMessage
- type ConsumerMetadataRequest
- type ConsumerMetadataResponse
- type Encoder
- type FetchRequest
- type FetchResponse
- type FetchResponseBlock
- type HashPartitioner
- type KError
- type Message
- type MessageBlock
- type MessageSet
- type MetadataRequest
- type MetadataResponse
- type OffsetCommitRequest
- type OffsetCommitResponse
- type OffsetFetchRequest
- type OffsetFetchResponse
- type OffsetFetchResponseBlock
- type OffsetMethod
- type OffsetRequest
- type OffsetResponse
- type OffsetResponseBlock
- type OffsetTime
- type PacketDecodingError
- type PacketEncodingError
- type PartitionConsumer
- type PartitionConsumerConfig
- type PartitionMetadata
- type Partitioner
- type PartitionerConstructor
- type ProduceRequest
- type ProduceResponse
- type ProduceResponseBlock
- type Producer
- type ProducerConfig
- type ProducerError
- type ProducerErrors
- type ProducerMessage
- type RandomPartitioner
- type RequiredAcks
- type RoundRobinPartitioner
- type StdLogger
- type StringEncoder
- type SyncProducer
- type TopicMetadata
Constants ¶
const ReceiveTime int64 = -1
ReceiveTime is a special value for the timestamp field of Offset Commit Requests which tells the broker to set the timestamp to the time at which the request was received.
Variables ¶
var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated")
ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.
var ErrClosedClient = errors.New("kafka: Tried to use a client that was closed")
ErrClosedClient is the error returned when a method is called on a client that has been closed.
var ErrIncompleteResponse = errors.New("kafka: Response did not contain all the expected topic/partition blocks")
ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does not contain the expected information.
var ErrInsufficientData = errors.New("kafka: Insufficient data to decode packet, more bytes expected")
var ErrInvalidPartition = errors.New("kafka: Partitioner returned an invalid partition index")
ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index (meaning one outside of the range [0...numPartitions-1]).
var ErrMessageTooLarge = errors.New("kafka: Message is larger than MaxFetchSize")
ErrMessageTooLarge is returned when the next message to consume is larger than the configured MaxFetchSize
var ErrNotConnected = errors.New("kafka: broker not connected")
ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
var ErrOutOfBrokers = errors.New("kafka: Client has run out of available brokers to talk to. Is your cluster reachable?")
ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored or otherwise failed to respond.
var ErrShuttingDown = errors.New("kafka: Message received by producer in process of shutting down")
ErrShuttingDown is returned when a producer receives a message during shutdown.
var MaxRequestSize uint32 = 100 * 1024 * 1024
MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying to send a request larger than this will result in a PacketEncodingError. The default of 100 MiB is aligned with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt to process.
var MaxResponseSize int32 = 100 * 1024 * 1024
MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If a broker returns a response message larger than this value, Sarama will return a PacketDecodingError. The default of 100 MiB is aligned with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt to process.
var PanicHandler func(interface{})
PanicHandler is called for recovering from panics spawned internally to the library (and thus not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
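The recovery hook described above can be pictured with a small stand-in. This is a sketch, not Sarama's code: `panicHandler` and `withRecover` are hypothetical names defined locally, and they only model the idea of a package-level function that, when non-nil, receives the value recovered from a panicking goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

// panicHandler is a local stand-in for a package-level hook such as
// PanicHandler. A nil value means panics are not recovered.
var panicHandler func(interface{})

// withRecover (hypothetical helper) runs fn and, if a handler is installed,
// hands any panic value to it instead of letting the panic propagate.
func withRecover(fn func()) {
	defer func() {
		if h := panicHandler; h != nil {
			if r := recover(); r != nil {
				h(r)
			}
		}
	}()
	fn()
}

func main() {
	panicHandler = func(v interface{}) {
		fmt.Println("recovered:", v)
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// The panic happens on a goroutine the caller does not own,
		// so only a hook like this one can observe it.
		withRecover(func() { panic("boom") })
	}()
	wg.Wait()
}
```

With the handler left as nil, the deferred function does not call recover, so the panic would terminate the program as usual.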
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
Example ¶
broker := NewBroker("localhost:9092")
err := broker.Open(nil)
if err != nil {
	return err
}
defer broker.Close()

request := MetadataRequest{Topics: []string{"myTopic"}}
response, err := broker.GetMetadata("myClient", &request)
if err != nil {
	return err
}

fmt.Println("There are", len(response.Topics), "topics active in the cluster.")

return nil
func NewBroker ¶
NewBroker creates and returns a Broker targeting the given host:port address. This does not attempt to actually connect; you have to call Open() for that.
func (*Broker) Addr ¶
Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
func (*Broker) CommitOffset ¶
func (b *Broker) CommitOffset(clientID string, request *OffsetCommitRequest) (*OffsetCommitResponse, error)
func (*Broker) Connected ¶
Connected returns true if the broker is connected and false otherwise. If the broker is not connected but it had tried to connect, the error from that connection attempt is also returned.
func (*Broker) Fetch ¶
func (b *Broker) Fetch(clientID string, request *FetchRequest) (*FetchResponse, error)
func (*Broker) FetchOffset ¶
func (b *Broker) FetchOffset(clientID string, request *OffsetFetchRequest) (*OffsetFetchResponse, error)
func (*Broker) GetAvailableOffsets ¶
func (b *Broker) GetAvailableOffsets(clientID string, request *OffsetRequest) (*OffsetResponse, error)
func (*Broker) GetConsumerMetadata ¶
func (b *Broker) GetConsumerMetadata(clientID string, request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error)
func (*Broker) GetMetadata ¶
func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*MetadataResponse, error)
func (*Broker) ID ¶
ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
func (*Broker) Open ¶
func (b *Broker) Open(conf *BrokerConfig) error
Open tries to connect to the Broker if it is not already connected or connecting, but does not block waiting for the connection to complete. This means that any subsequent operations on the broker will block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call, follow it with a call to Connected(). The only errors Open will return directly are ConfigurationError or ErrAlreadyConnected. If conf is nil, the result of NewBrokerConfig() is used.
func (*Broker) Produce ¶
func (b *Broker) Produce(clientID string, request *ProduceRequest) (*ProduceResponse, error)
type BrokerConfig ¶
type BrokerConfig struct {
	MaxOpenRequests int // How many outstanding requests the broker is allowed to have before blocking attempts to send (default 5).

	// All three of the below configurations are similar to the `socket.timeout.ms` setting in JVM kafka.
	DialTimeout  time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error (default 30s).
	ReadTimeout  time.Duration // How long to wait for a response before timing out and returning an error (default 30s).
	WriteTimeout time.Duration // How long to wait for a transmit to succeed before timing out and returning an error (default 30s).
}
BrokerConfig is used to pass multiple configuration options to Broker.Open.
func NewBrokerConfig ¶
func NewBrokerConfig() *BrokerConfig
NewBrokerConfig returns a new broker configuration with sane defaults.
func (*BrokerConfig) Validate ¶
func (config *BrokerConfig) Validate() error
Validate checks a BrokerConfig instance. This will return a ConfigurationError if the specified values don't make sense.
type ByteEncoder ¶
type ByteEncoder []byte
ByteEncoder implements the Encoder interface for Go byte slices so that you can do things like
producer.SendMessage(nil, sarama.ByteEncoder([]byte{0x00}))
func (ByteEncoder) Encode ¶
func (b ByteEncoder) Encode() ([]byte, error)
func (ByteEncoder) Length ¶
func (b ByteEncoder) Length() int
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a generic Kafka client. It manages connections to one or more Kafka brokers. You MUST call Close() on a client to avoid leaks: it will not be garbage-collected automatically when it passes out of scope. A single client can be safely shared by multiple concurrent Producers and Consumers.
func NewClient ¶
func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
NewClient creates a new Client with the given client ID. It connects to one of the given broker addresses and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot be retrieved from any of the given broker addresses, the client is not created.
func (*Client) Close ¶
Close shuts down all broker connections managed by this client. It is required to call this function before a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers using a client before you close the client.
func (*Client) GetOffset ¶
GetOffset queries the cluster to get the most recent available offset at the given time on the topic/partition combination.
func (*Client) Leader ¶
Leader returns the broker object that is the leader of the current topic/partition, as determined by querying the cluster metadata.
func (*Client) Partitions ¶
Partitions returns the sorted list of all partition IDs for the given topic.
func (*Client) RefreshAllMetadata ¶
RefreshAllMetadata queries the cluster to refresh the available metadata for all topics.
func (*Client) RefreshTopicMetadata ¶
RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the available metadata for those topics.
func (*Client) ReplicasInSync ¶
ReplicasInSync returns the set of all in-sync replica IDs for the given partition. Note: kafka's metadata here is known to be stale in many cases, and should not generally be trusted. This method should be considered effectively deprecated.
type ClientConfig ¶
type ClientConfig struct {
	MetadataRetries            int           // How many times to retry a metadata request when a partition is in the middle of leader election.
	WaitForElection            time.Duration // How long to wait for leader election to finish between retries.
	DefaultBrokerConf          *BrokerConfig // Default configuration for broker connections created by this client.
	BackgroundRefreshFrequency time.Duration // How frequently the client will refresh the cluster metadata in the background. Defaults to 10 minutes. Set to 0 to disable.
}
ClientConfig is used to pass multiple configuration options to NewClient.
func NewClientConfig ¶
func NewClientConfig() *ClientConfig
NewClientConfig creates a new ClientConfig instance with sensible defaults
func (*ClientConfig) Validate ¶
func (config *ClientConfig) Validate() error
Validate checks a ClientConfig instance. This will return a ConfigurationError if the specified values don't make sense.
type CompressionCodec ¶
type CompressionCodec int8
CompressionCodec represents the various compression codecs recognized by Kafka in messages.
const (
	CompressionNone   CompressionCodec = 0
	CompressionGZIP   CompressionCodec = 1
	CompressionSnappy CompressionCodec = 2
)
type ConfigurationError ¶
type ConfigurationError string
ConfigurationError is the type of error returned from NewClient, NewProducer or NewConsumer when the specified configuration is invalid.
func (ConfigurationError) Error ¶
func (err ConfigurationError) Error() string
type ConstantPartitioner ¶
type ConstantPartitioner struct {
Constant int32
}
ConstantPartitioner implements the Partitioner interface by just returning a constant value.
func (*ConstantPartitioner) Partition ¶
func (p *ConstantPartitioner) Partition(key Encoder, numPartitions int32) (int32, error)
func (*ConstantPartitioner) RequiresConsistency ¶
func (p *ConstantPartitioner) RequiresConsistency() bool
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer manages PartitionConsumers which process Kafka messages from brokers.
func NewConsumer ¶
func NewConsumer(client *Client, config *ConsumerConfig) (*Consumer, error)
NewConsumer creates a new consumer attached to the given client.
func (*Consumer) ConsumePartition ¶
func (c *Consumer) ConsumePartition(topic string, partition int32, config *PartitionConsumerConfig) (*PartitionConsumer, error)
ConsumePartition creates a PartitionConsumer on the given topic/partition with the given configuration. It will return an error if this Consumer is already consuming on the given topic/partition.
type ConsumerConfig ¶
type ConsumerConfig struct {
	// The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
	// The default is 1, as 0 causes the consumer to spin when no messages are available.
	MinFetchSize int32
	// The maximum amount of time the broker will wait for MinFetchSize bytes to become available before it
	// returns fewer than that anyways. The default is 250ms, since 0 causes the consumer to spin when no events are available.
	// 100-500ms is a reasonable range for most cases. Kafka only supports precision up to milliseconds; nanoseconds will be truncated.
	MaxWaitTime time.Duration
}
ConsumerConfig is used to pass multiple configuration options to NewConsumer.
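The note on MaxWaitTime precision can be illustrated with plain duration arithmetic. The helper below is hypothetical (it is not part of the package) and only shows what truncating a time.Duration to whole milliseconds does to a sub-millisecond remainder.

```go
package main

import (
	"fmt"
	"time"
)

// toMillis (hypothetical helper) converts a duration to whole milliseconds.
// Integer division discards anything smaller than one millisecond.
func toMillis(d time.Duration) int32 {
	return int32(d / time.Millisecond)
}

func main() {
	wait := 250*time.Millisecond + 900*time.Microsecond
	fmt.Println(toMillis(wait)) // the 900µs remainder is dropped
}
```

A value below one millisecond therefore becomes 0, which by the comment above would make the consumer spin, so it is safer to choose durations that are whole milliseconds.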
func NewConsumerConfig ¶
func NewConsumerConfig() *ConsumerConfig
NewConsumerConfig creates a ConsumerConfig instance with sane defaults.
func (*ConsumerConfig) Validate ¶
func (config *ConsumerConfig) Validate() error
Validate checks a ConsumerConfig instance. It will return a ConfigurationError if the specified value doesn't make sense.
type ConsumerError ¶
ConsumerError is what is provided to the user when an error occurs. It wraps an error and includes the topic and partition.
func (ConsumerError) Error ¶
func (ce ConsumerError) Error() string
type ConsumerErrors ¶
type ConsumerErrors []*ConsumerError
ConsumerErrors is a type that wraps a batch of errors and implements the Error interface. It can be returned from the PartitionConsumer's Close method to avoid the need to manually drain errors when stopping.
func (ConsumerErrors) Error ¶
func (ce ConsumerErrors) Error() string
type ConsumerMessage ¶
ConsumerMessage encapsulates a Kafka message returned by the consumer.
type ConsumerMetadataRequest ¶
type ConsumerMetadataRequest struct {
ConsumerGroup string
}
type Encoder ¶
Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Kafka message. Length() is provided as an optimization, and must return the same as len() on the result of Encode().
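As an illustration of the contract above, here is a minimal sketch of a custom encoder. The `Encoder` interface is redeclared locally so the example is self-contained, and `Uint32Encoder` and `lengthMatches` are hypothetical names introduced only for this example.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Encoder is redeclared locally with the two-method shape described above.
type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

// Uint32Encoder is a hypothetical encoder that serialises a number as
// four big-endian bytes.
type Uint32Encoder uint32

// Encode returns the 4-byte big-endian representation.
func (u Uint32Encoder) Encode() ([]byte, error) {
	buf := make([]byte, 4)
	binary.BigEndian.PutUint32(buf, uint32(u))
	return buf, nil
}

// Length reports the encoded size without allocating a buffer.
func (u Uint32Encoder) Length() int { return 4 }

// lengthMatches checks the invariant Length() == len(Encode()).
func lengthMatches(e Encoder) (bool, error) {
	b, err := e.Encode()
	if err != nil {
		return false, err
	}
	return len(b) == e.Length(), nil
}

func main() {
	ok, err := lengthMatches(Uint32Encoder(42))
	fmt.Println(ok, err)
}
```

A check like `lengthMatches` is cheap to run in a unit test for any encoder you write, since a mismatch between the two methods would break the stated contract.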
type FetchRequest ¶
type FetchResponse ¶
type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
}
func (*FetchResponse) AddError ¶
func (fr *FetchResponse) AddError(topic string, partition int32, err KError)
func (*FetchResponse) AddMessage ¶
func (fr *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64)
func (*FetchResponse) GetBlock ¶
func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock
type FetchResponseBlock ¶
type FetchResponseBlock struct {
	Err                 KError
	HighWaterMarkOffset int64
	MsgSet              MessageSet
}
type HashPartitioner ¶
type HashPartitioner struct {
// contains filtered or unexported fields
}
HashPartitioner implements the Partitioner interface. If the key is nil, or fails to encode, then a random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes is used, modulo the number of partitions. This ensures that messages with the same key always end up on the same partition.
func (*HashPartitioner) Partition ¶
func (p *HashPartitioner) Partition(key Encoder, numPartitions int32) (int32, error)
func (*HashPartitioner) RequiresConsistency ¶
func (p *HashPartitioner) RequiresConsistency() bool
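The hash-then-modulo idea behind HashPartitioner can be sketched with the standard library's FNV-1a implementation. This is an illustration under assumptions, not the library's code: `partitionForKey` is a hypothetical function, it uses the 32-bit variant of FNV-1a, and it reduces the unsigned hash directly, so the partition numbers it produces may differ from those chosen by HashPartitioner.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionForKey (hypothetical) hashes key with 32-bit FNV-1a and maps the
// result onto [0, numPartitions). numPartitions must be greater than zero.
func partitionForKey(key []byte, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write(key) // writes to a hash.Hash never return an error
	return int32(h.Sum32() % uint32(numPartitions))
}

func main() {
	for _, k := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("%s -> partition %d\n", k, partitionForKey([]byte(k), 8))
	}
}
```

Because the mapping depends only on the key bytes and the partition count, equal keys land on the same partition for as long as the number of partitions stays the same.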
type KError ¶
type KError int16
KError is the type of error that can be returned directly by the Kafka broker. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
const (
	ErrNoError                         KError = 0
	ErrUnknown                         KError = -1
	ErrOffsetOutOfRange                KError = 1
	ErrInvalidMessage                  KError = 2
	ErrUnknownTopicOrPartition         KError = 3
	ErrInvalidMessageSize              KError = 4
	ErrLeaderNotAvailable              KError = 5
	ErrNotLeaderForPartition           KError = 6
	ErrRequestTimedOut                 KError = 7
	ErrBrokerNotAvailable              KError = 8
	ErrReplicaNotAvailable             KError = 9
	ErrMessageSizeTooLarge             KError = 10
	ErrStaleControllerEpochCode        KError = 11
	ErrOffsetMetadataTooLarge          KError = 12
	ErrOffsetsLoadInProgress           KError = 14
	ErrConsumerCoordinatorNotAvailable KError = 15
	ErrNotCoordinatorForConsumer       KError = 16
)
Numeric error codes returned by the Kafka server.
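A numeric error type like this can be inspected with an ordinary type assertion. The sketch below redeclares a cut-down `KError` locally with three of the codes listed above. The `Error` text and the `needsMetadataRefresh` helper are assumptions made for illustration, and treating the two leadership codes as a cue to refresh metadata is a policy chosen for this sketch rather than something the package prescribes.

```go
package main

import "fmt"

// KError is a local stand-in with the same underlying type as above.
type KError int16

const (
	ErrNoError               KError = 0
	ErrLeaderNotAvailable    KError = 5
	ErrNotLeaderForPartition KError = 6
)

// Error makes the stand-in usable as an error; the message text is invented.
func (e KError) Error() string {
	return fmt.Sprintf("kafka server error code %d", int16(e))
}

// needsMetadataRefresh (hypothetical) reports whether err is one of the
// leadership-related codes, after which cached leader data is likely stale.
func needsMetadataRefresh(err error) bool {
	k, ok := err.(KError)
	return ok && (k == ErrLeaderNotAvailable || k == ErrNotLeaderForPartition)
}

func main() {
	var err error = ErrNotLeaderForPartition
	fmt.Println(err, needsMetadataRefresh(err))
}
```

Note that a KError equal to ErrNoError stored in an error interface is still a non-nil error value, so compare against ErrNoError explicitly rather than against nil.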
type Message ¶
type Message struct {
	Codec CompressionCodec // codec used to compress the message contents
	Key   []byte           // the message key, may be nil
	Value []byte           // the message contents
	Set   *MessageSet      // the message set a message might wrap
	// contains filtered or unexported fields
}
type MessageBlock ¶
func (*MessageBlock) Messages ¶
func (msb *MessageBlock) Messages() []*MessageBlock
Messages is a convenience helper that returns all the messages that are wrapped in this block.
type MessageSet ¶
type MessageSet struct {
	PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
	Messages               []*MessageBlock
}
type MetadataRequest ¶
type MetadataRequest struct {
Topics []string
}
type MetadataResponse ¶
type MetadataResponse struct {
	Brokers []*Broker
	Topics  []*TopicMetadata
}
func (*MetadataResponse) AddBroker ¶
func (m *MetadataResponse) AddBroker(addr string, id int32)
func (*MetadataResponse) AddTopicPartition ¶
func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError)
type OffsetCommitRequest ¶
type OffsetCommitRequest struct {
	ConsumerGroup string
	// contains filtered or unexported fields
}
type OffsetCommitResponse ¶
type OffsetFetchRequest ¶
type OffsetFetchRequest struct {
	ConsumerGroup string
	// contains filtered or unexported fields
}
func (*OffsetFetchRequest) AddPartition ¶
func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32)
type OffsetFetchResponse ¶
type OffsetFetchResponse struct {
Blocks map[string]map[int32]*OffsetFetchResponseBlock
}
type OffsetMethod ¶
type OffsetMethod int
OffsetMethod is passed in PartitionConsumerConfig to tell the consumer how to determine the starting offset.
const (
	// OffsetMethodNewest causes the consumer to start at the most recent available offset, as
	// determined by querying the broker.
	OffsetMethodNewest OffsetMethod = iota
	// OffsetMethodOldest causes the consumer to start at the oldest available offset, as
	// determined by querying the broker.
	OffsetMethodOldest
	// OffsetMethodManual causes the consumer to interpret the OffsetValue in the PartitionConsumerConfig as the
	// offset at which to start, allowing the user to manually specify their desired starting offset.
	OffsetMethodManual
)
type OffsetRequest ¶
type OffsetRequest struct {
// contains filtered or unexported fields
}
func (*OffsetRequest) AddBlock ¶
func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time OffsetTime, maxOffsets int32)
type OffsetResponse ¶
type OffsetResponse struct {
Blocks map[string]map[int32]*OffsetResponseBlock
}
func (*OffsetResponse) AddTopicPartition ¶
func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64)
func (*OffsetResponse) GetBlock ¶
func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock
type OffsetResponseBlock ¶
type OffsetTime ¶
type OffsetTime int64
OffsetTime is used in Offset Requests to ask for all messages before a certain time. Any positive int64 value will be interpreted as milliseconds, or use the special constants defined here.
const (
	// LatestOffsets asks for the latest offsets.
	LatestOffsets OffsetTime = -1
	// EarliestOffset asks for the earliest available offset. Note that because offsets are pulled in descending order,
	// asking for the earliest offset will always return you a single element.
	EarliestOffset OffsetTime = -2
)
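To make the millisecond interpretation concrete, the sketch below builds an OffsetTime from a wall-clock time. `OffsetTime` and its two constants are redeclared locally, `offsetTimeFromUnixNano` is a hypothetical helper, and the example assumes the millisecond value is a Unix-epoch timestamp, as in the Kafka protocol guide.

```go
package main

import (
	"fmt"
	"time"
)

// OffsetTime and its constants are redeclared locally for this sketch.
type OffsetTime int64

const (
	LatestOffsets  OffsetTime = -1
	EarliestOffset OffsetTime = -2
)

// offsetTimeFromUnixNano (hypothetical) converts a Unix timestamp in
// nanoseconds to the millisecond value an offset request expects.
func offsetTimeFromUnixNano(ns int64) OffsetTime {
	return OffsetTime(ns / int64(time.Millisecond))
}

func main() {
	oneHourAgo := time.Now().Add(-time.Hour)
	fmt.Println("before timestamp:", offsetTimeFromUnixNano(oneHourAgo.UnixNano()))
	fmt.Println("special values:", LatestOffsets, EarliestOffset)
}
```

The resulting value could then be passed wherever an OffsetTime is accepted, for example as the `time` argument of OffsetRequest.AddBlock.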
type PacketDecodingError ¶
type PacketDecodingError struct {
Info string
}
PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.
func (PacketDecodingError) Error ¶
func (err PacketDecodingError) Error() string
type PacketEncodingError ¶
type PacketEncodingError struct {
Info string
}
PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
func (PacketEncodingError) Error ¶
func (err PacketEncodingError) Error() string
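The string-length limit mentioned for PacketEncodingError follows from the Kafka wire format, where a string is written as a signed 16-bit length followed by its bytes. The sketch below shows that rule in isolation; `encodeString` and `errStringTooLong` are hypothetical names and not the package's encoder.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"math"
	"strings"
)

// errStringTooLong is a stand-in for the kind of failure a packet encoder reports.
var errStringTooLong = errors.New("string length does not fit in a signed 16-bit length prefix")

// encodeString (hypothetical) writes s as a big-endian int16 length followed
// by the raw bytes, rejecting anything longer than math.MaxInt16 bytes.
func encodeString(s string) ([]byte, error) {
	if len(s) > math.MaxInt16 {
		return nil, errStringTooLong
	}
	buf := make([]byte, 2+len(s))
	binary.BigEndian.PutUint16(buf, uint16(len(s)))
	copy(buf[2:], s)
	return buf, nil
}

func main() {
	b, err := encodeString("topic")
	fmt.Println(b, err)

	_, err = encodeString(strings.Repeat("x", 40000))
	fmt.Println(err)
}
```

Note that the limit applies to the encoded byte length, so a string with multi-byte characters reaches it sooner than its character count suggests.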
type PartitionConsumer ¶
type PartitionConsumer struct {
// contains filtered or unexported fields
}
PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close() on a consumer to avoid leaks: it will not be garbage-collected automatically when it passes out of scope (this is in addition to calling Close on the underlying consumer's client, which is still necessary). You have to read from both the Messages and Errors channels, or the consumer will eventually deadlock.
func (*PartitionConsumer) AsyncClose ¶
func (child *PartitionConsumer) AsyncClose()
AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you should wait until the 'messages' and 'errors' channels are drained. It is required to call this function or Close before a consumer object passes out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
func (*PartitionConsumer) Close ¶
func (child *PartitionConsumer) Close() error
Close stops the PartitionConsumer from fetching messages. It is required to call this function or AsyncClose before a consumer object passes out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
func (*PartitionConsumer) Errors ¶
func (child *PartitionConsumer) Errors() <-chan *ConsumerError
Errors returns the read channel for any errors that occurred while consuming the partition. You have to read this channel to prevent the consumer from deadlocking. Under no circumstances will the partition consumer shut down by itself. It will just wait until it is able to continue consuming messages. If you want to shut down your consumer, you will have to trigger it yourself by consuming this channel and calling Close or AsyncClose when appropriate.
func (*PartitionConsumer) Messages ¶
func (child *PartitionConsumer) Messages() <-chan *ConsumerMessage
Messages returns the read channel for the messages that are returned by the broker
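The requirement to keep reading from both channels, including while an asynchronous shutdown completes, can be sketched with plain Go channels. Everything here is a stand-in: `ConsumerMessage` and `ConsumerError` are simplified local types with assumed fields, `drain` is a hypothetical helper, and the example assumes both channels are closed once shutdown has finished.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified local stand-ins; the fields are assumptions made for this sketch.
type ConsumerMessage struct{ Value []byte }

type ConsumerError struct{ Err error }

// drain (hypothetical) keeps receiving from both channels until both have
// been closed. Setting a closed channel to nil disables its select case.
func drain(messages <-chan *ConsumerMessage, errs <-chan *ConsumerError) (nMsgs, nErrs int) {
	for messages != nil || errs != nil {
		select {
		case _, ok := <-messages:
			if !ok {
				messages = nil
				continue
			}
			nMsgs++
		case _, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			nErrs++
		}
	}
	return nMsgs, nErrs
}

func main() {
	messages := make(chan *ConsumerMessage, 2)
	errs := make(chan *ConsumerError, 1)

	messages <- &ConsumerMessage{Value: []byte("a")}
	messages <- &ConsumerMessage{Value: []byte("b")}
	errs <- &ConsumerError{Err: errors.New("fetch failed")}

	// Closing both channels stands in for the consumer finishing its shutdown.
	close(messages)
	close(errs)

	fmt.Println(drain(messages, errs))
}
```

Servicing both channels in one select means a burst on either side cannot fill its buffer and stall the sender while the reader is blocked on the other.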
type PartitionConsumerConfig ¶
type PartitionConsumerConfig struct {
	// The default (maximum) amount of data to fetch from the broker in each request. The default is 32768 bytes.
	DefaultFetchSize int32
	// The maximum permitted message size - messages larger than this will return ErrMessageTooLarge. The default of 0 is
	// treated as no limit.
	MaxMessageSize int32
	// The method used to determine at which offset to begin consuming messages. The default is to start at the most recent message.
	OffsetMethod OffsetMethod
	// Interpreted differently according to the value of OffsetMethod.
	OffsetValue int64
	// The number of events to buffer in the Messages and Errors channel. Having this non-zero permits the
	// consumer to continue fetching messages in the background while client code consumes events,
	// greatly improving throughput. The default is 64.
	ChannelBufferSize int
}
PartitionConsumerConfig is used to pass multiple configuration options to ConsumePartition.
func NewPartitionConsumerConfig ¶
func NewPartitionConsumerConfig() *PartitionConsumerConfig
NewPartitionConsumerConfig creates a PartitionConsumerConfig with sane defaults.
func (*PartitionConsumerConfig) Validate ¶
func (config *PartitionConsumerConfig) Validate() error
Validate checks a PartitionConsumerConfig instance. It will return a ConfigurationError if the specified value doesn't make sense.
type PartitionMetadata ¶
type Partitioner ¶
type Partitioner interface {
	Partition(key Encoder, numPartitions int32) (int32, error) // Partition takes the key and partition count and chooses a partition

	// RequiresConsistency indicates to the user of the partitioner whether the mapping of key->partition is consistent or not.
	// Specifically, if a partitioner requires consistency then it must be allowed to choose from all partitions (even ones known to
	// be unavailable), and its choice must be respected by the caller. The obvious example is the HashPartitioner.
	RequiresConsistency() bool
}
Partitioner is anything that, given a Kafka message key and a number of partitions indexed [0...numPartitions-1], decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided as simple default implementations.
func NewHashPartitioner ¶
func NewHashPartitioner() Partitioner
func NewRandomPartitioner ¶
func NewRandomPartitioner() Partitioner
func NewRoundRobinPartitioner ¶
func NewRoundRobinPartitioner() Partitioner
type PartitionerConstructor ¶
type PartitionerConstructor func() Partitioner
PartitionerConstructor is the type for a function capable of constructing new Partitioners.
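A custom partitioner only needs the two methods of the interface plus a constructor function. The sketch below redeclares `Encoder`, `Partitioner` and `PartitionerConstructor` locally so that it compiles on its own. `cyclingPartitioner` and `newCyclingPartitioner` are hypothetical and are not the package's RoundRobinPartitioner.

```go
package main

import "fmt"

// Local copies of the interfaces and constructor type described above.
type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

type Partitioner interface {
	Partition(key Encoder, numPartitions int32) (int32, error)
	RequiresConsistency() bool
}

type PartitionerConstructor func() Partitioner

// cyclingPartitioner (hypothetical) ignores the key and walks through the
// partitions in order. It is not safe for concurrent use.
type cyclingPartitioner struct {
	next int32
}

// Partition returns the next index in [0, numPartitions), wrapping around.
func (p *cyclingPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) {
	if p.next >= numPartitions {
		p.next = 0
	}
	chosen := p.next
	p.next++
	return chosen, nil
}

// RequiresConsistency is false: the key has no influence on the choice,
// so the caller is free to restrict it to available partitions.
func (p *cyclingPartitioner) RequiresConsistency() bool { return false }

func newCyclingPartitioner() Partitioner { return &cyclingPartitioner{} }

func main() {
	var ctor PartitionerConstructor = newCyclingPartitioner
	p := ctor()
	for i := 0; i < 5; i++ {
		part, _ := p.Partition(nil, 3)
		fmt.Println("message", i, "-> partition", part)
	}
}
```

A function with the shape of `newCyclingPartitioner` is what a field of type PartitionerConstructor, such as ProducerConfig.Partitioner, expects.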
type ProduceRequest ¶
type ProduceRequest struct {
	RequiredAcks RequiredAcks
	Timeout      int32
	// contains filtered or unexported fields
}
func (*ProduceRequest) AddMessage ¶
func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message)
func (*ProduceRequest) AddSet ¶
func (p *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet)
type ProduceResponse ¶
type ProduceResponse struct {
Blocks map[string]map[int32]*ProduceResponseBlock
}
func (*ProduceResponse) AddTopicPartition ¶
func (pr *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError)
func (*ProduceResponse) GetBlock ¶
func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock
type ProduceResponseBlock ¶
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer publishes Kafka messages. It routes messages to the correct broker for the provided topic-partition, refreshing metadata as appropriate, and parses responses for errors. You must read from the Errors() channel or the producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid leaks: it will not be garbage-collected automatically when it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
Example ¶
client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
if err != nil {
	panic(err)
} else {
	fmt.Println("> connected")
}
defer client.Close()

producer, err := NewProducer(client, nil)
if err != nil {
	panic(err)
}
defer producer.Close()

for {
	select {
	case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
		fmt.Println("> message queued")
	case err := <-producer.Errors():
		panic(err.Err)
	}
}
func NewProducer ¶
func NewProducer(client *Client, config *ProducerConfig) (*Producer, error)
NewProducer creates a new Producer using the given client.
func (*Producer) AsyncClose ¶
func (p *Producer) AsyncClose()
AsyncClose triggers a shutdown of the producer, flushing any messages it may have buffered. The shutdown has completed when both the Errors and Successes channels have been closed. When calling AsyncClose, you *must* continue to read from those channels in order to drain the results of any messages in flight.
func (*Producer) Close ¶
Close shuts down the producer and flushes any messages it may have buffered. You must call this function before a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close on the underlying client.
func (*Producer) Errors ¶
func (p *Producer) Errors() <-chan *ProducerError
Errors is the error output channel back to the user. You MUST read from this channel or the Producer will deadlock. It is suggested that you send messages and read errors together in a single select statement.
func (*Producer) Input ¶
func (p *Producer) Input() chan<- *ProducerMessage
Input is the input channel for the user to write messages to that they wish to send.
func (*Producer) Successes ¶
func (p *Producer) Successes() <-chan *ProducerMessage
Successes is the success output channel back to the user when AckSuccesses is configured. If AckSuccesses is true, you MUST read from this channel or the Producer will deadlock. It is suggested that you send and read messages together in a single select statement.
type ProducerConfig ¶
type ProducerConfig struct {
	Partitioner       PartitionerConstructor // Generates partitioners for choosing the partition to send messages to (defaults to hash). Similar to the `partitioner.class` setting for the JVM producer.
	RequiredAcks      RequiredAcks           // The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal). Equivalent to the `request.required.acks` setting of the JVM producer.
	Timeout           time.Duration          // The maximum duration the broker will wait for the receipt of the number of RequiredAcks (defaults to 10 seconds). This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution; nanoseconds will be truncated. Equivalent to the JVM producer's `request.timeout.ms` setting.
	Compression       CompressionCodec       // The type of compression to use on messages (defaults to no compression). Similar to `compression.codec` setting of the JVM producer.
	FlushMsgCount     int                    // The number of messages needed to trigger a flush. This is a best effort; the number of messages may be more or less. Use `MaxMessagesPerReq` to set a hard upper limit.
	FlushFrequency    time.Duration          // If this amount of time elapses without a flush, one will be queued. The frequency is a best effort, and the actual frequency can be more or less. Equivalent to `queue.buffering.max.ms` setting of JVM producer.
	FlushByteCount    int                    // If this many bytes of messages are accumulated, a flush will be triggered. This is a best effort; the number of bytes may be more or less. Use the global `sarama.MaxRequestSize` to set a hard upper limit.
	AckSuccesses      bool                   // If enabled, successfully delivered messages will be returned on the Successes channel.
	MaxMessageBytes   int                    // The maximum permitted size of a message (defaults to 1000000). Equivalent to the broker's `message.max.bytes`.
	MaxMessagesPerReq int                    // The maximum number of messages the producer will send in a single broker request. Defaults to 0 for unlimited. The global setting MaxRequestSize still applies. Similar to `queue.buffering.max.messages` in the JVM producer.
	ChannelBufferSize int                    // The size of the buffers of the channels between the different goroutines (defaults to 256).
	RetryBackoff      time.Duration          // The amount of time to wait for the cluster to elect a new leader before processing retries (defaults to 100ms). Similar to the retry.backoff.ms setting of the JVM producer.
	MaxRetries        int                    // The total number of times to retry sending a message (defaults to 3). Similar to the message.send.max.retries setting of the JVM producer.
}
ProducerConfig is used to pass multiple configuration options to NewProducer.
Some of these configuration settings match settings with the JVM producer, but some of these are implementation specific and have no equivalent in the JVM producer.
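The note that Timeout "only supports millisecond resolution" follows from converting a time.Duration (nanoseconds) to a whole number of milliseconds. The sketch below shows that truncation with integer division; `toWireMillis` is a hypothetical helper written for illustration and is not part of Sarama.

```go
package main

import (
	"fmt"
	"time"
)

// toWireMillis is a hypothetical helper: integer division by
// time.Millisecond drops any sub-millisecond remainder.
func toWireMillis(d time.Duration) int32 {
	return int32(d / time.Millisecond)
}

func main() {
	fmt.Println(toWireMillis(10 * time.Second))       // 10000
	fmt.Println(toWireMillis(1500 * time.Microsecond)) // 1
	fmt.Println(toWireMillis(999 * time.Microsecond))  // 0
}
```

A timeout below one millisecond therefore becomes zero, so it is safest to configure Timeout in whole milliseconds.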
func NewProducerConfig ¶
func NewProducerConfig() *ProducerConfig
NewProducerConfig creates a new ProducerConfig instance with sensible defaults.
func (*ProducerConfig) Validate ¶
func (config *ProducerConfig) Validate() error
Validate checks a ProducerConfig instance. It returns a ConfigurationError if any of the specified values is invalid.
type ProducerError ¶
type ProducerError struct {
	Msg *ProducerMessage
	Err error
}
ProducerError is the type of error generated when the producer fails to deliver a message. It contains the original ProducerMessage as well as the actual error value.
func (ProducerError) Error ¶
func (pe ProducerError) Error() string
type ProducerErrors ¶
type ProducerErrors []*ProducerError
ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface. It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel when closing a producer.
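The pattern of a slice type that itself satisfies the error interface can be sketched as follows. The types `deliveryError` and `deliveryErrors` and the function `collect` are hypothetical stand-ins for ProducerError, ProducerErrors and a Close-like method; the message formats are invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// deliveryError is a hypothetical stand-in for ProducerError.
type deliveryError struct {
	Topic string
	Err   error
}

func (e deliveryError) Error() string {
	return fmt.Sprintf("failed to deliver to %s: %v", e.Topic, e.Err)
}

// deliveryErrors is a hypothetical stand-in for ProducerErrors: a batch
// that can be returned wherever a single error is expected.
type deliveryErrors []*deliveryError

func (es deliveryErrors) Error() string {
	return fmt.Sprintf("failed to deliver %d messages", len(es))
}

// collect returns a nil error for an empty batch so that callers can keep
// the usual `if err != nil` check.
func collect(errs []*deliveryError) error {
	if len(errs) == 0 {
		return nil
	}
	return deliveryErrors(errs)
}

func main() {
	err := collect([]*deliveryError{
		{Topic: "my_topic", Err: errors.New("timeout")},
	})
	// A type assertion recovers the individual failures from the batch.
	if batch, ok := err.(deliveryErrors); ok {
		for _, e := range batch {
			fmt.Println(e)
		}
	}
}
```

Returning an untyped nil for the empty case matters: returning an empty `deliveryErrors` value would produce a non-nil error interface.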
func (ProducerErrors) Error ¶
func (pe ProducerErrors) Error() string
type ProducerMessage ¶
type ProducerMessage struct {
	// The Kafka topic for this message.
	Topic string

	// The partitioning key for this message. It must implement the Encoder interface.
	// Pre-existing Encoders include StringEncoder and ByteEncoder.
	Key Encoder

	// The actual message to store in Kafka. It must implement the Encoder interface.
	// Pre-existing Encoders include StringEncoder and ByteEncoder.
	Value Encoder

	// This field is used to hold arbitrary data you wish to include so it will be available
	// when receiving on the Successes and Errors channels. Sarama completely ignores this
	// field; it is only to be used for pass-through data.
	Metadata interface{}

	// contains filtered or unexported fields
}
ProducerMessage is the collection of elements passed to the Producer in order to send a message.
func (*ProducerMessage) Offset ¶
func (m *ProducerMessage) Offset() int64
Offset is the offset of the message stored on the broker. This is only guaranteed to be defined if the message was successfully delivered and RequiredAcks is not NoResponse.
func (*ProducerMessage) Partition ¶
func (m *ProducerMessage) Partition() int32
Partition is the partition that the message was sent to. This is only guaranteed to be defined if the message was successfully delivered.
type RandomPartitioner ¶
type RandomPartitioner struct {
// contains filtered or unexported fields
}
RandomPartitioner implements the Partitioner interface by choosing a random partition each time.
func (*RandomPartitioner) Partition ¶
func (p *RandomPartitioner) Partition(key Encoder, numPartitions int32) (int32, error)
func (*RandomPartitioner) RequiresConsistency ¶
func (p *RandomPartitioner) RequiresConsistency() bool
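As a rough illustration of random partition selection, the sketch below uses the same method names as above on a hypothetical `randomPicker` type. It takes a `[]byte` key instead of an Encoder to stay self-contained, and it assumes that RequiresConsistency reports whether a given key must always map to the same partition, which a random choice cannot promise.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// randomPicker is a hypothetical stand-in for a random partitioner.
type randomPicker struct {
	gen *rand.Rand
}

func newRandomPicker(seed int64) *randomPicker {
	return &randomPicker{gen: rand.New(rand.NewSource(seed))}
}

// Partition ignores the key and returns a uniformly chosen partition in
// the range [0, numPartitions).
func (p *randomPicker) Partition(key []byte, numPartitions int32) (int32, error) {
	if numPartitions <= 0 {
		return -1, errors.New("numPartitions must be positive")
	}
	return p.gen.Int31n(numPartitions), nil
}

// RequiresConsistency is assumed to mean "the same key always lands on the
// same partition", which does not hold for a random choice.
func (p *randomPicker) RequiresConsistency() bool {
	return false
}

func main() {
	p := newRandomPicker(42)
	for i := 0; i < 3; i++ {
		partition, err := p.Partition([]byte("ignored"), 4)
		fmt.Println(partition, err)
	}
}
```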
type RequiredAcks ¶
type RequiredAcks int16
RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid. On broker versions prior to 0.8.2.0, any other positive int16 is also valid (the broker will wait for that many acknowledgements), but in 0.8.2.0 and later this will raise an exception (it has been replaced by setting the `min.insync.replicas` value in the broker's configuration).
const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll waits for all replicas to commit before responding.
	WaitForAll RequiredAcks = -1
)
type RoundRobinPartitioner ¶
type RoundRobinPartitioner struct {
// contains filtered or unexported fields
}
RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.
func (*RoundRobinPartitioner) Partition ¶
func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) (int32, error)
func (*RoundRobinPartitioner) RequiresConsistency ¶
func (p *RoundRobinPartitioner) RequiresConsistency() bool
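Walking through the partitions one at a time can be sketched with a single counter. The `roundRobinPicker` type below is a hypothetical stand-in, not Sarama's implementation; it takes a `[]byte` key for self-containment and is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// roundRobinPicker is a hypothetical stand-in for a round-robin partitioner.
type roundRobinPicker struct {
	next int32
}

// Partition ignores the key and hands out partitions 0, 1, ..., n-1 in
// turn, wrapping around when the end is reached.
func (p *roundRobinPicker) Partition(key []byte, numPartitions int32) (int32, error) {
	if numPartitions <= 0 {
		return -1, errors.New("numPartitions must be positive")
	}
	// Resetting here also covers the case where the partition count
	// shrank between calls.
	if p.next >= numPartitions {
		p.next = 0
	}
	chosen := p.next
	p.next++
	return chosen, nil
}

func main() {
	p := &roundRobinPicker{}
	for i := 0; i < 5; i++ {
		partition, _ := p.Partition(nil, 3)
		fmt.Print(partition, " ")
	}
	fmt.Println()
}
```

With three partitions the first five calls yield 0, 1, 2, 0, 1.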
type StdLogger ¶
type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}
StdLogger is used to log error messages.
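The three methods listed above are also provided by the standard library's *log.Logger, so one can be used wherever this interface is expected. The sketch below redeclares the interface locally to stay self-contained; `logToString` is a hypothetical helper that captures output in a buffer for demonstration.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// StdLogger is redeclared here with the same method set as above so that
// the example compiles on its own.
type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

// Compile-time check that *log.Logger satisfies the interface.
var _ StdLogger = (*log.Logger)(nil)

// logToString is a hypothetical helper: it writes one line through the
// interface and returns what the logger produced.
func logToString(prefix, text string) string {
	var buf bytes.Buffer
	var logger StdLogger = log.New(&buf, prefix, 0) // flags 0: no timestamp
	logger.Println(text)
	return buf.String()
}

func main() {
	fmt.Print(logToString("[kafka] ", "connection refused"))
}
```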
type StringEncoder ¶
type StringEncoder string
StringEncoder implements the Encoder interface for Go strings so that you can do things like
producer.SendMessage("my_topic", nil, sarama.StringEncoder("hello world"))
func (StringEncoder) Encode ¶
func (s StringEncoder) Encode() ([]byte, error)
func (StringEncoder) Length ¶
func (s StringEncoder) Length() int
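A string-backed encoder with the two methods above can be sketched in a few lines. The local `Encoder` interface is an assumption inferred from those method signatures, and `textEncoder` is a hypothetical stand-in for StringEncoder.

```go
package main

import "fmt"

// Encoder is assumed to consist of the two methods shown above.
type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

// textEncoder is a hypothetical stand-in for StringEncoder.
type textEncoder string

// Encode returns the raw bytes of the string; it cannot fail.
func (s textEncoder) Encode() ([]byte, error) {
	return []byte(s), nil
}

// Length returns the encoded size in bytes, not the number of characters.
func (s textEncoder) Length() int {
	return len(s)
}

func main() {
	var value Encoder = textEncoder("h\u00e9llo") // "héllo"
	encoded, err := value.Encode()
	fmt.Println(len(encoded), value.Length(), err)
}
```

Because the conversion is a plain type conversion, an ordinary string can be wrapped at the call site without any allocation beyond the final byte slice.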
type SyncProducer ¶
type SyncProducer struct {
// contains filtered or unexported fields
}
SyncProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer to avoid leaks; it may not be garbage-collected automatically when it passes out of scope. This is in addition to calling Close on the underlying client, which is still necessary.
Example ¶
client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
if err != nil {
	panic(err)
} else {
	fmt.Println("> connected")
}
defer client.Close()

producer, err := NewSyncProducer(client, nil)
if err != nil {
	panic(err)
}
defer producer.Close()

for {
	partition, offset, err := producer.SendMessage("my_topic", nil, StringEncoder("testing 123"))
	if err != nil {
		panic(err)
	} else {
		fmt.Printf("> message sent to partition %d at offset %d\n", partition, offset)
	}
}
func NewSyncProducer ¶
func NewSyncProducer(client *Client, config *ProducerConfig) (*SyncProducer, error)
NewSyncProducer creates a new SyncProducer using the given client and configuration.
func (*SyncProducer) Close ¶
func (sp *SyncProducer) Close() error
Close shuts down the producer and flushes any messages it may have buffered. You must call this function before a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close on the underlying client.
func (*SyncProducer) SendMessage ¶
func (sp *SyncProducer) SendMessage(topic string, key, value Encoder) (partition int32, offset int64, err error)
SendMessage produces a message to the given topic with the given key and value. To send strings as either key or value, see the StringEncoder type. It returns the partition and offset of the successfully-produced message, or the error (if any).
type TopicMetadata ¶
type TopicMetadata struct {
	Err        KError
	Name       string
	Partitions []*PartitionMetadata
}
Source Files ¶
- broker.go
- client.go
- consumer.go
- consumer_metadata_request.go
- consumer_metadata_response.go
- crc32_field.go
- encoder_decoder.go
- errors.go
- fetch_request.go
- fetch_response.go
- length_field.go
- message.go
- message_set.go
- metadata_request.go
- metadata_response.go
- mockbroker.go
- offset_commit_request.go
- offset_commit_response.go
- offset_fetch_request.go
- offset_fetch_response.go
- offset_request.go
- offset_response.go
- packet_decoder.go
- packet_encoder.go
- partitioner.go
- prep_encoder.go
- produce_request.go
- produce_response.go
- producer.go
- real_decoder.go
- real_encoder.go
- request.go
- response_header.go
- sarama.go
- snappy.go
- sync_producer.go
- utils.go