Spark SQL简介
Spark SQL是Spark用来操作结构化和半结构化数据的接口。Spark SQL会使得针对这些数据的读取和查询变得更加简单高效。具体来说,Spark SQL提供了以下三大功能。
- Spark SQL可以从各种结构化数据源(如JSON、Hive、Parquet等)中读取数据。
- Spark SQL不仅支持在Spark程序内使用SQL语句进行数据查询,也支持外部工具通过标准数据库连接器(JDBC/ODBC)连接Spark SQL进行查询。
- 当在Spark程序内使用Spark SQL时,Spark SQL支持SQL与常规的Python/Java/Scala代码高度整合,包括连接RDD与SQL表、公开的自定义SQL函数接口等。
为了实现这些功能,Spark SQL提供了一种特殊的RDD,叫作SchemaRDD。SchemaRDD是存放Row对象的RDD,每个Row对象代表一行记录。SchemaRDD还包含记录的结构信息(即数据字段)。SchemaRDD可以利用结构信息更加高效的存储数据。此外,SchemaRDDR还支持RDD上没有的一些新操作,比如运行SQL查询。SchemaRDDR可以从外部数据源创建,也可以从查询结果或普通RDD中创建。
连接Spark SQL
Apache Hive是Hadoop上的SQL引擎,Spark SQL编译时可以包含Hive支持,也可以不包含。包含Hive支持的Spark SQL可以支持Hive表访问、UDF(用户自定义函数)、SerDe(序列化格式和反序列化格式),以及Hive查询语言(HiveQL/HQL)。需要强调的是,如果要在Spark SQL中包含Hive的库,并不需要事先安装Hive。一般来说,最好还是在编译 Spark SQL 时引入 Hive 支持,这样就可以使用这些特性了。如果你下载的是二进制版本的 Spark,它应该已经在编译时添加了 Hive 支持。而如果你是从代码编译Spark,你应该使用 sbt/sbt -Phive assembly 编译,以打开 Hive 支持。
当使用 Spark SQL 进行编程时,根据是否使用 Hive 支持,有两个不同的入口。推荐使用的入口是 HiveContext,它可以提供 HiveQL 以及其他依赖于 Hive 的功能的支持。更为基础的 SQLContext 则支持 Spark SQL 功能的一个子集,子集中去掉了需要依赖于 Hive 的功能。
若要把 Spark SQL 连接到一个部署好的 Hive 上,你必须把 hive-site.xml 复制到Spark 的配置文件目录中($SPARK_HOME/conf)。即使没有部署好 Hive,Spark SQL 也可以运行。,如果你没有部署好 Hive,Spark SQL 会在当前的工作目录中创建出自己的Hive 元数据仓库,叫作 metastore_db。如果创建表,这些表会被放在你默认的文件系统中的/user/hive/warehouse 目录中(如果你的 classpath 中有配好的 hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)
在应用中使用Spark SQL
在Spark SQL应用中使用Spark SQL可以轻松读取数据并使用SQL查询,同时还能把这一过程和普通的Python/Java/Scala程序代码结合在一起。
要以这种方式使用 Spark SQL,需要基于已有的 SparkContext 创建出一个 HiveContext(如果使用的是去除了 Hive 支持的 Spark 版本,则创建出 SQLContext)。这个上下文环境提供了对Spark SQL 的数据进行查询和交互的额外函数。使用 HiveContext 可以创建出表示结构化数据的 SchemaRDD,并且使用 SQL 或是类似 map() 的普通 RDD 操作来操作这些 SchemaRDD。
- Scala中SQL的import声明
// 创建Spark SQL的HiveContext val hiveCtx = ... // 导入隐式转换支持 import hiveCtx._
- Java中SQL的import声明
// 导入Spark SQL import org.apache.spark.sql.hive.HiveContext; // 当不能使用hive依赖时 import org.apache.spark.sql.SQLContext; // 导入JavaSchemaRDD import org.apache.spark.sql.SchemaRDD; import org.apache.spark.sql.Row;
- Python中SQL的import声明
# 导入Spark SQL from pyspark.sql import HiveContext, Row # 当不能引入hive依赖时 from pyspark.sql import SQLContext, Row
添加import声明后,需要创建出一个HiveContext对象。如果无法引入Hive依赖就创建出一个SQLContext对象作为SQL的上下文环境。这两个类都需要传入一个SparkContext对象作为运行的基础。
- 在Scala中创建SQL上下文环境
val sc = new SparkContext(...) val hiveCtx = new HiveContext(sc)
- 在java中创建SQL上下文环境
JavaSparkContext ctx = new JavaSparkContext(...); SQLContext sqlCtx = new HiveContext(ctx);
- 在python中创建SQL上下文环境
hiveCtx = HiveContext(sc)
有了HiveContext或者SQLContext之后,我们就可以准备读取数据并进行查询了。
基本查询示例
要在一张数据表上进行查询,需要调用HiveContext或者SQLContext中的sql()
方法。通过SchemaRDD的registerTempTable()
方法可以把任意SchemaRDD注册为临时表,这样就可以使用HiveContext.sql或SQLContext.sql来对它进行查询了。
需要注意的是临时表是当前使用的HiveContext或SQLContext中的临时变量,在你的应用退出时这些临时表就不再存在了。
- 在Scala中读取并查询
val input = hiveCtx.jsonFile(inputFile) // 注册输入的SchemaRDD input.registerTempTable("tweets") // 依据retweetCount(转发计数)选出推文 val topTweets = hiveCtx.sql("SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10")
- 在java中读取并查询
SchemaRDD input = hiveCtx.jsonFile(inputFile); // 注册输入的SchemaRDD input.registerTempTable("tweets"); // 依据retweetCount(转发计数)选出推文 SchemaRDD topTweets = hiveCtx.sql("SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10");
- 在python中读取并查询
input = hiveCtx.jsonFile(inputFile) # 注册输入的SchemaRDD input.registerTempTable("tweets") # 依据retweetCount(转发计数)选出推文 topTweets = hiveCtx.sql("""SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10""")
将集群中已装好的Hive的配置文件hive-site.xml复制到$SPARK_HOME/conf目录下,那么你也可以直接运行hiveCtx.sql来查询已有的Hive表。
SchemaRDD可以存储的数据类型
缓存
Spark SQL的缓存机制与Spark中稍有不同。为了确保使用更节约内存的表示方式缓存而不是存储整个对象,应当使用专门的hiveCtx.cacheTable("tableName")
方法。当缓存数据表时,Spark SQL使用一种列式存储格式在内存中表示数据。这些缓存下来的表只会在驱动器程序的生命周期里保留在内存中,如果驱动器进程退出,就需要重新缓存数据。另外可以使用HiveQL/SQL语句来缓存表。需要运行CACHE TABLE tableName
或UNCACHE TABLE tableName
来缓存表或者删除已有的缓存。这种方式在JDBC服务器的命令行客户端中很常用。
读取和存储数据
Spark SQL支持多种结构化数据源,可以让你跳过复杂的读取过程,轻松从各种数据源中读取到Row对象。这些数据源包括Hive表、JSON和Parquet文件。当你使用SQL查询这些数据源中的数据并且只用到了一部分字段时,Spark SQL可以智能地只扫描这些用到的字段。
- Apache Hive
要把Spark SQL连接到已经部署好的额Hive上,你需要把hive-site.xml文件复制到Spark的./conf/目录下即可。
使用python从Hive读取
from pyspark.sql import HiveContext hiveCtx = HiveContext(sc) rows = hiveCtx.sql("SELECT key, value FROM mytable") keys = rows.map(lamda row: row[0])
使用Scala从Hive读取
import org.apache.spark.sql.hive.HiveContext val hiveCtx = new HiveContext(sc) val rows = hiveCtx.sql("SELECT key, value FROM mytable") val keys = rows.map(row => row.getInt(0))
使用Java从Hive读取
import org.apache.spark.sql.hive.HiveContext; import org.apache.spark.sql.Row; import org.apache.spark.sql.SchemaRDD; HiveContext hiveCtx = new HiveContext(sc); SchemaRDD rows = hiveCtx.sql("SELECT key, value FROM mytable"); JavaRDD<Integer> keys = rdd.toJavaRDD().map(new Function<Row, Integer>() { public Integer call(Row row) { return row.getInt(0); } });
- Parquet
Parquet是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。Parquet格式经常在Hadoop生态圈中被使用,它也支持Spark SQL的全部数据类型。
通过HiveContext.parquetFile或者SQLContext.parquetFile来读取数据。
python中Parquet数据读取# 从一个有name和favouriteAnimal字段的Parquet文件中读取数据 rows = hiveCtx.parquetFile(parquetFile) names = rows.map(lambda row: row.name) print "Everyone" print names.collect() # 寻找熊猫爱好者 tbl = rows.registerTempTable("people") pandaFriends = hiveCtx.sql("SELECT name FROM people WHERE favouriteAnimal = \"panda\"") print "Panda friends" print pandaFriends.map(lambda row: row.name).collect()
python中的Parquet文件保存
pandaFriends.saveAsParquetFile("hdfs://...")
- JSON
Spark SQL可以通过扫描JSON文件推测出结构信息,并且让你可以使用名字访问对应字段。
要读取JSON数据,只要调用hiveCtx中的jsonFile()
方法即可,如果你想活动从数据中推断出来的结构信息,可以在生成的SchemaRDD上调用printSchema()
方法。# 输入记录 {"name": "Holden"} {"name": "Sparky The Bear", "lovesPandas":true,"knows": {"friends":["holden"]}} # 读取JSON数据 input = hiveCtx.jsonFile(inputFile) # 输出结构信息 input.printSchema()
在Scala中读取JSON数据
val input = hiveCtx.jsonFile(inputFile)
在Java中读取JSON数据
SchemaRDD input = hiveCtx.jsonFile(jsonFile);
如何访问嵌套字段和数据字段 如果使用python,或已经把数据注册为一张SQL表,你可以通过 . 来访问各个嵌套层级的嵌套元素(比如toplevel.nextlevel)。可以通过用[element]指定下标来访问数组中的元素。
- 基于RDD
除了读取数据,也可以基于RDD创建SchemaRDD。
在python中,可以创建一个由Row对象组成的RDD,然后调用inferSchema()
方法得到SchemaRDD。happyPeopleRDD = sc.parallelize([Row(name="holden", favouriteBeverage="coffee")]) happyPeopleSchemaRDD = hiveCtx.inferSchema(happyPeopleRDD) happyPeopleSchemaRDD.registerTempTable("happy_people")
在Scala中基于case calss创建SchemaRDD
case class HappyPerson(handle: String, favouriteBeverage: String) ... // 创建了一个人的对象,并且把它转成SchemaRDD val happyPeopleRDD = sc.parallelize(List(HappyPerson("holden", "coffee"))) // 注意:此处发生了隐式转换 // 该转换等价于sqlCtx.createSchemaRDD(happyPeopleRDD) happyPeopleRDD.registerTempTable("happy_people")
在Java中,可以调用
applySchema()
把RDD转为SchemaRDD,只需要这个RDD中的数据类型带有公有的getter和setter方法,并且可以被序列化。class HappyPerson implements Serializable { private String name; private String favouriteBeverage; public HappyPerson() {} public HappyPerson(String n, String b) { name = n; favouriteBeverage = b; } public String getName() { return name; } public void setName(String n) { name = n; } public String getFavouriteBeverage() { return favouriteBeverage; } public void setFavouriteBeverage(String b) { favouriteBeverage = b; } }; ... ArrayList<HappyPerson> peopleList = new ArrayList<HappyPerson>(); peopleList.add(new HappyPerson("holden", "coffee")); JavaRDD<HappyPerson> happyPeopleRDD = sc.parallelize(peopleList); SchemaRDD happyPeopleSchemaRDD = hiveCtx.applySchema(happyPeopleRDD, HappyPerson.class); happyPeopleSchemaRDD.registerTempTable("happy_people");
JDBC/ODBC服务器
JDBC服务器作为一个独立的Spark驱动器程序运行,可以在多个用户之间共享。任意一个客户端都可以在内存中缓存数据表,对表进行查询。集群的资源以及缓存数据都在所有用户之间共享。
Spark SQL的JDBC服务器可以通过Spark目录中的sbin/start-thriftserver.sh
启动。默认情况下,服务会在localhost:10000上进行监听,我们可以通过环境变量(HIVE_SERVER2_THRIFT_PORT和HIVE_SERVER2_THRIFT_BIND_HOST)修改这些设置,也可以通过Hive配置文件server2.thrift.port和hive.server2.thrift.bind.host)来修改。也可以通过命令行参数–hiveconf property=value来设置Hive参数。
启动JDBC服务器
./sbin/start-thriftserver.sh --master sparkMaster
使用Beeline连接JDBC服务器
beeline -u jdbc:hive2://localhost:10000
-
使用Beeline 在Beeline客户端中,可以使用标准的HiveQL命令来创建、列举以及查询数据表。
-
长生命周期的表与查询 使用Spark SQL的JDBC服务器的优点之一就是我们可以在多个不同程序之间共享缓存下来的数据表。只需要注册该数据表并对其进行CACHE命令,就可以利用缓存了。
spark-sql是一个单独的shell进程,这个shell会连接到你设置在conf/hive-site.xml中的Hive的元数据仓库,如果不存在,会在本地新建一个。这个shell对于本地开发比较有用。在共享的集群上,应该使用JDBC服务器,让各用户通过beeline进行连接。
Spark SQL UDF
在Scala和Python中,可以利用语言原生的函数和lambda语法的支持,在Java中,则需要扩展对应的UDF类。UDF能够支持各种数据类型,返回类型也可以与调用时的参数类型完全不一样。
python版本字符串长度UDF
# 写一个求字符串长度的UDF
hiveCtx.registerFunction("strLenPython", lambda x: len(x), IntegerType())
lengthSchemaRDD = hiveCtx.sql("SELECT strLenPython('text') FROM tweets LIMIT 10")
Scala版本的字符串长度UDF
registerFunction("strLenScala", (_: String).length)
val tweetLength = hiveCtx.sql("SELECT strLenScala('tweet') FROM tweets LIMIT 10")
java版本字符串长度UDF
// 导入UDF函数类以及数据类型
// 注意: 这些import路径可能会在将来的发行版中改变
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
hiveCtx.udf().register("stringLengthJava", new UDF1<String, Integer>() {
@Override
public Integer call(String str) throws Exception {
return str.length();
}
}, DataTypes.IntegerType);
SchemaRDD tweetLength = hiveCtx.sql(
"SELECT stringLengthJava('text') FROM tweets LIMIT 10");
List<Row> lengths = tweetLength.collect();
for (Row row : result) {
System.out.println(row.get(0));
}
Spark SQL支持已有的Hive UDF。标准的Hive UDF已经自动包含在了Spark SQL中。若自定义的Hive UDF,需要确保该UDF所在JAR包已经在应用中。若使用JDBC服务器,也可以使用–jars命令标记来添加JAR包。
使用Hive UDF,应该使用HiveContext,而不能使用SQLContext。要注册一个Hive UDF,只需调用hiveCtx.sql("CREATE TEMPORARY FUNCTION name AS class.function")
。sql中内容为Hive UDF创建临时函数的命令。
参考资料:《Spark快速大数据分析》