Documentation ¶
Overview ¶
Package mapreduce provides a mapreduce pipeline for Google's appengine environment. For a sample application, see http://github.com/pendo-io/mapreduce-sample
Index ¶
- Constants
- func ConsoleHandler(w http.ResponseWriter, r *http.Request)
- func GetJobTaskResults(ds appwrap.Datastore, job JobInfo) ([]interface{}, error)
- func MapReduceHandler(baseUrl string, pipeline MapReducePipeline, ...) http.Handler
- func ReduceFunc(c context.Context, mr MapReducePipeline, writer SingleOutputWriter, ...) error
- func RemoveJob(ds appwrap.Datastore, jobId int64) error
- func Run(c context.Context, ds appwrap.Datastore, job MapReduceJob, log appwrap.Logging) (int64, error)
- type FatalError
- type FileLineInputReader
- type IgnoreTaskStatusChange
- type InputReader
- type Int64KeyHandler
- func (s Int64KeyHandler) Equal(a, b interface{}) bool
- func (s Int64KeyHandler) KeyDump(a interface{}) []byte
- func (s Int64KeyHandler) KeyLoad(a []byte) (interface{}, error)
- func (s Int64KeyHandler) Less(a, b interface{}) bool
- func (s Int64KeyHandler) SetShardParameters(jsonParameters string)
- func (s Int64KeyHandler) Shard(strInt interface{}, shardCount int) int
- type Int64ValueHandler
- type IntermediateStorage
- type IntermediateStorageIterator
- type JobInfo
- type JobStage
- type JobTask
- type KeyHandler
- type KeyValueHandler
- type LineOutputWriter
- type MapReduceJob
- type MapReducePipeline
- type MappedData
- type Mapper
- type NilOutputWriter
- type OutputWriter
- type ReaderIterator
- type Reducer
- type SingleInputReader
- type SingleIntermediateStorageWriter
- type SingleLineReader
- type SingleOutputWriter
- type StatusUpdateFunc
- type StringKeyHandler
- func (s StringKeyHandler) Equal(a, b interface{}) bool
- func (s StringKeyHandler) KeyDump(a interface{}) []byte
- func (s StringKeyHandler) KeyLoad(a []byte) (interface{}, error)
- func (s StringKeyHandler) Less(a, b interface{}) bool
- func (s StringKeyHandler) SetShardParameters(jsonParameters string)
- func (s StringKeyHandler) Shard(strInt interface{}, shardCount int) int
- type StringValueHandler
- type TaskInterface
- type TaskStatus
- type TaskStatusChange
- type TaskType
- type ValueHandler
Constants ¶
const (
	TaskStatusPending = TaskStatus("pending")
	TaskStatusRunning = TaskStatus("running")
	TaskStatusDone    = TaskStatus("done")
	TaskStatusFailed  = TaskStatus("failed")
)
const (
	StageFormation = JobStage("forming")
	StageMapping   = JobStage("map")
	StageReducing  = JobStage("reduce")
	StageDone      = JobStage("done")
	StageFailed    = JobStage("failed")
)
const (
	TaskTypeMap    = TaskType("map")
	TaskTypeReduce = TaskType("reduce")
)
TaskType defines the type of a task, map or reduce.
const JobEntity = "MapReduceJob"
const TaskEntity = "MapReduceTask"
JobEntity and TaskEntity are the datastore entity kinds for jobs and tasks.
Variables ¶
This section is empty.
Functions ¶
func ConsoleHandler ¶
func ConsoleHandler(w http.ResponseWriter, r *http.Request)
func GetJobTaskResults ¶
func GetJobTaskResults(ds appwrap.Datastore, job JobInfo) ([]interface{}, error)
func MapReduceHandler ¶
func MapReduceHandler(baseUrl string, pipeline MapReducePipeline, getContext func(r *http.Request) context.Context) http.Handler
MapReduceHandler returns an http.Handler which is responsible for all of the urls pertaining to the mapreduce job. The baseUrl acts as the name for the type of job being run.
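A minimal sketch of registering the handler, assuming the package is imported as mapreduce from github.com/pendo-io/mapreduce; newWordCountPipeline is a hypothetical constructor for a MapReducePipeline implementation (one possible shape is sketched under MapReducePipeline below):

import (
	"context"
	"net/http"

	"github.com/pendo-io/mapreduce" // assumed import path
)

func init() {
	// the baseUrl passed here must match the UrlPrefix used in MapReduceJob
	http.Handle("/mapreduce/wordcount/", mapreduce.MapReduceHandler(
		"/mapreduce/wordcount/",
		newWordCountPipeline(), // hypothetical; see MapReducePipeline below
		func(r *http.Request) context.Context { return r.Context() },
	))
}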
func ReduceFunc ¶
func ReduceFunc(c context.Context, mr MapReducePipeline, writer SingleOutputWriter, shardNames []string, separateReduceItems bool, statusFunc StatusUpdateFunc, log appwrap.Logging) error
func RemoveJob ¶
func RemoveJob(ds appwrap.Datastore, jobId int64) error
func Run ¶
func Run(c context.Context, ds appwrap.Datastore, job MapReduceJob, log appwrap.Logging) (int64, error)
Types ¶
type FatalError ¶
type FatalError struct{ Err error }
FatalError wraps an error. If Map or Reduce returns a FatalError, the task will not be retried.
func (FatalError) Error ¶
func (fe FatalError) Error() string
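As a sketch, a helper used inside a Map or Reduce implementation might wrap a permanent failure this way (parse is hypothetical); because FatalError implements error, it can be returned directly up to the framework, which then skips retries:

import (
	"strconv"

	"github.com/pendo-io/mapreduce" // assumed import path
)

// parse converts one input line to a number; a malformed line will not get
// better on retry, so the error is wrapped in FatalError
func parse(line string) (int64, error) {
	n, err := strconv.ParseInt(line, 10, 64)
	if err != nil {
		return 0, mapreduce.FatalError{Err: err} // don't retry this task
	}
	return n, nil
}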
type FileLineInputReader ¶
type FileLineInputReader struct {
Paths []string
}
func (FileLineInputReader) ReaderFromName ¶
func (m FileLineInputReader) ReaderFromName(c context.Context, path string) (SingleInputReader, error)
func (FileLineInputReader) ReaderNames ¶
func (m FileLineInputReader) ReaderNames() ([]string, error)
type IgnoreTaskStatusChange ¶
type IgnoreTaskStatusChange struct{}
IgnoreTaskStatusChange is an implementation of TaskStatusChange which ignores the call
func (*IgnoreTaskStatusChange) Status ¶
func (e *IgnoreTaskStatusChange) Status(jobId int64, task JobTask)
type InputReader ¶
type InputReader interface {
	// ReaderNames returns a list of reader instance names
	ReaderNames() ([]string, error)
	// ReaderFromName creates the SingleInputReader for the given name
	ReaderFromName(c context.Context, name string) (SingleInputReader, error)
}
InputReader is responsible for providing unique names for each of the input sources for a job, and for creating individual SingleInputReader objects from those names. The number of unique names determines the number of map tasks.
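A sketch of a custom InputReader (staticInputReader is hypothetical); it returns one name per path, so the job runs one map task per file, and it builds each reader with the package's NewSingleLineInputReader helper:

import (
	"context"
	"os"

	"github.com/pendo-io/mapreduce" // assumed import path
)

type staticInputReader struct {
	paths []string
}

func (r staticInputReader) ReaderNames() ([]string, error) {
	return r.paths, nil // one map task per path
}

func (r staticInputReader) ReaderFromName(c context.Context, name string) (mapreduce.SingleInputReader, error) {
	f, err := os.Open(name)
	if err != nil {
		return nil, err
	}
	return mapreduce.NewSingleLineInputReader(f), nil
}

The bundled FileLineInputReader follows essentially this contract for line-oriented files.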
type Int64KeyHandler ¶
type Int64KeyHandler struct{}
Int64KeyHandler provides a KeyHandler for int64 keys. A hash is used for computing shards so keys distribute evenly. Keys are encoded as strings for readability.
func (Int64KeyHandler) Equal ¶
func (s Int64KeyHandler) Equal(a, b interface{}) bool
func (Int64KeyHandler) KeyDump ¶
func (s Int64KeyHandler) KeyDump(a interface{}) []byte
func (Int64KeyHandler) KeyLoad ¶
func (s Int64KeyHandler) KeyLoad(a []byte) (interface{}, error)
func (Int64KeyHandler) Less ¶
func (s Int64KeyHandler) Less(a, b interface{}) bool
func (Int64KeyHandler) SetShardParameters ¶
func (s Int64KeyHandler) SetShardParameters(jsonParameters string)
func (Int64KeyHandler) Shard ¶
func (s Int64KeyHandler) Shard(strInt interface{}, shardCount int) int
type Int64ValueHandler ¶
type Int64ValueHandler struct{}
Int64ValueHandler provides a ValueHandler for int values
func (Int64ValueHandler) ValueDump ¶
func (j Int64ValueHandler) ValueDump(a interface{}) ([]byte, error)
func (Int64ValueHandler) ValueLoad ¶
func (j Int64ValueHandler) ValueLoad(val []byte) (interface{}, error)
type IntermediateStorage ¶
type IntermediateStorage interface {
	CreateIntermediate(c context.Context, handler KeyValueHandler) (SingleIntermediateStorageWriter, error)
	Iterator(c context.Context, name string, handler KeyValueHandler) (IntermediateStorageIterator, error)
	RemoveIntermediate(c context.Context, name string) error
}
IntermediateStorage defines how intermediate results are saved and read. If keys need to be serialized, KeyValueHandler.Load and KeyValueHandler.Save must be used.
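For illustration, a minimal in-memory implementation might look like the sketch below. memStorage and its helpers are hypothetical and only suitable for single-process tests (real jobs need storage every task can reach); the KeyValueHandler is ignored here because data never leaves the process, so nothing has to be serialized:

import (
	"context"
	"fmt"
	"sync"

	"github.com/pendo-io/mapreduce" // assumed import path
)

type memStorage struct {
	mu    sync.Mutex
	seq   int
	files map[string][]mapreduce.MappedData
}

func newMemStorage() *memStorage {
	return &memStorage{files: make(map[string][]mapreduce.MappedData)}
}

func (s *memStorage) CreateIntermediate(c context.Context, handler mapreduce.KeyValueHandler) (mapreduce.SingleIntermediateStorageWriter, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.seq++ // each writer gets a unique name
	return &memWriter{s: s, name: fmt.Sprintf("mem-%d", s.seq)}, nil
}

func (s *memStorage) Iterator(c context.Context, name string, handler mapreduce.KeyValueHandler) (mapreduce.IntermediateStorageIterator, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return &memIterator{data: s.files[name]}, nil
}

func (s *memStorage) RemoveIntermediate(c context.Context, name string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.files, name)
	return nil
}

type memWriter struct {
	s    *memStorage
	name string
	data []mapreduce.MappedData
}

func (w *memWriter) WriteMappedData(d mapreduce.MappedData) error {
	w.data = append(w.data, d)
	return nil
}

func (w *memWriter) Close(c context.Context) error {
	w.s.mu.Lock()
	defer w.s.mu.Unlock()
	w.s.files[w.name] = w.data // publish the shard under its name
	return nil
}

func (w *memWriter) ToName() string { return w.name }

type memIterator struct {
	data []mapreduce.MappedData
	pos  int
}

func (it *memIterator) Next() (mapreduce.MappedData, bool, error) {
	if it.pos == len(it.data) {
		return mapreduce.MappedData{}, false, nil // no more items
	}
	it.pos++
	return it.data[it.pos-1], true, nil
}

func (it *memIterator) Close() error { return nil }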
type IntermediateStorageIterator ¶
type IntermediateStorageIterator interface {
	// Next returns the next mapped data item, a bool saying whether it is valid, and an
	// error if one occurred (we could probably use error = io.EOF instead, but we don't)
	Next() (MappedData, bool, error)
	Close() error
}
func NewReaderIterator ¶
func NewReaderIterator(reader io.ReadCloser, handler KeyValueHandler) IntermediateStorageIterator
type JobInfo ¶
type JobInfo struct {
	UrlPrefix           string
	Stage               JobStage
	UpdatedAt           time.Time
	StartTime           time.Time
	TaskCount           int      `datastore:"TasksRunning,noindex"`
	FirstTaskId         int64    `datastore:",noindex"` // 0 here means to use task keys like "%d.%d" (Id, taskNum)
	RetryCount          int      `datastore:",noindex"`
	SeparateReduceItems bool     `datastore:",noindex"`
	OnCompleteUrl       string   `datastore:",noindex"`
	WriterNames         []string `datastore:",noindex"`
	JsonParameters      string   `datastore:",noindex"`

	// filled in by getJob
	Id int64 `datastore:"-"`
}
JobInfo is the entity stored in the datastore defining the MapReduce Job
type JobTask ¶
type JobTask struct {
	Status              TaskStatus `datastore:",noindex"`
	Job                 *appwrap.DatastoreKey
	Done                *appwrap.DatastoreKey // nil if the task isn't done, job if it is
	Info                string    `datastore:",noindex"`
	StartTime           time.Time `datastore:",noindex"`
	UpdatedAt           time.Time `datastore:",noindex"`
	Type                TaskType  `datastore:",noindex"`
	Retries             int       `datastore:",noindex"`
	SeparateReduceItems bool

	// ReadFrom holds the names of the intermediate storage sources; it is only used
	// for reduce tasks
	ReadFrom []byte `datastore:",noindex"`
	Url      string `datastore:",noindex"`
	Result   string `datastore:",noindex"`
}
JobTask is the entity stored in the datastore defining a single MapReduce task. They have JobInfo entities as their parents.
type KeyHandler ¶
type KeyHandler interface {
	// Less returns a < b
	Less(a, b interface{}) bool
	// Equal returns a == b
	Equal(a, b interface{}) bool
	// KeyDump converts a key into a byte array
	KeyDump(a interface{}) []byte
	// KeyLoad converts a byte array into a key
	KeyLoad([]byte) (interface{}, error)
	// Shard returns the shard number a key belongs to, given the total number of shards
	// which are being used for the job
	Shard(a interface{}, shardCount int) int
	// SetShardParameters provides the (probably json) parameters for the job; may be
	// useful for the sharding strategy
	SetShardParameters(jsonParameters string)
}
KeyHandler must be implemented for each key type to enable shuffling and storing of map keys
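As a sketch, a KeyHandler for a time.Time key type might serialize keys as RFC 3339 text and hash the serialized form for sharding (timeKeyHandler is hypothetical and not part of the package; the bundled StringKeyHandler and Int64KeyHandler cover the common cases):

import (
	"hash/fnv"
	"time"
)

type timeKeyHandler struct{}

func (timeKeyHandler) Less(a, b interface{}) bool  { return a.(time.Time).Before(b.(time.Time)) }
func (timeKeyHandler) Equal(a, b interface{}) bool { return a.(time.Time).Equal(b.(time.Time)) }

// KeyDump serializes a key as RFC 3339 text for readability
func (timeKeyHandler) KeyDump(a interface{}) []byte {
	return []byte(a.(time.Time).Format(time.RFC3339Nano))
}

// KeyLoad parses a key serialized by KeyDump
func (timeKeyHandler) KeyLoad(b []byte) (interface{}, error) {
	return time.Parse(time.RFC3339Nano, string(b))
}

// Shard hashes the serialized key so keys spread evenly across shards
func (timeKeyHandler) Shard(a interface{}, shardCount int) int {
	h := fnv.New32a()
	h.Write(timeKeyHandler{}.KeyDump(a))
	return int(h.Sum32() % uint32(shardCount))
}

func (timeKeyHandler) SetShardParameters(jsonParameters string) {} // no parameters needed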
type KeyValueHandler ¶
type KeyValueHandler interface {
	KeyHandler
	ValueHandler
}
type LineOutputWriter ¶
type LineOutputWriter struct {
// contains filtered or unexported fields
}
func NewLineOutputWriter ¶
func NewLineOutputWriter(w io.WriteCloser, handler KeyValueHandler) LineOutputWriter
func (LineOutputWriter) Write ¶
func (o LineOutputWriter) Write(data interface{}) error
func (LineOutputWriter) WriteMappedData ¶
func (o LineOutputWriter) WriteMappedData(item MappedData) error
type MapReduceJob ¶
type MapReduceJob struct {
	MapReducePipeline
	Inputs  InputReader
	Outputs OutputWriter

	// UrlPrefix is the base url path used for mapreduce jobs posted into
	// task queues, and must match the baseUrl passed into MapReduceHandler()
	UrlPrefix string

	// OnCompleteUrl is the url to post to when a job is completed. The full url will
	// include multiple query parameters, including status=(done|error) and id=(jobId).
	// If an error occurred, the error parameter will also be included. If this is empty,
	// no completion notification is given; it is assumed the caller will poll for results.
	OnCompleteUrl string

	// RetryCount is the number of times individual map/reduce tasks should be retried.
	// Tasks which return errors of type FatalError are not retried (defaults to 3;
	// 1 means a task will never be retried).
	RetryCount int

	// SeparateReduceItems means that instead of collapsing all rows with the same key into
	// one call to the reduce function, each row is passed individually (though wrapped in
	// an array of length one to keep the reduce function signature the same)
	SeparateReduceItems bool

	// JobParameters is passed to the map and reduce jobs. They are assumed to be json
	// encoded, though absolutely no effort is made to enforce that.
	JobParameters string
}
MapReduceJob defines a complete map reduce job, which is the pipeline and the parameters the job needs. The types for Inputs and Outputs must match the types for the InputReader and OutputWriter in the pipeline.
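A sketch of assembling and launching a job, assuming a hypothetical wordCountPipeline like the one outlined under MapReducePipeline below; the datastore and logger come from appwrap and are left as parameters here:

import (
	"context"

	"github.com/pendo-io/appwrap"
	"github.com/pendo-io/mapreduce" // assumed import path
)

func startWordCount(c context.Context, ds appwrap.Datastore, log appwrap.Logging) (int64, error) {
	job := mapreduce.MapReduceJob{
		MapReducePipeline: newWordCountPipeline(), // hypothetical; see MapReducePipeline below
		Inputs:            mapreduce.FileLineInputReader{Paths: []string{"words1.txt", "words2.txt"}},
		Outputs:           mapreduce.NilOutputWriter{},
		UrlPrefix:         "/mapreduce/wordcount/", // must match the MapReduceHandler baseUrl
		RetryCount:        3,
	}
	// Run returns the job id, which can later be used to poll for results
	return mapreduce.Run(c, ds, job, log)
}

Once the job is done, GetJobTaskResults can presumably be used to collect the per-task results, and RemoveJob to delete the job's datastore entities.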
type MapReducePipeline ¶
type MapReducePipeline interface {
	// The basic pipeline of read, map, shuffle, reduce, save
	InputReader
	Mapper
	IntermediateStorage
	Reducer
	OutputWriter

	// Serialization and sorting primitives for keys and values
	KeyHandler
	ValueHandler

	TaskInterface
	TaskStatusChange
}
MapReducePipeline defines the complete pipeline for a map reduce job (but not the job itself). No per-job information is available for the pipeline functions other than what gets passed in via the various interfaces.
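One possible shape for a pipeline, as a sketch: embed the package-provided pieces alongside your own Mapper and Reducer (the hypothetical wordCountMap and wordCountReduce are sketched under Mapper and Reducer below), and fill the remaining interfaces with concrete implementations at construction time. Embedding lets one struct satisfy the whole interface while reusing the bundled handlers:

import "github.com/pendo-io/mapreduce" // assumed import path

type wordCountPipeline struct {
	mapreduce.FileLineInputReader // InputReader
	wordCountMap                  // Mapper; sketched under Mapper below
	mapreduce.IntermediateStorage // filled in by the constructor
	wordCountReduce               // Reducer; sketched under Reducer below
	mapreduce.NilOutputWriter     // OutputWriter
	mapreduce.StringKeyHandler    // KeyHandler for the string word keys
	mapreduce.Int64ValueHandler   // ValueHandler for the int64 counts
	mapreduce.TaskInterface       // task posting; a concrete implementation is required
	mapreduce.TaskStatusChange    // filled in by the constructor
}

func newWordCountPipeline() wordCountPipeline {
	return wordCountPipeline{
		IntermediateStorage: newMemStorage(), // hypothetical; see IntermediateStorage above
		TaskStatusChange:    &mapreduce.IgnoreTaskStatusChange{},
		// TaskInterface must also be set to a concrete implementation; the sample
		// application linked in the overview shows working choices
	}
}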
type MappedData ¶
type MappedData struct {
	Key   interface{}
	Value interface{}
}
MappedData items are key/value pairs returned from the Map stage. The items are rearranged by the shuffle, and (Key, []Value) pairs are passed into the reduce. KeyHandler interfaces provide the operations on MappedData items which are needed by the pipeline, and ValueHandler interfaces provide serialization operations for the values.
type Mapper ¶
type Mapper interface {
	Map(item interface{}, statusUpdate StatusUpdateFunc) ([]MappedData, error)
	// SetMapParameters is called once with the job parameters for each mapper task
	SetMapParameters(jsonParameters string)
	// MapComplete is called when the map is complete. Its return is handled the same
	// as the return from Map()
	MapComplete(statusUpdate StatusUpdateFunc) ([]MappedData, error)
}
Mapper defines a map function; it is passed an item from the input and returns a list of mapped items.
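A word-count Mapper sketch (wordCountMap is hypothetical; it assumes each input item is a line of text, as produced by FileLineInputReader):

import (
	"fmt"
	"strings"

	"github.com/pendo-io/mapreduce" // assumed import path
)

type wordCountMap struct{}

func (wordCountMap) SetMapParameters(jsonParameters string) {} // no parameters used

// Map emits one (word, 1) pair for every word on the line
func (wordCountMap) Map(item interface{}, status mapreduce.StatusUpdateFunc) ([]mapreduce.MappedData, error) {
	line, ok := item.(string)
	if !ok {
		return nil, mapreduce.FatalError{Err: fmt.Errorf("expected string item, got %T", item)}
	}
	var out []mapreduce.MappedData
	for _, word := range strings.Fields(line) {
		out = append(out, mapreduce.MappedData{Key: word, Value: int64(1)})
	}
	return out, nil
}

// MapComplete has nothing buffered, so nothing to flush
func (wordCountMap) MapComplete(status mapreduce.StatusUpdateFunc) ([]mapreduce.MappedData, error) {
	return nil, nil
}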
type NilOutputWriter ¶
type NilOutputWriter struct {
Count int
}
NilOutputWriter collects output and throws it away. Useful for reduce tasks which only have side effects.
func (NilOutputWriter) WriterFromName ¶
func (n NilOutputWriter) WriterFromName(c context.Context, name string) (SingleOutputWriter, error)
func (NilOutputWriter) WriterNames ¶
func (n NilOutputWriter) WriterNames(c context.Context) ([]string, error)
type OutputWriter ¶
type ReaderIterator ¶
type ReaderIterator struct {
// contains filtered or unexported fields
}
func (*ReaderIterator) Close ¶
func (r *ReaderIterator) Close() error
func (*ReaderIterator) Next ¶
func (r *ReaderIterator) Next() (MappedData, bool, error)
type Reducer ¶
type Reducer interface {
	Reduce(key interface{}, values []interface{}, statusUpdate StatusUpdateFunc) (result interface{}, err error)
	// SetReduceParameters is called once with the job parameters for each reduce task
	SetReduceParameters(jsonParameters string)
	// ReduceComplete is called when the reduce is complete. Each item in the results array
	// will be passed separately to the output writer
	ReduceComplete(statusUpdate StatusUpdateFunc) ([]interface{}, error)
}
Reducer defines the reduce function; it is called once for each key and is given a list of all of the values for that key.
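The matching word-count Reducer sketch (wordCountReduce is hypothetical); it sums the per-word counts emitted by the mapper above, and its result is handed to the output writer:

import (
	"fmt"

	"github.com/pendo-io/mapreduce" // assumed import path
)

type wordCountReduce struct{}

func (wordCountReduce) SetReduceParameters(jsonParameters string) {} // no parameters used

// Reduce sums every count recorded for the key
func (wordCountReduce) Reduce(key interface{}, values []interface{}, status mapreduce.StatusUpdateFunc) (interface{}, error) {
	var total int64
	for _, v := range values {
		n, ok := v.(int64)
		if !ok {
			return nil, mapreduce.FatalError{Err: fmt.Errorf("expected int64 value, got %T", v)}
		}
		total += n
	}
	return fmt.Sprintf("%v %d", key, total), nil
}

// ReduceComplete has no final items to add
func (wordCountReduce) ReduceComplete(status mapreduce.StatusUpdateFunc) ([]interface{}, error) {
	return nil, nil
}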
type SingleInputReader ¶
func NewSingleLineInputReader ¶
func NewSingleLineInputReader(r io.ReadCloser) SingleInputReader
type SingleIntermediateStorageWriter ¶
type SingleIntermediateStorageWriter interface {
	WriteMappedData(data MappedData) error
	Close(c context.Context) error
	ToName() string
}
SingleIntermediateStorageWriter overlaps a great deal with SingleOutputWriter; the two often share an implementation.
type SingleLineReader ¶
type SingleLineReader struct {
// contains filtered or unexported fields
}
func (SingleLineReader) Close ¶
func (ir SingleLineReader) Close() (err error)
func (SingleLineReader) Next ¶
func (ir SingleLineReader) Next() (interface{}, error)
type SingleOutputWriter ¶
type StatusUpdateFunc ¶
type StatusUpdateFunc func(format string, paramList ...interface{})
StatusUpdateFunc functions are passed into Map and Reduce handlers to allow those handlers to post arbitrary status messages which are stored in the datastore
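For example, a long-running map or reduce might report progress like this (processRows is a hypothetical helper receiving the StatusUpdateFunc its Map or Reduce implementation was given):

import "github.com/pendo-io/mapreduce" // assumed import path

func processRows(rows []string, statusUpdate mapreduce.StatusUpdateFunc) {
	for i, row := range rows {
		if i%10000 == 0 {
			statusUpdate("processed %d of %d rows", i, len(rows))
		}
		_ = row // real per-row work would happen here
	}
}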
type StringKeyHandler ¶
type StringKeyHandler struct{}
StringKeyHandler provides a KeyHandler for string keys
func (StringKeyHandler) Equal ¶
func (s StringKeyHandler) Equal(a, b interface{}) bool
func (StringKeyHandler) KeyDump ¶
func (s StringKeyHandler) KeyDump(a interface{}) []byte
func (StringKeyHandler) KeyLoad ¶
func (s StringKeyHandler) KeyLoad(a []byte) (interface{}, error)
func (StringKeyHandler) Less ¶
func (s StringKeyHandler) Less(a, b interface{}) bool
func (StringKeyHandler) SetShardParameters ¶
func (s StringKeyHandler) SetShardParameters(jsonParameters string)
func (StringKeyHandler) Shard ¶
func (s StringKeyHandler) Shard(strInt interface{}, shardCount int) int
type StringValueHandler ¶
type StringValueHandler struct{}
StringValueHandler provides a ValueHandler for string values
func (StringValueHandler) ValueDump ¶
func (j StringValueHandler) ValueDump(a interface{}) ([]byte, error)
func (StringValueHandler) ValueLoad ¶
func (j StringValueHandler) ValueLoad(val []byte) (interface{}, error)
type TaskInterface ¶
type TaskInterface interface {
	PostTask(c context.Context, fullUrl string, jsonParameters string, log appwrap.Logging) error
	PostStatus(c context.Context, fullUrl string, log appwrap.Logging) error
}
TaskInterface defines how the map and reduce tasks are controlled, and how they report their status.
type TaskStatus ¶
type TaskStatus string
type TaskStatusChange ¶
type TaskStatusChange interface {
	Status(jobId int64, task JobTask)
}
TaskStatusChange allows the map reduce framework to notify tasks when their status has changed to RUNNING or DONE. Handy for callbacks. Always called after SetMapParameters() and SetReduceParameters().
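A sketch of a non-trivial implementation (logStatusChange is hypothetical) that logs each transition instead of ignoring it, for use in a pipeline in place of IgnoreTaskStatusChange:

import (
	"log"

	"github.com/pendo-io/mapreduce" // assumed import path
)

type logStatusChange struct{}

// Status logs each task transition reported by the framework
func (*logStatusChange) Status(jobId int64, task mapreduce.JobTask) {
	log.Printf("job %d: %s task now %s (%s)", jobId, task.Type, task.Status, task.Info)
}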