Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ClickHouseStorage ¶
type ClickHouseStorage struct {
// contains filtered or unexported fields
}
func NewClickhouseStorage ¶
func NewClickhouseStorage(config ClickHouseStorageConfig) (*ClickHouseStorage, error)
func (*ClickHouseStorage) BatchInsert ¶
func (c *ClickHouseStorage) BatchInsert(ctx context.Context, messages []sinkmodels.SinkMessage) error
BatchInsert inserts multiple messages into ClickHouse.
type ClickHouseStorageConfig ¶
func (ClickHouseStorageConfig) Validate ¶
func (c ClickHouseStorageConfig) Validate() error
type MeterStore ¶
type NamespaceStore ¶
type NamespaceStore struct {
// contains filtered or unexported fields
}
func NewNamespaceStore ¶
func NewNamespaceStore() *NamespaceStore
func (*NamespaceStore) AddMeter ¶
func (n *NamespaceStore) AddMeter(meter models.Meter)
func (*NamespaceStore) ValidateEvent ¶
func (n *NamespaceStore) ValidateEvent(_ context.Context, m *sinkmodels.SinkMessage)
ValidateEvent validates a single event by matching it with the corresponding meter, if any.
type Sink ¶
type Sink struct {
// contains filtered or unexported fields
}
func NewSink ¶
func NewSink(config SinkConfig) (*Sink, error)
type SinkBuffer ¶
type SinkBuffer struct {
// contains filtered or unexported fields
}
func NewSinkBuffer ¶
func NewSinkBuffer() *SinkBuffer
func (*SinkBuffer) Add ¶
func (b *SinkBuffer) Add(message sinkmodels.SinkMessage)
func (*SinkBuffer) Dequeue ¶
func (b *SinkBuffer) Dequeue() []sinkmodels.SinkMessage
func (*SinkBuffer) RemoveByPartitions ¶
func (b *SinkBuffer) RemoveByPartitions(partitions []kafka.TopicPartition)
RemoveByPartitions removes messages from the buffer by partitions. Useful when partitions are revoked.
func (*SinkBuffer) Size ¶
func (b *SinkBuffer) Size() int
type SinkConfig ¶
type SinkConfig struct { Logger *slog.Logger Tracer trace.Tracer MetricMeter metric.Meter MeterRepository meter.Repository Storage Storage Deduplicator dedupe.Deduplicator Consumer *kafka.Consumer // MinCommitCount is the minimum number of messages to wait before flushing the buffer. // Whichever happens earlier MinCommitCount or MaxCommitWait will trigger a flush. MinCommitCount int // MaxCommitWait is the maximum time to wait before flushing the buffer MaxCommitWait time.Duration // The time, in milliseconds, spent waiting in poll if data is not available in the buffer. // If 0, returns immediately with any records that are available currently in the buffer, else returns empty. MaxPollTimeout time.Duration // NamespaceRefetch is the interval to refetch exsisting namespaces and meters // this information is used to configure which topics the consumer subscribes and // the meter configs used in event validation. NamespaceRefetch time.Duration // NamespaceRefetchTimeout is the timeout for updating namespaces and consumer subscription. // It must be less than NamespaceRefetch interval. NamespaceRefetchTimeout time.Duration // NamespaceTopicRegexp defines the regular expression to match/validate topic names the sink-worker needs to subscribe to. NamespaceTopicRegexp string // FlushEventHandlers is an optional lifecycle hook, allowing to act on successful batch // flushes. To prevent blocking the main sink logic this is always called in a go routine. FlushEventHandler flushhandler.FlushEventHandler // FlushSuccessTimeout is the timeout for the OnFlushSuccess callback, // after this period the context of the callback will be canceled. FlushSuccessTimeout time.Duration // DrainTimeout is the maximum time to wait before draining the buffer and closing the sink. DrainTimeout time.Duration TopicResolver topicresolver.Resolver }
func (*SinkConfig) Validate ¶
func (s *SinkConfig) Validate() error
type Storage ¶
// Storage is the single-method interface held in SinkConfig's Storage field.
// ClickHouseStorage declares a BatchInsert method with the same signature on
// its pointer receiver.
type Storage interface {
	// BatchInsert receives a context and a slice of sink messages and reports
	// failure through the returned error.
	BatchInsert(ctx context.Context, messages []sinkmodels.SinkMessage) error
}
Click to show internal directories.
Click to hide internal directories.