meduce

package module
v0.5.2
Published: Aug 11, 2023 License: Apache-2.0 Imports: 10 Imported by: 0

README

Meduce

A library for processing huge amounts of data on your device using the MapReduce paradigm. It was inspired by Hadoop MapReduce and MongoDB MapReduce.

The goal of the library is to fully utilize all of your CPU cores and maximize concurrent processing of data. It is not meant to be used for distributed processing, but it can be used to process data in parallel on a single machine.

It is written in Go 1.20 and fully utilizes generics, so you don't need to worry about type assertions on interface{} values.

Usage

Functions

The paradigm is pretty simple. You only need two (or four) functions to process all of your data (a short sketch follows the list):

  1. func Mapper(key KeyIn, value ValueIn, emit Emitter[KeyOut, ValueOut])
  2. func Reducer(key KeyOut, values []ValueOut) ValueOut
  3. func Finalizer(key KeyOut, valueRef *ValueOut) (optional)
  4. func Filter(key KeyOut, valueRef *ValueOut) bool (optional)
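
As a minimal sketch, here is what a hypothetical word-count Mapper and Reducer pair could look like (the function names are ours, not part of the library):

func CountWords(_ int, line string, emit meduce.Emitter[string, int]) {
	// Emit a count of 1 for every word in the line.
	for _, word := range strings.Fields(line) {
		emit(word, 1)
	}
}

func SumCounts(_ string, counts []int) int {
	// Sum all partial counts collected for a word.
	sum := 0
	for _, count := range counts {
		sum += count
	}
	return sum
}
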
Sources

Data is gathered from a channel named Source. You can use any channel, but the most commonly used ones are already predefined for you, and you can instantiate them by calling suitable constructor functions:

  1. func NewFileSource(path string) meduce.Source[int, string]
  2. func NewMapSource(m map[K]V) meduce.Source[K, V]
  3. func NewSliceSource(slice []T) meduce.Source[int, T]

If you need to read data from multiple sources, you can aggregate them by calling a function that joins all sources into one:
func AggregateDataSources(dataSources ...meduce.Source[K, V]) meduce.Source[K, V]
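
For example, two input files could be read as a single source. A sketch, assuming AggregateDataSources lives in the sources package next to the constructors (the paths are illustrative):

part1 := sources.NewFileSource("files/part1.tsv")
part2 := sources.NewFileSource("files/part2.tsv")
combined := sources.AggregateDataSources(part1, part2)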

Collectors

After all data is processed, it is collected by a Collector. You can either use the predefined collectors (FileCollector, MapCollector, ChannelCollector) or create your own type that implements the Collector[K, V] interface.
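
As a minimal sketch, a custom collector only needs the three methods of the Collector interface (shown in the reference below); the type here is ours:

// PrintCollector prints every collected pair to standard output.
type PrintCollector[K, V any] struct{}

func (PrintCollector[K, V]) Init()                  {}
func (PrintCollector[K, V]) Collect(key K, value V) { fmt.Println(key, value) }
func (PrintCollector[K, V]) Finalize()              {}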

Process

To start data processing, you first need to create a Process object. That can be accomplished by calling a constructor function.

After creating the process, you can start it either synchronously or asynchronously. If you start it asynchronously, you can wait for it to finish by calling the WaitToFinish() method.

go process.Run()
process.WaitToFinish()

You can link multiple processes together to create a pipeline. Interconnected processes will share data between each other internally.

They will be executed in parallel, and you don't need to worry about starting them manually. You only need to start the first one asynchronously, and the rest will be started automatically.

If you want to wait for all processes to finish, it is enough to call the WaitToFinish() method on the last one.
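
A minimal sketch of a two-stage pipeline, assuming first and second are already-configured processes:

meduce.Link(first, second)

go first.Run()        // starting the first process drives the whole pipeline
second.WaitToFinish() // blocks until the last process is done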

Common reducers

In the reducers package, you can find common, ready-made reducers that you can use in your processes.
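
For example, the hand-written ReduceMaxYear in the example below could be replaced by a ready-made maximum reducer (this also requires importing github.com/djordje200179/meduce/reducers):

Reducer: reducers.MaxOrdered[string, YearInfo, int](func(info YearInfo) int { return info.Count }),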

Example

In this example, we're using the IMDb title_basics dataset (available from IMDb's public dataset downloads) to find out in which year the most movies were released.

We will create two linked processes. The first one will count how many movies were released in each year, and the second one will find the year with the most movies.

package main

import (
	"fmt"
	"github.com/djordje200179/meduce"
	"github.com/djordje200179/meduce/collectors"
	"github.com/djordje200179/meduce/sources"
	"log"
	"strconv"
	"strings"
)

func MapMovieToYear(_ int, line string, emit meduce.Emitter[int, int]) {
	values := strings.Split(line, "\t")

	// Column 6 (index 5) of the TSV is startYear; rows where it is not a number (e.g. "\N") are skipped.
	year, err := strconv.Atoi(values[5])
	if err != nil {
		return
	}

	emit(year, 1)
}

func ReduceYearCounters(_ int, counters []int) int {
	count := 0
	for _, value := range counters {
		count += value
	}

	return count
}

type YearInfo struct {
	Year  int
	Count int
}

func MapYearToInfo(year int, count int, emit meduce.Emitter[string, YearInfo]) {
	emit("max", YearInfo{year, count})
}

func ReduceMaxYear(_ string, infoList []YearInfo) YearInfo {
	var max YearInfo

	for _, info := range infoList {
		if max.Year == 0 || info.Count > max.Count {
			max = info
		}
	}

	return max
}

func main() {
	process1 := meduce.NewDefaultProcess(
		meduce.Config[int, string, int, int]{
			Mapper:  MapMovieToYear,
			Reducer: ReduceYearCounters,

			Source: sources.NewFileSource("files/title_basics.tsv"),

			Logger: log.Default(),
		},
	)

	maxValueCollector := collectors.NewSingleValueCollector[string, YearInfo]()

	process2 := meduce.NewDefaultProcess(
		meduce.Config[int, int, string, YearInfo]{
			Mapper:  MapYearToInfo,
			Reducer: ReduceMaxYear,

			// No Source is set here: meduce.Link below feeds process1's output into this process.
			Collector: maxValueCollector,

			Logger: log.Default(),
		},
	)

	meduce.Link(process1, process2)

	go process1.Run()
	process2.WaitToFinish()

	maxValue := maxValueCollector.Value()
	fmt.Printf("Most movies (%d) were released in %d.\n", maxValue.Count, maxValue.Year)
}

Documentation

Overview

Package meduce implements an interface to run MapReduce tasks on a single machine.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Link[KeyOld, ValueOld, KeyIn, ValueIn, KeyOut, ValueOut any](
	prevProcess *Process[KeyOld, ValueOld, KeyIn, ValueIn],
	nextProcess *Process[KeyIn, ValueIn, KeyOut, ValueOut],
)

Link links two processes together, so that the output pairs of prevProcess become the input pairs of nextProcess.

func LinkWithBufferSize added in v0.4.0

func LinkWithBufferSize[KeyOld, ValueOld, KeyIn, ValueIn, KeyOut, ValueOut any](
	prevProcess *Process[KeyOld, ValueOld, KeyIn, ValueIn],
	nextProcess *Process[KeyIn, ValueIn, KeyOut, ValueOut],
	bufferSize int,
)

LinkWithBufferSize links two processes together with a buffer of the given size.

bufferSize is the capacity of the buffer created between the two linked processes.
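
A usage sketch, assuming first and second are compatible processes (the buffer size is arbitrary):

meduce.LinkWithBufferSize(first, second, 1024)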

Types

type Collector added in v0.2.0

type Collector[KeyOut, ValueOut any] interface {
	Init()                              // Init is called just before collecting starts
	Collect(key KeyOut, value ValueOut) // Collect is called for each processed key-value pair
	Finalize()                          // Finalize is called after all key-value pairs were processed
}

A Collector is an entity that is supplied by the user and is used to collect processed key-value pairs.

type Config added in v0.3.0

type Config[KeyIn, ValueIn, KeyOut, ValueOut any] struct {
	// KeyComparator and ValueComparator are used to sort key-value pairs
	// before they are passed to the Reducer.
	// KeyComparator is used as primary comparator,
	// and ValueComparator is used as secondary.
	KeyComparator   comparison.Comparator[KeyOut]
	ValueComparator comparison.Comparator[ValueOut]

	Mapper    Mapper[KeyIn, ValueIn, KeyOut, ValueOut]
	Reducer   Reducer[KeyOut, ValueOut]
	Finalizer Finalizer[KeyOut, ValueOut]
	Filter    Filter[KeyOut, ValueOut]

	Source    Source[KeyIn, ValueIn]
	Collector Collector[KeyOut, ValueOut]

	Logger *log.Logger
}

A Config is a configuration for a single MapReduce task.

type Emitter

type Emitter[KeyOut, ValueOut any] func(key KeyOut, value ValueOut)

An Emitter is a function that is supplied by the library.

It is passed to the user's Mapper function and is called to emit key-value pairs.

type Filter added in v0.3.0

type Filter[KeyOut, ValueOut any] func(key KeyOut, valueRef *ValueOut) bool

A Filter is a function that is created by the user and is used to filter processed key-value pairs.

It is called after a key-value pair has been finalized.

It should return true if a final key-value pair should be collected, or false if it should be discarded.
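
A minimal sketch of a Filter for a process with int keys and int values (the name and threshold are illustrative):

// KeepPopularYears collects only years with at least 100 released movies.
func KeepPopularYears(year int, count *int) bool {
	return *count >= 100
}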

type Finalizer

type Finalizer[KeyOut, ValueOut any] func(key KeyOut, valueRef *ValueOut)

A Finalizer is a function that is created by the user and is used to finalize key-value pairs.

It is called after all values for a key have been reduced to a single value.

It should process the reduced value in place before it is passed to the collector.
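
A minimal sketch of a Finalizer for a process with string keys and float64 values (the name is illustrative):

// RoundAverage rounds a reduced average to two decimal places in place.
func RoundAverage(key string, value *float64) {
	*value = math.Round(*value*100) / 100
}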

type Mapper

type Mapper[KeyIn, ValueIn, KeyOut, ValueOut any] func(key KeyIn, value ValueIn, emit Emitter[KeyOut, ValueOut])

A Mapper is a function that is created by the user and is used to map input data to key-value pairs.

It is called once for each key-value pair from the input data.

The function can call emit multiple times to produce any number of key-value pairs.

type Process

type Process[KeyIn, ValueIn, KeyOut, ValueOut any] struct {
	Config[KeyIn, ValueIn, KeyOut, ValueOut]
	// contains filtered or unexported fields
}

A Process is an instance of a single MapReduce task.

The zero value of Process has no configuration set and an invalid uid.

func NewDefaultProcess added in v0.3.0

func NewDefaultProcess[KeyIn, ValueIn any, KeyOut cmp.Ordered, ValueOut any](
	config Config[KeyIn, ValueIn, KeyOut, ValueOut],
) *Process[KeyIn, ValueIn, KeyOut, ValueOut]

NewDefaultProcess creates a new Process with the default key comparator for ordered keys.

func NewProcess

func NewProcess[KeyIn, ValueIn, KeyOut, ValueOut any](config Config[KeyIn, ValueIn, KeyOut, ValueOut]) *Process[KeyIn, ValueIn, KeyOut, ValueOut]

NewProcess creates a new Process with given configuration.

func (*Process[KeyIn, ValueIn, KeyOut, ValueOut]) Run

func (process *Process[KeyIn, ValueIn, KeyOut, ValueOut]) Run()

Run starts the MapReduce task and blocks until it is finished.

If a logger is set, it will be used to log progress.

func (*Process[KeyIn, ValueIn, KeyOut, ValueOut]) WaitToFinish

func (process *Process[KeyIn, ValueIn, KeyOut, ValueOut]) WaitToFinish() Collector[KeyOut, ValueOut]

WaitToFinish blocks until the MapReduce task is finished and returns the collector that gathered the results.

type Reducer

type Reducer[KeyOut, ValueOut any] func(key KeyOut, values []ValueOut) ValueOut

A Reducer is a function that is created by the user and is used to reduce all values for a key to a single value.

It may be called multiple times for the same key, with partial results fed back in, until all values for that key are reduced.

It should therefore be associative and have no side effects.

type Source

type Source[KeyIn, ValueIn any] <-chan misc.Pair[KeyIn, ValueIn]

A Source is a channel that is created by the user and from which key-value pairs are read.

Preferably, it should be buffered to avoid blocking and context switches.
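
In practice, you will usually obtain a Source from one of the predefined constructors instead of building the channel by hand, e.g.:

src := sources.NewMapSource(map[string]int{"a": 1, "b": 2})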
