README ¶
Pachyderm Word Count
In this guide, we will write a classic word count application on Pachyderm. This is a somewhat advanced guide; to learn the basic usage of Pachyderm, start with the beginner tutorial.
Setup
This guide assumes that you already have a Pachyderm cluster running and have configured pachctl
to talk to the cluster. Installation instructions can be found here.
Pipelines
In this example, we will have three processing stages defined by three pipeline stages:
Our first pipeline, scraper
, is a web scraper that just pulls content from the internet. Our second pipeline, map
, tokenizes the words from the scraped pages in parallel over all pages and appends counts of words to files corresponding to those words. Our final pipeline, reduce
, aggreates the the total counts for each word.
All three pipelines, including reduce
, can be run in a distributed fashion to maximize performance.
Input
Our input data is a set of files. Each file is named for the site we want to scrape with the content being the URL or URLs for that site.
Let's create the input repo and add one URL, Wikipedia:
$ pachctl create-repo urls
# We assume you're running this from the root of this example (pachyderm/doc/examples/word_count/):
$ pachctl put-file urls master -c -f Wikipedia
Then to actually scrape this site and save the data, we create the first pipeline based on the scraper.json pipeline specification:
# We assume you're running this from the root of this example:
$ pachctl create-pipeline -f scraper.json
This first pipeline, scraper
, uses wget
to download web pages from Wikipedia which will be used as the input for the next pipeline. It'll take a minute or two because it needs to apt-get
a few dependencies (this can be avoided by creating a custom Docker container with the dependencies already downloaded).
When you create the scraper
pipeline, you should be able to see a job running and a new repo called scraper
that contains the output of our scrape:
$ pachctl list-job
ID OUTPUT COMMIT STARTED DURATION RESTART PROGRESS STATE
44190a81-a87b-4a6b-8f25-8e5d3504566a scraper/- 3 seconds ago - 0 0 / 1 running
$ pachctl list-job
ID OUTPUT COMMIT STARTED DURATION RESTART PROGRESS STATE
44190a81-a87b-4a6b-8f25-8e5d3504566a scraper/da0786abd4254ff6b2297aeaf10204e4 About a minute ago 42 seconds 0 1 / 1 success
$ pachctl list-repo
NAME CREATED SIZE
scraper About a minute ago 71.34 KiB
urls 3 minutes ago 39 B
$ pachctl list-file scraper master
NAME TYPE SIZE
Wikipedia dir 71.34 KiB
$ pachctl list-file scraper master Wikipedia
NAME TYPE SIZE
Wikipedia/Main_Page.html file 71.34 KiB
Map
The map
pipeline counts the number of occurrences of each word it encounters for each of the scraped webpages. While this task can very well be accomplished in bash, we will demonstrate how to use custom code in Pachyderm by using a Go program.
In this case, you don't have to build a custom Docker image yourself with this compiled program. We have pushed a public image to Docker Hub, pachyderm/wordcount-map
, which is referenced in the map.json pipeline specification.
Let's create the map
pipeline:
# Again, we assume you're running this from the root of this example:
$ pachctl create-pipeline -f map.json
As soon as you create this pipeline, it will start processing data from the scraper
data repository. For each web page the map.go
code processes, it writes a file for each encountered word. These files have the word itself as a filename, and the content of each file is the number of occurrences of the respective word. If multiple workers write to the same file, the content is appended. As an example, a file wikipedia
might look like this (assuming we have already processed multiple web sites referencing wikipedia):
$ pachctl get-file map master wikipedia
36
11
17
By default, Pachyderm will spin up the same number of workers as the number of nodes in your cluster. This can of course be customized or changed (see here for more info on controlling the number of workers).
Reduce
The final pipeline, reduce
goes through every file and adds up the numbers in each file, thus obtaining a total count per word. For this pipeline we can use a simple bash script:
find /pfs/map -name '*' | while read count; do cat $count | awk '{ sum+=$1} END {print sum}' >/tmp/count; mv /tmp/count /pfs/out/`basename $count`; done
Which we bake into reduce.json. Again, creating the pipeline is as simple as:
# We assume you're running this from the root of this repo:
$ pachctl create-pipeline -f reduce.json
The output should look like:
$ pachctl list-repo
NAME CREATED SIZE
reduce 43 minutes ago 4.216 KiB
map 46 minutes ago 2.867 KiB
scraper 50 minutes ago 71.34 KiB
urls 53 minutes ago 39 B
$ pachctl get-file reduce master wikipedia
241
To get a complete list of the words counted:
$ pachctl list-file reduce master
NAME TYPE SIZE
a file 4 B
abdul file 2 B
about file 3 B
aboutsite file 2 B
absolute file 2 B
accesskey file 3 B
accidentally file 2 B
account file 2 B
across file 2 B
action file 2 B
activities file 2 B
additional file 2 B
etc...
Expand on the example
Now that we've got a full end-to-end scraper and wordcount use case set up, lets add more to it. First, let's add more data. Go ahead and add a few more sites to scrape.
# Instead of using the -c shorthand flag, let's do this the long way by starting a commit, adding files, and then finishing the commit.
$ pachctl start-commit urls master
# Reminder: files added should be named for the website and have the URL as the content. You'll have to create these files.
$ pachctl put-file urls master -f HackerNews
$ pachctl put-file urls master -f Reddit
$ pachctl put-file urls master -f GitHub
$ pachctl finish-commit urls master
Your scraper should automatically get started pulling these new sites (it won't rescrape Wikipedia). That will then automatically trigger the map
and reduce
pipelines to process the new data and update the word counts for all the sites combined.
If you add a bunch more data and your pipeline starts to run slowly, you can crank up the parallism. By default, pipelines spin up one worker for each node in your cluster, but you can set that manually with the parallelism spec field in the pipeline specification. Further, the pipelines are already configured to spread computation across the various workers with `"glob": "/*". Check out our spreading data across workers docs to learn more about that.
Documentation ¶
There is no documentation for this package.