s3splitfile

package
v0.0.0-...-0c94d32 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 28, 2019 License: MPL-2.0 Imports: 22 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CleanBucketPrefix

func CleanBucketPrefix(prefix string) (cleaned string)

func FilterS3

func FilterS3(bucket *s3.Bucket, prefix string, level int, schema Schema, kc chan S3ListResult)

Recursively descend into an S3 directory tree, filtering based on the given schema, and sending results on the given channel. The `level` parameter indicates how far down the tree we are, and is used to determine which schema field we use for filtering.

func PrettySize

func PrettySize(bytes int64) string

Return a nice, human-readable representation of the given number of bytes.

func ReadS3File

func ReadS3File(bucket *s3.Bucket, s3Key string, s3Offset uint64, recordChan chan S3Record)

func S3FileIterator

func S3FileIterator(bucket *s3.Bucket, s3Key string, offset uint64) <-chan S3Record

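Read the records of the given S3 file (key), starting at the given offset, and send each one as an S3Record on a channel which can be read by the caller. Each S3Record carries an Err field so errors can be detected along the way.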

func S3Iterator

func S3Iterator(bucket *s3.Bucket, prefix string, schema Schema) <-chan S3ListResult

List the contents of the given bucket, sending matching filenames to a channel which can be read by the caller.

func SanitizeDimension

func SanitizeDimension(dim string) (cleaned string)

Given a string, return a sanitized version that can be used safely as part of a filename (for example).

Types

type AnyDimensionChecker

type AnyDimensionChecker struct {
}

Accept any value at all.

func (AnyDimensionChecker) IsAllowed

func (adc AnyDimensionChecker) IsAllowed(v string) bool

func (AnyDimensionChecker) ListValues

func (adc AnyDimensionChecker) ListValues() ([]string, bool)

type DimensionChecker

type DimensionChecker interface {
	IsAllowed(v string) bool
	ListValues() ([]string, bool)
}

Interface for calculating whether a particular value is acceptable as-is, or if it should be replaced with a default value.

type ListDimensionChecker

type ListDimensionChecker struct {
	// contains filtered or unexported fields
}

Accept a specific list of values; anything not in the list will not be accepted.

func NewListDimensionChecker

func NewListDimensionChecker(allowed []string) *ListDimensionChecker

Factory for creating a ListDimensionChecker using a list instead of a map.

func (ListDimensionChecker) IsAllowed

func (ldc ListDimensionChecker) IsAllowed(v string) bool

func (ListDimensionChecker) ListValues

func (ldc ListDimensionChecker) ListValues() ([]string, bool)

Return the list of allowed values, sorted alphabetically.

type MessageLocation

type MessageLocation struct {
	Key    string
	Offset uint32
	Length uint32
}

type PublishAttempt

type PublishAttempt struct {
	Name              string
	AttemptsRemaining uint32
}

type RangeDimensionChecker

type RangeDimensionChecker struct {
	// contains filtered or unexported fields
}

Accept any value between the `min` and `max` bounds (inclusive) when both are specified. If one of the bounds is missing, only enforce the other. If neither bound is present, accept all values.

func (RangeDimensionChecker) IsAllowed

func (rdc RangeDimensionChecker) IsAllowed(v string) bool

func (RangeDimensionChecker) ListValues

func (rdc RangeDimensionChecker) ListValues() ([]string, bool)

type S3ListResult

type S3ListResult struct {
	Key s3.Key
	Err error
}

Encapsulates the result of a List operation, allowing detection of errors along the way.

type S3OffsetInput

type S3OffsetInput struct {
	*S3OffsetInputConfig
	// contains filtered or unexported fields
}

func (*S3OffsetInput) ConfigStruct

func (input *S3OffsetInput) ConfigStruct() interface{}

func (*S3OffsetInput) Init

func (input *S3OffsetInput) Init(config interface{}) (err error)

func (*S3OffsetInput) ReportMsg

func (input *S3OffsetInput) ReportMsg(msg *message.Message) error

func (*S3OffsetInput) Run

func (input *S3OffsetInput) Run(runner pipeline.InputRunner, helper pipeline.PluginHelper) error

func (*S3OffsetInput) Stop

func (input *S3OffsetInput) Stop()

type S3OffsetInputConfig

type S3OffsetInputConfig struct {
	// So we can default to using ProtobufDecoder.
	Decoder            string
	Splitter           string
	ClientIdListFile   string `toml:"client_id_list"`
	MetaFile           string `toml:"metadata_file"`
	StartDate          string `toml:"start_date"`
	EndDate            string `toml:"end_date"`
	AWSKey             string `toml:"aws_key"`
	AWSSecretKey       string `toml:"aws_secret_key"`
	AWSRegion          string `toml:"aws_region"`
	S3MetaBucket       string `toml:"s3_meta_bucket"`
	S3MetaBucketPrefix string `toml:"s3_meta_bucket_prefix"`
	S3Bucket           string `toml:"s3_bucket"`
	S3Retries          uint32 `toml:"s3_retries"`
	S3ConnectTimeout   uint32 `toml:"s3_connect_timeout"`
	S3ReadTimeout      uint32 `toml:"s3_read_timeout"`
	S3WorkerCount      uint32 `toml:"s3_worker_count"`
}

type S3Record

type S3Record struct {
	Key       string
	Offset    uint64
	BytesRead int
	Record    []byte
	Err       error
}

Encapsulates a single record within an S3 file, allowing detection of errors along the way.

type S3SplitFileInput

type S3SplitFileInput struct {
	*S3SplitFileInputConfig
	// contains filtered or unexported fields
}

func (*S3SplitFileInput) ConfigStruct

func (input *S3SplitFileInput) ConfigStruct() interface{}

func (*S3SplitFileInput) Init

func (input *S3SplitFileInput) Init(config interface{}) (err error)

func (*S3SplitFileInput) ReportMsg

func (input *S3SplitFileInput) ReportMsg(msg *message.Message) error

func (*S3SplitFileInput) Run

func (input *S3SplitFileInput) Run(runner pipeline.InputRunner, helper pipeline.PluginHelper) error

func (*S3SplitFileInput) Stop

func (input *S3SplitFileInput) Stop()

type S3SplitFileInputConfig

type S3SplitFileInputConfig struct {
	// So we can default to using ProtobufDecoder.
	Decoder string
	// So we can default to using HekaFramingSplitter.
	Splitter string

	SchemaFile         string `toml:"schema_file"`
	AWSKey             string `toml:"aws_key"`
	AWSSecretKey       string `toml:"aws_secret_key"`
	AWSRegion          string `toml:"aws_region"`
	S3Bucket           string `toml:"s3_bucket"`
	S3BucketPrefix     string `toml:"s3_bucket_prefix"`
	S3ObjectMatchRegex string `toml:"s3_object_match_regex"`
	S3Retries          uint32 `toml:"s3_retries"`
	S3ConnectTimeout   uint32 `toml:"s3_connect_timeout"`
	S3ReadTimeout      uint32 `toml:"s3_read_timeout"`
	S3WorkerCount      uint32 `toml:"s3_worker_count"`
}

type S3SplitFileOutput

type S3SplitFileOutput struct {
	*S3SplitFileOutputConfig
	// contains filtered or unexported fields
}

Output plugin that writes message contents to a file on the file system.

func (*S3SplitFileOutput) ConfigStruct

func (o *S3SplitFileOutput) ConfigStruct() interface{}

func (*S3SplitFileOutput) Init

func (o *S3SplitFileOutput) Init(config interface{}) (err error)

func (*S3SplitFileOutput) ReportMsg

func (o *S3SplitFileOutput) ReportMsg(msg *message.Message) error

func (*S3SplitFileOutput) Run

func (o *S3SplitFileOutput) Run(or OutputRunner, h PluginHelper) (err error)

type S3SplitFileOutputConfig

type S3SplitFileOutputConfig struct {
	// Base output file path.
	// In-flight files go to <Path>/current/<dimensionPath>
	// finalized files go to <Path>/finalized/<dimensionPath>
	Path string

	// Output file permissions (default "644").
	Perm string

	// Path to Schema file (json). Defaults to using the standard schema.
	SchemaFile string `toml:"schema_file"`

	// Interval at which we should check MaxFileAge for in-flight files.
	FlushInterval uint32 `toml:"flush_interval"`

	// Permissions to apply to directories created for output directories if
	// they don't exist.  Must be a string representation of an octal integer.
	// Defaults to "700".
	FolderPerm string `toml:"folder_perm"`

	// Specifies whether or not Heka's stream framing will be applied to the
	// output. We do some magic to default to true if ProtobufEncoder is used,
	// false otherwise.
	UseFraming *bool `toml:"use_framing"`

	// Specifies how much data (in bytes) can be written to a single file before
	// we rotate and begin writing to another one (default 500 * 1024 * 1024,
	// i.e. 500MB).
	MaxFileSize uint32 `toml:"max_file_size"`

	// Specifies how long (in milliseconds) to wait before rotating the current
	// file and beginning to write to another one (default 60 * 60 * 1000, i.e. 1hr).
	MaxFileAge uint32 `toml:"max_file_age"`

	// Specifies how many data files to keep open at once. If there are more
	// "current" files than this, the least-recently used file will be closed,
	// and will be re-opened if more messages arrive before it is rotated. The
	// default is 1000. A value of 0 means no maximum.
	MaxOpenFiles int `toml:"max_open_files"`

	AWSKey           string `toml:"aws_key"`
	AWSSecretKey     string `toml:"aws_secret_key"`
	AWSRegion        string `toml:"aws_region"`
	S3Bucket         string `toml:"s3_bucket"`
	S3BucketPrefix   string `toml:"s3_bucket_prefix"`
	S3Retries        uint32 `toml:"s3_retries"`
	S3ConnectTimeout uint32 `toml:"s3_connect_timeout"`
	S3ReadTimeout    uint32 `toml:"s3_read_timeout"`
	S3WorkerCount    uint32 `toml:"s3_worker_count"`
}

ConfigStruct for S3SplitFileOutput plugin.

type Schema

type Schema struct {
	Fields       []string
	FieldIndices map[string]int
	Dims         map[string]DimensionChecker
}

Encapsulates the directory-splitting schema.

func LoadSchema

func LoadSchema(schemaFileName string) (schema Schema, err error)

Load a schema from the given file name. The file is expected to contain valid JSON describing a hierarchy of dimensions, each of which specifies what values are "allowed" for that dimension. Example schema:

{
  "version": 1,
  "dimensions": [
    { "field_name": "submissionDate", "allowed_values":
        { "min": "20140120", "max": "20140125" }
    },
    { "field_name": "sourceName",     "allowed_values": "*" },
    { "field_name": "sourceVersion",  "allowed_values": "*" },
    { "field_name": "reason",         "allowed_values":
        [ "idle-daily","saved-session" ]
    },
    { "field_name": "appName",        "allowed_values":
        [ "Firefox", "Fennec", "Thunderbird", "FirefoxOS", "B2G" ]
    },
    { "field_name": "appUpdateChannel",
      "allowed_values":
        [ "default", "nightly", "aurora", "beta", "release", "esr" ]
    },
    { "field_name": "appVersion",     "allowed_values": "*" }
  ]
}

func (*Schema) GetDimensions

func (s *Schema) GetDimensions(pack *PipelinePack) (dimensions []string)

Extract all dimensions from the given pack.

func (*Schema) GetValue

func (s *Schema) GetValue(field string, value string) (rvalue string, err error)

Determine whether a given value is acceptable for a given field, and if not return a default value instead.

type SplitFileInfo

type SplitFileInfo struct {
	// contains filtered or unexported fields
}

Info for a single split file.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL