SparkShell和IDEA中如何编写Spark程序(idea,spark,开发技术)

时间:2024-05-10 01:23:39 作者 : 石家庄SEO 分类 : 开发技术
  • TAG :

spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用Scala编写Spark程序。spark-shell程序一般用作Spark程序测试练习来用。spark-shell属于Spark的特殊应用程序,我们可以在这个特殊的应用程序中提交应用程序

spark-shell启动有两种模式,local模式和cluster模式,分别为

local模式:

spark-shell

local模式仅在本机启动一个SparkSubmit进程,没有与集群建立联系,虽然进程中有SparkSubmit但是不会被提交到集群红

SparkShell和IDEA中如何编写Spark程序

Cluster模式(集群模式):

spark-shell \
--master spark://hadoop01:7077 \
--executor-memory 512m \
--total-executor-cores 1

后两个命令不是必须的--master这条命令是必须的(除非在jar包中已经指可以不指定,不然就必须指定)

退出shell

千万不要ctrl+c spark-shell 正确退出 :quit 千万不要ctrl+c退出 这样是错误的 若使用了ctrl+c退出 使用命令查看监听端口 netstat - apn | grep 4040 在使用kill -9 端口号 杀死即可

3.25.11 spark2.2shell和spark1.6shell对比

SparkShell和IDEA中如何编写Spark程序

ps:启动spark-shell若是集群模式,在webUI会有一个一直执行的任务

通过IDEA创建Spark工程

ps:工程创建之前步骤省略,在scala中已经讲解,直接默认是创建好工程的

对工程中的pom.xml文件配置

<!--声明公有的属性-->
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<spark.version>2.2.0</spark.version>
<hadoop.version>2.7.1</hadoop.version>
<scala.compat.version>2.11</scala.compat.version>
</properties>
<!--声明并引入公有的依赖-->
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>

Spark实现WordCount程序

Scala版本
importorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}

objectSparkWordCount {
defmain(args: Array[String]): Unit = {
valconf =newSparkConf().setAppName("dri/wordcount").setMaster("local[*]")
//创建sparkContext对象
valsc = newSparkContext(conf)
//通过sparkcontext对象就可以处理数据
//读取文件 参数是一个String类型的字符串 传入的是路径
vallines: RDD[String] = sc.textFile(“dir/wordcount”)
//切分数据
valwords: RDD[String] = lines.flatMap(_.split(" "))
//将每一个单词生成元组(单词,1)
valtuples: RDD[(String, Int)] = words.map((_,1))
//spark中提供一个算子reduceByKey相同key为一组进行求和 计算value
valsumed: RDD[(String, Int)] = tuples.reduceByKey(_+_)
//对当前这个结果进行排序sortBy和scala中sotrBy是不一样的 多了一个参数
//默认是升序 false就是降序
valsorted: RDD[(String, Int)] = sumed.sortBy(_._2,false)
//将数据提交到集群存储 无法返回值
sorted.foreach(println)
//回收资源停止sc,结束任务
sc.stop()
}
}

Java版本

importorg.apache.spark.SparkConf;
importorg.apache.spark.api.java.JavaPairRDD;
importorg.apache.spark.api.java.JavaRDD;
importorg.apache.spark.api.java.JavaSparkContext;
importorg.apache.spark.api.java.function.FlatMapFunction;
importorg.apache.spark.api.java.function.Function2;
importorg.apache.spark.api.java.function.PairFunction;
importscala.Tuple2;
importjava.util.Arrays;
importjava.util.Iterator;
importjava.util.List;
publicclassJavaWordCount {
public static voidmain(String[] args) {
//1.先创建conf对象进行配置主要是设置名称,为了设置运行模式
SparkConf conf =newSparkConf().setAppName("JavaWordCount").setMaster("local");
//2.创建context对象
JavaSparkContext jsc =newJavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile("dir/file");
//进行切分数据flatMapFunction是具体实现类
JavaRDD<String> words = lines.flatMap(newFlatMapFunction<String, String>() {
@Override
public Iterator<String>call(String s) throws Exception {
List<String> splited = Arrays.asList(s.split(" "));
returnsplited.iterator();
}

});
//将数据生成元组
//第一个泛型是输入的数据类型 后两个参数是输出参数元组的数据
JavaPairRDD<String, Integer> tuples = words.mapToPair(newPairFunction<String, String,
Integer>() {
@Override
public Tuple2<String, Integer>call(String s) throws Exception {
returnnewTuple2<String, Integer>(s,1);
}
});
//聚合
JavaPairRDD<String, Integer> sumed = tuples.reduceByKey(newFunction2<Integer, Integer,
Integer>() {
@Override
//第一个Integer是相同key对应的value
//第二个Integer是相同key对应的value
public Integercall(Integer v1, Integer v2) throws Exception {
returnv1 + v2;
}
});
//因为Java api没有提供sortBy算子,此时需要将元组中的数据进行位置调换,然后在排序,排完序在换回
//第一次交换是为了排序
JavaPairRDD<Integer, String> swaped = sumed.mapToPair(newPairFunction<Tuple2<String,
Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String>call(Tuple2<String, Integer> tup) throws Exception {
returntup.swap();
}
});
//排序
JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
//第二次交换是为了最终结果<单词,数量>
JavaPairRDD<String, Integer> res = sorted.mapToPair(newPairFunction<Tuple2<Integer,
String>, String, Integer>() {
@Override
public Tuple2<String, Integer>call(Tuple2<Integer, String> tuple2) throws Exception
{
returntuple2.swap();
}
});
System.out.println(res.collect());
res.saveAsTextFile("out1");
jsc.stop();
}
}

 </div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
本文:SparkShell和IDEA中如何编写Spark程序的详细内容,希望对您有所帮助,信息来源于网络。
上一篇:MySQL结构化查询语言有哪些下一篇:

24 人围观 / 0 条评论 ↓快速评论↓

(必须)

(必须,保密)

阿狸1 阿狸2 阿狸3 阿狸4 阿狸5 阿狸6 阿狸7 阿狸8 阿狸9 阿狸10 阿狸11 阿狸12 阿狸13 阿狸14 阿狸15 阿狸16 阿狸17 阿狸18