events

package
v2.1.9 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 7, 2020 License: MIT Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConvertTimestamp

func ConvertTimestamp(ts time.Time) int64

func GZIPEventSender

func GZIPEventSender(filename string) (*gzipEventSender, error)

Types

type Encoder

// Encoder converts events into a binary wire representation.
type Encoder interface {
	// Encode encodes an event into some kind of binary representation.
	Encode(event Event) ([]byte, error)

	// Close releases any resources held by the encoder.
	Close() error
}

func NewAvroConfluentEncoder

func NewAvroConfluentEncoder(registry *schemaregistry.Client) Encoder

func NewAvroEncoder

func NewAvroEncoder(registry schema.Registry) Encoder

func NewJSONEncoder

func NewJSONEncoder() Encoder

type Event

// Event is a value that can report its own avro schema and
// serialize itself in avro format.
type Event interface {
	// Schema returns the avro schema of this event.
	Schema() string

	// Serialize writes the event (in avro format) to the given writer.
	Serialize(io.Writer) error
}

type EventSender

// EventSender is the destination for events: it registers their schemas,
// accepts them for (possibly buffered) delivery, and flushes on close.
type EventSender interface {

	// Init registers event schemas WITHOUT sending the events. This method
	// should be used during startup to register schemas in the beginning,
	// so that the service has all schemas cached.
	Init(event []Event) error

	// Send sends the given event. This method should be non blocking and
	// must never fail. You might want to use a channel for buffering
	// events internally. Errors will be logged to the terminal
	// but otherwise ignored.
	Send(event Event)

	// Close closes the event sender and flushes all pending events.
	// Waits for all events to be sent out.
	Close() error
}
// Events is the global instance used to send events. Defaults to a simple
// sender that prints events using a logger instance.
var Events EventSender = LogrusEventSender{logrus.WithField("prefix", "events")}

Global instance to send events. Defaults to a simple sender that prints events using a logger instance.

func ParseEventSenders

func ParseEventSenders(clientId string, providers Providers, config string) (EventSender, error)

Parses event sender config from a string. An example could be --event-sender="confluent,address=http://confluent-registry.shared.svc.cluster.local,kafka=kafka.kafka.svc.cluster.local:9092,replication=1,blocking=true,schemainit=true", which uses the confluent registry with kafka in blocking mode and initialises schemas at the registry during startup.

Sender Options:

stdout: sends events to stdout. noop: does not send anything at all. stderr: sends events to stderr. gzip,file=FILE: sends events to a gzipped file. kafka=URL: sends events to kafka.

Schema registries:

consul,address=URL: uses consul as the schema registry. confluent,address=URL: uses confluent as the schema registry.

Other options:

replication=NUMBER: used to create the given kafka topics with the replication param. blocking=true: will wait until the event got sent.

type EventSenders

// EventSenders is a slice of event senders that is also an event sender.
type EventSenders []EventSender

A slice of event senders that is also an event sender.

func (EventSenders) Close

func (senders EventSenders) Close() error

func (EventSenders) Init

func (senders EventSenders) Init(event []Event) error

func (EventSenders) Send

func (senders EventSenders) Send(event Event)

type EventTopics

// EventTopics configures the mapping from event types to kafka topics
// and which events to use for schema initialization.
type EventTopics struct {
	// EventTypes maps a concrete event type to the kafka topic it is
	// published to — presumably consulted by TopicForType; verify against caller.
	EventTypes       map[reflect.Type]kafka.Topic
	// SchemaInitEvents lists events whose schemas are registered during
	// Init (see EventSender.Init) — TODO confirm.
	SchemaInitEvents []Event
	// FailOnSchemaInit — assumed to make schema registration errors fatal
	// during initialization; confirm against implementation.
	FailOnSchemaInit bool

	// This is the fallback topic if a type can not be matched to one of the event types.
	// It will be created automatically.
	Fallback string
}

func (EventTopics) TopicForType

func (topics EventTopics) TopicForType(t reflect.Type) string

func (EventTopics) Topics

func (topics EventTopics) Topics() kafka.Topics

type KafkaClientProvider

// KafkaClientProvider creates a sarama kafka client for the given client id
// and broker addresses.
type KafkaClientProvider interface {
	KafkaClient(clientId string, addresses []string) (sarama.Client, error)
}

type KafkaSender

// KafkaSender is an EventSender implementation that publishes events to
// kafka (see NewKafkaSender and KafkaSenderConfig).
type KafkaSender struct {
	// contains filtered or unexported fields
}

func NewKafkaSender

func NewKafkaSender(kafkaClient sarama.Client, senderConfig KafkaSenderConfig) (*KafkaSender, error)

func (*KafkaSender) Close

func (kafka *KafkaSender) Close() error

func (*KafkaSender) Init

func (kafka *KafkaSender) Init(events []Event) error

func (*KafkaSender) Send

func (kafka *KafkaSender) Send(event Event)

type KafkaSenderConfig

// KafkaSenderConfig holds the configuration for a KafkaSender.
type KafkaSenderConfig struct {
	// AllowBlocking: set to true to block Send() if the buffers are full.
	AllowBlocking bool

	// TopicsConfig is the topics configuration (type to topic mapping,
	// schema init events, fallback topic).
	TopicsConfig EventTopics

	// Encoder is the event encoder to use.
	Encoder Encoder

	// EventBufferSize — presumably the capacity of the internal event
	// buffer used by Send; TODO confirm against implementation.
	EventBufferSize int
}

type LogrusEventSender

// LogrusEventSender is an EventSender that prints events using the
// embedded logrus FieldLogger.
type LogrusEventSender struct {
	logrus.FieldLogger
}

func (LogrusEventSender) Close

func (LogrusEventSender) Close() error

func (LogrusEventSender) Init

func (l LogrusEventSender) Init(events []Event) error

func (LogrusEventSender) Send

func (l LogrusEventSender) Send(event Event)

type NoopEventSender

// NoopEventSender is an EventSender that does not send anything at all.
type NoopEventSender struct{}

func (NoopEventSender) Close

func (NoopEventSender) Close() error

func (NoopEventSender) Init

func (s NoopEventSender) Init(event []Event) error

func (NoopEventSender) Send

func (NoopEventSender) Send(event Event)

type Providers

// Providers bundles the dependencies needed by ParseEventSenders to
// construct event senders.
type Providers struct {
	// Kafka creates kafka clients on demand.
	Kafka  KafkaClientProvider
	// Topics builds the topics configuration for a replication factor.
	Topics TopicsFunc
}

type TopicsFunc

// TopicsFunc builds an EventTopics configuration for the given kafka
// replication factor.
type TopicsFunc func(replicationFactor int16) EventTopics

type WriterEventSender

// WriterEventSender is an EventSender that writes events to the
// embedded io.Writer.
type WriterEventSender struct {
	io.Writer
}

func (WriterEventSender) Close

func (sender WriterEventSender) Close() error

func (WriterEventSender) Init

func (sender WriterEventSender) Init(event []Event) error

func (WriterEventSender) Send

func (sender WriterEventSender) Send(event Event)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL