s3splitfile

package
v0.0.0-...-b95d3d4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2015 License: MPL-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func FilterS3

func FilterS3(bucket *s3.Bucket, prefix string, level int, schema Schema, kc chan S3ListResult)

Recursively descend into an S3 directory tree, filtering based on the given schema, and sending results on the given channel. The `level` parameter indicates how far down the tree we are, and is used to determine which schema field we use for filtering.

func PrettySize

func PrettySize(bytes int64) string

Return a nice, human-readable representation of the given number of bytes.

func ReadS3File

func ReadS3File(bucket *s3.Bucket, s3Key string, recordChan chan S3Record)

func S3FileIterator

func S3FileIterator(bucket *s3.Bucket, s3Key string) <-chan S3Record

Read the contents of the given S3 file, sending each record to a channel which can be read by the caller.

func S3Iterator

func S3Iterator(bucket *s3.Bucket, prefix string, schema Schema) <-chan S3ListResult

List the contents of the given bucket, sending matching filenames to a channel which can be read by the caller.

func SanitizeDimension

func SanitizeDimension(dim string) (cleaned string)

Given a string, return a sanitized version that can be used safely as part of a filename (for example).

Types

type AnyDimensionChecker

type AnyDimensionChecker struct {
}

Accept any value at all.

func (AnyDimensionChecker) IsAllowed

func (adc AnyDimensionChecker) IsAllowed(v string) bool

type DimensionChecker

type DimensionChecker interface {
	IsAllowed(v string) bool
}

Interface for calculating whether a particular value is acceptable as-is, or if it should be replaced with a default value.

type ListDimensionChecker

type ListDimensionChecker struct {
	// contains filtered or unexported fields
}

Accept a specific list of values; anything not in the list will not be accepted.

func NewListDimensionChecker

func NewListDimensionChecker(allowed []string) *ListDimensionChecker

Factory for creating a ListDimensionChecker using a list instead of a map.

func (ListDimensionChecker) IsAllowed

func (ldc ListDimensionChecker) IsAllowed(v string) bool

type PublishAttempt

type PublishAttempt struct {
	Name              string
	AttemptsRemaining uint32
}

type RangeDimensionChecker

type RangeDimensionChecker struct {
	// contains filtered or unexported fields
}

If both are specified, accept any value between `min` and `max` (inclusive). If one of the bounds is missing, only enforce the other. If neither bound is present, accept all values.

func (RangeDimensionChecker) IsAllowed

func (rdc RangeDimensionChecker) IsAllowed(v string) bool

type S3ListResult

type S3ListResult struct {
	Key s3.Key
	Err error
}

Encapsulates the result of a List operation, allowing detection of errors along the way.

type S3Record

type S3Record struct {
	BytesRead int
	Record    []byte
	Err       error
}

Encapsulates a single record within an S3 file, allowing detection of errors along the way.

type S3SplitFileInput

type S3SplitFileInput struct {
	*S3SplitFileInputConfig
	// contains filtered or unexported fields
}

func (*S3SplitFileInput) ConfigStruct

func (input *S3SplitFileInput) ConfigStruct() interface{}

func (*S3SplitFileInput) Init

func (input *S3SplitFileInput) Init(config interface{}) (err error)

func (*S3SplitFileInput) ReportMsg

func (input *S3SplitFileInput) ReportMsg(msg *message.Message) error

func (*S3SplitFileInput) Run

func (input *S3SplitFileInput) Run(runner pipeline.InputRunner, helper pipeline.PluginHelper) error

func (*S3SplitFileInput) Stop

func (input *S3SplitFileInput) Stop()

type S3SplitFileInputConfig

type S3SplitFileInputConfig struct {
	// So we can default to using ProtobufDecoder.
	Decoder string
	// So we can default to using HekaFramingSplitter.
	Splitter string

	SchemaFile     string `toml:"schema_file"`
	AWSKey         string `toml:"aws_key"`
	AWSSecretKey   string `toml:"aws_secret_key"`
	AWSRegion      string `toml:"aws_region"`
	S3Bucket       string `toml:"s3_bucket"`
	S3BucketPrefix string `toml:"s3_bucket_prefix"`
	S3Retries      uint32 `toml:"s3_retries"`
	S3WorkerCount  uint32 `toml:"s3_worker_count"`
}

type S3SplitFileOutput

type S3SplitFileOutput struct {
	*S3SplitFileOutputConfig
	// contains filtered or unexported fields
}

Output plugin that writes message contents to a file on the file system.

func (*S3SplitFileOutput) ConfigStruct

func (o *S3SplitFileOutput) ConfigStruct() interface{}

func (*S3SplitFileOutput) Init

func (o *S3SplitFileOutput) Init(config interface{}) (err error)

func (*S3SplitFileOutput) ReportMsg

func (o *S3SplitFileOutput) ReportMsg(msg *message.Message) error

func (*S3SplitFileOutput) Run

func (o *S3SplitFileOutput) Run(or OutputRunner, h PluginHelper) (err error)

type S3SplitFileOutputConfig

type S3SplitFileOutputConfig struct {
	// Base output file path.
	// In-flight files go to <Path>/current/<dimensionPath>
	// finalized files go to <Path>/finalized/<dimensionPath>
	Path string

	// Output file permissions (default "644").
	Perm string

	// Path to Schema file (json). Defaults to using the standard schema.
	SchemaFile string `toml:"schema_file"`

	// Interval at which we should check MaxFileAge for in-flight files.
	FlushInterval uint32 `toml:"flush_interval"`

	// Permissions to apply to directories created for output directories if
	// they don't exist.  Must be a string representation of an octal integer.
	// Defaults to "700".
	FolderPerm string `toml:"folder_perm"`

	// Specifies whether or not Heka's stream framing will be applied to the
	// output. We do some magic to default to true if ProtobufEncoder is used,
	// false otherwise.
	UseFraming *bool `toml:"use_framing"`

	// Specifies how much data (in bytes) can be written to a single file before
	// we rotate and begin writing to another one (default 500 * 1024 * 1024,
	// i.e. 500MB).
	MaxFileSize uint32 `toml:"max_file_size"`

	// Specifies how long (in milliseconds) to wait before rotating the current
	// file and begin writing to another one (default 60 * 60 * 1000, i.e. 1hr).
	MaxFileAge uint32 `toml:"max_file_age"`

	// Specifies how many data files to keep open at once. If there are more
	// "current" files than this, the least-recently used file will be closed,
	// and will be re-opened if more messages arrive before it is rotated. The
	// default is 1000. A value of 0 means no maximum.
	MaxOpenFiles int `toml:"max_open_files"`

	AWSKey         string `toml:"aws_key"`
	AWSSecretKey   string `toml:"aws_secret_key"`
	AWSRegion      string `toml:"aws_region"`
	S3Bucket       string `toml:"s3_bucket"`
	S3BucketPrefix string `toml:"s3_bucket_prefix"`
	S3Retries      uint32 `toml:"s3_retries"`
	S3WorkerCount  uint32 `toml:"s3_worker_count"`
}

ConfigStruct for S3SplitFileOutput plugin.

type Schema

type Schema struct {
	Fields       []string
	FieldIndices map[string]int
	Dims         map[string]DimensionChecker
}

Encapsulates the directory-splitting schema

func LoadSchema

func LoadSchema(schemaFileName string) (schema Schema, err error)

Load a schema from the given file name. The file is expected to contain valid JSON describing a hierarchy of dimensions, each of which specifies what values are "allowed" for that dimension. Example schema:

{
  "version": 1,
  "dimensions": [
    { "field_name": "submissionDate", "allowed_values":
        { "min": "20140120", "max": "20140125" }
    },
    { "field_name": "sourceName",     "allowed_values": "*" },
    { "field_name": "sourceVersion",  "allowed_values": "*" },
    { "field_name": "reason",         "allowed_values":
        [ "idle-daily","saved-session" ]
    },
    { "field_name": "appName",        "allowed_values":
        [ "Firefox", "Fennec", "Thunderbird", "FirefoxOS", "B2G" ]
    },
    { "field_name": "appUpdateChannel",
      "allowed_values":
        [ "default", "nightly", "aurora", "beta", "release", "esr" ]
    },
    { "field_name": "appVersion",     "allowed_values": "*" }
  ]
}

func (*Schema) GetDimensions

func (s *Schema) GetDimensions(pack *PipelinePack) (dimensions []string)

Extract all dimensions from the given pack.

func (*Schema) GetValue

func (s *Schema) GetValue(field string, value string) (rvalue string, err error)

Determine whether a given value is acceptable for a given field, and if not return a default value instead.

type SplitFileInfo

type SplitFileInfo struct {
	// contains filtered or unexported fields
}

Info for a single split file

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL