机器学习—PySpark案例
1、统计每个网站的PV（页面浏览量）和UV（独立访客数）
if __name__ == '__main__':
    # Page views (PV) and unique visitors (UV) per site from the
    # tab-separated "./pvuv" log. Fields used below:
    #   [1] visitor identity used for UV de-duplication (presumably the IP -- confirm)
    #   [4] site
    conf = SparkConf()
    conf.setMaster("local")
    conf.setAppName("test")
    sc = SparkContext(conf=conf)

    # Split each line once and reuse the fields for both jobs instead of
    # reading and re-splitting the file twice.
    fields = sc.textFile("./pvuv").map(lambda line: line.split("\t")).cache()

    # PV: every log line is one hit; sum per site, most-visited first.
    pv = (fields.map(lambda f: (f[4], 1))
                .reduceByKey(lambda v1, v2: v1 + v2)
                .sortBy(lambda tp: tp[1], ascending=False))
    # collect() brings the sorted result to the driver so it prints in order;
    # foreach(print) runs on executors and does not preserve global order.
    for record in pv.collect():
        print(record)

    # UV: de-duplicate (visitor, site) pairs, then count pairs per site.
    # A tuple key is used instead of a "_"-joined string so that fields which
    # themselves contain "_" cannot be mis-split.
    uv = (fields.map(lambda f: (f[1], f[4]))
                .distinct()
                .map(lambda pair: (pair[1], 1))
                .reduceByKey(lambda v1, v2: v1 + v2)
                .sortBy(lambda tp: tp[1], ascending=False))
    for record in uv.collect():
        print(record)

    sc.stop()
AI助手
2、统计除了某个地区外的UV
if __name__ == '__main__':
    # UV per site computed over every region EXCEPT `excluded_region`.
    conf = SparkConf()
    conf.setMaster("local")
    conf.setAppName("test")
    sc = SparkContext(conf=conf)

    # Field [3] is the region compared against; field [1] identifies the
    # visitor and field [4] the site.
    excluded_region = "beijing"

    fields = sc.textFile("./pvuv").map(lambda line: line.split("\t"))
    # Keep lines that are NOT from the excluded region; using `==` here would
    # keep only that region, the opposite of what this statistic asks for.
    others = fields.filter(lambda f: f[3] != excluded_region)
    # De-duplicate (visitor, site) tuples, then count distinct visitors per site.
    uv = (others.map(lambda f: (f[1], f[4]))
                .distinct()
                .map(lambda pair: (pair[1], 1))
                .reduceByKey(lambda v1, v2: v1 + v2)
                .sortBy(lambda tp: tp[1], ascending=False))
    # collect() so the sorted order is what gets printed on the driver.
    for record in uv.collect():
        print(record)

    sc.stop()
AI助手
3、统计每个网站最活跃的top2地区
def get_top2_local(one):
    """Return the two most frequent regions for one site.

    :param one: a ``(site, regions)`` pair, as produced by ``groupByKey``;
        ``regions`` is an iterable of region values (the site is not used).
    :return: a list of at most two ``(region, count)`` tuples, highest count
        first. Regions with equal counts keep first-seen order. Fewer than two
        distinct regions yields a shorter (possibly empty) list.
    """
    from collections import Counter

    # most_common(n) is equivalent to sorting the counts descending (stable
    # for ties) and taking the first n -- what the manual loop used to do.
    return Counter(one[1]).most_common(2)


if __name__ == '__main__':
    conf = SparkConf()
    conf.setMaster("local")
    conf.setAppName("test")
    sc = SparkContext(conf=conf)

    # Top-2 most active regions per site.
    # Split each line once; field [4] is the site and field [3] the region.
    lines = sc.textFile("./pvuv")
    site_local = lines.map(lambda line: line.split("\t")).map(lambda f: (f[4], f[3]))
    site_localIterable = site_local.groupByKey()
    # get_top2_local returns only (region, count) pairs, so keep the site key
    # next to them; otherwise the printed lists cannot be attributed to a site.
    sorted_result = site_localIterable.map(lambda one: (one[0], get_top2_local(one)))
    # collect() prints on the driver; foreach(print) prints on executors.
    for record in sorted_result.collect():
        print(record)

    sc.stop()

AI助手
4、统计每个网站最热门的操作
def get_hot_operator(one):
    """Return the single most frequent operation for one site.

    :param one: a ``(site, operations)`` pair, as produced by ``groupByKey``;
        ``operations`` is an iterable of operation values (the site is not used).
    :return: a list holding one ``(operation, count)`` tuple, or an empty list
        when there are no operations. On a tie the first-seen operation wins.
    """
    from collections import Counter

    # The previous `len >= 2` / `range(0, 1)` branches both reduced to
    # "take the first entry of the descending count list"; most_common(1)
    # expresses that directly.
    return Counter(one[1]).most_common(1)


if __name__ == '__main__':
    conf = SparkConf()
    conf.setMaster("local")
    conf.setAppName("test")
    sc = SparkContext(conf=conf)

    # Most popular operation per site.
    # Split each line once; field [4] is the site and field [5] the operation.
    lines = sc.textFile("./pvuv")
    site_operator = lines.map(lambda line: line.split("\t")).map(lambda f: (f[4], f[5]))
    site_operatorIterable = site_operator.groupByKey()
    # get_hot_operator returns only the (operation, count) pair, so keep the
    # site key next to it to make the output attributable.
    sorted_result = site_operatorIterable.map(lambda one: (one[0], get_hot_operator(one)))
    # collect() prints on the driver; foreach(print) prints on executors.
    for record in sorted_result.collect():
        print(record)

    sc.stop()

AI助手
5、统计每个网站下最活跃的top3用户
def get_uid_site_count(one):
    """Count how often one user visited each site.

    :param one: a ``(uid, sites)`` pair, as produced by ``groupByKey``;
        ``sites`` is an iterable of site values.
    :return: a list of ``(site, (uid, count))`` tuples, one per distinct site,
        in first-seen order -- keyed by site so a later ``groupByKey`` can
        gather all users of a site together.
    """
    uid, visited = one[0], one[1]

    tally = {}
    for name in visited:
        tally[name] = tally.get(name, 0) + 1

    return [(name, (uid, hits)) for name, hits in tally.items()]

def get_top3_uid(one):
    """Return the three most active users of one site.

    :param one: a ``(site, uid_counts)`` pair, as produced by ``groupByKey``;
        ``uid_counts`` is an iterable of ``(uid, count)`` tuples (the site
        itself is not used).
    :return: a list of exactly three entries: the ``(uid, count)`` tuples with
        the highest counts, highest first (equal counts keep first-seen order),
        padded with ``""`` when the site has fewer than three users.
    """
    import heapq
    from operator import itemgetter

    # nlargest(n, it, key) == sorted(it, key=key, reverse=True)[:n], which is
    # exactly what the hand-written insertion into three slots computed.
    top3_uid = heapq.nlargest(3, one[1], key=itemgetter(1))
    # Keep the historical fixed-length shape: empty slots are "".
    top3_uid.extend([""] * (3 - len(top3_uid)))
    return top3_uid


if __name__ == '__main__':
    conf = SparkConf()
    conf.setMaster("local")
    conf.setAppName("test")
    sc = SparkContext(conf=conf)

    # Top-3 most active users per site.
    # Split each line once; field [2] is the user id and field [4] the site.
    lines = sc.textFile("./pvuv")
    uid_site = lines.map(lambda line: line.split("\t")).map(lambda f: (f[2], f[4]))
    uid_siteIterable = uid_site.groupByKey()
    # (site, (uid, count)) for every user/site combination.
    uid_site_count = uid_siteIterable.flatMap(get_uid_site_count)
    # Keep the site key next to its top-3 list, and drop the "" placeholders
    # get_top3_uid pads with when a site has fewer than three users.
    top3_uid_info = uid_site_count.groupByKey().map(
        lambda one: (one[0], [tp for tp in get_top3_uid(one) if tp != ""]))
    # collect() prints on the driver; foreach(print) prints on executors.
    for record in top3_uid_info.collect():
        print(record)

    sc.stop()