Documentation ¶
Index ¶
- Constants
- func Client() sarama.Client
- func Consume(topic string, partition int, f func(m *Message)) error
- func Produce(topic string, msgs ...Message) error
- func ProduceAsync(topic string, msg Message) error
- func RegisterGroupConsumer(ctx context.Context, h Handler, groupID string, topics ...string)
- type Handler
- type Kafka
- type Message
- type Option
- type Options
- type RequiredAcks
Constants ¶
const (
	// OffsetNewest stands for the log head offset, i.e. the offset that will be
	// assigned to the next message that will be produced to the partition. You
	// can send this to a client's GetOffset method to get this offset, or when
	// calling ConsumePartition to start consuming new messages.
	OffsetNewest int64 = -1
	// OffsetOldest stands for the oldest offset available on the broker for a
	// partition. You can send this to a client's GetOffset method to get this
	// offset, or when calling ConsumePartition to start consuming from the
	// oldest offset that is still available on the broker.
	OffsetOldest int64 = -2
)
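The two sentinel values stand in for concrete offsets that are only known at run time. The sketch below illustrates that mapping under stated assumptions: `resolveOffset` is a hypothetical helper that is not part of this package, and the `oldest` and `next` arguments are made-up partition bounds that a real client would fetch from the broker.

```go
package main

import (
	"errors"
	"fmt"
)

// Sentinel values copied from the constants above.
const (
	OffsetNewest int64 = -1
	OffsetOldest int64 = -2
)

// resolveOffset is a hypothetical helper (not part of this package).
// oldest is the oldest offset still available on the broker; next is the
// offset that the next produced message will receive.
func resolveOffset(requested, oldest, next int64) (int64, error) {
	switch {
	case requested == OffsetNewest:
		return next, nil
	case requested == OffsetOldest:
		return oldest, nil
	case requested >= oldest && requested <= next:
		return requested, nil
	default:
		return 0, errors.New("offset out of range")
	}
}

func main() {
	// Illustrative bounds: offsets 10 through 99 exist, 100 is the log head.
	for _, req := range []int64{OffsetNewest, OffsetOldest, 42} {
		off, err := resolveOffset(req, 10, 100)
		fmt.Println(req, off, err)
	}
}
```

Negative sentinels cannot collide with real offsets, which are never negative, so one `int64` field can carry either an explicit position or a symbolic one.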
Variables ¶
This section is empty.
Functions ¶
func ProduceAsync ¶
func ProduceAsync(topic string, msg Message) error
Types ¶
type Kafka ¶
type Kafka struct {
// contains filtered or unexported fields
}
type Message ¶
type Message struct {
	Topic     string
	Key       []byte
	Value     []byte
	Offset    int64
	Partition int32
	Timestamp time.Time
}
Message represents a Kafka message.
func NewProduceMessage ¶
type Option ¶
type Option func(*Options)
func WithAcks ¶
func WithAcks(ack RequiredAcks) Option
func WithAutoCommit ¶
func WithAutoCommit() Option
func WithOffsetInitial ¶
type Options ¶
type Options struct {
	Brokers       string       `json:"brokers"`
	SASL          sasl         `json:"sasl"`
	Acks          RequiredAcks `json:"acks"`
	AutoCommit    bool         `json:"auto_commit"`
	OffsetInitial int64        `json:"offset_initial"`
}
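Option is a function that mutates an Options value, which is the usual Go functional-options pattern. The sketch below shows how such options might be applied. It mirrors the types locally so it compiles on its own; the bodies of `WithAcks` and `WithAutoCommit` are assumptions, since only their signatures are documented, and `applyOptions` is a hypothetical helper. The SASL field is left out because its type is unexported.

```go
package main

import "fmt"

// Local mirrors of the documented types, so the sketch is self-contained.
type RequiredAcks int16

const WaitForAll RequiredAcks = -1

// Options mirrors the documented struct, minus the unexported SASL type.
type Options struct {
	Brokers       string
	Acks          RequiredAcks
	AutoCommit    bool
	OffsetInitial int64
}

type Option func(*Options)

// WithAcks has the documented signature; the body is an assumption.
func WithAcks(ack RequiredAcks) Option {
	return func(o *Options) { o.Acks = ack }
}

// WithAutoCommit has the documented signature; the body is an assumption.
func WithAutoCommit() Option {
	return func(o *Options) { o.AutoCommit = true }
}

// applyOptions is a hypothetical helper: it applies each option in order
// to a copy of base and returns the result.
func applyOptions(base Options, opts ...Option) Options {
	for _, opt := range opts {
		opt(&base)
	}
	return base
}

func main() {
	o := applyOptions(
		Options{Brokers: "localhost:9092"},
		WithAcks(WaitForAll),
		WithAutoCommit(),
	)
	fmt.Printf("%+v\n", o)
}
```

Options are applied in argument order, so a later option overrides an earlier one that touches the same field.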
type RequiredAcks ¶
type RequiredAcks int16
const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via
	// the `min.insync.replicas` configuration key.
	WaitForAll RequiredAcks = -1
)
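Because Options carries JSON tags, an acknowledgement level can presumably be supplied as a plain integer in a configuration file. The sketch below decodes such a document with the standard `encoding/json` package. The types are local mirrors (without the unexported SASL field), `parseOptions` is a hypothetical helper, and the broker address is a placeholder; whether the package itself loads its options this way is not stated in the documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the documented types, so the sketch is self-contained.
type RequiredAcks int16

const (
	NoResponse   RequiredAcks = 0
	WaitForLocal RequiredAcks = 1
	WaitForAll   RequiredAcks = -1
)

// Options mirrors the documented JSON tags, minus the unexported SASL type.
type Options struct {
	Brokers       string       `json:"brokers"`
	Acks          RequiredAcks `json:"acks"`
	AutoCommit    bool         `json:"auto_commit"`
	OffsetInitial int64        `json:"offset_initial"`
}

// parseOptions is a hypothetical helper that decodes a JSON document
// into an Options value.
func parseOptions(data []byte) (Options, error) {
	var o Options
	err := json.Unmarshal(data, &o)
	return o, err
}

func main() {
	// acks -1 corresponds to WaitForAll; offset_initial -2 to OffsetOldest.
	raw := []byte(`{"brokers":"localhost:9092","acks":-1,"auto_commit":true,"offset_initial":-2}`)
	o, err := parseOptions(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%+v\n", o)
}
```

A missing `acks` key decodes to the zero value, which equals NoResponse, so a configuration that needs delivery guarantees should set the field explicitly.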