Reading JSON files: https://zhuanlan.zhihu.com/p/267353998

PySpark setup: http://dblab.xmu.edu.cn/blog/1689-2/

Data analysis case study: http://dblab.xmu.edu.cn/blog/2738-2/

Configuring a PySpark environment in PyCharm: https://blog.csdn.net/ringsuling/article/details/84448369

Crontab + Flume + Kafka + Spark Streaming + Spring Boot page-view counting project: https://blog.csdn.net/qq_36329973/article/details/104738993?utm_medium=distribute.pc_relevant.none-task-blog-OPENSEARCH-8.not_use_machine_learn_pai&depth_1-utm_source=distribute.pc_relevant.none-task-blog-OPENSEARCH-8.not_use_machine_learn_pai

PySpark DataFrame basics: https://www.jianshu.com/p/acd96549ee15
https://www.cnblogs.com/cxhzy/p/11067246.html

Real-time serial-port data display and plotting with Python: https://zhuanlan.zhihu.com/p/100798858

pyecharts:https://pyecharts.org/#/zh-cn/web_flask
pyecharts-gallery:https://gallery.pyecharts.org/#/Bar/bar_markline_type
echarts:https://echarts.apache.org/examples/en/index.html#chart-type-bar

Fixing the Django error "Error: That port is already in use.": https://www.jianshu.com/p/7ceebb422d09

pip install djangorestframework
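
After installing, rest_framework (and the demo app used further down) has to be registered in the project settings. A minimal, illustrative fragment, not the actual settings file of this project:

# settings.py (fragment, illustrative) -- register DRF and the demo app
INSTALLED_APPS = [
    "django.contrib.contenttypes",
    "django.contrib.staticfiles",
    "rest_framework",   # installed via `pip install djangorestframework`
    "demo",             # the app whose urls.py / views.py appear below
]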

Write the data (save each Kafka micro-batch to local text files):

import time
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    conf = SparkConf().setAppName("streamingkafka").set('spark.executor.memory', '10g').set("spark.executor.cores", '8')
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")  # reduce shell log output
    ssc = StreamingContext(sc, 5)  # 5-second batch interval
    brokers='''10.132.221.111:6667,10.132.221.112:6667,10.132.221.113:6667,10.132.221.114:6667,10.132.221.116:6667,10.132.221.117:6667,10.132.221.118:6667,10.132.221.119:6667,10.132.221.120:6667,10.132.221.121:6667,10.132.221.123:6667,10.132.221.124:6667,10.132.221.125:6667,10.132.221.126:6667,10.132.221.127:6667,10.132.221.128:6667,10.132.221.129:6667,10.132.221.130:6667,10.132.221.132:6667'''

    topic = 'telecom'
    kafka_streaming_rdd = KafkaUtils.createDirectStream(ssc,[topic] ,kafkaParams={"metadata.broker.list": brokers,"group.id":'18211170230ylt'})
    kafka_streaming_rdd.map(lambda x:x[1]).foreachRDD(lambda x:x.saveAsTextFile("file:///home/hadoop/rdd/file" + str(int(time.time()))))

    ssc.start()
    ssc.awaitTermination()

Read the saved data back as a DataFrame:

from pyspark import SparkContext
from pyspark.sql import SparkSession
import json

if __name__ == "__main__":
    sc = SparkContext('local', 'test')
    sc.setLogLevel("WARN")
    spark = SparkSession.builder.getOrCreate()
    file = "file:///home/hadoop/rdd/file1605959588"
    df1 = spark.read.json(file)
    df1.show()
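
Because every streaming batch above is saved under its own timestamped directory, all of the saved batches can also be loaded at once with a wildcard path. A small sketch; the "file*" pattern is an assumption that matches the naming scheme used by the writer above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# read every "file<timestamp>" directory produced by the writer above in one go
df_all = spark.read.json("file:///home/hadoop/rdd/file*")
df_all.printSchema()
print(df_all.count())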

Read the stream and turn each batch into a DataFrame:

import json
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import *
from pyspark.sql import SparkSession

if __name__ == "__main__":
    conf = SparkConf().setAppName("streamingkafka").set('spark.executor.memory', '10g').set("spark.executor.cores", '8')
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")  # reduce shell log output
    ssc = StreamingContext(sc, 5)  # 5-second batch interval
    spark = SparkSession.builder.getOrCreate()
    brokers='''10.132.221.111:6667,10.132.221.112:6667,10.132.221.113:6667,10.132.221.114:6667,10.132.221.116:6667,10.132.221.117:6667,10.132.221.118:6667,10.132.221.119:6667,10.132.221.120:6667,10.132.221.121:6667,10.132.221.123:6667,10.132.221.124:6667,10.132.221.125:6667,10.132.221.126:6667,10.132.221.127:6667,10.132.221.128:6667,10.132.221.129:6667,10.132.221.130:6667,10.132.221.132:6667'''

    schema_telecomdata = StructType([StructField("callerflag", LongType(), True), StructField("timespan", LongType(), True),StructField("timestamp", LongType(), True),
                         StructField("caller1", StructType([StructField("phone", StringType(), True), StructField("caller1_site", StringType(), True),StructField("offsets", StringType(), True)]),True),
                         StructField("caller2", StructType([StructField("phone", StringType(), True), StructField("caller2_site", StringType(), True),StructField("offsets", StringType(), True)]),True)])
    schema_fin = StructType([StructField("city", StringType(), True), StructField("nums", LongType(), True)])

    sumDF = spark.createDataFrame(spark.sparkContext.emptyRDD(),schema_telecomdata)

    topic = 'telecom'
    kafka_streaming_rdd = KafkaUtils.createDirectStream(ssc,[topic] ,kafkaParams={"metadata.broker.list": brokers,"group.id":'18211170230ylt'})
    kafka_streaming_rdd.map(lambda x: json.loads(x[1])).foreachRDD(lambda x: spark.createDataFrame(x,schema_telecomdata).groupBy("caller1.caller1_site").count().show())

    ssc.start()
    ssc.awaitTermination()

Rendering
index.html

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Awesome-pyecharts</title>
    <script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>
    <script type="text/javascript" src="https://assets.pyecharts.org/assets/echarts.min.js"></script>

</head>
<body>
    <div id="bar" style="width:1000px; height:600px;"></div>
    <script>
        var chart = echarts.init(document.getElementById('bar'), 'white', {renderer: 'canvas'});

        $(
            function () {
                fetchData();
                setInterval(fetchData, 2000);
            }
        );

        function fetchData() {
            $.ajax({
                type: "GET",
                url: "http://127.0.0.1:8000/demo/bar",
                dataType: 'json',
                success: function (result) {
                    chart.setOption(result.data);
                }
            });
        }
    </script>
</body>
</html>

index1.html

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Awesome-pyecharts</title>
    <script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>
    <script type="text/javascript" src="https://assets.pyecharts.org/assets/echarts.min.js"></script>

</head>
<body>
    <div id="bar1" style="width:1000px; height:600px;"></div>
    <script>
        var chart = echarts.init(document.getElementById('bar1'), 'white', {renderer: 'canvas'});

        $(
            function () {
                fetchData();
                setInterval(fetchData, 2000);
            }
        );

        function fetchData() {
            $.ajax({
                type: "GET",
                url: "http://127.0.0.1:8000/demo/bar1",
                dataType: 'json',
                success: function (result) {
                    chart.setOption(result.data);
                }
            });
        }
    </script>
</body>
</html>

demo/urls.py

# demo/urls.py
from django.conf.urls import url
from . import views

urlpatterns = [
    url(r'^bar/$', views.ChartView.as_view(), name='bar'),
    url(r'^index/$', views.IndexView.as_view(), name='index'),
    url(r'^bar1/$', views.ChartView1.as_view(), name='bar1'),
    url(r'^index1/$', views.IndexView1.as_view(), name='index1'),
]
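
These app-level routes still need to be mounted in the project URLConf (the JavaScript above polls /demo/bar and /demo/bar1). A sketch of the project-level urls.py, assuming the same old-style url() API used here:

# project-level urls.py (sketch)
from django.conf.urls import url, include

urlpatterns = [
    url(r'^demo/', include('demo.urls')),   # exposes /demo/bar, /demo/index, /demo/bar1, /demo/index1
]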

views.py

import json
from random import randrange

from django.http import HttpResponse
from rest_framework.views import APIView

from pyecharts.charts import Bar,Gauge
from pyecharts import options as opts
from pyecharts.faker import Faker

import random


# Create your views here.
def response_as_json(data):
    json_str = json.dumps(data)
    response = HttpResponse(
        json_str,
        content_type="application/json",
    )
    response["Access-Control-Allow-Origin"] = "*"
    return response


def json_response(data, code=200):
    data = {
        "code": code,
        "msg": "success",
        "data": data,
    }
    return response_as_json(data)


def json_error(error_string="error", code=500, **kwargs):
    data = {
        "code": code,
        "msg": error_string,
        "data": {}
    }
    data.update(kwargs)
    return response_as_json(data)


JsonResponse = json_response
JsonError = json_error



def bar_base() -> Bar:
    data = [['乌海市', 90, 110], ['锡林郭勒盟', 99, 98], ['阿拉善盟', 99, 114], ['包头市', 99, 109], ['乌兰察布市', 99, 122],
            ['兴安盟', 99, 135], ['鄂尔多斯市', 102, 93], ['丽江市', 106, 107], ['呼伦贝尔市', 106, 100], ['赤峰市', 107, 104],
            ['昭通市', 110, 111], ['呼和浩特市', 113, 92], ['通辽市', 113, 110], ['巴彦淖尔市', 115, 103], ['红河哈尼族彝族自治州', 115, 114],
            ['葫芦岛市', 116, 143], ['锦州市', 119, 116], ['文山壮族苗族自治州', 120, 132], ['营口市', 121, 140], ['昆明市', 122, 125],
            ['楚雄彝族自治州', 122, 127], ['西双版纳傣族自治州', 123, 119], ['铁岭市', 124, 148], ['普洱市', 124, 131],
            ['怒江傈僳族自治州', 124, 129], ['德宏傣族景颇族自治州', 126, 144], ['玉溪市', 126, 126], ['沈阳市', 126, 142], ['阜新市', 132, 141],
            ['曲靖市', 132, 122], ['临沧市', 134, 131], ['丹东市', 136, 134], ['盘锦市', 137, 133], ['焦作市', 138, 141],
            ['商丘市', 139, 162], ['雅安市', 139, 171], ['大理白族自治州', 139, 121], ['本溪市', 140, 127], ['保山市', 140, 121],
            ['抚顺市', 141, 148], ['迪庆藏族自治州', 141, 153], ['鞍山市', 142, 129], ['自贡市', 147, 148], ['驻马店市', 149, 164],
            ['遂宁市', 150, 166], ['安阳市', 150, 153], ['漯河市', 151, 164], ['平顶山市', 152, 162], ['甘孜藏族自治州', 152, 132],
            ['资阳市', 152, 153], ['乐山市', 154, 158], ['周口市', 155, 169], ['凉山彝族自治州', 155, 151], ['大连市', 157, 121],
            ['朝阳市', 157, 157], ['新乡市', 157, 144], ['阿坝藏族羌族自治州', 158, 165], ['广元市', 159, 172], ['许昌市', 160, 172],
            ['巴中市', 160, 165], ['德阳市', 160, 171], ['广安市', 160, 176], ['郑州市', 163, 138], ['辽阳市', 164, 147],
            ['信阳市', 164, 165], ['濮阳市', 166, 165], ['三门峡市', 166, 150], ['宜宾市', 166, 147], ['绵阳市', 166, 171],
            ['攀枝花市', 167, 150], ['开封市', 167, 147], ['南充市', 168, 160], ['酒泉市', 169, 224], ['南阳市', 169, 191],
            ['泸州市', 170, 165], ['内江市', 171, 166], ['眉山市', 173, 154], ['洛阳市', 178, 163], ['鹤壁市', 180, 155],
            ['成都市', 180, 155], ['嘉峪关市', 181, 201], ['定西市', 187, 173], ['天水市', 193, 223], ['武威市', 194, 192],
            ['南宁市', 196, 199], ['达州市', 196, 188], ['平凉市', 197, 196], ['甘南藏族自治州', 197, 197], ['衡阳市', 197, 206],
            ['兰州市', 197, 237], ['崇左市', 198, 206], ['临夏回族自治州', 198, 201], ['益阳市', 199, 224], ['株洲市', 199, 211],
            ['白银市', 199, 193], ['陇南市', 200, 178], ['长沙市', 201, 191], ['永州市', 201, 238], ['湘西土家族苗族自治州', 203, 218],
            ['北海市', 204, 223], ['邵阳市', 206, 225], ['襄阳市', 208, 218], ['恩施土家族苗族自治州', 208, 208], ['黄石市', 209, 215],
            ['玉林市', 210, 225], ['荆州市', 210, 222], ['金昌市', 211, 193], ['张掖市', 211, 211], ['张家界市', 211, 209],
            ['湘潭市', 212, 218], ['承德市', 214, 207], ['常德市', 214, 196], ['绥化市', 214, 254], ['厦门市', 214, 230],
            ['钦州市', 214, 212], ['岳阳市', 215, 204], ['宜昌市', 215, 212], ['贵港市', 215, 212], ['贺州市', 215, 227],
            ['防城港市', 217, 211], ['庆阳市', 217, 190], ['黑河市', 218, 253], ['柳州市', 219, 209], ['郴州市', 219, 200],
            ['保定市', 219, 245], ['来宾市', 219, 230], ['十堰市', 219, 224], ['梧州市', 219, 227], ['喀什地区', 220, 243],
            ['邯郸市', 220, 239], ['克拉玛依市', 222, 245], ['秦皇岛市', 224, 239], ['孝感市', 224, 222], ['随州市', 225, 227],
            ['石家庄市', 227, 232], ['桂林市', 227, 191], ['云浮市', 228, 237], ['娄底市', 229, 199], ['宁德市', 229, 247],
            ['衡水市', 230, 227], ['河源市', 230, 263], ['百色市', 231, 205], ['怀化市', 231, 204], ['福州市', 231, 257],
            ['沧州市', 232, 225], ['泉州市', 232, 252], ['黄冈市', 234, 239], ['龙岩市', 234, 221], ['鄂州市', 235, 225],
            ['巴音郭楞蒙古自治州', 235, 241], ['博尔塔拉蒙古自治州', 236, 232], ['廊坊市', 236, 239], ['荆门市', 237, 215], ['和田地区', 237, 244],
            ['阿勒泰地区', 237, 233], ['双鸭山市', 237, 243], ['乌鲁木齐市', 237, 258], ['七台河市', 238, 220], ['肇庆市', 238, 289],
            ['珠海市', 238, 281], ['南平市', 240, 235], ['邢台市', 240, 228], ['塔城地区', 240, 251], ['莆田市', 240, 236],
            ['清远市', 240, 253], ['鸡西市', 240, 253], ['武汉市', 241, 242], ['唐山市', 242, 219], ['亳州市', 242, 263],
            ['咸宁市', 243, 222], ['湛江市', 244, 240], ['汕尾市', 244, 254], ['景德镇市', 245, 274], ['哈密市', 245, 267],
            ['阿克苏地区', 245, 227], ['伊春市', 246, 253], ['吐鲁番市', 247, 256], ['漳州市', 248, 209], ['茂名市', 248, 262],
            ['齐齐哈尔市', 249, 245], ['鹰潭市', 249, 304], ['佛山市', 250, 264], ['佳木斯市', 250, 245], ['鹤岗市', 250, 241],
            ['马鞍山市', 251, 249], ['昌吉回族自治州', 251, 224], ['大兴安岭地区', 252, 246], ['克孜勒苏柯尔克孜自治州', 253, 246],
            ['河池市', 253, 247], ['东莞市', 254, 275], ['中山市', 254, 239], ['梅州市', 257, 256], ['淮北市', 258, 320],
            ['海南藏族自治州', 258, 298], ['潮州市', 258, 245], ['揭阳市', 259, 245], ['三明市', 259, 227], ['南昌市', 260, 271],
            ['新余市', 262, 277], ['哈尔滨市', 263, 253], ['蚌埠市', 263, 303], ['张家口市', 263, 244], ['阳江市', 264, 242],
            ['六安市', 264, 293], ['海西蒙古族藏族自治州', 265, 273], ['萍乡市', 265, 272], ['大庆市', 266, 256], ['上饶市', 266, 283],
            ['惠州市', 267, 231], ['江门市', 268, 292], ['果洛藏族自治州', 269, 267], ['淮南市', 269, 263], ['铜陵市', 269, 302],
            ['伊犁哈萨克自治州', 269, 240], ['韶关市', 270, 235], ['滁州市', 273, 285], ['池州市', 277, 267], ['西宁市', 277, 295],
            ['宿州市', 278, 251], ['赣州市', 278, 246], ['玉树藏族自治州', 279, 265], ['汕头市', 280, 284], ['牡丹江市', 281, 237],
            ['宜春市', 283, 252], ['合肥市', 283, 297], ['黄山市', 283, 274], ['广州市', 286, 249], ['黄南藏族自治州', 286, 289],
            ['深圳市', 288, 231], ['安庆市', 289, 270], ['九江市', 291, 259], ['吉安市', 292, 301], ['抚州市', 293, 288],
            ['阜阳市', 294, 271], ['芜湖市', 297, 251], ['宣城市', 299, 259], ['威海市', 300, 331], ['济宁市', 301, 281],
            ['海东市', 304, 272], ['日照市', 306, 321], ['烟台市', 308, 323], ['东营市', 309, 299], ['泰安市', 309, 309],
            ['青岛市', 309, 294], ['海北藏族自治州', 311, 310], ['济南市', 313, 341], ['菏泽市', 316, 333], ['枣庄市', 317, 336],
            ['淄博市', 319, 343], ['莱芜市', 321, 330], ['潍坊市', 331, 291], ['滨州市', 332, 336], ['聊城市', 338, 326],
            ['德州市', 338, 322], ['阿里地区', 338, 358], ['湖州市', 339, 344], ['临沂市', 341, 302], ['那曲市', 341, 342],
            ['林芝市', 341, 367], ['吴忠市', 346, 347], ['衢州市', 346, 367], ['丽水市', 349, 359], ['嘉兴市', 353, 365],
            ['拉萨市', 355, 362], ['固原市', 359, 373], ['银川市', 364, 357], ['中卫市', 364, 368], ['温州市', 364, 379],
            ['石嘴山市', 364, 348], ['日喀则市', 366, 343], ['舟山市', 367, 346], ['山南市', 370, 355], ['台州市', 374, 346],
            ['金华市', 375, 367], ['宁波市', 376, 386], ['昌都市', 379, 343], ['忻州市', 380, 428], ['太原市', 382, 433],
            ['杭州市', 392, 377], ['绍兴市', 393, 394], ['淮安市', 398, 441], ['运城市', 399, 411], ['连云港市', 402, 464],
            ['吕梁市', 405, 422], ['海口市', 405, 413], ['苏州市', 413, 418], ['泰州市', 414, 446], ['儋州市', 417, 391],
            ['延边朝鲜族自治州', 420, 430], ['临汾市', 421, 425], ['三沙市', 421, 417], ['辽源市', 421, 402], ['阳泉市', 423, 462],
            ['通化市', 423, 438], ['晋中市', 424, 418], ['宿迁市', 425, 472], ['晋城市', 427, 433], ['吉林市', 432, 452],
            ['白山市', 435, 429], ['松原市', 437, 468], ['徐州市', 438, 472], ['三亚市', 439, 443], ['盐城市', 447, 465],
            ['白城市', 447, 425], ['长春市', 449, 445], ['长治市', 451, 404], ['扬州市', 455, 451], ['四平市', 456, 394],
            ['无锡市', 463, 429], ['南京市', 463, 398], ['南通市', 469, 404], ['朔州市', 470, 410], ['大同市', 472, 413],
            ['镇江市', 486, 437], ['常州市', 486, 432], ['宝鸡市', 560, 586], ['铜川市', 567, 590], ['铜仁市', 569, 592],
            ['榆林市', 579, 608], ['汉中市', 589, 614], ['安康市', 596, 618], ['黔南布依族苗族自治州', 598, 655], ['黔东南苗族侗族自治州', 604, 608],
            ['商洛市', 605, 552], ['西安市', 606, 586], ['贵阳市', 608, 622], ['咸阳市', 614, 596], ['安顺市', 621, 618],
            ['延安市', 623, 578], ['渭南市', 623, 601], ['六盘水市', 633, 547], ['黔西南布依族苗族自治州', 636, 621], ['毕节市', 639, 591],
            ['遵义市', 642, 654], ['重庆市', 2654, 2665], ['澳门', 3115, 3077], ['上海市', 4307, 4305], ['天津市', 4398, 4381],
            ['北京市', 4566, 4623], ['香港', 5159, 5121]]
    usedata = [random.choice(data) for i in range(10)]
    print(usedata)
    c = (
        Bar()
        .add_xaxis([i[0] for i in usedata])
        .add_yaxis("呼入", [i[1] for i in usedata])
        .add_yaxis("呼出", [i[2] for i in usedata])
        .set_global_opts(title_opts=opts.TitleOpts(title="每日城市的呼入呼出量", subtitle="我是副标题"),
                         datazoom_opts=opts.DataZoomOpts(type_="inside"))
        .dump_options_with_quotes()
    )
    return c

def bar_diff() -> Bar:
    c = (
        Bar()
            .add_xaxis(Faker.choose())
            .add_yaxis("商家A", Faker.values())
            .add_yaxis("商家B", Faker.values())
            .set_global_opts(title_opts=opts.TitleOpts(title="Bar-MarkLine(指定类型)"))
            .set_series_opts(
            label_opts=opts.LabelOpts(is_show=False),
            markline_opts=opts.MarkLineOpts(
                data=[
                    opts.MarkLineItem(type_="min", name="最小值"),
                    opts.MarkLineItem(type_="max", name="最大值"),
                    opts.MarkLineItem(type_="average", name="平均值"),
                ]
            ),
        )
        .dump_options_with_quotes()
    )
    return c


def gauge_base() -> Gauge:
    data = [[385293, 1606039296], [368545, 1606039351], [377871, 1606039413], [373240, 1606039470],
            [381710, 1606039117], [377038, 1606039053], [375771, 1606038755]]

    c = (
        Gauge()
            .add(
            "当前通话频度",
            [("通话频度", random.choice(data)[0] / 10000)],
            axisline_opts=opts.AxisLineOpts(
                linestyle_opts=opts.LineStyleOpts(
                    color=[(0.3, "#67e0e3"), (0.7, "#37a2da"), (1, "#fd666d")], width=30
                )
            ), detail_label_opts=opts.LabelOpts(formatter="{value}万"),
        )
            .set_global_opts(
            title_opts=opts.TitleOpts(title="当前通话频度"),
            legend_opts=opts.LegendOpts(is_show=False),
        )
            .dump_options_with_quotes()
    )

    return c

class ChartView(APIView):
    def get(self, request, *args, **kwargs):
        return JsonResponse(json.loads(bar_base()))


class IndexView(APIView):
    def get(self, request, *args, **kwargs):
        return HttpResponse(content=open("./templates/index.html").read())


class ChartView1(APIView):
    def get(self, request, *args, **kwargs):
        # return JsonResponse(json.loads(bar_base()))
        return JsonResponse(json.loads(gauge_base()))


class IndexView1(APIView):
    def get(self, request, *args, **kwargs):
        return HttpResponse(content=open("./templates/index1.html").read())

Join the tables and save to Hive

import json
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf


def getpersonnums(DF):
    # keep only valid call records
    DF = DF.where("callerflag = 1 or callerflag = 0")
    DF = DF.where("timespan<=1000")
    DF = DF.where("length(caller1.phone)=11")
    DF = DF.where("length(caller2.phone)=11")

    # total call time per phone number, whether it appears as caller1 or caller2
    df1 = DF.groupBy("caller1.phone").sum("timespan").toDF("phone", "timespan")
    df2 = DF.groupBy("caller2.phone").sum("timespan").toDF("phone", "timespan")

    df = df1.union(df2).groupBy("phone").sum("timespan")
    df = df.withColumnRenamed("sum(timespan)", "timespan")
    return df


if __name__ == "__main__":
    conf = SparkConf().setAppName("save_hive").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    spark = SparkSession.builder.getOrCreate()

    lines = sc.textFile("file:///home/hadoop/data.txt").map(lambda x:json.loads(x.replace("'",'"')))
    schema = StructType([StructField("callerflag", LongType(), True), StructField("timespan", LongType(), True),StructField("timestamp", LongType(), True),
                         StructField("caller1", StructType([StructField("phone", StringType(), True), StructField("caller1_site", StringType(), True),StructField("offsets", StringType(), True)]),True),
                         StructField("caller2", StructType([StructField("phone", StringType(), True), StructField("caller2_site", StringType(), True),StructField("offsets", StringType(), True)]),True)])

    data = spark.createDataFrame(lines,schema)
    DF = getpersonnums(data)
    DF.show()

    schema = StructType([
        StructField("phone", StringType(), True),
        StructField("name", StringType(), True),
        StructField("idno", StringType(), True),
        StructField("latitude", FloatType(), True),
        StructField("longitude", FloatType(), True),
        StructField("sex", StringType(), True),
        StructField("blood_group", StringType(), True),
        StructField("mail", StringType(), True),
        StructField("job", StringType(), True),
        StructField("province", StringType(), True),
        StructField("city", StringType(), True),
        StructField("birthdate", StringType(), True),
        StructField("pricing_package", IntegerType(), True),
    ]
    )

    df = spark.read.csv(r"file:///home/hadoop/PycharmProjects/pythonProject/user.csv", encoding='utf-8',
                        header=True, schema=schema)  # 使用指定的schema
    df.show()

    DF = DF.join(df,['phone'],"left_outer")
    DF.show()

    # write the joined result as CSV into the Hive warehouse directory
    file = "/user/hive/warehouse/ylt.db/basic"
    DF.write.csv(path=file, header=True, sep=",", mode='overwrite')
    sc.stop()
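
Writing plain CSV under /user/hive/warehouse only becomes queryable once a matching Hive table has been declared over that path. An alternative sketch, assuming Hive support is enabled for the SparkSession on this cluster, lets Spark register the table itself:

from pyspark.sql import SparkSession

# a minimal sketch, assuming Hive support is available on the cluster
spark = SparkSession.builder.appName("save_hive").enableHiveSupport().getOrCreate()

# DF stands for the joined DataFrame built in the script above;
# "ylt.basic" mirrors the warehouse path /user/hive/warehouse/ylt.db/basic
DF.write.mode("overwrite").saveAsTable("ylt.basic")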

Get birth years

from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import json

def save(path, data):
    with open(path, 'w') as f:
        f.write(data)


conf = SparkConf().setAppName("getbirth").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()
file="/user/hive/warehouse/ylt.db/basic"
df = spark.read.csv(file, header=False, inferSchema=True).toDF("phone","timespan","name","idno","latitude","longitude","sex","blood_group","mail","job","province","city","birthdate","pricing_package");
df.show()

df = df.select("birthdate")
df = df.withColumn("birthdate",split(df['birthdate'],"/")[2].substr(1,4))
df.write.csv("/user/hive/warehouse/ylt.db/birth", mode="overwrite")
df = df.filter(df.birthdate.isNotNull())
save("/home/hadoop/datasave/birth.txt", json.dumps(df.collect()))
df.show()

getjob

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *


def save(path, data):
    with open(path, 'w') as f:
        f.write(str(data))


def getcity_job(DF):
    # count users per job title and keep the 100 most common jobs
    DF = DF.groupBy("job").count().toDF("job", "nums")
    return DF.filter(DF.job.isNotNull()).orderBy(-DF["nums"]).limit(100)


if __name__ == "__main__":
    conf = SparkConf().setAppName("getjob").set('spark.executor.memory', '10g').set("spark.executor.cores", '8').set(
        "spark.driver.memory", '10g')
    sc = SparkContext(conf=conf)
    spark = SparkSession.builder.getOrCreate()

    file = "/user/hive/warehouse/ylt.db/basic"
    DF = spark.read.csv(file, header=False, inferSchema=True).toDF("phone", "timespan", "name", "idno", "latitude",
                                                                   "longitude", "sex", "blood_group", "mail", "job",
                                                                   "province", "city", "birthdate", "pricing_package")
    DF.show()
    DF = getcity_job(DF)
    DF.write.csv("/user/hive/warehouse/ylt.db/job", mode="overwrite")
    # drawjob below parses job.txt as the str() of a list of Row objects
    save("/home/hadoop/datasave/job.txt", DF.take(100))
    sc.stop()

getsex

import json
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf


def save(path, data):
    with open(path, 'w') as f:
        f.write(data)


def getsex_blood(DF):
    df = DF.groupBy("sex","blood_group").count().toDF("sex","blood_group","nums")
    df.show()

    return df


if __name__ == "__main__":
    conf = SparkConf().setAppName("getsex").set('spark.executor.memory', '10g').set("spark.executor.cores",
                                                                                        '8').set("spark.driver.memory",
                                                                                                 '10g')
    sc = SparkContext(conf=conf)
    spark = SparkSession.builder.getOrCreate()

    file = "/user/hive/warehouse/ylt.db/basic"
    DF = spark.read.csv(file, header=False, inferSchema=True).toDF("phone", "timespan", "name", "idno", "latitude",
                                                                   "longitude", "sex", "blood_group", "mail", "job",
                                                                   "province", "city", "birthdate", "pricing_package");
    DF.show()
    DF = getsex_blood(DF)
    DF = DF.filter(DF.sex.isNotNull())
    DF.write.csv("/user/hive/warehouse/ylt.db/sex", mode="overwrite")
    save("/home/hadoop/datasave/sex.txt",json.dumps(DF.collect()))
    sc.stop()

getpayment

import json
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

def save(path, data):
    with open(path, 'w') as f:
        f.write(data)

def getpay(DF):
    df1 = DF.where("pricing_package=1")
    df2 = DF.where("pricing_package=2")
    df3 = DF.where("pricing_package=3")
    df4 = DF.where("pricing_package=4")

    df1 = df1.withColumn("payment",5+df1.timespan/10)
    df2 = df2.withColumn("payment",greatest(lit(10),(df2.timespan-30)+10)/20)
    df3 = df3.withColumn("payment", greatest(lit(50), 0.03 * (df3.timespan - 50) + 50))
    df4 = df4.withColumn("payment", greatest(0.03 * (df4.timespan - 100) + 41, 0.01 * df4.timespan + 40))
    df4.show()

    DF = df1.union(df2).union(df3).union(df4)
    DF.show()
    DF = DF.select(DF.payment)
    return DF

if __name__ == "__main__":
    conf = SparkConf().setAppName("getpayment").set('spark.executor.memory', '10g').set("spark.executor.cores",
                                                                                    '8').set("spark.driver.memory",
                                                                                             '10g')
    sc = SparkContext(conf=conf)
    spark = SparkSession.builder.getOrCreate()

    file = "/user/hive/warehouse/ylt.db/basic"
    DF = spark.read.csv(file, header=False, inferSchema=True).toDF("phone", "timespan", "name", "idno", "latitude",
                                                                   "longitude", "sex", "blood_group", "mail", "job",
                                                                   "province", "city", "birthdate", "pricing_package");
    DF.show()
    DF = getpay(DF)
    DF = DF.filter(DF.payment.isNotNull())
    DF.write.csv("/user/hive/warehouse/ylt.db/payment", mode="overwrite")
    save("/home/hadoop/datasave/payment.txt", json.dumps(DF.collect()))
    sc.stop()
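
getpay() scans the data four times (one filter per pricing package) and then unions the pieces back together. The same tier formulas can be applied in a single pass with a chained when() expression; a sketch is below. Rows with any other package value become null and are dropped by the isNotNull filter, as before.

from pyspark.sql.functions import when, greatest, lit, col


def getpay_single_pass(DF):
    # same tier formulas as getpay(), evaluated in one pass over the data
    payment = (
        when(col("pricing_package") == 1, 5 + col("timespan") / 10)
        .when(col("pricing_package") == 2, greatest(lit(10), (col("timespan") - 30) + 10) / 20)
        .when(col("pricing_package") == 3, greatest(lit(50), 0.03 * (col("timespan") - 50) + 50))
        .when(col("pricing_package") == 4, greatest(0.03 * (col("timespan") - 100) + 41,
                                                    0.01 * col("timespan") + 40))
    )
    return DF.withColumn("payment", payment).select("payment")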

getcityallnums: get the current call-in and call-out counts for every city

import json
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import os

def getcitycallnums(DF):
    # callerflag == 0: caller1 is the calling party; callerflag == 1: caller2 is
    basedf1 = DF.where("callerflag=0")
    basedf2 = DF.where("callerflag=1")

    # call-out counts per city (the initiating side's site)
    df1 = basedf1.groupBy("caller1.caller1_site").count().toDF("city", "callout")
    df2 = basedf2.groupBy("caller2.caller2_site").count().toDF("city", "callout")

    # call-in counts per city (the receiving side's site)
    df3 = basedf1.groupBy("caller2.caller2_site").count().toDF("city", "callin")
    df4 = basedf2.groupBy("caller1.caller1_site").count().toDF("city", "callin")

    calloutDF = df1.union(df2).groupBy("city").sum().toDF("city", "callout")
    callinDF = df3.union(df4).groupBy("city").sum().toDF("city", "callin")

    finDF = calloutDF.join(callinDF, "city").toDF("city", "callout", "callin")
    allcity = ["allcitydata", [list(i) for i in finDF.orderBy("callout").collect()]]
    try:
        print(allcity)
        with open("/home/hadoop/datasave/allcity.txt", "w") as fin:
            for i in allcity[1]:
                fin.write(",".join([str(j) for j in i]) + '\n')
    except Exception as e:
        print(e)

if __name__ == "__main__":
    memory = '10g'
    pyspark_submit_args = ' --driver-memory ' + memory + ' --executor-memory ' + memory+' pyspark-shell'
    os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

    conf = SparkConf().setAppName("streamingkafka").set('spark.executor.memory', '10g').set("spark.executor.cores", '8')
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN") # 减少shell打印日志
    ssc = StreamingContext(sc, 60) # 5秒的计算窗口
    spark = SparkSession.builder.getOrCreate()
    brokers='''10.132.221.111:6667,10.132.221.112:6667,10.132.221.113:6667,10.132.221.114:6667,10.132.221.116:6667,10.132.221.117:6667,10.132.221.118:6667,10.132.221.119:6667,10.132.221.120:6667,10.132.221.121:6667,10.132.221.123:6667,10.132.221.124:6667,10.132.221.125:6667,10.132.221.126:6667,10.132.221.127:6667,10.132.221.128:6667,10.132.221.129:6667,10.132.221.130:6667,10.132.221.132:6667'''

    schema_telecomdata = StructType([StructField("callerflag", LongType(), True), StructField("timespan", LongType(), True),StructField("timestamp", LongType(), True),
                         StructField("caller1", StructType([StructField("phone", StringType(), True), StructField("caller1_site", StringType(), True),StructField("offsets", StringType(), True)]),True),
                         StructField("caller2", StructType([StructField("phone", StringType(), True), StructField("caller2_site", StringType(), True),StructField("offsets", StringType(), True)]),True)])
    topic = 'telecom'
    kafka_streaming_rdd = KafkaUtils.createDirectStream(ssc,[topic] ,kafkaParams={"metadata.broker.list": brokers,"group.id":'18211170230ylt'})
    kafka_streaming_rdd.map(lambda x: json.loads(x[1])).foreachRDD(
        lambda x: getcitycallnums(spark.createDataFrame(x, schema_telecomdata)) )
    ssc.start()
    ssc.awaitTermination()

getdaytotnums: get the total number of calls for a whole day; results can be saved locally or to Hive

import json
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import pyhdfs
import time
def tohive(DF):
    # append one "count,timestamp" line to the text file backing the Hive table
    # (hosts is the NameNode WebHDFS address; user_name must be passed as a keyword)
    client = pyhdfs.HdfsClient(hosts="localhost:9000", user_name="hadoop")
    nums = DF.count()
    timec = int(time.time())
    row = str(nums) + "," + str(timec) + "\n"
    client.append("/user/hive/warehouse/ylt.db/hourtotalnums/inputsource1.txt", row)
    print(row)
def gettotalnums(DF):
    nums = DF.count()
    timec = int(time.time())
    data =  ["totalnums",[nums,timec]]
    try:
        print(data)
        with open("/home/hadoop/datasave/daytotalnums.txt","w") as fin:
            fin.write(",".join([str(i) for i in data[1]]))
    except Exception as e:
        print(e)
if __name__ == "__main__":

    conf = SparkConf().setAppName("streamingkafka").set('spark.executor.memory', '10g').set("spark.executor.cores", '8')
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")  # reduce shell log output
    ssc = StreamingContext(sc, 5)
    spark = SparkSession.builder.getOrCreate()
    brokers='''10.132.221.111:6667,10.132.221.112:6667,10.132.221.113:6667,10.132.221.114:6667,10.132.221.116:6667,10.132.221.117:6667,10.132.221.118:6667,10.132.221.119:6667,10.132.221.120:6667,10.132.221.121:6667,10.132.221.123:6667,10.132.221.124:6667,10.132.221.125:6667,10.132.221.126:6667,10.132.221.127:6667,10.132.221.128:6667,10.132.221.129:6667,10.132.221.130:6667,10.132.221.132:6667'''

    schema_telecomdata = StructType([StructField("callerflag", LongType(), True), StructField("timespan", LongType(), True),StructField("timestamp", LongType(), True),
                         StructField("caller1", StructType([StructField("phone", StringType(), True), StructField("caller1_site", StringType(), True),StructField("offsets", StringType(), True)]),True),
                         StructField("caller2", StructType([StructField("phone", StringType(), True), StructField("caller2_site", StringType(), True),StructField("offsets", StringType(), True)]),True)])
    topic = 'telecom'
    kafka_streaming_rdd = KafkaUtils.createDirectStream(ssc,[topic] ,kafkaParams={"metadata.broker.list": brokers,"group.id":'18211170223wlt_t2'})
    # kafka_streaming_rdd.map(lambda x: json.loads(x[1])).foreachRDD(
    #     lambda x: gettotalnums(spark.createDataFrame(x, schema_telecomdata)) )
    kafka_streaming_rdd.map(lambda x: json.loads(x[1])).foreachRDD(
        lambda x: tohive(spark.createDataFrame(x, schema_telecomdata)))
    ssc.start()
    ssc.awaitTermination()
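
client.append() fails when the target file does not exist yet, so the first batch after the Hive directory is (re)created would error out. A guarded variant of tohive, as a sketch: it assumes pyhdfs's exists()/create() wrappers and the same NameNode address as above.

import time
import pyhdfs


def tohive_safe(DF, path="/user/hive/warehouse/ylt.db/hourtotalnums/inputsource1.txt"):
    # hosts is assumed to be the NameNode WebHDFS endpoint, as in tohive() above
    client = pyhdfs.HdfsClient(hosts="localhost:9000", user_name="hadoop")
    row = "%d,%d\n" % (DF.count(), int(time.time()))
    if client.exists(path):
        client.append(path, row)   # append one "count,timestamp" line
    else:
        client.create(path, row)   # first write: create the file
    print(row)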

gettotalnums: get the current total number of valid call records

import json
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import pyhdfs
import time
def tohive(DF):
    # append one "count,timestamp" line to the text file backing the Hive table
    # (hosts is the NameNode WebHDFS address; user_name must be passed as a keyword)
    client = pyhdfs.HdfsClient(hosts="localhost:9000", user_name="hadoop")
    nums = DF.count()
    timec = int(time.time())
    row = str(nums) + "," + str(timec) + "\n"
    client.append("/user/hive/warehouse/ylt.db/calltotalnums/inputsource1.txt", row)
    print(row)
def gettotalnums(DF):
    DF = DF.where("callerflag = 1 or callerflag = 0")
    DF = DF.where("timespan<=1000")
    DF = DF.where("length(caller1.phone)=11")
    DF = DF.where("length(caller2.phone)=11")
    nums = DF.count()
    timec = int(time.time())
    data =  ["totalnums",[nums,timec]]
    try:
        print(data)
        with open("/home/hadoop/datasave/totalnums.txt","w") as fin:
            fin.write(",".join([str(i) for i in data[1]]))
    except Exception as e:
        print(e)
if __name__ == "__main__":

    conf = SparkConf().setAppName("streamingkafka").set('spark.executor.memory', '10g').set("spark.executor.cores", '8')
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")  # reduce shell log output
    ssc = StreamingContext(sc, 5)
    spark = SparkSession.builder.getOrCreate()
    brokers='''10.132.221.111:6667,10.132.221.112:6667,10.132.221.113:6667,10.132.221.114:6667,10.132.221.116:6667,10.132.221.117:6667,10.132.221.118:6667,10.132.221.119:6667,10.132.221.120:6667,10.132.221.121:6667,10.132.221.123:6667,10.132.221.124:6667,10.132.221.125:6667,10.132.221.126:6667,10.132.221.127:6667,10.132.221.128:6667,10.132.221.129:6667,10.132.221.130:6667,10.132.221.132:6667'''

    schema_telecomdata = StructType([StructField("callerflag", LongType(), True), StructField("timespan", LongType(), True),StructField("timestamp", LongType(), True),
                         StructField("caller1", StructType([StructField("phone", StringType(), True), StructField("caller1_site", StringType(), True),StructField("offsets", StringType(), True)]),True),
                         StructField("caller2", StructType([StructField("phone", StringType(), True), StructField("caller2_site", StringType(), True),StructField("offsets", StringType(), True)]),True)])
    topic = 'telecom'
    kafka_streaming_rdd = KafkaUtils.createDirectStream(ssc,[topic] ,kafkaParams={"metadata.broker.list": brokers,"group.id":'18211170230ylt'})
    kafka_streaming_rdd.map(lambda x: json.loads(x[1])).foreachRDD(
        lambda x: gettotalnums(spark.createDataFrame(x, schema_telecomdata)) )
    # kafka_streaming_rdd.map(lambda x: json.loads(x[1])).foreachRDD(
    #     lambda x: tohive(spark.createDataFrame(x, schema_telecomdata)))
    ssc.start()
    ssc.awaitTermination()

drawjob

from pyecharts import options as opts
from pyecharts.charts import WordCloud


# job.txt holds the str() of a list of Row(job=..., nums=...) objects;
# strip the Row wrappers so the text can be eval'd into a list of [job, nums] pairs
with open('/home/hadoop/datasave/job.txt', 'r') as fin:
    data = fin.read()
    data = data.replace("Row(", "[")
    data = data.replace(")", "]")
    data = data.replace("job=", "")
    data = data.replace("nums=", "")
data = eval(data)
print(data)

c = (
    WordCloud()
    .add(series_name="岗位分析", data_pair=data, word_size_range=[20, 50])
    .set_global_opts(
        title_opts=opts.TitleOpts(
            title="岗位分析", title_textstyle_opts=opts.TextStyleOpts(font_size=23)
        ),
        tooltip_opts=opts.TooltipOpts(is_show=True),
    )
    .render("/home/hadoop/datasave/job.html")
)

drawsex

from pyecharts import options as opts
from pyecharts.charts import Bar


lines  = open('/home/hadoop/datasave/sex.txt','r')
for line in lines:
    data = eval(line)
lines.close()

# split the (sex, blood_group, nums) rows into female ("F") and male dictionaries
d1 = dict()
d2 = dict()
for i in data:
    if i[0] == "F":
        d1[i[1]] = i[2]
    else:
        d2[i[1]] = i[2]

# sort both by blood group so the two series line up on the x-axis
# (this assumes both sexes cover the same set of blood groups)
d1 = sorted(d1.items(), key=lambda i: i[0])
d2 = sorted(d2.items(), key=lambda i: i[0])

key = []
v1 = []
v2 = []
for i in d1:
    key.append(i[0])
    v1.append(i[1])
for i in d2:
    v2.append(i[1])

print(v1)
print(v2)

c = (
    Bar()
    .add_xaxis(key)
    .add_yaxis("男性", v1)
    .add_yaxis("女性", v2)
    .set_global_opts(
        title_opts=opts.TitleOpts(title="血型性别条形图", subtitle="男女不同血型的比较"),
        brush_opts=opts.BrushOpts(),
    )
    .render("/home/hadoop/datasave/sex.html")
)

drawpayment

from pyecharts import options as opts
from pyecharts.charts import Pie

lines  = open('/home/hadoop/datasave/payment.txt','r')
for line in lines:
    data = eval(line)
lines.close()

num1,num2,num3,num4 = 0,0,0,0
for i in data:
    if i[0] <= 10:
        num1+=1
    elif i[0] <=30:
        num2+= 1
    elif i[0] <= 50:
        num3 +=1
    else:
        num4 +=1
data = [['话费在0~10之间',num1],['话费在10~30之间',num2],["话费在30~50之间",num3],["话费超过50",num4]]
c = (
    Pie()
    .add(
        "",
        data,
        radius=["40%", "55%"],
        label_opts=opts.LabelOpts(
            position="outside",
            formatter="{a|{a}}{abg|}\n{hr|}\n {b|{b}: }{c}  {per|{d}%}  ",
            background_color="#eee",
            border_color="#aaa",
            border_width=1,
            border_radius=4,
            rich={
                "a": {"color": "#999", "lineHeight": 22, "align": "center"},
                "abg": {
                    "backgroundColor": "#e3e3e3",
                    "width": "100%",
                    "align": "right",
                    "height": 22,
                    "borderRadius": [4, 4, 0, 0],
                },
                "hr": {
                    "borderColor": "#aaa",
                    "width": "100%",
                    "borderWidth": 0.5,
                    "height": 0,
                },
                "b": {"fontSize": 16, "lineHeight": 33},
                "per": {
                    "color": "#eee",
                    "backgroundColor": "#334455",
                    "padding": [2, 4],
                    "borderRadius": 2,
                },
            },
        ),
    )
    .set_global_opts(title_opts=opts.TitleOpts(title="话费构成"))
    .render("/home/hadoop/datasave/payment.html")
)

drawyear

from pyecharts import options as opts
from pyecharts.charts import Funnel

lines  = open('/home/hadoop/datasave/birth.txt','r')
for line in lines:
    data = eval(line)
lines.close()

n1,n2,n3,n4,n5,n6,n7,n8 = 0,0,0,0,0,0,0,0

# bucket the birth years into age ranges (ages are computed relative to the year 2000)
for i in data:
    if abs(int(i[0])-2000) <= 10:
        n1+=1
    elif abs(int(i[0])-2000) <= 20:
        n2+=1
    elif abs(int(i[0])-2000) <= 30:
        n3+=1
    elif abs(int(i[0])-2000) <= 40:
        n4+=1
    elif abs(int(i[0])-2000) <= 50:
        n5+=1
    elif abs(int(i[0])-2000) <= 60:
        n6+=1
    elif abs(int(i[0])-2000) <= 70:
        n7+=1
    else:
        n8+=1


print(n1,n2,n3,n4,n5,n6,n7,n8)

data = [["0~10岁",n1],["10~20岁",n2],["20~30岁",n3],["30~40岁",n4],["40~50岁",n5],["50~60岁",n6],["60~70岁",n7],["超过70岁",n8]]
c = (
    Funnel()
    .add(
        "年龄",
        data,
        label_opts=opts.LabelOpts(position="inside"),
    )
    .set_global_opts(title_opts=opts.TitleOpts(title="Funnel-Label(inside)"))
    .render("/home/hadoop/datasave/year.html")
)