pipeline

package
v1.1.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 17, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
// Package-level defaults. Only the names and values are visible here; where
// and how each default is applied is not shown.
const (
	// DEFAULT_HARVEST_TIME is the default harvest time value.
	// NOTE(review): the unit is not stated here (presumably seconds) — confirm.
	DEFAULT_HARVEST_TIME       = 60
	// DEFAULT_ITEMS_PER_BATCH is, judging by its name, the default number of
	// items placed in one batch.
	DEFAULT_ITEMS_PER_BATCH    = 500
	// DEFAULT_MAX_EXPORT_WORKERS is, judging by its name, the default upper
	// bound on the number of export workers.
	DEFAULT_MAX_EXPORT_WORKERS = 2
)

Variables

This section is empty.

Functions

func WithAuthenticator

func WithAuthenticator(
	authenticator connectors.HttpAuthenticator,
) simpleReceiverOpt

func WithBody

func WithBody(body any) simpleReceiverOpt

func WithEventsDecoder

func WithEventsDecoder(decoder EventsDecoderFunc) simpleReceiverOpt

func WithHeaders

func WithHeaders(headers map[string]string) simpleReceiverOpt

func WithLogsDecoder

func WithLogsDecoder(decoder LogsDecoderFunc) simpleReceiverOpt

func WithMethod

func WithMethod(method string) simpleReceiverOpt

func WithMetricsDecoder

func WithMetricsDecoder(decoder MetricsDecoderFunc) simpleReceiverOpt

func WithTimeout

func WithTimeout(timeout time.Duration) simpleReceiverOpt

Types

type Component

// Component is the base interface embedded by the receiver and exporter
// interfaces of this package. It exposes a single string identifier.
type Component interface {
	// GetId returns the component's identifier as a string.
	GetId() string
}

type EventsDecoderFunc

// EventsDecoderFunc is the function type accepted by WithEventsDecoder. It is
// given the receiver, a readable and closable input stream, and a send-only
// channel of model.Event, and it reports failure through its error result.
// NOTE(review): presumably it decodes events read from in and sends them on
// out; whether the function or its caller closes in is not visible — confirm.
type EventsDecoderFunc func(
	receiver EventsReceiver,
	in io.ReadCloser,
	out chan<- model.Event,
) error

type EventsExporter

// EventsExporter is a Component that can export a slice of events. It is the
// exporter type accepted by EventsPipeline.AddExporter.
type EventsExporter interface {
	Component
	// ExportEvents is given a context and the events to export, and returns
	// an error.
	ExportEvents(ctx context.Context, events []model.Event) error
}

type EventsPipeline

// EventsPipeline is a defined type whose underlying type is the unexported
// generic pipeline instantiated with model.Event. NewEventsPipeline returns a
// pointer to one.
type EventsPipeline pipeline[model.Event]

func NewEventsPipeline

func NewEventsPipeline() *EventsPipeline

func (*EventsPipeline) AddExporter

func (p *EventsPipeline) AddExporter(exporter EventsExporter)

func (*EventsPipeline) AddProcessor

func (p *EventsPipeline) AddProcessor(processor ProcessorFunc[model.Event])

func (*EventsPipeline) AddReceiver

func (p *EventsPipeline) AddReceiver(receiver EventsReceiver)

func (*EventsPipeline) Execute

func (p *EventsPipeline) Execute(ctx context.Context) error

func (*EventsPipeline) ExecuteSync

func (p *EventsPipeline) ExecuteSync(ctx context.Context) []error

func (*EventsPipeline) Shutdown

func (p *EventsPipeline) Shutdown(ctx context.Context) error

func (*EventsPipeline) Start

func (p *EventsPipeline) Start(ctx context.Context, wg *sync.WaitGroup) error

type EventsReceiver

// EventsReceiver is a Component that can be polled for events. It is the
// receiver type accepted by EventsPipeline.AddReceiver.
type EventsReceiver interface {
	Component
	// PollEvents is given a context and a send-only channel of model.Event,
	// and returns an error.
	// NOTE(review): presumably implementations send polled events on writer;
	// who closes the channel is not visible here — confirm.
	//
	// The context parameter is named ctx so it does not shadow the context
	// package name; parameter names are not part of a method's type, so
	// existing implementations are unaffected.
	PollEvents(ctx context.Context, writer chan<- model.Event) error
}

type Exporter

// Exporter is the generic exporter interface: a Component that can export a
// slice of T. ExporterAdapter declares an Export method with this signature.
type Exporter[T any] interface {
	Component
	// Export is given a context and the data to export, and returns an error.
	Export(ctx context.Context, data []T) error
}

type ExporterAdapter

type ExporterAdapter[T interface{}] struct {
	// contains filtered or unexported fields
}

func NewExporterAdapter

func NewExporterAdapter[T interface{}](
	id string,
	exporter ExporterFunc[T],
) *ExporterAdapter[T]

