Sunday, May 13, 2012


Semi-Join

Problem
Join a table R (customer table) and a table L (log table) on joining key CustID using MapReduce
architecture. 

Motivation
Often, when R is large, many records in R may not be actually referenced by any records in table
L. Consider Facebook as an example. Its user table has hundreds of millions of records. However, an
hour worth of log data likely contains the activities of only a few million unique users and the majority
of the users are not present in this log at all. For broadcast join, this means that a large portion of the
records in R that are shipped across the network (via the DFS) and loaded in the hash table are not used
 by the join. We exploit semi-join to avoid sending the records in R over the network that will not join
with L [1].

Dataset
For the input dataset, I used the code GenerateDataset.java to generate the transaction data,
Transaction.txt, and customer data, Customer.txt. In order to simulate the circumstances that benefit from
 Semi-Join, we need to generate proper size of customer data and log data. Let's generate a Customer.txt
 file with 20,000 records and a Transaction.txt file with the same number of records, with the
Transaction.txt only referencing the customers whose ID is from 1 to 200. Thus only the first 200 (one
hundredth) customers will have their transactions records in Transaction.txt file. This is to simulate the
situation that an hour worth of log data likely contains the activities of only a few thousand unique users
 and the majority of the users are not present in this log at all [1]. The Customer.txt file represent table R
 mentioned in the paper, and the Transaction.txt file represent table L mentioned in the paper.

Workflow of MapRedue Jobs


Bugs Encountered and Solutions 
1) For reducers, the type of the input key-value pair must be the same as the type of the output key-value
 pair. !!!
2) Do not create a output file directory for a MapReduce job if you will provide that as a parameter to
run the jobs, otherwise a "directory already exist" error will be raised. 
3) The character between the key and value of  a key-value pair in Hadoop output files is "\t", not
speces. Thus, when you try to split a line, do not use delimiter " ". Instead, use "\t". !!!
4) No matter what the type of output key-value pairs are, you can't assign null to either of them, e.g.
output.collect(custID, null), otherwise a NullPonterException would be raised.!!!
5) You can't map a key to a null value in hashtables in MapReduce code, e.g. ht.put(custID, null),
otherwise a error java.util.Hashtable.put(Unknown Source) would be raised. !!!

Tips & Experience
1) The difference among mappers, datanodes and map function.
Datanodes are the independent machines in which HDFS are stored and mappers are running. Mappers
 are the processes running in datanodes. A datanode can have one or multiple mappers, but a mapper
 can only reside in one datanode. Map functions are just snippets of code executed by each mapper. 
2) If we don't specify the number of reducers, it would be the same as the number of the datanodes. In
the first phase of semi join, we only need one output file, i.e. L.uk. Because the number of output 
files of a MapReduce Job is the same as the number of Reducers, we need to specify the number
of reducers to be 1
3) If you are going to run a MapReduce that contains multiple jobs, you'd better get these jobs working
 one by one for easier debugging. 

How to run the program
(1) Delete previously generated output folders
(2) Compile
(3) Make jar
(4) Run

Useful Resources
How to read  files in a directory in HDFS using Hadoop filesystem API  
Hadoop File System Java Tutorial

Classes Used
JobConf !!!
FileSystem !!!
Files Attached
  • GenerateDataset.java - Source code to generate input files
  • Semi_Join.java
  • Input files
  • Output files at each phase

Per-Split Semi-Join

Problem
Join a table R (customer table) and a table L (log table) on joining key CustID using MapReduce
architecture. 


Motivation
One problem with semi-join is that not every record in the filtered version of R will join with a particular
 split Li of L. The per-split semi-join is designed to address this problem[1]. The network overhead is
reduced by only sending the RLi to the mapper which holds Li instead of sending the whole filtered
version of R.

Dataset
The dataset for per-split-semi-join should be different from the dataset for sem-join. Because for some
dataset, we can't see much advantage of per-split-semi-join over semi-join. For example, in the dataset
for semi-join, we have two tables Transaction.txt and Customer.txt, both having 20,000 records and
Transaction.txt referencing the customers whose ID are from 1 to 200. Using per-split-semi-join, we'll
have 4 files L0.uk, L1.uk, L2.uk and L3.uk (each mapper read 5000 lines from Transaction.txt, need
 4 mappers) at the end of the first phase both having CustID from 1 to 200. It may not be ideal for the 4
files Li.uk to have totally the same content, it's better for them to have some different CustID's. Thus in
 order to generate the desire result of the first phase, we need to reduce the records in Transaction.txt to
 400 and  let it still reference the customers whose ID is from 1 to 200. In this case, it is very likely that
 the Li.uk files will have different CustID's. 

Workflow of MapRedue Jobs




Tips & Experience
1) If you're running Map-only job, you MUST EXPLICITLY set the number of the reducer to be 0. If
 you do not do this, the Hadoop will execute reduce tasks anyway, even if you didn't write any reduce
 code. Whether you set the number of reducers to 0 has effect to the file output. Take our
per-split-semi-join as an example, in the first phase, we need to generate some Li.uk files, each one
corresponds to a split of L processed by a mapper. If we don't tell hadoop explicitly that we don't need
 reducers, the final output is actually the files generated by reducers, which is not the result we desire. !!!
2) How to write close function. !!!
Sometimes you'll need a close function after map function to get some extra work done in the map
phase. As we know, the output of map function is store in the data structure OutputCollector<..., ...>
output, which will be taken as an input of reduce function. We need to store the result of close function
 into the data structure OutputCollector<..., ...> output, which is a parameter of map function. To
achieve this, we need to declare a variable, say outputClose, in the Mapper class, and assign the output
 to outputClose. These two variables points to the same memory location. Then we reference
outputClose in the close function. Thus the result of close function would be stored at the same location
 as output, which is then accessed by the reduce function. 
3) How mappers partition a single input file? !!!
When no partition is specified, the mappers will partition the input file using range partitioning. In our
 experiment, we split a file of 400 lines using two mappers. After running, one mapper output the first
 202 lines and the other one outputs the rest. A single file is approximately evenly partitioned within
 mappers. 
4) How mappers partition multiple input files? !!!
If there are multiple input files, the result of partitioning depends on how many lines in each files. We
 performed two experiments. In the first one, we use two files, both having 400 lines. When the
number of mappers is not specified (note  we have two datanodes), the output are two files. The first
one containing all the lines from input file 1, and another one containing all the lines from input file 2.
In the second experiment, we use two files, one having 400 lines and the other having 100 lines.
When the number of mappers is not specified, there will be three output files, part-00000 having the
 first 255 lines from input file 1, part-00001 having the rest lines from input file 1, and part-00002
having all the lines from input file 2. Clearly if no partitioning method is specified, hadoop uses its
own algorithm to partition input files depending on the number of datanode, the number of input files,
 and the number of lines in each file. 
5) For some MapReduce job, we need the partitioning method in the one Map function to be the same
 as that in some other Map functions. Take Per_Split_Semi_Join as an example, we want the
partitioning method in the first Map function to be the same as that in the last Map function.One way to
 achieve this is to specify the number of lines read by each mapper. To achieve this, you need to use
the class NLineInputFormat to specify the number of lines read by each mapper. !!!

How to run the program
(1) Delete previously generated output folders
(2) Compile
(3) Make jar
(4) Run

Classes Used
BlockLocation (data block information)
DatanodeInfo

Files Attached
  • GenerateDataset.java - Source code to generate input files
  • Per_Split_Semi_Join.java
  • Input files
  • Output files at each phase

Hadoop Project 1: Join Algorithm (Feb - May 2012)

Team member: David Zheng
Programming language: Java
Hadoop version: 1.0.0
Description: Implement two join algorithms presented in the paper, A Comparison of Join Algorithms for Log Processing
 in MapReduce. The two join algorithms are Semi-Join and Per-Split-Semi-Join. We use a cluster of three nodes, one 
namenode and two datanodes. 

What you will learn from this project

  • For reducers, the type of the input key-value pair must be the same as the type of the output key-value pair.
  • If you're running Map-only job, you MUST EXPLICITLY set the number of the reducers to be 0.
  • The difference among mappers, datanodes and map function.
  • How to write close function after map function.
  • How mappers partition input file/s when no partitioning method is specified.
  • How to specify the number of lines read by each mapper.

[1] Vuk Ercegovac, Jun Rao, Spyros Blanas, Jignesh M. Patel. A Comparison of Join Algorithms for Log 
Processing in MapReduce. SIGMOD '10 Proceedings of the 2010 international conference on Management of
 data.

Hadoop Projects

In this site, I documented a Hadoop project I did during the Spring Term 2012 at WPI. I hope the 
material in this site would be helpful to those who are new to Hadoop. 

Some conventions:
All the prompt commands and code snippet are written in brown. 
Lines ended with !!! are those you should pay extra attention to or are very important resources.