producer

package
v0.0.0-...-1b4ddf7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 20, 2024 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
// DEFAULT_ACKS is the default acks value, kept as a string.
// NOTE(review): presumably the default for KafkaProducerConfig.Acks — confirm.
const DEFAULT_ACKS = "1"
View Source
// DEFAULT_FLUSH_TIMEOUT_MS is the default flush timeout. Going by the _MS
// suffix the unit is milliseconds (5000 ms = 5 s).
// NOTE(review): presumably the value passed to Flush(timeout int) — confirm.
const DEFAULT_FLUSH_TIMEOUT_MS = 5000
View Source
// DEFAULT_KAFKA_SERVER is the default Kafka server address in host:port form.
// NOTE(review): presumably the default for KafkaProducerConfig.Server — confirm.
const DEFAULT_KAFKA_SERVER = "localhost:29092"
View Source
// DEFAULT_KAFKA_WRITE_TIMEOUT is the default Kafka write timeout (10 seconds).
// NOTE(review): presumably the default for KafkaProducerConfig.Timeout — confirm.
const DEFAULT_KAFKA_WRITE_TIMEOUT = 10000 * time.Millisecond
View Source
// DEFAULT_MAX_PARTITIONS is the default maximum partition count; it is typed
// int32, matching the KafkaProducerConfig.MaxPartitions field.
const DEFAULT_MAX_PARTITIONS int32 = 100
View Source
const DEFAULT_MSG_UNTIL_FLUSH = 10000

DEFAULT_MSG_UNTIL_FLUSH is the number of messages collected before a flush; batching messages this way improves performance.

View Source
// WAIT_UNTIL_EXIT_TAILING is a one-hour wait duration.
// NOTE(review): presumably the default for WatchDirectoryProducer.WaitUntilExit,
// i.e. how long tailing continues before exiting — confirm.
const WAIT_UNTIL_EXIT_TAILING = 60 * time.Minute

Variables

This section is empty.

Functions

func GetProducibleScansFactory

func GetProducibleScansFactory(vp, ipVersion string) func(host, runId string) []ProducibleScan

Types

type EventProducer

// EventProducer is the interface for sending raw byte messages to a named
// topic. KafkaEventProducer declares the same three methods.
type EventProducer interface {
	// Produce sends msg to topic and reports failure through err.
	Produce(msg []byte, topic string) (err error)
	// Close releases the producer.
	Close()
	// Flush takes a timeout and returns an int.
	// NOTE(review): timeout is presumably in milliseconds (cf.
	// DEFAULT_FLUSH_TIMEOUT_MS) and the result presumably the number of
	// messages still unflushed — confirm against the implementation.
	Flush(timeout int) int
}

type FileProducer

// FileProducer pairs a scan factory with the ScanProducer that emits the
// resulting scans.
// NOTE(review): Produce(file) presumably reads its input from the given
// file; the implementation is not visible here.
type FileProducer struct {
	// GetProducibleScans builds the ProducibleScan values for a host and run ID.
	GetProducibleScans GetProducibleScans
	// Producer is the sink the scans are produced to.
	Producer           ScanProducer
}

func NewFileProducer

func NewFileProducer(newScans GetProducibleScans, producer ScanProducer) *FileProducer

func (*FileProducer) Produce

func (dp *FileProducer) Produce(file string) error

type GetProducibleScans

// GetProducibleScans is a function that, given a host and a run ID, returns
// the scans to produce, each paired with its target topic (see ProducibleScan).
type GetProducibleScans func(host, runId string) []ProducibleScan

type KafkaEventProducer

// KafkaEventProducer combines a KafkaProducer with its configuration. Its
// Produce, Close and Flush methods have the signatures required by
// EventProducer.
type KafkaEventProducer struct {
	// ProducedMsgs is an atomic 32-bit counter.
	// NOTE(review): presumably counts produced messages — confirm where it
	// is incremented (Produce / WatchEvents).
	ProducedMsgs atomic.Int32

	// Config holds the producer settings.
	Config   *KafkaProducerConfig
	// Producer is the underlying Kafka client abstraction.
	Producer KafkaProducer
	// contains filtered or unexported fields
}

func NewKafkaProducer

func NewKafkaProducer(config *KafkaProducerConfig, p KafkaProducer) (kp *KafkaEventProducer, err error)

func (*KafkaEventProducer) Close

func (kep *KafkaEventProducer) Close()

func (*KafkaEventProducer) Flush

func (kep *KafkaEventProducer) Flush(timeout int) int

func (*KafkaEventProducer) Produce

func (kep *KafkaEventProducer) Produce(msg []byte, topic string) (err error)

Produce is a blocking call that produces a message to the given topic.

func (*KafkaEventProducer) WatchEvents

func (kep *KafkaEventProducer) WatchEvents()

type KafkaProducer

// KafkaProducer abstracts the Kafka client operations this package relies on,
// so that a client can be injected into NewKafkaProducer.
// NOTE(review): the method set looks modelled on confluent-kafka-go's
// *kafka.Producer — confirm.
type KafkaProducer interface {
	// Produce hands msg to the client; deliveryChan is a channel of
	// kafka.Event supplied by the caller.
	// NOTE(review): presumably used for delivery reports — confirm.
	Produce(msg *kafka.Message, deliveryChan chan kafka.Event) (err error)
	// Flush takes a timeout and returns an int.
	// NOTE(review): presumably milliseconds in, outstanding event count out — confirm.
	Flush(timeout int) int
	// Events returns the client's event channel.
	Events() chan kafka.Event
	// Close releases the client.
	Close()
}

type KafkaProducerConfig

// KafkaProducerConfig holds the settings used to build a Kafka producer.
// GetDefaultKafkaProducerConfig returns a pre-populated instance.
type KafkaProducerConfig struct {
	// Server is the Kafka server address.
	// NOTE(review): presumably host:port, cf. DEFAULT_KAFKA_SERVER — confirm.
	Server  string
	// Timeout is a duration.
	// NOTE(review): presumably the write timeout, cf. DEFAULT_KAFKA_WRITE_TIMEOUT — confirm.
	Timeout time.Duration
	// Acks is the acks setting, as a string.
	// NOTE(review): cf. DEFAULT_ACKS — confirm how it is passed to the client.
	Acks    string

	// ProducerChannelSize is an integer channel size.
	// NOTE(review): which channel it sizes is not visible here — confirm.
	ProducerChannelSize int
	// MaxPartitions is the partition limit (int32, like DEFAULT_MAX_PARTITIONS).
	MaxPartitions       int32
	// ReplicationFactor is an integer replication factor.
	// NOTE(review): presumably used when creating topics — confirm.
	ReplicationFactor   int
}

func GetDefaultKafkaProducerConfig

func GetDefaultKafkaProducerConfig() *KafkaProducerConfig

type KafkaScanProducer

// KafkaScanProducer wraps an EventProducer; its Produce, Close and Flush
// methods have the signatures required by ScanProducer.
// NOTE(review): presumably serializes each scan.Scan to bytes before handing
// it to Producer — confirm in the implementation.
type KafkaScanProducer struct {
	// Producer is the byte-level producer that this type delegates to.
	Producer EventProducer
}

func (*KafkaScanProducer) Close

func (sp *KafkaScanProducer) Close()

func (*KafkaScanProducer) Flush

func (sp *KafkaScanProducer) Flush(timeout int) int

func (*KafkaScanProducer) Produce

func (sp *KafkaScanProducer) Produce(scan scan.Scan, topic string) (err error)

type NewProducer

// NewProducer is a factory function that returns a ScanProducer or an error.
type NewProducer func() (ScanProducer, error)

type ProducibleScan

// ProducibleScan pairs a scan with the topic it is to be produced to.
type ProducibleScan struct {
	// Scan is the scan to produce.
	Scan  scan.Scan
	// Topic is the name of the target topic.
	Topic string
}

type ScanProducer

// ScanProducer is the interface for sending scan.Scan values to a named
// topic. It mirrors EventProducer, but takes a scan instead of raw bytes.
type ScanProducer interface {
	// Produce sends scan to topic and returns an error on failure.
	Produce(scan scan.Scan, topic string) error
	// Close releases the producer.
	Close()
	// Flush takes a timeout and returns an int.
	// NOTE(review): timeout is presumably in milliseconds (cf.
	// DEFAULT_FLUSH_TIMEOUT_MS) — confirm against the implementation.
	Flush(timeout int) int
}

TODO: rename the interfaces to follow the unofficial Go naming convention; see https://www.reddit.com/r/golang/comments/cjrk46/is_there_a_naming_convention_for_interface_struct/

func NewKafkaScanProducer

func NewKafkaScanProducer(config *KafkaProducerConfig) (sp ScanProducer, err error)

func NewScanProducer

func NewScanProducer(eventProducer EventProducer) (sp ScanProducer, err error)

type WatchDirectoryProducer

// WatchDirectoryProducer holds what WatchAndProduce(ctx, dir) needs: a scan
// factory, a producer factory and a wait duration.
// NOTE(review): presumably watches dir for files and produces their scans;
// the implementation is not visible here.
type WatchDirectoryProducer struct {
	// GetProducibleScans builds the ProducibleScan values for a host and run ID.
	GetProducibleScans GetProducibleScans

	// NewProducer creates a ScanProducer on demand.
	NewProducer   NewProducer
	// WaitUntilExit is a wait duration.
	// NOTE(review): presumably defaults to WAIT_UNTIL_EXIT_TAILING — confirm.
	WaitUntilExit time.Duration
}

func NewWatchDirectoryProducer

func NewWatchDirectoryProducer(newScans GetProducibleScans, newProducer NewProducer) *WatchDirectoryProducer

func (*WatchDirectoryProducer) WatchAndProduce

func (dp *WatchDirectoryProducer) WatchAndProduce(ctx context.Context, dir string) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL