Documentation ¶
Index ¶
Constants ¶
View Source
const DefaultTopic = "my-topic"
Variables ¶
This section is empty.
Functions ¶
Types ¶
type AsyncProducer ¶
type AsyncProducer struct {
// contains filtered or unexported fields
}
func (*AsyncProducer) Close ¶
func (this *AsyncProducer) Close() error
func (*AsyncProducer) GetHeader ¶
func (this *AsyncProducer) GetHeader(event KafkaEvent) ([]byte, error)
func (*AsyncProducer) GetSuccesses ¶
func (this *AsyncProducer) GetSuccesses() <-chan *sarama.ProducerMessage
func (*AsyncProducer) Publish ¶
func (this *AsyncProducer) Publish(event KafkaEvent) error
func (*AsyncProducer) Start ¶
func (this *AsyncProducer) Start()
type FixtureRecord ¶
func NewFixtureRecord ¶
func NewFixtureRecord() *FixtureRecord
func (*FixtureRecord) Schema ¶
func (r *FixtureRecord) Schema() string
func (*FixtureRecord) ToAvroSerialization ¶
func (r *FixtureRecord) ToAvroSerialization() ([]byte, error)
func (*FixtureRecord) Topic ¶
func (r *FixtureRecord) Topic() string
type FixtureSchemaRegistry ¶
type FixtureSchemaRegistry struct {
*schema_registry.SchemaRegistry
}
func (*FixtureSchemaRegistry) RegisterOrGetSchemaId ¶
func (this *FixtureSchemaRegistry) RegisterOrGetSchemaId(event KafkaEvent) (int, error)
type KafkaEvent ¶
type Producer ¶
type Producer interface {
	Publish(KafkaEvent) error
	Start()
	Close() error
	GetSuccesses() <-chan *sarama.ProducerMessage
}
func NewProducer ¶
func NewProducer(kafkaAddr string, config *sarama.Config, schemaRegistry *schema_registry.SchemaRegistry) (Producer, error)
Config example:
	config.Producer.Return.Successes = true
	config.Producer.MaxMessageBytes = 20 * 1024 * 1024 // 20 MiB
	config.Producer.Flush.Frequency = 500 * time.Millisecond
	config.Version = sarama.V0_10_0_0 // This version is the same as in production
Click to show internal directories.
Click to hide internal directories.