@SuppressWarnings("unchecked")
private <INKEY, INVALUE, OUTKEY, OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
  // make a task attempt context so the user's mapper and input format classes can be resolved
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
                                                                  getTaskID(),
                                                                  reporter);
  // instantiate the user's Mapper via reflection
  org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  // instantiate the user's InputFormat via reflection
  org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY, INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  // rebuild the input split assigned to this map task
  org.apache.hadoop.mapreduce.InputSplit split = null;
  split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
      splitIndex.getStartOffset());
  LOG.info("Processing split: " + split);
  // wrap the RecordReader so record reads are tracked and reported
  org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> input =
      new NewTrackingRecordReader<INKEY, INVALUE>
        (split, inputFormat, reporter, taskContext);

  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

  // choose the output collector: with no reducers, map output is written
  // directly through the job's OutputFormat; otherwise it goes to the
  // collector that feeds the sort/spill machinery
  org.apache.hadoop.mapreduce.RecordWriter output = null;
  if (job.getNumReduceTasks() == 0) {
    output =
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
  } else {
    output = new NewOutputCollector(taskContext, job, umbilical, reporter);
  }
  org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
      mapContext =
        new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
            input, output,
            committer,
            reporter, split);
  // wrap the MapContext in the Mapper.Context view that user code sees
  org.apache.hadoop.mapreduce.Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context
      mapperContext =
        new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
            mapContext);
  try {
    input.initialize(split, mapperContext);
    // drive the user's Mapper over every record in the split
    mapper.run(mapperContext);
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    input.close();
    input = null;
    output.close(mapperContext);
    output = null;
  } finally {
    closeQuietly(input);
    closeQuietly(output, mapperContext);
  }
}
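The mapper.run(mapperContext) call in the try block is where user code finally executes: unless a job overrides Mapper.run(), the default loop calls setup() once, then map() for each key/value pair the RecordReader produces, and finally cleanup(). As a rough illustration of the class that taskContext.getMapperClass() resolves and this loop drives, here is a minimal sketch of a new-API Mapper; the class and field names (TokenCountMapper, ONE, word) are assumptions made up for this example, not identifiers from the Hadoop source.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical example: a minimal Mapper of the kind instantiated above via
// ReflectionUtils.newInstance(taskContext.getMapperClass(), job) and driven
// by mapper.run(mapperContext).
public class TokenCountMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Each call handles one record handed over by the RecordReader
    // (NewTrackingRecordReader in the listing above); context.write()
    // routes output to NewOutputCollector or NewDirectOutputCollector.
    for (String token : value.toString().split("\\s+")) {
      if (!token.isEmpty()) {
        word.set(token);
        context.write(word, ONE);
      }
    }
  }
}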