改成9-1,,,谢谢大神,PS,遥感图像处理理

Spark SQL & Spark Hive编程开发, 并和Hive执行效率对比 – 过往记忆
欢迎关注Hadoop、Spark、Flink、Hive、Hbase、Flume等大数据资料分享微信公共账号:iteblog_hadoop。
文章总数:792
浏览总数:9,377,718
评论:4814
分类目录:91 个
注册用户数:2496
最后更新:日
欢迎关注微信公共帐号:iteblog_hadoop
关注iteblog_hadoop公众号,并在评论区留言并且留言点赞数排名前5名的粉丝,各免费赠送一本《Druid实时大数据分析原理与实践》,活动截止至3月14日19:00,心动不如行动。
   SQL也公布了很久,今天写了个程序来看下 SQL、Spark 以及直接用执行的效率进行了对比。以上测试都是跑在YARN上。
  首先我们来看看我的环境:
3台DataNode,2台NameNode,每台机器20G内存,24核
数据都是lzo格式的,共336个文件,338.6 G
无其他任务执行
如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop
三个测试都是执行
select count(*), host, module
from ewaplog
group by host, module
order by host,
下面我们先来看看Spark SQL核心的代码(关于Spark SQL的详细介绍请参见Spark官方文档,这里我就不介绍了。):
* User: 过往记忆
* Date: 14-8-13
* Time: 下午23:16
* 本文地址:/archives/1090
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
* 过往记忆博客微信公共帐号:iteblog_hadoop
public static class Entry implements Serializable {
public Entry(String host, String module, String content) {
this.host =
this.module =
this.content =
public String getHost() {
public void setHost(String host) {
this.host =
public String getModule() {
public void setModule(String module) {
this.module =
public String getContent() {
public void setContent(String content) {
this.content =
public String toString() {
return &[& + host + &\t& + module + &\t& + content + &]&;
JavaSparkContext ctx = ...
JavaSQLContext sqlCtx = ...
JavaRDD&Entry& stringJavaRDD = ctx.textFile(args[0]).map(
new Function&String, Entry&() {
public Entry call(String str) throws Exception {
String[] split = str.split(&\u0001&);
if (split.length & 3) {
return new Entry(&&, &&, &&);
return new Entry(split[0], split[1], split[2]);
JavaSchemaRDD schemaPeople = sqlCtx.applySchema(stringJavaRDD, Entry.class);
schemaPeople.registerAsTable(&entry&);
JavaSchemaRDD teenagers = sqlCtx.sql(&select count(*), host, module & +
&from entry & +
&group by host, module & +
&order by host, module&);
List&String& teenagerNames = teenagers.map(new Function&Row, String&() {
public String call(Row row) {
return row.getLong(0) + &\t& +
row.getString(1) + &\t& + row.getString(2);
}).collect();
for (String name : teenagerNames) {
System.out.println(name);
Spark 核心代码:
* User: 过往记忆
* Date: 14-8-23
* Time: 下午23:16
* 本文地址:/archives/1090
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
* 过往记忆博客微信公共帐号:iteblog_hadoop
JavaHiveContext hiveContext =....;
JavaSchemaRDD result = hiveContext.hql(&select count(*), host, module & +
&from ewaplog & +
&group by host, module & +
&order by host, module&);
List&Row& collect = result.collect();
for (Row row : collect) {
System.out.println(row.get(0) + &\t& + row.get(1) + &\t& + row.get(2));
  大家可以看到Spark Hive核心代码里面的SQL语句和直接在Hive上面执行一样,在执行这个代码的时候,需要确保ewaplog存在。而且在运行这个程序的时候需要依赖Hive的一些jar包,需要依赖Hive的元数据等信息。对Hive的依赖比较大。而Spark SQL直接读取lzo文件,并没有涉及到Hive,相比Spark Hive依赖性这方便很好。Spark SQL直接读取lzo文件,然后将数据存放在RDD中,applySchema方法将JavaRDD转换成JavaSchemaRDD,我们来看看文档是怎么来描述的
  At the core of this component is a new type of RDD, SchemaRDD. SchemaRDDs are composed Row objects along with a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table in a traditional relational database. A SchemaRDD can be created from an existing RDD, Parquet file, a JSON dataset, or by running HiveQL against data stored in Apache Hive.
转换成JavaSchemaRDD之后,我们可以用用registerAsTable将它注册到表中,之后就可以通过JavaSQLContext的sql方法来执行相应的sql语句了。
  用Maven编译完上面的程序之后,放到Hadoop集群上面运行:
iteblog@Spark $ spark-submit --master yarn-cluster
--jars lib/spark-sql_2.10-1.0.0.jar
--class SparkSQLTest
--queue queue1
./spark-1.0-SNAPSHOT.jar
/home/wyp/test/*.lzo
分别经过了20分钟左右的时间,Spark SQL和Spark Hive都可以运行完,结果如下:
bokingserver1 CN1_hbase_android_client
bokingserver1 CN1_hbase_iphone_client
bokingserver2 CN1_hbase_android_client
bokingserver2 CN1_hbase_iphone_client
bokingserver3 CN1_hbase_android_client
bokingserver3 CN1_hbase_iphone_client
bokingserver4 CN1_hbase_android_client
bokingserver4 CN1_hbase_iphone_client
bokingserver5 CN1_hbase_android_client
bokingserver5 CN1_hbase_iphone_client
bokingserver6 CN1_hbase_android_client
bokingserver6 CN1_hbase_iphone_client
bokingserver7 CN1_hbase_android_client
bokingserver7 CN1_hbase_iphone_client
bokingserver8 CN1_hbase_android_client
bokingserver8 CN1_hbase_iphone_client
bokingserver9 CN1_hbase_android_client
bokingserver9 CN1_hbase_iphone_client
bokingserver10 CN1_hbase_android_client
bokingserver10 CN1_hbase_iphone_client
bokingserver11 CN1_hbase_android_client
bokingserver11 CN1_hbase_iphone_client
bokingserver12 CN1_hbase_android_client
bokingserver12 CN1_hbase_iphone_client
  为了比较基于Spark的任务确实比基于Mapreduce的快,我特意用Hive执行了同样的任务,如下:
hive& select count(*), host, module from ewaplog
& group by host, module order by host,
Job 0: Map: 2845
Reduce: 364
Cumulative CPU: 17144.59 sec
HDFS Read:
HDFS Write: 36516 SUCCESS
Job 1: Map: 1
Cumulative CPU: 4.82 sec
HDFS Read: 114193 HDFS Write: 1260 SUCCESS
Total MapReduce CPU Time Spent: 0 days 4 hours 45 minutes 49 seconds 410 msec
bokingserver1 CN1_hbase_android_client
bokingserver1 CN1_hbase_iphone_client
bokingserver2 CN1_hbase_android_client
bokingserver2 CN1_hbase_iphone_client
bokingserver3 CN1_hbase_android_client
bokingserver3 CN1_hbase_iphone_client
bokingserver4 CN1_hbase_android_client
bokingserver4 CN1_hbase_iphone_client
bokingserver5 CN1_hbase_android_client
bokingserver5 CN1_hbase_iphone_client
bokingserver6 CN1_hbase_android_client
bokingserver6 CN1_hbase_iphone_client
bokingserver7 CN1_hbase_android_client
bokingserver7 CN1_hbase_iphone_client
bokingserver8 CN1_hbase_android_client
bokingserver8 CN1_hbase_iphone_client
bokingserver9 CN1_hbase_android_client
bokingserver9 CN1_hbase_iphone_client
bokingserver10 CN1_hbase_android_client
bokingserver10 CN1_hbase_iphone_client
bokingserver11 CN1_hbase_android_client
bokingserver11 CN1_hbase_iphone_client
bokingserver12 CN1_hbase_android_client
bokingserver12 CN1_hbase_iphone_client
Time taken:
seconds, Fetched: 24 row(s)
  从上面的显示我们可以看出,Hive执行同样的任务用了30分钟,而Spark用了20分钟,也就是省了1/3的时间,还是很快的。在运行的过程中,我发现Spark消耗内存比较大,在程序运行期间,三个子节点负载很高,整个队列的资源消耗了一半以上。我想如果集群的机器数据更多的话,Spark的运行速度应该还会有一些提升。好了今天就说到这,欢迎关注本博客。
优秀人才不缺工作机会,只缺适合自己的好机会。但是他们往往没有精力从海量机会中找到最适合的那个。
100offer 会对平台上的人才和企业进行严格筛选,让「最好的人才」和「最好的公司」相遇。
注册 100offer,谈谈你对下一份工作的期待。一周内,收到 5-10 个满足你要求的好机会!
本博客文章除特别声明,全部都是原创!
禁止个人和公司转载本文、谢谢理解:
下面文章您可能感兴趣Spark SQL操作Hive数据库 -
- ITeye技术网站
博客分类:
本次例子通过scala编程实现Spark SQL操作Hive数据库!
Hadoop集群搭建:
Spark集群搭建:
数据准备
在/usr/local/sparkApps/SparkSQL2Hive/resources/目录下创建people.txt内容如下,name和age之间是"\t"分割
Michael&&& 20
Andy&&& 17
Justin&&& 19
创建份数peopleScores.txt,内容如下,name和score之间用“\t”分割
Michael&&& 98
Andy&&& 95
Justin&&& 68
代码实现
package com.imf.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
* 通过spark sql操作hive数据源
object SparkSQL2Hive {
def main(args: Array[String]): Unit = {
val conf = new SparkConf();
conf.setAppName("SparkSQL2Hive for scala")
conf.setMaster("spark://master1:7077")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
//用户年龄
hiveContext.sql("use testdb")
hiveContext.sql("DROP TABLE IF EXISTS people")
hiveContext.sql("CREATE TABLE IF NOT EXISTS people(name STRING, age INT)ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'")
//把本地数据加载到hive中(实际上发生了数据拷贝),也可以直接使用HDFS中的数据
hiveContext.sql("LOAD DATA LOCAL INPATH '/usr/local/sparkApps/SparkSQL2Hive/resources/people.txt' INTO TABLE people")
//用户份数
hiveContext.sql("use testdb")
hiveContext.sql("DROP TABLE IF EXISTS peopleScores")
hiveContext.sql("CREATE TABLE IF NOT EXISTS peopleScores(name STRING, score INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'")
hiveContext.sql("LOAD DATA LOCAL INPATH '/usr/local/sparkApps/SparkSQL2Hive/resources/peopleScore.txt' INTO TABLE peopleScores")
* 通过HiveContext使用join直接基于hive中的两种表进行操作
val resultDF = hiveContext.sql("select pi.name,pi.age,ps.score "
+" from people pi join peopleScores ps on pi.name=ps.name"
+" where ps.score&90");
* 通过saveAsTable创建一张hive managed table,数据的元数据和数据即将放的具体位置都是由
* hive数据仓库进行管理的,当删除该表的时候,数据也会一起被删除(磁盘的数据不再存在)
hiveContext.sql("drop table if exists peopleResult")
resultDF.saveAsTable("peopleResult")
* 使用HiveContext的table方法可以直接读取hive数据仓库的Table并生成DataFrame,
* 接下来机器学习、图计算、各种复杂的ETL等操作
val dataframeHive = hiveContext.table("peopleResult")
dataframeHive.show()
调度脚本
并将上面的程序打包成SparkSQL2Hive.jar,将SparkSQL2Hive.jar拷贝到/usr/local/sparkApps/SparkSQL2Hive/目录下面,并创建调度脚本run.sh,内容如下:
/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit \
--class com.imf.spark.sql.SparkSQL2Hive \
--files /usr/local/hive/apache-hive-1.2.1-bin/conf/hive-site.xml \
--master spark://master1:7077 \
/usr/local/sparkApps/SparkSQL2Hive/SparkSQL2Hive.jar
#如果已经将msyql的驱动放到了spark的lib目录下面,则不用在添加下面的mysql的驱动了
#--driver-class-path /usr/local/hive/apache-hive-1.2.1-bin/lib/mysql-connector-java-5.1.35-bin.jar \
详细执行的日志见附件 run.log
用hive来查看表内容和执行结果
root@master1:/usr/local/tools# hive
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/hadoop/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/spark/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/hadoop/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/spark/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Logging initialized using configuration in jar:file:/usr/local/hive/apache-hive-1.2.1-bin/lib/hive-common-1.2.1.jar!/hive-log4j.properties
Time taken: 1.013 seconds, Fetched: 2 row(s)
Time taken: 0.103 seconds
peopleresult
peoplescores
tmp_pre_hour_seach_info
Time taken: 0.082 seconds, Fetched: 9 row(s)
hive& select *
Time taken: 1.252 seconds, Fetched: 3 row(s)
hive& select *
Time taken: 0.142 seconds, Fetched: 3 row(s)
hive& select *
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Time taken: 0.298 seconds, Fetched: 2 row(s)
至此,通过SparkSQL操作hive数据库成功!
下载次数: 3
浏览: 56521 次
来自: 上海
target jvm版本也要选择正确。不能选择太高。2.10对 ...}

我要回帖

更多关于 图像处理软件 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信