word_count

v1.4.0-GM
Published: Mar 28, 2017 License: Apache-2.0 Imports: 11 Imported by: 0


Pachyderm Word Count

In this guide, we will write a classic word count application on Pachyderm. This is a somewhat advanced guide; to learn the basic usage of Pachyderm, start with the beginner tutorial.

Setup

This guide assumes that you already have a Pachyderm cluster running and have configured pachctl to talk to the cluster. Installation instructions are available in the Pachyderm documentation.

Pipelines

In this example, we will have three processing stages, each defined by its own pipeline. We say that these pipelines are "connected" because one pipeline's output is the next pipeline's input.

input

Let's create the first pipeline:

# We assume you're running this from the root of this repo:
$ pachctl create-pipeline -f input_pipeline.json

This first pipeline, input, uses wget to download web pages from Wikipedia, which serve as the input for the next pipeline. We set parallelism to 1 because wget can't be parallelized.

Note how this pipeline itself has no inputs. A pipeline that doesn't have inputs can only be triggered manually:

$ pachctl run-pipeline input

Now you should be able to see a job running:

$ pachctl list-job

map

This pipeline counts the number of occurrences of each word it encounters. While this task can very well be accomplished in bash, we will demonstrate how to use custom code in Pachyderm by using a Go program.
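The counting step itself can be sketched as follows. This is a minimal illustration, not the program shipped in this repo: the function name countWords and the tokenization rule (lowercased runs of letters) are assumptions made for the example.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"unicode"
)

// countWords returns how many times each word occurs in text.
// Assumption: a "word" is a maximal run of letters, compared case-insensitively.
func countWords(text string) map[string]int {
	counts := make(map[string]int)
	// Split on every rune that is not a letter.
	words := strings.FieldsFunc(strings.ToLower(text), func(r rune) bool {
		return !unicode.IsLetter(r)
	})
	for _, w := range words {
		counts[w]++
	}
	return counts
}

func main() {
	counts := countWords("Good morning! Morning has broken, like the first morning.")

	// Print in sorted order so the output is deterministic.
	words := make([]string, 0, len(counts))
	for w := range counts {
		words = append(words, w)
	}
	sort.Strings(words)
	for _, w := range words {
		fmt.Printf("%s %d\n", w, counts[w])
	}
}
```

In a real pipeline, the program would read its input from the mounted input repo and write its results under /pfs/out rather than printing them.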

First of all, we build a Docker image that contains our Go program:

$ docker build -t wordcount-map .

Then, we simply refer to the image in our pipeline and run the pipeline:

# We assume you're running this from the root of this repo:
$ pachctl create-pipeline -f mapPipeline.json

As soon as you create this pipeline, it will start processing data from input. To parallelize the pipeline, you can modify the parallelism spec in mapPipeline.json. Pachyderm will then spin up N containers that run concurrently, where each container gets 1/N of the data. For each word a container encounters, it writes a file whose filename is the word and whose content is the number of occurrences. If multiple containers write the same file, the content is merged. As an example, the file morning might look like this:

# find the commit ID
$ pachctl list-job -p map
ID                                 OUTPUT                                 STARTED        DURATION   STATE
6600c71be4e8604f716ce1965895dc27   map/5b0e5c8f345b4f8c9690c77333564687   46 hours ago   -          success
$ pachctl get-file map 5b0e5c8f345b4f8c9690c77333564687 morning
217
355
142

This shows that there were three containers that wrote to the file morning.
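To see why the file ends up with one line per container, here is a small in-memory sketch of the sharding and merging described above. The helpers shard, mapShard, and merge are hypothetical names chosen for this example; Pachyderm's actual partitioning and merge logic are part of the system itself and may differ.

```go
package main

import "fmt"

// shard splits items into n contiguous, nearly equal parts.
// Assumption: this only illustrates "each container gets 1/N of the data";
// it is not Pachyderm's real partitioning scheme.
func shard(items []string, n int) [][]string {
	parts := make([][]string, n)
	for i := range parts {
		lo := i * len(items) / n
		hi := (i + 1) * len(items) / n
		parts[i] = items[lo:hi]
	}
	return parts
}

// mapShard plays the role of one container: it counts the words in its shard.
func mapShard(words []string) map[string]int {
	counts := make(map[string]int)
	for _, w := range words {
		counts[w]++
	}
	return counts
}

// merge models the merged output files: each file name maps to one count
// per container that wrote that file, in container order.
func merge(perContainer []map[string]int) map[string][]int {
	files := make(map[string][]int)
	for _, counts := range perContainer {
		for word, n := range counts {
			files[word] = append(files[word], n)
		}
	}
	return files
}

func main() {
	words := []string{"morning", "sun", "morning", "morning", "rain", "morning"}

	var outputs []map[string]int
	for _, part := range shard(words, 3) {
		outputs = append(outputs, mapShard(part))
	}

	files := merge(outputs)
	fmt.Println(files["morning"]) // prints [1 2 1]
}
```

Each entry in the merged slice corresponds to one line in the output file, which is why three containers produce three lines.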

reduce

The final pipeline goes through every file and adds up the numbers in each file. For this pipeline we can use a simple bash script:

find /pfs/map -type f | while read -r count; do awk '{ sum += $1 } END { print sum }' "$count" > /tmp/count; mv /tmp/count "/pfs/out/$(basename "$count")"; done

We bake this script into reducePipeline.json and create the pipeline:

# We assume you're running this from the root of this repo:
$ pachctl create-pipeline -f reducePipeline.json

The final output might look like this:

$ pachctl get-file reduce [commit ID] morning
714
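The same summation can also be written in Go. This is a rough equivalent of the awk step, assuming one integer per line. The helper sumCounts is a hypothetical name, and unlike awk it rejects non-numeric lines instead of treating them as zero.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// sumCounts adds up the integers found one per line in content.
// Blank lines are skipped; a non-numeric line is reported as an error.
func sumCounts(content string) (int, error) {
	total := 0
	scanner := bufio.NewScanner(strings.NewReader(content))
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" {
			continue
		}
		n, err := strconv.Atoi(line)
		if err != nil {
			return 0, fmt.Errorf("parsing %q: %w", line, err)
		}
		total += n
	}
	return total, scanner.Err()
}

func main() {
	// The three per-container counts from the morning file shown earlier.
	total, err := sumCounts("217\n355\n142\n")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(total) // prints 714
}
```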

To get a complete list of the words counted:

$ pachctl list-file reduce [commit ID]

Preliminary Benchmarks

We ran this pipeline on 7.5GB of data in July 2016, on a 3-node GCE cluster with 4 CPUs and 15GB of memory each. The job completed in 9 hours.
