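Hama is an open-source implementation of Google Pregel. Whereas Hadoop is designed for distributed processing of big data, Hama is aimed mainly at distributed computation of matrix, graph, and network algorithms. Put simply, Hama is a BSP (Bulk Synchronous Parallel) computing framework built on top of HDFS that complements Hadoop where MapReduce is weak, namely computation-heavy, iterative workloads. This tutorial covers installing and configuring Hama in standalone mode and then using Hama to solve a few algorithmic problems.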
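To make the BSP model concrete, here is a minimal sketch in plain Java with no Hama dependency (the class name BspRingSum is purely illustrative, and this is not Hama's API). Each thread plays the role of a peer, and every superstep goes through the BSP phases: sending messages, a barrier (here java.util.concurrent.CyclicBarrier), and local computation on the messages that became visible after the barrier. The PI example later in this tutorial uses Hama's peer.send() and peer.sync() for the same send-then-synchronize pattern.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Illustrative only: plain Java threads stand in for BSP peers. This is NOT Hama's API.
public class BspRingSum {

    /**
     * Peer i starts with the value i + 1. In each superstep a peer sends the
     * value it last received (initially its own) to its right-hand neighbour
     * on a ring, waits at the barrier, then adds the message it received.
     * After numPeers - 1 supersteps every peer has seen every value once,
     * so each one holds the global sum.
     */
    public static long[] run(final int numPeers) throws InterruptedException {
        if (numPeers < 1) {
            throw new IllegalArgumentException("numPeers must be >= 1");
        }
        final int supersteps = numPeers - 1;
        // One mailbox row per superstep: a message written in superstep s
        // is read only after the barrier that ends superstep s.
        final long[][] mailbox = new long[supersteps][numPeers];
        final long[] totals = new long[numPeers];
        final CyclicBarrier barrier = new CyclicBarrier(numPeers);
        Thread[] peers = new Thread[numPeers];

        for (int id = 0; id < numPeers; id++) {
            final int me = id;
            peers[id] = new Thread(new Runnable() {
                @Override
                public void run() {
                    long forward = me + 1; // value to send in the next superstep
                    long total = me + 1;   // this peer's local state
                    try {
                        for (int s = 0; s < supersteps; s++) {
                            // Communication: post a message for the right-hand neighbour.
                            mailbox[s][(me + 1) % numPeers] = forward;
                            // Barrier: nobody continues until every peer has sent.
                            barrier.await();
                            // Local computation on the message from the left-hand neighbour.
                            forward = mailbox[s][me];
                            total += forward;
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    } catch (BrokenBarrierException e) {
                        return; // another peer failed; abandon this run
                    }
                    totals[me] = total;
                }
            });
            peers[id].start();
        }
        for (Thread t : peers) {
            t.join();
        }
        return totals;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(run(4))); // prints [10, 10, 10, 10]
    }
}
```

CyclicBarrier guarantees that writes a thread makes before await() are visible to the other threads after they return from the same await(), which is exactly the "messages become visible after sync" rule of BSP.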
Installing and Configuring Hama in Standalone Mode
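- Install suitable versions of the JDK and Hadoop, and test them to make sure they work. I used JDK 1.7 and Hadoop 2.6.0.
- Download a suitable Hama release from http://hama.apache.org/downloads.html. I used version 0.7.0.
- After downloading, extract the archive into /usr/local with sudo tar -zxf ~/下载/hama-dist-0.7.0.tar.gz -C /usr/local (~/下载 is the Downloads directory on a Chinese-locale desktop). This creates /usr/local/hama-0.7.0; then, inside /usr/local, run sudo mv ./hama-0.7.0/ ./hama to rename the directory, so that Hama ends up in /usr/local/hama.
- Go to the conf directory under hama and edit hama-env.sh, adding the Java home path (adjust it to where your JDK is installed):
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
- Edit hama-site.xml, the core Hama configuration file, so that it reads: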
<configuration>
  <property>
    <name>bsp.master.address</name>
    <value>local</value>
    <description>The address of the bsp master server. Either the
    literal string "local" or a host:port for distributed mode
    </description>
  </property>
  <property>
    <name>fs.default.name</name>
    <value>file:///</value>
    <description>
    The name of the default file system. Either the literal string
    "local" or a host:port for HDFS.
    </description>
  </property>
  <property>
    <name>hama.zookeeper.quorum</name>
    <value>localhost</value>
    <description>Comma separated list of servers in the ZooKeeper Quorum.
    For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com".
    By default this is set to localhost for local and pseudo-distributed modes
    of operation. For a fully-distributed setup, this should be set to a full
    list of ZooKeeper quorum servers. If HAMA_MANAGES_ZK is set in hama-env.sh
    this is the list of servers which we will start/stop zookeeper on.
    </description>
  </property>
</configuration>
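Here, bsp.master.address is the address and port of the BSPMaster. Because this is standalone mode, it is set to the literal string local instead of a host:port, so the job runs locally without a separate BSPMaster server. fs.default.name needs particular attention: in a distributed setup it is the address and port of the Hadoop NameNode, because Hama then uses Hadoop's HDFS; in standalone mode it is set to file:///, so Hama reads and writes the local file system instead. The remaining property, hama.zookeeper.quorum, configures ZooKeeper; in standalone mode it is simply localhost, and the ZooKeeper port can normally be left at its default.
- In addition, the conf directory contains a groomservers file, which lists the GroomServer hosts in a distributed setup. In standalone mode it needs no changes; its default content is localhost. You can also add Hama's environment variables to ~/.bashrc so that you do not have to change to the Hama directory every time.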
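If you want to double-check the settings before running a job, a small helper like the following can read hama-site.xml with the JDK's built-in DOM parser and report whether the two standalone settings described above are in place. It is a hypothetical sketch, not part of Hama: the class name HamaSiteCheck is made up, the default path /usr/local/hama/conf/hama-site.xml assumes the install location used in this tutorial, and it only reads the site file, whereas Hama itself also merges in its built-in defaults.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper, not part of Hama: prints the <property> entries of a hama-site.xml.
public class HamaSiteCheck {

    /** Returns the trimmed text of the first <tag> element under e, or null if absent. */
    private static String childText(Element e, String tag) {
        NodeList list = e.getElementsByTagName(tag);
        return list.getLength() == 0 ? null : list.item(0).getTextContent().trim();
    }

    /** Collects every <property><name>...</name><value>...</value></property> pair. */
    public static Map<String, String> readProperties(String xml) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        Map<String, String> props = new LinkedHashMap<>();
        NodeList properties = doc.getElementsByTagName("property");
        for (int i = 0; i < properties.getLength(); i++) {
            Element p = (Element) properties.item(i);
            String name = childText(p, "name");
            if (name != null) {
                props.put(name, childText(p, "value"));
            }
        }
        return props;
    }

    /** True when the file holds the standalone settings used in this tutorial. */
    public static boolean isStandalone(Map<String, String> props) {
        return "local".equals(props.get("bsp.master.address"))
                && "file:///".equals(props.get("fs.default.name"));
    }

    public static void main(String[] args) throws Exception {
        // Assumed default location; pass another path as the first argument.
        String path = args.length > 0 ? args[0] : "/usr/local/hama/conf/hama-site.xml";
        String xml = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
        Map<String, String> props = readProperties(xml);
        for (Map.Entry<String, String> e : props.entrySet()) {
            System.out.println(e.getKey() + " = " + e.getValue());
        }
        System.out.println("standalone settings: " + isStandalone(props));
    }
}
```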
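Hama Standalone Mode Examples
1. PageRank
(1) Generate a random graph named randomgraph by running the following command in /usr/local/hama:
./bin/hama jar hama-examples-0.7.0.jar gen fastgen -v 100 -e 10 -o randomgraph -t 2
The generated graph is written to randomgraph under /usr/local/hama. It has 100 vertices and 1,000 edges (-v 100 sets the number of vertices, -e 10 the edges per vertex) and is stored in two files (one per task, -t 2).
(2) Run PageRank:
./bin/hama jar hama-examples-0.7.0.jar pagerank -i randomgraph -o pagerankresult -t 4
[Screenshot: output of the PageRank run]
In standalone mode, all data is read from the local file system rather than from HDFS; both the input randomgraph and the output pagerankresult are ordinary directories under /usr/local/hama.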
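The computation behind these two commands can be mimicked in plain Java, which helps when reasoning about what the job does. This is a sketch, not Hama's PageRank example or its graph generator: the class name PageRankSketch, the damping factor 0.85, the fixed number of supersteps, the random seed, and the rule of spreading rank from vertices without out-edges evenly over all vertices are all assumptions. randomGraph builds a graph of the same size as gen fastgen -v 100 -e 10 (100 vertices, 10 distinct out-neighbours each, no self-loops), but it does not reproduce fastgen's algorithm or file format. Each loop iteration in pageRank corresponds to one BSP superstep: every vertex sends rank/outDegree along its out-edges, and once all shares are delivered each vertex recomputes rank = (1 - d)/N + d * (received + danglingMass/N).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Illustrative sketch only; not Hama's fastgen or PageRank implementation.
public class PageRankSketch {

    /** Each vertex gets edgesPerVertex distinct random out-neighbours, no self-loops. */
    public static List<List<Integer>> randomGraph(int vertices, int edgesPerVertex, long seed) {
        if (edgesPerVertex >= vertices) {
            throw new IllegalArgumentException("edgesPerVertex must be < vertices");
        }
        Random rnd = new Random(seed);
        List<List<Integer>> graph = new ArrayList<>();
        for (int v = 0; v < vertices; v++) {
            List<Integer> out = new ArrayList<>();
            while (out.size() < edgesPerVertex) {
                int target = rnd.nextInt(vertices);
                if (target != v && !out.contains(target)) {
                    out.add(target);
                }
            }
            graph.add(out);
        }
        return graph;
    }

    /** Synchronous PageRank; one outer iteration = one superstep. */
    public static double[] pageRank(List<List<Integer>> graph, double damping, int supersteps) {
        int n = graph.size();
        double[] rank = new double[n];
        Arrays.fill(rank, 1.0 / n);
        for (int step = 0; step < supersteps; step++) {
            double[] received = new double[n]; // "messages" delivered to each vertex
            double danglingMass = 0.0;          // rank held by vertices with no out-edges
            for (int v = 0; v < n; v++) {
                List<Integer> out = graph.get(v);
                if (out.isEmpty()) {
                    danglingMass += rank[v];
                    continue;
                }
                double share = rank[v] / out.size();
                for (int target : out) {
                    received[target] += share;
                }
            }
            // Barrier: all shares are delivered, so every vertex can update.
            for (int v = 0; v < n; v++) {
                rank[v] = (1.0 - damping) / n + damping * (received[v] + danglingMass / n);
            }
        }
        return rank;
    }

    public static void main(String[] args) {
        List<List<Integer>> graph = randomGraph(100, 10, 1L); // same size as -v 100 -e 10
        int edges = 0;
        for (List<Integer> out : graph) {
            edges += out.size();
        }
        double[] rank = pageRank(graph, 0.85, 30);
        double total = 0.0;
        for (double r : rank) {
            total += r;
        }
        System.out.println("edges = " + edges + ", sum of ranks = " + total);
        System.out.println("rank of vertex 0 = " + rank[0]);
    }
}
```

Because each update redistributes exactly the current total rank (including the dangling mass), the ranks keep summing to 1 up to rounding, which makes a convenient sanity check.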
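2. Estimating PI
(1) First set the classpath. The /* wildcards add every jar in the Hama directory and in its lib directory; a bare directory on the classpath only exposes loose .class files, not the jars inside it:
export classpath="/usr/local/hama/*:/usr/local/hama/lib/*"
(2) Then compile the code (the quotes keep the shell from expanding the wildcards):
javac -classpath "$classpath" PiEstimator.java
(3) Next, package the compiled classes, including the inner class PiEstimator$MyEstimator.class, into PiEstimator.jar:
jar -cvf PiEstimator.jar *.class
(4) Finally, run the following command in /usr/local/hama to compute PI (copy PiEstimator.jar there first, or give its full path):
./bin/hama jar PiEstimator.jar PiEstimator
The source code for computing PI is as follows: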
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.bsp.FileOutputFormat;
import org.apache.hama.bsp.NullInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.bsp.sync.SyncException;
public class PiEstimator {
    // Output directory for this run, made unique with the current timestamp.
    private static final Path TMP_OUTPUT = new Path("/tmp/pi-"
            + System.currentTimeMillis());

    public static class MyEstimator extends
            BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> {
        public static final Log LOG = LogFactory.getLog(MyEstimator.class);
        private String masterTask;
        private static final int iterations = 10000;

        @Override
        public void bsp(
                BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
                throws IOException, SyncException, InterruptedException {
            // Sample random points in the square [-1, 1) x [-1, 1) and count
            // how many fall inside the unit circle.
            int in = 0;
            for (int i = 0; i < iterations; i++) {
                double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
                if ((Math.sqrt(x * x + y * y) < 1.0)) {
                    in++;
                }
            }
            // Circle area / square area = PI / 4, so this is this task's estimate of PI.
            double data = 4.0 * in / iterations;
            // Send the estimate to the master task, then wait at the barrier.
            peer.send(masterTask, new DoubleWritable(data));
            peer.sync();
        }

        @Override
        public void setup(
                BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
                throws IOException {
            // Choose one task as the master
            this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
        }

        @Override
        public void cleanup(
                BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
                throws IOException {
            // Only the master averages the estimates delivered to it by sync().
            if (peer.getPeerName().equals(masterTask)) {
                double pi = 0.0;
                int numPeers = peer.getNumCurrentMessages();
                DoubleWritable received;
                while ((received = peer.getCurrentMessage()) != null) {
                    pi += received.get();
                }
                pi = pi / numPeers;
                peer.write(new Text("Estimated value of PI is"),
                        new DoubleWritable(pi));
            }
        }
    }
    // Prints the first non-empty output file, then deletes the temporary output directory.
    static void printOutput(HamaConfiguration conf) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        FileStatus[] files = fs.listStatus(TMP_OUTPUT);
        for (int i = 0; i < files.length; i++) {
            if (files[i].getLen() > 0) {
                FSDataInputStream in = fs.open(files[i].getPath());
                try {
                    IOUtils.copyBytes(in, System.out, conf, false);
                } finally {
                    in.close();
                }
                break;
            }
        }
        fs.delete(TMP_OUTPUT, true);
    }

    public static void main(String[] args) throws InterruptedException,
            IOException, ClassNotFoundException {
        // BSP job configuration
        HamaConfiguration conf = new HamaConfiguration();
        BSPJob bsp = new BSPJob(conf, PiEstimator.class);
        // Set the job name
        bsp.setJobName("Pi Estimation Example");
        bsp.setBspClass(MyEstimator.class);
        bsp.setInputFormat(NullInputFormat.class);
        bsp.setOutputKeyClass(Text.class);
        bsp.setOutputValueClass(DoubleWritable.class);
        bsp.setOutputFormat(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(bsp, TMP_OUTPUT);

        BSPJobClient jobClient = new BSPJobClient(conf);
        ClusterStatus cluster = jobClient.getClusterStatus(true);
        if (args.length > 0) {
            // Number of BSP tasks given on the command line
            bsp.setNumBspTask(Integer.parseInt(args[0]));
        } else {
            // Set to maximum
            bsp.setNumBspTask(cluster.getMaxTasks());
        }

        long startTime = System.currentTimeMillis();
        if (bsp.waitForCompletion(true)) {
            printOutput(conf);
            System.out.println("Job Finished in "
                    + (System.currentTimeMillis() - startTime) / 1000.0
                    + " seconds");
        }
    }
}
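To see what the job computes without a running Hama installation, the following sketch reproduces the same logic sequentially in plain Java (no Hama dependency; the class name LocalPiEstimate and the fixed seeds are illustrative assumptions). Each simulated peer samples 10,000 points in the square [-1, 1) x [-1, 1) and estimates PI as 4 times the fraction that land inside the unit circle, and the "master" averages the per-peer estimates, as cleanup() does in PiEstimator. It uses java.util.Random with explicit seeds instead of Math.random() so that runs are reproducible.

```java
import java.util.Random;

// Plain-Java mirror of the PiEstimator job; no Hama classes involved.
public class LocalPiEstimate {

    /** One peer's estimate: 4 * (fraction of sampled points inside the unit circle). */
    static double peerEstimate(Random rnd, int iterations) {
        int in = 0;
        for (int i = 0; i < iterations; i++) {
            double x = 2.0 * rnd.nextDouble() - 1.0;
            double y = 2.0 * rnd.nextDouble() - 1.0;
            // Same test as Math.sqrt(x * x + y * y) < 1.0, without the square root.
            if (x * x + y * y < 1.0) {
                in++;
            }
        }
        return 4.0 * in / iterations;
    }

    /** Averages numPeers independent estimates, like the master task in cleanup(). */
    public static double estimate(int numPeers, int iterations, long seed) {
        double sum = 0.0;
        for (int p = 0; p < numPeers; p++) {
            sum += peerEstimate(new Random(seed + p), iterations);
        }
        return sum / numPeers;
    }

    public static void main(String[] args) {
        int peers = args.length > 0 ? Integer.parseInt(args[0]) : 4;
        System.out.println("Estimated value of PI is " + estimate(peers, 10000, 42L));
    }
}
```

Adding peers (or iterations per peer) tightens the estimate: the standard error shrinks roughly with the square root of the total number of samples, which is why spreading the sampling over more BSP tasks helps.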