In this example I am going to demonstrate how to load a file from the Hadoop Distributed Cache. I am writing a Mapper class and a Driver class: inside the Mapper class we define the input and output key/value types, and inside the Driver class we define the job configuration, including how the cache file is registered.
//Custom helper classes used below (FileUtils, ExtractionUtilHelperResources,
//JSONUtils and the Location bean) are assumed to be available on the classpath.
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class LoadFileDistributedCachePidUtilsMapperDemo {
    //Mapper
    //This Mapper class takes NullWritable keys and BytesWritable values as input,
    //and emits NullWritable keys and Text values as output.
    public static class PidExtractionMapper extends Mapper<NullWritable, BytesWritable, NullWritable, Text> {

        //class-level variables, reused across map() calls
        private NullWritable noKey = NullWritable.get();
        private Configuration conf;
        private Text outputValue = new Text();
        private String fileName;
        private List<String> locpid;
        private String json;
        //The Hadoop framework calls setup() once at the beginning of each task.
        //Here we resolve the "LPIDFILE" entry that the driver placed in the
        //Distributed Cache and load its published ids into memory, so that
        //map() can filter records against them.
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            this.conf = context.getConfiguration();
            //resolve the local path of the cached file (custom helper)
            fileName = FileUtils.getFilePathFromDistributedCache("LPIDFILE");
            //read the cached file; each line is assumed to hold one published id
            locpid = new ArrayList<String>();
            BufferedReader reader = new BufferedReader(new FileReader(fileName));
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    locpid.add(line.trim());
                }
            } finally {
                reader.close();
            }
        }
        //This is the mapper logic: deserialize each record, skip it unless it
        //exists and its published id appears in the cached PID list, then write
        //the record out in JSON format via the JSON utility class.
        public void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
            Location location = ExtractionUtilHelperResources.getLocationFromSerializedObject(value);
            //null-check first, then check the id against the cached list
            if (location == null || !locpid.contains(location.getPublishedId())) {
                return;
            }
            String locationPId = location.getPublishedId();
            System.out.println("Mapper Value: " + locationPId);
            json = JSONUtils.getJSONStringFromObject(location);
            //fall back to the raw id if JSON serialization returns null
            if (json == null) {
                json = locationPId;
            }
            outputValue.set(json);
            context.write(noKey, outputValue);
        }
}
    public static void main(String[] args) throws Exception {
        //create the configuration
        Configuration conf = new Configuration();
        //submit a new Hadoop job (Job.getInstance replaces the deprecated new Job(conf, name))
        Job hadoopJob = Job.getInstance(conf, "Extraction Utility Job");
        hadoopJob.setJarByClass(LoadFileDistributedCachePidUtilsMapperDemo.class);
        //mapper class
        hadoopJob.setMapperClass(PidExtractionMapper.class);
        //specify the input and output file formats
        hadoopJob.setInputFormatClass(SequenceFileInputFormat.class);
        hadoopJob.setOutputFormatClass(TextOutputFormat.class);
        //specify the map output key type
        hadoopJob.setMapOutputKeyClass(NullWritable.class);
        //specify the map output value type
        hadoopJob.setMapOutputValueClass(Text.class);
        //map-only job, no reducer tasks
        hadoopJob.setNumReduceTasks(0);
        //first argument specifies the HDFS input location
        FileInputFormat.addInputPath(hadoopJob, new Path(args[0]));
        //second argument specifies the HDFS output location
        FileOutputFormat.setOutputPath(hadoopJob, new Path(args[1]));
        //third argument specifies the HDFS location of the Distributed Cache file (custom helper)
        FileUtils.loadFiletoDC(args[2], "LPIDFILE", hadoopJob.getConfiguration());
        System.exit(hadoopJob.waitForCompletion(true) ? 0 : 1);
    }
}
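
Note that FileUtils is a custom helper class, not part of the Hadoop API, and its implementation is not shown in this post. Below is a minimal sketch of what it might look like, assuming the classic DistributedCache API with the "#" symlink fragment; the method names match the calls above, but the implementation itself is an assumption, not the original utility.

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;

public class FileUtils {

    //register an HDFS file in the Distributed Cache under a symlink name;
    //the "#linkName" fragment tells Hadoop to create a symlink with that
    //name in every task's working directory
    public static void loadFiletoDC(String hdfsPath, String linkName, Configuration conf)
            throws IOException, URISyntaxException {
        DistributedCache.addCacheFile(new URI(hdfsPath + "#" + linkName), conf);
        DistributedCache.createSymlink(conf);
    }

    //with symlinks enabled, the cached file is reachable in the task's
    //current working directory under its link name
    public static String getFilePathFromDistributedCache(String linkName) {
        return linkName;
    }
}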
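
Similarly, JSONUtils is a custom helper not shown here. A minimal sketch, assuming Jackson is on the classpath and matching the null-on-failure contract that map() relies on:

import com.fasterxml.jackson.databind.ObjectMapper;

public class JSONUtils {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    //serialize any bean to a JSON string; return null on failure so the
    //caller can fall back to a default value, as map() does above
    public static String getJSONStringFromObject(Object obj) {
        try {
            return MAPPER.writeValueAsString(obj);
        } catch (Exception e) {
            return null;
        }
    }
}

With the job jar built, the job can then be submitted with three arguments, for example: hadoop jar extraction-demo.jar LoadFileDistributedCachePidUtilsMapperDemo /input/locations /output/pids /cache/lpid.txt (the jar name and paths here are placeholders).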