json读文件:https://zhuanlan.zhihu.com/p/267353998
pyspark配置:http://dblab.xmu.edu.cn/blog/1689-2/
数据分析案例:http://dblab.xmu.edu.cn/blog/2738-2/
pycharm配置pyspark环境:https://blog.csdn.net/ringsuling/article/details/84448369
Crontab+Flume+Kafka+Spark Streaming+Spring Boot 统计网页访问量项目:https://blog.csdn.net/qq_36329973/article/details/104738993?utm_medium=distribute.pc_relevant.none-task-blog-OPENSEARCH-8.not_use_machine_learn_pai&depth_1-utm_source=distribute.pc_relevant.none-task-blog-OPENSEARCH-8.not_use_machine_learn_pai
pyspark dataframe基本操作:https://www.jianshu.com/p/acd96549ee15
https://www.cnblogs.com/cxhzy/p/11067246.html
使用Python串口实时显示数据并绘图:https://zhuanlan.zhihu.com/p/100798858
pyecharts:https://pyecharts.org/#/zh-cn/web_flask
pyecharts-gallery:https://gallery.pyecharts.org/#/Bar/bar_markline_type
echarts:https://echarts.apache.org/examples/en/index.html#chart-type-bar
解决Django运行报错Error: That port is already in use.:https://www.jianshu.com/p/7ceebb422d09
pip install djangorestframework
写入数据:
"""Consume raw messages from the 'telecom' Kafka topic and dump each
micro-batch to a local text file named with the current epoch second."""
import time

# FIX: SparkContext was imported twice and `sys` was unused in the original.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    conf = (SparkConf()
            .setAppName("streamingkafka")
            .set('spark.executor.memory', '10g')
            .set("spark.executor.cores", '8'))
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")  # cut down on shell log noise
    ssc = StreamingContext(sc, 5)  # 5-second batch window

    brokers = '''10.132.221.111:6667,10.132.221.112:6667,10.132.221.113:6667,10.132.221.114:6667,10.132.221.116:6667,10.132.221.117:6667,10.132.221.118:6667,10.132.221.119:6667,10.132.221.120:6667,10.132.221.121:6667,10.132.221.123:6667,10.132.221.124:6667,10.132.221.125:6667,10.132.221.126:6667,10.132.221.127:6667,10.132.221.128:6667,10.132.221.129:6667,10.132.221.130:6667,10.132.221.132:6667'''
    topic = 'telecom'

    # Direct (receiver-less) stream; offsets are tracked per group.id.
    kafka_streaming_rdd = KafkaUtils.createDirectStream(
        ssc, [topic],
        kafkaParams={"metadata.broker.list": brokers,
                     "group.id": '18211170230ylt'})
    # Drop the Kafka key, keep the JSON payload, and persist each batch.
    kafka_streaming_rdd.map(lambda x: x[1]).foreachRDD(
        lambda rdd: rdd.saveAsTextFile(
            "file:///home/hadoop/rdd/file" + str(int(time.time()))))
    ssc.start()
    ssc.awaitTermination()
读取数据(验证写入结果):
"""Load one batch dump written by the streaming job and display it."""
from pyspark import SparkContext
from pyspark.sql import SparkSession
import json

if __name__ == "__main__":
    sc = SparkContext('local', 'test')
    sc.setLogLevel("WARN")
    spark = SparkSession.builder.getOrCreate()

    # One of the per-batch dump directories produced by the Kafka consumer.
    file = "file:///home/hadoop/rdd/file1605959588"
    frame = spark.read.json(file)
    frame.show()
读入数据并存为df类型
"""Consume JSON call records from Kafka; every 5 seconds, show how many
records fall in each calling-party city.

FIX: removed the unused `sumDF` / `schema_fin` locals, the duplicate
SparkContext import and the unused `sys` import from the original.
"""
import json

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StringType, StructField, StructType
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    conf = (SparkConf()
            .setAppName("streamingkafka")
            .set('spark.executor.memory', '10g')
            .set("spark.executor.cores", '8'))
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")  # cut down on shell log noise
    ssc = StreamingContext(sc, 5)  # 5-second batch window
    spark = SparkSession.builder.getOrCreate()

    brokers = '''10.132.221.111:6667,10.132.221.112:6667,10.132.221.113:6667,10.132.221.114:6667,10.132.221.116:6667,10.132.221.117:6667,10.132.221.118:6667,10.132.221.119:6667,10.132.221.120:6667,10.132.221.121:6667,10.132.221.123:6667,10.132.221.124:6667,10.132.221.125:6667,10.132.221.126:6667,10.132.221.127:6667,10.132.221.128:6667,10.132.221.129:6667,10.132.221.130:6667,10.132.221.132:6667'''

    # Schema of one call record as produced on the 'telecom' topic.
    schema_telecomdata = StructType([
        StructField("callerflag", LongType(), True),
        StructField("timespan", LongType(), True),
        StructField("timestamp", LongType(), True),
        StructField("caller1", StructType([
            StructField("phone", StringType(), True),
            StructField("caller1_site", StringType(), True),
            StructField("offsets", StringType(), True)]), True),
        StructField("caller2", StructType([
            StructField("phone", StringType(), True),
            StructField("caller2_site", StringType(), True),
            StructField("offsets", StringType(), True)]), True),
    ])

    topic = 'telecom'
    kafka_streaming_rdd = KafkaUtils.createDirectStream(
        ssc, [topic],
        kafkaParams={"metadata.broker.list": brokers,
                     "group.id": '18211170230ylt'})
    # Parse each message as JSON, then print per-city record counts.
    kafka_streaming_rdd.map(lambda x: json.loads(x[1])).foreachRDD(
        lambda rdd: spark.createDataFrame(rdd, schema_telecomdata)
                         .groupBy("caller1.caller1_site").count().show())
    ssc.start()
    ssc.awaitTermination()
渲染
index.html
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Awesome-pyecharts</title>
    <script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>
    <script type="text/javascript" src="https://assets.pyecharts.org/assets/echarts.min.js"></script>
</head>
<body>
    <!-- Container that the bar chart is rendered into. -->
    <div id="bar" style="width:1000px; height:600px;"></div>
    <script>
        var barChart = echarts.init(document.getElementById('bar'), 'white', {renderer: 'canvas'});

        // Poll the Django endpoint and redraw the chart with its options.
        function fetchData() {
            $.ajax({
                type: "GET",
                url: "http://127.0.0.1:8000/demo/bar",
                dataType: 'json',
                success: function (result) {
                    barChart.setOption(result.data);
                }
            });
        }

        // Initial draw, then refresh every 2 seconds.
        $(function () {
            fetchData();
            setInterval(fetchData, 2000);
        });
    </script>
</body>
</html>
index1.html
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Awesome-pyecharts</title>
    <script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>
    <script type="text/javascript" src="https://assets.pyecharts.org/assets/echarts.min.js"></script>
</head>
<body>
    <!-- Container that the gauge chart is rendered into. -->
    <div id="bar1" style="width:1000px; height:600px;"></div>
    <script>
        var gaugeChart = echarts.init(document.getElementById('bar1'), 'white', {renderer: 'canvas'});

        // Poll the Django endpoint and redraw the chart with its options.
        function fetchData() {
            $.ajax({
                type: "GET",
                url: "http://127.0.0.1:8000/demo/bar1",
                dataType: 'json',
                success: function (result) {
                    gaugeChart.setOption(result.data);
                }
            });
        }

        // Initial draw, then refresh every 2 seconds.
        $(function () {
            fetchData();
            setInterval(fetchData, 2000);
        });
    </script>
</body>
</html>
demo/urls.py
# demo/urls.py
"""URL routes for the demo app: two chart-options endpoints and the two
HTML pages that poll them."""
from django.conf.urls import url

from . import views

urlpatterns = [
    # BUG FIX: every route previously shared name='demo', which makes
    # reverse()/`{% url %}` lookups ambiguous; each route now has a
    # unique name matching its path.
    url(r'^bar/$', views.ChartView.as_view(), name='bar'),
    url(r'^index/$', views.IndexView.as_view(), name='index'),
    url(r'^bar1/$', views.ChartView1.as_view(), name='bar1'),
    url(r'^index1/$', views.IndexView1.as_view(), name='index1'),
]
views.py
"""Django REST views serving pyecharts option JSON, plus the HTML shells
(templates/index.html, templates/index1.html) that poll them.

FIX: removed the unused `from random import randrange` import, removed
commented-out dead code, corrected the `-> Bar` annotations (the chart
builders return a JSON string from dump_options_with_quotes), and hoisted
the static city data into a module-level constant.
"""
import json
import random

from django.http import HttpResponse
from rest_framework.views import APIView
from pyecharts.charts import Bar, Gauge
from pyecharts import options as opts
from pyecharts.faker import Faker


def response_as_json(data):
    """Serialize *data* to JSON in a CORS-open HttpResponse."""
    json_str = json.dumps(data)
    response = HttpResponse(
        json_str,
        content_type="application/json",
    )
    # Chart pages may be served from a different origin, so open up CORS.
    response["Access-Control-Allow-Origin"] = "*"
    return response


def json_response(data, code=200):
    """Wrap *data* in the standard success envelope {code, msg, data}."""
    data = {
        "code": code,
        "msg": "success",
        "data": data,
    }
    return response_as_json(data)


def json_error(error_string="error", code=500, **kwargs):
    """Build the standard error envelope; extra kwargs are merged in."""
    data = {
        "code": code,
        "msg": error_string,
        "data": {}
    }
    data.update(kwargs)
    return response_as_json(data)


# Aliases kept for callers using the capitalized names.
JsonResponse = json_response
JsonError = json_error

# Static (city, call-in, call-out) triples the bar chart samples from.
CITY_CALL_DATA = [['乌海市', 90, 110], ['锡林郭勒盟', 99, 98], ['阿拉善盟', 99, 114], ['包头市', 99, 109], ['乌兰察布市', 99, 122], ['兴安盟', 99, 135], ['鄂尔多斯市', 102, 93], ['丽江市', 106, 107], ['呼伦贝尔市', 106, 100], ['赤峰市', 107, 104], ['昭通市', 110, 111], ['呼和浩特市', 113, 92], ['通辽市', 113, 110], ['巴彦淖尔市', 115, 103], ['红河哈尼族彝族自治州', 115, 114], ['葫芦岛市', 116, 143], ['锦州市', 119, 116], ['文山壮族苗族自治州', 120, 132], ['营口市', 121, 140], ['昆明市', 122, 125], ['楚雄彝族自治州', 122, 127], ['西双版纳傣族自治州', 123, 119], ['铁岭市', 124, 148], ['普洱市', 124, 131], ['怒江傈僳族自治州', 124, 129], ['德宏傣族景颇族自治州', 126, 144], ['玉溪市', 126, 126], ['沈阳市', 126, 142], ['阜新市', 132, 141], ['曲靖市', 132, 122], ['临沧市', 134, 131], ['丹东市', 136, 134], ['盘锦市', 137, 133], ['焦作市', 138, 141], ['商丘市', 139, 162], ['雅安市', 139, 171], ['大理白族自治州', 139, 121], ['本溪市', 140, 127], ['保山市', 140, 121], ['抚顺市', 141, 148], ['迪庆藏族自治州', 141, 153], ['鞍山市', 142, 129], ['自贡市', 147, 148], ['驻马店市', 149, 164], ['遂宁市', 150, 166], ['安阳市', 150, 153], ['漯河市', 151, 164], ['平顶山市', 152, 162], ['甘孜藏族自治州', 152, 132], ['资阳市', 152, 153], ['乐山市', 154, 158], ['周口市', 155, 169], ['凉山彝族自治州', 155, 151], ['大连市', 157, 121], ['朝阳市', 157, 157], ['新乡市', 157, 144], ['阿坝藏族羌族自治州', 158, 165], ['广元市', 159, 172],
                  ['许昌市', 160, 172], ['巴中市', 160, 165], ['德阳市', 160, 171], ['广安市', 160, 176], ['郑州市', 163, 138], ['辽阳市', 164, 147], ['信阳市', 164, 165], ['濮阳市', 166, 165], ['三门峡市', 166, 150], ['宜宾市', 166, 147], ['绵阳市', 166, 171], ['攀枝花市', 167, 150], ['开封市', 167, 147], ['南充市', 168, 160], ['酒泉市', 169, 224], ['南阳市', 169, 191], ['泸州市', 170, 165], ['内江市', 171, 166], ['眉山市', 173, 154], ['洛阳市', 178, 163], ['鹤壁市', 180, 155], ['成都市', 180, 155], ['嘉峪关市', 181, 201], ['定西市', 187, 173], ['天水市', 193, 223], ['武威市', 194, 192], ['南宁市', 196, 199], ['达州市', 196, 188], ['平凉市', 197, 196], ['甘南藏族自治州', 197, 197], ['衡阳市', 197, 206], ['兰州市', 197, 237], ['崇左市', 198, 206], ['临夏回族自治州', 198, 201], ['益阳市', 199, 224], ['株洲市', 199, 211], ['白银市', 199, 193], ['陇南市', 200, 178], ['长沙市', 201, 191], ['永州市', 201, 238], ['湘西土家族苗族自治州', 203, 218], ['北海市', 204, 223], ['邵阳市', 206, 225], ['襄阳市', 208, 218], ['恩施土家族苗族自治州', 208, 208], ['黄石市', 209, 215], ['玉林市', 210, 225], ['荆州市', 210, 222], ['金昌市', 211, 193], ['张掖市', 211, 211], ['张家界市', 211, 209], ['湘潭市', 212, 218], ['承德市', 214, 207], ['常德市', 214, 196], ['绥化市', 214, 254], ['厦门市', 214, 230], ['钦州市', 214, 212], ['岳阳市', 215, 204], ['宜昌市', 215, 212], ['贵港市', 215, 212], ['贺州市', 215, 227], ['防城港市', 217, 211], ['庆阳市', 217, 190], ['黑河市', 218, 253], ['柳州市', 219, 209], ['郴州市', 219, 200], ['保定市', 219, 245], ['来宾市', 219, 230], ['十堰市', 219, 224], ['梧州市', 219, 227], ['喀什地区', 220, 243], ['邯郸市', 220, 239], ['克拉玛依市', 222, 245], ['秦皇岛市', 224, 239], ['孝感市', 224, 222], ['随州市', 225, 227], ['石家庄市', 227, 232], ['桂林市', 227, 191], ['云浮市', 228, 237], ['娄底市', 229, 199], ['宁德市', 229, 247], ['衡水市', 230, 227], ['河源市', 230, 263], ['百色市', 231, 205], ['怀化市', 231, 204], ['福州市', 231, 257], ['沧州市', 232, 225], ['泉州市', 232, 252], ['黄冈市', 234, 239], ['龙岩市', 234, 221], ['鄂州市', 235, 225], ['巴音郭楞蒙古自治州', 235, 241], ['博尔塔拉蒙古自治州', 236, 232], ['廊坊市', 236, 239], ['荆门市', 237, 215], ['和田地区', 237, 244], ['阿勒泰地区', 237, 233], ['双鸭山市', 237, 243], ['乌鲁木齐市', 237, 258], ['七台河市', 238, 220], ['肇庆市', 238, 289], ['珠海市', 238, 281], ['南平市', 240, 235],
                  ['邢台市', 240, 228], ['塔城地区', 240, 251], ['莆田市', 240, 236], ['清远市', 240, 253], ['鸡西市', 240, 253], ['武汉市', 241, 242], ['唐山市', 242, 219], ['亳州市', 242, 263], ['咸宁市', 243, 222], ['湛江市', 244, 240], ['汕尾市', 244, 254], ['景德镇市', 245, 274], ['哈密市', 245, 267], ['阿克苏地区', 245, 227], ['伊春市', 246, 253], ['吐鲁番市', 247, 256], ['漳州市', 248, 209], ['茂名市', 248, 262], ['齐齐哈尔市', 249, 245], ['鹰潭市', 249, 304], ['佛山市', 250, 264], ['佳木斯市', 250, 245], ['鹤岗市', 250, 241], ['马鞍山市', 251, 249], ['昌吉回族自治州', 251, 224], ['大兴安岭地区', 252, 246], ['克孜勒苏柯尔克孜自治州', 253, 246], ['河池市', 253, 247], ['东莞市', 254, 275], ['中山市', 254, 239], ['梅州市', 257, 256], ['淮北市', 258, 320], ['海南藏族自治州', 258, 298], ['潮州市', 258, 245], ['揭阳市', 259, 245], ['三明市', 259, 227], ['南昌市', 260, 271], ['新余市', 262, 277], ['哈尔滨市', 263, 253], ['蚌埠市', 263, 303], ['张家口市', 263, 244], ['阳江市', 264, 242], ['六安市', 264, 293], ['海西蒙古族藏族自治州', 265, 273], ['萍乡市', 265, 272], ['大庆市', 266, 256], ['上饶市', 266, 283], ['惠州市', 267, 231], ['江门市', 268, 292], ['果洛藏族自治州', 269, 267], ['淮南市', 269, 263], ['铜陵市', 269, 302], ['伊犁哈萨克自治州', 269, 240], ['韶关市', 270, 235], ['滁州市', 273, 285], ['池州市', 277, 267], ['西宁市', 277, 295], ['宿州市', 278, 251], ['赣州市', 278, 246], ['玉树藏族自治州', 279, 265], ['汕头市', 280, 284], ['牡丹江市', 281, 237], ['宜春市', 283, 252], ['合肥市', 283, 297], ['黄山市', 283, 274], ['广州市', 286, 249], ['黄南藏族自治州', 286, 289], ['深圳市', 288, 231], ['安庆市', 289, 270], ['九江市', 291, 259], ['吉安市', 292, 301], ['抚州市', 293, 288], ['阜阳市', 294, 271], ['芜湖市', 297, 251], ['宣城市', 299, 259], ['威海市', 300, 331], ['济宁市', 301, 281], ['海东市', 304, 272], ['日照市', 306, 321], ['烟台市', 308, 323], ['东营市', 309, 299], ['泰安市', 309, 309], ['青岛市', 309, 294], ['海北藏族自治州', 311, 310], ['济南市', 313, 341], ['菏泽市', 316, 333], ['枣庄市', 317, 336], ['淄博市', 319, 343], ['莱芜市', 321, 330], ['潍坊市', 331, 291], ['滨州市', 332, 336], ['聊城市', 338, 326], ['德州市', 338, 322], ['阿里地区', 338, 358], ['湖州市', 339, 344], ['临沂市', 341, 302], ['那曲市', 341, 342], ['林芝市', 341, 367], ['吴忠市', 346, 347], ['衢州市', 346, 367], ['丽水市', 349, 359], ['嘉兴市', 353, 365],
                  ['拉萨市', 355, 362], ['固原市', 359, 373], ['银川市', 364, 357], ['中卫市', 364, 368], ['温州市', 364, 379], ['石嘴山市', 364, 348], ['日喀则市', 366, 343], ['舟山市', 367, 346], ['山南市', 370, 355], ['台州市', 374, 346], ['金华市', 375, 367], ['宁波市', 376, 386], ['昌都市', 379, 343], ['忻州市', 380, 428], ['太原市', 382, 433], ['杭州市', 392, 377], ['绍兴市', 393, 394], ['淮安市', 398, 441], ['运城市', 399, 411], ['连云港市', 402, 464], ['吕梁市', 405, 422], ['海口市', 405, 413], ['苏州市', 413, 418], ['泰州市', 414, 446], ['儋州市', 417, 391], ['延边朝鲜族自治州', 420, 430], ['临汾市', 421, 425], ['三沙市', 421, 417], ['辽源市', 421, 402], ['阳泉市', 423, 462], ['通化市', 423, 438], ['晋中市', 424, 418], ['宿迁市', 425, 472], ['晋城市', 427, 433], ['吉林市', 432, 452], ['白山市', 435, 429], ['松原市', 437, 468], ['徐州市', 438, 472], ['三亚市', 439, 443], ['盐城市', 447, 465], ['白城市', 447, 425], ['长春市', 449, 445], ['长治市', 451, 404], ['扬州市', 455, 451], ['四平市', 456, 394], ['无锡市', 463, 429], ['南京市', 463, 398], ['南通市', 469, 404], ['朔州市', 470, 410], ['大同市', 472, 413], ['镇江市', 486, 437], ['常州市', 486, 432], ['宝鸡市', 560, 586], ['铜川市', 567, 590], ['铜仁市', 569, 592], ['榆林市', 579, 608], ['汉中市', 589, 614], ['安康市', 596, 618], ['黔南布依族苗族自治州', 598, 655], ['黔东南苗族侗族自治州', 604, 608], ['商洛市', 605, 552], ['西安市', 606, 586], ['贵阳市', 608, 622], ['咸阳市', 614, 596], ['安顺市', 621, 618], ['延安市', 623, 578], ['渭南市', 623, 601], ['六盘水市', 633, 547], ['黔西南布依族苗族自治州', 636, 621], ['毕节市', 639, 591], ['遵义市', 642, 654], ['重庆市', 2654, 2665], ['澳门', 3115, 3077], ['上海市', 4307, 4305], ['天津市', 4398, 4381], ['北京市', 4566, 4623], ['香港', 5159, 5121]]


def bar_base() -> str:
    """Build the per-city call-in/call-out bar chart; returns an option
    JSON string (dump_options_with_quotes)."""
    usedata = [random.choice(CITY_CALL_DATA) for _ in range(10)]
    print(usedata)  # debug: show which cities were sampled for this refresh
    c = (
        Bar()
        .add_xaxis([row[0] for row in usedata])
        .add_yaxis("呼入", [row[1] for row in usedata])
        .add_yaxis("呼出", [row[2] for row in usedata])
        .set_global_opts(
            title_opts=opts.TitleOpts(title="每日城市的呼入呼出量", subtitle="我是副标题"),
            datazoom_opts=opts.DataZoomOpts(type_="inside"),
        )
        .dump_options_with_quotes()
    )
    return c


def bar_diff() -> str:
    """Demo bar chart with min/max/average mark lines (not wired to a view);
    returns an option JSON string."""
    c = (
        Bar()
        .add_xaxis(Faker.choose())
        .add_yaxis("商家A", Faker.values())
        .add_yaxis("商家B", Faker.values())
        .set_global_opts(title_opts=opts.TitleOpts(title="Bar-MarkLine(指定类型)"))
        .set_series_opts(
            label_opts=opts.LabelOpts(is_show=False),
            markline_opts=opts.MarkLineOpts(
                data=[
                    opts.MarkLineItem(type_="min", name="最小值"),
                    opts.MarkLineItem(type_="max", name="最大值"),
                    opts.MarkLineItem(type_="average", name="平均值"),
                ]
            ),
        )
        .dump_options_with_quotes()
    )
    return c


def gauge_base() -> str:
    """Build the call-frequency gauge from sampled (count, epoch) pairs;
    returns an option JSON string."""
    # (record count, unix timestamp) samples; only the count is displayed,
    # scaled down to 万 (tens of thousands).
    data = [[385293, 1606039296], [368545, 1606039351], [377871, 1606039413],
            [373240, 1606039470], [381710, 1606039117], [377038, 1606039053],
            [375771, 1606038755]]
    c = (
        Gauge()
        .add(
            "当前通话频度",
            [("通话频度", random.choice(data)[0] / 10000)],
            axisline_opts=opts.AxisLineOpts(
                linestyle_opts=opts.LineStyleOpts(
                    color=[(0.3, "#67e0e3"), (0.7, "#37a2da"), (1, "#fd666d")],
                    width=30
                )
            ),
            detail_label_opts=opts.LabelOpts(formatter="{value}万"),
        )
        .set_global_opts(
            title_opts=opts.TitleOpts(title="当前通话频度"),
            legend_opts=opts.LegendOpts(is_show=False),
        )
        .dump_options_with_quotes()
    )
    return c


class ChartView(APIView):
    """GET /demo/bar — option JSON for the city bar chart."""

    def get(self, request, *args, **kwargs):
        return JsonResponse(json.loads(bar_base()))


class IndexView(APIView):
    """GET /demo/index — HTML shell that polls /demo/bar."""

    def get(self, request, *args, **kwargs):
        return HttpResponse(content=open("./templates/index.html").read())


class ChartView1(APIView):
    """GET /demo/bar1 — option JSON for the call-frequency gauge."""

    def get(self, request, *args, **kwargs):
        return JsonResponse(json.loads(gauge_base()))


class IndexView1(APIView):
    """GET /demo/index1 — HTML shell that polls /demo/bar1."""

    def get(self, request, *args, **kwargs):
        return HttpResponse(content=open("./templates/index1.html").read())
join table and save to hive
"""Aggregate per-phone talk time from the raw call log, join it with the
user-profile CSV, and write the joined table into the Hive warehouse."""
import json

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import (FloatType, IntegerType, LongType, StringType,
                               StructField, StructType)


def getpersonnums(DF):
    """Return a DataFrame of (phone, timespan) — total talk time per phone.

    A record is kept only when callerflag is 0/1, the call lasted at most
    1000 units, and both phone numbers have the normal 11 digits.
    """
    DF = DF.where("callerflag = 1 or callerflag = 0")
    DF = DF.where("timespan<=1000")
    DF = DF.where("length(caller1.phone)=11")
    DF = DF.where("length(caller2.phone)=11")
    # Sum talk time per phone, once in the caller1 role ...
    df1 = DF.groupBy("caller1.phone").sum("timespan").toDF("caller1.phone", "timespan")
    df1 = df1.withColumnRenamed("caller1.phone", "phone")
    # ... and once in the caller2 role ...
    df2 = DF.groupBy("caller2.phone").sum("timespan").toDF("caller2.phone", "timespan")
    df2 = df2.withColumnRenamed("caller2.phone", "phone")
    # ... then merge the two roles into one total per phone.
    df = df1.union(df2).groupBy("phone").sum("timespan")
    df = df.withColumnRenamed("sum(timespan)", "timespan")
    return df


if __name__ == "__main__":
    conf = SparkConf().setAppName("save_hive").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    spark = SparkSession.builder.getOrCreate()

    # The raw log uses single-quoted pseudo-JSON; normalize to real JSON.
    lines = sc.textFile("file:///home/hadoop/data.txt").map(
        lambda x: json.loads(x.replace("'", '"')))
    # Schema of one call record.
    schema = StructType([
        StructField("callerflag", LongType(), True),
        StructField("timespan", LongType(), True),
        StructField("timestamp", LongType(), True),
        StructField("caller1", StructType([
            StructField("phone", StringType(), True),
            StructField("caller1_site", StringType(), True),
            StructField("offsets", StringType(), True)]), True),
        StructField("caller2", StructType([
            StructField("phone", StringType(), True),
            StructField("caller2_site", StringType(), True),
            StructField("offsets", StringType(), True)]), True),
    ])
    data = spark.createDataFrame(lines, schema)
    DF = getpersonnums(data)
    DF.show()

    # Schema of the user-profile CSV.
    schema = StructType([
        StructField("phone", StringType(), True),
        StructField("name", StringType(), True),
        StructField("idno", StringType(), True),
        StructField("latitude", FloatType(), True),
        StructField("longitude", FloatType(), True),
        StructField("sex", StringType(), True),
        StructField("blood_group", StringType(), True),
        StructField("mail", StringType(), True),
        StructField("job", StringType(), True),
        StructField("province", StringType(), True),
        StructField("city", StringType(), True),
        StructField("birthdate", StringType(), True),
        StructField("pricing_package", IntegerType(), True),
    ])
    df = spark.read.csv(r"file:///home/hadoop/PycharmProjects/pythonProject/user.csv",
                        encoding='utf-8', header=True, schema=schema)
    df.show()

    # Left join keeps phones that have calls but no profile row.
    DF = DF.join(df, ['phone'], "left_outer")
    DF.show()
    # Write as CSV into the Hive warehouse directory.
    file = "/user/hive/warehouse/ylt.db/basic"
    DF.write.csv(path=file, header=True, sep=",", mode='overwrite')
    sc.stop()
get birth
"""Extract birth years from the joined 'basic' table and save them to the
Hive warehouse and to a local JSON file for the chart scripts.

FIX: the original line contained this whole script pasted twice back to
back; the duplicate run has been removed.
"""
import json

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import split


def save(path, data):
    """Overwrite *path* with the string *data*."""
    with open(path, 'w') as f:
        f.write(data)


conf = SparkConf().setAppName("getbirth").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

file = "/user/hive/warehouse/ylt.db/basic"
df = spark.read.csv(file, header=False, inferSchema=True).toDF(
    "phone", "timespan", "name", "idno", "latitude", "longitude", "sex",
    "blood_group", "mail", "job", "province", "city", "birthdate",
    "pricing_package")
df.show()

# birthdate is slash-separated with the year in the third field; keep the
# first 4 digits of the year only.
df = df.select("birthdate")
df = df.withColumn("birthdate", split(df['birthdate'], "/")[2].substr(1, 4))
# NOTE(review): the Hive copy is written before the null filter below, so
# it may still contain null years — kept as in the original, confirm.
df.write.csv("/user/hive/warehouse/ylt.db/birth", mode="overwrite")
df = df.filter(df.birthdate.isNotNull())
save("/home/hadoop/datasave/birth.txt", json.dumps(df.collect()))
df.show()
getjob
"""Count subscribers per job title; keep the 100 most common and save them
to Hive and to a local text file consumed by the drawjob script."""
# BUG FIX: SparkContext was used below but never imported in the original.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession


def save(path, data):
    """Overwrite *path* with str(data) (drawjob parses this Row() format)."""
    with open(path, 'w') as f:
        f.write(str(data))


def getcity_job(DF):
    """Return a DataFrame of the 100 most common (job, nums) pairs."""
    counts = DF.groupBy("job").count().toDF("job", "nums")
    # BUG FIX: the original returned counts.take(100) — a plain Python list
    # of Rows — and the caller then invoked DataFrame methods on it.
    # limit() keeps the result a DataFrame with the same top-100 contents.
    return counts.orderBy(-counts["nums"]).limit(100)


if __name__ == "__main__":
    conf = (SparkConf().setAppName("getjob")
            .set('spark.executor.memory', '10g')
            .set("spark.executor.cores", '8')
            .set("spark.driver.memory", '10g'))
    sc = SparkContext(conf=conf)
    spark = SparkSession.builder.getOrCreate()

    file = "/user/hive/warehouse/ylt.db/basic"
    DF = spark.read.csv(file, header=False, inferSchema=True).toDF(
        "phone", "timespan", "name", "idno", "latitude", "longitude", "sex",
        "blood_group", "mail", "job", "province", "city", "birthdate",
        "pricing_package")
    DF.show()

    DF = getcity_job(DF)
    DF = DF.filter(DF.job.isNotNull())
    # BUG FIX: the original overwrote the .../birth directory (clobbering
    # getbirth's output); job counts get their own warehouse directory.
    DF.write.csv("/user/hive/warehouse/ylt.db/job", mode="overwrite")
    # collect() yields the same "[Row(job=..., nums=...)]" text drawjob expects.
    save("/home/hadoop/datasave/job.txt", DF.collect())
    sc.stop()
getsex
"""Count subscribers per (sex, blood_group) pair and persist the counts to
Hive and to a local JSON file for the chart scripts."""
import json
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf


def save(path, data):
    # Overwrite the target file with the given string.
    with open(path, 'w') as f:
        f.write(data)


def getsex_blood(DF):
    # One row per (sex, blood_group) pair with its subscriber count.
    grouped = DF.groupBy("sex", "blood_group").count().toDF("sex", "blood_group", "nums")
    grouped.show()
    return grouped


if __name__ == "__main__":
    conf = (SparkConf()
            .setAppName("getsex")
            .set('spark.executor.memory', '10g')
            .set("spark.executor.cores", '8')
            .set("spark.driver.memory", '10g'))
    sc = SparkContext(conf=conf)
    spark = SparkSession.builder.getOrCreate()

    source = "/user/hive/warehouse/ylt.db/basic"
    columns = ["phone", "timespan", "name", "idno", "latitude", "longitude",
               "sex", "blood_group", "mail", "job", "province", "city",
               "birthdate", "pricing_package"]
    frame = spark.read.csv(source, header=False, inferSchema=True).toDF(*columns)
    frame.show()

    frame = getsex_blood(frame)
    frame = frame.filter(frame.sex.isNotNull())
    frame.write.csv("/user/hive/warehouse/ylt.db/sex", mode="overwrite")
    save("/home/hadoop/datasave/sex.txt", json.dumps(frame.collect()))
    sc.stop()
getpayment
"""Compute each subscriber's bill from their pricing package and talk time,
then persist the payment column to Hive and to a local JSON file."""
import json

# BUG FIX: SparkContext was used below but never imported in the original.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import greatest, lit


def save(path, data):
    """Overwrite *path* with the string *data*."""
    with open(path, 'w') as f:
        f.write(data)


def getpay(DF):
    """Return a one-column DataFrame of payments, one row per subscriber.

    Each pricing_package value (1-4) has its own tariff formula.
    """
    df1 = DF.where("pricing_package=1")
    df2 = DF.where("pricing_package=2")
    df3 = DF.where("pricing_package=3")
    df4 = DF.where("pricing_package=4")
    df1 = df1.withColumn("payment", 5 + df1.timespan / 10)
    # NOTE(review): the /20 here divides the whole greatest(...) result,
    # which looks inconsistent with the other tariffs — confirm the intended
    # formula before changing it; kept as in the original.
    df2 = df2.withColumn("payment", greatest(lit(10), (df2.timespan - 30) + 10) / 20)
    df3 = df3.withColumn("payment", greatest(lit(50), 0.03 * (df3.timespan - 50) + 50))
    df4 = df4.withColumn("payment", greatest(0.03 * (df4.timespan - 100) + 41,
                                             0.01 * df4.timespan + 40))
    df4.show()
    DF = df1.union(df2).union(df3).union(df4)
    DF.show()
    return DF.select(DF.payment)


if __name__ == "__main__":
    conf = (SparkConf().setAppName("getpayment")
            .set('spark.executor.memory', '10g')
            .set("spark.executor.cores", '8')
            .set("spark.driver.memory", '10g'))
    sc = SparkContext(conf=conf)
    spark = SparkSession.builder.getOrCreate()

    file = "/user/hive/warehouse/ylt.db/basic"
    DF = spark.read.csv(file, header=False, inferSchema=True).toDF(
        "phone", "timespan", "name", "idno", "latitude", "longitude", "sex",
        "blood_group", "mail", "job", "province", "city", "birthdate",
        "pricing_package")
    DF.show()

    DF = getpay(DF)
    DF = DF.filter(DF.payment.isNotNull())
    DF.write.csv("/user/hive/warehouse/ylt.db/payment", mode="overwrite")
    save("/home/hadoop/datasave/payment.txt", json.dumps(DF.collect()))
    sc.stop()
getcityallnums: get the current call-in and call-out counts for every city
"""Streaming job: every 60 seconds, count call-out/call-in events per city
for the batch and dump the table to a local text file for the dashboards."""
import json
import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StringType, StructField, StructType
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def getcitycallnums(DF):
    """Write per-city (callout, callin) counts for this batch to allcity.txt."""
    # callerflag appears to pick which side originated the call:
    # 0 -> caller1 originated, 1 -> caller2 originated — TODO confirm.
    basedf1 = DF.where("callerflag=0")
    basedf2 = DF.where("callerflag=1")
    df1 = basedf1.groupBy("caller1.caller1_site").count().toDF("city", "callout")
    df2 = basedf2.groupBy("caller2.caller2_site").count().toDF("city", "callout")
    df3 = basedf1.groupBy("caller2.caller2_site").count().toDF("city", "callin")
    df4 = basedf2.groupBy("caller1.caller1_site").count().toDF("city", "callin")
    # FIX: these two locals were named callerinDF/calleroutDF the wrong way
    # round in the original; the contents (callout vs callin) are unchanged.
    calloutDF = df1.union(df2).groupBy("city").sum().toDF("city", "callout")
    callinDF = df3.union(df4).groupBy("city").sum().toDF("city", "callin")
    finDF = calloutDF.join(callinDF, "city").toDF("city", "callout", "callin")
    allcity = ["allcitydata", [list(i) for i in finDF.orderBy("callout").collect()]]
    try:
        print(allcity)
        # One "city,callout,callin" line per city, sorted by callout.
        with open("/home/hadoop/datasave/allcity.txt", "w") as fin:
            for i in allcity[1]:
                fin.write(",".join([str(j) for j in i]) + '\n')
    except Exception as e:
        print(e)


if __name__ == "__main__":
    memory = '10g'
    pyspark_submit_args = ' --driver-memory ' + memory + ' --executor-memory ' + memory + ' pyspark-shell'
    os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args
    conf = (SparkConf()
            .setAppName("streamingkafka")
            .set('spark.executor.memory', '10g')
            .set("spark.executor.cores", '8'))
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")  # cut down on shell log noise
    ssc = StreamingContext(sc, 60)  # 60-second batch window (comment fixed)
    spark = SparkSession.builder.getOrCreate()

    brokers = '''10.132.221.111:6667,10.132.221.112:6667,10.132.221.113:6667,10.132.221.114:6667,10.132.221.116:6667,10.132.221.117:6667,10.132.221.118:6667,10.132.221.119:6667,10.132.221.120:6667,10.132.221.121:6667,10.132.221.123:6667,10.132.221.124:6667,10.132.221.125:6667,10.132.221.126:6667,10.132.221.127:6667,10.132.221.128:6667,10.132.221.129:6667,10.132.221.130:6667,10.132.221.132:6667'''

    # Schema of one call record as produced on the 'telecom' topic.
    schema_telecomdata = StructType([
        StructField("callerflag", LongType(), True),
        StructField("timespan", LongType(), True),
        StructField("timestamp", LongType(), True),
        StructField("caller1", StructType([
            StructField("phone", StringType(), True),
            StructField("caller1_site", StringType(), True),
            StructField("offsets", StringType(), True)]), True),
        StructField("caller2", StructType([
            StructField("phone", StringType(), True),
            StructField("caller2_site", StringType(), True),
            StructField("offsets", StringType(), True)]), True),
    ])

    topic = 'telecom'
    kafka_streaming_rdd = KafkaUtils.createDirectStream(
        ssc, [topic],
        kafkaParams={"metadata.broker.list": brokers,
                     "group.id": '18211170230ylt'})
    kafka_streaming_rdd.map(lambda x: json.loads(x[1])).foreachRDD(
        lambda rdd: getcitycallnums(spark.createDataFrame(rdd, schema_telecomdata)))
    ssc.start()
    ssc.awaitTermination()
