1. Introduction
Spark is on fire these days, and more and more people are jumping into big data. After digging through a pile of material, I finally got this single-machine Spark HelloWorld to run. This HelloWorld is not your usual HelloWorld: rather than just printing "Hello World", it is a word-count program that counts how many times each word appears in a file and sorts the results.
The same functionality is implemented three ways below: in native Scala, in traditional Java, and in Java 8 style.
In fact, a single-machine Spark program and one running on a cluster differ only in the runtime environment; the development process is identical.
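For example, the only code-level difference is usually the master setting. A minimal sketch (the class name ConfSketch and the spark://host:7077 URL are placeholders of mine, not from any real cluster):

import org.apache.spark.SparkConf;

public class ConfSketch {
    public static void main(String[] args) {
        // Single-machine run: hard-code a local master ("local[*]" would use all cores).
        SparkConf localConf = new SparkConf()
                .setAppName("WordCounter")
                .setMaster("local");

        // Cluster run: the very same program, but setMaster() is omitted and the
        // master URL (e.g. spark://host:7077) is supplied externally via
        // spark-submit --master, so the application code itself does not change.
        SparkConf clusterConf = new SparkConf()
                .setAppName("WordCounter");
    }
}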
A later article will cover how to set up a cluster.
This series is updated on both CSDN and Jianshu (简书) whenever I have spare time.
Corrections from fellow readers are welcome.
2. Environment Setup
C:\Users\hylexus>java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)

C:\Users\hylexus>scala -version
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL
Eclipse for Scala:
Scala IDE build of Eclipse SDK Build id: 4.4.1-vfinal-2016-05-04T11:16:00Z-Typesafe
Both the Scala version and the Java version use Maven to manage dependencies. For how to create a Scala project with Maven, see my other post: http://blog.csdn.net/hylexus/article/details/52602774
Note: the artifact used is spark-core_2.11; the _2.11 suffix is the Scala binary version, so it has to match the Scala version you installed.
The number of transitive jar dependencies is frightening, so be patient while they download… ^_^
2.1 Scala version
The relevant parts of pom.xml are as follows:
<properties>
  <maven.compiler.source>1.6</maven.compiler.source>
  <maven.compiler.target>1.6</maven.compiler.target>
  <encoding>UTF-8</encoding>
  <scala.version>2.11.5</scala.version>
  <scala.compat.version>2.11</scala.compat.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
  </dependency>
  <!-- Test -->
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.specs2</groupId>
    <artifactId>specs2-junit_2.11</artifactId>
    <version>2.4.16</version>
  </dependency>
</dependencies>

<build>
  <sourceDirectory>src/main/scala</sourceDirectory>
  <testSourceDirectory>src/test/scala</testSourceDirectory>
  <plugins>
    <plugin>
      <!-- see http://davidb.github.com/scala-maven-plugin -->
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.2.0</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
          <configuration>
            <args>
              <!-- <arg>-make:transitive</arg> -->
              <arg>-dependencyfile</arg>
              <arg>${project.build.directory}/.scala_dependencies</arg>
            </args>
          </configuration>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.5.1</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <encoding>UTF-8</encoding>
        <compilerArguments>
          <!-- <extdirs>src/main/webapp/plugins/ueditor/jsp/lib</extdirs> -->
        </compilerArguments>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-surefire-plugin</artifactId>
      <version>2.18.1</version>
      <configuration>
        <useFile>false</useFile>
        <disableXmlReport>true</disableXmlReport>
        <!-- If you have classpath issue like NoDefClassError,... -->
        <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
        <includes>
          <include>**/*Test.*</include>
          <include>**/*Suite.*</include>
        </includes>
      </configuration>
    </plugin>
  </plugins>
</build>
2.2 Java version
The pom.xml is as follows:
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
</dependencies>

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.5.1</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <encoding>UTF-8</encoding>
        <compilerArguments>
          <!-- <extdirs>src/main/webapp/plugins/ueditor/jsp/lib</extdirs> -->
        </compilerArguments>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-surefire-plugin</artifactId>
      <version>2.18.1</version>
      <configuration>
        <useFile>false</useFile>
        <disableXmlReport>true</disableXmlReport>
        <!-- If you have classpath issue like NoDefClassError,... -->
        <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
        <includes>
          <include>**/*Test.*</include>
          <include>**/*Suite.*</include>
        </includes>
      </configuration>
    </plugin>
  </plugins>
</build>
3. The Code
3.1 Scala, the low-key version
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("WordCount")
      .setMaster("local") // needed when running directly from the IDE; omit it when submitting via spark-submit
    val sc = new SparkContext(conf)
    val fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties"
    // read the file as an RDD of lines
    val lines = sc.textFile(fileName, 1)
    // split into words; splitting on a single space is only a demo-grade tokenizer
    val words = lines.flatMap(line => line.split(" "))
    // String ==> (word, 1)
    val pairs = words.map(word => (word, 1))
    // (word, 1) ==> (word, count)
    val result = pairs.reduceByKey((a, b) => a + b)
    // sort by count, descending
    val sorted = result.sortBy(e => e._2, false, 1)
    sorted.foreach(e => println("【" + e._1 + "】appeared " + e._2 + " times"))
    sc.stop()
  }
}
3.2 Scala, the show-off version
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    conf.setAppName("rank test").setMaster("local")
    val sc = new SparkContext(conf)
    val fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties"
    sc.textFile(fileName, 1)      // lines
      .flatMap(_.split(" "))      // all words
      .map(word => (word, 1))     // to (word, 1) pairs
      .reduceByKey(_ + _)         // (word, count)
      .map(e => (e._2, e._1))     // swap to (count, word) so sortByKey can sort by count
      .sortByKey(false, 1)        // descending
      .map(e => (e._2, e._1))     // swap back to (word, count)
      .foreach(e => println("【" + e._1 + "】appeared " + e._2 + " times"))
    sc.stop()
  }
}
3.3 Java, the traditional version
The code is almost too ugly to look at…
Anonymous inner classes everywhere…
Luckily, Java 8's lambdas come to the rescue; a sketch of the lambda version follows this listing.
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("WordCounter")
            .setMaster("local");
        String fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties";
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile(fileName, 1);

        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            // in earlier Spark versions the return type was Iterable rather than Iterator
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        JavaPairRDD<String, Integer> result = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer e, Integer acc) throws Exception {
                return e + acc;
            }
        }, 1);

        // the identity map turns the JavaPairRDD into a plain JavaRDD of tuples,
        // which is what exposes the sortBy method used below
        result.map(new Function<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(Tuple2<String, Integer> v1) throws Exception {
                return new Tuple2<>(v1._1, v1._2);
            }
        }).sortBy(new Function<Tuple2<String, Integer>, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Tuple2<String, Integer> v1) throws Exception {
                return v1._2; // sort by count, descending
            }
        }, false, 1).foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> e) throws Exception {
                System.out.println("【" + e._1 + "】appeared " + e._2 + " times");
            }
        });

        sc.close();
    }
}
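3.4 Java 8, the lambda version (a sketch)
Since the Java 8 flavor is promised above, here is a minimal sketch of the same word count using lambdas. The class name WordCountJava8 is illustrative; the logic simply mirrors the traditional version, with lambda expressions replacing the anonymous inner classes:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCountJava8 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("WordCounter").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        String fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties";

        JavaPairRDD<String, Integer> counts = sc.textFile(fileName, 1)
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator()) // words
                .mapToPair(word -> new Tuple2<String, Integer>(word, 1))    // (word, 1)
                .reduceByKey((a, b) -> a + b);                              // (word, count)

        counts.map(e -> new Tuple2<String, Integer>(e._1, e._2)) // to a plain JavaRDD so sortBy is available
              .sortBy(e -> e._2, false, 1)                       // sort by count, descending
              .foreach(e -> System.out.println("【" + e._1 + "】appeared " + e._2 + " times"));

        sc.close();
    }
}

The output should be identical to the traditional version; only the boilerplate is gone.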
4. Output
// ...............
// ...............
【->】appeared 6 times
【+】appeared 5 times
【import】appeared 5 times
【new】appeared 4 times
【=】appeared 4 times
// ...............
// ...............
Reposted from http://blog.csdn.net/hylexus/article/details/52606540, by hylexus.