Spark RDD常用算子是什么类型的(spark rdd,开发技术)

时间:2024-05-10 10:05:45 作者 : 石家庄SEO 分类 : 开发技术
  • TAG :

Spark RDD常用算子:Value类型

Spark之所以比Hadoop灵活和强大,其中一个原因是Spark内置了许多有用的算子,也就是方法。通过对这些方法的组合,编程人员就可以写出自己想要的功能。说白了spark编程就是对spark算子的使用,下面为大家详细讲解一下SparkValue类型的常用算子

Spark RDD常用算子是什么类型的

map

函数说明:

map() 接收一个函数,该函数将RDD中的元素逐条进行映射转换,可以是类型的转换,也可以是值的转换,将函数的返回结果作为结果RDD编程。

函数签名:

defmap[U:ClassTag](f:T=>U):RDD[U]

案例演示

valsparkConf=newSparkConf().setMaster("local[*]").setAppName("Operator")valsc=newSparkContext(sparkConf)//算子-mapvalrdd=sc.makeRDD(List(1,2,3,4),2)valmapRdd1=rdd.map(_*2)mapRdd1.collect().foreach(println)sc.stop()

运行结果

2468

mapPartitons

函数说明:

将待处理的数据以分区为单位发送到待计算节点上进行处理,mapPartition是对RDD的每一个分区的迭代器进行操作,返回的是迭代器。这里的处理可以进行任意的处理。

函数签名:

defmapPartitions[U:ClassTag](f:Iterator[T]=>Iterator[U],preservesPartitioning:Boolean=false):RDD[U]

案例演示

