4、代码开发和调试示例

前面部署HADOOP时曾经测试过wordcount程序,这样我们在Eclipse也调试这一功能。HADOOP提供了这些示例的源代码,大家可以在HADOOP安装文件根路径下的examples目录下,比如WordCount位于:examples/org/apache/hadoop/examples/WordCount.java

我们新建一个java文件,右键选中项目名称,点击New -> Class创建一个新的Java Class文件,弹出窗口如下:

将示例代码直接复制进来,而后修改文件头部包名即可。新创建的WordCount.java文件内容如下:

    package com.jss.hadoop.mapreduce.test;

    import java.io.IOException;

    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.IntWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.util.GenericOptionsParser;

    public class WordCount {

      public static class TokenizerMapper 

           extends Mapper<Object, Text, Text, IntWritable>{

        

        private final static IntWritable one = new IntWritable(1);

        private Text word = new Text();

          

        public void map(Object key, Text value, Context context

                        ) throws IOException, InterruptedException {

          StringTokenizer itr = new StringTokenizer(value.toString());

          while (itr.hasMoreTokens()) {

            word.set(itr.nextToken());

            context.write(word, one);

          }

        }

      }

      

      public static class IntSumReducer 

           extends Reducer<Text,IntWritable,Text,IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, 

                           Context context

                           ) throws IOException, InterruptedException {

          int sum = 0;

          for (IntWritable val : values) {

            sum += val.get();

          }

          result.set(sum);

          context.write(key, result);

        }

      }

      public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (otherArgs.length != 2) {

          System.err.println("Usage: wordcount <in> <out>");

          System.exit(2);

        }

        Job job = new Job(conf, "word count");

        job.setJarByClass(WordCount.class);

        job.setMapperClass(TokenizerMapper.class);

        job.setCombinerClass(IntSumReducer.class);

        job.setReducerClass(IntSumReducer.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

      }

    }

WordCount如要运行,需要指定两个参数,即代码中65行和66行所需指定的路径。针对这种情况,我们即可以改动代码,直接在此处写好目标路径(同时还需要将53-57行之间的代码注释)而后即可直接运行调试;也可以配置WordCount的调试运行环境,为其配置运行参数。这里我们选择后一种方式。

选择菜单:Run -> Run Configurations -> Java Application,点击窗口左上角处的图标:

新建一个配置,将弹出的窗口显示项切换到Arguments选项:

此处需要我们填写Program arguments,即指定程序运行所需参数,根据程序设定,此时需要指定两个参数,一个指定要处理的文件源路径,另一个是处理后文件的输出路径,中间以空格分隔。请根据实际情况指定参数,配置好后,即可点击Run运行。

如果配置正确,执行成功后,在HDFS中就会创建jssout文件夹,如上图所示,其中保存的文件,就是对源路径中数据处理后的输出结果。

若要操作HDFS中的目录和文件也是同理,继续创建文件(过程不演示)FileOper.java,代码如下:

    $ more /data/developer/workspace/FirstHadoopProject/src/com/jss/hadoop/hdfs/test/FileOper.java 

    package com.jss.hadoop.hdfs.test;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.fs.FSDataOutputStream;

    import org.apache.hadoop.fs.FSDataInputStream;

    import org.apache.hadoop.fs.FileStatus;

    import org.apache.hadoop.fs.FileSystem;

    import org.apache.hadoop.fs.Path;

    public class FileOper {

            public static void main(String[] args) throws Exception {

                    if (args.length < 1) {

                            System.out.println("Must define parameters!");

                    } else {

                            Configuration conf = new Configuration();

                            conf.set("fs.default.name", args[0]);

                            FileOper.listHDFSFiles(conf); // 显示目录结构

                            //FileOper.uploadLocal2HDFS(conf, args[1], args[2]); // 上传文件

                            //FileOper.createHDFSFile(conf, args[1], args[2]); // 创建文件

                            //FileOper.deleteHDFSFile(conf, args[1]); // 删除文件

                            //FileOper.readHDFSFile(conf, args[1]); // 读取文件

                            //FileOper.makeHDFSDirectory(conf, args[1]); // 创建目录

                            //FileOper.removeHDFSDirectory(conf, args[1]); // 删除目录

                    }

            }

            public static void listHDFSFiles(Configuration conf) throws IOException {

                    FileSystem fs = FileSystem.get(conf);

                    FileStatus files[] = fs.listStatus(new Path("/"));

                    for (FileStatus file : files) {

                            System.out.println(file.getPath());

                    }

            }

            public static void uploadLocal2HDFS(Configuration conf, String s, String d)

                            throws IOException {

                    FileSystem fs = FileSystem.get(conf);

                    Path src = new Path(s);

                    Path dst = new Path(d);

                    fs.copyFromLocalFile(src, dst);

                    fs.close();

                    System.out.println("Upload to " + conf.get("fs.default.name"));

            }

            public static void createHDFSFile(Configuration conf, String createFilePath,

                            String content) throws IOException {

                    FileSystem fs = FileSystem.get(conf);

                    FSDataOutputStream fsos = fs.create(new Path(createFilePath));

                    fsos.write(content.getBytes("UTF-8"));

                    fsos.close();

                    fs.close();

                    System.out.println("Succeeded created file " + createFilePath);

            }

            public static boolean deleteHDFSFile(Configuration conf, String dst)

                            throws IOException {

                    FileSystem fs = FileSystem.get(conf);

                    Path path = new Path(dst);

                    boolean isDeleted = fs.delete(path, true);

                    fs.close();

                    return isDeleted;

            }

            public static byte[] readHDFSFile(Configuration conf, String dst)

                            throws Exception {

                    FileSystem fs = FileSystem.get(conf);

                    Path path = new Path(dst);

                    if (fs.exists(path)) {

                            FSDataInputStream is = fs.open(path);

                            // get the file info to create the buffer

                            FileStatus stat = fs.getFileStatus(path);

                            // create the buffer

                            byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat

                                            .getLen()))];

                            is.readFully(0, buffer);

                            is.close();

                            fs.close();

                            return buffer;

                    } else {

                            throw new Exception("the file is not found .");

                    }

            }

            public static void makeHDFSDirectory(Configuration conf, String dst)

                            throws IOException {

                    FileSystem fs = FileSystem.get(conf);

                    fs.mkdirs(new Path(dst));

                    fs.close();

                    System.out.println("Succeeded created directory " + dst);

            }

            public static void removeHDFSDirectory(Configuration conf, String dst)

                            throws IOException {

                    FileSystem fs = FileSystem.get(conf);

                    fs.delete(new Path(dst), true);

                    fs.close();

                    System.out.println("Succeeded remove directory " + dst);

            }

    }

FileOper能够读取HDFS中的文件目录结构,操作文件和目录。程序在执行时,同样需要指定参数,具体步骤与前面操作WordCount的原理相同,就不一一演示了。