Concept:
A read-only, partitioned collection of records
Advantages of RDD:
Efficient fault tolerance
Control over data partitioning to optimize computation
Parallel processing
A rich set of APIs for operating on the data
Intermediate results of any type can be explicitly kept in memory (a caching sketch follows this list)
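A minimal sketch of explicitly caching an intermediate result in memory (the data, object name and storage level below are illustrative, not from the original notes):
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

object CacheSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "CacheSketch")
    // A hypothetical intermediate result that two downstream actions reuse
    val evens = sc.parallelize(1 to 100)
      .filter(_ % 2 == 0)
      .persist(StorageLevel.MEMORY_ONLY) // cache() is shorthand for this storage level
    // Both actions now read the cached data instead of recomputing the filter
    println(evens.count())
    println(evens.sum())
    sc.stop()
  }
}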
The five main properties of an RDD
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
*   an HDFS file); i.e. schedule the task onto the node that holds the data: moving computation is cheaper than moving data
A closer look at the five abstract APIs behind these properties
Source code:
/**
* :: DeveloperApi ::
* Implemented by subclasses to compute a given partition.
*
 * Computes the data of a given partition within a task context, returning an Iterator over its elements
*/
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*
* The partitions in this array must satisfy the following property:
* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
 * Returns the list of partitions of this RDD, which drives how the computation is parallelized
*/
protected def getPartitions: Array[Partition]
/**
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
 * Returns the list of dependencies on parent RDDs
*/
protected def getDependencies: Seq[Dependency[_]] = deps
/**
* Optionally overridden by subclasses to specify placement preferences.
 * Returns the hosts on which the data of a given partition is stored (preferred locations for data locality)
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
/** Optionally overridden by subclasses to specify how they are partitioned.
 * The partitioner of this key-value RDD, if any
* */
@transient val partitioner: Option[Partitioner] = None
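These five properties can also be inspected directly on a concrete RDD. A minimal sketch (the sample data and partition counts are assumptions for illustration):
import org.apache.spark.{HashPartitioner, SparkContext}

object RddPropertiesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "RddPropertiesSketch")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
      .partitionBy(new HashPartitioner(2))
    println(pairs.partitions.length)            // a list of partitions
    println(pairs.dependencies.map(_.getClass)) // dependencies on parent RDDs
    println(pairs.partitioner)                  // Some(HashPartitioner)
    // Preferred locations are usually empty for purely in-memory data
    pairs.partitions.foreach(p => println(pairs.preferredLocations(p)))
    sc.stop()
  }
}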
Ways to create an RDD
- From a stable storage system, e.g. an HDFS file
- From an existing RDD, via a transformation
- From a sequence that already exists in memory
package com.hollysys.spark
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by shirukai on 2018/6/21
*/
object RDDCreationTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.toString).setMaster("local")
val sc = new SparkContext(conf)
    //Ways to create an RDD
    //1. From a stable storage system, such as an HDFS file or the local file system
val hdfsFileRDD = sc.textFile("hdfs://cdh-master:8020//user/root/srk/words.txt")
hdfsFileRDD.count()
    //2. From an existing RDD, i.e. via one of the RDD transformation APIs
val mapRDD = hdfsFileRDD.map(x => x + "test")
mapRDD.count()
    //3. From a list that already exists in memory; the number of partitions can be specified
val listRDD = sc.parallelize[Int](Seq(1,2,3,3,4),2)
listRDD.collect()
    //Show which elements end up in which partition
listRDD.glom().collect()
    //Create a range RDD from 0 (inclusive) to 10 (exclusive) with step 2 and 4 partitions
val rangeRDD = sc.range(0,10,2,4)
rangeRDD.collect()
/**
* res7: Array[Long] = Array(0, 2, 4, 6, 8)
* */
    //Like parallelize; makeRDD additionally has an overload that can specify preferred locations for each partition
val makeRDD = sc.makeRDD(Seq(1,2,3,3,4),2)
makeRDD.collect()
}
}
Parallelize
RDD Dependency
Narrow dependency: a partition of the parent RDD is consumed by only one partition of the child RDD
Wide dependency: a partition of the parent RDD is consumed by multiple partitions of the child RDD at the same time (see the sketch below)
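A minimal sketch of how the two kinds of dependencies show up in the API (the sample data is illustrative): mapValues yields a narrow OneToOneDependency, while reduceByKey yields a wide ShuffleDependency.
import org.apache.spark.{OneToOneDependency, ShuffleDependency, SparkContext}

object DependencySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "DependencySketch")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
    val mapped = pairs.mapValues(_ + 1)    // narrow: each child partition reads exactly one parent partition
    val reduced = pairs.reduceByKey(_ + _) // wide: a parent partition feeds multiple child partitions
    println(mapped.dependencies.head.isInstanceOf[OneToOneDependency[_]])       // true
    println(reduced.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]) // true
    sc.stop()
  }
}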
RDD Partitioning
An RDD created from a storage system does not need a partitioner; its partitions come directly from the input splits (e.g. HDFS blocks)
A non key-value RDD does not need a partitioner
A key-value RDD can be repartitioned with a partitioner
HashPartitioner
partitionBy(new HashPartitioner(2))
Performance tuning with HashPartitioner
Two key points:
Pre-partitioning an RDD can improve the performance of later computations by avoiding repeated shuffles
Whether an operation preserves the parent RDD's partitioner (see the sketch below)
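A minimal sketch of both points (the sample data and partition counts are assumptions): pre-partition once with partitionBy, then observe which operations keep the partitioner.
import org.apache.spark.{HashPartitioner, SparkContext}

object PartitionerPreservationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "PartitionerPreservationSketch")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
      .partitionBy(new HashPartitioner(2)) // pre-partition once, up front
    // mapValues cannot change the keys, so the partitioner is preserved
    println(pairs.mapValues(_ * 10).partitioner)                  // Some(HashPartitioner)
    // map may change the keys, so Spark drops the partitioner
    println(pairs.map { case (k, v) => (k, v * 10) }.partitioner) // None
    // reduceByKey on an already-partitioned RDD reuses the partitioner and needs no extra shuffle
    println(pairs.reduceByKey(_ + _).partitioner)                 // Some(HashPartitioner)
    sc.stop()
  }
}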
How RangePartitioner works
It assigns sortable keys into one of several roughly equal-sized range partitions
For example, an RDD[(Int, String)] with 10 partitions is repartitioned into 3 partitions with a RangePartitioner:
Partition 1 receives keys >= 0 and <= 10
Partition 2 receives keys > 10 and <= 30
Partition 3 receives keys > 30
The implementation works roughly as follows (a usage sketch follows this list):
- Sample the data of each partition and compute a weight for every sampled element
- Use the samples and their weights to compute the upper-bound key of each partition, i.e. the range boundaries
- Compare each incoming key against these boundaries to decide which partition it belongs to
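A minimal usage sketch of RangePartitioner (the key range 1 to 100 and the partition counts are assumptions for illustration):
import org.apache.spark.{RangePartitioner, SparkContext}

object RangePartitionerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "RangePartitionerSketch")
    val pairs = sc.parallelize((1 to 100).map(i => (i, i.toString)), 10)
    // RangePartitioner samples the keys to pick range boundaries, then assigns
    // each key to the range it falls into, giving roughly equal-sized partitions
    val ranged = pairs.partitionBy(new RangePartitioner(3, pairs))
    // Each resulting partition now holds a contiguous range of keys
    ranged.glom().map(_.map(_._1)).collect().filter(_.nonEmpty)
      .foreach(keys => println(keys.min + " .. " + keys.max))
    sc.stop()
  }
}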
Custom Partitioner
If the keys are URLs and we want keys with the same domain name to end up in the same partition,
we can define a custom DomainNamePartitioner:
package com.hollysys.spark
import java.net.URL
import org.apache.spark.{HashPartitioner, Partitioner, SparkContext}
/**
* Created by shirukai on 2018/6/21
*/
class DomainNamePartitioner(val numParts: Int) extends Partitioner {
  //Number of partitions
override def numPartitions: Int = numParts
  //Compute the partition index for a given key
override def getPartition(key: Any): Int = {
val domain = new URL(key.toString).getHost
val code = (domain.hashCode % numParts)
if (code < 0) {
code + numParts
} else {
code
}
}
override def equals(obj: scala.Any): Boolean = obj match {
case dnp: DomainNamePartitioner =>
dnp.numParts == numParts
case _ => false
}
  //equals and hashCode should stay consistent, so hashCode is overridden as well
  override def hashCode(): Int = numParts
}
object DomainNamePartitioner {
def main(args: Array[String]): Unit = {
val sc = new SparkContext("local", this.getClass.getSimpleName)
val urlRDD = sc.makeRDD(Seq(("http://baidu.com/test", 2), ("http://baidu.com/index", 2),
("http://ali.com/index", 3), ("http://ali.com/bigdata", 4), ("http://baidu.com/change", 3)))
urlRDD.glom().collect()
//Array(Array((http://baidu.com/test,2), (http://baidu.com/index,2)),
// Array((http://hollysys.com/index,3), (http://hollysys.com/bigdata,4), (http://baidu.com/change,3)))
val hashPartitionedRDD = urlRDD.partitionBy(new HashPartitioner(2))
hashPartitionedRDD.glom().collect()
//Array(Array(),
// Array((http://hollysys.com/index,3), (http://hollysys.com/bigdata,4), (http://baidu.com/change,3), (http://baidu.com/test,2), (http://baidu.com/index,2)))
val domainNamePartitionedRDD = urlRDD.partitionBy(new DomainNamePartitioner(2))
val a = domainNamePartitionedRDD.glom().collect()
    a.foreach(partition => println(partition.mkString(", ")))
}
}
Comparison of HashPartitioner and RangePartitioner
HashPartitioner places a record by the hash of its key modulo the number of partitions, so identical keys are colocated but there is no ordering across partitions; RangePartitioner assigns sortable keys to contiguous ranges determined by sampling, which is what sortByKey relies on.
Use cases for coalesce
Changing the number of partitions of an RDD
package com.hollysys.spark
import org.apache.spark.SparkContext
/**
* Created by shirukai on 2018/6/21
*/
object CoalesceTest {
def main(args: Array[String]): Unit = {
val sc = new SparkContext("local",this.getClass.getSimpleName)
    //Create an RDD, requesting 1000 partitions (textFile's second argument is the minimum number of partitions)
val hdfsFileRDD = sc.textFile("hdfs://cdh-master:8020//user/root/srk/words.txt",1000)
    //Check that hdfsFileRDD indeed has 1000 partitions
hdfsFileRDD.partitions.size
    //We lower the partition count with coalesce because:
    //too many partitions mean too little data per partition and too many tasks; to reduce the number of tasks we reduce the number of partitions
    //The first argument is the desired number of partitions
    //The second argument is whether a shuffle is needed to reach that number
val coalesceRDD = hdfsFileRDD.coalesce(100,false)
coalesceRDD.partitions.size
}
}
Scenario 1: reduce an RDD with 100 partitions to 10 partitions
API: hdfsFileRDD.coalesce(10, false)
Scenario 2: increase an RDD with 10 partitions to 100 partitions
API: hdfsFileRDD.coalesce(100, false) will NOT increase the partition count, because coalesce without a shuffle can only merge partitions
Scenario 3: reduce an RDD with 1000 partitions to 2 partitions
API: hdfsFileRDD.coalesce(2, true); the shuffle lets the upstream stage keep its original parallelism instead of running with only 2 tasks
Scenario 4: increase an RDD with 10 partitions to 100 partitions
API: hdfsFileRDD.coalesce(100, true)
hdfsFileRDD.repartition(100) is equivalent to hdfsFileRDD.coalesce(100, true), as the sketch below also shows
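A minimal sketch verifying the scenarios above (the data and partition counts are illustrative):
import org.apache.spark.SparkContext

object CoalesceScenarioSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "CoalesceScenarioSketch")
    val rdd = sc.parallelize(1 to 1000, 10)
    // Without a shuffle, coalesce can only merge partitions, never split them
    println(rdd.coalesce(100, shuffle = false).partitions.length) // stays 10
    // With a shuffle (which is what repartition does), the partition count can grow
    println(rdd.coalesce(100, shuffle = true).partitions.length)  // 100
    println(rdd.repartition(100).partitions.length)               // 100
    sc.stop()
  }
}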
APIs on single-value (non key-value) RDDs
Basic RDD transformation APIs
map, flatMap, filter, mapPartitions, mapPartitionsWithIndex
package com.hollysys.spark
import org.apache.spark.SparkContext
/**
* Created by shirukai on 2018/6/21
*/
object MapApiTest {
def main(args: Array[String]): Unit = {
val sc = new SparkContext("local", this.getClass.getSimpleName)
val listRDD = sc.parallelize(Seq(1, 2, 3, 3, 4), 2)
val mapRDD = listRDD.map(x => x + 2)
mapRDD.collect
//Array(3, 4, 5, 5, 6)
    val users = listRDD.map(x => {
      if (x < 3) User("<3", x) else User(">=3", x)
    })
    users.collect()
    //res0: Array[User] = Array(User(<3,1), User(<3,2), User(>=3,3), User(>=3,3), User(>=3,4))
    //The difference between map and flatMap in plain Scala
val l = List(List(1, 2, 3), List(2, 3, 4))
l.map(x => x.toString())
//res0: List[String] = List(List(1, 2, 3), List(2, 3, 4))
l.flatMap(x => x)
//res1: List[Int] = List(1, 2, 3, 2, 3, 4)
0.to(3)
//res2: scala.collection.immutable.Range.Inclusive = Range(0, 1, 2, 3)
    //flatMap in Spark
val flatMapRDD = listRDD.flatMap(x => x.to(3))
flatMapRDD.collect()
//res3: Array[Int] = Array(1, 2, 3, 2, 3, 3, 3)
val filterRDD = listRDD.filter(x => x != 1)
filterRDD.collect()
//res4: Array[Int] = Array(2, 3, 3, 4)
    //glom turns each partition's data into an array, so collecting gives a two-dimensional array over all partitions
val glomRDD = listRDD.glom()
glomRDD.collect()
//res5: Array[Array[Int]] = Array(Array(1, 2), Array(3, 3, 4))
val mapPartitionRDD = listRDD.mapPartitions(iterator => {
iterator.map(x => x + 1)
})
mapPartitionRDD.collect()
//res0: Array[Int] = Array(2, 3, 4, 4, 5)
    val mapPartitionsWithIndexRDD = listRDD.mapPartitionsWithIndex((index, iterator) => {
      iterator.map(x => x + index)
    })
    mapPartitionsWithIndexRDD.collect()
mapPartitionRDD.saveAsTextFile("hdfs://localhost:9000/output")
}
}
case class User(userId: String, amount: Int)
Sampling APIs
sample(false, 0.1)
sample(withReplacement: Boolean, fraction: Double, seed: Long)
Sampling with or without replacement:
If withReplacement = true, sampling is done with replacement, implemented with a Poisson sampler
If withReplacement = false, sampling is done without replacement, implemented with a Bernoulli sampler
fraction is the probability of each element being selected, not a factor of the desired sample size
seed is the random seed; a per-partition seed is derived from it
takeSample(false, 5)
randomSplit(Array(0.2, 0.4, 0.4))
package com.hollysys.spark
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by shirukai on 2018/6/22
 * Sampling API test
*/
object SampleApiTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local")
val sc = new SparkContext(conf)
val listRDD = sc.parallelize(Seq(1, 2, 3, 3), 2)
    //The first argument is withReplacement
    //If withReplacement = true, sampling is done with replacement, implemented with a Poisson sampler
    //If withReplacement = false, sampling is done without replacement, implemented with a Bernoulli sampler
    //The second argument, fraction, is the probability of each element being selected, not a factor of the sample size
    //e.g. sampling from 100 elements with fraction = 0.2 does not mean exactly 100 * 0.2 = 20 elements are drawn;
    //each element is selected with probability 0.2, so the sample size is not fixed and follows a binomial distribution
    //When withReplacement = true, fraction >= 0
    //When withReplacement = false, 0 <= fraction <= 1
    //The third argument, seed, is the seed of the random number generator; a seed is derived from it for every partition
val sampleRDD = listRDD.sample(false, 0.5, 100)
sampleRDD.glom().collect()
//Array[Array[Int]] = Array(Array(1), Array(3))
    //Randomly split the RDD according to the given weights; one RDD is produced per weight
val splitRDD = listRDD.randomSplit(Array(0.2, 0.8))
splitRDD.size
splitRDD(0).glom().collect()
//res3: Array[Array[Int]] = Array(Array(), Array())
splitRDD(1).glom().collect()
//res4: Array[Array[Int]] = Array(Array(1, 2), Array(3, 3))
    //Randomly draw a fixed number of sample elements
listRDD.takeSample(false, 1, 100)
//res1: Array[Int] = Array(1)
}
}
Stratified sampling APIs
Stratified sampling: group the data by some feature, then draw samples from each group under given conditions and combine them into a new sample set.
For a key-value RDD, the key identifies the stratum and the value can be anything.
The fractions parameter then defines the strata and their sampling probabilities:
fractions is a Map[K, Double], where the key is the stratum and the Double is the sampling fraction for records with that key.
sampleByKey
sampleByKeyExact
val pairRDD = sc.parallelize[(Int, Int)](Seq(
(1, 2), (3, 4), (3, 6), (5, 6)
), 4)
pairRDD.collect()
//Array[(Int, Int)] = Array((1,2), (3,4), (3,6), (5,6))
//Stratified sampling
val fractions = Map(1 -> 0.3, 3 -> 0.6, 5 -> 0.3)
val sampleByKeyRDD = pairRDD.sampleByKey(true, fractions)
sampleByKeyRDD.glom().collect()
//Array[Array[(Int, Int)]] = Array(Array((1,2)), Array(), Array(), Array((5,6)))
val sampleByKeyExactRDD = pairRDD.sampleByKeyExact(true, fractions)
sampleByKeyExactRDD.glom().collect()
//res1: Array[Array[(Int, Int)]] = Array(Array((1,2)), Array(), Array((3,6), (3,6)), Array((5,6)))
Differences between sampleByKey and sampleByKeyExact
sampleByKey makes a single pass without counting the full data, so the per-key sample sizes are only approximately fraction * count
sampleByKeyExact makes additional passes over the full data so that the per-key sample size is (almost) exactly fraction * count; it costs considerably more computation but gives a more accurate result
pipe: usage and characteristics
Pipes the RDD's data through an external program such as a shell or Python script and builds a new RDD from the program's output
package com.hollysys.spark
import org.apache.spark.SparkContext
/**
* Created by shirukai on 2018/6/27
*/
object pipeTest {
def main(args: Array[String]): Unit = {
val sc = new SparkContext("local", this.getClass.getSimpleName)
val dataRDD = sc.parallelize(List("hi", "hello", "how", "are", "you"), 2)
    //Environment variables for the external process
val env = Map("env" -> "test-env")
    //Called once per partition: lets us print some per-partition context before the partition's elements are piped to the process
    def printPipeContext(func: String => Unit): Unit = {
val tastkContextData = "this is task context data per partition"
func(tastkContextData)
}
    //Controls how each RDD element is written to the process's stdin; here "hello" is replaced with "dog"
    def printRDDElement(ele: String, func: String => Unit): Unit = {
if (ele == "hello") {
func("dog")
} else func(ele)
}
val pipeRDD = dataRDD.pipe(Seq("sh", "/Users/shirukai/Desktop/HollySys/Repository/sparkLearn/src/main/resource/echo.sh"),
env, printPipeContext, printRDDElement, false)
pipeRDD.glom().collect()
}
}
Basic RDD actions
foreach, foreachPartition, collect, take, first, top, takeOrdered, max, min, reduce, treeReduce, fold
package com.hollysys.spark
import org.apache.spark.SparkContext
/**
* Created by shirukai on 2018/6/27
*/
object BaseActionApiTest {
def main(args: Array[String]): Unit = {
val sc = new SparkContext("local", this.getClass.getSimpleName)
val listRDD = sc.parallelize[Int](Seq(1, 2, 4, 3, 3, 6), 2)
listRDD.collect()
//res6: Array[Int] = Array(1, 2, 4, 3, 3, 6)
listRDD.take(2)
//res7: Array[Int] = Array(1, 2)
listRDD.top(2)
//res8: Array[Int] = Array(6, 4)
listRDD.first()
//res9: Int = 1
listRDD.min()
//res10: Int = 1
listRDD.max()
//res11: Int = 6
listRDD.takeOrdered(2)
//res12: Array[Int] = Array(1, 2)
listRDD.reduce((x, y) => x + y)
//res13: Int = 19
listRDD.treeReduce((x, y) => x + y)
//res14: Int = 19
listRDD.fold(0)((x, y) => x + y)
//res15: Int = 19
}
}
class MyOrdering extends Ordering[Int] {
override def compare(x: Int, y: Int): Int = {
x - y
}
}
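MyOrdering is defined above but never used in the listing; a minimal sketch of how it could be passed explicitly, since top and takeOrdered both accept an Ordering[T] (assuming the sc and listRDD from the listing above):
//Here MyOrdering behaves like the natural Int ordering, so the results match the defaults
listRDD.top(2)(new MyOrdering)
//res: Array(6, 4)
listRDD.takeOrdered(2)(new MyOrdering)
//res: Array(1, 2)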
APIs on key-value RDDs
package com.hollysys.spark
import org.apache.spark.SparkContext
/**
* Created by shirukai on 2018/6/28
*/
object KeyValueCreationTest {
def main(args: Array[String]): Unit = {
val sc = new SparkContext("local", this.getClass.getSimpleName)
val kvPairRDD = sc.parallelize(Seq(
("key1", "value1"),
("key2", "value2"),
("key3", "value3")
))
kvPairRDD.collect()
//res0: Array[(String, String)] = Array((key1,value1), (key2,value2), (key3,value3))
val personSeqRDD = sc.parallelize(Seq(
User("jeffy", 30),
User("kkk", 20),
User("jeffy", 30),
User("kkk", 30)
))
    //Turn the RDD into an RDD of pairs, keyed by userId
val keyByRDD = personSeqRDD.keyBy(_.userId)
keyByRDD.collect()
//res1: Array[(String, User)] = Array((jeffy,User(jeffy,30)), (kkk,User(kkk,20)), (jeffy,User(jeffy,30)), (kkk,User(kkk,30)))
    //Equivalent to
val keyRDD2 = personSeqRDD.map(user => (user.userId, user))
keyRDD2.collect()
val groupByRDD = personSeqRDD.groupBy(_.userId)
groupByRDD.collect()
//res3: Array[(String, Iterable[User])] = Array((jeffy,CompactBuffer(User(jeffy,30), User(jeffy,30))), (kkk,CompactBuffer(User(kkk,20), User(kkk,30))))
}
}
case class User(userId: String, amount: Int)
combineByKey
The first three parameters:
package com.hollysys.spark
import org.apache.spark.SparkContext
import scala.reflect.ClassTag
/**
* Created by shirukai on 2018/6/28
*/
object CombineByKeyApiTest {
  //A small ClassTag demo: prints the runtime class captured for the type parameter C
  def test[C: ClassTag]() = {
println(reflect.classTag[C].runtimeClass.getName)
}
def main(args: Array[String]): Unit = {
test[String]()
val sc = new SparkContext("local", this.getClass.getSimpleName)
val pairStrRDD = sc.parallelize[(String, Int)](Seq(
("coffee", 1),
("coffee", 2),
("panda", 3),
("coffee", 9)
))
    /**
     * Goal: for pairStrRDD, compute for every key the sum of all its values together with the number of occurrences of that key.
     * The three required parameters:
     *
     * createCombiner: V => C          ==> Int -> (Int, Int)
     * mergeValue: (C, V) => C         ==> ((Int, Int), Int) -> (Int, Int)
     * mergeCombiners: (C, C) => C     ==> ((Int, Int), (Int, Int)) -> (Int, Int)
     */
def createCombiner = (value: Int) => (value, 1)
def mergeValue = (acc: (Int, Int), value: Int) => (acc._1 + value, acc._2 + 1)
def mergeCombiners = (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
val testCombineByKeyRDD =
pairStrRDD.combineByKey(createCombiner, mergeValue, mergeCombiners)
testCombineByKeyRDD.collect()
}
}
Parameter partitioner: controls how the combined output is partitioned
Parameter mapSideCombine: whether to pre-aggregate inside each map-side partition before the shuffle (defaults to true)
Parameter serializer: the serializer used for the shuffle; by default the serializer configured for the job (a sketch using all three follows)
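A minimal sketch spelling out these remaining parameters, assuming the pairStrRDD and the three functions (createCombiner, mergeValue, mergeCombiners) from the listing above:
import org.apache.spark.HashPartitioner
pairStrRDD.combineByKey(
  createCombiner,
  mergeValue,
  mergeCombiners,
  new HashPartitioner(2),  // partitioner: how the combined output is partitioned
  mapSideCombine = true,   // pre-aggregate inside each map-side partition before the shuffle
  serializer = null        // null: fall back to the serializer configured for the job
).collect()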
APIs implemented on top of combineByKey
aggregateByKey, reduceByKey (distinct is implemented with reduceByKey), foldByKey, groupByKey (groupBy is implemented with groupByKey)
All of the APIs above are implemented on top of combineByKey; they only differ in the parameters they pass (see the sketch below).
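A minimal sketch showing the same per-key aggregation expressed with each of these APIs (assuming an existing SparkContext sc; the sample data is illustrative):
val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)), 2)
//reduceByKey(f) is essentially combineByKey(v => v, f, f)
pairs.reduceByKey(_ + _).collect()
pairs.combineByKey((v: Int) => v, (c: Int, v: Int) => c + v, (c1: Int, c2: Int) => c1 + c2).collect()
//aggregateByKey supplies an explicit zero value instead of seeding with the first value of each key
pairs.aggregateByKey(0)(_ + _, _ + _).collect()
//foldByKey is aggregateByKey with the same function for both merge steps
pairs.foldByKey(0)(_ + _).collect()
//groupByKey collects all values per key into a buffer (and turns map-side combine off)
pairs.groupByKey().collect()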