SPARK August 24, 2020

Spark读取JSON的小扩展

Words count 8.2k Reading time 7 mins. Read count 0

版本说明:

spark 2.3

前言

前几天在群里摸鱼的时候,碰都一位同学问了一个比较有趣的问题,他提问:Spark如何读取原生JSON?看到这个问题,心里有些疑惑,Spark不是有JSON数据源支持吗,怎么这里还要问如何读取原生JSON,这原生JSON又是什么鬼?经过交流才明白,原来他所说的原生JSON是类似如下这种格式:

{
  "昌平区东小": [
    116.4021289,
    40.05688698
  ],
  "昌平区回龙": [
    116.3412241,
    40.07942604
  ],
  "昌平区北七": [
    116.4179459,
    40.11644403
  ],
  "昌平区阳坊": [
    116.1332611,
    40.13701398
  ]
}

而我们平时使用Spark读取的JSON是如下格式

{"address":"昌平区东小","location":[116.4021289,40.05688698]}
{"address":"昌平区回龙","location":[116.3412241, 40.07942604]}
{"address":"昌平区北七","location":[116.4179459,40.11644403]}
{"address":"昌平区阳坊","location":[116.1332611,40.13701398]}

如果使用spark.read.json直接读取他所说的”原生JSON”格式的文件,将会报如下错误:

感觉这个问题还是比较有趣的,所以自己就亲自试试了,找了种解决方案。

1 需求说明

上文中,我们已经可以看出问题所在。其实就是将所谓的”原生JSON”格式的文件读取出来转成如下格式的DataFrame。

+-------+-------------------------+
|address|location                 |
+-------+-------------------------+
|昌平区北七  |[116.4179459,40.11644403]|
|昌平区回龙  |[116.3412241,40.07942604]|
|昌平区阳坊  |[116.1332611,40.13701398]|
|昌平区东小  |[116.4021289,40.05688698]|
+-------+-------------------------+

上文也提到如果我们的JSON文件时如下格式的

{"address":"昌平区东小","location":[116.4021289,40.05688698]}
{"address":"昌平区回龙","location":[116.3412241, 40.07942604]}
{"address":"昌平区北七","location":[116.4179459,40.11644403]}
{"address":"昌平区阳坊","location":[116.1332611,40.13701398]}

那么使用Spark很容易读出来,并转成DataFrame

spark.read.json("/Users/shirukai/hollysys/repository/learn-demo-spark/data/location.json").show(false)

那么”原生JSON”格式的该如何处理呢,我带着这个需求,进行了一些尝试。

2 方案一:手动解析JSON

这个方案的大体思路是:读取文本内容,手动解析JSON,然后使用flatmap算子转换,最后生成DataFrame。这个方案也是最先想到的,感觉也是最容易实现的,但是在实现的时候却遇到了一个小问题,导致我花费了几个小时的时间才解决,其实主要是因为基本功不扎实,走了不少弯路。先说一下遇到的问题吧,这里也做一个小记录,读取文本的时候,直接使用了read.text(),结果默认按照行分隔符生成了多条记录,拿不到一条完整的JSON。其实我的目的就是将整个文本当做一条记录来处理,一开始想着指定别的行分隔符就能拿到一条记录了,百度、google了好多方法没有结果。傻逼了半天,看了看源码,发现:

  /**
   * Loads text files and returns a `DataFrame` whose schema starts with a string column named
   * "value", and followed by partitioned columns if there are any.
   *
   * By default, each line in the text files is a new row in the resulting DataFrame. For example:
   * {{{
   *   // Scala:
   *   spark.read.text("/path/to/spark/README.md")
   *
   *   // Java:
   *   spark.read().text("/path/to/spark/README.md")
   * }}}
   *
   * You can set the following text-specific option(s) for reading text files:
   * <ul>
   * <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
   * </li>
   * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
   * that should be used for parsing.</li>
   * </ul>
   *
   * @param paths input paths
   * @since 1.6.0
   */
  @scala.annotation.varargs
  def text(paths: String*): DataFrame = format("text").load(paths : _*)

wholetext这个参数不就是我想要嘛,疏忽了疏忽了竟然忘了这一茬,早在之前整理过Spark的各种数据源,但是只是简单的读取和写入,对于需要的参数没有研究,所以这几天又参照源码,重新整理了《SparkSQL数据源操作》这个笔记,将每个数据源支持的Option参数都完整的整理了出来。

解决了上述读取文件的问题之后,也事半功倍了,后面只需要使用FastJSON加载数据,按key分行即可。如下代码所示:

    import spark.implicits._
    import scala.collection.JavaConverters._
    val text = spark.read
      .option("wholetext", value = true)
      .text("/Users/shirukai/hollysys/repository/learn-demo-spark/data/location.json").as[String]

    val jsonDF = text.flatMap(line => {
      val json = JSON.parseObject(line)
      val keys = json.keySet()
      keys.asScala.map(key => {
        (key, json.getString(key))
      })
    })
    jsonDF.toDF("address", "location").show(false)

2 方案二:自定义数据源

思路:继承DataSourceV2接口自定义数据源。

针对这一类型的数据单独写一个数据源,其实这类型的就是K,V格式的,k做成一列、v做成一列,如果需要经常用到处理这种数据,可以自己写一个数据源。这里只是简单写一下步骤,真正读取数据的时候比较复杂。

2.1 创建一个名为KVJSONDataSource的数据源

代码如下所示,读取文件实际上会复杂,涉及到读本地文件和HDFS文件等等。这里只是简单的读取了一下本地文件。

package com.hollysys.spark.sql.datasource.kvjson

import java.util

import com.alibaba.fastjson.JSON
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.types.StructType

import scala.io.Source

/**
  * Created by shirukai on 2019-06-15 10:50
  * K、V格式JSON数据源
  */
class KVJSONDataSource extends DataSourceV2 with ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader = new KVJSONDataSourceReader(options)
}

class KVJSONDataSourceReader(options: DataSourceOptions) extends DataSourceReader {
  var requiredSchema: StructType = StructType.fromDDL(options.get("schema").get())

  override def readSchema(): StructType = requiredSchema

  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = {
    import collection.JavaConverters._
    Seq(
      new KVJSONDataSourceReaderFactory(options.get("path").get()).asInstanceOf[DataReaderFactory[Row]]
    ).asJava
  }
}

class KVJSONDataSourceReaderFactory(path: String) extends DataReaderFactory[Row] {
  override def createDataReader(): DataReader[Row] = new KVJSONDataReader(path)
}

class KVJSONDataReader(path: String) extends DataReader[Row] {

  val data: Iterator[Seq[AnyRef]] = readData

  override def next(): Boolean = data.hasNext

  override def get(): Row = {
    val line = data.next()
    Row(line: _*)
  }

  override def close(): Unit = {}
  def readData: Iterator[Seq[AnyRef]] = {
    import scala.collection.JavaConverters._
    val source = Source.fromFile(path)
    val jsonStr = source.mkString
    val json = JSON.parseObject(jsonStr)
    val keys = json.keySet()
    keys.asScala.map(key => {
      Seq(key, json.getString(key))
    }).toIterator
  }
}

2.2 使用自定义数据源

数据源自定义完成后,我们就可以使用了,使用方式很简单,如下代码:

    spark.read.format("com.hollysys.spark.sql.datasource.kvjson.KVJSONDataSource")
      .option("schema","`address` STRING,`location` STRING")
      .option("path","/Users/shirukai/hollysys/repository/learn-demo-spark/data/location.json")
      .load().show(false)

输出

+-------+-------------------------+
|address|location                 |
+-------+-------------------------+
|昌平区北七  |[116.4179459,40.11644403]|
|昌平区东小  |[116.4021289,40.05688698]|
|昌平区回龙  |[116.3412241,40.07942604]|
|昌平区阳坊  |[116.1332611,40.13701398]|
+-------+-------------------------+

3 总结

目前只想到两种方案,其实第二种方案有点大材小用了,如果大家有更好的方案,欢迎交流。

0%