大家好,欢迎来到IT知识分享网。
题目要求
- 对文本文件内的每个单词都统计出其出现的次数。
- 按照每个单词出现次数的数量,降序排序。
文件内容:
思路
这题咱们分以下几个步骤:
- 实例化出SparkContext对象
- 将文件中的数据转为RDD数据
- 利用RDD的map算子或其它算子来进行格式转换
- 利用RDD的reduce算子进行数据叠加
- 利用RDD的sortBy算子进行排序
实例化SparkContext对象
sc = SparkContext("local","test1 app")
将文件数据转换为RDD数据格式
SparkContext对象读取数据有两种方式:
- 读取文件数据使用 sc.textFile()方法。
rdd_1 = sc.textFile("/root/wordcount.txt")
- 读取列表集合数据使用sc.parallelize(list)方法。
利用mapPartitions算子进行处理
这一步定义一个函数来接受每个分区的数据迭代器,利用for循环进行遍历可以得到每行的字符串,但这个字符串并不是我们想要的,我们需要对这个进行split分割处理得到每个单词,然后利用for循环对这个字符串数组进行遍历,将每个单词以(key,value)格式添加到结果列表,当所有结果都遍历结束后返回迭代器对象。
def myfunc(iterator): result = [] for item in iterator: strings = item.split() for string in strings: result.append((str(string),1)) return result rdd_2 = rdd_1.mapPartitions(myfunc)
对处理后的数据进行叠加处理
在得到每个单词的(key,value)格式后,我们需要对这些单词进行合并,当key相同的时候则将value进行相加,因此这里使用reduceByKey算子进行合并,然后再使用lambda表达式来进行加法合并。
rdd_3 = rdd_2.reduceByKey(lambda x,y:x+y)
对数据进行降序排序
这一步对上面处理完的数据进行降序排序处理,因此可以采用sortBy算子进行排序,然后用lambda表达式来设置排序规则,False设置为降序排序。
rdd_4 = rdd_3.sortBy(lambda x:x[1],False)
打印RDD数据内容
print(rdd_4.collect())
源码
# -*- coding: UTF-8 -*- from pyspark import SparkContext def myfunc(iterator): result = [] for item in iterator: strings = item.split() for string in strings: result.append((str(string),1)) return result if __name__ == "__main__": """ 需求:对本地文件系统URI为:/root/wordcount.txt 的内容进行词频统计 """ # 1. 实例化SparkContext对象 sc = SparkContext("local","test1 app") # 2. 读取文件数据转换为RDD数据 rdd_1 = sc.textFile("/root/wordcount.txt") # 3. 对RDD数据进行格式处理 rdd_2 = rdd_1.mapPartitions(myfunc) # 4. 对RDD数据进行叠加处理 rdd_3 = rdd_2.reduceByKey(lambda x,y:x+y) # 5. 对RDD数据进行排序 rdd_4 = rdd_3.sortBy(lambda x:x[1],False) print(rdd_4.collect()) sc.stop()
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/97675.html