MapReduce Questions and Answers Part 2

4 Inverted Indexing for Text Retrieval

The chapter contains a lot of detail about integer encoding and compression. Since these topics are not directly about MapReduce, I wrote no questions about them.

4.4 Inverted Indexing: Revised Implementation

Explain the inverted index construction algorithm. You may assume that each document fits into memory, but that there is a huge number of documents. Which part should be optimized by integer encoding and compression?

Input: text documents
key: document id
value: text document

Output: key/value pairs where
key: word
value: list[documentId, numberOfOccurrences]
list elements must be sorted by numberOfOccurrences

The answer is:

The mapper counts the number of occurrences of each word in the document. As the whole document fits into memory, we can hold the partial counts in a hash map.

Intermediate key/values:
key: word, numberOfOccurrences
value: documentId

The custom partitioner groups intermediate key/value pairs by word. The custom sort orders them primarily by word and secondarily by the number of occurrences.

The reducer uses its initialize method to create an empty postings list. The reduce method handles two cases:

  • current word equal to the previous word – add documentId and numberOfOccurrences to the postings list,
  • current word different from the previous word – emit the previous word with its postings list and start a new postings list.

The postings list built in the reducer is the part that should be compressed.
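
For illustration, here is a minimal sketch (in Java; class and method names are illustrative) of one common postings-compression technique: gap-encode the sorted document ids, then write each gap as a variable-length integer. Gap encoding assumes ids sorted in increasing order; with postings sorted by the number of occurrences, the raw ids would be varint-encoded instead.

import java.io.ByteArrayOutputStream;

public class PostingsCompression {
  // Write v as a sequence of 7-bit groups; the high bit marks "more bytes follow".
  static void writeVarInt(ByteArrayOutputStream out, int v) {
    while ((v & ~0x7F) != 0) {
      out.write((v & 0x7F) | 0x80);
      v >>>= 7;
    }
    out.write(v);
  }

  // Delta-encode sorted document ids: small gaps compress into single bytes.
  static byte[] compressDocIds(int[] sortedDocIds) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int previous = 0;
    for (int docId : sortedDocIds) {
      writeVarInt(out, docId - previous);
      previous = docId;
    }
    return out.toByteArray();
  }
}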

class MAPPER
  method MAP(docid, doc d)
    H = new hash map
    for all term w in doc d do
      H(w) = H(w) + 1

    for all term w in H do
      emit(pair(w, H(w)), docid)

class REDUCER
  variable previous_word = empty
  variable PL = new list of postings

  method REDUCE(pair (w, #occurrences), docids[d1, d2, ..., dn])
    if w <> previous_word && previous_word <> empty do
      emit(previous_word, PL)
      PL = new list of postings

    for all docid in docids do
      PL.add(pair(#occurrences, docid))
    previous_word = w

  method CLOSE
    emit(previous_word, PL)

class PARTITIONING_COMPARATOR
  method compare(key (w1, o1), key (w2, o2))
    if w1 = w2 
      return keys are equal

    return keys are different

class SORTING_COMPARATOR
  method compare(key (w1, o1), key (w2, o2))
    if w1 = w2 do
      return compare(o1, o2)
         
    return compare(w1, w2)
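
In Hadoop terms, the SORTING_COMPARATOR corresponds to the sort order of a composite key, and the PARTITIONING_COMPARATOR to a custom Partitioner that routes by word only. A minimal sketch under those assumptions (class names are illustrative):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Partitioner;

// Composite key (word, numberOfOccurrences); sorts primarily by word,
// secondarily by the number of occurrences.
public class WordOccurrencesKey implements WritableComparable<WordOccurrencesKey> {
  private String word = "";
  private int occurrences;

  public void set(String word, int occurrences) {
    this.word = word;
    this.occurrences = occurrences;
  }

  public String getWord() { return word; }

  public void write(DataOutput out) throws IOException {
    out.writeUTF(word);
    out.writeInt(occurrences);
  }

  public void readFields(DataInput in) throws IOException {
    word = in.readUTF();
    occurrences = in.readInt();
  }

  @Override
  public int compareTo(WordOccurrencesKey o) {
    int cmp = word.compareTo(o.word);
    return cmp != 0 ? cmp : Integer.compare(occurrences, o.occurrences);
  }
}

// Partitioner: all keys with the same word go to the same reducer, so one
// reducer instance sees every (word, occurrences) key for that word in order.
class WordPartitioner extends Partitioner<WordOccurrencesKey, Text> {
  @Override
  public int getPartition(WordOccurrencesKey key, Text value, int numPartitions) {
    return (key.getWord().hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}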

5 Graph Algorithms

The chapter contains two algorithms: shortest paths in a graph and the PageRank algorithm. The questions are straightforward.

5.2 Parallel Breadth-First Search

Find the shortest path from one node, the origin, to all other nodes. Each edge has an associated weight. Input key/value pairs have already been preprocessed into a convenient form.

Input: graph
key: node id
value: distance to origin, list[adjacent node, edge length]

Output: key/value pairs where
key: node id
value: distance to origin, list[adjacent node, edge length]

The answer is:

The algorithm requires multiple iterations. It stops when an iteration does not change any ‘distance to origin’. In the worst case, there will be O(n) iterations, where n is the number of nodes in the graph.

The mapper passes the original graph to the next iteration as it is. In addition, it generates a key/value pair for each adjacent node; the value contains the minimum known distance from the origin if the route goes through the current node.

class MAPPER
  method MAP(node, pair(dist, adjacencylist))
    emit(node, pair(dist, adjacencylist))
    for all (closenode, nodedist) in adjacencylist do
      emit(closenode, pair(dist + nodedist, empty))

The reducer finds the minimum known distance to each node. It passes that distance, along with the original graph, to the next iteration. It also increments a global counter whenever the minimum known distance to any node changes.

class REDUCER
  method REDUCE(node, list(dist, adjacencylist))
    minimum = infinity
    previous_iteration_solution = infinity
    original_graph = empty
    for all (dist, adjacencylist) in list do
      if adjacencylist not empty do 
        original_graph = adjacencylist
        previous_iteration_solution = dist
      if minimum > dist
        minimum = dist
    
    if previous_iteration_solution <> minimum
      increment global counter
    emit(node, pair(minimum, original_graph)) 

If the global counter is 0, the algorithm stops. Otherwise another iteration is needed.
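
On Hadoop, the global counter maps naturally to a job counter (the reducer would call context.getCounter(...).increment(1)) and the stopping rule to a driver loop. A minimal sketch; the counter enum and the job-setup helper are illustrative, not part of the original answer:

import org.apache.hadoop.mapreduce.Job;

public class ParallelBfsDriver {
  // Hypothetical counter incremented by the reducer whenever a distance improves.
  enum BfsCounter { DISTANCE_CHANGED }

  public static void main(String[] args) throws Exception {
    long changed;
    int iteration = 0;
    do {
      Job job = makeIterationJob(iteration);  // wire mapper, reducer, paths here
      if (!job.waitForCompletion(true)) {
        System.exit(1);
      }
      changed = job.getCounters()
                   .findCounter(BfsCounter.DISTANCE_CHANGED).getValue();
      iteration++;
    } while (changed > 0);  // stop once no 'distance to origin' changed
  }

  // Illustrative stub: a real driver configures input/output paths and classes.
  private static Job makeIterationJob(int iteration) throws Exception {
    return Job.getInstance();
  }
}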

Explain the PageRank algorithm. Assume alpha = 0 (no random-jump factor).

The answer is:

The page rank P(n) of a page n is calculated from the page ranks of all pages linking to it.

P(n) = sum_m (P(m)/C(m))

The sum goes over all pages m linking to the page n; C(m) is the number of outgoing links of the page m. For example, if n is linked only from a page a with rank 0.4 and two outgoing links, and from a page b with rank 0.2 and one outgoing link, then P(n) = 0.4/2 + 0.2/1 = 0.4.

The PageRank algorithm runs in iterations. The mapper passes the page rank contribution of each page to the adjacent pages. The reducer updates the page rank of each node. The algorithm stops when the page ranks no longer move between iterations.

class MAPPER
  method MAP(page, (page_rank, adjacency_list))
    emit(page, (0, adjacency_list))
    contribution = page_rank/adjacency_list.length
    for all node in adjacency_list do
      emit(node, (contribution, empty))

class REDUCER
  method REDUCE(page, contributions[c1, c2, ..., cn])
    rank = 0
    adjacency_list = new list
    for all c in contributions do
      adjacency_list.addAll(c.adjacency_list)
      rank = rank + c.contribution 

    emit(page, (rank, adjacency_list))
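
To sanity-check the map/reduce logic, here is a toy single-process simulation on a made-up three-page graph (alpha = 0; the graph and all names are illustrative):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PageRankToy {
  public static void main(String[] args) {
    // page -> outgoing links
    Map<String, List<String>> links = Map.of(
        "a", List.of("b", "c"),
        "b", List.of("c"),
        "c", List.of("a"));
    Map<String, Double> rank = new HashMap<>(
        Map.of("a", 1 / 3.0, "b", 1 / 3.0, "c", 1 / 3.0));

    for (int i = 0; i < 50; i++) {
      Map<String, Double> next = new HashMap<>();
      for (String page : links.keySet()) {
        next.put(page, 0.0);  // mapper: emit(page, (0, adjacency_list))
      }
      for (Map.Entry<String, List<String>> e : links.entrySet()) {
        double contribution = rank.get(e.getKey()) / e.getValue().size();
        for (String target : e.getValue()) {
          next.merge(target, contribution, Double::sum);  // reducer: sum contributions
        }
      }
      rank = next;
    }
    System.out.println(rank);  // approaches the stationary distribution
  }
}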

6 EM Algorithms For Text Processing

I wrote no questions for this chapter.

Exercises

This chapter contains hands-on exercises for MapReduce. Some of them require multiple iterations.

Warm Up

Count the number of occurrences of every word in a text collection.

Input:
key: document id,
value: text document.

Output:
key: word,
value: number of occurrences.

The answer is:

Intermediate pairs:
key: word
value: integer – how many times the word was seen in the input.

class MAPPER
  method MAP(docid a, doc d)
    for all term w in doc d do
      emit(w, 1)

class COMBINER
  method COMBINE(word w, counts[c1, c2, ..., cn])
    s = 0
    for all c in counts[c1, c2, ..., cn] do 
      s = s + c

    emit(word w, s)

class REDUCER
  method REDUCE(word w, counts[c1, c2, ..., cn])
    s = 0
    for all c in counts[c1, c2, ..., cn] do 
      s = s + c

    emit(word w, s)

An alternative solution would use in-mapper combining.
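
A minimal sketch of the in-mapper combining variant on the Hadoop API (the tokenization and the class name are illustrative):

import java.util.HashMap;
import java.util.Map;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class InMapperCombiningWordCount
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private final Map<String, Integer> counts = new HashMap<>();

  @Override
  protected void map(LongWritable key, Text value, Context context) {
    // Accumulate counts in memory instead of emitting (word, 1) pairs.
    for (String token : value.toString().split("\\s+")) {
      if (!token.isEmpty()) {
        counts.merge(token, 1, Integer::sum);
      }
    }
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    // Emit the aggregated counts once, when the mapper finishes its split.
    for (Map.Entry<String, Integer> e : counts.entrySet()) {
      context.write(new Text(e.getKey()), new IntWritable(e.getValue()));
    }
  }
}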

Web Store

A website user log contains user ids and the length of each session. The website has a modest number of registered users. Compute the average session length for each user.

Input:
key: user id,
value: session length.

Output:
key: user id,
value: average session length.

The answer is:

As the number of registered users is modest, we can use in-mapper combining. Note that the mapper has to emit (total time, number of sessions) pairs rather than averages: the overall average cannot be computed from per-mapper averages alone.

class MAPPER
  variable total_time = new hash map 
  variable sessions_number = new hash map 

  method MAP(user_id, session_length)
    total_time(user_id) = total_time(user_id) + session_length
    sessions_number(user_id) = sessions_number(user_id) + 1

  method CLOSE 
    for all user_id in total_time
      tt = total_time(user_id)
      sn = sessions_number(user_id)
      emit(user_id, pair(tt, sn))    

class REDUCER
  method REDUCE(user_id, pairs[(time, sessions_number)])
    total_time = 0
    total_sessions = 0
    for all (time, sessions_number) in pairs do 
      total_time = total_time + time
      total_sessions = total_sessions + sessions_number

    emit(user_id, total_time/total_sessions)

A web store log contains a user id and the bought item for each sale. You need to implement “buyers of this item also bought” functionality: whenever an item is shown, the store will suggest the five items most often bought by that item’s buyers.

Input:
key: user id,
value: bought item.

Output:
key: item,
value: list of five most common "buyers of item also bought" items.

The answer is:

Our solution has two iterations. The first iteration generates lists of all items bought by the same user. Grouping is done by the framework; both mapper and reducer perform an identity function.

Input:
key: user id,
value: brought item.

Output:
key: user id,
value: list of all bought items.

class MAPPER
  method MAP(user_id, item)
    emit(user_id, item)

class REDUCER
  method REDUCE(user_id, items[i1, i2, ..., in])
    emit(user_id, items)
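
One Hadoop detail worth noting: the default identity reducer writes each grouped value separately, so producing a single list value per user still takes a trivial reducer that concatenates the values. A minimal sketch, assuming Text keys and values:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ItemsByUserReducer extends Reducer<Text, Text, Text, Text> {
  @Override
  protected void reduce(Text userId, Iterable<Text> items, Context context)
      throws IOException, InterruptedException {
    // The framework groups items by user id; we only join them into one value.
    StringBuilder list = new StringBuilder();
    for (Text item : items) {
      if (list.length() > 0) list.append(',');
      list.append(item);
    }
    context.write(userId, new Text(list.toString()));
  }
}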

The second iteration solves the co-occurrence problem on the list items. It uses the stripes approach. The only difference from the standard solution is that we emit only the five most common co-occurrences.

Input:
key: user id,
value: list of all bought items.

Output:
key: item,
value: list of five most common co-occurrences.

class MAPPER
  method MAP(user_id, items[i1, i2, ..., in])
    for all item i in items do
      H = new hash map
      for all item j in items do
        if i <> j
          H(j) = H(j) + 1
      emit(i, H)

class REDUCER
  method REDUCE(item, stripes[H1, H2, ..., Hn])
    T = new hash map
    for all H in stripes do
      for all (key/value) in H do
        T(key) = T(key) + value
    emit(item, max_five(T))
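
The max_five helper is the only non-standard piece above. One way to implement it is a bounded min-heap; a minimal, illustrative sketch in plain Java:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class TopFive {
  static List<Map.Entry<String, Integer>> maxFive(Map<String, Integer> stripe) {
    // Min-heap keyed by count; the least frequent kept entry sits at the head.
    PriorityQueue<Map.Entry<String, Integer>> heap =
        new PriorityQueue<>(Map.Entry.comparingByValue());
    for (Map.Entry<String, Integer> e : stripe.entrySet()) {
      heap.offer(e);
      if (heap.size() > 5) {
        heap.poll();  // evict the smallest of the six candidates
      }
    }
    List<Map.Entry<String, Integer>> result = new ArrayList<>(heap);
    result.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
    return result;
  }
}

Capping the heap at five keeps the pass over the stripe effectively linear.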

A web store log contains a user id, a timestamp, the item and the number of bought pieces for each sale. The store is looking for items whose sales rise or decline at the same time. Find the 20 item pairs with the maximum number of such months.

Input:
key: user id,
value: timestamp, bought item, count.

Output:
key: item, item
value: number of months in which both items’ sales rose or declined together.
#: the output contains the 20 key/value pairs with the maximum value

The answer is:

Our solution requires multiple MapReduce iterations. We have to:

  • calculate whether an item’s sales went up or down in each month,
  • create lists of items with the same sales change during the same month,
  • count co-occurrences in those lists,
  • choose the item pairs with the maximum number of co-occurrences.

The first iteration calculates the sales change for each month. We have to supply a mapper, a partitioner, a custom sort and a reducer. The mapper generates one intermediate key/value pair for each input pair. The key is composed of the sold item and the sales month; the value contains the number of sold pieces.

The partitioner sends all key/value pairs with the same item to the same reducer. The custom sort orders them by month. Finally, the reducer calculates the sales changes.

Input:
key: user id,
value: timestamp, item, count.

Intermediate key/values:
key: item, month
value: count.

Output:
key: month, up/down/equal
value: item.

class MAPPER
  method MAP(user_id, (timestamp, item, count))
    month = get_month(timestamp) 
    emit((item, month), count)

class PARTITIONING_COMPARATOR
  method compare(key (item1, month1), key (item2, month2))
    if item1 = item2 
      return keys are equal

    return keys are different

class SORTING_COMPARATOR
  method compare(key (item1, month1), key (item2, month2))
    if item1 = item2 do
      return compare(month1, month2)
         
    return compare(item1, item2)

class REDUCER
  method REDUCE((item, month), counts[c1, c2, ..., cn])
    c = sum([c1, c2, ..., cn])
    if last_item = item
      if last_month + 1 = month
        // consecutive months: emit the correct up/down/equal flag
        if last_count < c
          emit((month, up), item)
        if last_count > c
          emit((month, down), item)
        if last_count = c
          emit((month, equal), item)
      else
        // no sales during some months
        emit((last_month + 1, down), item)
        emit((month, up), item)
    else 
      // new item: close the previous item and open this one
      if last_item is set do
        emit((last_month + 1, down), last_item)
      emit((month, up), item)

    last_item = item
    last_count = c
    last_month = month

The second iteration groups the first iteration’s results by key. It generates lists of items with the same sales change during the same month. The framework does all the work; both mapper and reducer perform an identity function.

Input:
key: month, up/down/equal
value: item.

Output:
key: month, up/down/equal
value: [items].

The third iteration performs the standard ‘co-occurrence by pairs’ algorithm.

Input:
key: month, up/down/equal
value: [items].

Intermediate key/values:
key: item, item
value: partial number of co-occurrences.

Output:
key: item, item
value: number of months in which both items’ sales rose or declined together.
#: the output contains all item pairs

class MAPPER
  method MAP((month, change), items[i1, i2, ..., in])
    for each i in items do 
      for each j in items do
        if i != j 
          emit((i, j), 1) 

class COMBINER
  method COMBINE((item1, item2), co-occurrences[c1, c2, ..., cn])
    s = 0
    for all c in co-occurrences[c1, c2, ..., cn] do 
      s = s + c

    emit((item1, item2), s)

class REDUCER
  method REDUCE((item1, item2), co-occurrences[c1, c2, ..., cn])
    s = 0
    for all c in co-occurrences[c1, c2, ..., cn] do 
      s = s + c

    emit((item1, item2), s)

Finally, we have to choose the 20 key/value pairs with the maximum value. Each mapper selects its local 20 key/value pairs with the maximum value and emits them with the same constant key. There is only one reducer, which selects the final 20 key/value pairs.

Input:
key: item, item
value: number of months in which both items’ sales rose or declined together.
#: the input contains all item pairs

Intermediate key/values:
key: 1
value: item, item, number of months in which both items’ sales rose or declined together.
#: each mapper contributes its 20 key/value pairs with the maximum value

Output:
key: item, item
value: number of months in which both items’ sales rose or declined together.
#: the output contains the 20 key/value pairs with the maximum value

The code is simple but long, so we do not show it in full.
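
Still, a minimal sketch of the mapper half may help (Hadoop API; the tab-separated input format, the hard-coded limit and the class name are assumptions). Each mapper keeps its local top 20 in a bounded min-heap and flushes it under the constant key 1 in cleanup(); the job is then configured with a single reducer that applies the same selection to all collected candidates:

import java.io.IOException;
import java.util.AbstractMap;
import java.util.Map;
import java.util.PriorityQueue;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LocalTopTwentyMapper
    extends Mapper<LongWritable, Text, IntWritable, Text> {

  // Min-heap over (item pair, months) records, capped at 20 entries.
  private final PriorityQueue<Map.Entry<String, Integer>> top =
      new PriorityQueue<>(Map.Entry.comparingByValue());

  @Override
  protected void map(LongWritable offset, Text line, Context context) {
    // Assumed line format: "item1,item2<TAB>months"
    String[] parts = line.toString().split("\t");
    top.offer(new AbstractMap.SimpleEntry<>(parts[0], Integer.parseInt(parts[1])));
    if (top.size() > 20) {
      top.poll();  // evict the smallest local candidate
    }
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    for (Map.Entry<String, Integer> e : top) {
      // Constant key 1 routes every local candidate to the single reducer.
      context.write(new IntWritable(1), new Text(e.getKey() + "\t" + e.getValue()));
    }
  }
}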

Criminal Agency

All exercises in this chapter use the same input data structures.

A criminal agency stole Facebook’s friendship database and wants to analyze the new data. Friendships are stored as key/value pairs; each friendship corresponds to two key/value pairs:

Friends:
key: first friend name
value: second friend name

key: second friend name
value: first friend name

The agency also owns the criminal records of all citizens:

Criminal record:
key: citizen name
value: when, where, accomplices, description

Find at-risk youths. A person is considered an at-risk youth if more than half of his friends have a criminal record.

The answer is:

Our solution has two iterations. The first iteration joins the two data sets and flags each ‘value friend’ with a has_record/law_abiding flag.

Output:
key: first friend
value: second friend, has_record/law_abiding

The mapper flags each key with its data set name. The partitioner groups data by the name in the key and the sorter puts criminal records before friendships. We could use local aggregation to remove multiple criminal records for the same person.

class MAPPER
  method MAP(name, value)
    if value is a name do
      emit(pair(name, friendship), value)
    else
      emit(pair(name, criminal), value)

class PARTITIONING_COMPARATOR
  method compare(key (name1, dataset1), key (name2, dataset2))
    if name1 = name2
      return keys are equal
 
    return keys are different
 
class SORTING_COMPARATOR
  method compare(key (name1, dataset1), key (name2, dataset2))
    if name1 = name2 AND dataset1 is criminal
      return key1 is lower

    if name1 = name2 AND dataset2 is criminal
      return key2 is lower

    return compare(name1, name2)

class REDUCER
  variable previous_name

  method REDUCE(pair(name, flag), items[i1, i2, ..., in])
    if flag is criminal do 
      previous_name = name
      has_record = criminal
      return 

    if previous_name <> name do 
      has_record = law_abiding
    else 
      has_record = criminal

    previous_name = name
    for all i in items do
      emit(i, pair(name, has_record))

The second iteration counts both the total number of friends and the number of friends with a criminal record. The reducer emits key/value pairs only for at-risk youths. This iteration could also use some kind of local aggregation.

Intermediate key/value:
key: name
value: total friends, total friend criminals
# totals are partial counts; the reducer sums them per person

Output:
key: name
value: empty
# only at risk youths

class MAPPER
  method MAP(name, pair(friend, has_record))
    if has_record is law_abiding do
      emit(name, pair(1, 0))
    else
      emit(name, pair(1, 1))

class REDUCER
  method REDUCE(name, items[pair(total, criminals)])
    total = 0
    criminals = 0
    for all i in items do
      total = total + i.total
      criminals = criminals + i.criminals

    if criminals / total > 0.5 do
      emit(name, empty) 
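
A minimal sketch of this reducer on the Hadoop API (assuming the pair is serialized as two comma-separated integers in a Text value; names are illustrative). Note the cast to double – integer division would always yield 0 here:

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AtRiskYouthReducer extends Reducer<Text, Text, Text, NullWritable> {
  @Override
  protected void reduce(Text name, Iterable<Text> pairs, Context context)
      throws IOException, InterruptedException {
    long total = 0;
    long criminals = 0;
    for (Text pair : pairs) {
      // Assumed value format: "total,criminals"
      String[] parts = pair.toString().split(",");
      total += Long.parseLong(parts[0]);
      criminals += Long.parseLong(parts[1]);
    }
    // Cast before dividing: integer division would truncate to 0.
    if ((double) criminals / total > 0.5) {
      context.write(name, NullWritable.get());
    }
  }
}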

Find gangs. A gang is a group of people that:

  • has exactly 5 members,
  • each member is a friend of all other members,
  • each two members committed at least 3 crimes together.

The answer is:

We need three iterations. The idea is to first clean the graph of all useless edges, so that only criminal contacts remain. Then we split the graph into smaller, manageable sub-graphs. We attach all criminal contacts, and the edges between them, to each person:

Last iteration reducer’s input:
key: person
values: all his criminal contacts and the relationships between them.

The final reducer takes the smaller graph represented by the value in each key/value pair and finds all complete sub-graphs with 4 vertices in it. Add the person from the key, and you have a complete sub-graph with 5 vertices. The reducer may use any polynomial algorithm.

The first iteration uses the pairs approach to clean up the graph. We omit both local aggregation and removal of duplicates; both would make the algorithm more efficient.

Intermediate key/values:
key: first friend, second friend, friendship/accomplice
value: 1 (empty for friendship pairs)

Output:
key: first friend, second friend
value: empty
# only pairs of friends who committed at least 3 crimes together

class MAPPER
  method MAP(name, value)
    if value is a name do
      emit(triple(name, value, friendship), empty)
    else
      for all crime_accomplice in value.accomplices do
        emit(triple(name, crime_accomplice, accomplice), 1)

class PARTITIONING_COMPARATOR
  method compare(key (name1, accomplice1, flag1), key (name2, accomplice2, flag2))
    if name1 = name2 AND accomplice1 = accomplice2
      return keys are equal
 
    return keys are different
 
class SORTING_COMPARATOR
  method compare(key (name1, accomplice1, flag1), key (name2, accomplice2, flag2))
    if name1 = name2 AND accomplice1 = accomplice2 AND flag1 is friendship
      return key1 is lower

    if name1 = name2 AND accomplice1 = accomplice2 AND flag2 is friendship
      return key2 is lower

    return compare(pair(name1, accomplice1), pair(name2, accomplice2))

class REDUCER
  variable previous_name
  variable previous_accomplice

  method sameAsPrevious(name, accomplice) 
    if previous_name <> name
      return false

    if previous_accomplice <> accomplice
      return false

    return true

  method REDUCE(triple(name, accomplice, flag), items[i1, i2, ..., in])
    if sameAsPrevious(name, accomplice) do 
      if items.length > 2 do 
        emit(name, accomplice)
      return

    if flag is friendship do 
      previous_name = name
      previous_accomplice = accomplice

The second iteration attaches to each edge the list of all criminal contacts of each of its endpoints:

Input:
key: first friend, second friend
value: empty

Intermediate key/values:
key: first friend
value: first friend, second friend

key: second friend
value: first friend, second friend

Output:
key: first friend, second friend
value: all friends of second friend

key: second friend, first friend
value: all friends of first friend

class MAPPER
  method MAP((first friend, second friend), empty)
    emit(first friend, (first friend, second friend))
    emit(second friend, (first friend, second friend))

class REDUCER
  method REDUCE(name, edges[e1, e2, ..., en])
    friends = new Set
    friends.add(name)

    for all edge in edges do
      friends.add(edge.v1, edge.v2)

    for all edge in edges do
      other = the end of edge that is not name
      emit((other, name), friends)

Finally, the mapper and the shuffle and sort phase together generate, for each person, the list of all his friends and the relationships between them.

Input:
key: friend 1, friend 2
value: all friends of friend 2

Intermediate key/values:
key: friend 1
value: friend 2, all friends of friend 2

Reducer’s input (after shuffle and sort):
key: person
values: all his friends and the relationships between them.

Output:
key: first friend, second friend, third friend, fourth friend, fifth friend
value: gang

class MAPPER
  method MAP((friend 1, friend 2), all friends of friend 2)
    emit(friend 1, (friend 2, all friends of friend 2))

class REDUCER
  method REDUCE(name, graph attached to it)
    any polynomial algorithm will work
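
For the final reducer, brute-force enumeration is polynomial and sufficient when each person’s criminal neighbourhood is small. A minimal, illustrative sketch: given a person’s criminal contacts and the edges among them, list all complete sub-graphs of 4 vertices (each, together with the key person, forms a 5-member gang candidate):

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class CliqueFinder {
  // edges holds undirected edges encoded as "a|b" with a < b lexicographically.
  static List<List<String>> fourCliques(List<String> contacts, Set<String> edges) {
    List<List<String>> cliques = new ArrayList<>();
    int n = contacts.size();
    for (int a = 0; a < n; a++)
      for (int b = a + 1; b < n; b++)
        for (int c = b + 1; c < n; c++)
          for (int d = c + 1; d < n; d++) {
            String[] quad = { contacts.get(a), contacts.get(b),
                              contacts.get(c), contacts.get(d) };
            if (allConnected(quad, edges)) {
              cliques.add(List.of(quad));
            }
          }
    return cliques;
  }

  // A quadruple is a clique if every one of its six pairs is an edge.
  private static boolean allConnected(String[] quad, Set<String> edges) {
    for (int i = 0; i < quad.length; i++)
      for (int j = i + 1; j < quad.length; j++) {
        String u = quad[i], v = quad[j];
        String key = u.compareTo(v) < 0 ? u + "|" + v : v + "|" + u;
        if (!edges.contains(key)) return false;
      }
    return true;
  }
}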

Reference: MapReduce Questions and Answers from our JCG partner Maria Jurcovicova at the This is Stuff blog.
