2.14 haas506开发教程-高级组件库-tcp/usocket
1.tcp测试用例
代码主要功能:连接网络、通过网络校时(SNTP)获取实时时间、建立socket连接、通过socket发送/接收数据。使用前请先将代码中host和port的值修改为实际TCP服务器的地址和端口,然后烧录代码。
'''
@File : main.py
@Description:
@Author :
@date :
@version : 1.0
'''
import utime as time
import network
import usocket
import _thread
import sntp
import net as mynet
# True while the 4G data connection is reported as up (written by on_4g_cb).
g_connect_status = False
# True after a heartbeat was sent successfully, False after a send failure
# (written by tcpsend).
g_tcp = False
# Most recently created TCP socket (assigned by doConnect). A module-level
# `global` statement does nothing, so the name is bound explicitly here.
sock = None
# Address of the TCP test server; change both values to match your server.
host = "112.125.89.8"
port = 37496
def on_4g_cb(args):
    """Network-state callback: record whether the data connection is up.

    Args:
        args: indexable callback payload. ``args[1]`` is the network status;
            the value 1 is treated as "connected", anything else as
            "disconnected". ``args[0]`` is not used here (presumably the PDP
            context id -- confirm against the network module documentation).
    """
    global g_connect_status
    netwk_sta = args[1]
    g_connect_status = netwk_sta == 1
def connect_network():
    """Start the cellular data connection and block until it is reported up.

    Creates the module-level ``net`` client. When the client reports stage
    code 3 with sub code 1, ``on_4g_cb`` is registered for event 1 and a
    connection is requested; otherwise a failure message is printed. The
    function then polls ``g_connect_status`` every 20 ms and returns once it
    becomes true.
    """
    global net
    net = network.NetWorkClient()

    registered = net._stagecode == 3 and net._subcode == 1
    if not registered:
        print('network register failed')
    else:
        net.on(1, on_4g_cb)
        net.connect(None)

    # NOTE(review): when registration failed no connection is requested
    # above, so this wait only ends if something else sets g_connect_status.
    while not g_connect_status:
        time.sleep_ms(20)
    print('network register successed')
def tcpsend(s):
    """Send a heartbeat over ``s`` every 5 seconds until a send fails.

    The module-level flag ``g_tcp`` is set to True after each successful
    send and to False when a send raises; the function returns after the
    first failure.

    Args:
        s: connected socket-like object providing ``send()``.
    """
    global g_tcp
    while True:
        print('tcp send gateway heartbeat...')
        try:
            s.send("heartbeatbuf")
        except OSError:
            g_tcp = False
            print('TCP OSError')
            break
        except Exception:
            # Any other send failure also ends the heartbeat loop, but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            g_tcp = False
            break
        # Publish the success before sleeping so the flag is not 5 s stale.
        g_tcp = True
        time.sleep(5)
def doConnect(host,port):
    """Create a TCP socket and try to connect it to ``(host, port)``.

    The new socket is also stored in the module-level ``sock``. A failed
    connect is reported on the console but not raised: the socket is
    returned either way, so the caller's next send fails and triggers its
    own reconnect handling.

    Args:
        host: server address.
        port: server TCP port.

    Returns:
        The newly created socket object (connected or not).
    """
    global sock
    sock = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM)
    try:
        sock.connect((host, port))
    except Exception as err:
        # Best effort: keep going, but make the failure visible.
        print('tcp connect failed:', err)
    return sock
def tcprecv(s):
    """Receive loop: read up to 255 bytes from ``s`` once per second and
    print any data that arrived.

    Intended to run in its own thread; it never returns.

    Args:
        s: socket-like object providing ``recv()``.
    """
    print('Running thread tcprecv...')
    while True:
        # Start every pass empty so a failed recv() neither leaves the name
        # unbound (NameError on the first pass) nor re-prints the previous
        # packet on later passes.
        recvdata = b''
        try:
            recvdata = s.recv(255)
        except OSError:
            print('TCP OSError')
        if recvdata:
            print(recvdata)
        time.sleep(1)
def main():
    """Connect to the TCP server and send the current local time every 2 s.

    A receiver thread running ``tcprecv`` is started on the first socket.
    When sending fails, the socket is closed after a 3 s pause and a new
    connection is made with ``doConnect``. The loop never returns.
    """
    global sockLocal
    print(host, port)
    sockLocal = doConnect(host, port)
    # NOTE(review): the receiver thread keeps the socket it was started
    # with; after a reconnect below it is not handed the new socket.
    _thread.start_new_thread(tcprecv, (sockLocal,))
    while True:
        try:
            t = time.localtime()
            msg = "{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(t[0], t[1], t[2], t[3], t[4], t[5])
            sockLocal.send(msg)
            print("send msg ok : ", msg)
        except Exception:
            # `Exception` rather than a bare except, so Ctrl-C can still
            # stop the loop.
            print("\r\nsocket error,do reconnect ")
            time.sleep(3)
            try:
                sockLocal.close()
            except Exception as err:
                # A socket that cannot be closed must not abort the retry.
                print("socket close failed:", err)
            sockLocal = doConnect(host, port)
        time.sleep(2)
if __name__ == '__main__':
    # Bring the data connection up first; the steps below need the network.
    connect_network()
    # Network time sync, so main() sends real timestamps.
    sntp.settime()
    time.sleep(1)
    # presumably the cellular signal quality (CSQ) -- confirm in the net module docs
    csq = mynet.csqQueryPoll()
    print("csq:", csq)
    main()
{
"name": "haas506",
"version": "1.0.0",
"io": {
"serial1":{
"type":"UART",
"port":0,
"dataWidth":8,
"baudRate":115200,
"stopBits":1,
"flowControl":"disable",
"parity":"none"
},
"serial2":{
"type":"UART",
"port":1,
"dataWidth":8,
"baudRate":115200,
"stopBits":1,
"flowControl":"disable",
"parity":"none"
},
"serial3":{
"type":"UART",
"port":2,
"dataWidth":8,
"baudRate":115200,
"stopBits":1,
"flowControl":"disable",
"parity":"none"
}
},
"debugLevel": "ERROR",
"repl":"enable",
"replPort":0
}
2.测试结果
- 连接成功后,将时间戳发送过去
- 利用网络测试工具发送字符串数据、HEX数据
- 串口助手可以看到测试工具发过来的数据和被发送的时间戳数据
注意:网络测试工具一旦断开TCP连接,其分配的host:port会发生改变;重新测试前需要将代码中的host和port更新为新的值并重新烧录。