Documentation
¶
Index ¶
- Variables
- func NewBulkBuffer() *bulKBuffer
- func NewKafkaProcessors(namespace string) map[string]*BulkProcessor
- func Process(s *BulkProcessor, tenantID string, b []byte) error
- func StartGetLagByTopic(ctx context.Context, kafkaBrokers []string, groupID string, ...) error
- type BulkProcessor
- func (s *BulkProcessor) Add(b BulkRequest)
- func (s *BulkProcessor) BulkActions(bulkActions int) *BulkProcessor
- func (p *BulkProcessor) Close() error
- func (p *BulkProcessor) Flush()
- func (s *BulkProcessor) FlushInterval(interval time.Duration) *BulkProcessor
- func (p *BulkProcessor) Start(ctx context.Context) error
- func (p *BulkProcessor) Stop() error
- func (s *BulkProcessor) Workers(num int) *BulkProcessor
- type BulkRequest
- type Ingester
- type Mappable
Constants ¶
This section is empty.
Variables ¶
View Source
var ( CommitNeo4jRecordsCounts = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "neo4j_commit_records_total", Help: "Total number of records committed to neo4j", }, []string{"worker", "status", "namespace"}) KafkaTopicsLag = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "kafka_consumer_group_lag", Help: "Kafka consumer group lag per topic", }, []string{"topic", "namespace"}) )
Functions ¶
func NewBulkBuffer ¶
func NewBulkBuffer() *bulKBuffer
func NewKafkaProcessors ¶
func NewKafkaProcessors(namespace string) map[string]*BulkProcessor
Types ¶
type BulkProcessor ¶
type BulkProcessor struct {
// contains filtered or unexported fields
}
func NewBulkProcessor ¶
func NewBulkProcessor(name string, ns string, fn commitFn) *BulkProcessor
func NewBulkProcessorWithSize ¶
func NewBulkProcessorWithSize(name string, ns string, fn commitFn, size int) *BulkProcessor
func (*BulkProcessor) Add ¶
func (s *BulkProcessor) Add(b BulkRequest)
func (*BulkProcessor) BulkActions ¶
func (s *BulkProcessor) BulkActions(bulkActions int) *BulkProcessor
func (*BulkProcessor) Close ¶
func (p *BulkProcessor) Close() error
func (*BulkProcessor) Flush ¶
func (p *BulkProcessor) Flush()
func (*BulkProcessor) FlushInterval ¶
func (s *BulkProcessor) FlushInterval(interval time.Duration) *BulkProcessor
func (*BulkProcessor) Stop ¶
func (p *BulkProcessor) Stop() error
func (*BulkProcessor) Workers ¶
func (s *BulkProcessor) Workers(num int) *BulkProcessor
type BulkRequest ¶
func NewBulkRequest ¶
func NewBulkRequest(namespace string, data []byte) BulkRequest
type Ingester ¶
type Ingester struct {
// contains filtered or unexported fields
}
func NewIngester ¶
func NewIngester(ns directory.NamespaceID, cfg wtils.Config, cancel context.CancelFunc) (Ingester, error)
func (*Ingester) AddAuditLog ¶
func (*Ingester) StartAuditLogProcessor ¶
func (*Ingester) StartKafkaConsumers ¶
Click to show internal directories.
Click to hide internal directories.