Documentation ¶
Index ¶
- Constants
- func GetProducibleScansFactory(vp, ipVersion string) func(host, runId string) []ProducibleScan
- type EventProducer
- type FileProducer
- type GetProducibleScans
- type KafkaEventProducer
- type KafkaProducer
- type KafkaProducerConfig
- type KafkaScanProducer
- type NewProducer
- type ProducibleScan
- type ScanProducer
- type WatchDirectoryProducer
Constants ¶
View Source
const DEFAULT_ACKS = "1"
View Source
const DEFAULT_FLUSH_TIMEOUT_MS = 5000
View Source
const DEFAULT_KAFKA_SERVER = "localhost:29092"
View Source
const DEFAULT_KAFKA_WRITE_TIMEOUT = 10000 * time.Millisecond
View Source
const DEFAULT_MAX_PARTITIONS int32 = 100
View Source
const DEFAULT_MSG_UNTIL_FLUSH = 10000
DEFAULT_MSG_UNTIL_FLUSH is the number of messages collected before the producer flushes; batching messages this way boosts performance.
View Source
const WAIT_UNTIL_EXIT_TAILING = 60 * time.Minute
Variables ¶
This section is empty.
Functions ¶
func GetProducibleScansFactory ¶
func GetProducibleScansFactory(vp, ipVersion string) func(host, runId string) []ProducibleScan
Types ¶
type EventProducer ¶
type FileProducer ¶
type FileProducer struct { GetProducibleScans GetProducibleScans Producer ScanProducer }
func NewFileProducer ¶
func NewFileProducer(newScans GetProducibleScans, producer ScanProducer) *FileProducer
func (*FileProducer) Produce ¶
func (dp *FileProducer) Produce(file string) error
type GetProducibleScans ¶
type GetProducibleScans func(host, runId string) []ProducibleScan
type KafkaEventProducer ¶
type KafkaEventProducer struct { ProducedMsgs atomic.Int32 Config *KafkaProducerConfig Producer KafkaProducer // contains filtered or unexported fields }
func NewKafkaProducer ¶
func NewKafkaProducer(config *KafkaProducerConfig, p KafkaProducer) (kp *KafkaEventProducer, err error)
func (*KafkaEventProducer) Close ¶
func (kep *KafkaEventProducer) Close()
func (*KafkaEventProducer) Flush ¶
func (kep *KafkaEventProducer) Flush(timeout int) int
func (*KafkaEventProducer) Produce ¶
func (kep *KafkaEventProducer) Produce(msg []byte, topic string) (err error)
Produce is a blocking call that produces a message to the given topic.
func (*KafkaEventProducer) WatchEvents ¶
func (kep *KafkaEventProducer) WatchEvents()
type KafkaProducer ¶
type KafkaProducerConfig ¶
type KafkaProducerConfig struct { Server string Timeout time.Duration Acks string ProducerChannelSize int MaxPartitions int32 ReplicationFactor int }
func GetDefaultKafkaProducerConfig ¶
func GetDefaultKafkaProducerConfig() *KafkaProducerConfig
type KafkaScanProducer ¶
type KafkaScanProducer struct {
Producer EventProducer
}
func (*KafkaScanProducer) Close ¶
func (sp *KafkaScanProducer) Close()
func (*KafkaScanProducer) Flush ¶
func (sp *KafkaScanProducer) Flush(timeout int) int
type NewProducer ¶
type NewProducer func() (ScanProducer, error)
type ProducibleScan ¶
type ScanProducer ¶
type ScanProducer interface { Produce(scan scan.Scan, topic string) error Close() Flush(timeout int) int }
TODO: rename the interfaces to follow the unofficial naming convention discussed at https://www.reddit.com/r/golang/comments/cjrk46/is_there_a_naming_convention_for_interface_struct/
func NewKafkaScanProducer ¶
func NewKafkaScanProducer(config *KafkaProducerConfig) (sp ScanProducer, err error)
func NewScanProducer ¶
func NewScanProducer(eventProducer EventProducer) (sp ScanProducer, err error)
type WatchDirectoryProducer ¶
type WatchDirectoryProducer struct { GetProducibleScans GetProducibleScans NewProducer NewProducer WaitUntilExit time.Duration }
func NewWatchDirectoryProducer ¶
func NewWatchDirectoryProducer(newScans GetProducibleScans, newProducer NewProducer) *WatchDirectoryProducer
func (*WatchDirectoryProducer) WatchAndProduce ¶
func (dp *WatchDirectoryProducer) WatchAndProduce(ctx context.Context, dir string) error
Click to show internal directories.
Click to hide internal directories.