Documentation
¶
Index ¶
- Variables
- func NewTLSConfig(config *Configuration, clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error)
- func SignalExitCond(outputDone *sync.Cond)
- type BaseOutput
- type BufferOutput
- type BundleBehavior
- type BundleStatistics
- type BundledOutput
- type FileOutput
- type FileStatistics
- type HTTPBehavior
- func (this *HTTPBehavior) CreateTransport() http.RoundTripper
- func (this *HTTPBehavior) Initialize(dest string) error
- func (this *HTTPBehavior) Key() string
- func (this *HTTPBehavior) Statistics() interface{}
- func (this *HTTPBehavior) String() string
- func (this *HTTPBehavior) Upload(fileName string, fp *os.File) UploadStatus
- type HTTPStatistics
- type KafkaOutput
- type KafkaStatistics
- type NGS3Output
- func (so *NGS3Output) ExitCleanup()
- func (so *NGS3Output) HandleHup() error
- func (so *NGS3Output) HandleMessage(message string) error
- func (so *NGS3Output) HandleTerm()
- func (so *NGS3Output) HandleTick() error
- func (so *NGS3Output) Initialize(connString string) error
- func (so *NGS3Output) Key() string
- func (so *NGS3Output) Start() (err error)
- func (so *NGS3Output) Statistics() interface{}
- func (so *NGS3Output) String() string
- type NetOutput
- type NetStatistics
- type Output
- type OutputHandler
- type OutputInitializer
- type OutputKeys
- type S3Behavior
- type S3ChunkingPublisher
- func (chunkingPublisher *S3ChunkingPublisher) LaunchInputWorkers(workerNum int) error
- func (chunkingPublisher *S3ChunkingPublisher) RollChunkIf(uploadEmpty bool) (err error)
- func (chunkingPublisher *S3ChunkingPublisher) RollChunkIfTimeElapsed(uploadEmpty bool, duration time.Duration) (err error)
- func (chunkingPublisher *S3ChunkingPublisher) Start() error
- func (chunkingPublisher *S3ChunkingPublisher) Stop()
- type S3OutputChunk
- func (chunk *S3OutputChunk) CloseChunkReader() error
- func (chunk *S3OutputChunk) CloseChunkWriters() error
- func (chunk *S3OutputChunk) FlushIfNeeded() error
- func (chunk *S3OutputChunk) Full() bool
- func (chunk *S3OutputChunk) MarkSent()
- func (chunk *S3OutputChunk) NeedsFlush() bool
- func (chunk *S3OutputChunk) PrepareS3UploadInput(workerId int) *s3manager.UploadInput
- func (chunk *S3OutputChunk) Read(buffer []byte) (n int, err error)
- func (chunk *S3OutputChunk) Write(message string) error
- type S3OutputChunkWorker
- func (chunkWorker *S3OutputChunkWorker) CloseCurrentChunk() error
- func (chunkWorker *S3OutputChunkWorker) CurrentChunkTime() time.Time
- func (chunkWorker *S3OutputChunkWorker) RollChunk() error
- func (chunkWorker *S3OutputChunkWorker) RollChunkAndSend() error
- func (chunkWorker *S3OutputChunkWorker) RollChunkIf(emptyOk bool) error
- func (chunkWorker *S3OutputChunkWorker) RollChunkIfTimeElapsed(emptyOk bool, duration time.Duration) error
- func (chunkWorker *S3OutputChunkWorker) SendChunk()
- func (chunkWorker *S3OutputChunkWorker) Work(workerId int, wg *sync.WaitGroup, input <-chan string)
- type S3Publisher
- type S3PublisherWorker
- type S3Statistics
- type SplunkBehavior
- type SplunkStatistics
- type SyslogOutput
- type SyslogStatistics
- type UploadData
- type UploadEvent
- type UploadStatus
- type WrappedUploader
- type XDGSCRAMClient
Constants ¶
This section is empty.
Variables ¶
var ( SHA256 scram.HashGeneratorFcn = sha256.New SHA512 scram.HashGeneratorFcn = sha512.New )
Functions ¶
func NewTLSConfig ¶
func SignalExitCond ¶
Types ¶
type BaseOutput ¶
type BaseOutput struct {
OutputHandler
}
func NewNGS3OutputFromConfig ¶
func NewNGS3OutputFromConfig(cfg *Configuration) *BaseOutput
type BufferOutput ¶
type BufferOutput struct {
// contains filtered or unexported fields
}
type BundleBehavior ¶
type BundleBehavior interface { Upload(fileName string, fp *os.File) UploadStatus Initialize(connString string) error Statistics() interface{} Key() string String() string }
Each bundled output plugin must implement the BundleBehavior interface, specifying how to upload files, initialize itself, and report back statistics.
type BundleStatistics ¶
type BundleStatistics struct { FilesUploaded int64 `json:"files_uploaded"` UploadErrors int64 `json:"upload_errors"` LastErrorTime time.Time `json:"last_error_time"` LastErrorText string `json:"last_error_text"` LastSuccessfulUpload time.Time `json:"last_successful_upload"` HoldingArea interface{} `json:"file_holding_area"` StorageStatistics interface{} `json:"storage_statistics"` BundleSendTimeout int64 `json:"bundle_send_timeout"` BundleSizeMax int64 `json:"bundle_size_max"` UploadEmptyFiles bool `json:"upload_empty_files"` }
type BundledOutput ¶
type BundledOutput struct { Behavior BundleBehavior Config *Configuration // TODO: make this thread-safe from the status page sync.RWMutex // contains filtered or unexported fields }
func NewHTTPOutputFromConfig ¶
func NewHTTPOutputFromConfig(cfg *Configuration) *BundledOutput
func NewS3OutputFromConfig ¶
func NewS3OutputFromConfig(cfg *Configuration) *BundledOutput
func NewSplunkOutputFromConfig ¶
func NewSplunkOutputFromConfig(cfg *Configuration) *BundledOutput
func (*BundledOutput) Initialize ¶
func (o *BundledOutput) Initialize(connString string) error
func (*BundledOutput) Key ¶
func (o *BundledOutput) Key() string
func (*BundledOutput) Statistics ¶
func (o *BundledOutput) Statistics() interface{}
func (*BundledOutput) String ¶
func (o *BundledOutput) String() string
type FileOutput ¶
type FileOutput struct { Config *Configuration sync.RWMutex // contains filtered or unexported fields }
func NewFileOutputFromConfig ¶
func NewFileOutputFromConfig(cfg *Configuration) *FileOutput
func (*FileOutput) Initialize ¶
func (o *FileOutput) Initialize(fileName string) error
func (*FileOutput) Key ¶
func (o *FileOutput) Key() string
func (*FileOutput) Statistics ¶
func (o *FileOutput) Statistics() interface{}
func (*FileOutput) String ¶
func (o *FileOutput) String() string
type FileStatistics ¶
type HTTPBehavior ¶
type HTTPBehavior struct { Config *Configuration HTTPPostTemplate *template.Template // contains filtered or unexported fields }
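HTTPBehavior is the HTTP implementation of the BundleBehavior interface: it provides the Upload, Initialize, Statistics, Key and String methods that a bundled output plugin must implement.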
func (*HTTPBehavior) CreateTransport ¶
func (this *HTTPBehavior) CreateTransport() http.RoundTripper
CreateTransport returns the Transport which will be used in http.Client.
func (*HTTPBehavior) Initialize ¶
func (this *HTTPBehavior) Initialize(dest string) error
Initialize constructs the HTTPBehavior object.
func (*HTTPBehavior) Key ¶
func (this *HTTPBehavior) Key() string
func (*HTTPBehavior) Statistics ¶
func (this *HTTPBehavior) Statistics() interface{}
func (*HTTPBehavior) String ¶
func (this *HTTPBehavior) String() string
func (*HTTPBehavior) Upload ¶
func (this *HTTPBehavior) Upload(fileName string, fp *os.File) UploadStatus
Upload does a POST of the given event to this.dest. Upload is called from within its own goroutine, so we can do some expensive work here.
type HTTPStatistics ¶
type HTTPStatistics struct {
Destination string `json:"destination"`
}
type KafkaOutput ¶
type KafkaOutput struct { Config *Configuration EventSent metrics.Meter DroppedEvent metrics.Meter sync.RWMutex // contains filtered or unexported fields }
func NewKafkaOutputFromConfig ¶
func NewKafkaOutputFromConfig(cfg *Configuration) *KafkaOutput
func (*KafkaOutput) Initialize ¶
func (o *KafkaOutput) Initialize(unused string) (err error)
func (*KafkaOutput) Key ¶
func (o *KafkaOutput) Key() string
func (*KafkaOutput) Statistics ¶
func (o *KafkaOutput) Statistics() interface{}
func (*KafkaOutput) String ¶
func (o *KafkaOutput) String() string
type KafkaStatistics ¶
type NGS3Output ¶
type NGS3Output struct { Config *Configuration // contains filtered or unexported fields }
func (*NGS3Output) ExitCleanup ¶
func (so *NGS3Output) ExitCleanup()
func (*NGS3Output) HandleHup ¶
func (so *NGS3Output) HandleHup() error
func (*NGS3Output) HandleMessage ¶
func (so *NGS3Output) HandleMessage(message string) error
func (*NGS3Output) HandleTerm ¶
func (so *NGS3Output) HandleTerm()
func (*NGS3Output) HandleTick ¶
func (so *NGS3Output) HandleTick() error
func (*NGS3Output) Initialize ¶
func (so *NGS3Output) Initialize(connString string) error
func (*NGS3Output) Key ¶
func (so *NGS3Output) Key() string
func (*NGS3Output) Start ¶
func (so *NGS3Output) Start() (err error)
func (*NGS3Output) Statistics ¶
func (so *NGS3Output) Statistics() interface{}
func (*NGS3Output) String ¶
func (so *NGS3Output) String() string
type NetOutput ¶
type NetOutput struct { Config *Configuration sync.RWMutex // contains filtered or unexported fields }
func NewNetOutputfromConfig ¶
func NewNetOutputfromConfig(cfg *Configuration) *NetOutput
func (*NetOutput) Initialize ¶
Initialize expects a connection string in the following format: (protocol):(hostname/IP):(port). For example: tcp:destination.server.example.com:512
func (*NetOutput) Statistics ¶
func (o *NetOutput) Statistics() interface{}
type NetStatistics ¶
type Output ¶
type Output interface { Go(messages <-chan string, signalChan <-chan os.Signal, exitCond *sync.Cond) error OutputKeys OutputInitializer }
type OutputHandler ¶
type OutputHandler interface { Start() error HandleMessage(message string) error HandleHup() error HandleTick() error ExitCleanup() HandleTerm() OutputKeys OutputInitializer }
type OutputInitializer ¶
type OutputKeys ¶
type S3Behavior ¶
type S3Behavior struct { Config *Configuration // contains filtered or unexported fields }
func (*S3Behavior) Initialize ¶
func (o *S3Behavior) Initialize(connString string) error
func (*S3Behavior) Key ¶
func (o *S3Behavior) Key() string
func (*S3Behavior) Statistics ¶
func (o *S3Behavior) Statistics() interface{}
func (*S3Behavior) String ¶
func (o *S3Behavior) String() string
func (*S3Behavior) Upload ¶
func (o *S3Behavior) Upload(fileName string, fp *os.File) UploadStatus
type S3ChunkingPublisher ¶
type S3ChunkingPublisher struct { S3Publisher Input chan string // contains filtered or unexported fields }
func NewS3ChunkingPublisher ¶
func NewS3ChunkingPublisher(cfg *Configuration, uploader WrappedUploader, bucketName string) *S3ChunkingPublisher
func (*S3ChunkingPublisher) LaunchInputWorkers ¶
func (chunkingPublisher *S3ChunkingPublisher) LaunchInputWorkers(workerNum int) error
func (*S3ChunkingPublisher) RollChunkIf ¶
func (chunkingPublisher *S3ChunkingPublisher) RollChunkIf(uploadEmpty bool) (err error)
func (*S3ChunkingPublisher) RollChunkIfTimeElapsed ¶
func (chunkingPublisher *S3ChunkingPublisher) RollChunkIfTimeElapsed(uploadEmpty bool, duration time.Duration) (err error)
func (*S3ChunkingPublisher) Start ¶
func (chunkingPublisher *S3ChunkingPublisher) Start() error
func (*S3ChunkingPublisher) Stop ¶
func (chunkingPublisher *S3ChunkingPublisher) Stop()
type S3OutputChunk ¶
type S3OutputChunk struct { Closed bool // contains filtered or unexported fields }
func NewS3OutputChunk ¶
func NewS3OutputChunk(cfg *Configuration, chunkSize, flushSize int64, fileName, bucketName string) (*S3OutputChunk, error)
func (*S3OutputChunk) CloseChunkReader ¶
func (chunk *S3OutputChunk) CloseChunkReader() error
func (*S3OutputChunk) CloseChunkWriters ¶
func (chunk *S3OutputChunk) CloseChunkWriters() error
func (*S3OutputChunk) FlushIfNeeded ¶
func (chunk *S3OutputChunk) FlushIfNeeded() error
func (*S3OutputChunk) Full ¶
func (chunk *S3OutputChunk) Full() bool
func (*S3OutputChunk) MarkSent ¶
func (chunk *S3OutputChunk) MarkSent()
func (*S3OutputChunk) NeedsFlush ¶
func (chunk *S3OutputChunk) NeedsFlush() bool
func (*S3OutputChunk) PrepareS3UploadInput ¶
func (chunk *S3OutputChunk) PrepareS3UploadInput(workerId int) *s3manager.UploadInput
func (*S3OutputChunk) Read ¶
func (chunk *S3OutputChunk) Read(buffer []byte) (n int, err error)
Read is called by the S3 uploader. It is repeatedly called to pull data off the pipe, so it can be written to S3. Once the writer end of the pipe is closed, the current S3 upload will be completed and no more reading will be performed on that pipe.
func (*S3OutputChunk) Write ¶
func (chunk *S3OutputChunk) Write(message string) error
type S3OutputChunkWorker ¶
func NewS3ChunkWorker ¶
func NewS3ChunkWorker(cfg *Configuration, uploads chan<- *S3OutputChunk, chunkSize, flushSize int64, fileName, bucketName string) (*S3OutputChunkWorker, error)
func (*S3OutputChunkWorker) CloseCurrentChunk ¶
func (chunkWorker *S3OutputChunkWorker) CloseCurrentChunk() error
func (*S3OutputChunkWorker) CurrentChunkTime ¶
func (chunkWorker *S3OutputChunkWorker) CurrentChunkTime() time.Time
func (*S3OutputChunkWorker) RollChunk ¶
func (chunkWorker *S3OutputChunkWorker) RollChunk() error
RollChunk ends the current chunk and creates another. Ending it is accomplished by closing the pipe writer which results in the read and then the S3 upload completing. The chunk is created before any data is received.
func (*S3OutputChunkWorker) RollChunkAndSend ¶
func (chunkWorker *S3OutputChunkWorker) RollChunkAndSend() error
RollChunkAndSend creates a new chunk and sends it to the thread doing the S3 uploading via a channel.
func (*S3OutputChunkWorker) RollChunkIf ¶
func (chunkWorker *S3OutputChunkWorker) RollChunkIf(emptyOk bool) error
func (*S3OutputChunkWorker) RollChunkIfTimeElapsed ¶
func (chunkWorker *S3OutputChunkWorker) RollChunkIfTimeElapsed(emptyOk bool, duration time.Duration) error
func (*S3OutputChunkWorker) SendChunk ¶
func (chunkWorker *S3OutputChunkWorker) SendChunk()
SendChunk - sends the details of a chunk (which includes the pipe being used to send data) and is one of the first things that is done. Doing this triggers the creation of a new S3 uploader, which will continuously read from the pipe until it is closed. This does not send RabbitMQ data, but rather a chunk object.
type S3Publisher ¶
type S3Publisher struct { WrappedUploader // contains filtered or unexported fields }
func NewS3Publisher ¶
func NewS3Publisher(waitGroup *sync.WaitGroup, uploader WrappedUploader, uploadInputs <-chan *S3OutputChunk) S3Publisher
func (*S3Publisher) LaunchUploadWorkers ¶
func (publisher *S3Publisher) LaunchUploadWorkers(workerNum int)
type S3PublisherWorker ¶
type S3PublisherWorker struct {
// contains filtered or unexported fields
}
func NewS3PublisherWorker ¶
func NewS3PublisherWorker(workerId int, waitGroup *sync.WaitGroup, uploads <-chan *S3OutputChunk, publisher WrappedUploader) *S3PublisherWorker
func (*S3PublisherWorker) Work ¶
func (worker *S3PublisherWorker) Work()
Work waits for a new chunk on the channel and then uses it to setup a new S3 uploader. That uploader will make calls to the above S3OutputChunk.Read() to pull data off the internal pipe where we are also writing data that is received from RabbitMQ. Once the pipe's writer is closed, the data will be uploaded and this will again wait for the next chunk. Note that chunks are created before any data is received.
type S3Statistics ¶
type SplunkBehavior ¶
type SplunkBehavior struct { Config *Configuration HTTPPostTemplate *template.Template // contains filtered or unexported fields }
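SplunkBehavior is the Splunk HTTP Event Collector (HEC) implementation of the BundleBehavior interface: it provides the Upload, Initialize, Statistics, Key and String methods that a bundled output plugin must implement.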
func (*SplunkBehavior) Initialize ¶
func (this *SplunkBehavior) Initialize(dest string) error
Initialize constructs the SplunkBehavior object.
func (*SplunkBehavior) Key ¶
func (this *SplunkBehavior) Key() string
func (*SplunkBehavior) Statistics ¶
func (this *SplunkBehavior) Statistics() interface{}
func (*SplunkBehavior) String ¶
func (this *SplunkBehavior) String() string
func (*SplunkBehavior) Upload ¶
func (this *SplunkBehavior) Upload(fileName string, fp *os.File) UploadStatus
Upload does a POST of the given event to this.dest. Upload is called from within its own goroutine, so we can do some expensive work here.
type SplunkStatistics ¶
type SplunkStatistics struct {
Destination string `json:"destination"`
}
type SyslogOutput ¶
type SyslogOutput struct { Config *Configuration sync.RWMutex // contains filtered or unexported fields }
func NewSyslogOutputFromConfig ¶
func NewSyslogOutputFromConfig(cfg *Configuration) *SyslogOutput
func (*SyslogOutput) Initialize ¶
func (o *SyslogOutput) Initialize(netConn string) error
Initialize expects a connection string in the following format: [protocol]:host[:port]. For example: tcp+tls:destination.server.example.com:512
- protocol is optional (but the colon before host is not) and should be one of: tcp, udp, tcp+tls.
- port is optional and, if not provided, will default to 514.
func (*SyslogOutput) Key ¶
func (o *SyslogOutput) Key() string
func (*SyslogOutput) Statistics ¶
func (o *SyslogOutput) Statistics() interface{}
func (*SyslogOutput) String ¶
func (o *SyslogOutput) String() string
type SyslogStatistics ¶
type UploadData ¶
type UploadData struct { FileName string FileSize int64 Events chan UploadEvent }
type UploadEvent ¶
func NewUploadEvent ¶
func NewUploadEvent(eventSeq int64, eventText string, eventTextAsJsonByteArray bool) UploadEvent
NewUploadEvent creates an instance of UploadEvent.
type UploadStatus ¶
type UploadStatus struct {
// contains filtered or unexported fields
}
type WrappedUploader ¶
type WrappedUploader interface {
Upload(input *s3manager.UploadInput, options ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error)
}
type XDGSCRAMClient ¶
type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn }
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool