Week 11: MapReduce
High-level overview
MapReduce is a framework for distributed computation that helps scale up parallel programs. Writing code for parallel or distributed processing manually can quickly become tedious due to the need to manage communication between machines, synchronization, etc. In response to that, the MapReduce framework defines 3 fundamental steps (that can be iterated) to write parallel programs:
- `map`: in this step, a function is applied to a list of inputs and outputs a list of `(key, value)` pairs.
- `shuffle` (also referred to as the `combine` or `partition` step): the `(key, value)` tuples generated in the `map` step are grouped based on their `key` field.
- `reduce`: finally, a function is applied to a series of `(key, [value_1, value_2, ..., value_N])` pairs generated by the `shuffle` step, and outputs another list of `(key, value)` pairs.
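To make the three steps concrete before introducing any framework, here is a minimal, single-machine sketch in plain Python; the toy input and helper code are ours, purely for illustration:

from itertools import groupby

# toy input: one record per element
records = ["a b a", "b c"]

# map: each record produces a list of (key, value) pairs
mapped = [(word, 1) for record in records for word in record.split()]

# shuffle: group the pairs by key into (key, [value_1, ..., value_N])
mapped.sort(key=lambda kv: kv[0])
shuffled = [(key, [value for _, value in group])
            for key, group in groupby(mapped, key=lambda kv: kv[0])]

# reduce: turn each (key, [values]) back into a single (key, value) pair
reduced = [(key, sum(values)) for key, values in shuffled]
print(reduced)  # [('a', 2), ('b', 2), ('c', 1)]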
Since MapReduce is a framework for distributed computing, the reader should keep in mind that the `map` and `reduce` steps can happen concurrently on different machines within a compute network. The `shuffle` step that groups data per key ensures that `(key, value)` pairs with the same `key` will be collected and processed on the same machine in the next step.
Distributed frameworks
Though the MapReduce model is abstract, it is most often used in the context of a larger distributed computation framework; the standard example is Apache Hadoop. Often, people use the Apache Spark compute engine on top of a Hadoop installation.
Iterating MapReduce steps
The output of the `reduce` step is itself a list of `(key, value)` pairs, allowing the programmer to "compose" multiple steps of MapReduce. In that case, the output of the `reduce` function is again grouped by `key` and passed on to the `map` function of the next step.
Because there are multiple backends implementing MapReduce, we will use the `mrjob` library to write MapReduce programs without having to worry about setting up the backend. To run the examples below, make sure to install the library first:
$ pip install --user mrjob
Example: Word Count
As a first example, suppose we have a text file consisting of multiple lines and we wish to find the count of each word appearing in that file. We will use the MapReduce framework to do that, as follows:
- The `map` step will split each input line into a list of words, and output `(word, 1)` for each `word` found;
- The `shuffle` step will group all `(word, 1)` pairs by `word`, and output a list of `(word, [1, 1, ..., 1])` pairs;
- Finally, the `reduce` step will sum over the number of `1`s for each `word` in the output of the `shuffle` step.
Note that the number of `1`s in a `(word, [1, 1, ..., 1])` pair indicates the number of appearances of `word`. The code follows below:
from mrjob.job import MRJob
import re

WORD_REGEX = re.compile(r"[\w]+")

class MRWordCount(MRJob):
    def mapper(self, _, line):
        for word in WORD_REGEX.findall(line):
            yield word.lower(), 1

    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == "__main__":
    MRWordCount().run()
Method declarations
Both the `mapper` and `reducer` functions have signatures that look like below:

def mapper(self, key, value): ...
def reducer(self, key, values): ...

In short, both `mapper` and `reducer` are invoked on a series of key-value pairs, and this is reflected in their declarations. The second argument of `mapper` (the key) is ignored here, which is why we name it `_`: when `mapper` processes lines directly from an input file, the framework passes `None` as the key and the raw line as the value. Similarly, `reducer` receives a key together with an iterable of all the values grouped under that key.
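To see this calling convention in isolation, here is a plain-Python sketch of stand-ins with the same shape as the methods above (this mimics what the framework does; it is not mrjob internals):

import re

WORD_REGEX = re.compile(r"[\w]+")

def mapper(key, value):    # key is None when reading raw text lines
    for word in WORD_REGEX.findall(value):
        yield word.lower(), 1

def reducer(key, values):  # values is an iterable (often a generator)
    yield key, sum(values)

print(list(mapper(None, "hello ORIE students")))
# [('hello', 1), ('orie', 1), ('students', 1)]
print(list(reducer("orie", iter([1, 1]))))
# [('orie', 2)]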
Running mrjob examples
If the above script is in a file called `mr_word_count.py` and your input file is called `data.txt`, it suffices to write

$ python mr_word_count.py data.txt

assuming that `data.txt` is in the same folder as your script. The `run()` method will take care of splitting the file line-by-line and "feeding" each line to the `mapper` function.
Let us take a step-by-step look at how this example works. Suppose that the input file contains the following lines:
hello ORIE students
welcome to ORIE
The `mapper` function will process the file line-by-line. Processing the first line will lead to the following intermediate outputs:
(hello, 1), (orie, 1), (students, 1)
while processing the second line will output:
(welcome, 1), (to, 1), (orie, 1)
Now comes the `shuffle` step: if we have two tuples of the form `(word, 1)`, we merge them into a single tuple of the form `(word, [1, 1])`, and so on. Therefore, we obtain
(hello, [1]), (orie, [1, 1]), (students, [1]), (welcome, [1]), (to, [1]).
Note that the number of `1`s appearing in the list for each word is exactly the number of appearances of that word.
Finally, the `reducer` sums over the list of `1`s for each distinct `word`, leading to the following output:
(hello, 1), (orie, 2), (students, 1), (welcome, 1), (to, 1)
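For reference, when the job is run from the command line, mrjob prints each output pair as a JSON-encoded key and value separated by a tab, so the terminal output would look roughly like this (the order of lines may differ):

$ python mr_word_count.py data.txt
"hello"     1
"orie"      2
"students"  1
"to"        1
"welcome"   1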
Tip
Note that we make each word lowercase by using `word.lower()` in the output of the `mapper` function. This is because grouping is case-sensitive, i.e., the strings `"orie"` and `"ORIE"` would be treated as different keys by MapReduce.
More details on MapReduce
There are several things to keep in mind when running MapReduce jobs:
- Even though all our examples use local data, data can be stored in a distributed fashion. This means that:
  - multiple machines may be processing a local file by a given name;
  - even if the input is a single file, each line could be handled by a different process (depending on the implementation)!
- Grouping outputs by key always happens, to ensure consistency:
  - data that look like `(key, value)` are grouped by `key` after each call;
  - there is no way to prevent this grouping from happening.
- The output of the `mapper` and `reducer` functions must be in the form `(key, value)`. In addition:
  - between each call, `key` and `value` are serialized (using the JSON format);
  - some data types, such as `set`, cannot be used without "hacks" (see the sketch after this list).
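For instance, if a step naturally computes a `set`, a common workaround is to convert it to a (sorted) list before yielding, since lists survive JSON serialization. The job below is a hypothetical sketch of this idea, not taken from the examples above:

from mrjob.job import MRJob
import re

WORD_REGEX = re.compile(r"[\w]+")

class MRUniqueWords(MRJob):
    def mapper(self, _, line):
        unique = {w.lower() for w in WORD_REGEX.findall(line)}
        # a Python set is not JSON-serializable, so emit a sorted list instead
        yield "unique_words", sorted(unique)

    def reducer(self, key, word_lists):
        # merge the per-line lists back into a set, then emit a list again
        merged = set()
        for words in word_lists:
            merged.update(words)
        yield key, sorted(merged)

if __name__ == "__main__":
    MRUniqueWords().run()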
Chaining multiple steps
MapReduce jobs are composable, meaning that the map-shuffle-reduce pipeline can be applied to a series of inputs multiple times to perform complex tasks. The `mrjob` library allows us to chain multiple steps, as long as each step:
- implements at least one of `mapper`, `reducer`, or `combiner`;
- follows the `(key, value)` output format.
Here is an example, using multiple steps of MapReduce to find the word of maximum frequency in a file.
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_REGEX = re.compile(r"[\w]+")

class MRMaxFreq(MRJob):
    def steps(self):
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer),
            MRStep(mapper=self.mapper_post,
                   reducer=self.reducer_post)
        ]

    def mapper(self, _, line):
        for word in WORD_REGEX.findall(line):
            yield word.lower(), 1

    def reducer(self, word, counts):
        yield word, sum(counts)

    # keys: None, values: (word, word_count)
    def mapper_post(self, word, word_count):
        yield None, (word, word_count)

    # pick the (word, word_count) pair with the largest word_count
    def reducer_post(self, _, word_count_pairs):
        yield max(word_count_pairs, key=lambda p: p[1])

if __name__ == "__main__":
    MRMaxFreq().run()
There are several moving parts here, so let us parse them one-by-one.
- First, note that we are overriding the `steps()` method to inform `mrjob` that our program consists of multiple steps:
  - the first step uses the same mapper and reducer functions that we used for the simple word count program;
  - the second step uses `mapper_post` as its mapper function and `reducer_post` as its reducer function.
  The above means that the output of `reducer` becomes the input to `mapper_post`.
- The `mapper_post` function is very simple: it converts key-value pairs as follows:
  (word, word_count) -> (None, (word, word_count))
  This is for good reason: without a step like that, we would not be able to find the word with maximum frequency, since any function we write would process `(word, word_count)` pairs one at a time. By mapping everything to the key `None`, we ensure that `reducer_post` will process all `(word, word_count)` pairs at once, since the `shuffle` step of MapReduce will collect them all into a list corresponding to the "placeholder" key of `None`.
- Finally, `reducer_post` scans the list of `(word, word_count)` pairs and outputs the pair with the maximal word count. The `key` parameter in the `max()` function ensures that the pairs are compared by their second element, which contains the count of each word.
Again, let us see how our program would work given the example input file below:
$ cat data.txt
hello ORIE students
welcome to ORIE
Step 1: the first `MRStep` would give us identical output to that produced by our simple word count program:
(hello, 1), (orie, 2), (students, 1), (welcome, 1), (to, 1)
Step 2: The `mapper_post` function processes the above series of key-value pairs and outputs the following:
(None, (hello, 1)), (None, (orie, 2)), (None, (students, 1)), (None, (welcome, 1)), (None, (to, 1))
Step 3: MapReduce groups the above outputs by key. Since all tuples have `None` as their key, the result is a single key-value pair like the one below:
(None, [(hello, 1), (orie, 2), (students, 1), (welcome, 1), (to, 1)])
Step 4: The `reducer_post` function processes the above pair, selecting the `(word, word_count)` pair with the largest `word_count`, and finally outputs
(orie, 2)
which indicates that `orie` is the word with maximal frequency `2`.
Tip
In this example, since the `mapper_post` function does something trivial to the output of `reducer`, we can incorporate that change directly into the `reducer` function:

def reducer(self, word, counts):
    yield None, (word, sum(counts))

In that case, we can omit the `mapper_post` function entirely and rewrite the `steps()` method as follows:

def steps(self):
    return [
        MRStep(mapper=self.mapper, reducer=self.reducer),
        MRStep(reducer=self.reducer_post)
    ]
Other available methods
The `mrjob` library supports a few other methods that complement the `mapper`, `combiner`, and `reducer` functions, such as:
- `{mapper,combiner,reducer}_init()`: these can be used to, e.g., initialize helper data structures;
- `{mapper,combiner,reducer}_final()`: used for post-processing after the "main" function is run.
We now show an example of using `mapper_init()` and `mapper_final()` to slightly optimize the word count example. Recall that the simple word count program had a somewhat inefficient `mapper` function; it would output `(word, 1)` immediately for each `word` encountered, which means that we could have something like the following:
hi hello hi hello hi -> (hi, 1), (hello, 1), (hi, 1), (hello, 1), (hi, 1)
If possible, we would like to keep track of the partial sums of each word encountered in our compute node to output fewer key-value pairs, such as:
hi hello hi hello hi -> (hi, 3), (hello, 2)
Below, we use `mapper_init()` to initialize a dictionary holding word counts for each word encountered so far. The `mapper` function simply updates the key-value pairs in that dictionary, and in `mapper_final()`, which runs after all inputs have been processed, we output all the key-value pairs from that dictionary.
At first glance, it seems like this is all we need to do to count the number of words. However:
- MapReduce runs in multiple instances / processes! Therefore, a separate dictionary will be created for each process;
- Because of this, we still need a `reducer` step to combine the partial counts from each process.
The complete code follows below:
from collections import defaultdict
from mrjob.job import MRJob
import re

word_regex = re.compile(r"[\w]+")

class MRWordCount(MRJob):
    def mapper_init(self):
        self.words_dict = defaultdict(int)

    def mapper(self, _, line):
        for word in word_regex.findall(line):
            self.words_dict[word.lower()] += 1

    def mapper_final(self):
        for word, val in self.words_dict.items():
            yield word, val

    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == "__main__":
    MRWordCount().run()
Our `mapper` function in the above example only populates the dictionary and does not output any key-value pairs; the latter is taken care of in the `mapper_final` function, which emits all key-value pairs from the dictionary. Skipping `mapper_final` is not possible, as otherwise `reducer` would not receive any inputs.
The `defaultdict` class
We use `defaultdict`, which is a convenience wrapper around dictionaries, providing a default value for keys that do not exist yet. For example:
defaultdict(int) # keys have default value 0
defaultdict(list) # keys have default value []
This allows us to, e.g., increment keys as in `self.words_dict[word.lower()] += 1` without having to check whether `word.lower()` previously existed as a key in the dictionary.
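As a small, self-contained illustration (the variable names below are ours):

from collections import defaultdict

counts = defaultdict(int)
for word in ["hi", "hello", "hi"]:
    counts[word] += 1        # missing keys start at 0, so no KeyError
print(dict(counts))          # {'hi': 2, 'hello': 1}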
Example: Finding mutual friends
Suppose that you have a textual representation of a graph in the following form:
A -> B C E
B -> A C F
C -> A B
...
In each line, the first element is a node of the graph, with the elements after the arrow being the nodes it is connected to. We assume that this graph is undirected, meaning that if `A` is connected to `B` then `B` is also connected to `A`.
Now, suppose that this graph was representing a social network, where nodes are users and being connected means that you are "friends" or "followers". A common piece of functionality (implemented, e.g., in Facebook) is to report a list of mutual friends between a pair of already existing friends.
For example, in the following graph, `(A, B)` have `C` as a mutual friend:
A -> B C E
B -> A C D
C -> A B
D -> B E
E -> A D
Indeed, `A` is connected to `B`, `C`, and `E`, and `B` is connected to `A`, `C`, and `D`.
We can use MapReduce to solve the problem of finding mutual friends between each pair of friends in the original graph:
- Initially, for each pair of friends `(U, V)` described in each line of the input file, we output a key-value pair with `(U, V)` as its key and the list of friends of `U` as its value. For example:

  # mapper(A -> B C E)
  ((A, B), [B, C, E]), ((A, C), [B, C, E]), ((A, E), [B, C, E])

  The intuition behind this step is as follows:
  - since the graph is undirected, every pair `(U, V)` of friends will appear twice as a key;
  - the first time, we will have `((U, V), [list of friends of U])`;
  - the second time, we will have `((V, U), [list of friends of V])`.

  Note that if we could intersect the list of friends of `U` with the list of friends of `V`, this would give us precisely the list of their mutual friends. However, if we maintain the output format as-is, the `shuffle` step of MapReduce will treat `(U, V)` and `(V, U)` as different keys!

- Given the aforementioned problem, an important part of this step is to make sure that we output `(U, V)` both times, so that the `shuffle` step of MapReduce groups these two lists together as below:

  ((U, V), [[list of friends of U], [list of friends of V]])

  This can be accomplished by making sure that the keys in the output of the `mapper` function are sorted lexicographically. In our example input, we would obtain:

  # mapper(A -> B C E)
  ((A, B), [B, C, E]), ((A, C), [B, C, E]), ((A, E), [B, C, E])
  # mapper(B -> A C D)
  ((A, B), [A, C, D]), ((B, C), [A, C, D]), ((B, D), [A, C, D])
  ...

  After grouping the above by key in the `shuffle` step, one of the intermediate outputs would be

  ((A, B), [[B, C, E], [A, C, D]])

  where we can simply intersect the two lists to obtain the list of mutual friends of `A` and `B`.

- The final step, which is the `reducer` step, simply performs the list intersection.
The following program contains the complete implementation in Python:
from mrjob.job import MRJob

class MRFriends(MRJob):
    def mapper(self, _, line):
        # parse line: A -> B C D E
        f_from, to_str = line.split("->")
        f_from = f_from.strip()              # remove surrounding whitespace
        friends_to = to_str.strip().split()  # split on whitespace
        # output: ((U, V), [F1, F2, ...]) with the key sorted lexicographically
        for f_to in friends_to:
            pair = tuple(sorted((f_from, f_to)))
            yield pair, friends_to

    def reducer(self, pair, friends_to):
        # friends_to is an iterable holding exactly two friend lists
        friends_to = list(friends_to)
        common = set.intersection(
            set(friends_to[0]), set(friends_to[1]))
        yield pair, list(common)

if __name__ == "__main__":
    MRFriends().run()
Note
Note that the above program only works for finding mutual friends of `A, B` when `A` and `B` are already friends in the existing graph. To find mutual friends of `(A, B)` when `A` and `B` are not necessarily connected, one way would be to output `((X, Y), [list of friends of X])` for each pair `(X, Y)`, regardless of whether `X, Y` are connected. For this approach to work, one may need to know the nodes in the graph in advance, or build it up via multiple steps of MapReduce.
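As a minimal sketch of that idea, assuming the full set of nodes is known in advance (the `ALL_NODES` constant and the class name below are hypothetical, not part of the program above):

from mrjob.job import MRJob

# hypothetical: assume the node set of the graph is known up front
ALL_NODES = ["A", "B", "C", "D", "E"]

class MRFriendsAllPairs(MRJob):
    def mapper(self, _, line):
        f_from, to_str = line.split("->")
        f_from = f_from.strip()
        friends = to_str.strip().split()
        # emit f_from's friend list for every other node,
        # not only for the nodes it is connected to
        for other in ALL_NODES:
            if other != f_from:
                yield tuple(sorted((f_from, other))), friends

    def reducer(self, pair, friend_lists):
        # each pair receives exactly two lists (one per endpoint)
        friend_lists = list(friend_lists)
        common = set(friend_lists[0]) & set(friend_lists[1])
        yield pair, sorted(common)

if __name__ == "__main__":
    MRFriendsAllPairs().run()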