Documentation ¶
Overview ¶
Package input provides input components.
Index ¶
- Variables
- func NewKCL(cfg baker.InputParams) (baker.Input, error)
- func NewKinesis(cfg baker.InputParams) (baker.Input, error)
- func NewList(cfg baker.InputParams) (baker.Input, error)
- func NewSQS(cfg baker.InputParams) (baker.Input, error)
- func NewTCP(cfg baker.InputParams) (baker.Input, error)
- type KCL
- type KCLConfig
- type Kinesis
- type KinesisConfig
- type List
- type ListConfig
- type SQS
- type SQSConfig
- type TCP
- type TCPConfig
Constants ¶
This section is empty.
Variables ¶
View Source
var All = []baker.InputDesc{ KCLDesc, KinesisDesc, ListDesc, SQSDesc, TCPDesc, }
All is the list of all baker inputs.
View Source
var KCLDesc = baker.InputDesc{ Name: "KCL", New: NewKCL, Config: &KCLConfig{}, Help: "This input fetches records from Kinesis with KCL. It consumes a specified stream, and\n" + "processes all shards in that stream. It never exits.\n" + "Multiple baker instances can consume the same stream, in that case the KCL will take care of\n" + "balancing the shards between workers. Careful (shard stealing is not implemented yet).\n" + "Resharding on the producer side is automatically handled by the KCL that will distribute\n" + "the shards among KCL workers.", }
KCLDesc describes the KCL input.
View Source
var KinesisDesc = baker.InputDesc{ Name: "Kinesis", New: NewKinesis, Config: &KinesisConfig{}, Help: "This input fetches log lines from Kinesis. It listens on a specified stream, and\n" + "processes all the shards in that stream. It never exits.\n", }
View Source
var ListDesc = baker.InputDesc{ Name: "List", New: NewList, Config: &ListConfig{}, Help: "This input fetches logs from a predefined list of local or remote sources. The \"Files\"\n" + "configuration variable is a list of \"file specifiers\". Each \"file specifier\" can be:\n\n" + " * A local file path on the filesystem: the log file at that path will be processed\n" + " * A HTTP/HTTPS URL: the log file at that URL will be downloaded and processed\n" + " * A S3 URL: the log file at that URL that will be downloaded and processed\n" + " * \"@\" followed by a local path pointing to a file: the file is expected to be a text file\n" + " and each line will be read and parsed as a \"file specifier\"\n" + " * \"@\" followed by a HTTP/HTTPS URL: the text file pointed by the URL will be downloaded,\n" + " and each line will be read and parsed as a \"file specifier\"\n" + " * \"@\" followed by a S3 URL pointing to a file: the text file pointed by the URL will be\n" + " downloaded, and each line will be read and parsed as a \"file specifier\"\n" + " * \"@\" followed by a local path pointing to a directory (must end with a slash): the directory will be recursively\n" + " walked, and all files matching the \"MatchPath\" option regexp will be processed as logfiles\n" + " * \"@\" followed by a S3 URL pointing to a directory: the directory on S3 will be recursively\n" + " walked, and all files matching the \"MatchPath\" option regexp will be processed as logfiles\n" + " * \"-\": the contents of a log file will be read from stdin and processed\n" + " * \"@-\": each line read from stdin will be parsed as a \"file specifier\"\n\n" + "All records produced by this input contain 2 metadata values:\n" + " * url: the files that originally contained the record\n" + " * last_modified: the last modification datetime of the above file\n", }
View Source
var SQSDesc = baker.InputDesc{ Name: "SQS", New: NewSQS, Config: &SQSConfig{}, Help: "This input listens on multiple SQS queues for new incoming log files\n" + "on S3; it is meant to be used with SQS queues popoulated by SNS.\n" + "It never exits.\n", }
View Source
var TCPDesc = baker.InputDesc{ Name: "TCP", New: NewTCP, Config: &TCPConfig{}, Help: "This input relies on a TCP connection to receive records in the usual format\n" + "Configure it with a host and port that you want to accept connection from.\n" + "By default it listens on port 6000 for any connection\n" + "It never exits.\n", }
Functions ¶
func NewKinesis ¶
func NewKinesis(cfg baker.InputParams) (baker.Input, error)
NewKinesis creates a Kinesis tail and immediately makes a first connection to get the current shard list.
Types ¶
type KCL ¶
type KCL struct {
// contains filtered or unexported fields
}
KCL is a Baker input reading from Kinesis with the KCL (Kinesis Client Library).
func (*KCL) CreateProcessor ¶
func (k *KCL) CreateProcessor() interfaces.IRecordProcessor
CreateProcessor implements interfaces.IRecordProcessorFactory.
type KCLConfig ¶
type KCLConfig struct { AwsRegion string `help:"AWS region to connect to" default:"us-west-2"` Stream string `help:"Name of Kinesis stream" required:"true"` AppName string `help:"Used by KCL to allow multiple app to consume the same stream." required:"true"` MaxShards int `help:"Max shards this Worker can handle at a time" default:"32767"` ShardSync time.Duration `help:"Time between tasks to sync leases and Kinesis shards" default:"60s"` InitialPosition string `help:"Position in the stream where a new application should start from. Values: LATEST or TRIM_HORIZON" default:"LATEST"` // contains filtered or unexported fields }
KCLConfig is the configuration for the KCL input.
type Kinesis ¶
type Kinesis struct { Cfg *KinesisConfig Data chan<- *baker.Data // contains filtered or unexported fields }
func (*Kinesis) Stats ¶
func (s *Kinesis) Stats() baker.InputStats
type KinesisConfig ¶
type List ¶
type List struct { Cfg *ListConfig // contains filtered or unexported fields }
func (*List) ProcessDirectory ¶
func (*List) Stats ¶
func (s *List) Stats() baker.InputStats
type ListConfig ¶
type ListConfig struct { Files []string `help:"List of log-files, directories and/or list-files to process" default:"[\"-\"]"` MatchPath string `help:"regexp to filter files in specified directories" default:".*\\.log\\.gz"` Region string `help:"AWS Region for fetching from S3" default:"us-west-2"` }
type SQS ¶
type SQS struct { Cfg *SQSConfig FilePathRegexp *regexp.Regexp // contains filtered or unexported fields }
func (*SQS) Stats ¶
func (s *SQS) Stats() baker.InputStats
type SQSConfig ¶
type SQSConfig struct { AwsRegion string `help:"AWS region to connect to" default:"us-west-2"` Bucket string `help:"S3 Bucket to use for processing" default:""` QueuePrefixes []string `help:"Prefixes of the names of the SQS queues to monitor" required:"true"` MessageFormat string `` /* 189-byte string literal not displayed */ FilePathFilter string `help:"If provided, will only use S3 files with the given path."` }
Click to show internal directories.
Click to hide internal directories.