sink

package
v1.0.0-beta.120 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 15, 2024 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ClickHouseStorage

type ClickHouseStorage struct {
	// contains filtered or unexported fields
}

func NewClickhouseStorage

func NewClickhouseStorage(config ClickHouseStorageConfig) *ClickHouseStorage

func (*ClickHouseStorage) BatchInsert

func (c *ClickHouseStorage) BatchInsert(ctx context.Context, messages []SinkMessage) error

type ClickHouseStorageConfig

type ClickHouseStorageConfig struct {
	ClickHouse clickhouse.Conn
	Database   string
}

type FlushEventHandler

type FlushEventHandler interface {
	OnFlushSuccess(ctx context.Context, event *FlushSuccessEvent) error
	Shutdown() error
}

type FlushSuccessEvent

type FlushSuccessEvent struct {
	Messages []SinkMessage
}

type InsertEventsQuery

type InsertEventsQuery struct {
	Database string
	Messages []SinkMessage
}

func (InsertEventsQuery) ToSQL

func (q InsertEventsQuery) ToSQL() (string, []interface{}, error)

type MeterStore

type MeterStore struct {
	Meters []models.Meter
}

type NamespaceStore

type NamespaceStore struct {
	// contains filtered or unexported fields
}

func NewNamespaceStore

func NewNamespaceStore() *NamespaceStore

func (*NamespaceStore) AddMeter

func (n *NamespaceStore) AddMeter(meter models.Meter)

func (*NamespaceStore) ValidateEvent

func (n *NamespaceStore) ValidateEvent(_ context.Context, m *SinkMessage)

ValidateEvent validates a single event by matching it with the corresponding meter if any

type ProcessingState

type ProcessingState int8
const (
	OK ProcessingState = iota
	DROP
	INVALID
)

func (ProcessingState) String

func (c ProcessingState) String() string

type ProcessingStatus

type ProcessingStatus struct {
	State ProcessingState
	Error error
}

type Sink

type Sink struct {
	// contains filtered or unexported fields
}

func NewSink

func NewSink(config SinkConfig) (*Sink, error)

func (*Sink) Close

func (s *Sink) Close() error

func (*Sink) Run

func (s *Sink) Run(ctx context.Context) error

Run starts the Kafka consumer and sinks the events to Clickhouse

type SinkBuffer

type SinkBuffer struct {
	// contains filtered or unexported fields
}

func NewSinkBuffer

func NewSinkBuffer() *SinkBuffer

func (*SinkBuffer) Add

func (b *SinkBuffer) Add(message SinkMessage)

func (*SinkBuffer) Dequeue

func (b *SinkBuffer) Dequeue() []SinkMessage

func (*SinkBuffer) RemoveByPartitions

func (b *SinkBuffer) RemoveByPartitions(partitions []kafka.TopicPartition)

RemoveByPartitions removes messages from the buffer by partitions Useful when partitions are revoked.

func (*SinkBuffer) Size

func (b *SinkBuffer) Size() int

type SinkConfig

type SinkConfig struct {
	Logger          *slog.Logger
	Tracer          trace.Tracer
	MetricMeter     metric.Meter
	MeterRepository meter.Repository
	Storage         Storage
	Deduplicator    dedupe.Deduplicator
	Consumer        *kafka.Consumer
	// MinCommitCount is the minimum number of messages to wait before flushing the buffer.
	// Whichever happens earlier MinCommitCount or MaxCommitWait will trigger a flush.
	MinCommitCount int
	// MaxCommitWait is the maximum time to wait before flushing the buffer
	MaxCommitWait time.Duration
	// NamespaceRefetch is the interval to refetch exsisting namespaces and meters
	// this information is used to configure which topics the consumer subscribes and
	// the meter configs used in event validation.
	NamespaceRefetch time.Duration
	// FlushEventHandler is an optional lifecycle hook, to prevent blocking the main sink logic
	// this is always called in a go routine.
	FlushEventHandler FlushEventHandler

	// FlushSuccessTimeout is the timeout for the OnFlushSuccess callback, after this period the context
	// of the callback will be canceled.
	FlushSuccessTimeout time.Duration
}

type SinkMessage

type SinkMessage struct {
	Namespace    string
	KafkaMessage *kafka.Message
	Serialized   *serializer.CloudEventsKafkaPayload
	Status       ProcessingStatus
	// Meters contains the list of meters this message affects
	Meters []models.Meter
}

type Storage

type Storage interface {
	BatchInsert(ctx context.Context, messages []SinkMessage) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL