返回信息流我有一个问题:一个mapreduce程序问题,我输入时文本文件,此时默认map输入key就是文本文件中每行的偏移量,map输入value就是每一行数据;我在map和reduce函数中都打印了一些信息方便调试,不过在最后看运行日志是发现根本就没有打印出这些信息,最后reduce的输入文件内容就是输入的文本文件中每行开始加了一个行偏移量, 好像根本就没有执行我的map和reduce函数,直接把输入key,value输出了,不知道哪位大牛能告诉我下到底是哪里出了问题?非常感谢
这是一条镜像帖。来源:北邮人论坛 / java / #22438同步于 2012/5/16
该镜像源已超过 30 天没有更新,可能在源站已被删除。
Java机器人发帖
【求助】关于mapreduce程序,希望大牛给解答
soulpsq
2012/5/16镜像同步8 回复
订阅后,新回复会通过你的通知中心匿名送达。
8 条回复
【 在 aeolus83 的大作中提到: 】
: 上源代码,亲
import java.io.*;
import java.util.Iterator;
import java.util.StringTokenizer;
import java.util.Vector;
//import org.apache.hadoop.conf.Configuration; //程序里没用上这个包
import org.apache.hadoop.fs.Path; //这个类有什么用
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//import org.apache.hadoop.util.GenericOptionsParser; //程序里没用上这个包
//import org.apache.hadoop.mapred.lib.ChainMapper; //在0.20.2中ChainMapper和ChainReducer类在哪个包中
//import org.apache.hadoop.mapred.lib.ChainReducer;
public class AnalysisOfVariance //输入的是矩阵数据
{
//当我第二个job要用这些变量时也可以吧?
public static Vector<DoubleWritable> allvalues = new Vector<DoubleWritable>(); //貌似没有用上allvalues
public static int allnum = 0; //记录一共有多少个数据
public static int rownum = 0; //记录一共有多少行数据,即每个水平中数据的个数
public static int linenum = 0; //记录一共有多少列数据,即一共有多少个水平
public static double allaverage = 0; //记录所有数据的平均值
public static double SST = 0; //总离差平方和 key=-2
public static double SSE = 0; //误差项离差平方和(组内)key=-3,因为SSE不好划分任务,因此选择求SST和SSA,然后利用SSE=SST-SSA来求SSE
public static double SSA = 0; //水平项离差平方和(组间)key=-4
public static double[][] flist = new double[122][122]; //存放α=0.05时的F表,数据严格按照F表坐标进行存放,无穷大的数据存放在flist[121][j]和flist[i][121]出
public static class Map1 extends Mapper<LongWritable, Text, IntWritable, DoubleWritable>
{
public void map1(LongWritable key, Text value, Context context)
throws IOException, InterruptedException //输入为txt文件,输入后自动将每一行作为map输入的一个键值对,
{ //键是该行起始位置相对于文件起始位置的偏移量,这里为无用信息
int level = 0;
double element;
String line = value.toString();
StringTokenizer strtok = new StringTokenizer(line);//StringTokenizer默认分隔符为空格
rownum++;
System.out.println("ni dao shi da yin chu lai a:map1 ");
System.out.println("line:"+line);
System.out.println("key:"+key);
System.out.println("value:"+value);
while(strtok.hasMoreTokens()) //每个map函数的输出key是水平类型,value是每个水平的每个元素的集合
{
element = Double.parseDouble(strtok.nextToken());
System.out.println("element:"+element);
level+=1;
//同时有两个context会不会出问题?
context.write(new IntWritable(level), new DoubleWritable(element)); //用于计算每个的平均值
context.write(new IntWritable(-1), new DoubleWritable(element)); //用于计算整体的平均值,并且整体数组还要保留,暂时想保留在一个全局变量里
allvalues.addElement(new DoubleWritable(element));
}
linenum = level;
level = 0;
}
}
public static class Reduce1 extends Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> //该reduce函数就是求各个水平以及整体的均值,之后的处理,交由下一组map和reduce进行
{
public void reduce1(IntWritable key, Iterator<DoubleWritable> values, Context context)
throws IOException, InterruptedException //可以把不同map输出的键值对的值进行运算吗,应该可以吧?
{
double sum = 0; //values中所有数据的和
double average = 0; //values中所有数据的平均值
int num = 0; //values中的数据的个数
double tempvalue;
System.out.println("ni dao shi da yin chu lai a:reduce1 ");
while(values.hasNext())
{
tempvalue = values.next().get();
sum += tempvalue;
num++;
allnum++;
// if(key == new IntWritable(-1)) //其实job1的输出不需要每个数据,其他的job如果需要数据直接读初始数据文件就可以了,所以注释掉了这段代码
// context.write(key, new DoubleWritable(tempvalue));//把每个数据也输出,后来的job要用,不是很清楚values中每个value是用空格隔开的还是逗号,我就当是逗号了,不是再改
}
average = sum / num; //double类型除以整数类型,直接得的就是double类型的商吧?
if(key != (new IntWritable(-1)))
context.write(key, new DoubleWritable(average));
else
allaverage = average;
sum = 0;
average = 0;
num = 0;
}
}
public static class Map2 extends Mapper<LongWritable, Text, IntWritable, DoubleWritable>
{
public void map2(LongWritable key, Text value, Context context)//map2仍然以初始数据文件未输入文件,就是为了得到各个数据
throws IOException, InterruptedException //输入为txt文件,输入后自动将每一行作为map输入的一个键值对,
{ //键是该行起始位置相对于文件起始位置的偏移量,这里为无用信息
double element2,temp;
String line2 = value.toString();
StringTokenizer strtok = new StringTokenizer(line2);//StringTokenizer默认分隔符为空格
System.out.println("ni dao shi da yin chu lai a:map2 ");
while(strtok.hasMoreTokens()) //每个map函数的输出key是水平类型,value是每个水平的每个元素的集合
{
element2 = Double.parseDouble(strtok.nextToken());
temp = (element2-allaverage)*(element2-allaverage);
context.write(new IntWritable(-2), new DoubleWritable(temp)); //用于计算每个的平均值
}
}
}
public static class Reduce2 extends Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> //该reduce求SST,只设置了一个reduce任务
{
public void reduce2(IntWritable key, Iterator<DoubleWritable> values, Context context)
throws IOException, InterruptedException //可以把不同map输出的键值对的值进行运算吗,应该可以吧?
{
double SST = 0;
System.out.println("ni dao shi da yin chu lai a:reduce2 ");
while(values.hasNext())
SST += values.next().get();
context.write(new IntWritable(-2), new DoubleWritable(SST));
}
}
public static class Map3 extends Mapper<LongWritable, Text, IntWritable, DoubleWritable>
{
public void map3(LongWritable key, Text value, Context context)//map3以job1的输出文件作为输入,是为了得到每个水平(每一列)的平均值
throws IOException, InterruptedException //输入为txt文件,输入后自动将每一行作为map输入的一个键值对,
{ //键是该行起始位置相对于文件起始位置的偏移量,这里为无用信息
int tempkey; //因为tempkey在if语句中,所以如果if语句不成立,特姆坡可以可能不会被使用,因此报了警告
double tempvalue, temp3;
String line3 = value.toString(); //每一行数据就是形如:“1 3.16”
StringTokenizer strtok2 = new StringTokenizer(line3);
System.out.println("ni dao shi da yin chu lai a:map3 ");
if(strtok2.hasMoreTokens())
tempkey = Integer.parseInt(strtok2.nextToken()); //这个读取的是key,我们想要要的是value
if(strtok2.hasMoreTokens())
{
tempvalue = Double.parseDouble(strtok2.nextToken());
temp3 = rownum*(tempvalue-allaverage)*(tempvalue-allaverage);
context.write(new IntWritable(-4), new DoubleWritable(temp3));
}
}
}
public static class Reduce3 extends Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> //该reduce求SSA,只设置了一个reduce任务
{
public void reduce3(IntWritable key, Iterator<DoubleWritable> values, Context context)
throws IOException, InterruptedException //可以把不同map输出的键值对的值进行运算吗,应该可以吧?
{
double SSA = 0;
System.out.println("ni dao shi da yin chu lai a:reduce3 ");
while(values.hasNext())
SSA += values.next().get();
context.write(new IntWritable(-4), new DoubleWritable(SSA));
}
}
public static void main(String[] args) throws Exception
{
/*
*命令行输入参数说明:
*args[0]为第一个job的输入文件在hdfs上的路径,该路径同时也是第二个job的输入路径
*args[1]为第一个job在hdfs上的输出路径,同时也是第三个job的输入路径
*args[2]为第二个job的输出路径
*args[3]为第三个job的输出路径
*/
if(args.length != 4) //这个错误白痴啊!!!!!!怎么就写成 !=2 了啊!!!!!!
{
System.err.println("Pan Siqun250: usage: AnalysisOfVariance <input path> <output path>");
System.exit(-1);
}
/*
一共三个job,job1求平均值,job2求SST,job3求SSE,最后进行结果验证
*/
System.out.println("what's the hell");
System.out.println(args[0]);
System.out.println(args[1]);
System.out.println(args[2]);
System.out.println(args[3]);
Job job1 = new Job();
job1.setJarByClass(AnalysisOfVariance.class);
FileInputFormat.addInputPath(job1, new Path(args[0])); //不同的job的输入输出路径可以设为不同的args[i],最后只要命令行输入相应的路径就可以了
FileOutputFormat.setOutputPath(job1, new Path(args[1]));
//System.out.println("flag11");
job1.setMapperClass(Map1.class);
job1.setCombinerClass(Reduce1.class);
job1.setReducerClass(Reduce1.class);
job1.setOutputKeyClass(LongWritable.class);
job1.setOutputValueClass(Text.class);
//System.out.println("flag12");
job1.waitForCompletion(true);
//System.exit(job1.waitForCompletion(true) ? 0 : 1); //如果不完全退出而只是结束当前job1,然后再开始job2怎么写?
//JobClient.runJob(conf); //在0.20.2版本里怎么写这句话?
System.out.println("what's the hell job1end");
System.out.println("allaverage:"+allaverage);
Job job2 = new Job(); //计算SST
job2.setJarByClass(AnalysisOfVariance.class);
FileInputFormat.addInputPath(job2, new Path(args[0])); //job2的输入路径仍然是初始数据文件
//FileOutputFormat.setOutputPath(job2, new Path("psq/output2"));
FileOutputFormat.setOutputPath(job2, new Path(args[2])); //设置一个存放job2输出的路径文件
System.out.println("flag21");
job2.setMapperClass(Map2.class);
job2.setReducerClass(Reduce2.class);
job2.setOutputKeyClass(LongWritable.class);
job2.setOutputValueClass(Text.class);
System.out.println("flag22");
job2.waitForCompletion(true);
//System.exit(job2.waitForCompletion(true) ? 0 : 1);
System.out.println("what's the hell job2ended");
Job job3 = new Job(); //计算SSA
job3.setJarByClass(AnalysisOfVariance.class);
FileInputFormat.addInputPath(job3, new Path(args[1])); //job3的输入路径是job1的输出文件,用来读取每个水平(每列)的平均值
//FileOutputFormat.setOutputPath(job2, new Path("psq/output3"));
FileOutputFormat.setOutputPath(job3, new Path(args[3])); //设置一个存放job3输出的路径文件
System.out.println("flag31");
job3.setMapperClass(Map3.class);
job3.setReducerClass(Reduce3.class);
job3.setOutputKeyClass(LongWritable.class);
job3.setOutputValueClass(Text.class);
System.out.println("flag32");
job3.waitForCompletion(true);
//System.exit(job3.waitForCompletion(true) ? 0 : 1);
System.out.println("what's the hell job3end");
System.out.println("finaljudgeend");
System.out.println("allnum:"+allnum);
System.out.println("rownum:"+rownum);
System.out.println("linenum:"+linenum);
System.out.println("SST:"+SST);
System.out.println("SSE:"+SSE);
System.out.println("SSA:"+SSA);
System.out.println("allaverage:"+allaverage);
}
}
我看jobtracker中的stdout logs里什么都没有,根本就没打印出map和reduce中system.out.println()中的信息,而且最后输出的文件内容就是吧最初map输入的key和value输出到文件里了,根本就没有对输入数据进行处理,我的头已经大了,弄了两天不知道是怎么回事儿
程序中的这两句:
job1.setOutputKeyClass(LongWritable.class);
job1.setOutputValueClass(Text.class);
以及job2和job3的也一样,按照我程序中的设置应该是:
job1.setOutputKeyClass(IntWritable.class);
job1.setOutputValueClass(DoubleWritable.class);
但是如果这么写的话就说dismatch from map,我最后查原因就是他最后就是把输入key(LongWritable,文件行偏移量)和value(Text,每一行数据)没有处理直接输出了
【 在 soulpsq 的大作中提到: 】
: 我看jobtracker中的stdout logs里什么都没有,根本就没打印出map和reduce中system.out.println()中的信息,而且最后输出的文件内容就是吧最初map输入的key和value输出到文件里了,根本就没有对输入数据进行处理,我的头已经大了,弄了两天不知道是怎么回事儿
啊嘞~你貌似没有重写map方法和reduce方法。你把那些map1和reduce2神马的修改下看看。
【 在 aeolus83 的大作中提到: 】
:
: 啊嘞~你貌似没有重写map方法和reduce方法。你把那些map1和reduce2神马的修改下看看。
嗯,谢谢哈,我发现时我脑残了,我不知道map()reduce()是Mapper和Reducer原有的函数,得重写,我这就去试试,[em17]
【 在 mojia 的大作中提到: 】
: 重写之后情况怎么样?
嗯,已经可以正常运行了,我开始不知道map()和reduce()是原有的一个函数,后来我把map1(),reduce1()什么的改成map()和reduce()就好了,真是谢谢哈,这个问题卡了我好几天,最后居然是犯了这么白痴的错误