mapreduce

package
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 3, 2023 License: Apache-2.0 Imports: 29 Imported by: 6

Documentation

Overview

Package mapreduce is a client for launching operations over YT.

Index

Constants

View Source
const (
	GoMaxProcEnvName = "GOMAXPROCS"
)

Variables

This section is empty.

Functions

func CreateCoreTable

func CreateCoreTable(ctx context.Context, yc yt.Client, path ypath.Path, opts ...yt.CreateTableOption) (yt.NodeID, error)

func CreateStderrTable

func CreateStderrTable(ctx context.Context, yc yt.Client, path ypath.Path, opts ...yt.CreateTableOption) (yt.NodeID, error)

func GroupKeys

func GroupKeys(r Reader, onKey func(r Reader) error) error

GroupKeys groups rows in r by key, according to KeySwitch().

onKey is invoked once for every distinct value of the key.

func (*myJob) Do(ctx JobContext, in Reader, out []Writer) error {
    return GroupKeys(in, func(in Reader) error {
        for in.Next() {
            // do stuff
        }
        return nil
    })
}

func InsideJob

func InsideJob() bool

InsideJob determines whether current process is running inside a mapreduce job.

func JobMain

func JobMain() int

JobMain runs user code inside mapreduce job.

A binary that wishes to run mapreduce operations must place the following code at the beginning of its main() function.

if mapreduce.InsideJob() {
    os.Exit(mapreduce.JobMain())
}

func Register

func Register(job Job)

Register registers job type.

Value of job is irrelevant.

User must register all job types during initialisation.

type MyJob struct{}

func init() {
    mapreduce.Register(&MyJob{})
}

func RegisterJobPart

func RegisterJobPart(state interface{})

RegisterJobPart registers type that might be used as part of the job state.

func RegisterName

func RegisterName(name string, job Job)

RegisterName registers job type with overridden name.

func SwitchTable

func SwitchTable(out Writer, tableIndex int) error

Types

type Client

type Client interface {
	Map(mapper Job, spec *spec.Spec, opts ...OperationOption) (Operation, error)

	Reduce(reducer Job, spec *spec.Spec, opts ...OperationOption) (Operation, error)

	JoinReduce(reducer Job, spec *spec.Spec, opts ...OperationOption) (Operation, error)

	MapReduce(mapper Job, reducer Job, spec *spec.Spec, opts ...OperationOption) (Operation, error)

	MapCombineReduce(mapper Job, combiner Job, reducer Job, spec *spec.Spec, opts ...OperationOption) (Operation, error)

	Sort(spec *spec.Spec, opts ...OperationOption) (Operation, error)

	Merge(spec *spec.Spec, opts ...OperationOption) (Operation, error)

	Erase(spec *spec.Spec, opts ...OperationOption) (Operation, error)

	RemoteCopy(spec *spec.Spec, opts ...OperationOption) (Operation, error)

	// TODO(prime@): switch order of spec and jobs.
	Vanilla(spec *spec.Spec, jobs map[string]Job, opts ...OperationOption) (Operation, error)

	Track(opID yt.OperationID) (Operation, error)

	// WithTx returns Client, that starts all operations inside transaction tx.
	WithTx(tx yt.Tx) Client
}

func New

func New(yc yt.Client, options ...Option) Client

type Config

type Config struct {
	CreateOutputTables bool
}

func DefaultConfig

func DefaultConfig() *Config

type CoreTableRow

type CoreTableRow struct {
	JobID     guid.GUID `yson:"job_id,key"`
	CoreID    int       `yson:"core_id,key"`
	PartIndex int       `yson:"part_index,key"`
	Data      []byte    `yson:"data"`
}

CoreTableRow is a single row of operation core table.

type Job

type Job interface {
	Do(ctx JobContext, in Reader, out []Writer) error

	// InputTypes returns list of types this job expects to receive as inputs.
	//
	// Each element describes the type of the corresponding input table.
	InputTypes() []interface{}

	// OutputTypes returns list of types this job produces as outputs.
	//
	// Each element describes the type of the corresponding output table.
	OutputTypes() []interface{}
}

func NewJob

func NewJob(name string) Job

type JobContext

type JobContext interface {
	LookupVault(name string) (value string, ok bool)
	JobCookie() int
}

type Operation

type Operation interface {
	ID() yt.OperationID

	Wait() error
}

type OperationOption

type OperationOption interface {
	// contains filtered or unexported methods
}

func SkipSelfUpload

func SkipSelfUpload() OperationOption

func WithLocalFiles

func WithLocalFiles(paths []string) OperationOption

WithLocalFiles makes local files available inside job sandbox directory.

Filename and file permissions are preserved. All files are uploaded into job sandbox.

type Option

type Option interface {
	// contains filtered or unexported methods
}

func WithConfig

func WithConfig(config *Config) Option

func WithContext

func WithContext(ctx context.Context) Option

func WithDefaultOperationACL

func WithDefaultOperationACL(acl []yt.ACE) Option

type Reader

type Reader interface {
	// TableIndex returns table index of the current row.
	TableIndex() int

	// KeySwitch returns true, if current row is the first row with the current key.
	KeySwitch() bool

	// RowIndex returns row index of the current row.
	RowIndex() int64

	// RangeIndex returns range index of the current row.
	RangeIndex() int

	// Scan decodes current value from the input stream.
	//
	// Note, that Scan does not advance input cursor forward.
	//
	// Returns error, only if current row can't be decoded into value.
	// All other errors will terminate current process immediately.
	Scan(value interface{}) error

	// MustScan works like Scan, but terminates current process in case of an error.
	MustScan(value interface{})

	// Advances input stream to the next record.
	Next() bool
}

Reader represents input of the job.

var row MyRow
for r.Next() {
    r.MustScan(&row)
}

type StderrTableRow

type StderrTableRow struct {
	JobID     guid.GUID `yson:"job_id,key"`
	PartIndex int       `yson:"part_index,key"`
	Data      []byte    `yson:"data"`
}

StderrTableRow is a single row of operation stderr table.

type Untyped

type Untyped struct{}

Untyped is an empty struct useful for embedding inside a user job type; it provides default implementations of the InputTypes() and OutputTypes() methods.

func (Untyped) InputTypes

func (Untyped) InputTypes() []interface{}

func (Untyped) OutputTypes

func (Untyped) OutputTypes() []interface{}

type Writer

type Writer interface {
	// Write writes value to output stream.
	//
	// Returns error, only if value can't be encoded.
	// All other errors will terminate current process immediately.
	Write(value interface{}) error

	// MustWrite works like Write, but terminates current process in case of an error.
	MustWrite(value interface{})
}

Writer is a single output of a mapreduce job.

Directories

Path Synopsis
package spec defines specification of YT operation.
package spec defines specification of YT operation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL