cwl

package
v0.0.0-...-a0b3b17 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 2, 2019 License: MIT Imports: 20 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Fatal

func Fatal(err error)

Checks for error existance and panics

func IsBefore

func IsBefore(horizon time.Duration, lastEventTimestamp int64) bool

returns true if the event timestamp is before the specified horizon

func ToTime

func ToTime(timestamp int64) time.Time

Converts a timestamp to time

func ValidateMultiline

func ValidateMultiline(multiline *Multiline) error

Validates a multiline configuration section

Types

type AwsSession

type AwsSession struct {
	// contains filtered or unexported fields
}

func NewAwsSession

func NewAwsSession(awsRegion string) *AwsSession

func (*AwsSession) CloudWatchLogsClient

func (sess *AwsSession) CloudWatchLogsClient() cloudwatchlogsiface.CloudWatchLogsAPI

func (*AwsSession) S3Client

func (sess *AwsSession) S3Client() s3iface.S3API

type Config

type Config struct {
	S3BucketName           string        `config:"s3_bucket_name"`
	S3KeyPrefix            string        `config:"s3_key_prefix"`
	GroupRefreshFrequency  time.Duration `config:"group_refresh_frequency"`
	StreamRefreshFrequency time.Duration `config:"stream_refresh_frequency"`
	ReportFrequency        time.Duration `config:"report_frequency"`
	AWSRegion              string        `config:"aws_region"`

	HotStreamEventHorizon          time.Duration `config:"hot_stream_event_horizon"`
	HotStreamEventRefreshFrequency time.Duration `config:"hot_stream_event_refresh_frequency"`

	StreamEventHorizon          time.Duration `config:"stream_event_horizon"`
	StreamEventRefreshFrequency time.Duration `config:"stream_event_refresh_frequency"`

	StreamEventLimit int64 `config:"stream_event_limit"`

	Prospectors []Prospector `config:"prospectors"`
}

func DefaultConfig

func DefaultConfig(awsRegion string) *Config

func (*Config) String

func (config *Config) String() string

func (*Config) Validate

func (config *Config) Validate() error

type DummyRegistry

type DummyRegistry struct {
	// contains filtered or unexported fields
}

func (*DummyRegistry) ReadStreamInfo

func (registry *DummyRegistry) ReadStreamInfo(stream *Stream) error

func (*DummyRegistry) WriteStreamInfo

func (registry *DummyRegistry) WriteStreamInfo(stream *Stream) error

type Event

type Event struct {
	Stream    *Stream
	Message   string
	Timestamp int64
}

type EventPublisher

type EventPublisher interface {
	Publish(event *Event)
	Close()
}

type Group

type Group struct {
	Name       string
	Prospector *Prospector
	Params     *Params
	// contains filtered or unexported fields
}

func NewGroup

func NewGroup(name string, prospector *Prospector, params *Params) *Group

func (*Group) Monitor

func (group *Group) Monitor()

func (*Group) RefreshStreams

func (group *Group) RefreshStreams()

type GroupManager

type GroupManager struct {
	Params *Params
	// contains filtered or unexported fields
}

func NewGroupManager

func NewGroupManager(params *Params) *GroupManager

func (*GroupManager) Monitor

func (manager *GroupManager) Monitor()

type Multiline

type Multiline struct {
	Pattern string
	Negate  bool
	Match   string
}

type Params

type Params struct {
	Config    *Config
	Registry  Registry
	AWSClient cloudwatchlogsiface.CloudWatchLogsAPI
	Publisher EventPublisher
}

type Prospector

type Prospector struct {
	Id         string     `config:"id"`
	GroupNames []string   `config:"groupnames"`
	Multiline  *Multiline `config:"multiline"`
}

type Publisher

type Publisher struct {
	Client beat.Client
}

func (Publisher) Close

func (publisher Publisher) Close()

func (Publisher) Publish

func (publisher Publisher) Publish(event *Event)

type Registry

type Registry interface {
	ReadStreamInfo(*Stream) error
	WriteStreamInfo(*Stream) error
}

func NewDummyRegistry

func NewDummyRegistry() Registry

type RegistryItem

type RegistryItem struct {
	NextToken string
	Buffer    string
}

type S3Registry

type S3Registry struct {
	S3Client   s3iface.S3API
	BucketName string
	KeyPrefix  string
}

func (*S3Registry) GetBucketKeyForStream

func (registry *S3Registry) GetBucketKeyForStream(stream *Stream) string

func (*S3Registry) ReadStreamInfo

func (registry *S3Registry) ReadStreamInfo(stream *Stream) error

func (*S3Registry) WriteStreamInfo

func (registry *S3Registry) WriteStreamInfo(stream *Stream) error

type Stream

type Stream struct {
	Name   string
	Group  *Group
	Params *Params

	LastEventTimestamp int64 // the last event that we've processed (in milliseconds since 1970)
	// contains filtered or unexported fields
}

func NewStream

func NewStream(name string, group *Group, multiline *Multiline, finished chan<- bool, params *Params) *Stream

func (*Stream) FullName

func (stream *Stream) FullName() string

func (*Stream) IsHot

func (stream *Stream) IsHot(lastEventTimestamp int64) bool

func (*Stream) Monitor

func (stream *Stream) Monitor()

Monitor continuously monitors the stream for new events. If an error is encountered, monitoring will stop and the stream will send an event to the finished channel for the group to cleanup

func (*Stream) Next

func (stream *Stream) Next() error

Next fetches the next batch of events from the cloudwatchlogs stream returns the error (if any) otherwise nil

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL