Sunday, May 13, 2012


Per-Split Semi-Join

Problem
Join a table R (the customer table) and a table L (the log table) on the join key CustID using the
MapReduce architecture.


Motivation
One problem with the semi-join is that not every record in the filtered version of R will join with a
particular split Li of L. The per-split semi-join is designed to address this problem [1]. It reduces
network overhead by sending the mapper that holds Li only RLi, the records of R that join with Li,
instead of the whole filtered version of R.
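
As a rough illustration of the idea, here is a minimal sketch in plain Java (no Hadoop) of what happens for a single split: collect the unique join keys of Li, then keep only the records of R with those keys. The class and method names are made up for this post, and modeling the tables as in-memory collections is an assumption for illustration only; it is not the code in Per_Split_Semi_Join.java.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration of the per-split filtering idea.
class PerSplitFilterSketch {

    // Unique CustIDs referenced by one split Li of the log table (the Li.uk set).
    static Set<Integer> uniqueKeys(List<Integer> splitCustIds) {
        return new TreeSet<>(splitCustIds);
    }

    // RLi: the records of R (CustID -> customer record) whose key appears in Li.uk.
    static Map<Integer, String> filterR(Map<Integer, String> r, Set<Integer> liUk) {
        Map<Integer, String> rLi = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> e : r.entrySet()) {
            if (liUk.contains(e.getKey())) {
                rLi.put(e.getKey(), e.getValue());
            }
        }
        return rLi;
    }
}
```

Only the RLi map would have to travel to the mapper holding Li, which is where the saving over the plain semi-join comes from.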

Dataset
The dataset for the per-split semi-join should be different from the dataset for the semi-join,
because on some datasets the per-split semi-join shows little advantage over the semi-join. For
example, the dataset for the semi-join has two tables, Transaction.txt and Customer.txt, each with
20,000 records, and Transaction.txt references only the customers whose IDs are from 1 to 200.
Running the per-split semi-join on it with 4 mappers (each reading 5,000 lines of Transaction.txt)
gives 4 files at the end of the first phase, L0.uk, L1.uk, L2.uk and L3.uk, all containing the
CustIDs from 1 to 200. It is not ideal for the 4 Li.uk files to have exactly the same content; it is
better for them to contain different CustIDs. To get the desired result in the first phase, we
therefore reduce Transaction.txt to 400 records and let it still reference the customers whose IDs
are from 1 to 200. In this case, it is very likely that the Li.uk files will contain different CustIDs.
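
To see why the smaller file helps, the following sketch (plain Java, not the attached GenerateDataset.java) draws random CustIDs and collects the unique keys of each split. It assumes IDs are drawn uniformly at random and that the file is cut into equal consecutive chunks; both are assumptions for illustration, and the class name and seed are made up. With 400 records and 4 splits, each split has only 100 lines, so it can contain at most 100 of the 200 IDs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration of how many distinct CustIDs each split sees.
class SplitKeysSketch {

    // One set of unique CustIDs per split, for `records` random transactions
    // cut into `splits` equal consecutive chunks.
    static List<Set<Integer>> uniqueKeysPerSplit(int records, int splits, int maxCustId, long seed) {
        Random rnd = new Random(seed);
        int perSplit = records / splits;
        List<Set<Integer>> result = new ArrayList<>();
        for (int s = 0; s < splits; s++) {
            Set<Integer> keys = new TreeSet<>();
            for (int i = 0; i < perSplit; i++) {
                keys.add(1 + rnd.nextInt(maxCustId)); // CustID in 1..maxCustId
            }
            result.add(keys);
        }
        return result;
    }

    public static void main(String[] args) {
        for (int records : new int[] {20000, 400}) {
            StringBuilder sizes = new StringBuilder();
            for (Set<Integer> keys : uniqueKeysPerSplit(records, 4, 200, 42L)) {
                sizes.append(keys.size()).append(' ');
            }
            System.out.println(records + " records, distinct CustIDs per split: " + sizes);
        }
    }
}
```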

Workflow of MapReduce Jobs




Tips & Experience
1) If you're running a map-only job, you MUST EXPLICITLY set the number of reducers to 0. If you
do not, Hadoop will run reduce tasks anyway, even if you didn't write any reduce code. Whether the
number of reducers is 0 also affects the output files. Take our per-split semi-join as an example:
in the first phase, we need to generate the Li.uk files, each one corresponding to a split of L
processed by one mapper. If we don't tell Hadoop explicitly that we don't need reducers, the final
output is the set of files written by the reducers, which is not the result we want.
2) How to write a close function.
Sometimes you'll need a close function, which runs after the last call to the map function, to get
some extra work done in the map phase. As we know, the output of the map function is written to the
OutputCollector<..., ...> output parameter and is later taken as the input of the reduce function.
The result of the close function has to go to the same OutputCollector, but output is only a
parameter of the map function. To get around this, we declare a field, say outputClose, in the
Mapper class and assign output to it inside the map function. The two variables then refer to the
same object. The close function writes through outputClose, so its result is stored in the same
place as the map output and is then passed on to the reduce function.
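
The pattern in tip 2 can be sketched without Hadoop as follows. SimpleCollector is a stand-in I made up for Hadoop's OutputCollector, and the map and close signatures are simplified (the real ones take more parameters and can throw IOException), so treat this as an illustration of the field trick only. The "CustID,rest of record" line format is also an assumption.

```java
import java.util.Set;
import java.util.TreeSet;

// Stand-in for Hadoop's OutputCollector so the sketch runs without Hadoop.
interface SimpleCollector<K, V> {
    void collect(K key, V value);
}

// Hypothetical mapper that emits each distinct CustID once, from close().
class UniqueKeyMapperSketch {
    // Field that remembers the collector handed to map(); same object as `output`.
    private SimpleCollector<Integer, String> outputClose;
    private final Set<Integer> seen = new TreeSet<>();

    void map(String line, SimpleCollector<Integer, String> output) {
        outputClose = output; // keep a reference for use in close()
        seen.add(Integer.parseInt(line.split(",")[0].trim()));
    }

    void close() {
        if (outputClose == null) {
            return; // map() was never called, so there is nothing to emit
        }
        for (Integer custId : seen) {
            outputClose.collect(custId, ""); // lands in the same collector as map output
        }
    }
}
```

Nothing is emitted until close() runs, which is what lets a mapper produce one summary (here, the unique keys of its split) after it has seen all of its input.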
3) How do mappers partition a single input file?
When no partitioning is specified, the input file is divided among the mappers by range, each
mapper receiving one contiguous part of the file. In our experiment, we split a file of 400 lines
between two mappers. After running, one mapper output the first 202 lines and the other output the
remaining 198. A single file is thus partitioned approximately evenly among the mappers.
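
One likely reason for the 202/198 split rather than 200/200 is that Hadoop cuts files by byte ranges, not by line counts, so lines of different lengths give slightly uneven line counts. The sketch below is a simplified model of that, not Hadoop's actual record reader: it assumes equal-sized byte ranges, one newline byte per line, and that a line belongs to the range containing its first byte. The class and method names are made up.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical model: assign each line to the byte range that holds its first byte.
class ByteRangeSplitSketch {

    static int[] linesPerSplit(List<String> lines, int numSplits) {
        long total = 0;
        for (String line : lines) {
            total += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for '\n'
        }
        long splitSize = (total + numSplits - 1) / numSplits; // ceiling division
        int[] counts = new int[numSplits];
        long offset = 0; // byte offset of the current line's first byte
        for (String line : lines) {
            counts[(int) (offset / splitSize)]++;
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return counts;
    }
}
```

For instance, with the four lines "a", "bb", "ccc", "dddd" and two ranges, the first range covers three lines and the second only one, even though the ranges have the same size in bytes.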
4) How do mappers partition multiple input files?
If there are multiple input files, the result of partitioning depends on how many lines each file
has. We performed two experiments. In the first one, we used two files of 400 lines each. With the
number of mappers not specified (note that we have two datanodes), the output was two files: the
first containing all the lines from input file 1, and the second containing all the lines from input
file 2. In the second experiment, we used two files, one with 400 lines and the other with 100
lines. With the number of mappers not specified, there were three output files: part-00000 with the
first 255 lines of input file 1, part-00001 with the remaining 145 lines of input file 1, and
part-00002 with all the lines of input file 2. Clearly, if no partitioning method is specified,
Hadoop uses its own algorithm to partition the input files, depending on the number of datanodes,
the number of input files, and the number of lines in each file.
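
The results in tips 3 and 4 are consistent with the following simplified model of split sizing, which reflects my understanding of the old (org.apache.hadoop.mapred) FileInputFormat: a target size is derived from the total input size and the requested number of map tasks, and each file is cut into pieces of that size, with a final piece allowed to run about 10% over. The 1.1 slack factor, the default of 2 requested map tasks, and all names below are assumptions, so check the source of your Hadoop version before relying on them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of per-file split sizing; sizes are in bytes.
class SplitSizeSketch {
    static final double SPLIT_SLOP = 1.1; // assumed slack before cutting another split

    static List<List<Long>> computeSplits(long[] fileSizes, int requestedSplits, long blockSize) {
        long total = 0;
        for (long size : fileSizes) {
            total += size;
        }
        long goalSize = total / Math.max(1, requestedSplits);
        long splitSize = Math.max(1, Math.min(goalSize, blockSize));

        List<List<Long>> result = new ArrayList<>();
        for (long size : fileSizes) { // files are never merged into one split
            List<Long> splits = new ArrayList<>();
            long remaining = size;
            while ((double) remaining / splitSize > SPLIT_SLOP) {
                splits.add(splitSize);
                remaining -= splitSize;
            }
            if (remaining > 0) {
                splits.add(remaining);
            }
            result.add(splits);
        }
        return result;
    }
}
```

Taking a made-up 10 bytes per line, files of 4000 and 1000 bytes with 2 requested splits give a target of 2500 bytes, so the first file is cut in two and the second stays whole: three splits, matching the three output files in the second experiment. Two files of 4000 bytes each give a target of 4000 bytes, so each file is a single split, as in the first experiment.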
5) For some MapReduce jobs, the partitioning used by one map function needs to be the same as that
used by another. Take Per_Split_Semi_Join as an example: we want the first map function to
partition its input in the same way as the last map function. One way to achieve this is to use the
class NLineInputFormat to specify the number of lines read by each mapper.
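
Tips 1 and 5 both come down to a few lines of job configuration. Here is a minimal sketch using the old org.apache.hadoop.mapred API; it needs the Hadoop jars on the classpath to compile. The class name, job name and method parameters are made up for illustration, the mapper class is left out, and the property name is the one I know from Hadoop 1.x, so verify it against your version.

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.NLineInputFormat;

// Hypothetical driver fragment for a map-only phase with fixed-size line splits.
class PhaseOneDriverSketch {

    static JobConf configure(String inputPath, String outputPath, int linesPerMapper) {
        JobConf conf = new JobConf(PhaseOneDriverSketch.class);
        conf.setJobName("per-split-semi-join-phase1"); // made-up job name

        // Tip 1: map-only job, so ask for zero reduce tasks explicitly.
        conf.setNumReduceTasks(0);

        // Tip 5: give every mapper the same number of input lines.
        conf.setInputFormat(NLineInputFormat.class);
        conf.setInt("mapred.line.input.format.linespermap", linesPerMapper);

        // conf.setMapperClass(...) is omitted here; your own mapper class goes there.
        FileInputFormat.setInputPaths(conf, new Path(inputPath));
        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
        return conf;
    }
}
```

Using the same linesPerMapper value in the first and the last phase is what keeps split Li in one phase lined up with split Li in the other.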

How to run the program
(1) Delete previously generated output folders
(2) Compile
(3) Make jar
(4) Run

Classes Used
BlockLocation (data block information)
DatanodeInfo

Files Attached
  • GenerateDataset.java - Source code to generate input files
  • Per_Split_Semi_Join.java
  • Input files
  • Output files at each phase