defmain(args:Array[String]):Unit={valsparkConf=newSparkConf().setMaster("local[*]").setAppName("Operator")valsc=newSparkContext(sparkConf)//算子-mapPartitons计算每个分区的最大数valrdd=sc.makeRDD(List(1,34,36,345,2435,2342,62,35,4),4)valmapParRdd=rdd.mapPartitions(iter=>{List(iter.max).iterator})mapParRdd.foreach(println)sc.stop()}

运行结果:

62243534345

mapPartitonsWithIndex

函数说明:

将待处理的数据以分区为单位发送到计算节点上,这里的处理可以进行任意的处理,哪怕是过滤数据,在处理的同时可以获取当前分区的索引值。

函数签名:

defmapPartitionsWithIndex[U:ClassTag](f:(Int,Iterator[T])=>Iterator[U],preservesPartitioning:Boolean=false):RDD[U]

案例演示:

  1. 将数据进行扁平化映射并且打印所在的分区数
defmain(args:Array[String]):Unit={valconf=newSparkConf().setMaster("local[*]").setAppName("rdd")valsc=newSparkContext(conf)valrdd=sc.makeRDD(List("HelloSpark","HelloScala","WordCount"),2)valmapRDD=rdd.flatMap(_.split(""))valmpwiRdd=mapRDD.mapPartitionsWithIndex((index,datas)=>{datas.map(num=>{(index,num)})})mpwiRdd.collect().foreach(println)}

运行结果:

(0,Hello)(0,Spark)(1,Hello)(1,Scala)(1,Word)(1,Count)
  1. 将数据进行扁平化映射只打印所在第一分区的数据
defmain(args:Array[String]):Unit={valconf=newSparkConf().setMaster("local[*]").setAppName("rdd")valsc=newSparkContext(conf)valrdd=sc.makeRDD(List("HelloSpark","HelloScala","WordCount"),2)valmapRDD=rdd.flatMap(_.split(""))valmpwiRdd=mapRDD.mapPartitionsWithIndex((index,datas)=>{if(index==0){datas.map(num=>{(index,num)})}else{Nil.iterator}})mpwiRdd.collect().foreach(println)

运行结果:

(0,Hello)(0,Spark)

flatMap

函数说明:

将数据进行扁平化之后在做映射处理,所以算子也称为扁平化映射

函数签名:

defflatMap[U:ClassTag](f:T=>TraversableOnce[U]):RDD[U]

案例演示:

将每个单词进行扁平化映射

defmain(args:Array[String]):Unit={valsparkConf=newSparkConf().setMaster("local[*]").setAppName("Operator")valsc=newSparkContext(sparkConf)//算子-mapvalrdd=sc.makeRDD(List("HelloScala","HelloSpark"),2)valFltRdd=rdd.flatMap(_.split(""))FltRdd.foreach(println)sc.stop()}

运行结果:

HelloScalaHelloSpark

glom

函数说明:

glom的作用就是将一个分区的数据合并到一个array中。

函数签名:

defglom():RDD[Array[T]]

案例演示:

  1. 将不同分区rdd的元素合并到一个分区
defmain(args:Array[String]):Unit={valconf=newSparkConf().setMaster("local[*]").setAppName("rdd")valsc=newSparkContext(conf)valrdd=sc.makeRDD(List(1,2,3,4,5,6,7,8,9),2)valglomRdd=rdd.glom()glomRdd.collect().foreach(data=>println(data.mkString(",")))sc.stop()}

运行结果:

1,2,3,45,6,7,8,9

groupBy

函数说明:

将数据根据指定的规则进行分组,分区默认不变,单数数据会被打乱,我们成这样的操作为shuffer,

函数签名:

defgroupBy[K](f:T=>K)(implicitkt:ClassTag[K]):RDD[(K,Iterable[T])]

案例演示:

  1. 按照奇偶数进行groupby分区
defmain(args:Array[String]):Unit={valconf=newSparkConf().setMaster("local[*]").setAppName("rdd")valsc=newSparkContext(conf)valrdd=sc.makeRDD(List(1,2,3,4,5,6,7,8,10),2)valgroupByRDD=rdd.groupBy(_%2==0)groupByRDD.collect().foreach(println)sc.stop()}

运行结果:

(false,CompactBuffer(1,3,5,7))(true,CompactBuffer(2,4,6,8,10))
  1. 按照单词的首字母进行分组
defmain(args:Array[String]):Unit={valconf=newSparkConf().setMaster("local[*]").setAppName("rdd")valsc=newSparkContext(conf)valrdd=sc.makeRDD(List("Hello","Tom","Timi","Scala","Spark"))valgroupByRDD=rdd.groupBy(_.charAt(0))groupByRDD.collect().foreach(println)sc.stop()}

运行结果:

(T,CompactBuffer(Tom,Timi))(H,CompactBuffer(Hello))(S,CompactBuffer(Scala,Spark))

filter

函数说明:

filter即过滤器的意思,所以filter算子的作用就是过滤的作用。filter将根据指定的规则进行筛选过滤,符合条件的数据保留,不符合的数据丢弃,当数据进行筛选过滤之后,分区不变,但分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

函数签名:

deffilter(f:T=>Boolean):RDD[T]

案例演示:

  1. 筛选出能被二整除的数字
defmain(args:Array[String]):Unit={valconf=newSparkConf().setMaster("local[*]").setAppName("rdd")valsc=newSparkContext(conf)valrdd=sc.makeRDD(List(46,235,246,2346,3276,235,234,6234,6245,246,24,6246,235,26,265))valfilterRDD=rdd.filter(_%2==0)filterRDD.collect().foreach(println)sc.stop()}

运行结果:

4624623463276234623424624624626

2.筛选单词中包含H的

defmain(args:Array[String]):Unit={valconf=newSparkConf().setMaster("local[*]").setAppName("rdd")valsc=newSparkContext(conf)valrdd=sc.makeRDD(List("Hello","Horber","Hbeer","ersfgH","Scala","Hadoop","Zookeeper"))valfilterRDD=rdd.filter(_.contains("H"))filterRDD.collect().foreach(println)sc.stop()}

运行结果:

HelloHorberHbeerersfgHHadoop
 </div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
本文:Spark RDD常用算子是什么类型的的详细内容,希望对您有所帮助,信息来源于网络。
上一篇:Linux系统IO分析工具iotop怎么用下一篇:

3 人围观 / 0 条评论 ↓快速评论↓

(必须)

(必须,保密)

阿狸1 阿狸2 阿狸3 阿狸4 阿狸5 阿狸6 阿狸7 阿狸8 阿狸9 阿狸10 阿狸11 阿狸12 阿狸13 阿狸14 阿狸15 阿狸16 阿狸17 阿狸18