Compiler is not even going inside Reducer's code. Can I know why is this happening?
package counter;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Iterator;
import java.util.Scanner;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCount {
public static class Map extends
Mapper<LongWritable, Text, Text, Text>
{
private Text
word = new Text();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
int positiveCount=0,negativeCount=0,neutralCount=0,tokenCount=0;
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
Scanner scanner1; // For positive words file
Scanner scanner2; //For negative words File
String timeStamp="";
while (tokenizer.hasMoreTokens())
{
scanner1 = new Scanner(new File("/home/hadoop/Downloads/positive.txt")); //Path
scanner2 = new Scanner(new File("/home/hadoop/Downloads/negative.txt")); //Path
String tempToken=tokenizer.nextToken();
tokenCount++;
if(tokenCount<4 || (tokenCount>=5 && tokenCount<=6)) // Because 4th Token is TimeStamp and from 7th onwards its Tweets
continue;
if(tokenCount==4) // Gives TimeStamp
timeStamp=tempToken;
if(tokenCount>=7) // Tweets starts from 7th Token
{
neutralCount++;
while (scanner1.hasNextLine()) // For Positive words File
{
String nextLine1 = scanner1.nextLine();
if (nextLine1.equalsIgnoreCase(tempToken))
{
positiveCount++;
neutralCount--; // To neutralise above increment
break; // If word fetching successful break
}
}
while (scanner2.hasNextLine()) // For Negative words File
{
String nextLine2 = scanner2.nextLine();
if (nextLine2.equalsIgnoreCase(tempToken))
{
negativeCount++;
neutralCount--; // To neutralise above increment
break; // If word fetching successful break
}
}
}
}
String ts=""; // Temporary TimeStamp to truncate seconds ( 12:48:53=>12:48 )
StringTokenizer tt=new StringTokenizer(timeStamp,":"); // Temporary Tokenizer for ts
// System.out.println(timeStamp+" "+positiveCount+" "+neutralCount+" "+negativeCount);
try
{
ts=tt.nextToken()+":"+tt.nextToken(); // 12:48
}
catch(Exception ex)
{
}
word.set(ts);
value=new Text(Integer.toString(positiveCount)+"."+Integer.toString(neutralCount)+"."+Integer.toString(negativeCount));
// Value=>2,2,1 (Means 2 Positive, 2 Neutral and 1 Negative)
//System.out.println(value);
context.write(word, value);// Mapper writing Key Value to shared area
}
}
public static class Reduce extends
Reducer<Text, Iterator<Text>, Text, Text> {
public void reduce(Text key, Iterator<Text> values,
Context context) throws IOException, InterruptedException
{
int positive=0,negative=0,neutral=0; //Positive,Neutral and Negative Count
int ps,ng,nt; // Temporary Positive Neutral and Negative
String temp="";
while(values.hasNext())
{
String tempValue=values.next().toString(); // Temporary String from values
StringTokenizer tempToken=new StringTokenizer(tempValue,".");//Separated based on '.' See Mapper's Output
ps=Integer.valueOf(tempToken.nextToken()); //Temporary for comparison
nt=Integer.valueOf(tempToken.nextToken()); //Temporary for comparison
ng=Integer.valueOf(tempToken.nextToken()); //Temporary for comparison
if(ps>ng)
positive++;
else if (ng>ps)
negative++;
else
neutral++;
}
// Now positive, neutral and negative gives total counts for same Keys as for example 12:48 12,3,7
if(positive>negative)
temp="1.0.0"; // Finally Maximum Positive Tweets at specific TimeStamp
else if(negative>positive)
temp="0.0.1"; // Finally Maximum Negative Tweets at specific TimeStamp
else
temp="0.1.0"; // Finally Maximum Neutral Tweets at specific TimeStamp
//String output=Integer.toString(positive)+"."+Integer.toString(neutral)+"."+Integer.toString(negative);
context.write(key, new Text(temp));
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
Job job = new Job(conf, "wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path("counterinput"));
// Erase previous run output (if any)
FileSystem.get(conf).delete(new Path("counteroutput"), true);
FileOutputFormat.setOutputPath(job, new Path("counteroutput"));
job.waitForCompletion(true);
}
}