Saturday, June 22, 2013

Hadoop load third party libraries

Programmers often build utility libraries that are shared across all relevant projects to reduce
development time. But in the Hadoop ecosystem, where computation is carried out on a cluster of N hosts, we have the following options for making those libraries available to our application:
  1. We can copy the utility libraries to a specific location on every host. But copying the libraries to all hosts just to run a MapReduce task makes maintenance too cumbersome. Or
  2. We can embed the utility libraries in the application itself. That inflates the size of the application and adds the burden of maintaining library versions. Or
  3. We can copy the utility libraries to the DistributedCache and use them from there. This approach overcomes the issues of the methods above.
Now, how do we accomplish this with the third method?
STEP 1: Copy the library to HDFS:
hdfs dfs -copyFromLocal YourLib.jar /lib/YourLib.jar
STEP 2: Add the cached library to the application's classpath:
                  DistributedCache.addFileToClassPath(new Path("/lib/YourLib.jar"), job);
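
Putting the two steps together, a minimal driver sketch might look like this (the class name, job setup, and paths are placeholders, not part of the original steps; addFileToClassPath() takes a Path and a Configuration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class LibShippingDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // The jar was copied to HDFS in STEP 1; every task JVM will now
        // see its classes on its classpath.
        DistributedCache.addFileToClassPath(new Path("/lib/YourLib.jar"),
                job.getConfiguration());
        // ... set mapper, reducer, input and output as usual ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}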


That's it.

Sample application to demonstrate this: MatrixTranspose


   

Thursday, June 20, 2013

FindBugs

We often see a whole team of developers working on a single project. In that setting it is difficult to maintain code quality and to consistently follow best practices. FindBugs is the right tool for this job.

FindBugs is useful for detecting bad practices like the following (a short example of two of these patterns appears after the list):

  • Method might ignore exception.
  • Method might drop exception.
  • Comparison of String parameter using == or !=.
  • Comparison of String objects using == or !=.
  • Class defines compareTo(...) and uses Object.equals().
  • Finalizer does not call superclass finalizer.
  • Format string should use %n rather than \n. 
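
For instance, here is contrived code exhibiting two of the patterns above (FindBugs reports them as ES_COMPARING_STRINGS_WITH_EQ and DE_MIGHT_IGNORE):

public class BadPractices {
    // Comparison of String objects using == or !=: this compares references,
    // not contents; use a.equals(b) instead.
    boolean sameName(String a, String b) {
        return a == b;
    }

    // Method might ignore exception: the failure is silently swallowed.
    void closeQuietly(java.io.InputStream in) {
        try {
            in.close();
        } catch (java.io.IOException ignored) {
        }
    }
}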

Wednesday, June 19, 2013

Hadoop load Native libraries

How to load Native third party libraries in Hadoop?

Sometimes we need native libraries for a performance gain in Java. The problem with native libraries is that they are platform dependent. In Java we can load them with either System.load() or System.loadLibrary(). But Hadoop runs on heterogeneous clusters, which means we might have many platforms (Linux, Mac OS X, etc.). So the basic question is: how do we overcome this?

The answer is:
You can load any native shared library by using the DistributedCache to distribute and symlink the library files.
This example shows you how to distribute a shared library, mylib.so, and load it from a MapReduce task.
  1. First copy the library to the HDFS: bin/hadoop fs -copyFromLocal mylib.so.1 /libraries/mylib.so.1
  2. The job launching program should contain the following: DistributedCache.createSymlink(conf); DistributedCache.addCacheFile(new URI("hdfs://host:port/libraries/mylib.so.1#mylib.so"), conf);
  3. The MapReduce task can contain: System.loadLibrary("mylib.so");
Note: If you downloaded or built the native Hadoop library itself, you don't need to use the DistributedCache to make that library available to your MapReduce tasks.
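
On the task side, a minimal sketch might look like the mapper below (a hypothetical illustration, not part of the original example). The #mylib.so fragment in step 2 makes the cached file appear as a symlink named mylib.so in the task's working directory. One caveat: System.loadLibrary() expects a bare library name and maps it to a platform-specific file name (libmylib.so on Linux), so explicitly loading the symlink with System.load() and an absolute path is a common alternative:

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NativeLibMapper extends Mapper<LongWritable, Text, Text, Text> {
    static {
        // The DistributedCache symlink lives in the task's current working directory.
        System.load(new File("mylib.so").getAbsolutePath());
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // ... call into the native code through its JNI wrapper here ...
    }
}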

Tuesday, June 18, 2013

ChainMapper


ChainMapper allows us to use multiple Mapper classes within a single Map task, where the output of one mapper is chained to the next mapper. The hand-off between stages is loosely reminiscent of a barrier in OpenCL: each stage finishes its work before the next one consumes the results.

This enables having reusable specialized Mappers that can be combined to perform composite operations within a single task.

Special care has to be taken when creating chains: the key/value pairs output by a Mapper must be valid for the following Mapper in the chain. It is assumed that all Mappers and the Reducer in the chain use matching output and input key and value classes, as no conversion is done by the chaining code.

Using the ChainMapper and ChainReducer classes, it is possible to compose Map/Reduce jobs that look like [MAP+ / REDUCE MAP*]. An immediate benefit of this pattern is a dramatic reduction in disk I/O.

IMPORTANT: There is no need to specify the output key/value classes for the ChainMapper; this is done by the addMapper call for the last mapper in the chain.
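
As a rough sketch of the [MAP+ / REDUCE MAP*] shape (the AMapper, BMapper, TheReducer, and CMapper class names here are hypothetical placeholders), the chain is assembled in the driver like this:

// Two chained mappers feed the reducer; a third mapper runs after the
// reduce, all within the same job, so no second job or extra disk
// round-trip is needed.
ChainMapper.addMapper(job, AMapper.class,
        LongWritable.class, Text.class, Text.class, Text.class,
        new Configuration(false));
ChainMapper.addMapper(job, BMapper.class,
        Text.class, Text.class, Text.class, Text.class,
        new Configuration(false));
ChainReducer.setReducer(job, TheReducer.class,
        Text.class, Text.class, Text.class, Text.class,
        new Configuration(false));
ChainReducer.addMapper(job, CMapper.class,
        Text.class, Text.class, Text.class, Text.class,
        new Configuration(false));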

Now we will see how to use ChainMapper in our example, which converts the input to uppercase and counts the number of occurrences of each word.
Sample input:

sample.txt
The ChainMapper class allows to use multiple Mapper
classes within a single Map task.
The Mapper classes are invoked in a chained ( or piped ) fashion,
the output of the first becomes the input of the second,
and so on until the last Mapper,
the output of the last Mapper will be written to the task's output.
TokenizerMapper.java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TokenizerMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private Text word = new Text();
    private LongWritable count = new LongWritable(1L);

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] tokens = value.toString().split(" ");
        for (String token : tokens) {
            if (token.isEmpty())
                continue; // skip empty tokens instead of abandoning the rest of the line
            word.set(token);
            context.write(word, count);
        }
    }
}

UppercaseMapper.java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class UppercaseMapper extends Mapper<Text, LongWritable, Text, LongWritable> {

    @Override
    public void map(Text word, LongWritable count, Context context) throws IOException, InterruptedException {
        // Uppercase the word in place and pass the (word, count) pair along.
        String uppercaseString = word.toString().toUpperCase();
        word.set(uppercaseString);
        context.write(word, count);
    }
}

ChainMapperDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ChainMapperDriver extends Configured implements Tool {

    static int printUsageAndExit() {
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }

    @Override
    public int run(String[] args) throws Exception {

        if (args.length != 2) {
            return printUsageAndExit(); // bail out instead of falling through
        }

        Job job = Job.getInstance(getConf());
        job.setJobName("Word_ToUppercase_Count");

        Path inPath = new Path(args[0]);
        Path outPath = new Path(args[1]);
        // Remove any previous output so the job does not fail on an existing directory.
        FileSystem fileSystem = FileSystem.get(outPath.toUri(), getConf());
        fileSystem.delete(outPath, true);

        TextInputFormat.setInputPaths(job, inPath);
        TextOutputFormat.setOutputPath(job, outPath);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // First link in the chain: split each line into (word, 1) pairs.
        Configuration tokenizerMapperConfig = new Configuration(false);
        ChainMapper.addMapper(job, TokenizerMapper.class,
                LongWritable.class, Text.class,
                Text.class, LongWritable.class,
                tokenizerMapperConfig);

        // Second link: uppercase the word; its input classes must match the
        // previous mapper's output classes.
        Configuration uppercaseMapperConfig = new Configuration(false);
        ChainMapper.addMapper(job, UppercaseMapper.class,
                Text.class, LongWritable.class,
                Text.class, LongWritable.class,
                uppercaseMapperConfig);

        // Sum the counts for each word.
        job.setReducerClass(LongSumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new ChainMapperDriver(), args));
    }
}
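Assuming the three classes above are packaged into a jar (the jar name and HDFS paths here are hypothetical), the job can be launched with:

hadoop jar chainmapper-example.jar ChainMapperDriver /user/hduser/sample.txt /user/hduser/output
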
Output:
(    1
)    1
A    2
ALLOWS    1
AND    1
ARE    1
BE    1
BECOMES    1
CHAINED    1
CHAINMAPPER    1
CLASS    1
CLASSES    2
FASHION,    1
FIRST    1
IN    1
INPUT    1
INVOKED    1
LAST    2
MAP    1
MAPPER    3
MAPPER,    1
MULTIPLE    1
OF    3
ON    1
OR    1
OUTPUT    2
OUTPUT.    1
PIPED    1
SECOND,    1
SINGLE    1
SO    1
TASK'S    1
TASK.    1
THE    10
TO    2
UNTIL    1
USE    1
WILL    1
WITHIN    1
WRITTEN    1