版本说明:
spark 2.3
前言
前几天在群里摸鱼的时候,碰都一位同学问了一个比较有趣的问题,他提问:Spark如何读取原生JSON?看到这个问题,心里有些疑惑,Spark不是有JSON数据源支持吗,怎么这里还要问如何读取原生JSON,这原生JSON又是什么鬼?经过交流才明白,原来他所说的原生JSON是类似如下这种格式:
{
"昌平区东小": [
116.4021289,
40.05688698
],
"昌平区回龙": [
116.3412241,
40.07942604
],
"昌平区北七": [
116.4179459,
40.11644403
],
"昌平区阳坊": [
116.1332611,
40.13701398
]
}
而我们平时使用Spark读取的JSON是如下格式
{"address":"昌平区东小","location":[116.4021289,40.05688698]}
{"address":"昌平区回龙","location":[116.3412241, 40.07942604]}
{"address":"昌平区北七","location":[116.4179459,40.11644403]}
{"address":"昌平区阳坊","location":[116.1332611,40.13701398]}
如果使用spark.read.json直接读取他所说的”原生JSON”格式的文件,将会报如下错误:
感觉这个问题还是比较有趣的,所以自己就亲自试试了,找了种解决方案。
1 需求说明
上文中,我们已经可以看出问题所在。其实就是将所谓的”原生JSON”格式的文件读取出来转成如下格式的DataFrame。
+-------+-------------------------+
|address|location |
+-------+-------------------------+
|昌平区北七 |[116.4179459,40.11644403]|
|昌平区回龙 |[116.3412241,40.07942604]|
|昌平区阳坊 |[116.1332611,40.13701398]|
|昌平区东小 |[116.4021289,40.05688698]|
+-------+-------------------------+
上文也提到如果我们的JSON文件时如下格式的
{"address":"昌平区东小","location":[116.4021289,40.05688698]}
{"address":"昌平区回龙","location":[116.3412241, 40.07942604]}
{"address":"昌平区北七","location":[116.4179459,40.11644403]}
{"address":"昌平区阳坊","location":[116.1332611,40.13701398]}
那么使用Spark很容易读出来,并转成DataFrame
spark.read.json("/Users/shirukai/hollysys/repository/learn-demo-spark/data/location.json").show(false)
那么”原生JSON”格式的该如何处理呢,我带着这个需求,进行了一些尝试。
2 方案一:手动解析JSON
这个方案的大体思路是:读取文本内容,手动解析JSON,然后使用flatmap算子转换,最后生成DataFrame。这个方案也是最先想到的,感觉也是最容易实现的,但是在实现的时候却遇到了一个小问题,导致我花费了几个小时的时间才解决,其实主要是因为基本功不扎实,走了不少弯路。先说一下遇到的问题吧,这里也做一个小记录,读取文本的时候,直接使用了read.text(),结果默认按照行分隔符生成了多条记录,拿不到一条完整的JSON。其实我的目的就是将整个文本当做一条记录来处理,一开始想着指定别的行分隔符就能拿到一条记录了,百度、google了好多方法没有结果。傻逼了半天,看了看源码,发现:
/**
* Loads text files and returns a `DataFrame` whose schema starts with a string column named
* "value", and followed by partitioned columns if there are any.
*
* By default, each line in the text files is a new row in the resulting DataFrame. For example:
* {{{
* // Scala:
* spark.read.text("/path/to/spark/README.md")
*
* // Java:
* spark.read().text("/path/to/spark/README.md")
* }}}
*
* You can set the following text-specific option(s) for reading text files:
* <ul>
* <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
* </li>
* <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
* that should be used for parsing.</li>
* </ul>
*
* @param paths input paths
* @since 1.6.0
*/
@scala.annotation.varargs
def text(paths: String*): DataFrame = format("text").load(paths : _*)
wholetext这个参数不就是我想要嘛,疏忽了疏忽了竟然忘了这一茬,早在之前整理过Spark的各种数据源,但是只是简单的读取和写入,对于需要的参数没有研究,所以这几天又参照源码,重新整理了《SparkSQL数据源操作》这个笔记,将每个数据源支持的Option参数都完整的整理了出来。
解决了上述读取文件的问题之后,也事半功倍了,后面只需要使用FastJSON加载数据,按key分行即可。如下代码所示:
import spark.implicits._
import scala.collection.JavaConverters._
val text = spark.read
.option("wholetext", value = true)
.text("/Users/shirukai/hollysys/repository/learn-demo-spark/data/location.json").as[String]
val jsonDF = text.flatMap(line => {
val json = JSON.parseObject(line)
val keys = json.keySet()
keys.asScala.map(key => {
(key, json.getString(key))
})
})
jsonDF.toDF("address", "location").show(false)
2 方案二:自定义数据源
思路:继承DataSourceV2接口自定义数据源。
针对这一类型的数据单独写一个数据源,其实这类型的就是K,V格式的,k做成一列、v做成一列,如果需要经常用到处理这种数据,可以自己写一个数据源。这里只是简单写一下步骤,真正读取数据的时候比较复杂。
2.1 创建一个名为KVJSONDataSource的数据源
代码如下所示,读取文件实际上会复杂,涉及到读本地文件和HDFS文件等等。这里只是简单的读取了一下本地文件。
package com.hollysys.spark.sql.datasource.kvjson
import java.util
import com.alibaba.fastjson.JSON
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.types.StructType
import scala.io.Source
/**
* Created by shirukai on 2019-06-15 10:50
* K、V格式JSON数据源
*/
class KVJSONDataSource extends DataSourceV2 with ReadSupport {
override def createReader(options: DataSourceOptions): DataSourceReader = new KVJSONDataSourceReader(options)
}
class KVJSONDataSourceReader(options: DataSourceOptions) extends DataSourceReader {
var requiredSchema: StructType = StructType.fromDDL(options.get("schema").get())
override def readSchema(): StructType = requiredSchema
override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = {
import collection.JavaConverters._
Seq(
new KVJSONDataSourceReaderFactory(options.get("path").get()).asInstanceOf[DataReaderFactory[Row]]
).asJava
}
}
class KVJSONDataSourceReaderFactory(path: String) extends DataReaderFactory[Row] {
override def createDataReader(): DataReader[Row] = new KVJSONDataReader(path)
}
class KVJSONDataReader(path: String) extends DataReader[Row] {
val data: Iterator[Seq[AnyRef]] = readData
override def next(): Boolean = data.hasNext
override def get(): Row = {
val line = data.next()
Row(line: _*)
}
override def close(): Unit = {}
def readData: Iterator[Seq[AnyRef]] = {
import scala.collection.JavaConverters._
val source = Source.fromFile(path)
val jsonStr = source.mkString
val json = JSON.parseObject(jsonStr)
val keys = json.keySet()
keys.asScala.map(key => {
Seq(key, json.getString(key))
}).toIterator
}
}
2.2 使用自定义数据源
数据源自定义完成后,我们就可以使用了,使用方式很简单,如下代码:
spark.read.format("com.hollysys.spark.sql.datasource.kvjson.KVJSONDataSource")
.option("schema","`address` STRING,`location` STRING")
.option("path","/Users/shirukai/hollysys/repository/learn-demo-spark/data/location.json")
.load().show(false)
输出
+-------+-------------------------+
|address|location |
+-------+-------------------------+
|昌平区北七 |[116.4179459,40.11644403]|
|昌平区东小 |[116.4021289,40.05688698]|
|昌平区回龙 |[116.3412241,40.07942604]|
|昌平区阳坊 |[116.1332611,40.13701398]|
+-------+-------------------------+
3 总结
目前只想到两种方案,其实第二种方案有点大材小用了,如果大家有更好的方案,欢迎交流。