Documentation ¶
Index ¶
- func ConvertTimestamp(ts time.Time) int64
- func GZIPEventSender(filename string) (*gzipEventSender, error)
- type Encoder
- type Event
- type EventSender
- type EventSenders
- type EventTopics
- type KafkaClientProvider
- type KafkaSender
- type KafkaSenderConfig
- type LogrusEventSender
- type NoopEventSender
- type Providers
- type TopicsFunc
- type WriterEventSender
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConvertTimestamp ¶
func GZIPEventSender ¶
Types ¶
type Encoder ¶
type Encoder interface { // Encodes an event into some kind of binary representation. Encode(event Event) ([]byte, error) // Close the encoder. Close() error }
func NewAvroConfluentEncoder ¶
func NewAvroConfluentEncoder(registry *schemaregistry.Client) Encoder
func NewAvroEncoder ¶
func NewJSONEncoder ¶
func NewJSONEncoder() Encoder
type EventSender ¶
type EventSender interface { // Init event schemas WITHOUT sending the events. This method should be used during startup // to register schemas in the beginning, so that the service has all schemas cached. Init(event []Event) error // Send the given event. This method should be non blocking and // must never fail. You might want to use a channel for buffering // events internally. Errors will be logged to the terminal // but otherwise ignored. Send(event Event) // Close the event sender and flush all pending events. // Waits for all events to be sent out. Close() error }
var Events EventSender = LogrusEventSender{logrus.WithField("prefix", "events")}
Global instance to send events. Defaults to a simple sender that prints events using a logger instance.
func ParseEventSenders ¶
func ParseEventSenders(clientId string, providers Providers, config string) (EventSender, error)
Parses the event sender config from a string. An example could be --event-sender="confluent,address=http://confluent-registry.shared.svc.cluster.local,kafka=kafka.kafka.svc.cluster.local:9092,replication=1,blocking=true,schemainit=true", which uses the confluent registry with kafka in blocking mode and initialises schemas at the registry during startup.
Sender Options:
stdout: sends events to stdout
noop: does not send anything at all
stderr: sends events to stderr
gzip,file=FILE: sends events to a gzipped file
kafka=URL: sends events to kafka
Schema registries:
consul,address=URL: uses consul as schema registry
confluent,address=URL: uses confluent as schema registry
Other options:
replication=NUMBER: creates the given kafka topics with this replication factor
blocking=true: waits until the event has been sent
type EventSenders ¶
type EventSenders []EventSender
A slice of event senders that is also an event sender.
func (EventSenders) Close ¶
func (senders EventSenders) Close() error
func (EventSenders) Init ¶
func (senders EventSenders) Init(event []Event) error
func (EventSenders) Send ¶
func (senders EventSenders) Send(event Event)
type EventTopics ¶
type EventTopics struct { EventTypes map[reflect.Type]kafka.Topic SchemaInitEvents []Event FailOnSchemaInit bool // This is the fallback topic if a type can not be matched to one of the event types. // It will be created automatically. Fallback string }
func (EventTopics) TopicForType ¶
func (topics EventTopics) TopicForType(t reflect.Type) string
func (EventTopics) Topics ¶
func (topics EventTopics) Topics() kafka.Topics
type KafkaClientProvider ¶
type KafkaSender ¶
type KafkaSender struct {
// contains filtered or unexported fields
}
func NewKafkaSender ¶
func NewKafkaSender(kafkaClient sarama.Client, senderConfig KafkaSenderConfig) (*KafkaSender, error)
func (*KafkaSender) Close ¶
func (kafka *KafkaSender) Close() error
func (*KafkaSender) Init ¶
func (kafka *KafkaSender) Init(events []Event) error
func (*KafkaSender) Send ¶
func (kafka *KafkaSender) Send(event Event)
type KafkaSenderConfig ¶
type KafkaSenderConfig struct { // Set to true to block Send() if the buffers are full. AllowBlocking bool // Topics configuration TopicsConfig EventTopics // The event encoder to use Encoder Encoder EventBufferSize int }
type LogrusEventSender ¶
type LogrusEventSender struct {
logrus.FieldLogger
}
func (LogrusEventSender) Close ¶
func (LogrusEventSender) Close() error
func (LogrusEventSender) Init ¶
func (l LogrusEventSender) Init(events []Event) error
func (LogrusEventSender) Send ¶
func (l LogrusEventSender) Send(event Event)
type NoopEventSender ¶
type NoopEventSender struct{}
func (NoopEventSender) Close ¶
func (NoopEventSender) Close() error
func (NoopEventSender) Init ¶
func (s NoopEventSender) Init(event []Event) error
func (NoopEventSender) Send ¶
func (NoopEventSender) Send(event Event)
type Providers ¶
type Providers struct { Kafka KafkaClientProvider Topics TopicsFunc }
type TopicsFunc ¶
type TopicsFunc func(replicationFactor int16) EventTopics
type WriterEventSender ¶
func (WriterEventSender) Close ¶
func (sender WriterEventSender) Close() error
func (WriterEventSender) Init ¶
func (sender WriterEventSender) Init(event []Event) error
func (WriterEventSender) Send ¶
func (sender WriterEventSender) Send(event Event)