Spark Streaming can consume many kinds of data sources: a data stream from a socket, from a file system, from Flume, from Kafka, and so on.
Two things to keep in mind:
- When processing receiver-based sources such as socket, Flume, Kafka or Kinesis in local mode, do not run with local or local[1]: one thread is needed to run the receiver that ingests the data, and no thread would be left to process it. Reading from a file system does not start a receiver, so file sources do not need the extra thread.
- The same applies when scaling the logic out to a cluster: the number of cores allocated to the Spark Streaming application must be greater than the number of receivers, otherwise the system will receive data but be unable to process it.
For details see the official guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers.
These notes cover Spark Streaming with several data sources: socket, file system, Flume and Kafka.
1 Socket Stream
This section uses Spark Streaming over socket data as the running example: a simple WordCount, writing the results to MySQL, stateful word counting (i.e. counting how many times each word has appeared in total, including earlier batches), blacklist filtering with the transform API, and finally integrating Spark SQL to count words with a DataFrame.
1.1 A simple WordCount over a socket stream
NetworkWordCount.scala
package com.hollysys.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Created by shirukai on 2018/10/16
 * Spark Streaming over socket data
 * Start a test source with: nc -lk 9999
 */
object NetworkWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
/**
 * A StreamingContext takes two parameters: a SparkConf and the batch interval
 */
val ssc = new StreamingContext(conf, Seconds(2))
// Read data from the socket
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
Result: after starting the application and typing words into the nc session, each 2-second batch prints the word counts for that batch.
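For example, typing hello spark hello into the nc session makes one batch print something like the following (an illustrative example of my own; the timestamp will of course differ):
-------------------------------------------
Time: 1539672002000 ms
-------------------------------------------
(hello,2)
(spark,1)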
1.2 Writing the results to MySQL
To write the data to MySQL we use foreachRDD to get at each batch's RDD, then write the RDD's records to MySQL from within foreachPartition, so that one connection is created per partition rather than per record.
package com.hollysys.spark.streaming
import java.sql.{Connection, DriverManager}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Created by shirukai on 2018/10/16
 * Write the word counts to MySQL
 *
 * Create the MySQL table first:
 * create table wordcount(word varchar(50) default null,wordcount int(10) default null);
 */
object ResultWriteMySQL {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD(rdd => {
rdd.foreachPartition(iterator => {
// One connection per partition, created on the executor side
val connection = createConnection()
iterator.foreach(record => {
val sql = "insert into wordcount(word,wordcount) values ('" + record._1 + "','" + record._2 + "')"
connection.createStatement().execute(sql)
})
// Close the connection once the partition has been written
connection.close()
})
})
ssc.start()
ssc.awaitTermination()
}
/**
 * Get a JDBC connection to MySQL
 *
 * @return a new Connection
 */
def createConnection(): Connection = {
DriverManager.getConnection("jdbc:mysql://localhost:3306/spark?useSSL=false&characterEncoding=utf-8&user=root&password=hollysys")
}
}
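Concatenating the SQL string works for a demo, but a PreparedStatement with placeholders is safer (no quoting problems) and can be reused for every record in the partition. A minimal sketch of the same foreachRDD body using a PreparedStatement (my own variant; the official guide additionally recommends a connection pool):
wordCounts.foreachRDD(rdd => {
  rdd.foreachPartition(iterator => {
    val connection = createConnection()
    // One reusable statement per partition; the ? placeholders are filled per record
    val statement = connection.prepareStatement("insert into wordcount(word,wordcount) values (?,?)")
    iterator.foreach { case (word, count) =>
      statement.setString(1, word)
      statement.setInt(2, count)
      statement.executeUpdate()
    }
    statement.close()
    connection.close()
  })
})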
1.3 Stateful WordCount
Use case: counting the total number of times each word has appeared, across all batches seen so far.
Official docs: http://spark.apache.org/docs/latest/streaming-programming-guide.html#updatestatebykey-operation
package com.hollysys.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Created by shirukai on 2018/10/16
 * Stateful word count with Spark Streaming
 */
object StatefulWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
// Stateful operators require a checkpoint directory to be set
// In production, point the checkpoint at a directory on HDFS
ssc.checkpoint("data/checkpoint")
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map((_, 1)).updateStateByKey(updateFunction)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
def updateFunction(currentValue: Seq[Int], previousValue: Option[Int]): Option[Int] = {
// Counts for this key in the current batch
val current = currentValue.sum
// Previously accumulated total (0 if the key is new)
val pre = previousValue.getOrElse(0)
Some(current + pre)
}
}
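updateStateByKey touches every key in every batch. Since Spark 1.6 there is also mapWithState, which only processes keys that received new data in the batch and usually scales better. A rough sketch of the same running count with mapWithState (my own variant, assuming the same words DStream and checkpoint setup as above):
import org.apache.spark.streaming.{State, StateSpec}

// Mapping function: combine this batch's count with the state stored for the word
def mappingFunc(word: String, one: Option[Int], state: State[Int]): (String, Int) = {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (word, sum)
}

val stateCounts = words.map((_, 1)).mapWithState(StateSpec.function(mappingFunc _))
stateCounts.print()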
1.4 Blacklist filtering with transform
Official docs: http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation
package com.hollysys.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Created by shirukai on 2018/10/16
 * Blacklist filtering
 */
object TransformApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("TransformApp").setMaster("local[2]")
// A StreamingContext takes a SparkConf and the batch interval
val ssc = new StreamingContext(conf, Seconds(2))
val lines = ssc.socketTextStream("localhost", 9999)
// Build the blacklist as (name, true) pairs so it can be joined against
val blacks = List("zs", "ls")
val blacksRDD = ssc.sparkContext.parallelize(blacks).map((_, true))
// Input lines are expected to look like "20181016,zs": key each line by the name field,
// left-outer-join with the blacklist and keep only the lines that did not match
val res = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
rdd.leftOuterJoin(blacksRDD).filter(!_._2._2.getOrElse(false))
.map(_._2._1)
})
res.print()
ssc.start()
ssc.awaitTermination()
}
}
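For example, if the nc session sends the following lines (my own sample input, following the <date>,<name> format the map expects):
20181016,zs
20181016,ls
20181016,ww
then zs and ls are dropped because they are on the blacklist, and only 20181016,ww is printed.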
1.5 Processing the data with Spark SQL
Official docs: http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
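A minimal sketch of the DataFrame-based word count, following the pattern in the guide linked above: inside foreachRDD a SparkSession is obtained (reusing the streaming context's configuration), the batch RDD is turned into a DataFrame, registered as a temporary view and queried with SQL. The object name SqlNetworkWordCount and the details below are my own reconstruction:
package com.hollysys.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SqlNetworkWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SqlNetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    words.foreachRDD { rdd =>
      // Reuse (or lazily create) a SparkSession with the same configuration as the streaming context
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._
      // Turn this batch's RDD of words into a DataFrame with a single "word" column
      val wordsDataFrame = rdd.toDF("word")
      wordsDataFrame.createOrReplaceTempView("words")
      // Count the words of this batch with SQL
      val wordCounts = spark.sql("select word, count(*) as total from words group by word")
      wordCounts.show()
    }
    ssc.start()
    ssc.awaitTermination()
  }
}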
2 File Stream
Spark Streaming can also use a file system as the data source; both the local file system and HDFS work. Note that textFileStream only processes files that are created in (or atomically moved into) the monitored directory after the stream starts.
package com.hollysys.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Created by shirukai on 2018/10/16
 * Spark Streaming over a file system (local or HDFS) source
 */
object FileWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.textFileStream("file:///Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/data/text")
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
3 Flume Stream
3.1 Approach 1: integrating with a Flume agent in push mode
Official docs: http://spark.apache.org/docs/latest/streaming-flume-integration.html
In this approach Flume pushes data to Spark Streaming, and Spark Streaming starts an Avro-based receiver to accept it. The flow is: Flume reads data with a netcat source, passes it through a memory channel and sends it out with an avro sink; Spark Streaming receives the data and processes it.
3.1.1 Configure the Flume agent
We need a Flume agent that collects the data and sends it to Spark Streaming.
The agent pipeline is: netcat-source -> memory-channel -> avro-sink
Create flume-push-spark.conf under $FLUME_HOME/conf:
vi $FLUME_HOME/conf/flume-push-spark.conf
with the following content:
flume2spark.sources = netcat-source
flume2spark.sinks = avro-sink
flume2spark.channels = memory-channel
# Describe/configure the source
# source type
flume2spark.sources.netcat-source.type = netcat
flume2spark.sources.netcat-source.bind = localhost
flume2spark.sources.netcat-source.port = 9090
flume2spark.sinks.avro-sink.type = avro
flume2spark.sinks.avro-sink.hostname = localhost
flume2spark.sinks.avro-sink.port = 9999
flume2spark.channels.memory-channel.type = memory
flume2spark.channels.memory-channel.capacity = 1000
flume2spark.channels.memory-channel.transactionCapacity = 100
flume2spark.sources.netcat-source.channels = memory-channel
flume2spark.sinks.avro-sink.channel = memory-channel
3.1.2 Spark Streaming code
3.1.2.1 Add the dependency
Here spark.version is 2.3.0.
<!--Spark Streaming flume-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
3.1.2.2 The code
Create FlumePushWordCount.scala:
package com.hollysys.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume._
/**
 * Created by shirukai on 2018/10/17
 * Spark Streaming + Flume integration: Flume pushes data to Spark Streaming
 */
object FlumePushWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("FlumePushWordCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
val flumeData = FlumeUtils.createStream(ssc, "localhost", 9999)
val result = flumeData.map(x => new String(x.event.getBody.array()).trim).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
result.print()
ssc.start()
ssc.awaitTermination()
}
}
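The hard-coded localhost:9999 and local[2] master are fine in IDEA but become inconvenient once the job is submitted to a cluster (section 3.1.4). A small variant of my own that takes the receiver's bind host and port from the command line and leaves the master to spark-submit:
// Sketch: read the Avro receiver's host and port from the program arguments
if (args.length != 2) {
  System.err.println("Usage: FlumePushWordCount <hostname> <port>")
  System.exit(1)
}
val Array(hostname, port) = args
val conf = new SparkConf().setAppName("FlumePushWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
val flumeData = FlumeUtils.createStream(ssc, hostname, port.toInt)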
3.1.3 Running the test from IDEA
Once the code is ready we can run it directly from IDEA. For the push approach, start the Spark Streaming application first and then start the Flume agent.
3.1.3.1 Start the Spark Streaming application
Run the main method of the Spark Streaming application written above.
3.1.3.2 Start the Flume agent
Start the Flume agent configured above:
flume-ng agent \
--name flume2spark \
--conf-file /Users/shirukai/apps/flume-1.8.0/conf/flume-push-spark.conf \
-Dflume.root.logger=INFO,console
3.1.3.3 Send data with telnet
telnet localhost 9090
Result: the words sent through the telnet session show up as word counts in the Spark Streaming console.
3.1.4 Running as a deployed application
Above we ran the application's main method directly from IDEA; this part records how to run it as an application submitted with spark-submit.
According to the official documentation, when packaging the project with SBT or Maven, spark-streaming-flume_2.11 and its transitive dependencies must be packaged into the application JAR (for Python applications the dependency is specified at spark-submit time with --packages). Below are two ways to provide the dependency: packaging it into the JAR, and specifying it when submitting the application. Without the dependency the job fails at runtime because the Flume integration classes cannot be found.
3.1.4.1 Package the dependency into the JAR
We can use the maven-shade-plugin to package spark-streaming-flume_2.11 into the application JAR, for example:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<includes>
<include>com.alibaba:fastjson</include>
<include>org.apache.spark:spark-streaming-flume_2.11</include>
<include>org.apache.flume:flume-ng-core</include>
<include>org.apache.flume:flume-ng-sdk</include>
</includes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
In my tests, all three dependencies need to be packaged: spark-streaming-flume_2.11, flume-ng-core and flume-ng-sdk; otherwise the job fails at runtime.
Package the project:
mvn package -DskipTests
The command produces learn-demo-spark-1.0-SNAPSHOT.jar under the project's target directory.
Submit the application with spark-submit:
spark-submit --master local --class com.hollysys.spark.streaming.FlumePushWordCount /Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/target/learn-demo-spark-1.0-SNAPSHOT.jar
3.1.4.2 Specify the dependency when submitting the application
spark-submit --master local[2] --packages org.apache.spark:spark-streaming-flume_2.11:2.3.0 --class com.hollysys.spark.streaming.FlumePushWordCount /Users/shirukai/Desktop/HollySys/Repository/learn-demo-spark/target/learn-demo-spark-1.0-SNAPSHOT.jar
Note: if downloading the dependency packages fails, try deleting the previously downloaded artifacts from the local Maven repository first.
3.2 Approach 2: integrating with a Flume agent in pull mode
Flume pushes its data into a custom Spark sink, and Spark Streaming pulls the data from that sink.
3.2.1 Configure the Flume agent
We need a Flume agent that collects the data and delivers it to the Spark sink.
The agent pipeline is: netcat-source -> memory-channel -> spark-sink
Create flume-pull-spark.conf under $FLUME_HOME/conf:
vi $FLUME_HOME/conf/flume-pull-spark.conf
with the following content:
flume2spark.sources = netcat-source
flume2spark.sinks = spark-sink
flume2spark.channels = memory-channel
# Describe/configure the source
# source type
flume2spark.sources.netcat-source.type = netcat
flume2spark.sources.netcat-source.bind = localhost
flume2spark.sources.netcat-source.port = 9090
flume2spark.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
flume2spark.sinks.spark-sink.hostname = localhost
flume2spark.sinks.spark-sink.port = 9999
flume2spark.channels.memory-channel.type = memory
flume2spark.channels.memory-channel.capacity = 1000
flume2spark.channels.memory-channel.transactionCapacity = 100
flume2spark.sources.netcat-source.channels = memory-channel
flume2spark.sinks.spark-sink.channel = memory-channel
Important!
To avoid the errors that show up later in this walkthrough, the jars that the Spark sink depends on need to be placed into $FLUME_HOME/lib beforehand; see the official integration guide for the list.
3.2.2 Spark Streaming code
3.2.2.1 Add the dependencies
<!--Spark Streaming flume sink-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-sink_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--commons lang3-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
</dependency>
3.2.2.2 The code
Create FlumePullWordCount.scala:
package com.hollysys.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Created by shirukai on 2018/10/17
 * Spark Streaming + Flume integration: Spark Streaming pulls data from Flume
 */
object FlumePullWordCount {
def main(args: Array[String]): Unit = {
// if(args.length !=2){
// System.err.println("Usage: FlumePushWordCount <hostname> <port>")
// System.exit(1)
// }
val conf = new SparkConf().setAppName("FlumePullWordCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
val flumeData = FlumeUtils.createPollingStream(ssc,"localhost",9999)
val result = flumeData.map(x => new String(x.event.getBody.array()).trim).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
result.print()
ssc.start()
ssc.awaitTermination()
}
}
3.2.3 Running the test from IDEA
Once the code is ready we can run it directly from IDEA. For the pull approach, start the Flume agent first and then the Spark Streaming application, which is the opposite order of the push approach.
3.2.3.1 Start the Flume agent
flume-ng agent \
--name flume2spark \
--conf-file /Users/shirukai/apps/flume-1.8.0/conf/flume-pull-spark.conf \
-Dflume.root.logger=INFO,console
At this point the agent reports an error: the configuration uses the Spark sink, but the class org.apache.spark.streaming.flume.sink.SparkSink cannot be found. The spark-streaming-flume-sink_2.11 jar has to be copied into $FLUME_HOME/lib.
The jar can be downloaded from the Maven repository: https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume-sink_2.11/2.3.0
After downloading, move the jar into $FLUME_HOME/lib and restart the agent:
mv spark-streaming-flume-sink_2.11-2.3.0.jar /Users/shirukai/apps/flume-1.8.0/lib/
3.2.3.2 Start the Spark Streaming application
When the Spark Streaming application starts, the Flume agent reports further errors because two more jars need to be placed into $FLUME_HOME/lib:
- scala-library: https://mvnrepository.com/artifact/org.scala-lang/scala-library/2.11.8 (remove the older version shipped with Flume and replace it with the downloaded 2.11.8)
- commons-lang3: https://mvnrepository.com/artifact/org.apache.commons/commons-lang3/3.5
3.2.3.3 Send data with telnet
telnet localhost 9090
4 Kafka Stream
Official docs: http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
4.1 Spark Streaming code
4.1.1 Add the dependency
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.3.0</version>
</dependency>
4.1.2 The code
package com.hollysys.spark.streaming
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Created by shirukai on 2018/10/29
 * Spark Streaming + Kafka integration for word counting
 */
object KafkaDirectWordCount {
def main(args: Array[String]): Unit = {
if (args.length != 2) {
System.err.println("Usage:KafkaDirectWordCount <brokers> <topics>")
System.exit(1)
}
val Array(servers, topics) = args
val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaDirectWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
//val spark = SparkSession.builder().getOrCreate()
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> servers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "spark_streaming",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topicSet = topics.split(",").toSet
val message = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicSet, kafkaParams)
)
message.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()
}
}
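Because enable.auto.commit is false, nothing in the job above commits the consumed offsets. The 0-10 integration guide exposes commitAsync for committing them once a batch has been processed; a rough sketch (my own addition, shown as an alternative to the print above):
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

message.foreachRDD { rdd =>
  // Capture the offset ranges of this batch before processing it
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
  // Commit the offsets back to Kafka once the batch has been handled
  message.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}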
4.2 Running the test
4.2.1 Start Kafka
Start ZooKeeper:
sh $ZK_HOME/bin/zkServer.sh start
Start Kafka:
sh $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
Create a topic:
sh $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic
List the topics:
sh $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181
Send messages with the Kafka console producer:
sh $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_streaming_topic
4.2.2 Start the application
Run KafkaDirectWordCount with the broker list and the topic as program arguments (for example localhost:9092 kafka_streaming_topic); the messages typed into the console producer then show up as word counts every 5 seconds.
5 Hands-on: collecting application logs and analysing them in real time
Requirement: use Flume to collect the logs produced by an application, have Flume forward them to a Kafka topic, and let Spark Streaming analyse the data from Kafka, deciding whether each record is a WARN or ERROR log and printing the output.
Approach: the application sends its log events to Flume through a Log4j appender; Flume receives them with an Avro source, passes them through a memory channel and forwards them to Kafka with a Kafka sink; Spark Streaming then processes the data from Kafka.
5.1 Set up Kafka
Start ZooKeeper:
sh $ZK_HOME/bin/zkServer.sh start
Start Kafka:
sh $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
Create the topic log-streaming:
sh $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic log-streaming
List the topics:
sh $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181
5.2 Configure the Flume agent
Create a configuration file named log-angent.conf under $FLUME_HOME/conf with the following content:
log-angent.sources = avro-source
log-angent.sinks = kafka-sink
log-angent.channels = memory-channel
log-angent.sources.avro-source.type = avro
log-angent.sources.avro-source.bind = localhost
log-angent.sources.avro-source.port = 9999
log-angent.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
log-angent.sinks.kafka-sink.kafka.topic = log-streaming
log-angent.sinks.kafka-sink.kafka.bootstrap.servers = localhost:9092
log-angent.channels.memory-channel.type = memory
log-angent.channels.memory-channel.capacity = 1000
log-angent.channels.memory-channel.transactionCapacity = 100
log-angent.sources.avro-source.channels = memory-channel
log-angent.sinks.kafka-sink.channel = memory-channel
Start the Flume agent:
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/log-angent.conf --name log-angent
5.3 Integrate the Log4j appender into the application
To collect the application's own logs, Flume provides a Log4j appender that ships log events to a Flume Avro source; we need to wire it into the application. Official docs: http://flume.apache.org/FlumeUserGuide.html#load-balancing-log4j-appender
5.3.1 Update the log configuration
Add the following to the log4j.properties file under the application's resources directory:
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = localhost
log4j.appender.flume.Port = 9999
log4j.appender.flume.UnsafeMode = true
Then add flume to log4j.rootCategory:
log4j.rootCategory=INFO, console,flume
5.3.2 Add the log4j appender dependency
Add the flume-ng-log4jappender dependency; without it Log4j cannot load the appender class and the application reports an error at startup.
The dependency:
<!--flume log4j appender-->
<dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
<version>1.8.0</version>
</dependency>
5.3.3 Simulate log generation
Create a LogGenerator class that simulates log output, producing one error log for every 10 records:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Created by shirukai on 2018/10/30
 * Simulates log generation
 */
public class LogGenerator {
public static void main(String[] args) throws Exception {
Logger LOG = LoggerFactory.getLogger(LogGenerator.class);
int index = 0;
while (true) {
Thread.sleep(1000);
if (index % 10 == 0) {
LOG.error("value:{}", index);
} else {
LOG.info("value:{}", index);
}
index++;
}
}
}
5.4 Develop the Spark Streaming application
Create a Spark Streaming application, KafkaLogHandler, to process the data from Kafka:
package com.hollysys.spark.streaming
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
/**
 * Created by shirukai on 2018/10/31
 * Processes the log data in Kafka
 */
object KafkaLogHandler {
def main(args: Array[String]): Unit = {
if (args.length != 2) {
System.err.println("Usage: KafkaLogHandler <brokers> <topics>")
System.exit(1)
}
val Array(servers, topics) = args
val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaLogHandler")
val ssc = new StreamingContext(conf, Seconds(5))
//val spark = SparkSession.builder().getOrCreate()
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> servers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "spark_streaming",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topicSet = topics.split(",").toSet
val message = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicSet, kafkaParams)
)
message.map(_.value()).foreachRDD(rdd => rdd.foreach(println))
ssc.start()
ssc.awaitTermination()
}
}
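The job above prints every record, while the requirement in section 5 only cares about WARN and ERROR logs. A rough sketch of the filtering (my own addition; it assumes the level appears as plain text in the log line, which is the case with a typical Log4j pattern):
// Keep only the lines that carry a WARN or ERROR level marker
message.map(_.value())
  .filter(log => log.contains("ERROR") || log.contains("WARN"))
  .foreachRDD(rdd => rdd.foreach(println))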