Documentation ¶
Overview ¶
Package avro serializes and deserializes Avro messages. It publishes and subscribes to Avro-encoded messages, handling the Avro schemas and the encoding and decoding of Avro data.
Index ¶
- func New(ps pubsub.PublisherSubscriber, src SchemaRegistryClientInterface, ...) (pubsub.PublisherSubscriber, error)
- func NewWithConfig(c *Config, ps pubsub.PublisherSubscriber) (pubsub.PublisherSubscriber, error)
- type Avro
- func (a *Avro) Bind(message []byte, target interface{}) error
- func (a *Avro) CommitOffset(offsets pubsub.TopicPartition)
- func (a *Avro) HealthCheck() types.Health
- func (a *Avro) IsSet() bool
- func (a *Avro) Ping() error
- func (a *Avro) PublishEvent(key string, value interface{}, headers map[string]string) error
- func (a *Avro) PublishEventWithOptions(key string, value interface{}, headers map[string]string, ...) error
- func (a *Avro) Subscribe() (*pubsub.Message, error)
- func (a *Avro) SubscribeWithCommit(f pubsub.CommitFunc) (*pubsub.Message, error)
- type Config
- type Encoder
- type SchemaRegistryClient
- type SchemaRegistryClientInterface
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func New ¶
func New(ps pubsub.PublisherSubscriber, src SchemaRegistryClientInterface, version, sub string) (pubsub.PublisherSubscriber, error)
New initializes the Avro pubsub around ps and sets its required fields: the schema registry client (src), the schema version, and the subject (sub).
func NewWithConfig ¶
func NewWithConfig(c *Config, ps pubsub.PublisherSubscriber) (pubsub.PublisherSubscriber, error)
NewWithConfig initializes a new Avro pubsub around ps using the settings in the given Config.
Types ¶
type Avro ¶
type Avro struct {
// contains filtered or unexported fields
}
Avro represents a configuration structure for working with Avro schema serialization and message publishing.
func (*Avro) Bind ¶
func (a *Avro) Bind(message []byte, target interface{}) error
Bind parses the Avro-encoded data and stores the result in the value pointed to by target.
func (*Avro) CommitOffset ¶
func (a *Avro) CommitOffset(offsets pubsub.TopicPartition)
CommitOffset marks a particular offset on a specific partition as read.
func (*Avro) HealthCheck ¶
func (a *Avro) HealthCheck() types.Health
HealthCheck returns the health of the PubSub.
func (*Avro) PublishEvent ¶
func (a *Avro) PublishEvent(key string, value interface{}, headers map[string]string) error
PublishEvent publishes an event to the configured pubsub.
func (*Avro) PublishEventWithOptions ¶
func (a *Avro) PublishEventWithOptions(key string, value interface{}, headers map[string]string, options *pubsub.PublishOptions) error
PublishEventWithOptions publishes a message through the configured Avro pubsub. Additional options can be supplied through the pubsub.PublishOptions struct.
func (*Avro) SubscribeWithCommit ¶
func (a *Avro) SubscribeWithCommit(f pubsub.CommitFunc) (*pubsub.Message, error)
SubscribeWithCommit calls the CommitFunc after receiving a message and, based on its return values, decides whether to commit the message and whether to consume another one.
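The commit-then-continue behaviour described for SubscribeWithCommit can be pictured with a small loop. This is only a sketch: the real pubsub.CommitFunc and pubsub.Message definitions are not shown on this page, so the `message` type, the two-boolean `commitFunc` shape, and the `consume` helper below are all hypothetical stand-ins, not the package's API.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for pubsub.Message.
type message struct {
	Offset int64
	Value  string
}

// commitFunc is an ASSUMED callback shape: the first result says whether
// to commit the message, the second whether to read another one.
type commitFunc func(m *message) (commit, more bool)

// consume reads messages with next, asks f what to do with each one,
// commits when asked, and returns the last message once f says stop.
func consume(next func() (*message, error), commit func(*message), f commitFunc) (*message, error) {
	for {
		m, err := next()
		if err != nil {
			return nil, err
		}
		doCommit, more := f(m)
		if doCommit {
			commit(m)
		}
		if !more {
			return m, nil
		}
	}
}

func main() {
	queue := []*message{{0, "a"}, {1, "b"}, {2, "c"}}
	next := func() (*message, error) {
		if len(queue) == 0 {
			return nil, errors.New("queue drained")
		}
		m := queue[0]
		queue = queue[1:]
		return m, nil
	}
	commit := func(m *message) { fmt.Println("committed offset", m.Offset) }

	// Commit every message and stop after the one at offset 1.
	last, err := consume(next, commit, func(m *message) (bool, bool) {
		return true, m.Offset < 1
	})
	fmt.Println(last.Value, err)
}
```

Keeping the commit decision in the callback lets the caller skip committing a message it failed to process, so that message can be delivered again later.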
type Config ¶
type Config struct {
	URL            string
	Version        string
	Subject        string
	SchemaUser     string
	SchemaPassword string
}
Config represents the configuration for Avro schema and message serialization.
type Encoder ¶
Encoder encodes schemaId and Avro message.
func (*Encoder) Encode ¶
Note: the Confluent schema registry has special requirements for Avro serialization. In addition to the serialized content, each message must carry the Magic Byte and the Schema ID. Ref: https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format
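The Confluent wire format referenced above is a 5-byte header (magic byte 0, then the schema ID as a 4-byte big-endian integer) followed by the Avro-encoded payload. The sketch below illustrates that framing with the standard library only; `frame` and `unframe` are hypothetical helper names, not functions of this package, and the payload is assumed to be Avro-encoded already.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// magicByte is the leading byte of the Confluent wire format (always 0).
const magicByte byte = 0

// frame prepends the magic byte and the 4-byte big-endian schema ID
// to an already Avro-encoded payload.
func frame(schemaID uint32, payload []byte) []byte {
	out := make([]byte, 5, 5+len(payload))
	out[0] = magicByte
	binary.BigEndian.PutUint32(out[1:5], schemaID)
	return append(out, payload...)
}

// unframe splits a framed message back into its schema ID and payload.
func unframe(msg []byte) (uint32, []byte, error) {
	if len(msg) < 5 {
		return 0, nil, errors.New("message shorter than 5-byte header")
	}
	if msg[0] != magicByte {
		return 0, nil, fmt.Errorf("unexpected magic byte %d", msg[0])
	}
	return binary.BigEndian.Uint32(msg[1:5]), msg[5:], nil
}

func main() {
	// "avro-bytes" stands in for real Avro binary data.
	msg := frame(42, []byte("avro-bytes"))
	id, payload, err := unframe(msg)
	fmt.Println(id, string(payload), err)
}
```

A consumer would typically use the recovered schema ID to look up the writer schema (for example through GetSchema) before decoding the payload.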
type SchemaRegistryClient ¶
type SchemaRegistryClient struct {
	SchemaRegistryConnect []string
	// contains filtered or unexported fields
}
SchemaRegistryClient is a basic http client to interact with schema registry
func (*SchemaRegistryClient) GetSchema ¶
func (client *SchemaRegistryClient) GetSchema(id int) (string, error)
GetSchema returns a schema by unique id
func (*SchemaRegistryClient) GetSchemaByVersion ¶
func (client *SchemaRegistryClient) GetSchemaByVersion(subject, version string) (id int, schema string, err error)
GetSchemaByVersion returns a schema by version
type SchemaRegistryClientInterface ¶
type SchemaRegistryClientInterface interface {
	GetSchemaByVersion(subject, version string) (int, string, error)
	GetSchema(id int) (string, error)
}
SchemaRegistryClientInterface defines the api for all clients interfacing with schema registry
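Because New accepts the interface rather than the concrete client, an in-memory implementation can stand in for the registry in tests. The sketch below is one possible approach: the interface is redeclared locally as `schemaRegistry` only so the example compiles on its own, and `fakeRegistry`, `newFakeRegistry`, `add`, and the `orders-value` subject are hypothetical names invented for illustration.

```go
package main

import "fmt"

// schemaRegistry mirrors the two methods of SchemaRegistryClientInterface.
// It is redeclared here only to keep the sketch self-contained.
type schemaRegistry interface {
	GetSchemaByVersion(subject, version string) (int, string, error)
	GetSchema(id int) (string, error)
}

// entry pairs a schema ID with its definition.
type entry struct {
	id     int
	schema string
}

// fakeRegistry is a hypothetical in-memory registry for tests.
type fakeRegistry struct {
	byVersion map[string]entry // key: subject + "/" + version
	byID      map[int]string
}

func newFakeRegistry() *fakeRegistry {
	return &fakeRegistry{byVersion: map[string]entry{}, byID: map[int]string{}}
}

// add registers a schema under a subject, version, and ID.
func (f *fakeRegistry) add(subject, version string, id int, schema string) {
	f.byVersion[subject+"/"+version] = entry{id: id, schema: schema}
	f.byID[id] = schema
}

func (f *fakeRegistry) GetSchemaByVersion(subject, version string) (int, string, error) {
	e, ok := f.byVersion[subject+"/"+version]
	if !ok {
		return 0, "", fmt.Errorf("no schema for subject %q version %q", subject, version)
	}
	return e.id, e.schema, nil
}

func (f *fakeRegistry) GetSchema(id int) (string, error) {
	s, ok := f.byID[id]
	if !ok {
		return "", fmt.Errorf("no schema with id %d", id)
	}
	return s, nil
}

// Compile-time check that the fake satisfies the interface.
var _ schemaRegistry = (*fakeRegistry)(nil)

func main() {
	reg := newFakeRegistry()
	reg.add("orders-value", "1", 7, `{"type":"string"}`)
	id, schema, err := reg.GetSchemaByVersion("orders-value", "1")
	fmt.Println(id, schema, err)
}
```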
func NewSchemaRegistryClient ¶
func NewSchemaRegistryClient(connect []string, user, pass string) SchemaRegistryClientInterface
NewSchemaRegistryClient creates a client to talk with the schema registry at the addresses in connect. By default it retries failed requests (5XX responses and HTTP errors) len(connect) times.
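The retry rule above (at most len(connect) attempts, treating 5XX responses and HTTP errors as failures) might look roughly like the following. This is a sketch under assumptions: the source does not say whether each attempt goes to a different address, so trying the addresses in order is a guess, and `fetchFunc`, `tryEach`, and the registry URLs are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// fetchFunc performs one request against a base URL and reports the
// HTTP status code, or an error if the request itself failed.
type fetchFunc func(baseURL string) (status int, err error)

// tryEach makes at most len(connect) attempts, one per address (an
// assumption), and returns the first address that answered without a
// transport error or a 5XX status.
func tryEach(connect []string, fetch fetchFunc) (string, error) {
	lastErr := errors.New("no registry addresses configured")
	for _, u := range connect {
		status, err := fetch(u)
		switch {
		case err != nil:
			lastErr = fmt.Errorf("%s: %w", u, err)
		case status >= 500:
			lastErr = fmt.Errorf("%s: server error %d", u, status)
		default:
			return u, nil
		}
	}
	return "", lastErr
}

func main() {
	// Hypothetical registry addresses.
	connect := []string{"http://registry-a:8081", "http://registry-b:8081"}

	// Fake transport: the first address answers 503, the second 200.
	fetch := func(baseURL string) (int, error) {
		if baseURL == connect[0] {
			return 503, nil
		}
		return 200, nil
	}

	used, err := tryEach(connect, fetch)
	fmt.Println(used, err)
}
```

Note that 4XX responses are returned to the caller rather than retried here, matching the source's statement that only 5XX responses and HTTP errors count as failed requests.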