• Post Reply Bookmark Topic Watch Topic
  • New Topic

Java Hadoop  RSS feed

 
Prats Shah
Greenhorn
Posts: 9
  • Mark post as helpful
  • send pies
  • Quote
  • Report post to moderator
The compiler is not even going inside the Reducer's code. Can I know why this is happening?

package counter;

import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Scanner;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
public static class Map extends
Mapper<LongWritable, Text, Text, Text>
{

private Text word = new Text();

public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
int positiveCount=0,negativeCount=0,neutralCount=0,tokenCount=0;
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);

Scanner scanner1; // For positive words file
Scanner scanner2; //For negative words File
String timeStamp="";
while (tokenizer.hasMoreTokens())
{

scanner1 = new Scanner(new File("/home/hadoop/Downloads/positive.txt")); //Path

scanner2 = new Scanner(new File("/home/hadoop/Downloads/negative.txt")); //Path

String tempToken=tokenizer.nextToken();

tokenCount++;

if(tokenCount<4 || (tokenCount>=5 && tokenCount<=6)) // Because 4th Token is TimeStamp and from 7th onwards its Tweets
continue;
if(tokenCount==4) // Gives TimeStamp
timeStamp=tempToken;
if(tokenCount>=7) // Tweets starts from 7th Token
{
neutralCount++;

while (scanner1.hasNextLine()) // For Positive words File
{
String nextLine1 = scanner1.nextLine();
if (nextLine1.equalsIgnoreCase(tempToken))
{
positiveCount++;
neutralCount--; // To neutralise above increment
break; // If word fetching successful break
}
}
while (scanner2.hasNextLine()) // For Negative words File
{
String nextLine2 = scanner2.nextLine();
if (nextLine2.equalsIgnoreCase(tempToken))
{
negativeCount++;
neutralCount--; // To neutralise above increment
break; // If word fetching successful break
}
}
}

}
String ts=""; // Temporary TimeStamp to truncate seconds ( 12:48:53=>12:48 )
StringTokenizer tt=new StringTokenizer(timeStamp,":"); // Temporary Tokenizer for ts
// System.out.println(timeStamp+" "+positiveCount+" "+neutralCount+" "+negativeCount);
try
{
ts=tt.nextToken()+":"+tt.nextToken(); // 12:48

}
catch(Exception ex)
{

}

word.set(ts);

value=new Text(Integer.toString(positiveCount)+"."+Integer.toString(neutralCount)+"."+Integer.toString(negativeCount));
// Value=>2,2,1 (Means 2 Positive, 2 Neutral and 1 Negative)

//System.out.println(value);

context.write(word, value);// Mapper writing Key Value to shared area
}

}
public static class Reduce extends
Reducer<Text, Iterator<Text>, Text, Text> {

public void reduce(Text key, Iterator<Text> values,
Context context) throws IOException, InterruptedException
{

int positive=0,negative=0,neutral=0; //Positive,Neutral and Negative Count
int ps,ng,nt; // Temporary Positive Neutral and Negative
String temp="";

while(values.hasNext())
{
String tempValue=values.next().toString(); // Temporary String from values
StringTokenizer tempToken=new StringTokenizer(tempValue,".");//Separated based on '.' See Mapper's Output

ps=Integer.valueOf(tempToken.nextToken()); //Temporary for comparison
nt=Integer.valueOf(tempToken.nextToken()); //Temporary for comparison
ng=Integer.valueOf(tempToken.nextToken()); //Temporary for comparison

if(ps>ng)
positive++;
else if (ng>ps)
negative++;
else
neutral++;

}
// Now positive, neutral and negative gives total counts for same Keys as for example 12:48 12,3,7

if(positive>negative)
temp="1.0.0"; // Finally Maximum Positive Tweets at specific TimeStamp
else if(negative>positive)
temp="0.0.1"; // Finally Maximum Negative Tweets at specific TimeStamp
else
temp="0.1.0"; // Finally Maximum Neutral Tweets at specific TimeStamp

//String output=Integer.toString(positive)+"."+Integer.toString(neutral)+"."+Integer.toString(negative);

context.write(key, new Text(temp));

}
}

/**
 * Configures and runs the job: reads text lines from {@code counterinput},
 * runs {@link Map} and {@link Reduce}, and writes text output to
 * {@code counteroutput}, deleting any previous output first. Exits with
 * status 0 when the job succeeds and 1 when it fails.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if the job cannot be configured or submitted
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    Job job = new Job(conf, "wordcount");
    // Tell Hadoop which jar holds the Mapper/Reducer classes so that task
    // JVMs on the cluster can load them.
    job.setJarByClass(WordCount.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    // Mapper and reducer both emit (Text, Text).
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path("counterinput"));

    Path output = new Path("counteroutput");
    // Erase previous run output (if any); the job refuses to start when the
    // output directory already exists.
    FileSystem.get(conf).delete(output, true);
    FileOutputFormat.setOutputPath(job, output);

    // Propagate job failure to the caller instead of always exiting with 0.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}
 
Hussein Baghdadi
clojure forum advocate
Bartender
Posts: 3479
Clojure Mac Objective C
  • Mark post as helpful
  • send pies
  • Quote
  • Report post to moderator
Please, don't duplicate your question.

How did you know that the compiler isn't even going through the reducer's code?
 
It is sorta covered in the JavaRanch Style Guide.
  • Post Reply Bookmark Topic Watch Topic
  • New Topic
Boost this thread!