Documentation ¶
Index ¶
- Variables
- func NewProducer(ctx context.Context, conf Config, logger *log.Logger) (*kafkaProducer, error)
- type AvroEncoder
- type CachedSchemaRegistryClient
- func (client *CachedSchemaRegistryClient) CreateSubject(subject string, codec *goavro.Codec) (int, error)
- func (client *CachedSchemaRegistryClient) DeleteSubject(subject string) error
- func (client *CachedSchemaRegistryClient) DeleteVersion(subject string, version int) error
- func (client *CachedSchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error)
- func (client *CachedSchemaRegistryClient) GetSchema(id int) (*goavro.Codec, error)
- func (client *CachedSchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*goavro.Codec, error)
- func (client *CachedSchemaRegistryClient) GetSubjects() ([]string, error)
- func (client *CachedSchemaRegistryClient) GetVersions(subject string) ([]int, error)
- func (client *CachedSchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (int, error)
- type Config
- type Consumer
- type ConsumerMessage
- type Error
- type Handler
- type Message
- type Producer
- type ProducerMessage
- type SchemaRegistryClient
- func (client *SchemaRegistryClient) CreateSubject(subject string, codec *goavro.Codec) (int, error)
- func (client *SchemaRegistryClient) DeleteSubject(subject string) error
- func (client *SchemaRegistryClient) DeleteVersion(subject string, version int) error
- func (client *SchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error)
- func (client *SchemaRegistryClient) GetSchema(id int) (*goavro.Codec, error)
- func (client *SchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*goavro.Codec, error)
- func (client *SchemaRegistryClient) GetSubjects() ([]string, error)
- func (client *SchemaRegistryClient) GetVersions(subject string) ([]int, error)
- func (client *SchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (int, error)
- type SchemaRegistryClientInterface
Constants ¶
This section is empty.
Variables ¶
var NoOpHandler = &noOpHandler{}
Functions ¶
Types ¶
type AvroEncoder ¶
AvroEncoder encodes schemaId and Avro message.
func (*AvroEncoder) Encode ¶
func (a *AvroEncoder) Encode() ([]byte, error)
Notice: the Confluent schema registry has special requirements for Avro serialization: in addition to serializing the message content itself, the Schema ID and Magic Byte must also be attached. Ref: https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format
type CachedSchemaRegistryClient ¶
type CachedSchemaRegistryClient struct { SchemaRegistryClient *SchemaRegistryClient // contains filtered or unexported fields }
CachedSchemaRegistryClient is a schema registry client that will cache some data to improve performance
func NewCachedSchemaRegistryClient ¶
func NewCachedSchemaRegistryClient(connect []string) *CachedSchemaRegistryClient
func NewCachedSchemaRegistryClientWithRetries ¶
func NewCachedSchemaRegistryClientWithRetries(connect []string, retries int) *CachedSchemaRegistryClient
func (*CachedSchemaRegistryClient) CreateSubject ¶
func (client *CachedSchemaRegistryClient) CreateSubject(subject string, codec *goavro.Codec) (int, error)
CreateSubject will return and cache the id with the given codec
func (*CachedSchemaRegistryClient) DeleteSubject ¶
func (client *CachedSchemaRegistryClient) DeleteSubject(subject string) error
DeleteSubject deletes the subject, should only be used in development
func (*CachedSchemaRegistryClient) DeleteVersion ¶
func (client *CachedSchemaRegistryClient) DeleteVersion(subject string, version int) error
DeleteVersion deletes a specific version of a subject, should only be used in development.
func (*CachedSchemaRegistryClient) GetLatestSchema ¶
func (client *CachedSchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error)
GetLatestSchema returns the highest version schema for a subject
func (*CachedSchemaRegistryClient) GetSchema ¶
func (client *CachedSchemaRegistryClient) GetSchema(id int) (*goavro.Codec, error)
GetSchema will return and cache the codec with the given id
func (*CachedSchemaRegistryClient) GetSchemaByVersion ¶
func (client *CachedSchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*goavro.Codec, error)
GetSchemaByVersion returns the codec for a specific version of a subject
func (*CachedSchemaRegistryClient) GetSubjects ¶
func (client *CachedSchemaRegistryClient) GetSubjects() ([]string, error)
GetSubjects returns a list of subjects
func (*CachedSchemaRegistryClient) GetVersions ¶
func (client *CachedSchemaRegistryClient) GetVersions(subject string) ([]int, error)
GetVersions returns a list of all versions of a subject
func (*CachedSchemaRegistryClient) IsSchemaRegistered ¶
func (client *CachedSchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (int, error)
IsSchemaRegistered checks if a specific codec is already registered to a subject
type Config ¶
type Config struct { Brokers string `envconfig:"KAFKA_BROKERS"` Version string `envconfig:"KAFKA_VERSION"` Verbose bool `envconfig:"KAFKA_VERBOSE"` ClientID string `envconfig:"KAFKA_CLIENT_ID"` Topics string `envconfig:"KAFKA_TOPICS"` TLSEnabled bool `envconfig:"KAFKA_TLS_ENABLED"` TLSKey string `envconfig:"KAFKA_TLS_KEY"` TLSCert string `envconfig:"KAFKA_TLS_CERT"` CACerts string `envconfig:"KAFKA_CA_CERTS"` // Consumer specific parameters Group string `envconfig:"KAFKA_GROUP"` RebalanceStrategy string `envconfig:"KAFKA_REBALANCE_STRATEGY"` RebalanceTimeout time.Duration `envconfig:"KAFKA_REBALANCE_TIMEOUT"` InitOffsets string `envconfig:"KAFKA_INIT_OFFSETS"` CommitInterval time.Duration `envconfig:"KAFKA_COMMIT_INTERVAL"` // Producer specific parameters FlushInterval time.Duration `envconfig:"KAFKA_FLUSH_INTERVAL"` // Schema Registry server SchemaRegistryServers string `envconfig:"KAFKA_SCHEMA_REGISTRY_SERVERS"` IsolationLevel string `envconfig:"KAFKA_ISOLATION_LEVEL"` }
Config is a simple Kafka config abstraction; it can be populated from env vars via FromEnv(), or its fields can be applied to CLI flags by the caller.
func NewKafkaConfig ¶
func NewKafkaConfig() Config
NewKafkaConfig returns a new kafka.Config with reasonable defaults for some values.
type Consumer ¶
type Consumer interface { // caller should run the returned function in a goroutine, and consume // the returned error channel until it's closed at shutdown. Background() (func(), chan error) }
type ConsumerMessage ¶
type ConsumerMessage sarama.ConsumerMessage
alias these to abstract the Sarama-specific message type from end users
type Producer ¶
type Producer interface { // caller should run the returned function in a goroutine, and consume // the returned error channel until it's closed at shutdown. Background() (func(), chan error) // user-facing event emit API Send(ProducerMessage) error }
type ProducerMessage ¶
abstracts kafka.Producer message type
type SchemaRegistryClient ¶
type SchemaRegistryClient struct { SchemaRegistryConnect []string // contains filtered or unexported fields }
SchemaRegistryClient is a basic http client to interact with schema registry
func NewSchemaRegistryClient ¶
func NewSchemaRegistryClient(connect []string) *SchemaRegistryClient
NewSchemaRegistryClient creates a client to talk with the schema registry at the connect string. By default it will retry failed requests (5XX responses and HTTP errors) len(connect) number of times.
func NewSchemaRegistryClientWithRetries ¶
func NewSchemaRegistryClientWithRetries(connect []string, retries int) *SchemaRegistryClient
NewSchemaRegistryClientWithRetries creates an http client with a configurable amount of retries on 5XX responses
func (*SchemaRegistryClient) CreateSubject ¶
func (client *SchemaRegistryClient) CreateSubject(subject string, codec *goavro.Codec) (int, error)
CreateSubject adds a schema to the subject
func (*SchemaRegistryClient) DeleteSubject ¶
func (client *SchemaRegistryClient) DeleteSubject(subject string) error
DeleteSubject deletes a subject. It should only be used in development
func (*SchemaRegistryClient) DeleteVersion ¶
func (client *SchemaRegistryClient) DeleteVersion(subject string, version int) error
DeleteVersion deletes a specific version of a subject. It should only be used in development
func (*SchemaRegistryClient) GetLatestSchema ¶
func (client *SchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error)
GetLatestSchema returns a goavro.Codec for the latest version of the subject
func (*SchemaRegistryClient) GetSchema ¶
func (client *SchemaRegistryClient) GetSchema(id int) (*goavro.Codec, error)
GetSchema returns a goavro.Codec by unique id
func (*SchemaRegistryClient) GetSchemaByVersion ¶
func (client *SchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*goavro.Codec, error)
GetSchemaByVersion returns a goavro.Codec for the version of the subject
func (*SchemaRegistryClient) GetSubjects ¶
func (client *SchemaRegistryClient) GetSubjects() ([]string, error)
GetSubjects returns a list of all subjects in the schema registry
func (*SchemaRegistryClient) GetVersions ¶
func (client *SchemaRegistryClient) GetVersions(subject string) ([]int, error)
GetVersions returns a list of the versions of a subject
func (*SchemaRegistryClient) IsSchemaRegistered ¶
func (client *SchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (int, error)
IsSchemaRegistered tests if the schema is registered, if so it returns the unique id of that schema
type SchemaRegistryClientInterface ¶
type SchemaRegistryClientInterface interface { GetSchema(int) (*goavro.Codec, error) GetSubjects() ([]string, error) GetVersions(string) ([]int, error) GetSchemaByVersion(string, int) (*goavro.Codec, error) GetLatestSchema(string) (*goavro.Codec, error) CreateSubject(string, *goavro.Codec) (int, error) IsSchemaRegistered(string, *goavro.Codec) (int, error) DeleteSubject(string) error DeleteVersion(string, int) error }
SchemaRegistryClientInterface defines the api for all clients interfacing with schema registry