eventbus

package
v1.2.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 30, 2022 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
// Declared ComponentEventType values. The type is carried in the EventType
// field of ComponentBaseMetricData and PipelineMetricData.
const (
	// ComponentStop is the event type whose string value is "stop".
	ComponentStop  = ComponentEventType("stop")
	// ComponentStart is the event type whose string value is "start".
	ComponentStart = ComponentEventType("start")
)

Variables

View Source
// Topic name strings. They are package-level variables rather than
// constants, which means they can be reassigned at runtime.
// NOTE(review): presumably these are the values passed as the topic argument
// of Publish, PublishOrDrop and WithTopic/WithTopics — confirm against callers.
var (
	FileSourceMetricTopic = "filesource"
	FileWatcherTopic      = "filewatcher"
	SinkMetricTopic       = "sink"
	ReloadTopic           = "reload"
	ErrorTopic            = "error"
	LogAlertTopic         = "log"
	QueueMetricTopic      = "queue"
	PipelineTopic         = "pipeline"
	ComponentBaseTopic    = "component"
	SystemTopic           = "sys"
)

Functions

func AfterErrorFunc

func AfterErrorFunc(errorMsg string)

func GetFieldsByRef

func GetFieldsByRef(fieldsRef []string, fields map[string]interface{}) map[string]interface{}

func InjectFields

func InjectFields(labels map[string]string, fields map[string]interface{})

func Publish

func Publish(topic string, data interface{})

func PublishOrDrop

func PublishOrDrop(topic string, data interface{})

func Registry

func Registry(listenerName string, listenerFactory ListenerFactory, opts ...SubscribeOpt)

func RegistrySubscribe

func RegistrySubscribe(subscribe *Subscribe)

func StartAndRun

func StartAndRun(config Config)

Types

type BaseMetric

// BaseMetric carries a pipeline name and a source name. It is embedded in
// CollectMetricData, SinkMetricData and WatchMetricData, so those types
// expose PipelineName and SourceName directly.
type BaseMetric struct {
	PipelineName string
	SourceName   string
}

type BaseMetricData

// BaseMetricData holds two integer counters, Lines and Bytes.
// NOTE(review): no other declaration visible here references this type;
// confirm where it is used before relying on it.
type BaseMetricData struct {
	Lines int
	Bytes int
}

type CollectMetricData

// CollectMetricData describes a single file: its name, offsets, line
// counters and size, plus the pipeline/source names promoted from the
// embedded BaseMetric.
// NOTE(review): presumably published on FileSourceMetricTopic — confirm.
type CollectMetricData struct {
	BaseMetric
	FileName     string // file name, including its path
	Offset       int64
	// NOTE(review): the comments on LineNumber and Lines read as if they were
	// swapped relative to the field names — confirm against the producer.
	LineNumber   int64 // number of lines in the file
	Lines        int64 // line offset of the current position
	FileSize     int64
	// SourceFields is a free-form map; its key set is not defined here.
	SourceFields map[string]interface{}
}

type ComponentBaseConfig

// ComponentBaseConfig holds a component's name together with its api.Type
// and api.Category. It appears as ComponentBaseMetricData.Config and as the
// element type of PipelineMetricData.ComponentConfigs, and has a Code method
// that returns a string.
type ComponentBaseConfig struct {
	Name     string
	Type     api.Type
	Category api.Category
}

func (ComponentBaseConfig) Code

func (cbc ComponentBaseConfig) Code() string

type ComponentBaseMetricData

// ComponentBaseMetricData pairs a component event type with the pipeline
// name, a timestamp and the component's base configuration.
// NOTE(review): presumably published on ComponentBaseTopic — confirm.
type ComponentBaseMetricData struct {
	EventType    ComponentEventType // e.g. ComponentStart ("start") or ComponentStop ("stop")
	PipelineName string
	// NOTE(review): presumably the time at which the event occurred — confirm.
	EpochTime    time.Time
	Config       ComponentBaseConfig
}

type ComponentEventType

// ComponentEventType is a string-based event type. ComponentStart and
// ComponentStop are the values declared for it in this package.
type ComponentEventType string

type Config

// Config is the parameter type of StartAndRun. Its fields are tagged for
// YAML: LoggerConfig under the key "logger" and ListenerConfigs under the
// key "listeners".
type Config struct {
	LoggerConfig    logger.Config            `yaml:"logger"`
	// NOTE(review): the map key is presumably the listener name used with
	// Registry — confirm.
	ListenerConfigs map[string]cfg.CommonCfg `yaml:"listeners"`
}

type ErrorMetricData

// ErrorMetricData wraps a single error message string.
// NOTE(review): presumably built from the errorMsg passed to AfterErrorFunc
// and published on ErrorTopic — confirm.
type ErrorMetricData struct {
	ErrorMsg string
}

type Event

// Event is the value passed to Listener.Subscribe and returned by NewEvent.
// It pairs a topic string with an arbitrary payload and a timestamp.
type Event struct {
	Topic       string
	// NOTE(review): presumably set when the event is created/published — confirm.
	PublishTime time.Time
	// Data is untyped; consumers must know the concrete type for the topic.
	Data        interface{}
}

func NewEvent

func NewEvent(topic string, data interface{}) Event

type EventCenter

// EventCenter is an opaque type: all of its fields are unexported. Values
// are obtained from NewEventCenter, and the type has a Stop method.
type EventCenter struct {
	// contains filtered or unexported fields
}

func NewEventCenter

func NewEventCenter(bufferSize int64, asyncConsumerSize int) *EventCenter

func (*EventCenter) Stop

func (ec *EventCenter) Stop()

type FileInfo

// FileInfo describes one file by name, size, modification time and offset,
// plus two boolean flags. It is the element type of
// WatchMetricData.FileInfos.
type FileInfo struct {
	FileName       string
	Size           int64
	LastModifyTime time.Time
	Offset         int64
	// NOTE(review): the exact conditions under which the two flags below are
	// set are not visible here — confirm against the file watcher.
	IsIgnoreOlder  bool
	IsRelease      bool
}

type Listener

// Listener is the interface produced by a ListenerFactory. It embeds
// api.Lifecycle and adds a name, a configuration accessor and an Event
// handler.
type Listener interface {
	api.Lifecycle
	// Name returns the listener's name.
	Name() string
	// Config returns the listener's configuration as an untyped value.
	Config() interface{}
	// Subscribe receives an Event.
	// NOTE(review): despite its name, this takes an Event rather than a
	// topic, so it looks like the delivery callback — confirm.
	Subscribe(event Event)
}

type ListenerFactory

// ListenerFactory is a no-argument function that returns a Listener. It is
// the factory parameter type of Registry and NewSubscribe.
type ListenerFactory func() Listener

type LogAlertData

// LogAlertData holds two string-to-string maps, Labels and Annotations.
// Values are created by NewLogAlertData, and the type has a Fingerprint
// method that returns a string.
// NOTE(review): presumably published on LogAlertTopic — confirm.
type LogAlertData struct {
	Labels      map[string]string
	Annotations map[string]string
}

func NewLogAlertData

func NewLogAlertData(labels map[string]string, annotations map[string]string) *LogAlertData

func (*LogAlertData) Fingerprint

func (lad *LogAlertData) Fingerprint() string

type PipelineMetricData

// PipelineMetricData pairs an event type with a name, a timestamp and a
// list of component base configurations.
// NOTE(review): presumably Name is the pipeline name and the value is
// published on PipelineTopic — confirm.
type PipelineMetricData struct {
	EventType        ComponentEventType
	Name             string
	Time             time.Time
	ComponentConfigs []ComponentBaseConfig
}

type QueueMetricData

// QueueMetricData reports a queue's type, capacity and size for a named
// pipeline.
// NOTE(review): presumably Capacity is the maximum and Size the current
// occupancy, published on QueueMetricTopic — confirm.
type QueueMetricData struct {
	PipelineName string
	Type         string
	Capacity     int64
	Size         int64
}

type ReloadMetricData

// ReloadMetricData carries a single integer, Tick.
// NOTE(review): the meaning of Tick is not visible here; presumably related
// to ReloadTopic — confirm.
type ReloadMetricData struct {
	Tick int
}

type SinkMetricData

// SinkMetricData holds a success count and a failure count, plus the
// pipeline/source names promoted from the embedded BaseMetric.
// NOTE(review): presumably published on SinkMetricTopic — confirm.
type SinkMetricData struct {
	BaseMetric
	SuccessEventCount int
	FailEventCount    int
}

type Subscribe

// Subscribe is an opaque type: all of its fields are unexported. Values are
// created by NewSubscribe, modified through SubscribeOpt functions, and
// accepted by RegistrySubscribe.
type Subscribe struct {
	// contains filtered or unexported fields
}

func NewSubscribe

func NewSubscribe(listenerName string, factory ListenerFactory, opts ...SubscribeOpt) *Subscribe

type SubscribeOpt

// SubscribeOpt is a function that receives a *Subscribe. WithAsync,
// WithTopic and WithTopics return values of this type, and Registry and
// NewSubscribe accept them as variadic arguments.
type SubscribeOpt func(s *Subscribe)

func WithAsync

func WithAsync(async bool) SubscribeOpt

func WithTopic

func WithTopic(topic string) SubscribeOpt

func WithTopics

func WithTopics(topics []string) SubscribeOpt

type WatchMetricData

// WatchMetricData holds a list of FileInfo values and two counters, plus
// the pipeline/source names promoted from the embedded BaseMetric.
// NOTE(review): presumably published on FileWatcherTopic — confirm.
type WatchMetricData struct {
	BaseMetric
	FileInfos       []FileInfo
	TotalFileCount  int
	// NOTE(review): presumably the number of open but inactive file
	// descriptors — confirm.
	InactiveFdCount int
	// SourceFields is a free-form map; its key set is not defined here.
	SourceFields    map[string]interface{}
}

Directories

Path Synopsis
export
listener
sys

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL