SPARK June 15, 2019

SparkSQL数据源操作

Words count 27k Reading time 25 mins. Read count 0

版本说明: spark-2.3.0

SparkSQL支持很多数据源,我们可以使用Spark内置的数据源,目前Spark支持的数据源有:json,parquet,jdbc,orc,libsvm,csv,text。也可以指定自定义的数据源,只需要在读取数据源的时候,指定数据源的全名。在https://spark-packages.org/这个网站,我们可以获取到更多的第三方的数据源。

官网文档:http://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources

收藏笔记:https://blog.csdn.net/wsdc0521/article/details/50011349

1 JSON数据源

1.1 以json格式写入

先手动生成一个DataFrame,然后以json格式写入文件

import spark.implicits._
val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
df.show()
//以json格式写入文件
df.write.format("json").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/json")

保存数据时,可以指定SaveMode,如:

df.write.format("json").mode("errorifexists").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/json")

这里指定了SaveMode为errorifexists也就是如果文件已经存在,就报错,这种保存模式也是系统默认的。常见的SaveMode有:

SaveMode 描述
errorifexists 默认,如果文件存在报错
append 将DataFrame保存到数据源时,如果数据/表已存在,则DataFrame的内容应附加到现有数据。
overwrite 覆盖模式意味着在将DataFrame保存到数据源时,如果数据/表已经存在,则预期现有数据将被DataFrame的内容覆盖。
ignore 忽略模式意味着在将DataFrame保存到数据源时,如果数据已存在,则预期保存操作不会保存DataFrame的内容并且不会更改现有数据。这与CREATE TABLE IF NOT EXISTSSQL中的类似。

在写json时,我们也可以通过option传入特定的参数,支持参数如下所示:

名称 默认值 描述
compression null 保存到文件时使用的压缩编解码器,如(nonebzip2gziplz4,*snappydeflate),不区分大小写。
dateFormat yyyy-MM-dd 设置指示日期格式的字符串。 自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于日期类型。
timestampFormat yyyy-MM-dd’T’HH:mm:ss.SSSXXX 设置表示时间戳格式的字符串。自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于时间戳类型
encoding 指定已保存的json 文件的编码(charset)。如果未设置,将使用UTF-8字符集。
lineSep \n 指定行分隔符

1.2 以json格式读取

读取上面我们写入的json文件

  //读取json数据源
    val jsonDF = spark.read.format("json").load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/json")
    jsonDF.show()
    /**
      * +---+----+-----------+
      * |age|name|      phone|
      * +---+----+-----------+
      * | 20|Ming|15552211521|
      * | 19|hong|13287994007|
      * | 21| zhi|15552211523|
      * +---+----+-----------+
      */

当然读取json文件时,我们也可以通过option传入特定的参数,读取json支持的参数如下:

名称 默认值 描述
primitivesAsString false 将所有原始值推断为String类型
prefersDecimal false 将所有浮点值推断为十进制类型。如果值不适合十进制,那么它将它们推断为双精度数。
allowComments false 忽略JSON记录中的Java / C ++样式注释
allowUnquotedFieldNames false 允许不带引号的JSON字段名称
allowSingleQuotes true 除了双引号外允许使用单引号
allowNumericLeadingZeros false 允许数字之前有零,(e.g. 00012)
allowBackslashEscapingAnyCharacter false 允许使用反斜杠引用机制接受所有字符的引用
allowUnquotedControlChars false 允许JSON字符串包含不带引号的控制字符(值小于32的ASCII字符,包括制表符和换行符)或不包含。
mode PERMISSIVE 允许在解析过程中处理损坏记录的模式。共有三种模式:
PERMISSIVE:当它遇到损坏的记录时,将格式错误的字符串放入由columnNameOfCorruptRecord配置的字段中,并将其他字段设置为“null”。为了保持损坏的记录,用户可以在用户定义的模式中设置名为columnNameOfCorruptRecord的字符串类型字段。如果架构没有
字段,它在解析过程中丢弃损坏的记录。在推断模式时,它会在输出模式中隐式添加columnNameOfCorruptRecord字段。
DROPMALFORMED:忽略整个损坏的记录
FAILFAST:当遇到损坏记录时抛出异常
columnNameOfCorruptRecord spark.sql.columnNameOfCorruptRecord中指定的值 允许重命名具有由’PERMISSIVE模式创建的格式错误的字符串的新字段。这会覆盖spark.sql.columnNameOfCorruptRecord`
dateFormat yyyy-MM-dd 设置指示日期格式的字符串。 自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于日期类型。
timestampFormat yyyy-MM-dd’T’HH:mm:ss.SSSXXX 设置表示时间戳格式的字符串。自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于时间戳类型。
multiLine false 解析一个记录,每个文件可能跨越多行
encoding 允许强制为JSON文件设置标准基本或扩展编码之一。例如UTF-16BE,UTF-32LE。如果未指定encoding 并且multiLine设置为true,则会自动检测到它。
lineSep \r, \r\n and \n 行分隔符
samplingRatio 1.0 定义用于模式推断的输入JSON对象的分数

2 CSV数据源

2.1 以csv格式写入

手动生成一个DataFrame然后以csv格式写入文件

import spark.implicits._
val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
df.show()
//以csv格式写入文件
df.write.format("csv").mode("overwrite").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/csv")

支持option列表

名称 默认值 描述
sep , 列分隔符
quote 设置用于转义引用值的单个字符,其中分隔符可以是值的一部分。如果设置了空字符串,则使用u0000 (空字符)
escape \ 设置一个用于在已引用的值内转义引号的单个字符。
charToEscapeQuoteEscaping escape or \0 设置一个单独的字符,用于转义引号字符的转义。当转义和引号字符不同时,默认值为转义字符,否则为“\ 0”
escapeQuotes true 一个标志,指示包含引号的值是否应始终用引号括起来。默认是转义包含引号字符的所有值。
quoteAll false 一个标志,指示是否所有值都应始终用引号括起来。默认是仅转义包含引号字符的值。
header false 将第一行写为列的名称
nullValue 空字符串 设置空值的字符串表示形式。
compression null 保存到文件时使用的压缩编解码器,如(nonebzip2gziplz4snappydeflate),不区分大小写。
dateFormat yyyy-MM-dd 设置指示日期格式的字符串。 自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于日期类型。
timestampFormat yyyy-MM-dd’T’HH:mm:ss.SSSXXX 设置表示时间戳格式的字符串。自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于时间戳类型。
ignoreLeadingWhiteSpace true 一个标志,指示是否应该跳过正在写入的值的头部空格。
ignoreTrailingWhiteSpace true 一个标志,指示是否应该跳过正在写入的值的尾部空格。

2.2 以csv格式读取

读取我们上面写入的csv文件

 //读取csv数据源
    val csvDF = spark.read.format("csv").options(Map("sep"->",","inferSchema"->"true","header"->"true")).load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/csv")
    csvDF.show()

在读取csv文件时我们可以指定option参数:

名称 默认值 描述
sep , 列分隔符
encoding UTF-8 编码格式
quote 设置用于转义引用值的单个字符,其中分隔符可以是值的一部分。如果设置了空字符串,则使用u0000 (空字符)
escape \ 设置一个用于在已引用的值内转义引号的单个字符。
comment empty string 设置一个用于跳过以此字符开头的行的单个字符。默认情况下,它被禁用。
header false 将第一行写为列的名称
enforceSchema true 如果将其设置为“true”,则将强制将指定或推断的模式应用于数据源文件,并忽略CSV文件中的标头。 如果该选项设置为“false”,则在header选项设置为“true”的情况下,将针对CSV文件中的所有标头验证模式。模式中的字段名称和CSV标题中的列名称通过考虑spark.sql.caseSensitive的位置进行检查。虽然默认值为true,但建议禁用 enforceSchema`选项以避免不正确的结果。
inferSchema true 从数据中自动推断输入模式。 需要对数据进行一次额外的传递。
samplingRatio 1.0 定义用于模式推断的行的分数。
dateFormat yyyy-MM-dd 设置指示日期格式的字符串。 自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于日期类型。
timestampFormat yyyy-MM-dd’T’HH:mm:ss.SSSXXX 设置表示时间戳格式的字符串。自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于时间戳类型。
ignoreLeadingWhiteSpace false 忽略开头是空格的值
ignoreTrailingWhiteSpace false 忽略结尾是空格的值
nullValue empty string 设置空值的字符串表示形式。从*2.0.1开始,这适用于所有支持的类型,包括字符串类型。
nanValue NaN 设置非数字值的字符串表示形式。
positiveInf Inf 设置正无穷大值的字符串表示形式
negativeInf -Inf 设置负无穷大值的字符串表示形式
maxColumns 20480 定义记录可以有多少列的硬限制。
maxCharsPerColumn -1 定义允许读取的任何给定值的最大字符数。默认情况下,它为-1表示无限长度
mode 允许在解析过程中处理损坏记录的模式。共有三种模式:
PERMISSIVE:当它遇到损坏的记录时,将格式错误的字符串放入由columnNameOfCorruptRecord配置的字段中,并将其他字段设置为“null”。为了保持损坏的记录,用户可以在用户定义的模式中设置名为columnNameOfCorruptRecord的字符串类型字段。如果架构没有
字段,它在解析过程中丢弃损坏的记录。在推断模式时,它会在输出模式中隐式添加columnNameOfCorruptRecord字段。
DROPMALFORMED:忽略整个损坏的记录
FAILFAST:当遇到损坏记录时抛出异常
columnNameOfCorruptRecord spark.sql.columnNameOfCorruptRecord中指定的值 允许重命名具有由’PERMISSIVE模式创建的格式错误的字符串的新字段。这会覆盖spark.sql.columnNameOfCorruptRecord`
multiLine false 解析一条记录,可能跨越多行。

3 Text数据源

3.1 以Text格式写入

将手动生成的DataFrame以text格式写入文件

 import spark.implicits._
    val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
    df.show()
    val textDF = df.map(_.toSeq.foldLeft("")(_+","+_).substring(1))
    //以text格式写入文件
    textDF.write.format("text").mode("overwrite").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/text")

写入text格式时,要求我们的DataFrame只有一列,否则会报如下错误:

支持的option参数

名称 默认值 描述
compression null 保存到文件时使用的压缩编解码器,如(nonebzip2gziplz4snappydeflate),不区分大小写。
lineSep \r, \r\n and \n 行分隔符

3.2 以Text格式读取,并转为DataFrame

Spark SQL 读取text文件,只有一列,这是我们可以通过进一步的处理,转化为以“,”分割的多列DataFrame

//读取text文件
val text = spark.read.format("text").load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/text")
text.show()
lazy val first = textDF.first()
val numAttrs = first.split(",").length
import org.apache.spark.sql.functions._
var newDF = textDF.withColumn("splitCols", split($"value", ","))
0.until(numAttrs).foreach(x => {
  newDF = newDF.withColumn("col" + "_" + x, $"splitCols".getItem(x))
})
newDF.show()

/**
  * +-------------------+--------------------+-----+-----+-----------+
  * |              value|           splitCols|col_0|col_1|      col_2|
  * +-------------------+--------------------+-----+-----+-----------+
  * |Ming,20,15552211521|[Ming, 20, 155522...| Ming|   20|15552211521|
  * |hong,19,13287994007|[hong, 19, 132879...| hong|   19|13287994007|
  * | zhi,21,15552211523|[zhi, 21, 1555221...|  zhi|   21|15552211523|
  * +-------------------+--------------------+-----+-----+-----------+
  */

支持的option参数

名称 默认值 描述
wholetext false 如果为true,则将文件作为单行读取,而不是按“\ n”拆分
lineSep \r, \r\n and \n 行分隔符

4 Parquet数据源

Spark 默认的数据源操作为Parquet格式,也就是说,当我们read或者write的时候,不指定数据源类型,Spark默认会使用Parquet格式来处理。

4.1 以Parquet格式写入

import spark.implicits._
//从内存中创建一个DataFrame
val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
df.show()
//以Parquet的格式写入
df.write.format("parquet").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet")

option支持参数

名称 默认值 描述
compression null 保存到文件时使用的压缩编解码器,如(nonebzip2gziplz4snappydeflate),不区分大小写。

4.2 以Parquet格式读取

//读取Parquet
val parquetDF = spark.read.format("parquet").load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet")
parquetDF.printSchema()

/**
  * root
  * |-- name: string (nullable = true)
  * |-- age: integer (nullable = true)
  * |-- phone: long (nullable = true)
  */

option参数

名称 默认值 描述
mergeSchema false 如果为true,则将文件作为单行读取,而不是按“\ n”拆分

4.3 模式合并

官网的例子:

 // Create a simple DataFrame, store into a partition directory
    val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
    squaresDF.write.parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet/test_table/key=1")

    // Create another DataFrame in a new partition directory,
    // adding a new column and dropping an existing column
    val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
    cubesDF.write.parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet/test_table/key=2")

    // Read the partitioned table
    val mergedDF = spark.read.option("mergeSchema", "true").parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet/test_table")
    mergedDF.printSchema()

    /***
      * root
      * |-- value: integer (nullable = true)
      * |-- square: integer (nullable = true)
      * |-- cube: integer (nullable = true)
      * |-- key: integer (nullable = true)
      */

5 JDBC数据源

5.1 以JDBC数据源写入

import spark.implicits._
//从内存中创建一个DataFrame
val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
df.show()
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "hollysys")
//写入mysql数据库
df.write.jdbc("jdbc:mysql://localhost:3306/springboot?useSSL=false&characterEncoding=utf-8","spark_people",connectionProperties)

执行后会在数据库中,自动创建表,并将数据写入到表中:

支持参数:

名称 默认值 描述
url jdbc:subprotocol:subname形式的JDBC数据库url
table 外部数据库中表的名称。
connectionProperties JDBC数据库连接参数,任意字符串标记/值的列表。通常至少应包括“用户”和“密码”属性。 “batchsize”可用于控制每个插入的行数。 “isolationLevel”可以是“NONE”,“READ_COMMITTED”,“READ_UNCOMMITTED”,“REPEATABLE_READ”,或“SERIALIZABLE”之一,对应于JDBC的Connection对象定义的标准事务*隔离级别,默认值为“READ_UNCOMMITTED” 。

5.2 以JDBC数据源读取

使用Spark读取我们上面写入数据库表中的数据

//读取mysql数据库表数据
val mysqlDF = spark.read.jdbc("jdbc:mysql://localhost:3306/springboot?useSSL=false&characterEncoding=utf-8","spark_people",connectionProperties)
mysqlDF.show()

/**
  * +----+---+-----------+
  * |name|age|      phone|
  * +----+---+-----------+
  * |Ming| 20|15552211521|
  * |hong| 19|13287994007|
  * | zhi| 21|15552211523|
  * +----+---+-----------+
  */

参数

名称 默认值 描述
url jdbc:subprotocol:subname形式的JDBC数据库url
table 表名
columnName 将用于分区的整数类型列的名称。
lowerBound columnName的最小值用于决定分区步幅。
upperBound columnName的最大值用于决定分区步幅。
numPartitions 分区数量。这与lowerBound(包含),upperBound(不包括)一起形成分区,用于生成WHERE 子句表达式,用于均匀地分割列columnName。当*输入小于1时,数字设置为1。
connectionProperties JDBC数据库连接参数,任意字符串标记/值的列表。通常至少应包括“用户”和“密码”属性。 “batchsize”可用于控制每个插入的行数。 “isolationLevel”可以是“NONE”,“READ_COMMITTED”,“READ_UNCOMMITTED”,“REPEATABLE_READ”,或“SERIALIZABLE”之一,对应于JDBC的Connection对象定义的标准事务*隔离级别,默认值为“READ_UNCOMMITTED” 。

完整代码:

package com.hollysys.spark.sql

import java.util.Properties

import org.apache.spark.sql.SparkSession

/**
  * Created by shirukai on 2018/9/11
  * Spark SQL 对数据源操作
  */
object DataSourceApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local").getOrCreate()
    jsonExample(spark)
    //csvExample(spark)
    //textExample(spark)
    //parquetExample(spark)
    //jdbcExample(spark)
  }

  def jsonExample(spark: SparkSession): Unit = {
    import spark.implicits._
    val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
    df.show()
    //以json格式写入文件
    df.write.format("json").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/json")

    //读取json数据源
    val jsonDF = spark.read.format("json").load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/json")
    jsonDF.printSchema()
    jsonDF.show()

    /**
      * +---+----+-----------+
      * |age|name|      phone|
      * +---+----+-----------+
      * | 20|Ming|15552211521|
      * | 19|hong|13287994007|
      * | 21| zhi|15552211523|
      * +---+----+-----------+
      */
  }

  def csvExample(spark: SparkSession): Unit = {
    import spark.implicits._
    val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
    df.show()
    //以csv格式写入文件
    df.write.format("csv").mode("overwrite").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/csv")
    //读取csv数据源
    val csvDF = spark.read.format("csv").options(Map("sep" -> ";", "inferSchema" -> "true", "header" -> "true")).load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/csv")
    csvDF.show()
  }

  def textExample(spark: SparkSession): Unit = {
    import spark.implicits._
    val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
    df.show()
    val textDF = df.map(_.toSeq.foldLeft("")(_ + "," + _).substring(1))
    //以text格式写入文件
    textDF.write.format("text").mode("overwrite").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/text")
    //读取text文件
    val text = spark.read.format("text").load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/text")
    text.show()
    lazy val first = textDF.first()
    val numAttrs = first.split(",").length
    import org.apache.spark.sql.functions._
    var newDF = textDF.withColumn("splitCols", split($"value", ","))
    0.until(numAttrs).foreach(x => {
      newDF = newDF.withColumn("col" + "_" + x, $"splitCols".getItem(x))
    })
    newDF.show()

    /**
      * +-------------------+--------------------+-----+-----+-----------+
      * |              value|           splitCols|col_0|col_1|      col_2|
      * +-------------------+--------------------+-----+-----+-----------+
      * |Ming,20,15552211521|[Ming, 20, 155522...| Ming|   20|15552211521|
      * |hong,19,13287994007|[hong, 19, 132879...| hong|   19|13287994007|
      * | zhi,21,15552211523|[zhi, 21, 1555221...|  zhi|   21|15552211523|
      * +-------------------+--------------------+-----+-----+-----------+
      */
  }

  def parquetExample(spark: SparkSession): Unit = {
    import spark.implicits._
    //从内存中创建一个DataFrame
    val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
    df.show()
    //以Parquet的格式写入
    df.write.format("parquet").mode("overwrite").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet")

    //读取Parquet
    val parquetDF = spark.read.format("parquet").load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet")
    parquetDF.printSchema()

    /**
      * root
      * |-- name: string (nullable = true)
      * |-- age: integer (nullable = true)
      * |-- phone: long (nullable = true)
      */

    // Create a simple DataFrame, store into a partition directory
    val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
    squaresDF.write.parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet/test_table/key=1")

    // Create another DataFrame in a new partition directory,
    // adding a new column and dropping an existing column
    val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
    cubesDF.write.parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet/test_table/key=2")

    // Read the partitioned table
    val mergedDF = spark.read.option("mergeSchema", "true").parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet/test_table")
    mergedDF.printSchema()

    /** *
      * root
      * |-- value: integer (nullable = true)
      * |-- square: integer (nullable = true)
      * |-- cube: integer (nullable = true)
      * |-- key: integer (nullable = true)
      */
  }

  def jdbcExample(spark: SparkSession): Unit = {
    import spark.implicits._
    //从内存中创建一个DataFrame
    val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
    df.show()
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "hollysys")
    //写入mysql数据库
    df.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/springboot?useSSL=false&characterEncoding=utf-8", "spark_people", connectionProperties)

    //读取mysql数据库表数据
    val mysqlDF = spark.read.jdbc("jdbc:mysql://localhost:3306/springboot?useSSL=false&characterEncoding=utf-8", "spark_people", connectionProperties)
    mysqlDF.show()

    /**
      * +----+---+-----------+
      * |name|age|      phone|
      * +----+---+-----------+
      * |Ming| 20|15552211521|
      * |hong| 19|13287994007|
      * | zhi| 21|15552211523|
      * +----+---+-----------+
      */

  }
}
0%