func (*ExporterAdapter[T]) Export

func (r *ExporterAdapter[T]) Export(ctx context.Context, data []T) error

func (*ExporterAdapter[T]) GetId

func (r *ExporterAdapter[T]) GetId() string

type ExporterFunc

// ExporterFunc is the function form of an exporter for values of type T: it
// is given a context and a slice of T and returns an error. It is the function
// type accepted by NewExporterAdapter.
type ExporterFunc[T any] func(ctx context.Context, data []T) error

type LogsDecoderFunc

// LogsDecoderFunc is the function type accepted by WithLogsDecoder. It is
// given the receiver, a readable and closable input stream, and a send-only
// channel of model.Log, and it reports failure through its error result.
// NOTE(review): presumably it decodes logs read from in and sends them on
// out; whether the function or its caller closes in is not visible — confirm.
type LogsDecoderFunc func(
	receiver LogsReceiver,
	in io.ReadCloser,
	out chan<- model.Log,
) error

type LogsExporter

// LogsExporter is a Component that can export a slice of logs. It is the
// exporter type accepted by LogsPipeline.AddExporter.
type LogsExporter interface {
	Component
	// ExportLogs is given a context and the logs to export, and returns an
	// error.
	ExportLogs(ctx context.Context, logs []model.Log) error
}

type LogsPipeline

// LogsPipeline is a defined type whose underlying type is the unexported
// generic pipeline instantiated with model.Log. NewLogsPipeline returns a
// pointer to one.
type LogsPipeline pipeline[model.Log]

func NewLogsPipeline

func NewLogsPipeline() *LogsPipeline

func (*LogsPipeline) AddExporter

func (p *LogsPipeline) AddExporter(exporter LogsExporter)

func (*LogsPipeline) AddProcessor

func (p *LogsPipeline) AddProcessor(processor ProcessorFunc[model.Log])

func (*LogsPipeline) AddReceiver

func (p *LogsPipeline) AddReceiver(receiver LogsReceiver)

func (*LogsPipeline) Execute

func (p *LogsPipeline) Execute(ctx context.Context) error

func (*LogsPipeline) ExecuteSync

func (p *LogsPipeline) ExecuteSync(ctx context.Context) []error

func (*LogsPipeline) Shutdown

func (p *LogsPipeline) Shutdown(ctx context.Context) error

func (*LogsPipeline) Start

func (p *LogsPipeline) Start(ctx context.Context, wg *sync.WaitGroup) error

type LogsReceiver

// LogsReceiver is a Component that can be polled for logs. It is the receiver
// type accepted by LogsPipeline.AddReceiver.
type LogsReceiver interface {
	Component
	// PollLogs is given a context and a send-only channel of model.Log, and
	// returns an error.
	// NOTE(review): presumably implementations send polled logs on writer;
	// who closes the channel is not visible here — confirm.
	//
	// The context parameter is named ctx so it does not shadow the context
	// package name; parameter names are not part of a method's type, so
	// existing implementations are unaffected.
	PollLogs(ctx context.Context, writer chan<- model.Log) error
}

type MetricsDecoderFunc

// MetricsDecoderFunc is the function type accepted by WithMetricsDecoder. It
// is given the receiver, a readable and closable input stream, and a send-only
// channel of model.Metric, and it reports failure through its error result.
// NOTE(review): presumably it decodes metrics read from in and sends them on
// out; whether the function or its caller closes in is not visible — confirm.
type MetricsDecoderFunc func(
	receiver MetricsReceiver,
	in io.ReadCloser,
	out chan<- model.Metric,
) error

type MetricsExporter

// MetricsExporter is a Component that can export a slice of metrics. It is the
// exporter type accepted by MetricsPipeline.AddExporter.
type MetricsExporter interface {
	Component
	// ExportMetrics is given a context and the metrics to export, and returns
	// an error.
	ExportMetrics(ctx context.Context, metrics []model.Metric) error
}

type MetricsPipeline

// MetricsPipeline is a defined type whose underlying type is the unexported
// generic pipeline instantiated with model.Metric. NewMetricsPipeline returns
// a pointer to one.
type MetricsPipeline pipeline[model.Metric]

func NewMetricsPipeline

func NewMetricsPipeline() *MetricsPipeline

func (*MetricsPipeline) AddExporter

func (p *MetricsPipeline) AddExporter(exporter MetricsExporter)

func (*MetricsPipeline) AddProcessor

func (p *MetricsPipeline) AddProcessor(processor ProcessorFunc[model.Metric])

func (*MetricsPipeline) AddReceiver

func (p *MetricsPipeline) AddReceiver(receiver MetricsReceiver)

func (*MetricsPipeline) Execute

func (p *MetricsPipeline) Execute(ctx context.Context) error

func (*MetricsPipeline) ExecuteSync

