Documentation ¶
Index ¶
Constants ¶
View Source
const ( // KafkaDefaultBatchMessageCount is the Kafka default value for batch size. KafkaDefaultBatchMessageCount = 100 // KafkaDefaultBatchMaxBytes is the Kafka default value for batch size. KafkaDefaultBatchMaxBytes = 1000012 - (100 * 1024) // KafkaMinBatchInterval is a reasonable minimum for batch timeout. KafkaMinBatchInterval = 250 * time.Millisecond )
View Source
const ( // RequireAllReplicas means that ALL nodes in the replica-set must to confirm the write for a write // to be considered durable. RequireAllReplicas = -1 // DefaultWriterWriteTimeout is how much to wait for a write to go through. DefaultWriterWriteTimeout = 30 * time.Second // DefaultWriterReadTimeout is how much to wait for reads. DefaultWriterReadTimeout = 30 * time.Second // DefaultMaxRetryAttempts is how many times to retry a failed operation. DefaultMaxRetryAttempts = 3 // DefaultMetadataTTL is the frequency of metadata refreshes. DefaultMetadataTTL = 5 * time.Second // DefaultIdleTimeout is the period during which an idle connection can be resued. DefaultIdleTimeout = 30 * time.Second )
Variables ¶
View Source
var ErrInvalidMessage = errors.New("kafka: message is invalid")
Functions ¶
Types ¶
type Producer ¶
type Producer struct { Settings *Settings Writer Writer Logger log.Logger // contains filtered or unexported fields }
func NewProducer ¶
func NewProducer(ctx context.Context, conf cfg.Config, logger log.Logger, name string) (*Producer, error)
NewProducer returns a topic producer.
type Settings ¶
type Settings struct { ConnectionName string `cfg:"connection" validate:"required"` Topic string `cfg:"topic" validate:"required"` // FQTopic is the fully-qualified topic name (with prefixes). FQTopic string BatchSize int `cfg:"batch_size"` BatchTimeout time.Duration `cfg:"idle_timeout"` AsyncWrites bool `cfg:"async_writes"` // contains filtered or unexported fields }
func (*Settings) Connection ¶
func (s *Settings) Connection() *connection.Settings
func (*Settings) WithConnection ¶
func (s *Settings) WithConnection(conn *connection.Settings) *Settings
type WriterOption ¶
type WriterOption func(*kafka.WriterConfig)
Click to show internal directories.
Click to hide internal directories.