一、背景

最近在处理公司的一设备,内置的DTU通过UDP向服务器发送16进制的数据报文,由于第一次接触此类数据解析方式,在这里做总结与反省,避免大家走弯路

二、总结内容

2.1 UDP通信服务端创建方式

步骤

  1. 创建UDP的socket通信方式。
  2. 绑定具体的端口。
  3. 设置端口复用等待(这一步可以省略)
  4. 获取数据。
  5. 向客户端发送数据。
  6. 解析储存数据。
  7. 关闭UDP的socket链接。

下面分别讲解并总结,通过MVP的方式完成以上所有步骤,不做过多延展:

1. 创建UDP的socket通信方式。

import socket
Server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

在任何类型的通信开始之前,网络应用程序都必须创建套接字。
套接字最初是为同一主机上的应用程序所创建,使得主机上运行的一个程序(又名一个进程)与另一个运行的程序进行通信。这就是所谓的进程间通信(Inter Process Communication,IPC)
嵌套字有两种类型:

  • 基于文件的:AF_UNIX
  • 面向网络的:AF_INET(面向IPv4)、AF_INET6(面向IPv6)

DUP采用的是无连接的套接字

  • 特点:不可靠(局网内还是比较可靠的),开销小。为了创建UDP套接字,必须使用SOCK_DGRAM作为套接字类型。UDP套接字的SOCK_DGRAM名字来自于单词“datagram”(数据报)。

2. 绑定具体的端口

Server.bind("", 8600)

bind 表示将创建好的Server绑定到具体的端口,注意:
当作为UDP的服务端时,前面的IP地址是可以省略的,后面是端口号有效的端口号范围为0~65535(小于1024的端口号预留给了系统)

**3.设置端口复用等待(这一步可以省略) **

Server.setsockopt(socket.IPPROTO_IP, socket.SO_REUSEADDR, 1)

如果端口被使用过,并且利用Socket.close()关闭了端口链接,但是端口还没有释放,可以用上述函数来进行等待端口的重调用。

4. 获取数据

Msg, ClientAddr = Server.recvfrom(1024)

在UDP中使用recvform返回的是客户端发送过来的**字节流(10进制、ASCII码、16进制等)**与客户端的IP地址,1024表示缓存数据的大小。

5. 向客户端发送数据

Server.sendto(Data)

一般UDP服务器在接收到数据后会向客户端发送心跳包确认,可以通过此函数发送数据。

6. 解析数据

Data = binascii.b2a_hex(Msg)

此函数是将传送过来的字节流报文数据解析成16进制数据。相关用法见字节流转换成ASCII码

7. 关闭UDP的socket链接

Server.close()

没什么好讲的,一定要有这个就行,不然下一次没法继续用这个端口。

2.2 16进制数据解析

1. 报文时间解析

Time = datetime.datetime.strptime(Data[26:38], "%y%m%d%H%M%S")  # 解析时间

由于时间是十进制数据,这里不用做16进制转换,直接通过datetime的strptime方法进行数据转换。

2. 16进制转换成10进制,再转换成ASCII码

chr(int(Data[224 + (i - 2):224 + i],16))

int(num, 16) 将16进制转换成10进制
chr(十进制)解析出对应的ASCII码

2.3 文件创建与数据储存分析

不得不说在储存数据文件的时候遇到了很多的坑,现在总结:
坑1:如何自动创建 年->月->日的文件夹结构?

在自己创建文件夹的时候有想到过使用makedirs()函数链式创建文件夹,但是在判断文件夹是否存在的时候一直报错,一直以为是在makedirs()使用不当造成的,有想过联合使用chdir函数与mkdir()函数来进行逐级的文件夹创建,最后效果也不是特别好。后来仔细查了好久的资料发现:

if not os.path.exists(NewPath):
    os.makedirs(NewPath)

在默认的IDLE中判断文件夹是否存在时,一直是有问题会报错的,折腾了好久,后来改为:

if not os.path.isdir(FileDir):
     os.makedirs(FileDir)

才没有报错,这里做笔记好好提醒一下自己。

以上是我总结的所有错误,希望看到这篇帖子的你能够避开这些坑。下面附上我完整的UDP报文接收以及解析的代码:

#!/usr/local/bin/python3
# coding:utf-8

import socket
import binascii
import datetime
import os
import csv

''' 作者:Zflyee Mailto: zflyee@126.com '''

