Documentation ¶
Index ¶
- func CleanBucketPrefix(prefix string) (cleaned string)
- func FilterS3(bucket *s3.Bucket, prefix string, level int, schema Schema, ...)
- func PrettySize(bytes int64) string
- func ReadS3File(bucket *s3.Bucket, s3Key string, s3Offset uint64, recordChan chan S3Record)
- func S3FileIterator(bucket *s3.Bucket, s3Key string, offset uint64) <-chan S3Record
- func S3Iterator(bucket *s3.Bucket, prefix string, schema Schema) <-chan S3ListResult
- func SanitizeDimension(dim string) (cleaned string)
- type AnyDimensionChecker
- type DimensionChecker
- type ListDimensionChecker
- type MessageLocation
- type PublishAttempt
- type RangeDimensionChecker
- type S3ListResult
- type S3OffsetInput
- func (input *S3OffsetInput) ConfigStruct() interface{}
- func (input *S3OffsetInput) Init(config interface{}) (err error)
- func (input *S3OffsetInput) ReportMsg(msg *message.Message) error
- func (input *S3OffsetInput) Run(runner pipeline.InputRunner, helper pipeline.PluginHelper) error
- func (input *S3OffsetInput) Stop()
- type S3OffsetInputConfig
- type S3Record
- type S3SplitFileInput
- func (input *S3SplitFileInput) ConfigStruct() interface{}
- func (input *S3SplitFileInput) Init(config interface{}) (err error)
- func (input *S3SplitFileInput) ReportMsg(msg *message.Message) error
- func (input *S3SplitFileInput) Run(runner pipeline.InputRunner, helper pipeline.PluginHelper) error
- func (input *S3SplitFileInput) Stop()
- type S3SplitFileInputConfig
- type S3SplitFileOutput
- type S3SplitFileOutputConfig
- type Schema
- type SplitFileInfo
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CleanBucketPrefix ¶
func FilterS3 ¶
Recursively descend into an S3 directory tree, filtering based on the given schema, and sending results on the given channel. The `level` parameter indicates how far down the tree we are, and is used to determine which schema field we use for filtering.
func PrettySize ¶
Return a nice, human-readable representation of the given number of bytes.
func ReadS3File ¶
func S3FileIterator ¶
Read the given file from the given bucket, starting at the given offset, sending each record to a channel which can be read by the caller.
func S3Iterator ¶
func S3Iterator(bucket *s3.Bucket, prefix string, schema Schema) <-chan S3ListResult
List the contents of the given bucket, sending matching filenames to a channel which can be read by the caller.
func SanitizeDimension ¶
Given a string, return a sanitized version that can be used safely as part of a filename (for example).
Types ¶
type AnyDimensionChecker ¶
type AnyDimensionChecker struct { }
Accept any value at all.
func (AnyDimensionChecker) IsAllowed ¶
func (adc AnyDimensionChecker) IsAllowed(v string) bool
func (AnyDimensionChecker) ListValues ¶
func (adc AnyDimensionChecker) ListValues() ([]string, bool)
type DimensionChecker ¶
Interface for calculating whether a particular value is acceptable as-is, or if it should be replaced with a default value.
type ListDimensionChecker ¶
type ListDimensionChecker struct {
// contains filtered or unexported fields
}
Accept a specific list of values; anything not in the list will be rejected.
func NewListDimensionChecker ¶
func NewListDimensionChecker(allowed []string) *ListDimensionChecker
Factory for creating a ListDimensionChecker from a list of allowed values instead of a map.
func (ListDimensionChecker) IsAllowed ¶
func (ldc ListDimensionChecker) IsAllowed(v string) bool
func (ListDimensionChecker) ListValues ¶
func (ldc ListDimensionChecker) ListValues() ([]string, bool)
Return the list of allowed values, sorted alphabetically.
type MessageLocation ¶
type PublishAttempt ¶
type RangeDimensionChecker ¶
type RangeDimensionChecker struct {
// contains filtered or unexported fields
}
If both are specified, accept any value between `min` and `max` (inclusive). If one of the bounds is missing, only enforce the other. If neither bound is present, accept all values.
func (RangeDimensionChecker) IsAllowed ¶
func (rdc RangeDimensionChecker) IsAllowed(v string) bool
func (RangeDimensionChecker) ListValues ¶
func (rdc RangeDimensionChecker) ListValues() ([]string, bool)
type S3ListResult ¶
Encapsulates the result of a List operation, allowing detection of errors along the way.
type S3OffsetInput ¶
type S3OffsetInput struct { *S3OffsetInputConfig // contains filtered or unexported fields }
func (*S3OffsetInput) ConfigStruct ¶
func (input *S3OffsetInput) ConfigStruct() interface{}
func (*S3OffsetInput) Init ¶
func (input *S3OffsetInput) Init(config interface{}) (err error)
func (*S3OffsetInput) Run ¶
func (input *S3OffsetInput) Run(runner pipeline.InputRunner, helper pipeline.PluginHelper) error
func (*S3OffsetInput) Stop ¶
func (input *S3OffsetInput) Stop()
type S3OffsetInputConfig ¶
type S3OffsetInputConfig struct { // So we can default to using ProtobufDecoder. Decoder string Splitter string ClientIdListFile string `toml:"client_id_list"` MetaFile string `toml:"metadata_file"` StartDate string `toml:"start_date"` EndDate string `toml:"end_date"` AWSKey string `toml:"aws_key"` AWSSecretKey string `toml:"aws_secret_key"` AWSRegion string `toml:"aws_region"` S3MetaBucket string `toml:"s3_meta_bucket"` S3MetaBucketPrefix string `toml:"s3_meta_bucket_prefix"` S3Bucket string `toml:"s3_bucket"` S3Retries uint32 `toml:"s3_retries"` S3ConnectTimeout uint32 `toml:"s3_connect_timeout"` S3ReadTimeout uint32 `toml:"s3_read_timeout"` S3WorkerCount uint32 `toml:"s3_worker_count"` }
type S3Record ¶
Encapsulates a single record within an S3 file, allowing detection of errors along the way.
type S3SplitFileInput ¶
type S3SplitFileInput struct { *S3SplitFileInputConfig // contains filtered or unexported fields }
func (*S3SplitFileInput) ConfigStruct ¶
func (input *S3SplitFileInput) ConfigStruct() interface{}
func (*S3SplitFileInput) Init ¶
func (input *S3SplitFileInput) Init(config interface{}) (err error)
func (*S3SplitFileInput) ReportMsg ¶
func (input *S3SplitFileInput) ReportMsg(msg *message.Message) error
func (*S3SplitFileInput) Run ¶
func (input *S3SplitFileInput) Run(runner pipeline.InputRunner, helper pipeline.PluginHelper) error
func (*S3SplitFileInput) Stop ¶
func (input *S3SplitFileInput) Stop()
type S3SplitFileInputConfig ¶
type S3SplitFileInputConfig struct { // So we can default to using ProtobufDecoder. Decoder string // So we can default to using HekaFramingSplitter. Splitter string SchemaFile string `toml:"schema_file"` AWSKey string `toml:"aws_key"` AWSSecretKey string `toml:"aws_secret_key"` AWSRegion string `toml:"aws_region"` S3Bucket string `toml:"s3_bucket"` S3BucketPrefix string `toml:"s3_bucket_prefix"` S3ObjectMatchRegex string `toml:"s3_object_match_regex"` S3Retries uint32 `toml:"s3_retries"` S3ConnectTimeout uint32 `toml:"s3_connect_timeout"` S3ReadTimeout uint32 `toml:"s3_read_timeout"` S3WorkerCount uint32 `toml:"s3_worker_count"` }
type S3SplitFileOutput ¶
type S3SplitFileOutput struct { *S3SplitFileOutputConfig // contains filtered or unexported fields }
Output plugin that writes message contents to files on the local file system, split according to the schema dimensions, and uploads finalized files to S3.
func (*S3SplitFileOutput) ConfigStruct ¶
func (o *S3SplitFileOutput) ConfigStruct() interface{}
func (*S3SplitFileOutput) Init ¶
func (o *S3SplitFileOutput) Init(config interface{}) (err error)
func (*S3SplitFileOutput) ReportMsg ¶
func (o *S3SplitFileOutput) ReportMsg(msg *message.Message) error
func (*S3SplitFileOutput) Run ¶
func (o *S3SplitFileOutput) Run(or OutputRunner, h PluginHelper) (err error)
type S3SplitFileOutputConfig ¶
type S3SplitFileOutputConfig struct { // Base output file path. // In-flight files go to <Path>/current/<dimensionPath> // finalized files go to <Path>/finalized/<dimensionPath> Path string // Output file permissions (default "644"). Perm string // Path to Schema file (json). Defaults to using the standard schema. SchemaFile string `toml:"schema_file"` // Interval at which we should check MaxFileAge for in-flight files. FlushInterval uint32 `toml:"flush_interval"` // Permissions to apply to directories created for output directories if // they don't exist. Must be a string representation of an octal integer. // Defaults to "700". FolderPerm string `toml:"folder_perm"` // Specifies whether or not Heka's stream framing will be applied to the // output. We do some magic to default to true if ProtobufEncoder is used, // false otherwise. UseFraming *bool `toml:"use_framing"` // Specifies how much data (in bytes) can be written to a single file before // we rotate and begin writing to another one (default 500 * 1024 * 1024, // i.e. 500MB). MaxFileSize uint32 `toml:"max_file_size"` // Specifies how long (in milliseconds) to wait before rotating the current // file and begin writing to another one (default 60 * 60 * 1000, i.e. 1hr). MaxFileAge uint32 `toml:"max_file_age"` // Specifies how many data files to keep open at once. If there are more // "current" files than this, the least-recently used file will be closed, // and will be re-opened if more messages arrive before it is rotated. The // default is 1000. A value of 0 means no maximum. MaxOpenFiles int `toml:"max_open_files"` AWSKey string `toml:"aws_key"` AWSSecretKey string `toml:"aws_secret_key"` AWSRegion string `toml:"aws_region"` S3Bucket string `toml:"s3_bucket"` S3BucketPrefix string `toml:"s3_bucket_prefix"` S3Retries uint32 `toml:"s3_retries"` S3ConnectTimeout uint32 `toml:"s3_connect_timeout"` S3ReadTimeout uint32 `toml:"s3_read_timeout"` S3WorkerCount uint32 `toml:"s3_worker_count"` }
ConfigStruct for S3SplitFileOutput plugin.
type Schema ¶
type Schema struct { Fields []string FieldIndices map[string]int Dims map[string]DimensionChecker }
Encapsulates the directory-splitting schema.
func LoadSchema ¶
Load a schema from the given file name. The file is expected to contain valid JSON describing a hierarchy of dimensions, each of which specifies what values are "allowed" for that dimension. Example schema:
{ "version": 1, "dimensions": [ { "field_name": "submissionDate", "allowed_values": { "min": "20140120", "max": "20140125" } }, { "field_name": "sourceName", "allowed_values": "*" }, { "field_name": "sourceVersion", "allowed_values": "*" }, { "field_name": "reason", "allowed_values": [ "idle-daily", "saved-session" ] }, { "field_name": "appName", "allowed_values": [ "Firefox", "Fennec", "Thunderbird", "FirefoxOS", "B2G" ] }, { "field_name": "appUpdateChannel", "allowed_values": [ "default", "nightly", "aurora", "beta", "release", "esr" ] }, { "field_name": "appVersion", "allowed_values": "*" } ] }
func (*Schema) GetDimensions ¶
Extract all dimensions from the given pack.
type SplitFileInfo ¶
type SplitFileInfo struct {
// contains filtered or unexported fields
}
Info for a single split file