MapReduce Questions and Answers Part 2
The chapter contains a lot of details about integer encoding and compression. Since these topics are not directly about MapReduce, I made no questions about them.
4.4 Inverted Indexing: Revised Implementation
Explain the inverted index retrieval algorithm. You may assume that each document fits into memory. Assume also that there is a huge number of documents. Which part should be optimized by integer encoding and compression?
Input: text documents
key: document id
value: text document
Output: key/value pairs where
key: word
value: list[documentId, numberOfOccurrences]
list elements must be sorted by numberOfOccurrences
The mapper counts the number of occurrences of each word in the document. As the whole document fits into memory, we can hold the partial results in a map.
Intermediate key/values:
key: word, numberOfOccurrences
value: documentId
A custom partitioner groups intermediate key/value pairs by word. A custom sort orders them primarily by word and secondarily by the number of occurrences.
The reducer uses the initialize method to initialize the list of all postings. The reduce method handles two cases:
- current word equal to the previous word – add documentId and numberOfOccurrences to the posting list.
- current word different from the previous word – emit the previous word with its posting list and initialize a new posting list.
The posting list in the reducer is the part that should be compressed.
class MAPPER
    method MAP(docid, doc d)
        H = new hash map
        for all term w in doc d do
            H(w) = H(w) + 1
        for all term w in H do
            emit(pair(w, H(w)), docid)

class REDUCER
    variable previous_word = empty
    variable PL = new list of postings

    method REDUCE(pair(w, #occurrences), docid)
        if w <> previous_word AND previous_word <> empty do
            emit(previous_word, PL)
            PL = new list of postings
        PL.add(pair(#occurrences, docid))
        previous_word = w

    method CLOSE
        emit(previous_word, PL)

class PARTITIONING_COMPARATOR
    method compare(key (w1, o1), key (w2, o2))
        if w1 = w2
            return keys are equal
        return keys are different

class SORTING_COMPARATOR
    method compare(key (w1, o1), key (w2, o2))
        if w1 = w2 do
            return compare(o1, o2)
        return compare(w1, w2)
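To make the data flow concrete, here is a minimal Python sketch that simulates the same value-to-key conversion outside Hadoop. The sort stands in for the sorting comparator and the grouping stands in for the partitioner; all names and the sample documents are illustrative.

from collections import Counter
from itertools import groupby

def map_phase(docs):
    # Emit ((word, count), docid) for every distinct word in every document.
    pairs = []
    for docid, text in docs.items():
        for word, count in Counter(text.split()).items():
            pairs.append(((word, count), docid))
    return pairs

def reduce_phase(pairs):
    # Sort by (word, count) - the job of the sorting comparator - and
    # group by word alone - the job of the partitioner.
    pairs.sort(key=lambda p: (p[0][0], p[0][1]))
    index = {}
    for word, group in groupby(pairs, key=lambda p: p[0][0]):
        index[word] = [(docid, count) for (_, count), docid in group]
    return index

docs = {1: "a b a", 2: "b b b a"}
print(reduce_phase(map_phase(docs)))
# {'a': [(2, 1), (1, 2)], 'b': [(1, 1), (2, 3)]}

Because the framework delivers the keys already sorted by occurrence count, the reducer never has to sort a posting list in memory.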
5 Graph Algorithms
The chapter contains two algorithms: the shortest path in a graph and the page rank algorithm. The questions are straightforward.
5.2 Parallel Breadth-First Search
Find the shortest path from one node, called the origin, to all other nodes. Each edge has an associated weight. Input key/value pairs have already been preprocessed into a convenient form.
Input: graph
key: node id
value: distance to origin, list[adjacent node, edge length]
Output: key/value pairs where
key: node id
value: distance to origin, list[adjacent node, edge length]
The algorithm requires multiple iterations. It stops when an iteration does not change any 'distance to origin'. At worst, there will be O(n) iterations, where n is the number of nodes in the graph.
The mapper passes the original graph to the next iteration as it is. In addition, it generates a key/value pair for each adjacent node. The value contains the minimum known distance from the origin if the route were to go through the node.
class MAPPER
    method MAP(node, pair(dist, adjacencylist))
        emit(node, pair(dist, adjacencylist))
        for all (closenode, nodedist) in adjacencylist do
            emit(closenode, pair(dist + nodedist, empty))
The reducer finds the minimum known distance to each node. It passes the distance along with the original graph to the next iteration. It also increments a global counter whenever the minimum known distance to any node changes.
class REDUCER
    method REDUCE(node, list(dist, adjacencylist))
        minimum = infinity
        previous_iteration_solution = infinity
        original_graph = empty
        for all (dist, adjacencylist) in list do
            if adjacencylist not empty do
                original_graph = adjacencylist
                previous_iteration_solution = dist
            if minimum > dist
                minimum = dist
        if previous_iteration_solution <> minimum
            increment global counter
        emit(node, pair(minimum, original_graph))
If the global counter is 0, the algorithm stops. Otherwise another iteration is needed.
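The loop below is a minimal Python simulation of the whole process, assuming the graph fits in one process. The dictionary of emitted values plays the role of the shuffle and sort phase, and the returned change count plays the role of the global counter; the example graph is made up.

import math

def bfs_iteration(graph):
    # graph: {node: (distance_from_origin, [(neighbor, edge_length), ...])}
    # Map: pass the structure through and emit tentative distances.
    emitted = {}
    for node, (dist, adjacency) in graph.items():
        emitted.setdefault(node, []).append((dist, adjacency))
        for neighbor, edge_len in adjacency:
            emitted.setdefault(neighbor, []).append((dist + edge_len, None))

    # Reduce: keep the minimum distance and count how many distances changed.
    changed = 0
    result = {}
    for node, values in emitted.items():
        adjacency = next((a for _, a in values if a is not None), [])
        old_dist = graph.get(node, (math.inf, []))[0]
        new_dist = min(d for d, _ in values)
        if new_dist != old_dist:
            changed += 1
        result[node] = (new_dist, adjacency)
    return result, changed

graph = {'s': (0, [('a', 3), ('b', 1)]),
         'a': (math.inf, [('b', 1)]),
         'b': (math.inf, [('a', 1)])}
while True:
    graph, changed = bfs_iteration(graph)
    if changed == 0:  # the driver's stopping condition
        break
print({n: d for n, (d, _) in graph.items()})  # {'s': 0, 'a': 2, 'b': 1}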
Explain the page rank algorithm; assume alpha = 0.
The page rank P(n) of a page n is calculated from the page ranks of all pages linking to it:

P(n) = sum_m (P(m) / C(m))

The sum goes over all pages m linking to the page n; C(m) is the number of outgoing links of the page m.
The page rank algorithm runs in iterations. The mapper passes the page rank contribution of each page to the adjacent pages. The reducer updates the page rank of each node. The algorithm stops when the page ranks no longer move.
class MAPPER
    method MAP(page, (page_rank, adjacency_list))
        emit(page, (0, adjacency_list))
        contribution = page_rank / adjacency_list.length
        for all node in adjacency_list do
            emit(node, (contribution, empty))

class REDUCER
    method REDUCE(page, contributions[c1, c2, ..., cn])
        rank = 0
        adjacency_list = new list
        for all c in contributions do
            adjacency_list.addAll(c.adjacency_list)
            rank = rank + c.contribution
        emit(page, (rank, adjacency_list))
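As a sanity check, here is a small self-contained Python sketch of the same iteration with alpha = 0, assuming every page has at least one outgoing link; the example graph is invented.

def pagerank_iteration(ranks, links):
    # ranks: {page: current rank}, links: {page: [outgoing links]}.
    contributions = {page: 0.0 for page in ranks}
    for page, rank in ranks.items():
        # Each page splits its rank evenly among the pages it links to.
        for target in links[page]:
            contributions[target] += rank / len(links[page])
    return contributions

links = {'a': ['b', 'c'], 'b': ['c'], 'c': ['a']}
ranks = {page: 1.0 / len(links) for page in links}
for _ in range(50):
    ranks = pagerank_iteration(ranks, links)
print(ranks)  # converges towards roughly {'a': 0.4, 'b': 0.2, 'c': 0.4}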
6 EM Algorithms For Text Processing
I made no questions from this chapter.
Exercises
This chapter contains hands-on exercises for MapReduce. Some of them require multiple iterations.
Warm Up
Count the number of occurrences of every word in a text collection.
Input:
key: document id,
value: text document.
Output:
key: word,
value: number of occurrences.
Intermediate pairs:
key: word
value: integer - how many times the word was seen in the input.
class MAPPER
    method MAP(docid a, doc d)
        for all term w in doc d do
            emit(w, 1)

class COMBINER
    method COMBINE(word w, counts[c1, c2, ..., cn])
        s = 0
        for all c in counts[c1, c2, ..., cn] do
            s = s + c
        emit(word w, s)

class REDUCER
    method REDUCE(word w, counts[c1, c2, ..., cn])
        s = 0
        for all c in counts[c1, c2, ..., cn] do
            s = s + c
        emit(word w, s)
An alternative solution would use in-mapper combining.
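A minimal Python sketch of that alternative, with the framework left out: the mapper accumulates counts in a hash map and emits them only when it is closed. The class and method names simply mirror the pseudocode above.

from collections import Counter

class WordCountMapper:
    # In-mapper combining: counts are accumulated locally and emitted once.
    def __init__(self):
        self.counts = Counter()

    def map(self, docid, doc):
        self.counts.update(doc.split())

    def close(self, emit):
        for word, count in self.counts.items():
            emit(word, count)

mapper = WordCountMapper()
mapper.map(1, "to be or not to be")
mapper.map(2, "to do")
mapper.close(lambda w, c: print(w, c))  # to 3, be 2, or 1, not 1, do 1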
Web Store
A website user log contains user ids and the length of each session. The website has a modest number of registered users. Compute the average session length for each user.
Input:
key: user id,
value: session length.
Output:
key: user id,
value: average session length.
As the number of registered users is modest, we can use in-mapper combining.
class MAPPER
    variable total_time = new hash map
    variable sessions_number = new hash map

    method MAP(user_id, session_length)
        total_time(user_id) = total_time(user_id) + session_length
        sessions_number(user_id) = sessions_number(user_id) + 1

    method CLOSE
        for all user_id in total_time
            tt = total_time(user_id)
            sn = sessions_number(user_id)
            emit(user_id, pair(tt, sn))

class REDUCER
    method REDUCE(user_id, [pairs(time, sessions_number)])
        total_time = 0
        total_sessions = 0
        for all pairs in [pairs(time, sessions_number)] do
            total_time = total_time + time
            total_sessions = total_sessions + sessions_number
        emit(user_id, total_time / total_sessions)
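The same logic as runnable Python, simulating a single mapper and reducer; the names and sample log are illustrative.

from collections import defaultdict

class SessionMapper:
    # In-mapper combining: one (total_time, session_count) pair per user.
    def __init__(self):
        self.total_time = defaultdict(int)
        self.sessions = defaultdict(int)

    def map(self, user_id, session_length):
        self.total_time[user_id] += session_length
        self.sessions[user_id] += 1

    def close(self):
        return [(u, (self.total_time[u], self.sessions[u]))
                for u in self.total_time]

def reduce_average(user_id, pairs):
    total = sum(t for t, _ in pairs)
    sessions = sum(n for _, n in pairs)
    return user_id, total / sessions

mapper = SessionMapper()
for user, length in [("u1", 10), ("u2", 4), ("u1", 20)]:
    mapper.map(user, length)
print([reduce_average(u, [p]) for u, p in mapper.close()])
# [('u1', 15.0), ('u2', 4.0)]

Note that the mapper emits (total, count) pairs rather than averages: averages cannot be combined across mappers, while sums and counts can.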
A web store log contains a user id and the bought item for each sale. You need to implement "buyers of this item also bought" functionality. Whenever an item is shown, the store will suggest the five items most often bought by the item's buyers.
Input:
key: user id,
value: bought item.
Output:
key: item,
value: list of the five most common "buyers of this item also bought" items.
Our solution has two iterations. The first iteration generates lists of all items bought by the same user. Grouping is done by the framework; both mapper and reducer perform an identity function.
Input:
key: user id,
value: bought item.
Output:
key: user id,
value: list of all bought items.
class MAPPER
    method MAP(user_id, item)
        emit(user_id, item)

class REDUCER
    method REDUCE(user_id, items[i1, i2, ..., in])
        emit(user_id, items)
The second iteration solves the co-occurrence problem on the list items. It uses the stripes approach. The only difference from the standard solution is that we emit only the five most common co-occurrences.
Input:
key: user id,
value: list of all bought items.
Output:
key: item,
value: list of five most common co-occurrences.
class MAPPER
    method MAP(user_id, items[i1, i2, ..., in])
        for all item in items do
            H = new hash map
            for all item j in items do
                if j <> item do
                    H(j) = H(j) + 1
            emit(item, H)

class REDUCER
    method REDUCE(item, stripes[H1, H2, ..., Hn])
        T = new hash map
        for all H in stripes do
            for all (key/value) in H do
                T(key) = T(key) + value
        emit(item, max_five(T))
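A runnable Python sketch of the stripes pass, where Counter.most_common stands in for the hypothetical max_five helper; the baskets are invented sample data.

from collections import Counter

def map_stripes(baskets):
    # For every item in a basket, emit a stripe counting the other items.
    stripes = []
    for items in baskets:
        for item in items:
            stripe = Counter(j for j in items if j != item)
            stripes.append((item, stripe))
    return stripes

def reduce_stripes(item, stripes, top=5):
    # Element-wise sum of all stripes, then keep the five most common.
    total = Counter()
    for stripe in stripes:
        total.update(stripe)
    return item, [other for other, _ in total.most_common(top)]

baskets = [["milk", "bread", "eggs"], ["milk", "bread"], ["bread", "jam"]]
grouped = {}
for item, stripe in map_stripes(baskets):
    grouped.setdefault(item, []).append(stripe)
for item, stripes in grouped.items():
    print(reduce_stripes(item, stripes))
# e.g. ('milk', ['bread', 'eggs']) - one line per item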
The web store log contains a user id, timestamp, item, and number of bought pieces for each sale. The store is looking for items whose sales rise or decline at the same time. Find the 20 item couples with the maximum number of such months.
Input:
key: user id,
value: timestamp, bought item, count.
Output:
key: item, item
value: number of months when both items' sales rose or declined.
#: the output contains the 20 key/value pairs with the maximum value
Our solution requires multiple MapReduce iterations. We have to:
- calculate whether an item's sales went up or down in any given month,
- create lists of items with the same sales change during the same month,
- find the number of co-occurrences in those lists,
- choose the items with the maximum number of co-occurrences.
The first iteration calculates the sales changes for any given month. We have to supply a mapper, a partitioner, a custom sort, and a reducer. The mapper generates one intermediate key/value pair for each input pair. The key is composed of the sold item and the sales month. The value contains the number of sold pieces.
The partitioner sends all key/value pairs with the same item to the same reducer. The custom sort orders them by month. Finally, the reducer calculates the sales changes.
Input:
key: user id,
value: timestamp, item, count.
Intermediate key/values:
key: item, month
value: count.
Output:
key: month, up/down/equal
value: item.
class MAPPER
    method MAP(user_id, (timestamp, item, count))
        month = get_month(timestamp)
        emit((item, month), count)

class PARTITIONING_COMPARATOR
    method compare(key (item1, month1), key (item2, month2))
        if item1 = item2
            return keys are equal
        return keys are different

class SORTING_COMPARATOR
    method compare(key (item1, month1), key (item2, month2))
        if item1 = item2 do
            return compare(month1, month2)
        return compare(item1, item2)

class REDUCER
    variable last_item
    variable last_count
    variable last_month

    method REDUCE((item, month), counts[c1, c2, ..., cn])
        count = sum([c1, c2, ..., cn])
        if last_item = item
            if last_month + 1 = month
                // emit the correct up/down/equal flag
                if last_count < count
                    emit((month, up), item)
                if last_count > count
                    emit((month, down), item)
                if last_count = count
                    emit((month, equal), item)
            else
                // no sales during some months
                emit((last_month + 1, down), item)
                emit((month, up), item)
        else
            // new item
            emit((last_month + 1, down), last_item)
            emit((month, up), item)
        last_item = item
        last_count = count
        last_month = month
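Since the reducer's flag logic is easy to get wrong, here is a compact Python rendering of it under the same convention, where a month with no sales counts as a decline followed by a rise; the function name and data are illustrative only.

def sales_changes(item_month_counts):
    # item_month_counts: {item: {month: pieces sold}}.
    records = []
    for item, by_month in item_month_counts.items():
        months = sorted(by_month)
        for prev, cur in zip(months, months[1:]):
            if cur != prev + 1:            # gap: sales dropped to zero
                records.append(((prev + 1, "down"), item))
                records.append(((cur, "up"), item))
            elif by_month[cur] > by_month[prev]:
                records.append(((cur, "up"), item))
            elif by_month[cur] < by_month[prev]:
                records.append(((cur, "down"), item))
            else:
                records.append(((cur, "equal"), item))
    return records

print(sales_changes({"tea": {1: 5, 2: 7, 4: 7}}))
# [((2, 'up'), 'tea'), ((3, 'down'), 'tea'), ((4, 'up'), 'tea')]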
The second iteration groups the first iteration's results by key. It generates lists of items with the same sales change during the same month. The framework does all the work; both mapper and reducer perform an identity function.
Input:
key: month, up/down/equal
value: item.
Output:
key: month, up/down/equal
value: [items].
The third iteration performs the standard 'co-occurrences by pairs' algorithm.
Input:
key: month, up/down/equal
value: [items].
Intermediate key/values:
key: item, item
value: partial number of co-occurrences.
Output:
key: item, item
value: number of months when both items' sales rose or declined.
#: the output contains all item couples
class MAPPER
    method MAP((month, change), items[i1, i2, ..., in])
        for each i in items do
            for each j in items do
                if i != j
                    emit((i, j), 1)

class COMBINER
    method COMBINE((item1, item2), co-occurrences[c1, c2, ..., cn])
        s = 0
        for all c in co-occurrences[c1, c2, ..., cn] do
            s = s + c
        emit((item1, item2), s)

class REDUCER
    method REDUCE((item1, item2), co-occurrences[c1, c2, ..., cn])
        s = 0
        for all c in co-occurrences[c1, c2, ..., cn] do
            s = s + c
        emit((item1, item2), s)
Finally, we have to choose the 20 key/value pairs with the maximum value. Each mapper selects its own 20 key/value pairs with the maximum value and emits them under the same constant key. There is only one reducer, which selects the final 20 key/value pairs.
Input:
key: item, item
value: number of months when both items' sales rose or declined.
#: the input contains all item couples
Intermediate key/values:
key: 1
value: item, item, number of months when both items' sales rose or declined.
#: contains the 20 key/value pairs with the maximum value for each mapper
Output:
key: item, item
value: number of months when both items' sales rose or declined.
#: the output contains the 20 key/value pairs with the maximum value
The code is simple but long, so it is omitted here.
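In its place, here is a short Python sketch of the idea, where heapq.nlargest stands in for both the per-mapper selection and the single reducer's final pass; the candidate pairs are invented.

import heapq

def map_top_k(pairs, k=20):
    # Each mapper keeps only its k highest-valued pairs and emits them
    # under a single constant key, so one reducer sees all candidates.
    top = heapq.nlargest(k, pairs, key=lambda kv: kv[1])
    return [(1, kv) for kv in top]

def reduce_top_k(candidates, k=20):
    # The single reducer picks the global top k from all mappers' candidates.
    return heapq.nlargest(k, candidates, key=lambda kv: kv[1])

mapper_output = map_top_k([(("a", "b"), 7), (("a", "c"), 3), (("b", "c"), 9)], k=2)
print(reduce_top_k([kv for _, kv in mapper_output], k=2))
# [(('b', 'c'), 9), (('a', 'b'), 7)]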
Criminal Agency
The inputs to all exercises in this section use the same data structures.
A criminal agency stole Facebook's friendship database and wants to analyze the new data. Friendships are stored in the form of key/value pairs; each friendship corresponds to two key/value pairs:
Friends:
key: first friend name
value: second friend name
key: second friend name
value: first friend name
The agency also owns the criminal records of all citizens:
Criminal record:
key: citizen name
value: when, where, accomplices, description
Find at-risk youths. A person is considered an at-risk youth if more than half of his friends have a criminal record.
Our solution has two iterations. The first iteration joins the two data sets and flags each 'value friend' with a has_record/law_abiding flag.
Output:
key: first friend
value: second friend, has_record/law_abiding
The mapper flags each key with the data set name. The partitioner groups data according to the names in the keys, and the sorter puts criminal records before friendships. We could use local aggregation to remove multiple criminal records for the same person.
class MAPPER
    method MAP(name, value)
        if value is a name do
            emit(pair(name, friendship), value)
        else
            emit(pair(name, criminal), value)

class PARTITIONING_COMPARATOR
    method compare(key (name1, dataset1), key (name2, dataset2))
        if name1 = name2
            return keys are equal
        return keys are different

class SORTING_COMPARATOR
    method compare(key (name1, dataset1), key (name2, dataset2))
        if name1 = name2 AND dataset1 is criminal
            return key1 is lower
        if name1 = name2 AND dataset2 is criminal
            return key2 is lower
        return compare(name1, name2)

class REDUCER
    variable previous_name

    method REDUCE(pair(name, flag), items[i1, i2, ..., in])
        if flag is criminal do
            previous_name = name
            return
        if previous_name <> name do
            has_record = law_abiding
        else
            has_record = criminal
        for all i in items do
            emit(i, pair(name, has_record))
The second iteration counts both the total number of friends and the number of friends with a criminal record. The reducer emits key/value pairs only for at-risk youths. This iteration could also use some kind of local aggregation.
Intermediate key/value:
key: name
value: total friends, total friend criminals
# totals are partial – computed only over subsets of the data set
Output:
key: name
value: empty
# only at risk youths
class MAPPER
    method MAP(name, pair(friend, has_record))
        if has_record is law_abiding do
            emit(name, pair(0, 1))
        else
            emit(name, pair(1, 1))

class REDUCER
    method REDUCE(name, items[pair(criminals, total)])
        total = 0
        criminals = 0
        for all i in items do
            total = total + i.total
            criminals = criminals + i.criminals
        if criminals / total > 0.5 do
            emit(name, empty)
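A tiny Python equivalent of this counting step, assuming the first iteration already produced (name, (friend, has_record)) pairs; the sample data is invented.

from collections import defaultdict

def at_risk_youths(flagged_friendships):
    # name -> [friends with a record, all friends]
    totals = defaultdict(lambda: [0, 0])
    for name, (_friend, has_record) in flagged_friendships:
        totals[name][0] += 1 if has_record else 0
        totals[name][1] += 1
    return [name for name, (criminals, total) in totals.items()
            if criminals / total > 0.5]

pairs = [("ann", ("bob", True)), ("ann", ("cid", False)),
         ("ann", ("dan", True)), ("bob", ("ann", False))]
print(at_risk_youths(pairs))  # ['ann']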
Find gangs. A gang is a group of people that:
- has exactly 5 members,
- each member is a friend of all other members,
- each two members committed at least 3 crimes together.
This time we need three iterations. The idea is to first clean the graph of all useless edges, so that only criminal contacts remain. Then we split the graph into smaller, manageable sub-graphs. We attach all criminal contacts, and the edges between them, to each person:
Last iteration reducer's input:
key: person
values: all his criminal contacts and relationships between them.
The final reducer takes the smaller graph represented by the value in each key/value pair and finds complete sub-graphs with 4 vertices in it. Add the person from the key to it, and you have found a complete sub-graph with 5 vertices. The reducer may use any polynomial algorithm.
The first iteration uses the pairs approach to clean the graph. We omit both local aggregation and removal of duplicities; both would make the algorithm more efficient.
Intermediate key/values:
key: first friend, second friend, friendship/accomplice
value: 1
Output:
key: first friend, second friend
value: empty
# only friends with common criminal record
class MAPPER
    method MAP(name, value)
        if value is a name do
            emit(triple(name, value, friendship), empty)
        else
            for all crime_accomplice in value.accomplices do
                emit(triple(name, crime_accomplice, accomplice), 1)

class PARTITIONING_COMPARATOR
    method compare(key (name1, accomplice1, flag1), key (name2, accomplice2, flag2))
        if name1 = name2 AND accomplice1 = accomplice2
            return keys are equal
        return keys are different

class SORTING_COMPARATOR
    method compare(key (name1, accomplice1, flag1), key (name2, accomplice2, flag2))
        if name1 = name2 AND accomplice1 = accomplice2 AND flag1 is friendship
            return key1 is lower
        if name1 = name2 AND accomplice1 = accomplice2 AND flag2 is friendship
            return key2 is lower
        return compare(pair(name1, accomplice1), pair(name2, accomplice2))

class REDUCER
    variable previous_name
    variable previous_accomplice

    method sameAsPrevious(name, accomplice)
        if previous_name <> name
            return false
        if previous_accomplice <> accomplice
            return false
        return true

    method REDUCE(triple(name, accomplice, flag), items[i1, i2, ..., in])
        if sameAsPrevious(name, accomplice) do
            if items.length > 2 do
                emit(name, accomplice)
            return
        if flag is friendship do
            previous_name = name
            previous_accomplice = accomplice
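The same cleanup expressed as plain Python over in-memory collections, using the 'at least 3 crimes together' threshold from the gang definition; the record layout and names are assumptions.

from collections import Counter

def clean_graph(friendships, crimes, min_crimes=3):
    # friendships: set of (name, friend) pairs, both directions present.
    # crimes: iterable of (name, accomplices) records.
    together = Counter()
    for name, accomplices in crimes:
        for accomplice in accomplices:
            together[(name, accomplice)] += 1
    # Keep only friendship edges backed by at least min_crimes common crimes.
    return {edge for edge in friendships if together[edge] >= min_crimes}

friendships = {("ann", "bob"), ("bob", "ann"), ("ann", "cid"), ("cid", "ann")}
crimes = [("ann", ["bob"]), ("bob", ["ann"])] * 3
print(clean_graph(friendships, crimes))  # {('ann', 'bob'), ('bob', 'ann')}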
The second iteration attaches lists of all 'second degree' friends to the edges:
Input:
key: first friend, second friend
value: empty
Intermediate key/values:
key: first friend
value: first friend, second friend
key: second friend
value: first friend, second friend
Output:
key: first friend, second friend
value: all friends of second friend
key: second friend, first friend
value: all friends of first friend
class MAPPER
    method MAP((first friend, second friend), empty)
        emit(first friend, (first friend, second friend))
        emit(second friend, (first friend, second friend))

class REDUCER
    method REDUCE(name, edges[e1, e2, ..., en])
        friends = new Set
        friends.add(name)
        for all edge in edges do
            friends.add(edge.v1, edge.v2)
        for all edge in edges do
            emit(edge, friends)
Finally, the mapper and the shuffle and sort phase together generate the list of all friends of any given person and the relationships between them.
Input:
key: friend 1, friend 2
value: all friends of friend 2
Intermediate key/values:
key: friend 1
value: friend 2, all friends of friend 2
Reducers input (after shuffle and sort):
key: person
values: all his friends and relationships between them.
Output:
key: first friend, second friend, third friend, fourth friend, fifth friend
value: gang
class MAPPER
    method MAP((friend 1, friend 2), all friends of friend 2)
        emit(friend 1, (friend 2, all friends of friend 2))

class REDUCER
    method REDUCE(name, graph attached to it)
        // any polynomial algorithm will work
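For the 'any polynomial algorithm' placeholder, a brute-force Python sketch works on these small per-person graphs: check every 4-tuple of the person's contacts for pairwise edges. The helper name and data layout are invented for illustration.

from itertools import combinations

def find_gangs(person, contacts, edges):
    # contacts: the person's criminal contacts.
    # edges: set of frozensets naming which contacts are connected.
    gangs = []
    for four in combinations(sorted(contacts), 4):
        # A gang is the person plus 4 pairwise-connected contacts.
        if all(frozenset(pair) in edges for pair in combinations(four, 2)):
            gangs.append((person,) + four)
    return gangs

contacts = ["b", "c", "d", "e"]
edges = {frozenset(p) for p in combinations(contacts, 2)}
print(find_gangs("a", contacts, edges))  # [('a', 'b', 'c', 'd', 'e')]

Checking all 4-tuples is O(n^4) per person, which satisfies the 'any polynomial algorithm' requirement because each attached sub-graph is small.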
Reference: MapReduce Questions and Answers from our JCG partner Maria Jurcovicova at the This is Stuff blog.