getdaytotnums: get the whole day's call-in/call-out count; can save to a local file or to Hive
"""Streaming job: every 5 seconds append '(record count, epoch)' for the
batch to an hourly-totals table in HDFS (tohive). gettotalnums is an
alternative local-file sink kept for switching back."""
import json
import time

import pyhdfs
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StringType, StructField, StructType
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def tohive(DF):
    """Append 'count,epoch' for this batch to the Hive-backed text file."""
    # BUG FIX: the hosts string was "localhost,9000"; pyhdfs treats the comma
    # as a host-list separator, so it saw two bogus hosts. Host and port must
    # be joined with ':'. NOTE(review): WebHDFS usually listens on 50070,
    # not 9000 — confirm the port for this cluster.
    client = pyhdfs.HdfsClient("localhost:9000", "hadoop")
    nums = DF.count()
    timec = int(time.time())
    row = str(nums) + "," + str(timec) + "\n"
    client.append("/user/hive/warehouse/ylt.db/hourtotalnums/inputsource1.txt", row)
    print(row)


def gettotalnums(DF):
    """Alternative sink: overwrite a local file with 'count,epoch'."""
    nums = DF.count()
    timec = int(time.time())
    data = ["totalnums", [nums, timec]]
    try:
        print(data)
        with open("/home/hadoop/datasave/daytotalnums.txt", "w") as fin:
            fin.write(",".join([str(i) for i in data[1]]))
    except Exception as e:
        print(e)


if __name__ == "__main__":
    conf = (SparkConf()
            .setAppName("streamingkafka")
            .set('spark.executor.memory', '10g')
            .set("spark.executor.cores", '8'))
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")  # cut down on shell log noise
    ssc = StreamingContext(sc, 5)  # 5-second batch window
    spark = SparkSession.builder.getOrCreate()

    brokers = '''10.132.221.111:6667,10.132.221.112:6667,10.132.221.113:6667,10.132.221.114:6667,10.132.221.116:6667,10.132.221.117:6667,10.132.221.118:6667,10.132.221.119:6667,10.132.221.120:6667,10.132.221.121:6667,10.132.221.123:6667,10.132.221.124:6667,10.132.221.125:6667,10.132.221.126:6667,10.132.221.127:6667,10.132.221.128:6667,10.132.221.129:6667,10.132.221.130:6667,10.132.221.132:6667'''

    # Schema of one call record as produced on the 'telecom' topic.
    schema_telecomdata = StructType([
        StructField("callerflag", LongType(), True),
        StructField("timespan", LongType(), True),
        StructField("timestamp", LongType(), True),
        StructField("caller1", StructType([
            StructField("phone", StringType(), True),
            StructField("caller1_site", StringType(), True),
            StructField("offsets", StringType(), True)]), True),
        StructField("caller2", StructType([
            StructField("phone", StringType(), True),
            StructField("caller2_site", StringType(), True),
            StructField("offsets", StringType(), True)]), True),
    ])

    topic = 'telecom'
    kafka_streaming_rdd = KafkaUtils.createDirectStream(
        ssc, [topic],
        kafkaParams={"metadata.broker.list": brokers,
                     "group.id": '18211170223wlt_t2'})
    kafka_streaming_rdd.map(lambda x: json.loads(x[1])).foreachRDD(
        lambda rdd: tohive(spark.createDataFrame(rdd, schema_telecomdata)))
    ssc.start()
    ssc.awaitTermination()
gettotalnums: get the current total number of valid call records
"""Streaming job: every 5 seconds count the *valid* call records in the
batch and overwrite a local 'count,epoch' file (gettotalnums). tohive is an
alternative HDFS-append sink kept for switching back."""
import json
import time

import pyhdfs
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StringType, StructField, StructType
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def tohive(DF):
    """Alternative sink: append 'count,epoch' to the Hive-backed text file."""
    # BUG FIX: the hosts string was "localhost,9000"; pyhdfs treats the comma
    # as a host-list separator, so it saw two bogus hosts. Host and port must
    # be joined with ':'. NOTE(review): WebHDFS usually listens on 50070,
    # not 9000 — confirm the port for this cluster.
    client = pyhdfs.HdfsClient("localhost:9000", "hadoop")
    nums = DF.count()
    timec = int(time.time())
    row = str(nums) + "," + str(timec) + "\n"
    client.append("/user/hive/warehouse/ylt.db/calltotalnums/inputsource1.txt", row)
    print(row)


def gettotalnums(DF):
    """Count valid records in the batch and overwrite totalnums.txt."""
    # Same validity rules as the batch jobs: sane flag, bounded duration,
    # 11-digit phone numbers on both sides.
    DF = DF.where("callerflag = 1 or callerflag = 0")
    DF = DF.where("timespan<=1000")
    DF = DF.where("length(caller1.phone)=11")
    DF = DF.where("length(caller2.phone)=11")
    nums = DF.count()
    timec = int(time.time())
    data = ["totalnums", [nums, timec]]
    try:
        print(data)
        with open("/home/hadoop/datasave/totalnums.txt", "w") as fin:
            fin.write(",".join([str(i) for i in data[1]]))
    except Exception as e:
        print(e)


if __name__ == "__main__":
    conf = (SparkConf()
            .setAppName("streamingkafka")
            .set('spark.executor.memory', '10g')
            .set("spark.executor.cores", '8'))
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")  # cut down on shell log noise
    ssc = StreamingContext(sc, 5)  # 5-second batch window
    spark = SparkSession.builder.getOrCreate()

    brokers = '''10.132.221.111:6667,10.132.221.112:6667,10.132.221.113:6667,10.132.221.114:6667,10.132.221.116:6667,10.132.221.117:6667,10.132.221.118:6667,10.132.221.119:6667,10.132.221.120:6667,10.132.221.121:6667,10.132.221.123:6667,10.132.221.124:6667,10.132.221.125:6667,10.132.221.126:6667,10.132.221.127:6667,10.132.221.128:6667,10.132.221.129:6667,10.132.221.130:6667,10.132.221.132:6667'''

    # Schema of one call record as produced on the 'telecom' topic.
    schema_telecomdata = StructType([
        StructField("callerflag", LongType(), True),
        StructField("timespan", LongType(), True),
        StructField("timestamp", LongType(), True),
        StructField("caller1", StructType([
            StructField("phone", StringType(), True),
            StructField("caller1_site", StringType(), True),
            StructField("offsets", StringType(), True)]), True),
        StructField("caller2", StructType([
            StructField("phone", StringType(), True),
            StructField("caller2_site", StringType(), True),
            StructField("offsets", StringType(), True)]), True),
    ])

    topic = 'telecom'
    kafka_streaming_rdd = KafkaUtils.createDirectStream(
        ssc, [topic],
        kafkaParams={"metadata.broker.list": brokers,
                     "group.id": '18211170230ylt'})
    kafka_streaming_rdd.map(lambda x: json.loads(x[1])).foreachRDD(
        lambda rdd: gettotalnums(spark.createDataFrame(rdd, schema_telecomdata)))
    ssc.start()
    ssc.awaitTermination()
