Documentation ¶
Index ¶
- Constants
- type AvroConsumer
- type AvroProducer
- func (ap *AvroProducer) Add(topic string, schema string, key []byte, value []byte) error
- func (ap *AvroProducer) AddWithResponse(topic string, schema string, key []byte, value []byte) (int32, int64, error)
- func (ac *AvroProducer) Close()
- func (ap *AvroProducer) GetSchemaId(topic string, avroCodec *goavro.Codec) (int, error)
- type CachedSchemaRegistryClient
- func (client *CachedSchemaRegistryClient) CreateSubject(subject string, codec *goavro.Codec) (int, error)
- func (client *CachedSchemaRegistryClient) DeleteSubject(subject string) error
- func (client *CachedSchemaRegistryClient) DeleteVersion(subject string, version int) error
- func (client *CachedSchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error)
- func (client *CachedSchemaRegistryClient) GetSchema(id int) (*goavro.Codec, error)
- func (client *CachedSchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*goavro.Codec, error)
- func (client *CachedSchemaRegistryClient) GetSubjects() ([]string, error)
- func (client *CachedSchemaRegistryClient) GetVersions(subject string) ([]int, error)
- func (client *CachedSchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (int, error)
- type ConsumerCallbacks
- type Error
- type Message
- type SchemaRegistryClient
- func (client *SchemaRegistryClient) CreateSubject(subject string, codec *goavro.Codec) (int, error)
- func (client *SchemaRegistryClient) DeleteSubject(subject string) error
- func (client *SchemaRegistryClient) DeleteVersion(subject string, version int) error
- func (client *SchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error)
- func (client *SchemaRegistryClient) GetSchema(id int) (*goavro.Codec, error)
- func (client *SchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*goavro.Codec, error)
- func (client *SchemaRegistryClient) GetSubjects() ([]string, error)
- func (client *SchemaRegistryClient) GetVersions(subject string) ([]int, error)
- func (client *SchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (int, error)
- type SchemaRegistryClientInterface
Constants ¶
const (
COMMIT_ERROR_CTX = "commit"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AvroConsumer ¶
type AvroConsumer struct { Consumer *cluster.Consumer SchemaRegistryClient *CachedSchemaRegistryClient // contains filtered or unexported fields }
func NewAvroConsumer ¶
func NewAvroConsumer(kafkaServers []string, schemaRegistryServers []string, topic string, groupId string, callbacks ConsumerCallbacks) (*AvroConsumer, error)
NewAvroConsumer creates a basic consumer to interact with schema registry, avro and kafka
func NewAvroConsumerCustomConfig ¶
func NewAvroConsumerCustomConfig(kafkaServers []string, schemaRegistryServers []string, topic string, groupId string, callbacks ConsumerCallbacks, config *cluster.Config) (*AvroConsumer, error)
func (*AvroConsumer) Close ¶
func (ac *AvroConsumer) Close()
func (*AvroConsumer) Consume ¶
func (ac *AvroConsumer) Consume() error
func (*AvroConsumer) GetSchema ¶
func (ac *AvroConsumer) GetSchema(id int) (*goavro.Codec, error)
GetSchema gets the schema codec with the given id from the schema-registry service
func (*AvroConsumer) ProcessAvroMsg ¶
func (ac *AvroConsumer) ProcessAvroMsg(m *sarama.ConsumerMessage) (Message, error)
type AvroProducer ¶
type AvroProducer struct {
// contains filtered or unexported fields
}
func NewAvroProducer ¶
func NewAvroProducer(kafkaServers []string, schemaRegistryServers []string) (*AvroProducer, error)
NewAvroProducer is a basic producer to interact with schema registry, avro and kafka
func (*AvroProducer) AddWithResponse ¶
func (*AvroProducer) Close ¶
func (ac *AvroProducer) Close()
func (*AvroProducer) GetSchemaId ¶
GetSchemaId gets the schema id from the schema-registry service
type CachedSchemaRegistryClient ¶
type CachedSchemaRegistryClient struct { SchemaRegistryClient *SchemaRegistryClient // contains filtered or unexported fields }
CachedSchemaRegistryClient is a schema registry client that will cache some data to improve performance
func NewCachedSchemaRegistryClient ¶
func NewCachedSchemaRegistryClient(connect []string) *CachedSchemaRegistryClient
func NewCachedSchemaRegistryClientWithRetries ¶
func NewCachedSchemaRegistryClientWithRetries(connect []string, retries int) *CachedSchemaRegistryClient
func (*CachedSchemaRegistryClient) CreateSubject ¶
func (client *CachedSchemaRegistryClient) CreateSubject(subject string, codec *goavro.Codec) (int, error)
CreateSubject will return and cache the id with the given codec
func (*CachedSchemaRegistryClient) DeleteSubject ¶
func (client *CachedSchemaRegistryClient) DeleteSubject(subject string) error
DeleteSubject deletes the subject. It should only be used in development.
func (*CachedSchemaRegistryClient) DeleteVersion ¶
func (client *CachedSchemaRegistryClient) DeleteVersion(subject string, version int) error
DeleteVersion deletes a specific version of a subject. It should only be used in development.
func (*CachedSchemaRegistryClient) GetLatestSchema ¶
func (client *CachedSchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error)
GetLatestSchema returns the highest version schema for a subject
func (*CachedSchemaRegistryClient) GetSchema ¶
func (client *CachedSchemaRegistryClient) GetSchema(id int) (*goavro.Codec, error)
GetSchema will return and cache the codec with the given id
func (*CachedSchemaRegistryClient) GetSchemaByVersion ¶
func (client *CachedSchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*goavro.Codec, error)
GetSchemaByVersion returns the codec for a specific version of a subject
func (*CachedSchemaRegistryClient) GetSubjects ¶
func (client *CachedSchemaRegistryClient) GetSubjects() ([]string, error)
GetSubjects returns a list of subjects
func (*CachedSchemaRegistryClient) GetVersions ¶
func (client *CachedSchemaRegistryClient) GetVersions(subject string) ([]int, error)
GetVersions returns a list of all versions of a subject
func (*CachedSchemaRegistryClient) IsSchemaRegistered ¶
func (client *CachedSchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (int, error)
IsSchemaRegistered checks if a specific codec is already registered to a subject
type ConsumerCallbacks ¶
type SchemaRegistryClient ¶
type SchemaRegistryClient struct { SchemaRegistryConnect []string // contains filtered or unexported fields }
SchemaRegistryClient is a basic http client to interact with schema registry
func NewSchemaRegistryClient ¶
func NewSchemaRegistryClient(connect []string) *SchemaRegistryClient
NewSchemaRegistryClient creates a client to talk with the schema registry at the connect string. By default it will retry failed requests (5XX responses and http errors) len(connect) number of times.
func NewSchemaRegistryClientWithRetries ¶
func NewSchemaRegistryClientWithRetries(connect []string, retries int) *SchemaRegistryClient
NewSchemaRegistryClientWithRetries creates an http client with a configurable amount of retries on 5XX responses
func (*SchemaRegistryClient) CreateSubject ¶
CreateSubject adds a schema to the subject
func (*SchemaRegistryClient) DeleteSubject ¶
func (client *SchemaRegistryClient) DeleteSubject(subject string) error
DeleteSubject deletes a subject. It should only be used in development
func (*SchemaRegistryClient) DeleteVersion ¶
func (client *SchemaRegistryClient) DeleteVersion(subject string, version int) error
DeleteVersion deletes a specific version of a subject. It should only be used in development
func (*SchemaRegistryClient) GetLatestSchema ¶
func (client *SchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error)
GetLatestSchema returns a goavro.Codec for the latest version of the subject
func (*SchemaRegistryClient) GetSchema ¶
func (client *SchemaRegistryClient) GetSchema(id int) (*goavro.Codec, error)
GetSchema returns a goavro.Codec by unique id
func (*SchemaRegistryClient) GetSchemaByVersion ¶
func (client *SchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*goavro.Codec, error)
GetSchemaByVersion returns a goavro.Codec for the version of the subject
func (*SchemaRegistryClient) GetSubjects ¶
func (client *SchemaRegistryClient) GetSubjects() ([]string, error)
GetSubjects returns a list of all subjects in the schema registry
func (*SchemaRegistryClient) GetVersions ¶
func (client *SchemaRegistryClient) GetVersions(subject string) ([]int, error)
GetVersions returns a list of the versions of a subject
func (*SchemaRegistryClient) IsSchemaRegistered ¶
func (client *SchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (int, error)
IsSchemaRegistered tests if the schema is registered; if so, it returns the unique id of that schema
type SchemaRegistryClientInterface ¶
type SchemaRegistryClientInterface interface { GetSchema(int) (*goavro.Codec, error) GetSubjects() ([]string, error) GetVersions(string) ([]int, error) GetSchemaByVersion(string, int) (*goavro.Codec, error) GetLatestSchema(string) (*goavro.Codec, error) CreateSubject(string, *goavro.Codec) (int, error) IsSchemaRegistered(string, *goavro.Codec) (int, error) DeleteSubject(string) error DeleteVersion(string, int) error }
SchemaRegistryClientInterface defines the api for all clients interfacing with schema registry