林子雨编著《大数据基础编程、实验和案例教程》教材附录A的代码

大数据技术原理与应用

林子雨编著《大数据基础编程、实验和案例教程》(教材官网)教材中的代码,在纸质教材中的印刷效果不是很好,可能会影响读者对代码的理解,为了方便读者正确理解代码或者直接拷贝代码用于上机实验,这里提供全书配套的所有代码。
查看教材所有章节的代码

附录A 大数据课程实验答案

教材第298页

(温馨提示:代码框上方的复制代码按钮,也就是“两张A4纸图标”,用鼠标点击复制代码按钮,就可以把代码框中的代码复制到粘贴板,粘贴到其他地方。但是,有的浏览器可能不支持该功能)

cd /usr/local

教材第299页

cd ..
cd ~
cd /usr
ls -al
cd /tmp
mkdir a
ls -al
cd /tmp
mkdir -p a1/a2/a3/a4
cd /tmp
rmdir a
cd /tmp
rmdir -p a1/a2/a3/a4
ls -al
sudo cp ~/.bashrc /usr/bashrc1

教材第300页

cd /tmp
mkdir test
sudo cp -r /tmp/test /usr
sudo mv /usr/bashrc1 /usr/test
sudo mv /usr/test /usr/test2
sudo rm /usr/test2/bashrc1
sudo rm -r /usr/test2
cat ~/.bashrc
tac ~/.bashrc
more ~/.bashrc

教材第301页

head -n 20 ~/.bashrc
head -n -50 ~/.bashrc
tail -n 20 ~/.bashrc
tail -n +50 ~/.bashrc
cd /tmp
touch hello
ls -l hello
touch -d “5 days ago” hello
sudo chown root /tmp/hello
ls -l /tmp/hello
find ~ -name .bashrc

教材第302页

sudo mkdir /test
sudo tar -zcv -f /test.tar.gz test
sudo tar -zxv -f /test.tar.gz -C /tmp
grep -n 'examples' ~/.bashrc
vim ~/.bashrc
export JAVA_HOME=JDK安装路径
source ~/.bashrc
echo $JAVA_HOME
cd /usr/local/hadoop
./sbin/start-dfs.sh
./bin/hdfs dfs -mkdir -p /user/hadoop

教材第303页

cd /usr/local/hadoop
./bin/hdfs dfs -mkdir test
./bin/hdfs dfs -ls .
cd /usr/local/hadoop
./bin/hdfs dfs -put ~/.bashrc test
./bin/hdfs dfs -ls test
cd /usr/local/hadoop
./bin/hdfs dfs -get test ./
cd /usr/local/hadoop
./bin/hdfs dfs -test -e text.txt

教材第304页

echo $?
cd /usr/local/hadoop
./bin/hdfs dfs -appendToFile local.txt text.txt #追加到原文件末尾
./bin/hdfs dfs -copyFromLocal -f local.txt text.txt #覆盖原来文件,第一种命令形式
./bin/hdfs dfs -cp -f file:///home/hadoop/local.txt text.txt #覆盖原来文件,第二种命令形式
if $(hdfs dfs -test -e text.txt);
then $(hdfs dfs -appendToFile local.txt text.txt);
else $(hdfs dfs -copyFromLocal -f local.txt text.txt);
fi
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
    /**
     * 判断路径是否存在
     */
    public static boolean test(Configuration conf, String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.exists(new Path(path));
    }

    /**
     * 复制文件到指定路径
     * 若路径已存在,则进行覆盖
     */
    public static void copyFromLocalFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path localPath = new Path(localFilePath);
        Path remotePath = new Path(remoteFilePath);
        /* fs.copyFromLocalFile 第一个参数表示是否删除源文件,第二个参数表示是否覆盖 */
        fs.copyFromLocalFile(false, true, localPath, remotePath);
        fs.close();
    }

    /**
     * 追加文件内容
     */
    public static void appendToFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        /* 创建一个文件读入流 */
        FileInputStream in = new FileInputStream(localFilePath);
        /* 创建一个文件输出流,输出的内容将追加到文件末尾 */
        FSDataOutputStream out = fs.append(remotePath);
        /* 读写文件内容 */
        byte[] data = new byte[1024];
        int read = -1;
        while ( (read = in.read(data)) > 0 ) {
            out.write(data, 0, read);
        }
        out.close();
        in.close();
        fs.close();
    }

    /**
     * 主函数
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
    conf.set("fs.default.name","hdfs://localhost:9000");
        String localFilePath = "/home/hadoop/text.txt";    // 本地路径
        String remoteFilePath = "/user/hadoop/text.txt";    // HDFS路径
        String choice = "append";    // 若文件存在则追加到文件末尾
//      String choice = "overwrite";    // 若文件存在则覆盖

        try {
            /* 判断文件是否存在 */
            Boolean fileExists = false;
            if (HDFSApi.test(conf, remoteFilePath)) {
                fileExists = true;
                System.out.println(remoteFilePath + " 已存在.");
            } else {
                System.out.println(remoteFilePath + " 不存在.");
            }
            /* 进行处理 */
            if ( !fileExists) { // 文件不存在,则上传
                HDFSApi.copyFromLocalFile(conf, localFilePath, remoteFilePath);
                System.out.println(localFilePath + " 已上传至 " + remoteFilePath);
            } else if ( choice.equals("overwrite") ) {    // 选择覆盖
                HDFSApi.copyFromLocalFile(conf, localFilePath, remoteFilePath);
                System.out.println(localFilePath + " 已覆盖 " + remoteFilePath);
            } else if ( choice.equals("append") ) {   // 选择追加
                HDFSApi.appendToFile(conf, localFilePath, remoteFilePath);
                System.out.println(localFilePath + " 已追加至 " + remoteFilePath);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

教材第306页

if $(hdfs dfs -test -e file:///home/hadoop/text.txt);
then $(hdfs dfs -copyToLocal text.txt ./text2.txt); 
else $(hdfs dfs -copyToLocal text.txt ./text.txt); 
fi
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
public class HDFSApi {
    /**
     * 下载文件到本地
     * 判断本地路径是否已存在,若已存在,则自动进行重命名
     */
    public static void copyToLocal(Configuration conf, String remoteFilePath, String localFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        File f = new File(localFilePath);
        /* 如果文件名存在,自动重命名(在文件名后面加上 _0, _1 ...) */
        if (f.exists()) {
            System.out.println(localFilePath + " 已存在.");
            Integer i = 0;
            while (true) {
                f = new File(localFilePath + "_" + i.toString());
                if (!f.exists()) {
                    localFilePath = localFilePath + "_" + i.toString();
                    break;
                }
            }
            System.out.println("将重新命名为: " + localFilePath);
        }

        // 下载文件到本地
       Path localPath = new Path(localFilePath);
       fs.copyToLocalFile(remotePath, localPath);
       fs.close();
    }

    /**
     * 主函数
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
    conf.set("fs.default.name","hdfs://localhost:9000");
        String localFilePath = "/home/hadoop/text.txt";    // 本地路径
        String remoteFilePath = "/user/hadoop/text.txt";    // HDFS路径

        try {
            HDFSApi.copyToLocal(conf, remoteFilePath, localFilePath);
            System.out.println("下载完成");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

教材第308页

hdfs dfs -cat text.txt
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
    /**
     * 读取文件内容
     */
    public static void cat(Configuration conf, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        FSDataInputStream in = fs.open(remotePath);
        BufferedReader d = new BufferedReader(new InputStreamReader(in));
        String line = null;
        while ( (line = d.readLine()) != null ) {
            System.out.println(line);
        }
       d.close();
       in.close();
       fs.close();
    }

    /**
     * 主函数
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
    conf.set("fs.default.name","hdfs://localhost:9000");
        String remoteFilePath = "/user/hadoop/text.txt";    // HDFS路径

        try {
            System.out.println("读取文件: " + remoteFilePath);
            HDFSApi.cat(conf, remoteFilePath);
            System.out.println("\n读取完成");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

教材第309页

hdfs dfs -ls -h text.txt
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
import java.text.SimpleDateFormat;

public class HDFSApi {
    /**
     * 显示指定文件的信息
     */
    public static void ls(Configuration conf, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        FileStatus[] fileStatuses = fs.listStatus(remotePath);
        for (FileStatus s : fileStatuses) {
            System.out.println("路径: " + s.getPath().toString());
            System.out.println("权限: " + s.getPermission().toString());
            System.out.println("大小: " + s.getLen());
            /* 返回的是时间戳,转化为时间日期格式 */
            Long timeStamp = s.getModificationTime();
            SimpleDateFormat format =  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String date = format.format(timeStamp);  
            System.out.println("时间: " + date);
        }
        fs.close();
    }

    /**
     * 主函数
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
    conf.set("fs.default.name","hdfs://localhost:9000");
        String remoteFilePath = "/user/hadoop/text.txt";    // HDFS路径

        try {
            System.out.println("读取文件信息: " + remoteFilePath);
            HDFSApi.ls(conf, remoteFilePath);
            System.out.println("\n读取完成");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

教材第310页

cd /usr/local/hadoop
./bin/hdfs dfs -ls -R -h /user/hadoop
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
import java.text.SimpleDateFormat;

public class HDFSApi {
    /**
     * 显示指定文件夹下所有文件的信息(递归)
     */
    public static void lsDir(Configuration conf, String remoteDir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path dirPath = new Path(remoteDir);
        /* 递归获取目录下的所有文件 */
        RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(dirPath, true);
        /* 输出每个文件的信息 */
        while (remoteIterator.hasNext()) {
            FileStatus s = remoteIterator.next();
            System.out.println("路径: " + s.getPath().toString());
            System.out.println("权限: " + s.getPermission().toString());
            System.out.println("大小: " + s.getLen());
            /* 返回的是时间戳,转化为时间日期格式 */
            Long timeStamp = s.getModificationTime();
            SimpleDateFormat format =  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String date = format.format(timeStamp);  
            System.out.println("时间: " + date);
            System.out.println();
        }
        fs.close();
    }    

    /**
     * 主函数
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
    conf.set("fs.default.name","hdfs://localhost:9000");
        String remoteDir = "/user/hadoop";    // HDFS路径

        try {
            System.out.println("(递归)读取目录下所有文件的信息: " + remoteDir);
            HDFSApi.lsDir(conf, remoteDir);
            System.out.println("读取完成");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

教材第311页

if $(hdfs dfs -test -d dir1/dir2);
then $(hdfs dfs -touchz dir1/dir2/filename); 
else $(hdfs dfs -mkdir -p dir1/dir2 && hdfs dfs -touchz dir1/dir2/filename); 
fi
hdfs dfs -rm dir1/dir2/filename   #删除文件
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
    /**
     * 判断路径是否存在
     */
    public static boolean test(Configuration conf, String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.exists(new Path(path));
    }

    /**
     * 创建目录
     */
    public static boolean mkdir(Configuration conf, String remoteDir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path dirPath = new Path(remoteDir);
        boolean result = fs.mkdirs(dirPath);
        fs.close();
        return result;
    }

    /**
     * 创建文件
     */
    public static void touchz(Configuration conf, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        FSDataOutputStream outputStream = fs.create(remotePath);
        outputStream.close();
        fs.close();
    }

    /**
     * 删除文件
     */
    public static boolean rm(Configuration conf, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        boolean result = fs.delete(remotePath, false);
        fs.close();
        return result;
    }

    /**
     * 主函数
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
    conf.set("fs.default.name","hdfs://localhost:9000");
        String remoteFilePath = "/user/hadoop/input/text.txt";    // HDFS路径
        String remoteDir = "/user/hadoop/input";    // HDFS路径对应的目录

        try {
            /* 判断路径是否存在,存在则删除,否则进行创建 */
            if ( HDFSApi.test(conf, remoteFilePath) ) {
                HDFSApi.rm(conf, remoteFilePath); // 删除
                System.out.println("删除路径: " + remoteFilePath);
            } else {
                if ( !HDFSApi.test(conf, remoteDir) ) { // 若目录不存在,则进行创建
                    HDFSApi.mkdir(conf, remoteDir);
                    System.out.println("创建文件夹: " + remoteDir);
                }
                HDFSApi.touchz(conf, remoteFilePath);
                System.out.println("创建路径: " + remoteFilePath);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

教材第313页

hdfs dfs -mkdir -p dir1/dir2
hdfs dfs -rmdir dir1/dir2
hdfs dfs -rm -R dir1/dir2

教材第314页

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
    /**
     * 判断路径是否存在
     */
    public static boolean test(Configuration conf, String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.exists(new Path(path));
    }

    /**
     * 判断目录是否为空
     * true: 空,false: 非空
     */
    public static boolean isDirEmpty(Configuration conf, String remoteDir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path dirPath = new Path(remoteDir);
        RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(dirPath, true);
        return !remoteIterator.hasNext();
    }

    /**
     * 创建目录
     */
    public static boolean mkdir(Configuration conf, String remoteDir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path dirPath = new Path(remoteDir);
        boolean result = fs.mkdirs(dirPath);
        fs.close();
        return result;
    }

    /**
     * 删除目录
     */
    public static boolean rmDir(Configuration conf, String remoteDir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path dirPath = new Path(remoteDir);
        /* 第二个参数表示是否递归删除所有文件 */
        boolean result = fs.delete(dirPath, true);
        fs.close();
        return result;
    }

    /**
     * 主函数
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
    conf.set("fs.default.name","hdfs://localhost:9000");
        String remoteDir = "/user/hadoop/input";    // HDFS目录
        Boolean forceDelete = false;  // 是否强制删除

        try {
            /* 判断目录是否存在,不存在则创建,存在则删除 */
            if ( !HDFSApi.test(conf, remoteDir) ) {
                HDFSApi.mkdir(conf, remoteDir); // 创建目录
                System.out.println("创建目录: " + remoteDir);
            } else {
                if ( HDFSApi.isDirEmpty(conf, remoteDir) || forceDelete ) { // 目录为空或强制删除
                    HDFSApi.rmDir(conf, remoteDir);
                    System.out.println("删除目录: " + remoteDir);
                } else  { // 目录不为空
                    System.out.println("目录不为空,不删除: " + remoteDir);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

教材第315页

hdfs dfs -appendToFile local.txt text.txt

教材第316页

hdfs dfs -get text.txt
cat text.txt >> local.txt
hdfs dfs -copyFromLocal -f text.txt text.txt
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
    /**
     * 判断路径是否存在
     */
    public static boolean test(Configuration conf, String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.exists(new Path(path));
    }

    /**
     * 追加文本内容
     */
    public static void appendContentToFile(Configuration conf, String content, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        /* 创建一个文件输出流,输出的内容将追加到文件末尾 */
        FSDataOutputStream out = fs.append(remotePath);
        out.write(content.getBytes());
        out.close();
        fs.close();
}

    /**
     * 追加文件内容
     */
    public static void appendToFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        /* 创建一个文件读入流 */
        FileInputStream in = new FileInputStream(localFilePath);
        /* 创建一个文件输出流,输出的内容将追加到文件末尾 */
        FSDataOutputStream out = fs.append(remotePath);
        /* 读写文件内容 */
        byte[] data = new byte[1024];
        int read = -1;
        while ( (read = in.read(data)) > 0 ) {
            out.write(data, 0, read);
        }
        out.close();
        in.close();
        fs.close();
    }

    /**
     * 移动文件到本地
     * 移动后,删除源文件
     */
    public static void moveToLocalFile(Configuration conf, String remoteFilePath, String localFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        Path localPath = new Path(localFilePath);
        fs.moveToLocalFile(remotePath, localPath);
    }

    /**
     * 创建文件
     */
    public static void touchz(Configuration conf, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        FSDataOutputStream outputStream = fs.create(remotePath);
        outputStream.close();
        fs.close();
    }

    /**
     * 主函数
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
    conf.set("fs.default.name","hdfs://localhost:9000");
        String remoteFilePath = "/user/hadoop/text.txt";    // HDFS文件
        String content = "新追加的内容\n";
        String choice = "after";        //追加到文件末尾
//      String choice = "before";    // 追加到文件开头

        try {
            /* 判断文件是否存在 */
            if ( !HDFSApi.test(conf, remoteFilePath) ) {
                System.out.println("文件不存在: " + remoteFilePath);
            } else {
                if ( choice.equals("after") ) { // 追加在文件末尾
                    HDFSApi.appendContentToFile(conf, content, remoteFilePath);
                    System.out.println("已追加内容到文件末尾" + remoteFilePath);
                } else if ( choice.equals("before") )  { // 追加到文件开头
                    /* 没有相应的api可以直接操作,因此先把文件移动到本地*/
/*创建一个新的HDFS,再按顺序追加内容 */
                    String localTmpPath = "/user/hadoop/tmp.txt";
                    // 移动到本地
HDFSApi.moveToLocalFile(conf, remoteFilePath, localTmpPath);
   // 创建一个新文件
                    HDFSApi.touchz(conf, remoteFilePath); 
                    // 先写入新内容
                    HDFSApi.appendContentToFile(conf, content, remoteFilePath);
                    // 再写入原来内容
                    HDFSApi.appendToFile(conf, localTmpPath, remoteFilePath); 
                    System.out.println("已追加内容到文件开头: " + remoteFilePath);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

教材第318页

hdfs dfs -rm text.txt
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
    /**
     * 删除文件
     */
    public static boolean rm(Configuration conf, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        boolean result = fs.delete(remotePath, false);
        fs.close();
        return result;
    }

    /**
     * 主函数
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
    conf.set("fs.default.name","hdfs://localhost:9000");
        String remoteFilePath = "/user/hadoop/text.txt";    // HDFS文件

        try {
            if ( HDFSApi.rm(conf, remoteFilePath) ) {
                System.out.println("文件删除: " + remoteFilePath);
            } else {
                System.out.println("操作失败(文件不存在或删除失败)");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

教材第319页

hdfs dfs -mv text.txt text2.txt
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
    /**
     * 移动文件
     */
    public static boolean mv(Configuration conf, String remoteFilePath, String remoteToFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path srcPath = new Path(remoteFilePath);
        Path dstPath = new Path(remoteToFilePath);
        boolean result = fs.rename(srcPath, dstPath);
        fs.close();
        return result;
    }

    /**
     * 主函数
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
    conf.set("fs.default.name","hdfs://localhost:9000");
        String remoteFilePath = "hdfs:///user/hadoop/text.txt";    // 源文件HDFS路径
        String remoteToFilePath = "hdfs:///user/hadoop/new.txt";    // 目的HDFS路径

        try {
            if ( HDFSApi.mv(conf, remoteFilePath, remoteToFilePath) ) {
                System.out.println("将文件 " + remoteFilePath + " 移动到 " + remoteToFilePath);
            } else {
                    System.out.println("操作失败(源文件不存在或移动失败)");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

教材第321页

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.*;

public class MyFSDataInputStream extends FSDataInputStream {
    public MyFSDataInputStream(InputStream in) {
        super(in);
    }

    /**
     * 实现按行读取
     * 每次读入一个字符,遇到"\n"结束,返回一行内容
     */
    public static String readline(BufferedReader br) throws IOException {
        char[] data = new char[1024];
        int read = -1;
        int off = 0; 
// 循环执行时,br 每次会从上一次读取结束的位置继续读取
//因此该函数里,off 每次都从0开始
        while ( (read = br.read(data, off, 1)) != -1 ) {
            if (String.valueOf(data[off]).equals("\n") ) {
                off += 1;
                break;
            }
            off += 1;
        }

        if (off > 0) {
            return String.valueOf(data);
        } else {
            return null;
        }
    }

    /**
     * 读取文件内容
     */
    public static void cat(Configuration conf, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path remotePath = new Path(remoteFilePath);
        FSDataInputStream in = fs.open(remotePath);
        BufferedReader br = new BufferedReader(new InputStreamReader(in));
        String line = null;
        while ( (line = MyFSDataInputStream.readline(br)) != null ) {
            System.out.println(line);
        }
        br.close();
        in.close();
        fs.close();
    }

    /**
     * 主函数
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
    conf.set("fs.default.name","hdfs://localhost:9000");
        String remoteFilePath = "/user/hadoop/text.txt";    // HDFS路径
        try {
            MyFSDataInputStream.cat(conf, remoteFilePath);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

教材第322页

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import java.io.*;
import java.net.URL;

public class HDFSApi {
    static{  
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());  
    }

    /**
     * 主函数 
     */
    public static void main(String[] args) throws Exception {
        String remoteFilePath = "hdfs:///user/hadoop/text.txt";    // HDFS文件
        InputStream in = null; 
        try{  
            /* 通过URL对象打开数据流,从中读取数据 */
            in = new URL(remoteFilePath).openStream();  
            IOUtils.copyBytes(in,System.out,4096,false);  
        } finally{  
            IOUtils.closeStream(in);  
        }  
    }
}

教材第323页

hbase> list
public static void listTables() throws IOException {
    init();//建立连接
    HTableDescriptor hTableDescriptors[] = admin.listTables();
    for(HTableDescriptor hTableDescriptor :hTableDescriptors){
        System.out.println("表名:"+hTableDescriptor.getNameAsString());
    }
    close();//关闭连接
}

教材第324页

hbase> scan 's1'
//在终端打印出指定的表的所有记录数据
public static void getData(String tableName)throws  IOException{
    init();
    Table table = connection.getTable(TableName.valueOf(tableName));
    Scan scan = new Scan();
    ResultScanner scanner = table.getScanner(scan);
    for (Result result:scanner){
        printRecoder(result);
    }
    close();
}
//打印一条记录的详情
public  static void printRecoder(Result result)throws IOException{
    for(Cell cell:result.rawCells()){
        System.out.print("行健: "+new String(CellUtil.cloneRow(cell)));
        System.out.print("列簇: "+new String(CellUtil.cloneFamily(cell)));
        System.out.print(" 列: "+new String(CellUtil.cloneQualifier(cell)));
        System.out.print(" 值: "+new String(CellUtil.cloneValue(cell)));
        System.out.println("时间戳: "+cell.getTimestamp());
    }
}
hbase> create 's1','score'

教材第325页

hbase> put 's1','zhangsan','score:Math','69'
hbase> delete 's1','zhangsan','score:Math'
//向表添加数据
public static void insertRow(String tableName,String rowKey,String colFamily,String col,String val) throws IOException {
    init();
    Table table = connection.getTable(TableName.valueOf(tableName));
    Put put = new Put(rowKey.getBytes());
    put.addColumn(colFamily.getBytes(), col.getBytes(), val.getBytes());
    table.put(put);
    table.close();
    close();
}
    insertRow("s1",'zhangsan','score','Math','69')

//删除数据
public static void deleteRow(String tableName,String rowKey,String colFamily,String col) throws IOException {
    init();
    Table table = connection.getTable(TableName.valueOf(tableName));
    Delete delete = new Delete(rowKey.getBytes());
    //删除指定列族
    delete.addFamily(Bytes.toBytes(colFamily));
    //删除指定列
    delete.addColumn(Bytes.toBytes(colFamily),Bytes.toBytes(col));
    table.delete(delete);
    table.close();
    close();
}
deleteRow("s1",'zhangsan','score','Math')
hbase> truncate 's1'

教材第326页

//清空指定的表的所有记录数据
public static void clearRows(String tableName)throws IOException{
    init();
    TableName tablename = TableName.valueOf(tableName);
    admin.disableTable(tablename);
    admin.deleteTable(tablename);
    HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
    admin.createTable(hTableDescriptor);
    close();
}
hbase> count 's1'
//(5)统计表的行数
public static void countRows(String tableName)throws IOException{
    init();
    Table table = connection.getTable(TableName.valueOf(tableName));
    Scan scan = new Scan();
    ResultScanner scanner = table.getScanner(scan);
    int num = 0;
    for (Result result = scanner.next();result!=null;result=scanner.next()){
        num++;
    }
    System.out.println("行数:"+ num);
    scanner.close();
    close();
}

教材第327页

hbase> create 'Student','S_No','S_Name','S_Sex','S_Age'
hbase>put 'Student','s001','S_No','2015001'
hbase>put 'Student','s001','S_Name','Zhangsan'
hbase>put 'Student','s001','S_Sex','male'
hbase>put 'Student','s001','S_Age','23'
hbase>put 'Student','s002','S_No','2015002'
hbase>put 'Student','s002','S_Name','Mary'
hbase>put 'Student','s002','S_Sex','female'
hbase>put 'Student','s002','S_Age','22'
hbase>put 'Student','s003','S_No','2015003'
hbase>put 'Student','s003','S_Name','Lisi'
hbase>put 'Student','s003','S_Sex','male'
hbase>put 'Student','s003','S_Age','24'
hbase> create 'Course','C_No','C_Name','C_Credit'

教材第328页

hbase>put 'Course','c001','C_No','123001'
hbase>put 'Course','c001','C_Name','Math'
hbase>put 'Course','c001','C_Credit','2.0'
hbase>put 'Course','c002','C_No','123002'
hbase>put 'Course','c002','C_Name','Computer'
hbase>put 'Course','c002','C_Credit','5.0'
hbase>put 'Course','c003','C_No','123003'
hbase>put 'Course','c003','C_Name','English'
hbase>put 'Course','c003','C_Credit','3.0'
hbase> create 'SC','SC_Sno','SC_Cno','SC_Score'
hbase>put 'SC','sc001','SC_Sno','2015001'
hbase>put 'SC','sc001','SC_Cno','123001'
hbase>put 'SC','sc001','SC_Score','86'
hbase>put 'SC','sc002','SC_Sno','2015001'
hbase>put 'SC','sc002','SC_Cno','123003'
hbase>put 'SC','sc002','SC_Score','69'
hbase>put 'SC','sc003','SC_Sno','2015002'
hbase>put 'SC','sc003','SC_Cno','123002'
hbase>put 'SC','sc003','SC_Score','77'
hbase>put 'SC','sc004','SC_Sno','2015002'
hbase>put 'SC','sc004','SC_Cno','123003'
hbase>put 'SC','sc004','SC_Score','99'
hbase>put 'SC','sc005','SC_Sno','2015003'
hbase>put 'SC','sc005','SC_Cno','123001'
hbase>put 'SC','sc005','SC_Score','98'
hbase>put 'SC','sc006','SC_Sno','2015003'
hbase>put 'SC','sc006','SC_Cno','123002'
hbase>put 'SC','sc006','SC_Score','95'

教材第329页

public static void createTable(String tableName,String[] fields) throws IOException {
    init();
    TableName tablename = TableName.valueOf(tableName);
    if(admin.tableExists(tablename)){
        System.out.println("table is exists!");
        admin.disableTable(tablename);
        admin.deleteTable(tablename);//删除原来的表
    }
    HTableDescriptor hTableDescriptor = new HTableDescriptor(tablename);
    for(String str:fields){
        HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(str);
        hTableDescriptor.addFamily(hColumnDescriptor);
    }
    admin.createTable(hTableDescriptor);
    close();
}
public static void addRecord(String tableName,String row,String[] fields,String[] values) throws IOException {
    init();
    Table table = connection.getTable(TableName.valueOf(tableName));
    for(int i = 0;i != fields.length;i++){
        Put put = new Put(row.getBytes());
        String[] cols = fields[i].split(":");
        put.addColumn(cols[0].getBytes(), cols[1].getBytes(), values[i].getBytes());
        table.put(put);
    }
    table.close();
    close();
}

教材第330页

public static void scanColumn(String tableName,String column)throws  IOException{
    init();
    Table table = connection.getTable(TableName.valueOf(tableName));
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes(column));
    ResultScanner scanner = table.getScanner(scan);
    for (Result result = scanner.next(); result != null; result = scanner.next()){
        showCell(result);
    }
    table.close();
    close();
}
//格式化输出
public static void showCell(Result result){
    Cell[] cells = result.rawCells();
    for(Cell cell:cells){
        System.out.println("RowName:"+new String(CellUtil.cloneRow(cell))+" ");
        System.out.println("Timetamp:"+cell.getTimestamp()+" ");
        System.out.println("column Family:"+new String(CellUtil.cloneFamily(cell))+" ");
        System.out.println("row Name:"+new String(CellUtil.cloneQualifier(cell))+" ");
        System.out.println("value:"+new String(CellUtil.cloneValue(cell))+" ");
    }
}

教材第331页

public static void modifyData(String tableName,String row,String column,String val)throws IOException{
    init();
    Table table = connection.getTable(TableName.valueOf(tableName));
    Put put = new Put(row.getBytes());
    put.addColumn(column.getBytes(),null,val.getBytes());
    table.put(put);
    table.close();
    close();
}
public static void deleteRow(String tableName,String row)throws IOException{
    init();
    Table table = connection.getTable(TableName.valueOf(tableName));
    Delete delete = new Delete(row.getBytes());
    //删除指定列族
    //delete.addFamily(Bytes.toBytes(colFamily));
    //删除指定列
    //delete.addColumn(Bytes.toBytes(colFamily),Bytes.toBytes(col));
    table.delete(delete);
    table.close();
    close();
}

教材第332页

mysql>create table student(
    name varchar(30) not null,
    English tinyint unsigned not null,
    Math tinyint unsigned not null,
    Computer tinyint unsigned not null
    );
mysql>insert into student values("zhangsan",69,86,77);
mysql>insert into student values("lisi",55,100,88);
mysql>select * from student;

教材第333页

mysql>select name , Computer from student where name = "zhangsan";
mysql>update student set Math=95 where name="lisi";
import java.sql.*;
public class mysql_test {

    /**
     * @param args
     */
    //JDBC DRIVER and DB
    static final String  DRIVER="com.mysql.jdbc.Driver";
    static final String DB="jdbc:mysql://localhost/test";
    //Database auth
    static final String USER="root";
    static final String PASSWD="root";

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Connection conn=null;
        Statement stmt=null;
        try {
            //加载驱动程序
            Class.forName(DRIVER);
            System.out.println("Connecting to a selected database...");
            //打开一个连接
            conn=DriverManager.getConnection(DB, USER, PASSWD);
            //执行一个查询
            stmt=conn.createStatement();
            String sql="insert into student values('scofield',45,89,100)";
            stmt.executeUpdate(sql);
            System.out.println("Inserting records into the table successfully!");
        } catch (ClassNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally
        {
            if(stmt!=null)
                try {
                    stmt.close();
                } catch (SQLException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            if(conn!=null)
                try {
                    conn.close();
                } catch (SQLException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
        }
    }
}

教材第335页

import java.sql.*;
public class mysql_qurty {

    /**
     * @param args
     */
    //JDBC DRIVER and DB
    static final String  DRIVER="com.mysql.jdbc.Driver";
    static final String DB="jdbc:mysql://localhost/test";
    //Database auth
    static final String USER="root";
    static final String PASSWD="root";

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Connection conn=null;
        Statement stmt=null;
        ResultSet rs=null;
        try {
            //加载驱动程序
            Class.forName(DRIVER);
            System.out.println("Connecting to a selected database...");
            //打开一个连接
            conn=DriverManager.getConnection(DB, USER, PASSWD);
            //执行一个查询
            stmt=conn.createStatement();
            String sql="select name,English from student where name='scofield' ";
            //获得结果集
            rs=stmt.executeQuery(sql);
            System.out.println("name"+"\t\t"+"English");
            while(rs.next())
            {
                System.out.print(rs.getString(1)+"\t\t");
                System.out.println(rs.getInt(2));
            }
        } catch (ClassNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally
        {
            if(rs!=null)
                try {
                    rs.close();
                } catch (SQLException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }
            if(stmt!=null)
                try {
                    stmt.close();
                } catch (SQLException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            if(conn!=null)
                try {
                    conn.close();
                } catch (SQLException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
        }
    }
}

教材第336页

hbase>create 'student','score'

教材第337页

hbase>put 'student','zhangsan','score:English','69'
hbase>put 'student','zhangsan','score:Math','86'
hbase>put 'student','zhangsan','score:Computer','77'
hbase>put 'student','lisi','score:English','55'
hbase>put 'student','lisi','score:Math','100'
hbase>put 'student','lisi','score:Computer','88'
hbase>scan 'student'
hbase>get 'student','zhangsan','score:Computer'

教材第338页

hbase>put 'student','lisi','score:Math','95'
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;


public class hbase_insert {

    /**
     * @param args
     */
      public static Configuration configuration;
      public static Connection connection;
      public static Admin admin;
    public static void main(String[] args) {
        // TODO Auto-generated method stub
         configuration  = HBaseConfiguration.create();
         configuration.set("hbase.rootdir","hdfs://localhost:9000/hbase");
         try{
             connection = ConnectionFactory.createConnection(configuration);
             admin = connection.getAdmin();
         }catch (IOException e){
             e.printStackTrace();
         }
         try {
            insertRow("student","scofield","score","English","45");
            insertRow("student","scofield","score","Math","89");
            insertRow("student","scofield","score","Computer","100");
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
         close();
    }
     public static void insertRow(String tableName,String rowKey,String colFamily,
             String col,String val) throws IOException {
         Table table = connection.getTable(TableName.valueOf(tableName));
         Put put = new Put(rowKey.getBytes());
         put.addColumn(colFamily.getBytes(), col.getBytes(), val.getBytes());
         table.put(put);
         table.close();
     }
      public static void close(){
            try{
                if(admin != null){
                    admin.close();
                }
                if(null != connection){
                    connection.close();
                }
            }catch (IOException e){
                e.printStackTrace();
            }
        }
}

教材第340页

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;

public class hbase_query {

    /**
     * @param args
     */
      public static Configuration configuration;
      public static Connection connection;
      public static Admin admin;
    public static void main(String[] args) {
        // TODO Auto-generated method stub
         configuration  = HBaseConfiguration.create();
         configuration.set("hbase.rootdir","hdfs://localhost:9000/hbase");
         try{
             connection = ConnectionFactory.createConnection(configuration);
             admin = connection.getAdmin();
         }catch (IOException e){
             e.printStackTrace();
         }
         try {
            getData("student","scofield","score","English");
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
         close();
    }
      public static void getData(String tableName,String rowKey,String colFamily,
              String col)throws  IOException{
          Table table = connection.getTable(TableName.valueOf(tableName));
          Get get = new Get(rowKey.getBytes());
          get.addColumn(colFamily.getBytes(),col.getBytes());
          Result result = table.get(get);
          showCell(result);
          table.close();
      }
      public static void showCell(Result result){
          Cell[] cells = result.rawCells();
          for(Cell cell:cells){
              System.out.println("RowName:"+new String(CellUtil.cloneRow(cell))+" ");
              System.out.println("Timetamp:"+cell.getTimestamp()+" ");
              System.out.println("column Family:"+new String(CellUtil.cloneFamily(cell))+" ");
              System.out.println("row Name:"+new String(CellUtil.cloneQualifier(cell))+" ");
              System.out.println("value:"+new String(CellUtil.cloneValue(cell))+" ");
          }
      }
      public static void close(){
            try{
                if(admin != null){
                    admin.close();
                }
                if(null != connection){
                    connection.close();
                }
            }catch (IOException e){
                e.printStackTrace();
            }
        }
}

教材第342页

hset student.zhangsan English 69
hset student.zhangsan Math 86
hset student.zhangsan Computer 77
hset student.lisi English 55
hset student.lisi Math 100
hset student.lisi Computer 88
hgetall student.zhangsan

教材第343页

hgetall student.lisi
hget student.zhangsan Computer
hset student.lisi Math 95

教材第344页

import java.util.Map;
import redis.clients.jedis.Jedis;

public class jedis_test {

    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Jedis jedis = new Jedis("localhost");
        jedis.hset("student.scofield", "English","45");
        jedis.hset("student.scofield", "Math","89");
        jedis.hset("student.scofield", "Computer","100");
        Map<String,String>  value = jedis.hgetAll("student.scofield");
        for(Map.Entry<String, String> entry:value.entrySet())
        {
            System.out.println(entry.getKey()+":"+entry.getValue());
        }
    }
}
import java.util.Map;
import redis.clients.jedis.Jedis;

public class jedis_query {

    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Jedis jedis = new Jedis("localhost");
        String value=jedis.hget("student.scofield", "English");
        System.out.println("scofield's English score is:    "+value);
    }
}

教材第345页

use student
var stus=[
{"name":"zhangsan","scores":{"English":69,"Math":86,"Computer":77}},                    {"name":"lisi","score":{"English":55,"Math":100,"Computer":88}} ]
db.student.insert(stus)

教材第346页

db.student.find().pretty()
db.student.find({"name":"zhangsan"},{"_id":0,"name":0})

教材第347页

db.student.update({"name":"lisi"}, {"$set":{"score.Math":95}} )
import java.util.ArrayList;
import java.util.List;

import org.bson.Document;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;

public class mongo_insert {

    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        //实例化一个mongo客户端
        MongoClient  mongoClient=new MongoClient("localhost",27017);
        //实例化一个mongo数据库
        MongoDatabase mongoDatabase = mongoClient.getDatabase("student");
        //获取数据库中某个集合
        MongoCollection<Document> collection = mongoDatabase.getCollection("student");
        //实例化一个文档,内嵌一个子文档
        Document document=new Document("name","scofield").
                append("score", new Document("English",45).
                        append("Math", 89).
                        append("Computer", 100));
        List<Document> documents = new ArrayList<Document>();  
        documents.add(document);  
        //将文档插入集合中
        collection.insertMany(documents);  
        System.out.println("文档插入成功"); 
    }

教材第348页

import java.util.ArrayList;
import java.util.List;

import org.bson.Document;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import static com.mongodb.client.model.Filters.eq;
public class mongo_query {

    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        //实例化一个mongo客户端
        MongoClient  mongoClient=new MongoClient("localhost",27017);
        //实例化一个mongo数据库
        MongoDatabase mongoDatabase = mongoClient.getDatabase("student");
        //获取数据库中某个集合
        MongoCollection<Document> collection = mongoDatabase.getCollection("student");
        //进行数据查找,查询条件为name=scofield, 对获取的结果集只显示score这个域
        MongoCursor<Document>  cursor=collection.find( new Document("name","scofield")).
                projection(new Document("score",1).append("_id", 0)).iterator();
        while(cursor.hasNext())
            System.out.println(cursor.next().toJson());
    }
}

教材第350页

package com.Merge;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Merge {

    /**
     * @param args
     * 对A,B两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C
     */
    //重载map函数,直接将输入中的value复制到输出数据的key上
    public static class Map extends Mapper<Object, Text, Text, Text>{
        private static Text text = new Text();
        public void map(Object key, Text value, Context context) throws IOException,InterruptedException{
            text = value;
            context.write(text, new Text(""));
        }
    }

    //重载reduce函数,直接将输入中的key复制到输出数据的key上
    public static class Reduce extends Reducer<Text, Text, Text, Text>{
        public void reduce(Text key, Iterable<Text> values, Context context ) throws IOException,InterruptedException{
            context.write(key, new Text(""));
        }
    }

    public static void main(String[] args) throws Exception{

        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://localhost:9000");
        String[] otherArgs = new String[]{"input","output"}; /* 直接设置输入参数 */
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in><out>");
            System.exit(2);
            }
        Job job = Job.getInstance(conf,"Merge and duplicate removal");
        job.setJarByClass(Merge.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

教材第353页

package com.MergeSort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class MergeSort {

    /**
     * @param args
     * 输入多个文件,每个文件中的每行内容均为一个整数
     * 输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数字为第二个整数的排序位次,第二个整数为原待排列的整数
     */
    //map函数读取输入中的value,将其转化成IntWritable类型,最后作为输出key
    public static class Map extends Mapper<Object, Text, IntWritable, IntWritable>{

        private static IntWritable data = new IntWritable();
        public void map(Object key, Text value, Context context) throws IOException,InterruptedException{
            String text = value.toString();
            data.set(Integer.parseInt(text));
            context.write(data, new IntWritable(1));
        }
    }

    //reduce函数将map输入的key复制到输出的value上,然后根据输入的value-list中元素的个数决定key的输出次数,定义一个全局变量line_num来代表key的位次
    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{
        private static IntWritable line_num = new IntWritable(1);

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException{
            for(IntWritable val : values){
                context.write(line_num, key);
                line_num = new IntWritable(line_num.get() + 1);
            }
        }
    }

    //自定义Partition函数,此函数根据输入数据的最大值和MapReduce框架中Partition的数量获取将输入数据按照大小分块的边界,然后根据输入数值和边界的关系返回对应的Partiton ID
    public static class Partition extends Partitioner<IntWritable, IntWritable>{
        public int getPartition(IntWritable key, IntWritable value, int num_Partition){
            int Maxnumber = 65223;//int型的最大数值
            int bound = Maxnumber/num_Partition+1;
            int keynumber = key.get();
            for (int i = 0; i<num_Partition; i++){
                if(keynumber<bound * (i+1) && keynumber>=bound * i){
                    return i;
                }
            }
            return -1;
        }
    }

    public static void main(String[] args) throws Exception{
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://localhost:9000");
        String[] otherArgs = new String[]{"input","output"}; /* 直接设置输入参数 */
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in><out>");
            System.exit(2);
            }
Job job = Job.getInstance(conf,"Merge and sort");
        job.setJarByClass(MergeSort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setPartitionerClass(Partition.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

教材第356页

package com.simple_data_mining;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class simple_data_mining {
    public static int time = 0;

    /**
     * @param args
     * 输入一个child-parent的表格
     * 输出一个体现grandchild-grandparent关系的表格
     */
    //Map将输入文件按照空格分割成child和parent,然后正序输出一次作为右表,反序输出一次作为左表,需要注意的是在输出的value中必须加上左右表区别标志
    public static class Map extends Mapper<Object, Text, Text, Text>{
        public void map(Object key, Text value, Context context) throws IOException,InterruptedException{
            String child_name = new String();
            String parent_name = new String();
            String relation_type = new String();
            String line = value.toString();
            int i = 0;
            while(line.charAt(i) != ' '){
                i++;
            }
            String[] values = {line.substring(0,i),line.substring(i+1)};
            if(values[0].compareTo("child") != 0){
                child_name = values[0];
                parent_name = values[1];
                relation_type = "1";//左右表区分标志
                context.write(new Text(values[1]), new Text(relation_type+"+"+child_name+"+"+parent_name));
                //左表
                relation_type = "2";
                context.write(new Text(values[0]), new Text(relation_type+"+"+child_name+"+"+parent_name));
                //右表
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text>{
        public void reduce(Text key, Iterable<Text> values,Context context) throws IOException,InterruptedException{
            if(time == 0){   //输出表头
                context.write(new Text("grand_child"), new Text("grand_parent"));
                time++;
            }
            int grand_child_num = 0;
            String grand_child[] = new String[10];
            int grand_parent_num = 0;
            String grand_parent[]= new String[10];
            Iterator ite = values.iterator();
            while(ite.hasNext()){
                String record = ite.next().toString();
                int len = record.length();
                int i = 2;
                if(len == 0) continue;
                char relation_type = record.charAt(0);
                String child_name = new String();
                String parent_name = new String();
                //获取value-list中value的child

                while(record.charAt(i) != '+'){
                    child_name = child_name + record.charAt(i);
                    i++;
                }
                i=i+1;
                //获取value-list中value的parent
                while(i<len){
                    parent_name = parent_name+record.charAt(i);
                    i++;
                }
                //左表,取出child放入grand_child
                if(relation_type == '1'){
                    grand_child[grand_child_num] = child_name;
                    grand_child_num++;
                }
                else{//右表,取出parent放入grand_parent
                    grand_parent[grand_parent_num] = parent_name;
                    grand_parent_num++;
                }
            }

            if(grand_parent_num != 0 && grand_child_num != 0 ){
                for(int m = 0;m<grand_child_num;m++){
                    for(int n=0;n<grand_parent_num;n++){
                        context.write(new Text(grand_child[m]), new Text(grand_parent[n]));
                        //输出结果
                    }
                }
            }
        }
    }
    public static void main(String[] args) throws Exception{
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://localhost:9000");
        String[] otherArgs = new String[]{"input","output"}; /* 直接设置输入参数 */
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in><out>");
            System.exit(2);
            }
Job job = Job.getInstance(conf,"Single table join");
        job.setJarByClass(simple_data_mining.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }
}