Join the social network of Tech Nerds, increase skill rank, get work, manage projects...
 
  • Hadoop Mapper Utility Class

    • 0
    • 2
    • 1
    • 2
    • 0
    • 0
    • 0
    • 0
    • 682
    Comment on it

    In this example I demonstrate how to load a file from the Hadoop Distributed Cache. I write a Mapper class and a driver class; inside the Mapper class we define the input key/value types and the output key/value types.

    Inside the driver class we also define the resource initialization information.

    /**
     * Map-only Hadoop job that reads serialized {@code Location} records from a
     * sequence file, keeps those whose published ID is listed in a file shipped
     * through the distributed cache, and writes each kept record as one line of
     * JSON text.
     *
     * <p>Usage: {@code <input path> <output path> <published-id file>}.
     */
    public class LoadFileDistributedCachePidUtilsMapperDemo {

        /** Name under which the published-ID file is registered in the distributed cache. */
        private static final String PID_CACHE_NAME = "LPIDFILE";

        /**
         * Mapper taking {@code NullWritable}/{@code BytesWritable} input pairs and
         * emitting {@code NullWritable}/{@code Text} output pairs.
         */
        public static class PidExtractionMapper
                extends Mapper<NullWritable, BytesWritable, NullWritable, Text> {

            /** Shared output key; every record is written with a null key. */
            private final NullWritable noKey = NullWritable.get();

            /** Reused output value, so no new {@code Text} is allocated per record. */
            private final Text outputValue = new Text();

            /** Path of the published-ID file as resolved from the distributed cache. */
            private String fileName;

            /** Published IDs that are allowed through the filter; loaded in {@link #setup}. */
            private List<String> locpid;

            /**
             * Called once at the beginning of the task. Resolves the published-ID
             * file from the distributed cache and loads its contents so that
             * {@link #map} can filter records against it.
             *
             * @param context task context supplied by the framework
             * @throws IOException if the cache entry cannot be resolved or read
             * @throws InterruptedException declared for compatibility with the overridden method
             */
            @Override
            public void setup(Context context) throws IOException, InterruptedException {
                // Assign the field; the previous code declared a shadowing local
                // and the resolved path was lost.
                this.fileName = FileUtils.getFilePathFromDistributedCache(PID_CACHE_NAME);
                if (this.fileName == null) {
                    throw new IOException(
                            "Distributed cache entry '" + PID_CACHE_NAME + "' could not be resolved");
                }

                // NOTE(review): assumes the helper returns a path on the task's local
                // file system and that the file holds one published ID per line,
                // UTF-8 encoded -- TODO confirm against FileUtils.
                List<String> ids = new java.util.ArrayList<String>();
                for (String line : java.nio.file.Files.readAllLines(
                        java.nio.file.Paths.get(this.fileName),
                        java.nio.charset.StandardCharsets.UTF_8)) {
                    String id = line.trim();
                    if (!id.isEmpty()) {
                        ids.add(id);
                    }
                }
                this.locpid = ids;
            }

            /**
             * Deserializes one record, drops it when it is missing or its published
             * ID is not in the loaded list, and otherwise writes it as JSON. When
             * JSON conversion yields {@code null} the published ID itself is written.
             *
             * @param key unused input key
             * @param value serialized {@code Location} bytes
             * @param context task context used to emit the output pair
             * @throws IOException if writing the output fails
             * @throws InterruptedException if the task is interrupted while writing
             */
            @Override
            public void map(NullWritable key, BytesWritable value, Context context)
                    throws IOException, InterruptedException {

                Location location =
                        ExtractionUtilHelperResources.getLocationFromSerializedObject(value);

                // Null check must come before any dereference of the record.
                if (location == null) {
                    return;
                }

                String locationPId = location.getPublishedId();

                // Skip records whose ID is not listed. The previous condition joined
                // the two tests with && and therefore never filtered anything.
                if (!locpid.contains(locationPId)) {
                    return;
                }

                System.out.println("Mapper Value: "+locationPId);

                String json = JSONUtils.getJSONStringFromObject(location);
                if (json == null) {
                    json = locationPId;
                }

                outputValue.set(json);
                context.write(noKey, outputValue);
            }
        }

        /**
         * Configures and submits the job, then exits with status 0 on success,
         * 1 on job failure, or 2 when the arguments are incomplete.
         *
         * @param args {@code args[0]} HDFS input path, {@code args[1]} HDFS output
         *             path, {@code args[2]} location of the published-ID file to
         *             place in the distributed cache
         * @throws Exception if job setup or submission fails
         */
        public static void main(String[] args) throws Exception {

            if (args.length < 3) {
                System.err.println("Usage: LoadFileDistributedCachePidUtilsMapperDemo"
                        + " <input path> <output path> <published-id file>");
                System.exit(2);
            }

            Configuration conf = new Configuration();

            // The job is created directly with its final name; the previous code
            // set one name in the constructor and immediately replaced it.
            Job hadoopJob = new Job(conf, "Extraction Utility Job");
            hadoopJob.setJarByClass(LoadFileDistributedCachePidUtilsMapperDemo.class);

            hadoopJob.setMapperClass(PidExtractionMapper.class);

            // Sequence-file input, plain-text output.
            hadoopJob.setInputFormatClass(SequenceFileInputFormat.class);
            hadoopJob.setOutputFormatClass(TextOutputFormat.class);

            hadoopJob.setMapOutputKeyClass(NullWritable.class);
            hadoopJob.setMapOutputValueClass(Text.class);

            // Map-only job: no reduce phase.
            hadoopJob.setNumReduceTasks(0);

            FileInputFormat.addInputPath(hadoopJob, new Path(args[0]));
            FileOutputFormat.setOutputPath(hadoopJob, new Path(args[1]));

            // Register the published-ID file under the name the mapper looks up.
            FileUtils.loadFiletoDC(args[2], PID_CACHE_NAME, hadoopJob.getConfiguration());

            // Propagate the job result to the caller via the process exit status.
            boolean succeeded = hadoopJob.waitForCompletion(true);
            System.exit(succeeded ? 0 : 1);
        }
    }
    

 0 Comment(s)

Sign In
                           OR                           
                           OR                           
Register

Sign up using

                           OR                           
Forgot Password
Fill out the form below and instructions to reset your password will be emailed to you:
Reset Password
Fill out the form below and reset your password: