Documentation ¶
Overview ¶
Package corral is a MapReduce framework designed to be deployed to serverless platforms, like AWS Lambda.
It presents a lightweight alternative to Hadoop MapReduce. Much of the design philosophy was inspired by Yelp's mrjob: corral retains mrjob's ease of use while gaining the type safety and speed of Go.
Corral's runtime model consists of stateless, transient executors controlled by a central driver. Currently, the best environment for deployment is AWS Lambda, but corral is modular enough that support for other serverless platforms can be added as support for Go in cloud functions improves.
Corral is best suited for data-intensive but computationally inexpensive tasks, such as ETL jobs.
Index ¶
- func CompileFlagName() string
- func RunningOnCloudPlatfrom() bool
- type ActivationSet
- func (s *ActivationSet) Add(activationID string) error
- func (s *ActivationSet) AddAll(new []string) error
- func (s *ActivationSet) AddWithData(activationID string, data interface{}) error
- func (s *ActivationSet) Clear()
- func (s *ActivationSet) Close()
- func (s *ActivationSet) Drained(threshold int) bool
- func (s *ActivationSet) Has(activationID string) bool
- func (s *ActivationSet) IsEmpty() bool
- func (s *ActivationSet) Len() int
- func (s *ActivationSet) List() []string
- func (s *ActivationSet) Remove(activationID string) interface{}
- func (s *ActivationSet) Take(num int) []string
- func (s *ActivationSet) Top(num int) []string
- type Driver
- func (d *Driver) CurrentJob() *Job
- func (d *Driver) DownloadAndRemove(inputs []string, dest string) error
- func (d *Driver) Execute()
- func (d *Driver) GetFinalOutputs() []string
- func (d *Driver) Main()
- func (d *Driver) Undeploy(backendType *string) error
- func (d *Driver) WithBackend(backendType *string)
- type Emitter
- type HintFunc
- type Job
- type Mapper
- type Option
- func WithBackoffPolling() Option
- func WithInputs(inputs ...string) Option
- func WithLambdaRole(arn string) Option
- func WithLambdaS3Backend(bucket, key string) Option
- func WithLocalMemoryCache() Option
- func WithMapBinSize(s int64) Option
- func WithMultiStageInputs(inputs [][]string) Option
- func WithMultipleSize(mul float64) Option
- func WithRedisBackedCache() Option
- func WithReduceBinSize(s int64) Option
- func WithSplitSize(s int64) Option
- func WithWorkingLocation(location string) Option
- type PartitionFunc
- type PauseFunc
- type Reducer
- type SetValue
- type SortJob
- type StopFunc
- type ValueIterator
- type WhiskAckMsg
- type WhiskInvokationError
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CompileFlagName ¶ added in v0.1.7
func CompileFlagName() string
CompileFlagName collects all set flags and compiles them into a unique identifier.
func RunningOnCloudPlatfrom ¶ added in v0.1.2
func RunningOnCloudPlatfrom() bool
Types ¶
type ActivationSet ¶ added in v0.2.1
func NewSet ¶ added in v0.2.1
func NewSet() *ActivationSet
func (*ActivationSet) Add ¶ added in v0.2.1
func (s *ActivationSet) Add(activationID string) error
Add adds the given activation ID to the set.
func (*ActivationSet) AddAll ¶ added in v0.2.1
func (s *ActivationSet) AddAll(new []string) error
func (*ActivationSet) AddWithData ¶ added in v0.2.2
func (s *ActivationSet) AddWithData(activationID string, data interface{}) error
AddWithData adds the given activation ID to the set together with its associated data.
func (*ActivationSet) Clear ¶ added in v0.2.1
func (s *ActivationSet) Clear()
Clear removes all items from the set
func (*ActivationSet) Close ¶ added in v0.2.1
func (s *ActivationSet) Close()
func (*ActivationSet) Drained ¶ added in v0.2.1
func (s *ActivationSet) Drained(threshold int) bool
func (*ActivationSet) Has ¶ added in v0.2.1
func (s *ActivationSet) Has(activationID string) bool
Has looks for the existence of an item
func (*ActivationSet) IsEmpty ¶ added in v0.2.1
func (s *ActivationSet) IsEmpty() bool
IsEmpty checks for emptiness
func (*ActivationSet) Len ¶ added in v0.2.1
func (s *ActivationSet) Len() int
Len returns the number of items in a set.
func (*ActivationSet) List ¶ added in v0.2.1
func (s *ActivationSet) List() []string
func (*ActivationSet) Remove ¶ added in v0.2.1
func (s *ActivationSet) Remove(activationID string) interface{}
Remove deletes the specified item from the map
func (*ActivationSet) Take ¶ added in v0.2.1
func (s *ActivationSet) Take(num int) []string
Take returns the first num items from the set, blocking if necessary until all items are available
func (*ActivationSet) Top ¶ added in v0.2.2
func (s *ActivationSet) Top(num int) []string
Top returns at most the top num values.
type Driver ¶
type Driver struct {
Start time.Time
Runtime time.Duration
// contains filtered or unexported fields
}
Driver controls the execution of a MapReduce Job
func NewSequentialMultiStageDriver ¶ added in v0.1.2
NewSequentialMultiStageDriver creates a new Driver with the provided jobs and optional configuration. Jobs are executed in sequence with each output becoming the input of the next stage.
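The chaining described above, where each stage's output becomes the next stage's input, can be sketched without the library. This is a minimal illustration only: the `stage` type and `runSequential` helper are hypothetical names, not part of corral, and real stages would read and write files rather than transform names in memory.

```go
package main

import (
	"fmt"
	"strings"
)

// stage is a hypothetical stand-in for one job: it receives the names of
// its inputs and returns the names of the outputs it produced.
type stage func(inputs []string) []string

// runSequential runs the stages in order, feeding each stage's outputs
// to the next stage as inputs, and returns the outputs of the last stage.
func runSequential(inputs []string, stages ...stage) []string {
	current := inputs
	for _, s := range stages {
		current = s(current)
	}
	return current
}

func main() {
	// Stage 1 turns every input into one intermediate output.
	mapStage := func(in []string) []string {
		out := make([]string, 0, len(in))
		for _, name := range in {
			out = append(out, name+".mapped")
		}
		return out
	}
	// Stage 2 combines all intermediate outputs into a single result.
	reduceStage := func(in []string) []string {
		return []string{strings.Join(in, "+") + ".reduced"}
	}
	fmt.Println(runSequential([]string{"a", "b"}, mapStage, reduceStage))
}
```

With no stages supplied, the sketch returns the inputs unchanged.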
func (*Driver) CurrentJob ¶ added in v0.1.2
func (*Driver) DownloadAndRemove ¶ added in v0.1.2
func (*Driver) GetFinalOutputs ¶ added in v0.1.2
func (*Driver) WithBackend ¶ added in v0.1.2
type Job ¶
type Job struct {
Map Mapper
Reduce Reducer
PartitionFunc PartitionFunc
PauseFunc PauseFunc
StopFunc StopFunc
HintFunc HintFunc
// contains filtered or unexported fields
}
Job is the logical container for a MapReduce job
func (*Job) Collect ¶ added in v0.1.4
func (j *Job) Collect(result api.TaskResult)
func (*Job) CollectMetrics ¶ added in v0.1.2
func (j *Job) CollectMetrics()
CollectMetrics needs to run in a process.
type Option ¶
type Option func(*config)
Option allows configuration of a Driver
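Option follows the functional options pattern: each option is a function that mutates a configuration value. The sketch below shows the pattern in isolation. The `config` fields, the default values, and the lower-case helper names are assumptions made for illustration; corral's actual `config` type is unexported and is not documented here.

```go
package main

import "fmt"

// config is a hypothetical stand-in for corral's unexported config type.
// The field names are assumptions.
type config struct {
	splitSize  int64
	mapBinSize int64
	inputs     []string
}

// Option mirrors the documented shape: a function that mutates a *config.
type Option func(*config)

// withSplitSize is an illustrative option, not corral's implementation.
func withSplitSize(s int64) Option {
	return func(c *config) { c.splitSize = s }
}

// withInputs appends the given inputs to the configuration.
func withInputs(inputs ...string) Option {
	return func(c *config) { c.inputs = append(c.inputs, inputs...) }
}

// newConfig starts from (made-up) defaults and applies the options in
// order, so a later option overrides an earlier one for the same field.
func newConfig(opts ...Option) *config {
	c := &config{splitSize: 100 << 20, mapBinSize: 512 << 20}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConfig(withSplitSize(10<<20), withInputs("a.txt", "b.txt"))
	fmt.Println(c.splitSize, c.mapBinSize, c.inputs)
}
```

The pattern keeps constructors stable: new settings can be added as new options without changing existing call sites.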
func WithBackoffPolling ¶ added in v0.2.2
func WithBackoffPolling() Option
func WithInputs ¶
WithInputs specifies job inputs (i.e. input files/directories)
func WithLambdaRole ¶ added in v0.1.3
func WithLambdaS3Backend ¶ added in v0.1.3
func WithLocalMemoryCache ¶ added in v0.1.2
func WithLocalMemoryCache() Option
func WithMapBinSize ¶
WithMapBinSize sets the MapBinSize of the Driver
func WithMultiStageInputs ¶ added in v0.1.2
WithMultiStageInputs specifies job inputs (i.e. input files/directories) for each stage
func WithMultipleSize ¶ added in v0.2.2
WithMultipleSize adjusts the split size by a multiple. Warning: the result is rounded up with math.Ceil; set the sizes manually if you need precision.
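To see why rounding with math.Ceil can matter, the following sketch scales a size by a multiple and rounds the result up. The `scaleSize` helper is hypothetical, and the formula `Ceil(size * mul)` is an assumption about how the rounding is applied, not a copy of corral's code.

```go
package main

import (
	"fmt"
	"math"
)

// scaleSize multiplies size by mul in floating point and rounds the
// result up to the next whole number. Hypothetical helper.
func scaleSize(size int64, mul float64) int64 {
	return int64(math.Ceil(float64(size) * mul))
}

func main() {
	fmt.Println(scaleSize(1000, 1.5)) // 1500: exact product, nothing to round
	fmt.Println(scaleSize(1001, 0.5)) // 501: 500.5 is rounded up
	// 1.1 has no exact binary representation, so 100*1.1 is slightly
	// above 110 and Ceil pushes it to 111.
	fmt.Println(scaleSize(100, 1.1)) // 111
}
```

The last case is the kind of surprise the warning refers to: a multiple that looks exact in decimal can still round up by one.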
func WithRedisBackedCache ¶ added in v0.1.2
func WithRedisBackedCache() Option
func WithReduceBinSize ¶
WithReduceBinSize sets the ReduceBinSize of the Driver
func WithSplitSize ¶
WithSplitSize sets the SplitSize of the Driver
func WithWorkingLocation ¶
WithWorkingLocation sets the location and filesystem backend of the Driver
type PartitionFunc ¶
PartitionFunc defines a function that can be used to segment map keys into intermediate buckets. The default partition function simply hashes the key, and takes hash % numBins to determine the bin. The value returned from PartitionFunc (binIdx) must be in the range 0 <= binIdx < numBins, i.e. [0, numBins)
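A hash-and-modulo partitioner of the kind described above might look like the following sketch. The `PartitionFunc` signature shown is a local stand-in and an assumption, since the signature is not listed in this document; the choice of FNV-1a as the hash is also an assumption, and `hashPartition` is a hypothetical name.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// PartitionFunc is a local stand-in; corral's exact signature is assumed.
type PartitionFunc func(key string, numBins uint) (binIdx uint)

// hashPartition hashes the key with FNV-1a and takes the hash modulo
// numBins, so the result always lies in [0, numBins).
// numBins must be greater than zero.
func hashPartition(key string, numBins uint) uint {
	h := fnv.New64a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return uint(h.Sum64() % uint64(numBins))
}

func main() {
	var p PartitionFunc = hashPartition
	for _, k := range []string{"apple", "banana", "cherry"} {
		fmt.Printf("%s -> bin %d\n", k, p(k, 4))
	}
}
```

Because the function is deterministic, every occurrence of a key lands in the same bin, which is what lets a single reduce task see all values for that key.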
type Reducer ¶
type Reducer interface {
Reduce(key string, values ValueIterator, emitter Emitter)
}
Reducer defines the interface for a Reduce Task.
type ValueIterator ¶
type ValueIterator struct {
// contains filtered or unexported fields
}
ValueIterator iterates over a sequence of values. This is used during the Reduce phase, wherein a reduce Task iterates over all values for a particular key.
func (*ValueIterator) Iter ¶
func (v *ValueIterator) Iter() <-chan string
Iter iterates over all the values in the iterator.
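A reduce task typically ranges over the channel returned by Iter and emits one aggregated value per key. The sketch below shows that shape with local stand-ins so it runs on its own: `valueIterator`, `emitter`, `mapEmitter`, and `sumReducer` are hypothetical types, and the `Emit(key, value string) error` method is an assumption about the Emitter interface, which is not documented here.

```go
package main

import (
	"fmt"
	"strconv"
)

// valueIterator is a stand-in for corral's ValueIterator, backed by a slice.
type valueIterator struct{ values []string }

// Iter streams the values over a channel and closes it when done.
func (v *valueIterator) Iter() <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for _, s := range v.values {
			ch <- s
		}
	}()
	return ch
}

// emitter is a hypothetical stand-in for corral's Emitter.
type emitter interface {
	Emit(key, value string) error
}

// mapEmitter collects emitted pairs in memory.
type mapEmitter map[string]string

func (m mapEmitter) Emit(key, value string) error {
	m[key] = value
	return nil
}

// sumReducer adds up the numeric values seen for a key.
type sumReducer struct{}

func (sumReducer) Reduce(key string, values valueIterator, out emitter) {
	total := 0
	// Drain the channel fully so the producing goroutine can exit.
	for v := range values.Iter() {
		n, err := strconv.Atoi(v)
		if err != nil {
			continue // skip values that are not integers
		}
		total += n
	}
	out.Emit(key, strconv.Itoa(total))
}

func main() {
	out := mapEmitter{}
	sumReducer{}.Reduce("go", valueIterator{values: []string{"1", "2", "3"}}, out)
	fmt.Println(out["go"])
}
```

Ranging over the channel until it is closed matters in this sketch: stopping early would leave the sending goroutine blocked.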
type WhiskAckMsg ¶ added in v0.2.2
type WhiskInvokationError ¶ added in v0.2.2
type WhiskInvokationError struct {
// contains filtered or unexported fields
}
func NewWhiskInvokationError ¶ added in v0.2.2
func NewWhiskInvokationError() *WhiskInvokationError
func (*WhiskInvokationError) Activations ¶ added in v0.2.2
func (e *WhiskInvokationError) Activations() []string
func (*WhiskInvokationError) Add ¶ added in v0.2.2
func (e *WhiskInvokationError) Add(activationId string, t api.Task)
func (WhiskInvokationError) Error ¶ added in v0.2.2
func (e WhiskInvokationError) Error() string
func (*WhiskInvokationError) FailedTasks ¶ added in v0.2.2
func (e *WhiskInvokationError) FailedTasks() []api.Task