一、背景
最近在处理公司的一设备,内置的DTU通过UDP向服务器发送16进制的数据报文,由于第一次接触此类数据解析方式,在这里做总结与反省,避免大家走弯路
二、总结内容
2.1 UDP通信服务端创建方式
步骤
- 创建UDP的socket通信方式。
- 绑定具体的端口。
- 设置端口复用等待(这一步可以省略)
- 获取数据。
- 向客户端发送数据。
- 解析储存数据。
- 关闭UDP的socket链接。
下面分别讲解并总结,通过MVP的方式完成以上所有步骤,不做过多延展:
1. 创建UDP的socket通信方式。
import socket
Server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
在任何类型的通信开始之前,网络应用程序都必须创建套接字。
套接字最初是为同一主机上的应用程序所创建,使得主机上运行的一个程序(又名一个进程)与另一个运行的程序进行通信。这就是所谓的进程间通信(Inter Process Communication,IPC)
嵌套字有两种类型:
- 基于文件的:AF_UNIX
- 面向网络的:AF_INET(面向IPv4)、AF_INET6(面向IPv6)
DUP采用的是无连接的套接字
- 特点:不可靠(局网内还是比较可靠的),开销小。为了创建UDP套接字,必须使用SOCK_DGRAM作为套接字类型。UDP套接字的SOCK_DGRAM名字来自于单词“datagram”(数据报)。
2. 绑定具体的端口
Server.bind("", 8600)
bind 表示将创建好的Server绑定到具体的端口,注意:
当作为UDP的服务端时,前面的IP地址是可以省略的,后面是端口号有效的端口号范围为0~65535(小于1024的端口号预留给了系统)
**3.设置端口复用等待(这一步可以省略) **
Server.setsockopt(socket.IPPROTO_IP, socket.SO_REUSEADDR, 1)
如果端口被使用过,并且利用Socket.close()关闭了端口链接,但是端口还没有释放,可以用上述函数来进行等待端口的重调用。
4. 获取数据
Msg, ClientAddr = Server.recvfrom(1024)
在UDP中使用recvform返回的是客户端发送过来的**字节流(10进制、ASCII码、16进制等)**与客户端的IP地址,1024表示缓存数据的大小。
5. 向客户端发送数据
Server.sendto(Data)
一般UDP服务器在接收到数据后会向客户端发送心跳包确认,可以通过此函数发送数据。
6. 解析数据
Data = binascii.b2a_hex(Msg)
此函数是将传送过来的字节流报文数据解析成16进制数据。相关用法见字节流转换成ASCII码
7. 关闭UDP的socket链接
Server.close()
没什么好讲的,一定要有这个就行,不然下一次没法继续用这个端口。
2.2 16进制数据解析
1. 报文时间解析
Time = datetime.datetime.strptime(Data[26:38], "%y%m%d%H%M%S") # 解析时间
由于时间是十进制数据,这里不用做16进制转换,直接通过datetime的strptime方法进行数据转换。
2. 16进制转换成10进制,再转换成ASCII码
chr(int(Data[224 + (i - 2):224 + i],16))
int(num, 16) 将16进制转换成10进制
chr(十进制)解析出对应的ASCII码
2.3 文件创建与数据储存分析
不得不说在储存数据文件的时候遇到了很多的坑,现在总结:
坑1:如何自动创建 年->月->日的文件夹结构?
在自己创建文件夹的时候有想到过使用
makedirs()
函数链式创建文件夹,但是在判断文件夹是否存在的时候一直报错,一直以为是在makedirs()使用不当造成的,有想过联合使用chdir
函数与mkdir()
函数来进行逐级的文件夹创建,最后效果也不是特别好。后来仔细查了好久的资料发现:
if not os.path.exists(NewPath):
os.makedirs(NewPath)
在默认的IDLE中判断文件夹是否存在时,一直是有问题会报错的,折腾了好久,后来改为:
if not os.path.isdir(FileDir):
os.makedirs(FileDir)
才没有报错,这里做笔记好好提醒一下自己。
以上是我总结的所有错误,希望看到这篇帖子的你能够避开这些坑。下面附上我完整的UDP报文接收以及解析的代码:
#!/usr/local/bin/python3
# coding:utf-8
import socket
import binascii
import datetime
import os
import csv
''' 作者:Zflyee Mailto: zflyee@126.com '''
Server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # 创建UDP传输数据
# Server.setsockopt(socket.IPPROTO_IP, socket.SO_REUSEADDR, 1) # 快速实现端口复用
Server.bind(("", 8600))
print(u"已绑定本机端口:8600,正在监听数据...")
def DataParsing(Data):
Time = datetime.datetime.strptime(Data[26:38], "%y%m%d%H%M%S") # 解析时间
Voltage = int(Data[44:46], 16) / 10.0 # 电压数据
Temp = int(Data[60:64], 16) / 10.0 # 实时温度数据
Temp_1h_max = int(Data[70:74], 16) / 10.0 # 温度小时最大值
Temp_1h_min = int(Data[80:84], 16) / 10.0 # 温度小时最小值
Hum = int(Data[90:94], 16) / 10.0 # 湿度数据
Hum_1h_max = int(Data[100:104], 16) / 10.0 # 湿度小时最大值
Hum_1h_min = int(Data[110:114], 16) / 10.0 # 湿度小时最小值
Pa = int(Data[120:124], 16) / 10.0 # 气压数据
Pa_1h_max = int(Data[130:134], 16) / 10.0 # 气压小时最大值
Pa_1h_min = int(Data[150:154], 16) / 10.0 # 气压小时最小值
WindSpd = int(Data[140:144], 16) / 10.0 # 瞬时风速值
WindSpd_10min_avg = int(Data[160:164], 16) / 10.0 # 10分钟风速平均值
WindSpd_max = int(Data[170:174], 16) / 10.0 # 当前风速最大值
WindSpd_min = int(Data[180:184], 16) / 10.0 # 当前风速最小值
WindDir = int(Data[190:194], 16) / 10.0 # 当前风向瞬时值
WindDir_10min_avg = int(Data[200:204], 16) / 10.0 # 10分钟风向平均值
Rain_1day = int(Data[210:214], 16) / 10.0 # 当日降雨量
# 推算纬度数据长度
Lati_len = int(Data[220:224], 16) # 转换为10进制
Lati = ""
for i in range(2, Lati_len * 2, 2):
Lati += (chr(int(Data[224 + (i - 2):224 + i], 16)))
# 推算维度长度与数据
Long_len = int(Data[250:252], 16)
Long = ""
for i in range(2, Long_len * 2, 2):
Long += (chr(int(Data[254 + (i - 2):254 + i], 16)))
# 组装数据
DataList = [Time, WindSpd, WindSpd_10min_avg, WindSpd_max, WindSpd_min, WindDir, WindDir_10min_avg,
Temp, Temp_1h_max, Temp_1h_min, Hum, Hum_1h_max, Hum_1h_min, Pa, Pa_1h_max, Pa_1h_min, Rain_1day,
Voltage]
# 创建文件路径
Path = "E:\\富奥通\\data\\SY_FWS600\\"
Text = os.path.join(Path, Time.strftime("%Y"), Time.strftime("%m")) + "\\" + Time.strftime("%d") + ".csv"
FileDir = os.path.split(Text)[0]
if not os.path.isdir(FileDir):
os.makedirs(FileDir)
# 写入数据
with open(Text, "a+", newline="") as f:
Writer = csv.writer(f)
Writer.writerow(DataList)
# 储存提示
print(u"{}时刻数据已成功储存。".format(Time))
while True:
try:
Msg, ClientAddr = Server.recvfrom(1024)
Data = binascii.b2a_hex(Msg) # 将数据转换成16进制数据
Data = str(Data, encoding="utf-8")
print(u"从{0}的{1}端口得到如下数据:\n{2}".format(ClientAddr[0], ClientAddr[1], Data))
if Data[0:2] == "7b":
Content = "7B810010" + Data[8:30] + "7B"
Response = binascii.a2b_hex(Content)
Server.sendto(Response, ClientAddr) # 回复客户端数据
print(u"已回复客户端")
elif Data[0:2] == '01':
DataParsing(Data)
else:
print(u"%%%%%%%%%%%%%%%数据报错,重新获取%%%%%%%%%%%%%%%")
continue
except:
print(u"%%%%%%%%%%%%%%%数据报错,重新获取%%%%%%%%%%%%%%%")
continue
Server.close()