sink

package
v1.0.0-beta.42 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2024 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ClickHouseStorage

type ClickHouseStorage struct {
	// contains filtered or unexported fields
}

func NewClickhouseStorage

func NewClickhouseStorage(config ClickHouseStorageConfig) *ClickHouseStorage

func (*ClickHouseStorage) BatchInsert

func (c *ClickHouseStorage) BatchInsert(ctx context.Context, messages []SinkMessage) error

type ClickHouseStorageConfig

// ClickHouseStorageConfig is the configuration accepted by NewClickhouseStorage.
type ClickHouseStorageConfig struct {
	// ClickHouse is the clickhouse.Conn handed to the storage.
	ClickHouse clickhouse.Conn
	// Database is presumably the name of the ClickHouse database that
	// events are written to — TODO confirm against NewClickhouseStorage.
	Database   string
}

type InsertEventsQuery

// InsertEventsQuery bundles a database name with a batch of sink messages.
// Its ToSQL method returns a string, a slice of arguments and an error.
type InsertEventsQuery struct {
	// Database is presumably the target database name — TODO confirm against ToSQL.
	Database string
	// Messages is the batch of sink messages the query is built from.
	Messages []SinkMessage
}

func (InsertEventsQuery) ToSQL

func (q InsertEventsQuery) ToSQL() (string, []interface{}, error)

type MeterStore

// MeterStore holds a list of meters.
type MeterStore struct {
	// Meters is the collection of models.Meter values kept by the store.
	Meters []models.Meter
}

type NamespaceStore

type NamespaceStore struct {
	// contains filtered or unexported fields
}

func NewNamespaceStore

func NewNamespaceStore() *NamespaceStore

func (*NamespaceStore) AddMeter

func (n *NamespaceStore) AddMeter(meter models.Meter)

func (*NamespaceStore) ValidateEvent

func (a *NamespaceStore) ValidateEvent(ctx context.Context, event serializer.CloudEventsKafkaPayload, namespace string) error

ValidateEvent validates a single event by matching it with the corresponding meter, if any.

type Offset

// Offset wraps a single int64 offset value.
// NOTE(review): presumably a Kafka partition offset — confirm against OffsetStore.
type Offset struct {
	Offset int64
}

type OffsetStore

type OffsetStore struct {
	// contains filtered or unexported fields
}

OffsetStore helps to determine the next offset to commit.

func NewOffsetStore

func NewOffsetStore() *OffsetStore

func (*OffsetStore) Add

func (o *OffsetStore) Add(topicPartition kafka.TopicPartition)

func (*OffsetStore) Get

func (o *OffsetStore) Get() []kafka.TopicPartition

type PartitionOffsets

type PartitionOffsets struct {
	// contains filtered or unexported fields
}

type ProcessingControl

// ProcessingControl is an int32-backed code carried by ProcessingError.
// NOTE(review): presumably tells the sink how to treat a message that failed
// processing — confirm against the code that reads it.
type ProcessingControl int32

// Known ProcessingControl values. The numeric values are fixed by declaration
// order: DROP is 0 and INVALID is 1.
const (
	DROP    ProcessingControl = iota // 0
	INVALID                          // 1
)

type ProcessingError

// ProcessingError pairs a message with a ProcessingControl value.
// It is created with NewProcessingError, and *ProcessingError has an Error
// method, so it can be used as an error.
type ProcessingError struct {
	// Message is the error text.
	Message           string
	// ProcessingControl is the control code attached to the error.
	ProcessingControl ProcessingControl
}

func NewProcessingError

func NewProcessingError(msg string, control ProcessingControl) *ProcessingError

func (*ProcessingError) Error

func (e *ProcessingError) Error() string

type Sink

type Sink struct {
	// contains filtered or unexported fields
}

func NewSink

func NewSink(config SinkConfig) (*Sink, error)

func (*Sink) Close

func (s *Sink) Close() error

func (*Sink) ParseMessage

func (s *Sink) ParseMessage(e *kafka.Message) (string, *serializer.CloudEventsKafkaPayload, error)

func (*Sink) Run

func (s *Sink) Run() error

Run starts the Kafka consumer and sinks the events to ClickHouse.

type SinkBuffer

type SinkBuffer struct {
	// contains filtered or unexported fields
}

func NewSinkBuffer

func NewSinkBuffer() *SinkBuffer

func (*SinkBuffer) Add

func (b *SinkBuffer) Add(message SinkMessage)

func (*SinkBuffer) Dequeue

func (b *SinkBuffer) Dequeue() []SinkMessage

func (*SinkBuffer) Size

func (b *SinkBuffer) Size() int

type SinkConfig

// SinkConfig is the configuration accepted by NewSink.
type SinkConfig struct {
	Logger          *slog.Logger
	Tracer          trace.Tracer
	MetricMeter     metric.Meter
	MeterRepository meter.Repository
	Storage         Storage
	Deduplicator    dedupe.Deduplicator
	Consumer        *kafka.Consumer
	// MinCommitCount is the minimum number of messages to wait for before flushing the buffer.
	// Whichever happens earlier, MinCommitCount or MaxCommitWait, will trigger a flush.
	MinCommitCount int
	// MaxCommitWait is the maximum time to wait before flushing the buffer.
	MaxCommitWait time.Duration
	// NamespaceRefetch is the interval at which existing namespaces and meters are refetched.
	// This information is used to configure which topics the consumer subscribes to and
	// the meter configs used in event validation.
	NamespaceRefetch time.Duration
	// OnFlushSuccess is an optional lifecycle hook.
	// NOTE(review): the meaning of the string and int64 arguments is not shown here — confirm against the caller.
	OnFlushSuccess func(string, int64)
}

type SinkMessage

// SinkMessage is the unit handled by SinkBuffer and Storage.BatchInsert. It groups
// a Kafka message with its namespace, a serialized payload and an optional
// processing error.
type SinkMessage struct {
	// Namespace is the namespace associated with the message.
	Namespace    string
	// KafkaMessage points to the underlying Kafka message.
	KafkaMessage *kafka.Message
	// Serialized points to the CloudEvents Kafka payload for the message.
	// NOTE(review): presumably the payload parsed from KafkaMessage — confirm.
	Serialized   *serializer.CloudEventsKafkaPayload
	// Error points to a ProcessingError for the message.
	// NOTE(review): presumably nil when processing succeeded — confirm.
	Error        *ProcessingError
}

type Storage

// Storage is the interface for anything that can insert a batch of sink messages.
// *ClickHouseStorage declares a BatchInsert method with this signature.
type Storage interface {
	// BatchInsert takes a context and a slice of messages and returns an error.
	BatchInsert(ctx context.Context, messages []SinkMessage) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL