版本说明: spark-2.3.0
SparkSQL支持很多数据源,我们可以使用Spark内置的数据源,目前Spark支持的数据源有:json,parquet,jdbc,orc,libsvm,csv,text。也可以指定自定义的数据源,只需要在读取数据源的时候,指定数据源的全名。在https://spark-packages.org/这个网站,我们可以获取到更多的第三方的数据源。
官网文档:http://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources
收藏笔记:https://blog.csdn.net/wsdc0521/article/details/50011349
1 JSON数据源
1.1 以json格式写入
先手动生成一个DataFrame,然后以json格式写入文件
import spark.implicits._
val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
df.show()
//以json格式写入文件
df.write.format("json").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/json")
保存数据时,可以指定SaveMode,如:
df.write.format("json").mode("errorifexists").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/json")
这里指定了SaveMode为errorifexists也就是如果文件已经存在,就报错,这种保存模式也是系统默认的。常见的SaveMode有:
SaveMode | 描述 |
---|---|
errorifexists | 默认,如果文件存在报错 |
append | 将DataFrame保存到数据源时,如果数据/表已存在,则DataFrame的内容应附加到现有数据。 |
overwrite | 覆盖模式意味着在将DataFrame保存到数据源时,如果数据/表已经存在,则预期现有数据将被DataFrame的内容覆盖。 |
ignore | 忽略模式意味着在将DataFrame保存到数据源时,如果数据已存在,则预期保存操作不会保存DataFrame的内容并且不会更改现有数据。这与CREATE TABLE IF NOT EXISTS SQL中的类似。 |
在写json时,我们也可以通过option传入特定的参数,支持参数如下所示:
名称 | 默认值 | 描述 |
---|---|---|
compression | null | 保存到文件时使用的压缩编解码器,如(none ,bzip2 ,gzip ,lz4 ,*snappy 和deflate ),不区分大小写。 |
dateFormat | yyyy-MM-dd | 设置指示日期格式的字符串。 自定义日期格式遵循java.text.SimpleDateFormat 中的格式。这适用于日期类型。 |
timestampFormat | yyyy-MM-dd’T’HH:mm:ss.SSSXXX | 设置表示时间戳格式的字符串。自定义日期格式遵循java.text.SimpleDateFormat 中的格式。这适用于时间戳类型 |
encoding | 指定已保存的json 文件的编码(charset)。如果未设置,将使用UTF-8字符集。 | |
lineSep | \n | 指定行分隔符 |
1.2 以json格式读取
读取上面我们写入的json文件
//读取json数据源
val jsonDF = spark.read.format("json").load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/json")
jsonDF.show()
/**
* +---+----+-----------+
* |age|name| phone|
* +---+----+-----------+
* | 20|Ming|15552211521|
* | 19|hong|13287994007|
* | 21| zhi|15552211523|
* +---+----+-----------+
*/
当然读取json文件时,我们也可以通过option传入特定的参数,读取json支持的参数如下:
名称 | 默认值 | 描述 |
---|---|---|
primitivesAsString | false | 将所有原始值推断为String类型 |
prefersDecimal | false | 将所有浮点值推断为十进制类型。如果值不适合十进制,那么它将它们推断为双精度数。 |
allowComments | false | 忽略JSON记录中的Java / C ++样式注释 |
allowUnquotedFieldNames | false | 允许不带引号的JSON字段名称 |
allowSingleQuotes | true | 除了双引号外允许使用单引号 |
allowNumericLeadingZeros | false | 允许数字之前有零,(e.g. 00012) |
allowBackslashEscapingAnyCharacter | false | 允许使用反斜杠引用机制接受所有字符的引用 |
allowUnquotedControlChars | false | 允许JSON字符串包含不带引号的控制字符(值小于32的ASCII字符,包括制表符和换行符)或不包含。 |
mode | PERMISSIVE | 允许在解析过程中处理损坏记录的模式。共有三种模式:PERMISSIVE :当它遇到损坏的记录时,将格式错误的字符串放入由columnNameOfCorruptRecord 配置的字段中,并将其他字段设置为“null”。为了保持损坏的记录,用户可以在用户定义的模式中设置名为columnNameOfCorruptRecord 的字符串类型字段。如果架构没有字段,它在解析过程中丢弃损坏的记录。在推断模式时,它会在输出模式中隐式添加 columnNameOfCorruptRecord 字段。DROPMALFORMED :忽略整个损坏的记录FAILFAST :当遇到损坏记录时抛出异常 |
columnNameOfCorruptRecord | spark.sql.columnNameOfCorruptRecord 中指定的值 |
允许重命名具有由’PERMISSIVE模式创建的格式错误的字符串的新字段。这会覆盖 spark.sql.columnNameOfCorruptRecord` |
dateFormat | yyyy-MM-dd | 设置指示日期格式的字符串。 自定义日期格式遵循java.text.SimpleDateFormat 中的格式。这适用于日期类型。 |
timestampFormat | yyyy-MM-dd’T’HH:mm:ss.SSSXXX | 设置表示时间戳格式的字符串。自定义日期格式遵循java.text.SimpleDateFormat 中的格式。这适用于时间戳类型。 |
multiLine | false | 解析一个记录,每个文件可能跨越多行 |
encoding | 允许强制为JSON文件设置标准基本或扩展编码之一。例如UTF-16BE,UTF-32LE。如果未指定encoding 并且multiLine 设置为true ,则会自动检测到它。 |
|
lineSep | \r , \r\n and \n |
行分隔符 |
samplingRatio | 1.0 | 定义用于模式推断的输入JSON对象的分数 |
2 CSV数据源
2.1 以csv格式写入
手动生成一个DataFrame然后以csv格式写入文件
import spark.implicits._
val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
df.show()
//以csv格式写入文件
df.write.format("csv").mode("overwrite").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/csv")
支持option列表
名称 | 默认值 | 描述 |
---|---|---|
sep | , | 列分隔符 |
quote | “ | 设置用于转义引用值的单个字符,其中分隔符可以是值的一部分。如果设置了空字符串,则使用u0000 (空字符) |
escape | \ | 设置一个用于在已引用的值内转义引号的单个字符。 |
charToEscapeQuoteEscaping | escape or \0 |
设置一个单独的字符,用于转义引号字符的转义。当转义和引号字符不同时,默认值为转义字符,否则为“\ 0” |
escapeQuotes | true | 一个标志,指示包含引号的值是否应始终用引号括起来。默认是转义包含引号字符的所有值。 |
quoteAll | false | 一个标志,指示是否所有值都应始终用引号括起来。默认是仅转义包含引号字符的值。 |
header | false | 将第一行写为列的名称 |
nullValue | 空字符串 | 设置空值的字符串表示形式。 |
compression | null | 保存到文件时使用的压缩编解码器,如(none ,bzip2 ,gzip ,lz4 ,snappy 和deflate ),不区分大小写。 |
dateFormat | yyyy-MM-dd | 设置指示日期格式的字符串。 自定义日期格式遵循java.text.SimpleDateFormat 中的格式。这适用于日期类型。 |
timestampFormat | yyyy-MM-dd’T’HH:mm:ss.SSSXXX | 设置表示时间戳格式的字符串。自定义日期格式遵循java.text.SimpleDateFormat 中的格式。这适用于时间戳类型。 |
ignoreLeadingWhiteSpace | true | 一个标志,指示是否应该跳过正在写入的值的头部空格。 |
ignoreTrailingWhiteSpace | true | 一个标志,指示是否应该跳过正在写入的值的尾部空格。 |
2.2 以csv格式读取
读取我们上面写入的csv文件
//读取csv数据源
val csvDF = spark.read.format("csv").options(Map("sep"->",","inferSchema"->"true","header"->"true")).load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/csv")
csvDF.show()
在读取csv文件时我们可以指定option参数:
名称 | 默认值 | 描述 |
---|---|---|
sep | , | 列分隔符 |
encoding | UTF-8 | 编码格式 |
quote | “ | 设置用于转义引用值的单个字符,其中分隔符可以是值的一部分。如果设置了空字符串,则使用u0000 (空字符) |
escape | \ | 设置一个用于在已引用的值内转义引号的单个字符。 |
comment | empty string | 设置一个用于跳过以此字符开头的行的单个字符。默认情况下,它被禁用。 |
header | false | 将第一行写为列的名称 |
enforceSchema | true | 如果将其设置为“true”,则将强制将指定或推断的模式应用于数据源文件,并忽略CSV文件中的标头。 如果该选项设置为“false”,则在header 选项设置为“true”的情况下,将针对CSV文件中的所有标头验证模式。模式中的字段名称和CSV标题中的列名称通过考虑spark.sql.caseSensitive 的位置进行检查。虽然默认值为true,但建议禁用 enforceSchema`选项以避免不正确的结果。 |
inferSchema | true | 从数据中自动推断输入模式。 需要对数据进行一次额外的传递。 |
samplingRatio | 1.0 | 定义用于模式推断的行的分数。 |
dateFormat | yyyy-MM-dd | 设置指示日期格式的字符串。 自定义日期格式遵循java.text.SimpleDateFormat 中的格式。这适用于日期类型。 |
timestampFormat | yyyy-MM-dd’T’HH:mm:ss.SSSXXX | 设置表示时间戳格式的字符串。自定义日期格式遵循java.text.SimpleDateFormat 中的格式。这适用于时间戳类型。 |
ignoreLeadingWhiteSpace | false | 忽略开头是空格的值 |
ignoreTrailingWhiteSpace | false | 忽略结尾是空格的值 |
nullValue | empty string | 设置空值的字符串表示形式。从*2.0.1开始,这适用于所有支持的类型,包括字符串类型。 |
nanValue | NaN | 设置非数字值的字符串表示形式。 |
positiveInf | Inf | 设置正无穷大值的字符串表示形式 |
negativeInf | -Inf | 设置负无穷大值的字符串表示形式 |
maxColumns | 20480 | 定义记录可以有多少列的硬限制。 |
maxCharsPerColumn | -1 | 定义允许读取的任何给定值的最大字符数。默认情况下,它为-1表示无限长度 |
mode | 允许在解析过程中处理损坏记录的模式。共有三种模式:PERMISSIVE :当它遇到损坏的记录时,将格式错误的字符串放入由columnNameOfCorruptRecord 配置的字段中,并将其他字段设置为“null”。为了保持损坏的记录,用户可以在用户定义的模式中设置名为columnNameOfCorruptRecord 的字符串类型字段。如果架构没有字段,它在解析过程中丢弃损坏的记录。在推断模式时,它会在输出模式中隐式添加 columnNameOfCorruptRecord 字段。DROPMALFORMED :忽略整个损坏的记录FAILFAST :当遇到损坏记录时抛出异常 |
|
columnNameOfCorruptRecord | spark.sql.columnNameOfCorruptRecord 中指定的值 |
允许重命名具有由’PERMISSIVE模式创建的格式错误的字符串的新字段。这会覆盖 spark.sql.columnNameOfCorruptRecord` |
multiLine | false | 解析一条记录,可能跨越多行。 |
3 Text数据源
3.1 以Text格式写入
将手动生成的DataFrame以text格式写入文件
import spark.implicits._
val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
df.show()
val textDF = df.map(_.toSeq.foldLeft("")(_+","+_).substring(1))
//以text格式写入文件
textDF.write.format("text").mode("overwrite").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/text")
写入text格式时,要求我们的DataFrame只有一列,否则会报如下错误:
支持的option参数
名称 | 默认值 | 描述 |
---|---|---|
compression | null | 保存到文件时使用的压缩编解码器,如(none ,bzip2 ,gzip ,lz4 ,snappy 和deflate ),不区分大小写。 |
lineSep | \r , \r\n and \n |
行分隔符 |
3.2 以Text格式读取,并转为DataFrame
Spark SQL 读取text文件,只有一列,这是我们可以通过进一步的处理,转化为以“,”分割的多列DataFrame
//读取text文件
val text = spark.read.format("text").load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/text")
text.show()
lazy val first = textDF.first()
val numAttrs = first.split(",").length
import org.apache.spark.sql.functions._
var newDF = textDF.withColumn("splitCols", split($"value", ","))
0.until(numAttrs).foreach(x => {
newDF = newDF.withColumn("col" + "_" + x, $"splitCols".getItem(x))
})
newDF.show()
/**
* +-------------------+--------------------+-----+-----+-----------+
* | value| splitCols|col_0|col_1| col_2|
* +-------------------+--------------------+-----+-----+-----------+
* |Ming,20,15552211521|[Ming, 20, 155522...| Ming| 20|15552211521|
* |hong,19,13287994007|[hong, 19, 132879...| hong| 19|13287994007|
* | zhi,21,15552211523|[zhi, 21, 1555221...| zhi| 21|15552211523|
* +-------------------+--------------------+-----+-----+-----------+
*/
支持的option参数
名称 | 默认值 | 描述 |
---|---|---|
wholetext | false | 如果为true,则将文件作为单行读取,而不是按“\ n”拆分 |
lineSep | \r , \r\n and \n |
行分隔符 |
4 Parquet数据源
Spark 默认的数据源操作为Parquet格式,也就是说,当我们read或者write的时候,不指定数据源类型,Spark默认会使用Parquet格式来处理。
4.1 以Parquet格式写入
import spark.implicits._
//从内存中创建一个DataFrame
val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
df.show()
//以Parquet的格式写入
df.write.format("parquet").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet")
option支持参数
名称 | 默认值 | 描述 |
---|---|---|
compression | null | 保存到文件时使用的压缩编解码器,如(none ,bzip2 ,gzip ,lz4 ,snappy 和deflate ),不区分大小写。 |
4.2 以Parquet格式读取
//读取Parquet
val parquetDF = spark.read.format("parquet").load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet")
parquetDF.printSchema()
/**
* root
* |-- name: string (nullable = true)
* |-- age: integer (nullable = true)
* |-- phone: long (nullable = true)
*/
option参数
名称 | 默认值 | 描述 |
---|---|---|
mergeSchema | false | 如果为true,则将文件作为单行读取,而不是按“\ n”拆分 |
4.3 模式合并
官网的例子:
// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet/test_table/key=2")
// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet/test_table")
mergedDF.printSchema()
/***
* root
* |-- value: integer (nullable = true)
* |-- square: integer (nullable = true)
* |-- cube: integer (nullable = true)
* |-- key: integer (nullable = true)
*/
5 JDBC数据源
5.1 以JDBC数据源写入
import spark.implicits._
//从内存中创建一个DataFrame
val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
df.show()
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "hollysys")
//写入mysql数据库
df.write.jdbc("jdbc:mysql://localhost:3306/springboot?useSSL=false&characterEncoding=utf-8","spark_people",connectionProperties)
执行后会在数据库中,自动创建表,并将数据写入到表中:
支持参数:
名称 | 默认值 | 描述 |
---|---|---|
url | jdbc:subprotocol:subname 形式的JDBC数据库url |
|
table | 外部数据库中表的名称。 | |
connectionProperties | JDBC数据库连接参数,任意字符串标记/值的列表。通常至少应包括“用户”和“密码”属性。 “batchsize”可用于控制每个插入的行数。 “isolationLevel”可以是“NONE”,“READ_COMMITTED”,“READ_UNCOMMITTED”,“REPEATABLE_READ”,或“SERIALIZABLE”之一,对应于JDBC的Connection对象定义的标准事务*隔离级别,默认值为“READ_UNCOMMITTED” 。 |
5.2 以JDBC数据源读取
使用Spark读取我们上面写入数据库表中的数据
//读取mysql数据库表数据
val mysqlDF = spark.read.jdbc("jdbc:mysql://localhost:3306/springboot?useSSL=false&characterEncoding=utf-8","spark_people",connectionProperties)
mysqlDF.show()
/**
* +----+---+-----------+
* |name|age| phone|
* +----+---+-----------+
* |Ming| 20|15552211521|
* |hong| 19|13287994007|
* | zhi| 21|15552211523|
* +----+---+-----------+
*/
参数
名称 | 默认值 | 描述 |
---|---|---|
url | jdbc:subprotocol:subname 形式的JDBC数据库url |
|
table | 表名 | |
columnName | 将用于分区的整数类型列的名称。 | |
lowerBound | columnName 的最小值用于决定分区步幅。 |
|
upperBound | columnName 的最大值用于决定分区步幅。 |
|
numPartitions | 分区数量。这与lowerBound (包含),upperBound (不包括)一起形成分区,用于生成WHERE 子句表达式,用于均匀地分割列columnName 。当*输入小于1时,数字设置为1。 |
|
connectionProperties | JDBC数据库连接参数,任意字符串标记/值的列表。通常至少应包括“用户”和“密码”属性。 “batchsize”可用于控制每个插入的行数。 “isolationLevel”可以是“NONE”,“READ_COMMITTED”,“READ_UNCOMMITTED”,“REPEATABLE_READ”,或“SERIALIZABLE”之一,对应于JDBC的Connection对象定义的标准事务*隔离级别,默认值为“READ_UNCOMMITTED” 。 |
完整代码:
package com.hollysys.spark.sql
import java.util.Properties
import org.apache.spark.sql.SparkSession
/**
* Created by shirukai on 2018/9/11
* Spark SQL 对数据源操作
*/
object DataSourceApp {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local").getOrCreate()
jsonExample(spark)
//csvExample(spark)
//textExample(spark)
//parquetExample(spark)
//jdbcExample(spark)
}
def jsonExample(spark: SparkSession): Unit = {
import spark.implicits._
val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
df.show()
//以json格式写入文件
df.write.format("json").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/json")
//读取json数据源
val jsonDF = spark.read.format("json").load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/json")
jsonDF.printSchema()
jsonDF.show()
/**
* +---+----+-----------+
* |age|name| phone|
* +---+----+-----------+
* | 20|Ming|15552211521|
* | 19|hong|13287994007|
* | 21| zhi|15552211523|
* +---+----+-----------+
*/
}
def csvExample(spark: SparkSession): Unit = {
import spark.implicits._
val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
df.show()
//以csv格式写入文件
df.write.format("csv").mode("overwrite").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/csv")
//读取csv数据源
val csvDF = spark.read.format("csv").options(Map("sep" -> ";", "inferSchema" -> "true", "header" -> "true")).load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/csv")
csvDF.show()
}
def textExample(spark: SparkSession): Unit = {
import spark.implicits._
val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
df.show()
val textDF = df.map(_.toSeq.foldLeft("")(_ + "," + _).substring(1))
//以text格式写入文件
textDF.write.format("text").mode("overwrite").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/text")
//读取text文件
val text = spark.read.format("text").load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/text")
text.show()
lazy val first = textDF.first()
val numAttrs = first.split(",").length
import org.apache.spark.sql.functions._
var newDF = textDF.withColumn("splitCols", split($"value", ","))
0.until(numAttrs).foreach(x => {
newDF = newDF.withColumn("col" + "_" + x, $"splitCols".getItem(x))
})
newDF.show()
/**
* +-------------------+--------------------+-----+-----+-----------+
* | value| splitCols|col_0|col_1| col_2|
* +-------------------+--------------------+-----+-----+-----------+
* |Ming,20,15552211521|[Ming, 20, 155522...| Ming| 20|15552211521|
* |hong,19,13287994007|[hong, 19, 132879...| hong| 19|13287994007|
* | zhi,21,15552211523|[zhi, 21, 1555221...| zhi| 21|15552211523|
* +-------------------+--------------------+-----+-----+-----------+
*/
}
def parquetExample(spark: SparkSession): Unit = {
import spark.implicits._
//从内存中创建一个DataFrame
val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
df.show()
//以Parquet的格式写入
df.write.format("parquet").mode("overwrite").save("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet")
//读取Parquet
val parquetDF = spark.read.format("parquet").load("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet")
parquetDF.printSchema()
/**
* root
* |-- name: string (nullable = true)
* |-- age: integer (nullable = true)
* |-- phone: long (nullable = true)
*/
// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet/test_table/key=2")
// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("/Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/parquet/test_table")
mergedDF.printSchema()
/** *
* root
* |-- value: integer (nullable = true)
* |-- square: integer (nullable = true)
* |-- cube: integer (nullable = true)
* |-- key: integer (nullable = true)
*/
}
def jdbcExample(spark: SparkSession): Unit = {
import spark.implicits._
//从内存中创建一个DataFrame
val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone")
df.show()
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "hollysys")
//写入mysql数据库
df.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/springboot?useSSL=false&characterEncoding=utf-8", "spark_people", connectionProperties)
//读取mysql数据库表数据
val mysqlDF = spark.read.jdbc("jdbc:mysql://localhost:3306/springboot?useSSL=false&characterEncoding=utf-8", "spark_people", connectionProperties)
mysqlDF.show()
/**
* +----+---+-----------+
* |name|age| phone|
* +----+---+-----------+
* |Ming| 20|15552211521|
* |hong| 19|13287994007|
* | zhi| 21|15552211523|
* +----+---+-----------+
*/
}
}