代码:
object StringDemo2 {
  def main(args: Array[String]): Unit = {
    // Fixed sample data: an immutable List is sufficient (no mutation occurs),
    // so prefer `val List` over `var ListBuffer`.
    val words = List("java", "php", "python", "js")

    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    try {
      val rdd = sc.makeRDD(words)
      // Group every word by its first character, e.g. (p, [php, python]).
      val grouped = rdd.groupBy(_.charAt(0))
      grouped.collect().foreach(println)
    } finally {
      sc.stop() // always release the SparkContext, even on failure
    }
  }
}
运行结果:
- (p,CompactBuffer(php, python))
- (j,CompactBuffer(java, js))
日志log.txt的数据
- 83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-search.png
- 83.149.9.216 - - 17/05/2015:10:05:43 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png
- 83.149.9.216 - - 17/05/2015:10:05:47 +0000 GET /presentations/logstash-monitorama-2013/plugin/highlight/highlight.js
- 83.149.9.216 - - 17/05/2015:10:05:12 +0000 GET /presentations/logstash-monitorama-2013/plugin/zoom-js/zoom.js
- 83.149.9.216 - - 17/05/2015:10:05:07 +0000 GET /presentations/logstash-monitorama-2013/plugin/notes/notes.js
- 83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png
- 83.149.9.216 - - 17/05/2015:10:05:57 +0000 GET /presentations/logstash-monitorama-2013/css/fonts/Roboto-Bold.ttf
代码:
object reduceTest {
  /**
   * Counts access-log lines per hour.
   *
   * Fixes over the original:
   *  - `reduceByKey(_ + _)` replaces `groupBy(_._1)` + `map(_.size)`:
   *    it combines counts on the map side instead of shuffling every raw pair.
   *  - The two SimpleDateFormat instances are created once per partition
   *    (via `mapPartitions`) instead of once per line; SimpleDateFormat is
   *    not thread-safe, so one instance per partition is the safe scope.
   *  - The SparkContext is always stopped.
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    try {
      val rdd = sc.textFile("input\\log.txt", 1)
      val hourCounts = rdd.mapPartitions { lines =>
        // e.g. field 3 of a line is "17/05/2015:10:05:03"
        val inFmt = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val outFmt = new SimpleDateFormat("yyyy:MM:dd:HH") // hour-granularity bucket key
        lines.map { line =>
          val fields = line.split(" ")
          (outFmt.format(inFmt.parse(fields(3))), 1)
        }
      }.reduceByKey(_ + _)
      hourCounts.saveAsTextFile("out\\out17")
    } finally {
      sc.stop()
    }
  }
}
运行结果:
- (2015:05:17:10,7)
text01.txt的数据
- abc acb java
- avaj bac
- cba abc
- jvaa php hpp
- pph python thonpy
代码:
object Demo {
  /**
   * Groups words that are anagrams of each other: two words map to the
   * same key when their characters, sorted, form the same string
   * (e.g. "java", "avaj", "jvaa" -> key "aajv").
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    try {
      val rdd = sc.textFile("input//text01.txt")
      rdd.flatMap(_.split(" "))
        // `String.sorted` (Scala StringOps) replaces the manual
        // toCharArray / java.util.Arrays.sort / new String dance.
        .map(word => (word.sorted, word))
        .groupByKey()
        .collect()
        .foreach(println)
    } finally {
      sc.stop() // release the local Spark context
    }
  }
}
运行结果:
- (hpp,CompactBuffer(php, hpp, pph))
- (hnopty,CompactBuffer(python, thonpy))
- (aajv,CompactBuffer(java, avaj, jvaa))
- (abc,CompactBuffer(abc, acb, bac, cba, abc))
text02.txt的数据
- java php java
- python php java
- python mysql
- hadoop hadoop
- java php python
- hadoop php
代码:
object Demo2 {
  /** Prints every distinct word found in the input file, one per line. */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val context = new SparkContext(conf)
    // Tokenize each line on single spaces, then deduplicate across the file.
    val words = context.textFile("input//text02.txt").flatMap(_.split(" "))
    val unique = words.distinct()
    unique.collect().foreach(println)
  }
}
运行结果:
- php
- python
- mysql
- java
- hadoop
text03.txt的数据
- 2005 01 02 04 -11
- 2005 01 02 05 -17
- 2005 01 02 06 -17
- 2005 01 02 07 -17
- 2005 01 02 08 -17
- 2005 01 02 09 -17
- 2005 01 02 10 -22
- 2005 01 02 11 -22
- 2005 01 02 12 -28
- 2005 01 02 13 -33
- 2005 02 02 14 -39
- 2005 02 02 15 -28
- 2005 02 02 16 0
- 2005 02 02 17 11
- 2005 02 02 18 17
- 2005 02 02 19 17
- 2005 02 02 20 22
- 2005 02 02 21 28
- 2005 02 02 22 28
- 2005 02 02 23 22
代码:
object Demo3 {
  /**
   * Computes the (truncated) average temperature per month.
   *
   * Each input line is "year month day hour temperature",
   * e.g. "2005 01 02 04 -11".
   *
   * Fix: the original extracted fields with fixed offsets
   * (`substring(5,7)` / `substring(16,19)`), which throws
   * StringIndexOutOfBoundsException on short temperature values such as
   * "2005 02 02 16 0" (15 chars). Splitting on whitespace is robust to
   * field width.
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    try {
      val rdd = sc.textFile("input//text03.txt")
      val monthTemp: RDD[(String, Int)] = rdd.map { line =>
        val fields = line.trim.split("\\s+")
        (fields(1), fields(4).toInt) // (month, temperature)
      }
      val byMonth: RDD[(String, Iterable[Int])] = monthTemp.groupByKey()
      // Integer division intentionally truncates toward zero,
      // matching the original output (e.g. -201 / 10 == -20).
      byMonth.mapValues(temps => temps.sum / temps.size)
        .collect()
        .foreach(println)
    } finally {
      sc.stop()
    }
  }
}
运行结果:
- (02,7)
- (01,-20)
text04.txt的数据
- 13726230503 81
- 13826544101 50
- 13926435656 30
- 13926251106 40
- 13826544101 2106
- 13826544101 1432
- 13719199419 300
代码:
object Demo4 {
  /**
   * Sums the second column (usage) per 3-digit phone-number prefix.
   * Input lines look like "13726230503 81"; output like "(137,381)".
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    try {
      val rdd = sc.textFile("input//text04.txt")
      rdd.map { line =>
          val fields = line.split(" ")
          // Key by the first three digits of the phone number.
          (fields(0).substring(0, 3), fields(1).toInt)
        }
        .reduceByKey(_ + _)
        .collect()
        .foreach(println)
    } finally {
      sc.stop() // fix: the original never released the SparkContext
    }
  }
}
运行结果:
- (138,3588)
- (139,70)
- (137,381)