Versions in this module Expand all Collapse all v0 v0.0.6 Feb 12, 2024 Changes in this version + var NoOpHandler = &noOpHandler + func NewProducer(ctx context.Context, conf Config, logger *log.Logger) (*kafkaProducer, error) + type AvroEncoder struct + Content []byte + SchemaID int + func (a *AvroEncoder) Encode() ([]byte, error) + func (a *AvroEncoder) Length() int + type CachedSchemaRegistryClient struct + SchemaRegistryClient *SchemaRegistryClient + func NewCachedSchemaRegistryClient(connect []string) *CachedSchemaRegistryClient + func NewCachedSchemaRegistryClientWithRetries(connect []string, retries int) *CachedSchemaRegistryClient + func (client *CachedSchemaRegistryClient) CreateSubject(subject string, codec *goavro.Codec) (int, error) + func (client *CachedSchemaRegistryClient) DeleteSubject(subject string) error + func (client *CachedSchemaRegistryClient) DeleteVersion(subject string, version int) error + func (client *CachedSchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error) + func (client *CachedSchemaRegistryClient) GetSchema(id int) (*goavro.Codec, error) + func (client *CachedSchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*goavro.Codec, error) + func (client *CachedSchemaRegistryClient) GetSubjects() ([]string, error) + func (client *CachedSchemaRegistryClient) GetVersions(subject string) ([]int, error) + func (client *CachedSchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (int, error) + type Config struct + Brokers string + CACerts string + ClientID string + CommitInterval time.Duration + FlushInterval time.Duration + Group string + InitOffsets string + IsolationLevel string + RebalanceStrategy string + RebalanceTimeout time.Duration + SchemaRegistryServers string + TLSCert string + TLSEnabled bool + TLSKey string + Topics string + Verbose bool + Version string + func FromEnv() (Config, error) + func NewKafkaConfig() Config + type Consumer interface + Background func() (func(), chan error) + func NewConsumer(ctx context.Context, conf Config, handler Handler, logger *log.Logger) (Consumer, error) + type ConsumerMessage sarama.ConsumerMessage + type Error struct + ErrorCode int + Message string + func (e *Error) Error() string + type Handler interface + Handle func(*Message) error + type Message struct + HighWaterMarkOffset int64 + Key string + Offset int64 + Partition int32 + SchemaId int + Topic string + Value string + type Producer interface + Background func() (func(), chan error) + Send func(ProducerMessage) error + type ProducerMessage struct + Key []byte + Topic string + Value []byte + type SchemaRegistryClient struct + SchemaRegistryConnect []string + func NewSchemaRegistryClient(connect []string) *SchemaRegistryClient + func NewSchemaRegistryClientWithRetries(connect []string, retries int) *SchemaRegistryClient + func (client *SchemaRegistryClient) CreateSubject(subject string, codec *goavro.Codec) (int, error) + func (client *SchemaRegistryClient) DeleteSubject(subject string) error + func (client *SchemaRegistryClient) DeleteVersion(subject string, version int) error + func (client *SchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error) + func (client *SchemaRegistryClient) GetSchema(id int) (*goavro.Codec, error) + func (client *SchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*goavro.Codec, error) + func (client *SchemaRegistryClient) GetSubjects() ([]string, error) + func (client *SchemaRegistryClient) GetVersions(subject string) ([]int, error) + func (client *SchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (int, error) + type SchemaRegistryClientInterface interface + CreateSubject func(string, *goavro.Codec) (int, error) + DeleteSubject func(string) error + DeleteVersion func(string, int) error + GetLatestSchema func(string) (*goavro.Codec, error) + GetSchema func(int) (*goavro.Codec, error) + GetSchemaByVersion func(string, int) (*goavro.Codec, error) + GetSubjects func() ([]string, error) + GetVersions func(string) ([]int, error) + IsSchemaRegistered func(string, *goavro.Codec) (int, error)