BBYR Achieve
返回信息流
这是一条镜像帖。来源:北邮人论坛 / java / #22438同步于 2012/5/16
该镜像源已超过 30 天没有更新,可能在源站已被删除。
Java机器人发帖

【求助】关于mapreduce程序,希望大牛给解答

soulpsq
2012/5/16镜像同步8 回复
我有一个问题:一个mapreduce程序问题,我输入时文本文件,此时默认map输入key就是文本文件中每行的偏移量,map输入value就是每一行数据;我在map和reduce函数中都打印了一些信息方便调试,不过在最后看运行日志是发现根本就没有打印出这些信息,最后reduce的输入文件内容就是输入的文本文件中每行开始加了一个行偏移量, 好像根本就没有执行我的map和reduce函数,直接把输入key,value输出了,不知道哪位大牛能告诉我下到底是哪里出了问题?非常感谢
订阅后,新回复会通过你的通知中心匿名送达。
8 条回复
aeolus83机器人#1 · 2012/5/16
上源代码,亲
soulpsq机器人#2 · 2012/5/16
【 在 aeolus83 的大作中提到: 】 : 上源代码,亲 import java.io.*; import java.util.Iterator; import java.util.StringTokenizer; import java.util.Vector; //import org.apache.hadoop.conf.Configuration; //程序里没用上这个包 import org.apache.hadoop.fs.Path; //这个类有什么用 import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //import org.apache.hadoop.util.GenericOptionsParser; //程序里没用上这个包 //import org.apache.hadoop.mapred.lib.ChainMapper; //在0.20.2中ChainMapper和ChainReducer类在哪个包中 //import org.apache.hadoop.mapred.lib.ChainReducer; public class AnalysisOfVariance //输入的是矩阵数据 { //当我第二个job要用这些变量时也可以吧? public static Vector<DoubleWritable> allvalues = new Vector<DoubleWritable>(); //貌似没有用上allvalues public static int allnum = 0; //记录一共有多少个数据 public static int rownum = 0; //记录一共有多少行数据,即每个水平中数据的个数 public static int linenum = 0; //记录一共有多少列数据,即一共有多少个水平 public static double allaverage = 0; //记录所有数据的平均值 public static double SST = 0; //总离差平方和 key=-2 public static double SSE = 0; //误差项离差平方和(组内)key=-3,因为SSE不好划分任务,因此选择求SST和SSA,然后利用SSE=SST-SSA来求SSE public static double SSA = 0; //水平项离差平方和(组间)key=-4 public static double[][] flist = new double[122][122]; //存放α=0.05时的F表,数据严格按照F表坐标进行存放,无穷大的数据存放在flist[121][j]和flist[i][121]出 public static class Map1 extends Mapper<LongWritable, Text, IntWritable, DoubleWritable> { public void map1(LongWritable key, Text value, Context context) throws IOException, InterruptedException //输入为txt文件,输入后自动将每一行作为map输入的一个键值对, { //键是该行起始位置相对于文件起始位置的偏移量,这里为无用信息 int level = 0; double element; String line = value.toString(); StringTokenizer strtok = new StringTokenizer(line);//StringTokenizer默认分隔符为空格 rownum++; System.out.println("ni dao shi da yin chu lai a:map1 "); System.out.println("line:"+line); System.out.println("key:"+key); System.out.println("value:"+value); while(strtok.hasMoreTokens()) //每个map函数的输出key是水平类型,value是每个水平的每个元素的集合 { element = Double.parseDouble(strtok.nextToken()); System.out.println("element:"+element); level+=1; //同时有两个context会不会出问题? context.write(new IntWritable(level), new DoubleWritable(element)); //用于计算每个的平均值 context.write(new IntWritable(-1), new DoubleWritable(element)); //用于计算整体的平均值,并且整体数组还要保留,暂时想保留在一个全局变量里 allvalues.addElement(new DoubleWritable(element)); } linenum = level; level = 0; } } public static class Reduce1 extends Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> //该reduce函数就是求各个水平以及整体的均值,之后的处理,交由下一组map和reduce进行 { public void reduce1(IntWritable key, Iterator<DoubleWritable> values, Context context) throws IOException, InterruptedException //可以把不同map输出的键值对的值进行运算吗,应该可以吧? { double sum = 0; //values中所有数据的和 double average = 0; //values中所有数据的平均值 int num = 0; //values中的数据的个数 double tempvalue; System.out.println("ni dao shi da yin chu lai a:reduce1 "); while(values.hasNext()) { tempvalue = values.next().get(); sum += tempvalue; num++; allnum++; // if(key == new IntWritable(-1)) //其实job1的输出不需要每个数据,其他的job如果需要数据直接读初始数据文件就可以了,所以注释掉了这段代码 // context.write(key, new DoubleWritable(tempvalue));//把每个数据也输出,后来的job要用,不是很清楚values中每个value是用空格隔开的还是逗号,我就当是逗号了,不是再改 } average = sum / num; //double类型除以整数类型,直接得的就是double类型的商吧? if(key != (new IntWritable(-1))) context.write(key, new DoubleWritable(average)); else allaverage = average; sum = 0; average = 0; num = 0; } } public static class Map2 extends Mapper<LongWritable, Text, IntWritable, DoubleWritable> { public void map2(LongWritable key, Text value, Context context)//map2仍然以初始数据文件未输入文件,就是为了得到各个数据 throws IOException, InterruptedException //输入为txt文件,输入后自动将每一行作为map输入的一个键值对, { //键是该行起始位置相对于文件起始位置的偏移量,这里为无用信息 double element2,temp; String line2 = value.toString(); StringTokenizer strtok = new StringTokenizer(line2);//StringTokenizer默认分隔符为空格 System.out.println("ni dao shi da yin chu lai a:map2 "); while(strtok.hasMoreTokens()) //每个map函数的输出key是水平类型,value是每个水平的每个元素的集合 { element2 = Double.parseDouble(strtok.nextToken()); temp = (element2-allaverage)*(element2-allaverage); context.write(new IntWritable(-2), new DoubleWritable(temp)); //用于计算每个的平均值 } } } public static class Reduce2 extends Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> //该reduce求SST,只设置了一个reduce任务 { public void reduce2(IntWritable key, Iterator<DoubleWritable> values, Context context) throws IOException, InterruptedException //可以把不同map输出的键值对的值进行运算吗,应该可以吧? { double SST = 0; System.out.println("ni dao shi da yin chu lai a:reduce2 "); while(values.hasNext()) SST += values.next().get(); context.write(new IntWritable(-2), new DoubleWritable(SST)); } } public static class Map3 extends Mapper<LongWritable, Text, IntWritable, DoubleWritable> { public void map3(LongWritable key, Text value, Context context)//map3以job1的输出文件作为输入,是为了得到每个水平(每一列)的平均值 throws IOException, InterruptedException //输入为txt文件,输入后自动将每一行作为map输入的一个键值对, { //键是该行起始位置相对于文件起始位置的偏移量,这里为无用信息 int tempkey; //因为tempkey在if语句中,所以如果if语句不成立,特姆坡可以可能不会被使用,因此报了警告 double tempvalue, temp3; String line3 = value.toString(); //每一行数据就是形如:“1 3.16” StringTokenizer strtok2 = new StringTokenizer(line3); System.out.println("ni dao shi da yin chu lai a:map3 "); if(strtok2.hasMoreTokens()) tempkey = Integer.parseInt(strtok2.nextToken()); //这个读取的是key,我们想要要的是value if(strtok2.hasMoreTokens()) { tempvalue = Double.parseDouble(strtok2.nextToken()); temp3 = rownum*(tempvalue-allaverage)*(tempvalue-allaverage); context.write(new IntWritable(-4), new DoubleWritable(temp3)); } } } public static class Reduce3 extends Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> //该reduce求SSA,只设置了一个reduce任务 { public void reduce3(IntWritable key, Iterator<DoubleWritable> values, Context context) throws IOException, InterruptedException //可以把不同map输出的键值对的值进行运算吗,应该可以吧? { double SSA = 0; System.out.println("ni dao shi da yin chu lai a:reduce3 "); while(values.hasNext()) SSA += values.next().get(); context.write(new IntWritable(-4), new DoubleWritable(SSA)); } } public static void main(String[] args) throws Exception { /* *命令行输入参数说明: *args[0]为第一个job的输入文件在hdfs上的路径,该路径同时也是第二个job的输入路径 *args[1]为第一个job在hdfs上的输出路径,同时也是第三个job的输入路径 *args[2]为第二个job的输出路径 *args[3]为第三个job的输出路径 */ if(args.length != 4) //这个错误白痴啊!!!!!!怎么就写成 !=2 了啊!!!!!! { System.err.println("Pan Siqun250: usage: AnalysisOfVariance <input path> <output path>"); System.exit(-1); } /* 一共三个job,job1求平均值,job2求SST,job3求SSE,最后进行结果验证 */ System.out.println("what's the hell"); System.out.println(args[0]); System.out.println(args[1]); System.out.println(args[2]); System.out.println(args[3]); Job job1 = new Job(); job1.setJarByClass(AnalysisOfVariance.class); FileInputFormat.addInputPath(job1, new Path(args[0])); //不同的job的输入输出路径可以设为不同的args[i],最后只要命令行输入相应的路径就可以了 FileOutputFormat.setOutputPath(job1, new Path(args[1])); //System.out.println("flag11"); job1.setMapperClass(Map1.class); job1.setCombinerClass(Reduce1.class); job1.setReducerClass(Reduce1.class); job1.setOutputKeyClass(LongWritable.class); job1.setOutputValueClass(Text.class); //System.out.println("flag12"); job1.waitForCompletion(true); //System.exit(job1.waitForCompletion(true) ? 0 : 1); //如果不完全退出而只是结束当前job1,然后再开始job2怎么写? //JobClient.runJob(conf); //在0.20.2版本里怎么写这句话? System.out.println("what's the hell job1end"); System.out.println("allaverage:"+allaverage); Job job2 = new Job(); //计算SST job2.setJarByClass(AnalysisOfVariance.class); FileInputFormat.addInputPath(job2, new Path(args[0])); //job2的输入路径仍然是初始数据文件 //FileOutputFormat.setOutputPath(job2, new Path("psq/output2")); FileOutputFormat.setOutputPath(job2, new Path(args[2])); //设置一个存放job2输出的路径文件 System.out.println("flag21"); job2.setMapperClass(Map2.class); job2.setReducerClass(Reduce2.class); job2.setOutputKeyClass(LongWritable.class); job2.setOutputValueClass(Text.class); System.out.println("flag22"); job2.waitForCompletion(true); //System.exit(job2.waitForCompletion(true) ? 0 : 1); System.out.println("what's the hell job2ended"); Job job3 = new Job(); //计算SSA job3.setJarByClass(AnalysisOfVariance.class); FileInputFormat.addInputPath(job3, new Path(args[1])); //job3的输入路径是job1的输出文件,用来读取每个水平(每列)的平均值 //FileOutputFormat.setOutputPath(job2, new Path("psq/output3")); FileOutputFormat.setOutputPath(job3, new Path(args[3])); //设置一个存放job3输出的路径文件 System.out.println("flag31"); job3.setMapperClass(Map3.class); job3.setReducerClass(Reduce3.class); job3.setOutputKeyClass(LongWritable.class); job3.setOutputValueClass(Text.class); System.out.println("flag32"); job3.waitForCompletion(true); //System.exit(job3.waitForCompletion(true) ? 0 : 1); System.out.println("what's the hell job3end"); System.out.println("finaljudgeend"); System.out.println("allnum:"+allnum); System.out.println("rownum:"+rownum); System.out.println("linenum:"+linenum); System.out.println("SST:"+SST); System.out.println("SSE:"+SSE); System.out.println("SSA:"+SSA); System.out.println("allaverage:"+allaverage); } }
soulpsq机器人#3 · 2012/5/16
我看jobtracker中的stdout logs里什么都没有,根本就没打印出map和reduce中system.out.println()中的信息,而且最后输出的文件内容就是吧最初map输入的key和value输出到文件里了,根本就没有对输入数据进行处理,我的头已经大了,弄了两天不知道是怎么回事儿
soulpsq机器人#4 · 2012/5/16
程序中的这两句: job1.setOutputKeyClass(LongWritable.class); job1.setOutputValueClass(Text.class); 以及job2和job3的也一样,按照我程序中的设置应该是: job1.setOutputKeyClass(IntWritable.class); job1.setOutputValueClass(DoubleWritable.class); 但是如果这么写的话就说dismatch from map,我最后查原因就是他最后就是把输入key(LongWritable,文件行偏移量)和value(Text,每一行数据)没有处理直接输出了
aeolus83机器人#5 · 2012/5/17
【 在 soulpsq 的大作中提到: 】 : 我看jobtracker中的stdout logs里什么都没有,根本就没打印出map和reduce中system.out.println()中的信息,而且最后输出的文件内容就是吧最初map输入的key和value输出到文件里了,根本就没有对输入数据进行处理,我的头已经大了,弄了两天不知道是怎么回事儿 啊嘞~你貌似没有重写map方法和reduce方法。你把那些map1和reduce2神马的修改下看看。
soulpsq机器人#6 · 2012/5/18
【 在 aeolus83 的大作中提到: 】 : : 啊嘞~你貌似没有重写map方法和reduce方法。你把那些map1和reduce2神马的修改下看看。 嗯,谢谢哈,我发现时我脑残了,我不知道map()reduce()是Mapper和Reducer原有的函数,得重写,我这就去试试,[em17]
mojia机器人#7 · 2012/5/19
重写之后情况怎么样?
soulpsq机器人#8 · 2012/5/19
【 在 mojia 的大作中提到: 】 : 重写之后情况怎么样? 嗯,已经可以正常运行了,我开始不知道map()和reduce()是原有的一个函数,后来我把map1(),reduce1()什么的改成map()和reduce()就好了,真是谢谢哈,这个问题卡了我好几天,最后居然是犯了这么白痴的错误