func (p *MetricsPipeline) ExecuteSync(ctx context.Context) []error

func (*MetricsPipeline) Shutdown

func (p *MetricsPipeline) Shutdown(ctx context.Context) error

func (*MetricsPipeline) Start

func (p *MetricsPipeline) Start(ctx context.Context, wg *sync.WaitGroup) error

type MetricsReceiver

// MetricsReceiver is a Component that can be polled for metrics. It is the
// receiver type accepted by MetricsPipeline.AddReceiver.
type MetricsReceiver interface {
	Component
	// PollMetrics is given a context and a send-only channel of model.Metric,
	// and returns an error.
	// NOTE(review): presumably implementations send polled metrics on writer;
	// who closes the channel is not visible here — confirm.
	//
	// The context parameter is named ctx so it does not shadow the context
	// package name; parameter names are not part of a method's type, so
	// existing implementations are unaffected.
	PollMetrics(ctx context.Context, writer chan<- model.Metric) error
}

type Pipeline

// Pipeline is the common lifecycle interface of this package. EventsPipeline,
// LogsPipeline and MetricsPipeline each list pointer-receiver methods with
// these four signatures.
type Pipeline interface {
	// Start is given a context and a *sync.WaitGroup and returns an error.
	// NOTE(review): presumably it launches background work tracked by wg —
	// confirm against the implementation.
	Start(ctx context.Context, wg *sync.WaitGroup) error
	// Execute is given a context and returns a single error.
	// NOTE(review): how it differs from ExecuteSync is not visible here.
	Execute(ctx context.Context) error
	// ExecuteSync is given a context and returns a slice of errors rather
	// than a single error.
	ExecuteSync(ctx context.Context) []error
	// Shutdown is given a context and returns an error.
	Shutdown(ctx context.Context) error
}

type ProcessorFunc

// ProcessorFunc is a processing step over a slice of T: it receives a slice
// and returns a slice together with an error. It is the function type accepted
// by the AddProcessor methods of this package.
// NOTE(review): whether the returned slice may alias the input is not visible
// here — confirm before relying on either behavior.
type ProcessorFunc[T any] func(data []T) ([]T, error)

type ProcessorList

type ProcessorList[T any] struct {
	// contains filtered or unexported fields
}

func (*ProcessorList[T]) AddProcessor

func (pl *ProcessorList[T]) AddProcessor(processor ProcessorFunc[T])

func (*ProcessorList[T]) Process

func (mpl *ProcessorList[T]) Process(data []T) ([]T, error)

type ProcessorListNode

type ProcessorListNode[T any] struct {
	// contains filtered or unexported fields
}

func (*ProcessorListNode[T]) Process

func (pln *ProcessorListNode[T]) Process(data []T) ([]T, error)

type Receiver

// Receiver is the generic receiver interface: a Component that can be polled
// for values of type T. ReceiverAdapter declares a Poll method with this
// signature.
type Receiver[T any] interface {
	Component
	// Poll is given a context and a send-only channel of T, and returns an
	// error.
	// NOTE(review): presumably implementations send polled values on ch; who
	// closes the channel is not visible here — confirm.
	Poll(ctx context.Context, ch chan<- T) error
}

type ReceiverAdapter

type ReceiverAdapter[T interface{}] struct {
	// contains filtered or unexported fields
}

func NewReceiverAdapter

func NewReceiverAdapter[T interface{}](
	id string,
	poller ReceiverFunc[T],
) *ReceiverAdapter[T]

func (*ReceiverAdapter[T]) GetId

func (r *ReceiverAdapter[T]) GetId() string

func (*ReceiverAdapter[T]) Poll

func (r *ReceiverAdapter[T]) Poll(ctx context.Context, ch chan<- T) error

type ReceiverFunc

// ReceiverFunc is the function form of a receiver for values of type T: it is
// given a context and a send-only channel of T and returns an error. It is the
// function type accepted by NewReceiverAdapter.
type ReceiverFunc[T any] func(ctx context.Context, ch chan<- T) error

type SimpleReceiver

type SimpleReceiver struct {
	// contains filtered or unexported fields
}

func NewSimpleReceiver

func NewSimpleReceiver(
	id string,
	url string,
	simpleReceiverOpts ...simpleReceiverOpt,
) *SimpleReceiver

func (*SimpleReceiver) GetId

func (r *SimpleReceiver) GetId() string

func (*SimpleReceiver) PollEvents

func (s *SimpleReceiver) PollEvents(
	ctx context.Context,
	out chan<- model.Event,
) error

func (*SimpleReceiver) PollLogs

func (s *SimpleReceiver) PollLogs(
	ctx context.Context,
	out chan<- model.Log,
) error

func (*SimpleReceiver) PollMetrics

func (s *SimpleReceiver) PollMetrics(
	ctx context.Context,
	metricChan chan<- model.Metric,
) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL