hadoop学习之Mapreduce(2.4.1):写mapreduce程序时编写自己的writable类

Windows Windows 2个月前 (08-13) 15次浏览 未收录 0个评论 扫描二维码

前言:

在编写mapreduce程序时,hadoop提供的writable基本数据类型(如IntWritable,DoubleWritable,Text等等,它们都继承自Writable接口)往往不能满足要求。

比如:我们需要在map或者reduce阶段调用context.write(key,value)方法向外写数据,官网给出的wordcount例子,只需以word为key,数值为value向外写,这些都是单一的数据类型。然而,我们常常要往外写的key或者value包含多个数据类型(相同的或者不同的),如:已知某学生的成绩单如下:

 

1:小明:98:78:89
2:小花:87:79:98

 

这些字段分别为-学号:姓名:语文成绩:数学成绩:英语成绩
现在,我们要统计学生的总成绩,那么写mapreduce程序时,map阶段往外写的就是各科的成绩,然后在reduce阶段汇总。
此时,hadoop提供的基本类型无法使用。我们可以将多种基本类型组合使用,创建我们自己的writable类型。

一,下面就介绍一下如何实现自己的writable类。

首先来看hadoop的Writable接口的源码:

 

public interface Writable {
  /** 
   * Serialize the fields of this object to out.
   * 
   * @param out DataOuput to serialize this object into. * @throws IOException */ void write(DataOutput out) throws IOException; /** * Deserialize the fields of this object from in. * * 

For efficiency, implementations should attempt to re-use storage in the * existing object where possible.

* * @param in DataInput to deseriablize this object from. * @throws IOException */ void readFields(DataInput in) throws IOException; }


接口声明2个函数,注释写的很清楚,write()方法是将变量序列化,以便传输。readFields()方法是将序列化的变量反序列化为java基本类型。

 

同样的,我们自己的MyWritable()类也要继承自Writable接口,并且要实现接口中的两个函数。

下面是我们的MyWritable()类的代码:

 

package com.jimmy.scoreCount; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; public class MyWritable implements Writable{ //首先定义要求和的“各科成绩”变量和“总成绩”变量 //这些变量是要被一起序列化和反序列化的 private int yuwen; private int shuxue; private int yingyu; private int score; //生成默认构造函数 //生成含参数(各学科成绩变量)的构造函数 public MyWritable() {} //hadoop的Writable类用到了动态代理和反射机制,所以要有默认构造函数。 public MyWritable(int yuwen, int shuxue, int yingyu) { this.yuwen = yuwen; this.shuxue = shuxue; this.yingyu = yingyu; this.score = yuwen+shuxue+yingyu; } //根据成员变量设置set和get方法,在eclipse中很快生成。 //set,get方法不是必须的,提供获取变量的函数,可能在map或reduce阶段为获取单个成员变量提供方便。 public int getYuwen() { return yuwen; } public void setYuwen(int yuwen) { this.yuwen = yuwen; } public int getShuxue() { return shuxue; } public void setShuxue(int shuxue) { this.shuxue = shuxue; } public int getYingyu() { return yingyu; } public void setYingyu(int yingyu) { this.yingyu = yingyu; } public int getScore() { return score; } public void setScore(int score) { this.score = score; } /** * 序列化函数,这是每个自定义Writable类必须要实现的函数 */ @Override public void write(DataOutput out) throws IOException { //写出是有顺序的,按顺序写出,但是要和readFields()方法读入的顺序一致。 out.writeInt(yuwen); out.writeInt(shuxue); out.writeInt(yingyu); out.writeInt(score); } /** * 反序列化函数,也是每个自定义Writable类必须要实现的函数 */ @Override public void readFields(DataInput in) throws IOException { //同样,读入也是有顺序的,要和write()方法写出的顺序一致。 yuwen = in.readInt(); shuxue = in.readInt(); yingyu = in.readInt(); score = in.readInt(); } } 


二,有了自定义Writable类,就可以写mapreduce程序算总成绩了。

 

 

package com.jimmy.scoreCount; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //mapper类 class MyMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString();//得到一行数据 String[] ss = line.split(",");//数据按逗号切分后,放入数组 int yuwen = Integer.parseInt(ss[2]); //取出数组中的相应成绩 int shuxue = Integer.parseInt(ss[3]); int yingyu = Integer.parseInt(ss[4]); //map阶段以name为key,MyWritable对象为value写出 //将各科成绩传入MyWritable构造函数,作为一个整体在节点之间传递 context.write(new Text(ss[1]), new MyWritable(yuwen,shuxue,yingyu)); } } class MyReducer extends Reducer { @Override protected void reduce(Text key, Iterable value,Context context) throws IOException, InterruptedException { int score = 0; for(MyWritable mm:value) score += mm.getScore(); context.write(key, new IntWritable(score)); } } public class ScoreCount { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "flow"); job.setJarByClass(ScoreCount.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(MyWritable.class); FileInputFormat.addInputPath(job, new Path("hdfs://node1:9000/user/root/input_score")); FileOutputFormat.setOutputPath(job, new Path("hdfs://node1:9000/user/root/output_scorecount")); System.exit(job.waitForCompletion(true)?0:1); } } 


运行即得到每个学生的总成绩。

 

喜欢 (0)
[1353713598@qq.com]
分享 (0)
关于作者:
发表我的评论
取消评论
表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址