drawjob
"""Render the job word cloud from the text dump written by the getjob job."""
from pyecharts import options as opts
from pyecharts.charts import Pie, WordCloud

# The dump is a str() of a list of Rows; strip the Row() wrappers and the
# field names so that eval() yields plain [job, nums] pairs.
with open('/home/hadoop/datasave/job.txt', 'r') as fin:
    raw = fin.read()
for old, new in (("Row(", "["), (")", "]"), ("job=", ""), ("nums=", "")):
    raw = raw.replace(old, new)
pairs = eval(raw)
print(pairs)

c = (
    WordCloud()
    .add(series_name="岗位分析", data_pair=pairs, word_size_range=[20, 50])
    .set_global_opts(
        title_opts=opts.TitleOpts(
            title="岗位分析",
            title_textstyle_opts=opts.TextStyleOpts(font_size=23),
        ),
        tooltip_opts=opts.TooltipOpts(is_show=True),
    )
    .render("/home/hadoop/datasave/job.html")
)
drawsex
"""Render the blood-type-by-sex bar chart from the dump written by getsex."""
from pyecharts import options as opts
from pyecharts.charts import Bar

# The file holds one JSON-ish list of [sex, blood_group, nums] rows.
with open('/home/hadoop/datasave/sex.txt', 'r') as lines:
    for line in lines:
        data = eval(line)

# Split the counts into per-sex {blood_group: count} maps.
d1 = dict()  # sex == "F"
d2 = dict()  # everything else
for i in data:
    if i[0] == "F":
        d1[i[1]] = i[2]
    else:
        d2[i[1]] = i[2]
d1 = sorted(d1.items(), key=lambda i: i[0])
d2 = sorted(d2.items(), key=lambda i: i[0])

key = []
v1 = []
v2 = []
for i in d1:
    key.append(i[0])
    v1.append(i[1])
for i in d2:
    v2.append(i[1])
# NOTE(review): the x-axis keys come from the "F" dict only; if one sex has
# a blood group the other lacks, the two series fall out of alignment.
print(v1)
print(v2)

c = (
    Bar()
    .add_xaxis(key)
    # BUG FIX: the original charted the "F" series (v1) as 男性 (male) and
    # the other series as 女性; the labels are now attached correctly.
    .add_yaxis("女性", v1)
    .add_yaxis("男性", v2)
    .set_global_opts(
        title_opts=opts.TitleOpts(title="血型性别条形图", subtitle="男女不同血型的比较"),
        brush_opts=opts.BrushOpts(),
    )
    .render("/home/hadoop/datasave/sex.html")
)
drawpayment
"""Render the fee-composition donut chart from the payment dump."""
from pyecharts import options as opts
from pyecharts.charts import Pie

# The file holds one JSON list of single-element [payment] rows.
lines = open('/home/hadoop/datasave/payment.txt', 'r')
for line in lines:
    data = eval(line)
lines.close()

# Tally every payment into one of four fee buckets.
buckets = [0, 0, 0, 0]
for row in data:
    fee = row[0]
    if fee <= 10:
        buckets[0] += 1
    elif fee <= 30:
        buckets[1] += 1
    elif fee <= 50:
        buckets[2] += 1
    else:
        buckets[3] += 1
num1, num2, num3, num4 = buckets

data = [['话费在0~10之间', num1], ['话费在10~30之间', num2],
        ["话费在30~50之间", num3], ["话费超过50", num4]]

c = (
    Pie()
    .add(
        "",
        data,
        radius=["40%", "55%"],
        label_opts=opts.LabelOpts(
            position="outside",
            formatter="{a|{a}}{abg|}\n{hr|}\n {b|{b}: }{c} {per|{d}%} ",
            background_color="#eee",
            border_color="#aaa",
            border_width=1,
            border_radius=4,
            rich={
                "a": {"color": "#999", "lineHeight": 22, "align": "center"},
                "abg": {
                    "backgroundColor": "#e3e3e3",
                    "width": "100%",
                    "align": "right",
                    "height": 22,
                    "borderRadius": [4, 4, 0, 0],
                },
                "hr": {
                    "borderColor": "#aaa",
                    "width": "100%",
                    "borderWidth": 0.5,
                    "height": 0,
                },
                "b": {"fontSize": 16, "lineHeight": 33},
                "per": {
                    "color": "#eee",
                    "backgroundColor": "#334455",
                    "padding": [2, 4],
                    "borderRadius": 2,
                },
            },
        ),
    )
    .set_global_opts(title_opts=opts.TitleOpts(title="话费构成"))
    .render("/home/hadoop/datasave/payment.html")
)
drawyear
"""Render the age-distribution funnel from the birth-year dump."""
from pyecharts import options as opts
from pyecharts.charts import Pie, Funnel

# The file holds one JSON list of single-element [birth_year] rows.
lines = open('/home/hadoop/datasave/birth.txt', 'r')
for line in lines:
    data = eval(line)
lines.close()

# Tally birth years into 10-year age bands relative to the year 2000.
# NOTE(review): abs() folds years after 2000 into the same bands as years
# before it — confirm that is the intended age calculation.
counts = [0] * 8
for row in data:
    offset = abs(int(row[0]) - 2000)
    if offset <= 10:
        counts[0] += 1
    elif offset <= 20:
        counts[1] += 1
    elif offset <= 30:
        counts[2] += 1
    elif offset <= 40:
        counts[3] += 1
    elif offset <= 50:
        counts[4] += 1
    elif offset <= 60:
        counts[5] += 1
    elif offset <= 70:
        counts[6] += 1
    else:
        counts[7] += 1
n1, n2, n3, n4, n5, n6, n7, n8 = counts
print(n1, n2, n3, n4, n5, n6, n7, n8)

data = [["0~10岁", n1], ["10~20岁", n2], ["20~30岁", n3], ["30~40岁", n4],
        ["40~50岁", n5], ["50~60岁", n6], ["60~70岁", n7], ["超过70岁", n8]]

c = (
    Funnel()
    .add(
        "年龄",
        data,
        label_opts=opts.LabelOpts(position="inside"),
    )
    .set_global_opts(title_opts=opts.TitleOpts(title="Funnel-Label(inside)"))
    .render("/home/hadoop/datasave/year.html")
)