Server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # 创建UDP传输数据
# Server.setsockopt(socket.IPPROTO_IP, socket.SO_REUSEADDR, 1) # 快速实现端口复用

Server.bind(("", 8600))
print(u"已绑定本机端口:8600,正在监听数据...")


def DataParsing(Data):
    Time = datetime.datetime.strptime(Data[26:38], "%y%m%d%H%M%S")  # 解析时间

    Voltage = int(Data[44:46], 16) / 10.0  # 电压数据

    Temp = int(Data[60:64], 16) / 10.0  # 实时温度数据

    Temp_1h_max = int(Data[70:74], 16) / 10.0  # 温度小时最大值

    Temp_1h_min = int(Data[80:84], 16) / 10.0  # 温度小时最小值

    Hum = int(Data[90:94], 16) / 10.0  # 湿度数据

    Hum_1h_max = int(Data[100:104], 16) / 10.0  # 湿度小时最大值

    Hum_1h_min = int(Data[110:114], 16) / 10.0  # 湿度小时最小值

    Pa = int(Data[120:124], 16) / 10.0  # 气压数据

    Pa_1h_max = int(Data[130:134], 16) / 10.0  # 气压小时最大值

    Pa_1h_min = int(Data[150:154], 16) / 10.0  # 气压小时最小值

    WindSpd = int(Data[140:144], 16) / 10.0  # 瞬时风速值

    WindSpd_10min_avg = int(Data[160:164], 16) / 10.0  # 10分钟风速平均值

    WindSpd_max = int(Data[170:174], 16) / 10.0  # 当前风速最大值

    WindSpd_min = int(Data[180:184], 16) / 10.0  # 当前风速最小值

    WindDir = int(Data[190:194], 16) / 10.0  # 当前风向瞬时值

    WindDir_10min_avg = int(Data[200:204], 16) / 10.0  # 10分钟风向平均值

    Rain_1day = int(Data[210:214], 16) / 10.0  # 当日降雨量

    # 推算纬度数据长度
    Lati_len = int(Data[220:224], 16)  # 转换为10进制
    Lati = ""
    for i in range(2, Lati_len * 2, 2):
        Lati += (chr(int(Data[224 + (i - 2):224 + i], 16)))

    # 推算维度长度与数据
    Long_len = int(Data[250:252], 16)
    Long = ""
    for i in range(2, Long_len * 2, 2):
        Long += (chr(int(Data[254 + (i - 2):254 + i], 16)))

    # 组装数据
    DataList = [Time, WindSpd, WindSpd_10min_avg, WindSpd_max, WindSpd_min, WindDir, WindDir_10min_avg,
                Temp, Temp_1h_max, Temp_1h_min, Hum, Hum_1h_max, Hum_1h_min, Pa, Pa_1h_max, Pa_1h_min, Rain_1day,
                Voltage]

    # 创建文件路径
    Path = "E:\\富奥通\\data\\SY_FWS600\\"
    Text = os.path.join(Path, Time.strftime("%Y"), Time.strftime("%m")) + "\\" + Time.strftime("%d") + ".csv"
    FileDir = os.path.split(Text)[0]
    if not os.path.isdir(FileDir):
        os.makedirs(FileDir)
    # 写入数据
    with open(Text, "a+", newline="") as f:
        Writer = csv.writer(f)
        Writer.writerow(DataList)
    # 储存提示
    print(u"{}时刻数据已成功储存。".format(Time))

while True:
    try:
        Msg, ClientAddr = Server.recvfrom(1024)
        Data = binascii.b2a_hex(Msg)  # 将数据转换成16进制数据
        Data = str(Data, encoding="utf-8")
        print(u"从{0}的{1}端口得到如下数据:\n{2}".format(ClientAddr[0], ClientAddr[1], Data))
        if Data[0:2] == "7b":
            Content = "7B810010" + Data[8:30] + "7B"
            Response = binascii.a2b_hex(Content)
            Server.sendto(Response, ClientAddr)  # 回复客户端数据
            print(u"已回复客户端")
        elif Data[0:2] == '01':
            DataParsing(Data)
        else:
            print(u"%%%%%%%%%%%%%%%数据报错,重新获取%%%%%%%%%%%%%%%")
            continue
    except:
        print(u"%%%%%%%%%%%%%%%数据报错,重新获取%%%%%%%%%%%%%%%")
        continue


Server.close()