Hama图计算模型

大数据学习路线图

Hame是Google Pregel的开源实现,与Hadoop适合于分布式大数据处理不同,Hama主要用于分布式的矩阵、graph、网络算法的计算。简单说,Hama是在HDFS上实现的BSP(Bulk Synchronous Parallel)计算框架,弥补Hadoop在计算能力上的不足。本教程主要介绍hama的单机模式安装配置以及用hama解决一些算法问题。

Hama单机环境安装配置

  1. 安装好合适版本的jdk和hadoop,并且进行测试,保证他们能用。我的java jdk 是1.7版本,hadoop是2.6.0版本。
  2. 下载hama安装文件,从http://hama.apache.org/downloads.html 处下载合适的版本,我当时下的是0.7.0版本的。
  3. 下载文件后,运用命令 sudo tar -zxf ~/下载/hama-dist-0.7.0.tar.gz -C /usr/local 解压至 /usr/local/hama ,再运用命令 sudo mv ./hama-0.7.0/ ./hama 将文件夹名改为hama。
  4. 进入hama中的conf文件夹,修改hama-env.sh文件,在其中加入java的home路径,即加入:
    export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
  5. 修改 hama-site.xml文件,这时hama配置的核心文件,具体内容如下:
  <configuration>
    <property>
      <name>bsp.master.address</name>
      <value>local</value>
      <description>The address of the bsp master server. Either the
      literal string "local" or a host:port for distributed mode
      </description>
    </property>

    <property>
      <name>fs.default.name</name>
      <value>file:///</value>
      <description>
        The name of the default file system. Either the literal string
        "local" or a host:port for HDFS.
      </description>
    </property>

    <property>
      <name>hama.zookeeper.quorum</name>
      <value>localhost</value>
      <description>Comma separated list of servers in the ZooKeeper Quorum.
      For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com".
      By default this is set to localhost for local and pseudo-distributed modes
      of operation. For a fully-distributed setup, this should be set to a full
      list of ZooKeeper quorum servers. If HAMA_MANAGES_ZK is set in hama-env.sh
      this is the list of servers which we will start/stop zookeeper on.
      </description>
    </property>
  </configuration>

其中,bsp.master.address即bsp中的BSPMaster的地址和端口。这里因为是单机模式,所以ip地址为本机地址。fs.default.name这个值要特别注意,是hadoop中nameNode的地址和端口,因为hama要用到hadoop的hdfs分布式文件系统。剩下的俩个是zookeeper的相关配置。因为是单机模式下配置,这里简单的配置为本机的ip地址,端口一般是固定的。
6.另外,在conf文件夹下还有一个groomservers文件,这个在分布式环境下配置groomserver的地址,在单机模式下就不用配置了,里面默认值为localhost。同时,你也可以在~/.bashrc中添加hama的环境变量,这样每次启动就不同转到相应的目录下去了。

Hama单机模式实例

1、PageRank
(1)生成 randomgraph,运行如下命令:
./bin/hama jar hama-examples-0.7.0.jar gen fastgen -v 100 -e 10 -o randomgraph -t 2

生成的文件位于 /usr/local/hama 下的 randomgraph。它表示100个节点,1000条边的数据,存储在两个文件中。

(2)执行pagerank
./bin/hama jar hama-examples-0.7.0.jar pagerank -i randomgraph -o pagerankresult -t 4
运行结果截图如下:

单机模式下,数据读取都是在本地文件系统,不需要读取HDFS中的文件。

2、计算PI
(1)首先设置classpath,运行命令
export classpath=$classpath:/usr/local/hama/:/usr/local/hama/lib/
(2)然后对代码进行编译,运行命令
javac -classpath $classpath PiEstimator.java
(3)接着进行打包并命名为PiEstimator.jar
jar -cvf PiEstimator.jar *.class
(4)最后运行命令计算PI
./bin/hama jar PiEstimator.jar PiEstimator

其中计算PI源码如下:

import java.io.IOException;  

import org.apache.commons.logging.Log;  
import org.apache.commons.logging.LogFactory;  
import org.apache.hadoop.fs.FSDataInputStream;  
import org.apache.hadoop.fs.FileStatus;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.DoubleWritable;  
import org.apache.hadoop.io.IOUtils;  
import org.apache.hadoop.io.NullWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hama.HamaConfiguration;  
import org.apache.hama.bsp.BSP;  
import org.apache.hama.bsp.BSPJob;  
import org.apache.hama.bsp.BSPJobClient;  
import org.apache.hama.bsp.BSPPeer;  
import org.apache.hama.bsp.ClusterStatus;  
import org.apache.hama.bsp.FileOutputFormat;  
import org.apache.hama.bsp.NullInputFormat;  
import org.apache.hama.bsp.TextOutputFormat;  
import org.apache.hama.bsp.sync.SyncException;  

public class PiEstimator  
{  
    private static Path TMP_OUTPUT = new Path("/tmp/pi-"  
        + System.currentTimeMillis());  

    public static class MyEstimator  
        extends  
        BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable>  
    {  
    public static final Log LOG = LogFactory.getLog(MyEstimator.class);  
    private String masterTask;  
    private static final int iterations = 10000;  

    @Override  
    public void bsp(  
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)  
        throws IOException, SyncException, InterruptedException  
    {  

        int in = 0;  
        for (int i = 0; i < iterations; i++)  
        {  
        double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;  
        if ((Math.sqrt(x * x + y * y) < 1.0))  
        {  
            in++;  
        }  
        }  

        double data = 4.0 * in / iterations;  

        peer.send(masterTask, new DoubleWritable(data));  
        peer.sync();  
    }  

    @Override  
    public void setup(  
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)  
        throws IOException  
    {  
        // Choose one as a master  
        this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);  
    }  

    @Override  
    public void cleanup(  
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)  
        throws IOException  
    {  
        if (peer.getPeerName().equals(masterTask))  
        {  
        double pi = 0.0;  
        int numPeers = peer.getNumCurrentMessages();  
        DoubleWritable received;  
        while ((received = peer.getCurrentMessage()) != null)  
        {  
            pi += received.get();  
        }  

        pi = pi / numPeers;  
        peer.write(new Text("Estimated value of PI is"),  
            new DoubleWritable(pi));  
        }  
    }  
    }  

    static void printOutput(HamaConfiguration conf) throws IOException  
    {  
    FileSystem fs = FileSystem.get(conf);  
    FileStatus[] files = fs.listStatus(TMP_OUTPUT);  
    for (int i = 0; i < files.length; i++)  
    {  
        if (files[i].getLen() > 0)  
        {  
        FSDataInputStream in = fs.open(files[i].getPath());  
        IOUtils.copyBytes(in, System.out, conf, false);  
        in.close();  
        break;  
        }  
    }  

    fs.delete(TMP_OUTPUT, true);  
    }  

    public static void main(String[] args) throws InterruptedException,  
        IOException, ClassNotFoundException  
    {  
    // BSP job configuration  
    HamaConfiguration conf = new HamaConfiguration();  

    BSPJob bsp = new BSPJob(conf, PiEstimator.class);  
    // Set the job name  
    bsp.setJobName("Pi Estimation Example");  
    bsp.setBspClass(MyEstimator.class);  
    bsp.setInputFormat(NullInputFormat.class);  
    bsp.setOutputKeyClass(Text.class);  
    bsp.setOutputValueClass(DoubleWritable.class);  
    bsp.setOutputFormat(TextOutputFormat.class);  
    FileOutputFormat.setOutputPath(bsp, TMP_OUTPUT);  

    BSPJobClient jobClient = new BSPJobClient(conf);  
    ClusterStatus cluster = jobClient.getClusterStatus(true);  

    if (args.length > 0)  
    {  
        bsp.setNumBspTask(Integer.parseInt(args[0]));  
    } else  
    {  
        // Set to maximum  
        bsp.setNumBspTask(cluster.getMaxTasks());  
    }  

    long startTime = System.currentTimeMillis();  
    if (bsp.waitForCompletion(true))  
    {  
        printOutput(conf);  
        System.out.println("Job Finished in "  
            + (System.currentTimeMillis() - startTime) / 1000.0  
            + " seconds");  
    }  
    }  
}