Reference links:
Reading JSON files: https://zhuanlan.zhihu.com/p/267353998
PySpark installation and setup: http://dblab.xmu.edu.cn/blog/1689-2/
Data analysis case study: http://dblab.xmu.edu.cn/blog/2738-2/
Configuring a PySpark environment in PyCharm: https://blog.csdn.net/ringsuling/article/details/84448369
Crontab + Flume + Kafka + Spark Streaming + Spring Boot page-view counting project: https://blog.csdn.net/qq_36329973/article/details/104738993?utm_medium=distribute.pc_relevant.none-task-blog-OPENSEARCH-8.not_use_machine_learn_pai&depth_1-utm_source=distribute.pc_relevant.none-task-blog-OPENSEARCH-8.not_use_machine_learn_pai
Basic PySpark DataFrame operations: https://www.jianshu.com/p/acd96549ee15
https://www.cnblogs.com/cxhzy/p/11067246.html
Real-time plotting of serial-port data in Python: https://zhuanlan.zhihu.com/p/100798858
pyecharts: https://pyecharts.org/#/zh-cn/web_flask
pyecharts-gallery: https://gallery.pyecharts.org/#/Bar/bar_markline_type
echarts: https://echarts.apache.org/examples/en/index.html#chart-type-bar
Fixing the Django error "Error: That port is already in use.": https://www.jianshu.com/p/7ceebb422d09
pip install djangorestframework
Write data: save each Kafka micro-batch to local text files
import time
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    conf = SparkConf().setAppName("streamingkafka").set('spark.executor.memory', '10g').set("spark.executor.cores", '8')
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")  # reduce shell log output
    ssc = StreamingContext(sc, 5)  # 5-second batch interval
    brokers = '''10.132.221.111:6667,10.132.221.112:6667,10.132.221.113:6667,10.132.221.114:6667,10.132.221.116:6667,10.132.221.117:6667,10.132.221.118:6667,10.132.221.119:6667,10.132.221.120:6667,10.132.221.121:6667,10.132.221.123:6667,10.132.221.124:6667,10.132.221.125:6667,10.132.221.126:6667,10.132.221.127:6667,10.132.221.128:6667,10.132.221.129:6667,10.132.221.130:6667,10.132.221.132:6667'''
    topic = 'telecom'
    kafka_streaming_rdd = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams={"metadata.broker.list": brokers, "group.id": '18211170230ylt'})
    # save each micro-batch's message values as a text file named with the current timestamp
    kafka_streaming_rdd.map(lambda x: x[1]).foreachRDD(lambda x: x.saveAsTextFile("file:///home/hadoop/rdd/file" + str(int(time.time()))))
    ssc.start()
    ssc.awaitTermination()

Read the written data back to check it:
from pyspark import SparkContext
from pyspark.sql import SparkSession
import json
if __name__ == "__main__":
    sc = SparkContext('local', 'test')
    sc.setLogLevel("WARN")
    spark = SparkSession.builder.getOrCreate()
    file = "file:///home/hadoop/rdd/file1605959588"
    df1 = spark.read.json(file)
    df1.show()

Read the streaming data and store it as a DataFrame:
import json
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import *
from pyspark.sql import SparkSession

if __name__ == "__main__":
    conf = SparkConf().setAppName("streamingkafka").set('spark.executor.memory', '10g').set("spark.executor.cores", '8')
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")  # reduce shell log output
    ssc = StreamingContext(sc, 5)  # 5-second batch interval
    spark = SparkSession.builder.getOrCreate()
    brokers = '''10.132.221.111:6667,10.132.221.112:6667,10.132.221.113:6667,10.132.221.114:6667,10.132.221.116:6667,10.132.221.117:6667,10.132.221.118:6667,10.132.221.119:6667,10.132.221.120:6667,10.132.221.121:6667,10.132.221.123:6667,10.132.221.124:6667,10.132.221.125:6667,10.132.221.126:6667,10.132.221.127:6667,10.132.221.128:6667,10.132.221.129:6667,10.132.221.130:6667,10.132.221.132:6667'''
    schema_telecomdata = StructType([
        StructField("callerflag", LongType(), True),
        StructField("timespan", LongType(), True),
        StructField("timestamp", LongType(), True),
        StructField("caller1", StructType([StructField("phone", StringType(), True), StructField("caller1_site", StringType(), True), StructField("offsets", StringType(), True)]), True),
        StructField("caller2", StructType([StructField("phone", StringType(), True), StructField("caller2_site", StringType(), True), StructField("offsets", StringType(), True)]), True)])
    schema_fin = StructType([StructField("city", StringType(), True), StructField("nums", LongType(), True)])
    sumDF = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema_telecomdata)
    topic = 'telecom'
    kafka_streaming_rdd = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams={"metadata.broker.list": brokers, "group.id": '18211170230ylt'})
    # turn each micro-batch into a DataFrame and show the per-city call counts
    kafka_streaming_rdd.map(lambda x: json.loads(x[1])).foreachRDD(
        lambda x: spark.createDataFrame(x, schema_telecomdata).groupBy("caller1.caller1_site").count().show())
    ssc.start()
    ssc.awaitTermination()
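For reference, each Kafka message value is a JSON document shaped like schema_telecomdata above. A made-up sample (the field names and nesting come from the schema; every value here is invented):

import json

sample = '''{"callerflag": 1, "timespan": 120, "timestamp": 1606039296,
             "caller1": {"phone": "13800000000", "caller1_site": "北京市", "offsets": "0"},
             "caller2": {"phone": "13900000000", "caller2_site": "上海市", "offsets": "1"}}'''
record = json.loads(sample)   # this is what the map(lambda x: json.loads(x[1])) step produces
print(record["caller1"]["caller1_site"])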
Rendering
index.html
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Awesome-pyecharts</title>
<script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>
<script type="text/javascript" src="https://assets.pyecharts.org/assets/echarts.min.js"></script>
</head>
<body>
<div id="bar" style="width:1000px; height:600px;"></div>
<script>
var chart = echarts.init(document.getElementById('bar'), 'white', {renderer: 'canvas'});
$(
function () {
fetchData(chart);
setInterval(fetchData, 2000);
}
);
function fetchData() {
$.ajax({
type: "GET",
url: "http://127.0.0.1:8000/demo/bar",
dataType: 'json',
success: function (result) {
chart.setOption(result.data);
}
});
}
</script>
</body>
</html>

index1.html
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Awesome-pyecharts</title>
<script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>
<script type="text/javascript" src="https://assets.pyecharts.org/assets/echarts.min.js"></script>
</head>
<body>
<div id="bar1" style="width:1000px; height:600px;"></div>
<script>
var chart = echarts.init(document.getElementById('bar1'), 'white', {renderer: 'canvas'});
$(
function () {
fetchData(chart);
setInterval(fetchData, 2000);
}
);
function fetchData() {
$.ajax({
type: "GET",
url: "http://127.0.0.1:8000/demo/bar1",
dataType: 'json',
success: function (result) {
chart.setOption(result.data);
}
});
}
</script>
</body>
</html>

demo/urls.py
# demo/urls.py
from django.conf.urls import url
from . import views
urlpatterns = [
url(r'^bar/$', views.ChartView.as_view(), name='demo'),
url(r'^index/$', views.IndexView.as_view(), name='demo'),
url(r'^bar1/$', views.ChartView1.as_view(), name='demo'),
url(r'^index1/$', views.IndexView1.as_view(), name='demo'),
]
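demo/urls.py only covers the app. For the AJAX calls in the templates to reach http://127.0.0.1:8000/demo/bar, the app still has to be mounted in the project-level URLConf, and rest_framework plus the demo app registered in settings. A minimal sketch, assuming a placeholder project package name (use whatever startproject created):

# <project_package>/urls.py -- project-level URLConf
from django.conf.urls import url, include

urlpatterns = [
    url(r'^demo/', include('demo.urls')),  # exposes /demo/bar, /demo/index, /demo/bar1, /demo/index1
]

# <project_package>/settings.py -- only the relevant part
INSTALLED_APPS = [
    # ... Django's default apps ...
    'rest_framework',  # from: pip install djangorestframework
    'demo',
]

Note also that the views below read ./templates/index.html relative to the working directory, so runserver should be started from the directory that contains templates/.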
views.py
import json
from random import randrange
from django.http import HttpResponse
from rest_framework.views import APIView
from pyecharts.charts import Bar,Gauge
from pyecharts import options as opts
from pyecharts.faker import Faker
import random
# Create your views here.
def response_as_json(data):
    json_str = json.dumps(data)
    response = HttpResponse(
        json_str,
        content_type="application/json",
    )
    response["Access-Control-Allow-Origin"] = "*"
    return response

def json_response(data, code=200):
    data = {
        "code": code,
        "msg": "success",
        "data": data,
    }
    return response_as_json(data)

def json_error(error_string="error", code=500, **kwargs):
    data = {
        "code": code,
        "msg": error_string,
        "data": {}
    }
    data.update(kwargs)
    return response_as_json(data)

JsonResponse = json_response
JsonError = json_error
def bar_base() -> Bar:
    data = [['乌海市', 90, 110], ['锡林郭勒盟', 99, 98], ['阿拉善盟', 99, 114], ['包头市', 99, 109], ['乌兰察布市', 99, 122],
['兴安盟', 99, 135], ['鄂尔多斯市', 102, 93], ['丽江市', 106, 107], ['呼伦贝尔市', 106, 100], ['赤峰市', 107, 104],
['昭通市', 110, 111], ['呼和浩特市', 113, 92], ['通辽市', 113, 110], ['巴彦淖尔市', 115, 103], ['红河哈尼族彝族自治州', 115, 114],
['葫芦岛市', 116, 143], ['锦州市', 119, 116], ['文山壮族苗族自治州', 120, 132], ['营口市', 121, 140], ['昆明市', 122, 125],
['楚雄彝族自治州', 122, 127], ['西双版纳傣族自治州', 123, 119], ['铁岭市', 124, 148], ['普洱市', 124, 131],
['怒江傈僳族自治州', 124, 129], ['德宏傣族景颇族自治州', 126, 144], ['玉溪市', 126, 126], ['沈阳市', 126, 142], ['阜新市', 132, 141],
['曲靖市', 132, 122], ['临沧市', 134, 131], ['丹东市', 136, 134], ['盘锦市', 137, 133], ['焦作市', 138, 141],
['商丘市', 139, 162], ['雅安市', 139, 171], ['大理白族自治州', 139, 121], ['本溪市', 140, 127], ['保山市', 140, 121],
['抚顺市', 141, 148], ['迪庆藏族自治州', 141, 153], ['鞍山市', 142, 129], ['自贡市', 147, 148], ['驻马店市', 149, 164],
['遂宁市', 150, 166], ['安阳市', 150, 153], ['漯河市', 151, 164], ['平顶山市', 152, 162], ['甘孜藏族自治州', 152, 132],
['资阳市', 152, 153], ['乐山市', 154, 158], ['周口市', 155, 169], ['凉山彝族自治州', 155, 151], ['大连市', 157, 121],
['朝阳市', 157, 157], ['新乡市', 157, 144], ['阿坝藏族羌族自治州', 158, 165], ['广元市', 159, 172], ['许昌市', 160, 172],
['巴中市', 160, 165], ['德阳市', 160, 171], ['广安市', 160, 176], ['郑州市', 163, 138], ['辽阳市', 164, 147],
['信阳市', 164, 165], ['濮阳市', 166, 165], ['三门峡市', 166, 150], ['宜宾市', 166, 147], ['绵阳市', 166, 171],
['攀枝花市', 167, 150], ['开封市', 167, 147], ['南充市', 168, 160], ['酒泉市', 169, 224], ['南阳市', 169, 191],
['泸州市', 170, 165], ['内江市', 171, 166], ['眉山市', 173, 154], ['洛阳市', 178, 163], ['鹤壁市', 180, 155],
['成都市', 180, 155], ['嘉峪关市', 181, 201], ['定西市', 187, 173], ['天水市', 193, 223], ['武威市', 194, 192],
['南宁市', 196, 199], ['达州市', 196, 188], ['平凉市', 197, 196], ['甘南藏族自治州', 197, 197], ['衡阳市', 197, 206],
['兰州市', 197, 237], ['崇左市', 198, 206], ['临夏回族自治州', 198, 201], ['益阳市', 199, 224], ['株洲市', 199, 211],
['白银市', 199, 193], ['陇南市', 200, 178], ['长沙市', 201, 191], ['永州市', 201, 238], ['湘西土家族苗族自治州', 203, 218],
['北海市', 204, 223], ['邵阳市', 206, 225], ['襄阳市', 208, 218], ['恩施土家族苗族自治州', 208, 208], ['黄石市', 209, 215],
['玉林市', 210, 225], ['荆州市', 210, 222], ['金昌市', 211, 193], ['张掖市', 211, 211], ['张家界市', 211, 209],
['湘潭市', 212, 218], ['承德市', 214, 207], ['常德市', 214, 196], ['绥化市', 214, 254], ['厦门市', 214, 230],
['钦州市', 214, 212], ['岳阳市', 215, 204], ['宜昌市', 215, 212], ['贵港市', 215, 212], ['贺州市', 215, 227],
['防城港市', 217, 211], ['庆阳市', 217, 190], ['黑河市', 218, 253], ['柳州市', 219, 209], ['郴州市', 219, 200],
['保定市', 219, 245], ['来宾市', 219, 230], ['十堰市', 219, 224], ['梧州市', 219, 227], ['喀什地区', 220, 243],
['邯郸市', 220, 239], ['克拉玛依市', 222, 245], ['秦皇岛市', 224, 239], ['孝感市', 224, 222], ['随州市', 225, 227],
['石家庄市', 227, 232], ['桂林市', 227, 191], ['云浮市', 228, 237], ['娄底市', 229, 199], ['宁德市', 229, 247],
['衡水市', 230, 227], ['河源市', 230, 263], ['百色市', 231, 205], ['怀化市', 231, 204], ['福州市', 231, 257],
['沧州市', 232, 225], ['泉州市', 232, 252], ['黄冈市', 234, 239], ['龙岩市', 234, 221], ['鄂州市', 235, 225],
['巴音郭楞蒙古自治州', 235, 241], ['博尔塔拉蒙古自治州', 236, 232], ['廊坊市', 236, 239], ['荆门市', 237, 215], ['和田地区', 237, 244],
['阿勒泰地区', 237, 233], ['双鸭山市', 237, 243], ['乌鲁木齐市', 237, 258], ['七台河市', 238, 220], ['肇庆市', 238, 289],
['珠海市', 238, 281], ['南平市', 240, 235], ['邢台市', 240, 228], ['塔城地区', 240, 251], ['莆田市', 240, 236],
['清远市', 240, 253], ['鸡西市', 240, 253], ['武汉市', 241, 242], ['唐山市', 242, 219], ['亳州市', 242, 263],
['咸宁市', 243, 222], ['湛江市', 244, 240], ['汕尾市', 244, 254], ['景德镇市', 245, 274], ['哈密市', 245, 267],
['阿克苏地区', 245, 227], ['伊春市', 246, 253], ['吐鲁番市', 247, 256], ['漳州市', 248, 209], ['茂名市', 248, 262],
['齐齐哈尔市', 249, 245], ['鹰潭市', 249, 304], ['佛山市', 250, 264], ['佳木斯市', 250, 245], ['鹤岗市', 250, 241],
['马鞍山市', 251, 249], ['昌吉回族自治州', 251, 224], ['大兴安岭地区', 252, 246], ['克孜勒苏柯尔克孜自治州', 253, 246],
['河池市', 253, 247], ['东莞市', 254, 275], ['中山市', 254, 239], ['梅州市', 257, 256], ['淮北市', 258, 320],
['海南藏族自治州', 258, 298], ['潮州市', 258, 245], ['揭阳市', 259, 245], ['三明市', 259, 227], ['南昌市', 260, 271],
['新余市', 262, 277], ['哈尔滨市', 263, 253], ['蚌埠市', 263, 303], ['张家口市', 263, 244], ['阳江市', 264, 242],
['六安市', 264, 293], ['海西蒙古族藏族自治州', 265, 273], ['萍乡市', 265, 272], ['大庆市', 266, 256], ['上饶市', 266, 283],
['惠州市', 267, 231], ['江门市', 268, 292], ['果洛藏族自治州', 269, 267], ['淮南市', 269, 263], ['铜陵市', 269, 302],
['伊犁哈萨克自治州', 269, 240], ['韶关市', 270, 235], ['滁州市', 273, 285], ['池州市', 277, 267], ['西宁市', 277, 295],
['宿州市', 278, 251], ['赣州市', 278, 246], ['玉树藏族自治州', 279, 265], ['汕头市', 280, 284], ['牡丹江市', 281, 237],
['宜春市', 283, 252], ['合肥市', 283, 297], ['黄山市', 283, 274], ['广州市', 286, 249], ['黄南藏族自治州', 286, 289],
['深圳市', 288, 231], ['安庆市', 289, 270], ['九江市', 291, 259], ['吉安市', 292, 301], ['抚州市', 293, 288],
['阜阳市', 294, 271], ['芜湖市', 297, 251], ['宣城市', 299, 259], ['威海市', 300, 331], ['济宁市', 301, 281],
['海东市', 304, 272], ['日照市', 306, 321], ['烟台市', 308, 323], ['东营市', 309, 299], ['泰安市', 309, 309],
['青岛市', 309, 294], ['海北藏族自治州', 311, 310], ['济南市', 313, 341], ['菏泽市', 316, 333], ['枣庄市', 317, 336],
['淄博市', 319, 343], ['莱芜市', 321, 330], ['潍坊市', 331, 291], ['滨州市', 332, 336], ['聊城市', 338, 326],
['德州市', 338, 322], ['阿里地区', 338, 358], ['湖州市', 339, 344], ['临沂市', 341, 302], ['那曲市', 341, 342],
['林芝市', 341, 367], ['吴忠市', 346, 347], ['衢州市', 346, 367], ['丽水市', 349, 359], ['嘉兴市', 353, 365],
['拉萨市', 355, 362], ['固原市', 359, 373], ['银川市', 364, 357], ['中卫市', 364, 368], ['温州市', 364, 379],
['石嘴山市', 364, 348], ['日喀则市', 366, 343], ['舟山市', 367, 346], ['山南市', 370, 355], ['台州市', 374, 346],
['金华市', 375, 367], ['宁波市', 376, 386], ['昌都市', 379, 343], ['忻州市', 380, 428], ['太原市', 382, 433],
['杭州市', 392, 377], ['绍兴市', 393, 394], ['淮安市', 398, 441], ['运城市', 399, 411], ['连云港市', 402, 464],
['吕梁市', 405, 422], ['海口市', 405, 413], ['苏州市', 413, 418], ['泰州市', 414, 446], ['儋州市', 417, 391],
['延边朝鲜族自治州', 420, 430], ['临汾市', 421, 425], ['三沙市', 421, 417], ['辽源市', 421, 402], ['阳泉市', 423, 462],
['通化市', 423, 438], ['晋中市', 424, 418], ['宿迁市', 425, 472], ['晋城市', 427, 433], ['吉林市', 432, 452],
['白山市', 435, 429], ['松原市', 437, 468], ['徐州市', 438, 472], ['三亚市', 439, 443], ['盐城市', 447, 465],
['白城市', 447, 425], ['长春市', 449, 445], ['长治市', 451, 404], ['扬州市', 455, 451], ['四平市', 456, 394],
['无锡市', 463, 429], ['南京市', 463, 398], ['南通市', 469, 404], ['朔州市', 470, 410], ['大同市', 472, 413],
['镇江市', 486, 437], ['常州市', 486, 432], ['宝鸡市', 560, 586], ['铜川市', 567, 590], ['铜仁市', 569, 592],
['榆林市', 579, 608], ['汉中市', 589, 614], ['安康市', 596, 618], ['黔南布依族苗族自治州', 598, 655], ['黔东南苗族侗族自治州', 604, 608],
['商洛市', 605, 552], ['西安市', 606, 586], ['贵阳市', 608, 622], ['咸阳市', 614, 596], ['安顺市', 621, 618],
['延安市', 623, 578], ['渭南市', 623, 601], ['六盘水市', 633, 547], ['黔西南布依族苗族自治州', 636, 621], ['毕节市', 639, 591],
['遵义市', 642, 654], ['重庆市', 2654, 2665], ['澳门', 3115, 3077], ['上海市', 4307, 4305], ['天津市', 4398, 4381],
['北京市', 4566, 4623], ['香港', 5159, 5121]]
    usedata = [random.choice(data) for i in range(10)]
    print(usedata)
    c = (
        Bar()
        .add_xaxis([i[0] for i in usedata])
        .add_yaxis("呼入", [i[1] for i in usedata])
        .add_yaxis("呼出", [i[2] for i in usedata])
        .set_global_opts(title_opts=opts.TitleOpts(title="每日城市的呼入呼出量", subtitle="我是副标题"),
                         datazoom_opts=opts.DataZoomOpts(type_="inside"))
        .dump_options_with_quotes()
    )
    return c
def bar_diff() -> Bar:
    c = (
        Bar()
        .add_xaxis(Faker.choose())
        .add_yaxis("商家A", Faker.values())
        .add_yaxis("商家B", Faker.values())
        .set_global_opts(title_opts=opts.TitleOpts(title="Bar-MarkLine(指定类型)"))
        .set_series_opts(
            label_opts=opts.LabelOpts(is_show=False),
            markline_opts=opts.MarkLineOpts(
                data=[
                    opts.MarkLineItem(type_="min", name="最小值"),
                    opts.MarkLineItem(type_="max", name="最大值"),
                    opts.MarkLineItem(type_="average", name="平均值"),
                ]
            ),
        )
        .dump_options_with_quotes()
    )
    return c

def gauge_base() -> Gauge:
    data = [[385293, 1606039296], [368545, 1606039351], [377871, 1606039413], [373240, 1606039470],
            [381710, 1606039117], [377038, 1606039053], [375771, 1606038755]]
    c = (
        Gauge()
        .add(
            "当前通话频度",
            [("通话频度", random.choice(data)[0] / 10000)],
            axisline_opts=opts.AxisLineOpts(
                linestyle_opts=opts.LineStyleOpts(
                    color=[(0.3, "#67e0e3"), (0.7, "#37a2da"), (1, "#fd666d")], width=30
                )
            ),
            detail_label_opts=opts.LabelOpts(formatter="{value}万"),
        )
        .set_global_opts(
            title_opts=opts.TitleOpts(title="当前通话频度"),
            legend_opts=opts.LegendOpts(is_show=False),
        )
        .dump_options_with_quotes()
    )
    return c
class ChartView(APIView):
    def get(self, request, *args, **kwargs):
        return JsonResponse(json.loads(bar_base()))

class IndexView(APIView):
    def get(self, request, *args, **kwargs):
        return HttpResponse(content=open("./templates/index.html").read())

class ChartView1(APIView):
    def get(self, request, *args, **kwargs):
        # return JsonResponse(json.loads(bar_base()))
        return JsonResponse(json.loads(gauge_base()))

class IndexView1(APIView):
    def get(self, request, *args, **kwargs):
        return HttpResponse(content=open("./templates/index1.html").read())
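With the dev server running (python manage.py runserver), the chart endpoints can also be sanity-checked outside the browser. A small sketch using the requests package (assumed installed), hitting the same URL the template polls:

import requests

# the views wrap the pyecharts options as {"code": ..., "msg": ..., "data": ...}
resp = requests.get("http://127.0.0.1:8000/demo/bar/")
payload = resp.json()
options = payload["data"]                 # this is what index.html passes to chart.setOption(...)
print(payload["code"], options["series"][0]["name"])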
Join the tables and save to Hive
import json
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

def getpersonnums(DF):
    # keep only valid call records, then total each phone number's talk time
    DF = DF.where("callerflag = 1 or callerflag = 0")
    DF = DF.where("timespan<=1000")
    DF = DF.where("length(caller1.phone)=11")
    DF = DF.where("length(caller2.phone)=11")
    df1 = DF.groupBy("caller1.phone").sum("timespan").toDF("caller1.phone", "timespan")
    df1 = df1.withColumnRenamed("caller1.phone", "phone")
    # df1.show()
    df2 = DF.groupBy("caller2.phone").sum("timespan").toDF("caller2.phone", "timespan")
    df2 = df2.withColumnRenamed("caller2.phone", "phone")
    df = df1.union(df2).groupBy("phone").sum("timespan")
    df = df.withColumnRenamed("sum(timespan)", "timespan")
    return df

if __name__ == "__main__":
    conf = SparkConf().setAppName("save_hive").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    spark = SparkSession.builder.getOrCreate()
    lines = sc.textFile("file:///home/hadoop/data.txt").map(lambda x: json.loads(x.replace("'", '"')))
    schema = StructType([
        StructField("callerflag", LongType(), True),
        StructField("timespan", LongType(), True),
        StructField("timestamp", LongType(), True),
        StructField("caller1", StructType([StructField("phone", StringType(), True), StructField("caller1_site", StringType(), True), StructField("offsets", StringType(), True)]), True),
        StructField("caller2", StructType([StructField("phone", StringType(), True), StructField("caller2_site", StringType(), True), StructField("offsets", StringType(), True)]), True)])
    data = spark.createDataFrame(lines, schema)
    DF = getpersonnums(data)
    DF.show()
    schema = StructType([
        StructField("phone", StringType(), True),
        StructField("name", StringType(), True),
        StructField("idno", StringType(), True),
        StructField("latitude", FloatType(), True),
        StructField("longitude", FloatType(), True),
        StructField("sex", StringType(), True),
        StructField("blood_group", StringType(), True),
        StructField("mail", StringType(), True),
        StructField("job", StringType(), True),
        StructField("province", StringType(), True),
        StructField("city", StringType(), True),
        StructField("birthdate", StringType(), True),
        StructField("pricing_package", IntegerType(), True),
    ])
    df = spark.read.csv(r"file:///home/hadoop/PycharmProjects/pythonProject/user.csv", encoding='utf-8',
                        header=True, schema=schema)  # use the specified schema
    df.show()
    DF = DF.join(df, ['phone'], "left_outer")
    DF.show()
    # write out as CSV under the Hive warehouse directory
    file = "/user/hive/warehouse/ylt.db/basic"
    DF.write.csv(path=file, header=True, sep=",", mode='overwrite')
    sc.stop()
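Writing the CSV under /user/hive/warehouse/ylt.db/basic only drops files into the warehouse directory; Hive (or a Hive-enabled SparkSession) still needs a matching table definition to query it. A rough sketch under those assumptions (the ylt database is assumed to exist; the scripts below simply read the CSV back with spark.read.csv, so this step is optional):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("register_basic").enableHiveSupport().getOrCreate()
spark.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS ylt.basic (
        phone STRING, timespan BIGINT, name STRING, idno STRING,
        latitude FLOAT, longitude FLOAT, sex STRING, blood_group STRING,
        mail STRING, job STRING, province STRING, city STRING,
        birthdate STRING, pricing_package INT
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    LOCATION '/user/hive/warehouse/ylt.db/basic'
    TBLPROPERTIES ('skip.header.line.count'='1')
""")
spark.sql("SELECT phone, timespan, city FROM ylt.basic LIMIT 5").show()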
getbirth
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import json

def save(path, data):
    with open(path, 'w') as f:
        f.write(data)

conf = SparkConf().setAppName("getbirth").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()
file = "/user/hive/warehouse/ylt.db/basic"
df = spark.read.csv(file, header=False, inferSchema=True).toDF("phone", "timespan", "name", "idno", "latitude", "longitude", "sex", "blood_group", "mail", "job", "province", "city", "birthdate", "pricing_package")
df.show()
df = df.select("birthdate")
# keep the birth year: first 4 characters of the third '/'-separated field of birthdate
df = df.withColumn("birthdate", split(df['birthdate'], "/")[2].substr(1, 4))
df.write.csv("/user/hive/warehouse/ylt.db/birth", mode="overwrite")
df = df.filter(df.birthdate.isNotNull())
save("/home/hadoop/datasave/birth.txt", json.dumps(df.collect()))
df.show()

getjob
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

def save(path, data):
    with open(path, 'w') as f:
        f.write(str(data))

def getcity_job(DF):
    # count records per job, sorted by descending count
    DF = DF.groupBy("job").count().toDF("job", "nums")
    return DF.orderBy(-DF["nums"])

if __name__ == "__main__":
    conf = SparkConf().setAppName("getjob").set('spark.executor.memory', '10g').set("spark.executor.cores", '8').set(
        "spark.driver.memory", '10g')
    sc = SparkContext(conf=conf)
    spark = SparkSession.builder.getOrCreate()
    file = "/user/hive/warehouse/ylt.db/basic"
    DF = spark.read.csv(file, header=False, inferSchema=True).toDF("phone", "timespan", "name", "idno", "latitude",
                                                                   "longitude", "sex", "blood_group", "mail", "job",
                                                                   "province", "city", "birthdate", "pricing_package")
    DF.show()
    DF = getcity_job(DF)
    DF = DF.filter(DF.job.isNotNull())
    DF.write.csv("/user/hive/warehouse/ylt.db/job", mode="overwrite")
    save("/home/hadoop/datasave/job.txt", DF.take(100))  # keep the 100 most common jobs for the word cloud
    sc.stop()

getsex
import json
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

def save(path, data):
    with open(path, 'w') as f:
        f.write(data)

def getsex_blood(DF):
    df = DF.groupBy("sex", "blood_group").count().toDF("sex", "blood_group", "nums")
    df.show()
    return df

if __name__ == "__main__":
    conf = SparkConf().setAppName("getsex").set('spark.executor.memory', '10g').set("spark.executor.cores", '8').set(
        "spark.driver.memory", '10g')
    sc = SparkContext(conf=conf)
    spark = SparkSession.builder.getOrCreate()
    file = "/user/hive/warehouse/ylt.db/basic"
    DF = spark.read.csv(file, header=False, inferSchema=True).toDF("phone", "timespan", "name", "idno", "latitude",
                                                                   "longitude", "sex", "blood_group", "mail", "job",
                                                                   "province", "city", "birthdate", "pricing_package")
    DF.show()
    DF = getsex_blood(DF)
    DF = DF.filter(DF.sex.isNotNull())
    DF.write.csv("/user/hive/warehouse/ylt.db/sex", mode="overwrite")
    save("/home/hadoop/datasave/sex.txt", json.dumps(DF.collect()))
    sc.stop()

getpayment
import json
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

def save(path, data):
    with open(path, 'w') as f:
        f.write(data)

def getpay(DF):
    # apply each pricing package's tariff to the accumulated talk time
    df1 = DF.where("pricing_package=1")
    df2 = DF.where("pricing_package=2")
    df3 = DF.where("pricing_package=3")
    df4 = DF.where("pricing_package=4")
    df1 = df1.withColumn("payment", 5 + df1.timespan / 10)
    df2 = df2.withColumn("payment", greatest(lit(10), (df2.timespan - 30) + 10) / 20)
    df3 = df3.withColumn("payment", greatest(lit(50), 0.03 * (df3.timespan - 50) + 50))
    df4 = df4.withColumn("payment", greatest(0.03 * (df4.timespan - 100) + 41, 0.01 * df4.timespan + 40))
    df4.show()
    DF = df1.union(df2).union(df3).union(df4)
    DF.show()
    DF = DF.select(DF.payment)
    return DF

if __name__ == "__main__":
    conf = SparkConf().setAppName("getpayment").set('spark.executor.memory', '10g').set("spark.executor.cores", '8').set(
        "spark.driver.memory", '10g')
    sc = SparkContext(conf=conf)
    spark = SparkSession.builder.getOrCreate()
    file = "/user/hive/warehouse/ylt.db/basic"
    DF = spark.read.csv(file, header=False, inferSchema=True).toDF("phone", "timespan", "name", "idno", "latitude",
                                                                   "longitude", "sex", "blood_group", "mail", "job",
                                                                   "province", "city", "birthdate", "pricing_package")
    DF.show()
    DF = getpay(DF)
    DF = DF.filter(DF.payment.isNotNull())
    DF.write.csv("/user/hive/warehouse/ylt.db/payment", mode="overwrite")
    save("/home/hadoop/datasave/payment.txt", json.dumps(DF.collect()))
    sc.stop()
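The four withColumn expressions in getpay() encode the tariff of each pricing package. The same rules as a plain Python function, purely to make the arithmetic explicit (this mirrors the DataFrame expressions above and is not part of the pipeline):

def payment(pricing_package, timespan):
    # timespan is the accumulated talk time for one phone number
    if pricing_package == 1:
        return 5 + timespan / 10
    if pricing_package == 2:
        return max(10, (timespan - 30) + 10) / 20
    if pricing_package == 3:
        return max(50, 0.03 * (timespan - 50) + 50)
    if pricing_package == 4:
        return max(0.03 * (timespan - 100) + 41, 0.01 * timespan + 40)
    return None

print(payment(1, 200))   # 25.0
print(payment(3, 200))   # 54.5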
getcityallnums: get each city's current incoming and outgoing call counts
import json
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import os

def getcitycallnums(DF):
    basedf1 = DF.where("callerflag=0")
    basedf2 = DF.where("callerflag=1")
    df1 = basedf1.groupBy("caller1.caller1_site").count().toDF("city", "callout")
    df2 = basedf2.groupBy("caller2.caller2_site").count().toDF("city", "callout")
    df3 = basedf1.groupBy("caller2.caller2_site").count().toDF("city", "callin")
    df4 = basedf2.groupBy("caller1.caller1_site").count().toDF("city", "callin")
    callerinDF = df1.union(df2).groupBy("city").sum().toDF("city", "callout")
    calleroutDF = df3.union(df4).groupBy("city").sum().toDF("city", "callin")
    finDF = callerinDF.join(calleroutDF, "city").toDF("city", "callout", "callin")
    allcity = ["allcitydata", [list(i) for i in finDF.orderBy("callout").collect()]]
    try:
        print(allcity)
        with open("/home/hadoop/datasave/allcity.txt", "w") as fin:
            for i in allcity[1]:
                fin.write(",".join([str(j) for j in i]) + '\n')
    except Exception as e:
        print(e)

if __name__ == "__main__":
    memory = '10g'
    pyspark_submit_args = ' --driver-memory ' + memory + ' --executor-memory ' + memory + ' pyspark-shell'
    os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args
    conf = SparkConf().setAppName("streamingkafka").set('spark.executor.memory', '10g').set("spark.executor.cores", '8')
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")  # reduce shell log output
    ssc = StreamingContext(sc, 60)  # 60-second batch interval
    spark = SparkSession.builder.getOrCreate()
    brokers = '''10.132.221.111:6667,10.132.221.112:6667,10.132.221.113:6667,10.132.221.114:6667,10.132.221.116:6667,10.132.221.117:6667,10.132.221.118:6667,10.132.221.119:6667,10.132.221.120:6667,10.132.221.121:6667,10.132.221.123:6667,10.132.221.124:6667,10.132.221.125:6667,10.132.221.126:6667,10.132.221.127:6667,10.132.221.128:6667,10.132.221.129:6667,10.132.221.130:6667,10.132.221.132:6667'''
    schema_telecomdata = StructType([
        StructField("callerflag", LongType(), True),
        StructField("timespan", LongType(), True),
        StructField("timestamp", LongType(), True),
        StructField("caller1", StructType([StructField("phone", StringType(), True), StructField("caller1_site", StringType(), True), StructField("offsets", StringType(), True)]), True),
        StructField("caller2", StructType([StructField("phone", StringType(), True), StructField("caller2_site", StringType(), True), StructField("offsets", StringType(), True)]), True)])
    topic = 'telecom'
    kafka_streaming_rdd = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams={"metadata.broker.list": brokers, "group.id": '18211170230ylt'})
    kafka_streaming_rdd.map(lambda x: json.loads(x[1])).foreachRDD(
        lambda x: getcitycallnums(spark.createDataFrame(x, schema_telecomdata)))
    ssc.start()
    ssc.awaitTermination()

getdaytotnums: get the whole day's call count; can be saved locally or to Hive
import json
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import pyhdfs
import time

def tohive(DF):
    # append "count,timestamp" to a file under the Hive warehouse directory via WebHDFS
    client = pyhdfs.HdfsClient("localhost,9000", "hadoop")
    nums = DF.count()
    timec = int(time.time())
    row = str(nums) + "," + str(timec) + "\n"
    client.append("/user/hive/warehouse/ylt.db/hourtotalnums/inputsource1.txt", row)
    print(row)

def gettotalnums(DF):
    nums = DF.count()
    timec = int(time.time())
    data = ["totalnums", [nums, timec]]
    try:
        print(data)
        with open("/home/hadoop/datasave/daytotalnums.txt", "w") as fin:
            fin.write(",".join([str(i) for i in data[1]]))
    except Exception as e:
        print(e)

if __name__ == "__main__":
    conf = SparkConf().setAppName("streamingkafka").set('spark.executor.memory', '10g').set("spark.executor.cores", '8')
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")  # reduce shell log output
    ssc = StreamingContext(sc, 5)
    spark = SparkSession.builder.getOrCreate()
    brokers = '''10.132.221.111:6667,10.132.221.112:6667,10.132.221.113:6667,10.132.221.114:6667,10.132.221.116:6667,10.132.221.117:6667,10.132.221.118:6667,10.132.221.119:6667,10.132.221.120:6667,10.132.221.121:6667,10.132.221.123:6667,10.132.221.124:6667,10.132.221.125:6667,10.132.221.126:6667,10.132.221.127:6667,10.132.221.128:6667,10.132.221.129:6667,10.132.221.130:6667,10.132.221.132:6667'''
    schema_telecomdata = StructType([
        StructField("callerflag", LongType(), True),
        StructField("timespan", LongType(), True),
        StructField("timestamp", LongType(), True),
        StructField("caller1", StructType([StructField("phone", StringType(), True), StructField("caller1_site", StringType(), True), StructField("offsets", StringType(), True)]), True),
        StructField("caller2", StructType([StructField("phone", StringType(), True), StructField("caller2_site", StringType(), True), StructField("offsets", StringType(), True)]), True)])
    topic = 'telecom'
    kafka_streaming_rdd = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams={"metadata.broker.list": brokers, "group.id": '18211170223wlt_t2'})
    # kafka_streaming_rdd.map(lambda x: json.loads(x[1])).foreachRDD(
    #     lambda x: gettotalnums(spark.createDataFrame(x, schema_telecomdata)))
    kafka_streaming_rdd.map(lambda x: json.loads(x[1])).foreachRDD(
        lambda x: tohive(spark.createDataFrame(x, schema_telecomdata)))
    ssc.start()
    ssc.awaitTermination()
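One practical note on tohive() above: WebHDFS can only append to a file that already exists, so the target file should be created once before the streaming job starts. A small sketch with pyhdfs (host string and path taken from the code above; adjust the host to the cluster's actual WebHDFS address):

import pyhdfs

client = pyhdfs.HdfsClient("localhost,9000", user_name="hadoop")
path = "/user/hive/warehouse/ylt.db/hourtotalnums/inputsource1.txt"
if not client.exists(path):
    client.create(path, b"")   # create an empty file so that later append() calls succeed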
gettotalnums: get the current total call count
import json
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import pyhdfs
import time

def tohive(DF):
    client = pyhdfs.HdfsClient("localhost,9000", "hadoop")
    nums = DF.count()
    timec = int(time.time())
    row = str(nums) + "," + str(timec) + "\n"
    client.append("/user/hive/warehouse/ylt.db/calltotalnums/inputsource1.txt", row)
    print(row)

def gettotalnums(DF):
    # count only valid call records in this batch and save "count,timestamp" locally
    DF = DF.where("callerflag = 1 or callerflag = 0")
    DF = DF.where("timespan<=1000")
    DF = DF.where("length(caller1.phone)=11")
    DF = DF.where("length(caller2.phone)=11")
    nums = DF.count()
    timec = int(time.time())
    data = ["totalnums", [nums, timec]]
    try:
        print(data)
        with open("/home/hadoop/datasave/totalnums.txt", "w") as fin:
            fin.write(",".join([str(i) for i in data[1]]))
    except Exception as e:
        print(e)

if __name__ == "__main__":
    conf = SparkConf().setAppName("streamingkafka").set('spark.executor.memory', '10g').set("spark.executor.cores", '8')
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")  # reduce shell log output
    ssc = StreamingContext(sc, 5)
    spark = SparkSession.builder.getOrCreate()
    brokers = '''10.132.221.111:6667,10.132.221.112:6667,10.132.221.113:6667,10.132.221.114:6667,10.132.221.116:6667,10.132.221.117:6667,10.132.221.118:6667,10.132.221.119:6667,10.132.221.120:6667,10.132.221.121:6667,10.132.221.123:6667,10.132.221.124:6667,10.132.221.125:6667,10.132.221.126:6667,10.132.221.127:6667,10.132.221.128:6667,10.132.221.129:6667,10.132.221.130:6667,10.132.221.132:6667'''
    schema_telecomdata = StructType([
        StructField("callerflag", LongType(), True),
        StructField("timespan", LongType(), True),
        StructField("timestamp", LongType(), True),
        StructField("caller1", StructType([StructField("phone", StringType(), True), StructField("caller1_site", StringType(), True), StructField("offsets", StringType(), True)]), True),
        StructField("caller2", StructType([StructField("phone", StringType(), True), StructField("caller2_site", StringType(), True), StructField("offsets", StringType(), True)]), True)])
    topic = 'telecom'
    kafka_streaming_rdd = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams={"metadata.broker.list": brokers, "group.id": '18211170230ylt'})
    kafka_streaming_rdd.map(lambda x: json.loads(x[1])).foreachRDD(
        lambda x: gettotalnums(spark.createDataFrame(x, schema_telecomdata)))
    # kafka_streaming_rdd.map(lambda x: json.loads(x[1])).foreachRDD(
    #     lambda x: tohive(spark.createDataFrame(x, schema_telecomdata)))
    ssc.start()
    ssc.awaitTermination()
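The drawing scripts below run offline against the small text files written by the jobs above. drawjob turns the str() of a list of Row objects back into [job, count] pairs with plain string replacement; a tiny round-trip on an invented line shows the idea (the other files, sex.txt / payment.txt / birth.txt, are json.dumps output, i.e. nested lists that eval() or json.loads() can read directly):

# invented example of what getjob's save() writes to job.txt
raw = "[Row(job='engineer', nums=42), Row(job='teacher', nums=40)]"
data = raw.replace("Row(", "[").replace(")", "]").replace("job=", "").replace("nums=", "")
print(eval(data))   # [['engineer', 42], ['teacher', 40]] -- the [word, count] pairs the word cloud needs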
drawjob
from pyecharts import options as opts
from pyecharts.charts import Pie, WordCloud

with open('/home/hadoop/datasave/job.txt', 'r') as fin:
    data = fin.read()
# job.txt holds the str() of a list of Row objects; turn it back into [job, count] pairs
data = data.replace("Row(", "[")
data = data.replace(")", "]")
data = data.replace("job=", "")
data = data.replace("nums=", "")
data = eval(data)
print(data)
c = (
WordCloud()
.add(series_name="岗位分析", data_pair=data, word_size_range=[20, 50])
.set_global_opts(
title_opts=opts.TitleOpts(
title="岗位分析", title_textstyle_opts=opts.TextStyleOpts(font_size=23)
),
tooltip_opts=opts.TooltipOpts(is_show=True),
)
.render("/home/hadoop/datasave/job.html")
)

drawsex
from pyecharts import options as opts
from pyecharts.charts import Bar

lines = open('/home/hadoop/datasave/sex.txt', 'r')
for line in lines:
    data = eval(line)
lines.close()
d1 = dict()
d2 = dict()
for i in data:
    if i[0] == "F":
        d1[i[1]] = i[2]
    else:
        d2[i[1]] = i[2]
d1 = sorted(d1.items(), key=lambda i: i[0])
d2 = sorted(d2.items(), key=lambda i: i[0])
key = []
v1 = []
v2 = []
for i in d1:
    key.append(i[0])
    v1.append(i[1])
for i in d2:
    v2.append(i[1])
print(v1)
print(v2)
c = (
Bar()
.add_xaxis(key)
.add_yaxis("女性", v1)  # v1 is built from rows where sex == "F", so it is the female series
.add_yaxis("男性", v2)
.set_global_opts(
title_opts=opts.TitleOpts(title="血型性别条形图", subtitle="男女不同血型的比较"),
brush_opts=opts.BrushOpts(),
)
.render("/home/hadoop/datasave/sex.html")
)

drawpayment
from pyecharts import options as opts
from pyecharts.charts import Pie

lines = open('/home/hadoop/datasave/payment.txt', 'r')
for line in lines:
    data = eval(line)
lines.close()
num1, num2, num3, num4 = 0, 0, 0, 0
for i in data:
    if i[0] <= 10:
        num1 += 1
    elif i[0] <= 30:
        num2 += 1
    elif i[0] <= 50:
        num3 += 1
    else:
        num4 += 1
data = [['话费在0~10之间', num1], ['话费在10~30之间', num2], ["话费在30~50之间", num3], ["话费超过50", num4]]
c = (
Pie()
.add(
"",
data,
radius=["40%", "55%"],
label_opts=opts.LabelOpts(
position="outside",
formatter="{a|{a}}{abg|}\n{hr|}\n {b|{b}: }{c} {per|{d}%} ",
background_color="#eee",
border_color="#aaa",
border_width=1,
border_radius=4,
rich={
"a": {"color": "#999", "lineHeight": 22, "align": "center"},
"abg": {
"backgroundColor": "#e3e3e3",
"width": "100%",
"align": "right",
"height": 22,
"borderRadius": [4, 4, 0, 0],
},
"hr": {
"borderColor": "#aaa",
"width": "100%",
"borderWidth": 0.5,
"height": 0,
},
"b": {"fontSize": 16, "lineHeight": 33},
"per": {
"color": "#eee",
"backgroundColor": "#334455",
"padding": [2, 4],
"borderRadius": 2,
},
},
),
)
.set_global_opts(title_opts=opts.TitleOpts(title="话费构成"))
.render("/home/hadoop/datasave/payment.html")
)

drawyear
from pyecharts import options as opts
from pyecharts.charts import Pie,Funnel
lines = open('/home/hadoop/datasave/birth.txt', 'r')
for line in lines:
    data = eval(line)
lines.close()
n1, n2, n3, n4, n5, n6, n7, n8 = 0, 0, 0, 0, 0, 0, 0, 0
for i in data:
    if abs(int(i[0]) - 2000) <= 10:
        n1 += 1
    elif abs(int(i[0]) - 2000) <= 20:
        n2 += 1
    elif abs(int(i[0]) - 2000) <= 30:
        n3 += 1
    elif abs(int(i[0]) - 2000) <= 40:
        n4 += 1
    elif abs(int(i[0]) - 2000) <= 50:
        n5 += 1
    elif abs(int(i[0]) - 2000) <= 60:
        n6 += 1
    elif abs(int(i[0]) - 2000) <= 70:
        n7 += 1
    else:
        n8 += 1
print(n1,n2,n3,n4,n5,n6,n7,n8)
data = [["0~10岁",n1],["10~20岁",n2],["20~30岁",n3],["30~40岁",n4],["40~50岁",n5],["50~60岁",n6],["60~70岁",n7],["超过70岁",n8]]
c = (
Funnel()
.add(
"年龄",
data,
label_opts=opts.LabelOpts(position="inside"),
)
.set_global_opts(title_opts=opts.TitleOpts(title="Funnel-Label(inside)"))
.render("/home/hadoop/datasave/year.html")
)