电眼监控设备是否到位–信号传递给网络继电器(聚英的设备)–服务端监控继电器输入–打开相机采集图像–图像分类算法–返回结果进行逻辑处理–输出信号
1、下载海康MVS的客户端,文件包中有相关python例子;找到MvImport工具包 2、借助flask部署海康抓拍和一个简单的图像分类模型(resNet)
sys.path.append("./MvImport")
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
data_transform = transforms.Compose(
[transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize([0.54, 0.40, 0.43], [0.23, 0.22, 0.22])])
json_path = './class_img/class_carton.json'
assert os.path.exists(json_path), "file: '{}' dose not exist.".format(json_path)
json_file = open(json_path, "r")
class_indict = json.load(json_file)
model = action(num_classes=3)
weights_path = "./class_img/carton_0225.pth"
assert os.path.exists(weights_path), "file: '{}' dose not exist.".format(weights_path)
model.load_state_dict(torch.load(weights_path, map_location=device))
def press_any_key_exit():
fd = sys.stdin.fileno()
old_ttyinfo = termios.tcgetattr(fd)
new_ttyinfo = old_ttyinfo[:]
new_ttyinfo[3] &= ~termios.ICANON
new_ttyinfo[3] &= ~termios.ECHO
termios.tcsetattr(fd, termios.TCSANOW, new_ttyinfo)
try:
os.read(fd, 7)
except:
pass
finally:
termios.tcsetattr(fd, termios.TCSANOW, old_ttyinfo)
SDKVersion = MvCamera.MV_CC_GetSDKVersion()
print("SDKVersion[0x%x]" % SDKVersion)
deviceList = MV_CC_DEVICE_INFO_LIST()
tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE
ret = MvCamera.MV_CC_EnumDevices(tlayerType, deviceList)
if ret != 0:
print("enum devices fail! ret[0x%x]" % ret)
sys.exit()
if deviceList.nDeviceNum == 0:
print("find no device!")
sys.exit()
print("Find %d devices!" % deviceList.nDeviceNum)
for i in range(0, deviceList.nDeviceNum):
mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
print("\ngige device: [%d]" % i)
strModeName = ""
for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName:
strModeName = strModeName + chr(per)
print("device model name: %s" % strModeName)
nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24)
nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16)
nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8)
nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff)
print("current ip: %d.%d.%d.%d\n" % (nip1, nip2, nip3, nip4))
elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
print("\nu3v device: [%d]" % i)
strModeName = ""
for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName:
if per == 0:
break
strModeName = strModeName + chr(per)
print("device model name: %s" % strModeName)
strSerialNumber = ""
for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber:
if per == 0:
break
strSerialNumber = strSerialNumber + chr(per)
print("user serial number: %s" % strSerialNumber)
if sys.version >= '3':
nConnectionNum = 0
else:
nConnectionNum = raw_input("please input the number of the device to connect:")
if int(nConnectionNum) >= deviceList.nDeviceNum:
print("intput error!")
sys.exit()
cam = MvCamera()
stDeviceList = cast(deviceList.pDeviceInfo[int(nConnectionNum)], POINTER(MV_CC_DEVICE_INFO)).contents
ret = cam.MV_CC_CreateHandle(stDeviceList)
if ret != 0:
print("create handle fail! ret[0x%x]" % ret)
sys.exit()
ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
if ret != 0:
print("open device fail! ret[0x%x]" % ret)
sys.exit()
ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)
if ret != 0:
print("set trigger mode fail! ret[0x%x]" % ret)
sys.exit()
ret = cam.MV_CC_FeatureLoad("FeatureFile.ini")
if MV_OK != ret:
print("load feature fail! ret [0x%x]" % ret)
print("finish import the camera properties from the file")
stParam = MVCC_INTVALUE()
memset(byref(stParam), 0, sizeof(MVCC_INTVALUE))
ret = cam.MV_CC_GetIntValue("PayloadSize", stParam)
if ret != 0:
print("get payload size fail! ret[0x%x]" % ret)
sys.exit()
nPayloadSize = stParam.nCurValue
ret = cam.MV_CC_StartGrabbing()
if ret != 0:
print("start grabbing fail! ret[0x%x]" % ret)
sys.exit()
data_buf = (c_ubyte * nPayloadSize)()
nDataSize = nPayloadSize
print("press a key to stop grabbing.")
g_bExit = True
完整代码如下
from flask import Flask, request
import sys
import termios
import cv2 as cv
from ctypes import *
import os
import time
import os
import json
import time
import torch
from PIL import Image,ImageFile
from torchvision import transforms
import matplotlib.pyplot as plt
from class_img.model import resnet34,action
sys.path.append("./MvImport")
from MvCameraControl_class import *
ImageFile.LOAD_TRUNCATED_IMAGES = True
g_bExit = False
app = Flask(__name__)
global cam, data_buf, nDataSize
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
data_transform = transforms.Compose(
[transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize([0.54, 0.40, 0.43], [0.23, 0.22, 0.22])])
json_path = './class_img/class_carton.json'
assert os.path.exists(json_path), "file: '{}' dose not exist.".format(json_path)
json_file = open(json_path, "r")
class_indict = json.load(json_file)
model = action(num_classes=3)
weights_path = "./class_img/carton_0225.pth"
assert os.path.exists(weights_path), "file: '{}' dose not exist.".format(weights_path)
model.load_state_dict(torch.load(weights_path, map_location=device))
def press_any_key_exit():
fd = sys.stdin.fileno()
old_ttyinfo = termios.tcgetattr(fd)
new_ttyinfo = old_ttyinfo[:]
new_ttyinfo[3] &= ~termios.ICANON
new_ttyinfo[3] &= ~termios.ECHO
termios.tcsetattr(fd, termios.TCSANOW, new_ttyinfo)
try:
os.read(fd, 7)
except:
pass
finally:
termios.tcsetattr(fd, termios.TCSANOW, old_ttyinfo)
SDKVersion = MvCamera.MV_CC_GetSDKVersion()
print("SDKVersion[0x%x]" % SDKVersion)
deviceList = MV_CC_DEVICE_INFO_LIST()
tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE
ret = MvCamera.MV_CC_EnumDevices(tlayerType, deviceList)
if ret != 0:
print("enum devices fail! ret[0x%x]" % ret)
sys.exit()
if deviceList.nDeviceNum == 0:
print("find no device!")
sys.exit()
print("Find %d devices!" % deviceList.nDeviceNum)
for i in range(0, deviceList.nDeviceNum):
mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
print("\ngige device: [%d]" % i)
strModeName = ""
for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName:
strModeName = strModeName + chr(per)
print("device model name: %s" % strModeName)
nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24)
nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16)
nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8)
nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff)
print("current ip: %d.%d.%d.%d\n" % (nip1, nip2, nip3, nip4))
elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
print("\nu3v device: [%d]" % i)
strModeName = ""
for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName:
if per == 0:
break
strModeName = strModeName + chr(per)
print("device model name: %s" % strModeName)
strSerialNumber = ""
for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber:
if per == 0:
break
strSerialNumber = strSerialNumber + chr(per)
print("user serial number: %s" % strSerialNumber)
if sys.version >= '3':
nConnectionNum = 0
else:
nConnectionNum = raw_input("please input the number of the device to connect:")
if int(nConnectionNum) >= deviceList.nDeviceNum:
print("intput error!")
sys.exit()
cam = MvCamera()
stDeviceList = cast(deviceList.pDeviceInfo[int(nConnectionNum)], POINTER(MV_CC_DEVICE_INFO)).contents
ret = cam.MV_CC_CreateHandle(stDeviceList)
if ret != 0:
print("create handle fail! ret[0x%x]" % ret)
sys.exit()
ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
if ret != 0:
print("open device fail! ret[0x%x]" % ret)
sys.exit()
ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)
if ret != 0:
print("set trigger mode fail! ret[0x%x]" % ret)
sys.exit()
ret = cam.MV_CC_FeatureLoad("FeatureFile.ini")
if MV_OK != ret:
print("load feature fail! ret [0x%x]" % ret)
print("finish import the camera properties from the file")
stParam = MVCC_INTVALUE()
memset(byref(stParam), 0, sizeof(MVCC_INTVALUE))
ret = cam.MV_CC_GetIntValue("PayloadSize", stParam)
if ret != 0:
print("get payload size fail! ret[0x%x]" % ret)
sys.exit()
nPayloadSize = stParam.nCurValue
ret = cam.MV_CC_StartGrabbing()
if ret != 0:
print("start grabbing fail! ret[0x%x]" % ret)
sys.exit()
data_buf = (c_ubyte * nPayloadSize)()
nDataSize = nPayloadSize
print("press a key to stop grabbing.")
g_bExit = True
@app.route('/detect', methods=['POST'])
def upload_image1():
if request.method == 'POST':
start =time.time()
haha = grab_img(cam,data_buf,nDataSize)
print(time.time()-start)
return haha
return
def grab_img(cam, data_buf, nDataSize):
stFrameInfo = MV_FRAME_OUT_INFO_EX()
memset(byref(stFrameInfo), 0, sizeof(stFrameInfo))
while True:
ret = cam.MV_CC_GetOneFrameTimeout(byref(data_buf), nDataSize, stFrameInfo, 10000)
if ret == 0:
print("get one frame: Width[%d], Height[%d], PixelType[0x%x], nFrameNum[%d]" % (
stFrameInfo.nWidth, stFrameInfo.nHeight, stFrameInfo.enPixelType, stFrameInfo.nFrameNum))
stConvertParam = MV_SAVE_IMAGE_PARAM_EX()
stConvertParam.nWidth = stFrameInfo.nWidth
stConvertParam.nHeight = stFrameInfo.nHeight
stConvertParam.pData = data_buf
stConvertParam.nDataLen = stFrameInfo.nFrameLen
stConvertParam.enPixelType = stFrameInfo.enPixelType
file_name = str(time.time()).replace(".","_")+".bmp"
file_path = os.path.join("./images/",file_name)
stConvertParam.enImageType = MV_Image_Bmp
bmpsize = stFrameInfo.nWidth * stFrameInfo.nHeight * 3 + 54
stConvertParam.nBufferSize = bmpsize
bmp_buf = (c_ubyte * bmpsize)()
stConvertParam.pImageBuffer = bmp_buf
ret = cam.MV_CC_SaveImageEx2(stConvertParam)
if ret != 0:
print("save file executed failed0:! ret[0x%x]" % ret)
del data_buf
sys.exit()
file_open = open(file_path.encode('ascii'), 'wb+')
try:
img_buff = (c_ubyte * stConvertParam.nDataLen)()
memmove(byref(img_buff), stConvertParam.pImageBuffer, stConvertParam.nDataLen)
file_open.write(img_buff)
except Exception as e:
raise Exception("save file executed failed1::%s" % e)
finally:
file_open.close()
img = Image.open(file_path).convert('RGB')
img = cv.resize(img,(int(img.shape[1]/2)),int(img.shape[1]/2))
img = data_transform(img)
img = torch.unsqueeze(img, dim=0)
model.eval()
with torch.no_grad():
output = torch.squeeze(model(img))
predict = torch.softmax(output, dim=0)
predict_cla = torch.argmax(predict).numpy()
class_name = class_indict[str(predict_cla)]
calss_num = predict[predict_cla].numpy()
print_res = "class: {} prob: {:.3}".format(class_indict[str(predict_cla)],
predict[predict_cla].numpy())
print(print_res)
return json.dumps({"class":str(class_name), "prob":str(calss_num)})
else:
print("no data[0x%x]" % ret)
return json.dumps({"class":"null", "prob":"null"})
if g_bExit == True:
break
if __name__ == "__main__":
app.run('0.0.0.0', port=8989, debug=False,processes=1)
如果仅仅需要海康相机抓拍,可以不采用flask部署和检测模型模块代码。
例子:
import sys
import threading
import os
import termios
import cv2 as cv
from ctypes import *
import time
import numpy as np
sys.path.append("./MvImport")
from MvCameraControl_class import *
g_bExit = False
def work_thread(cam=0, data_buf=0, nDataSize=0):
stFrameInfo = MV_FRAME_OUT_INFO_EX()
memset(byref(stFrameInfo), 0, sizeof(stFrameInfo))
while True:
ret = cam.MV_CC_GetOneFrameTimeout(byref(data_buf), nDataSize, stFrameInfo, 10000)
if ret == 0:
print("get one frame: Width[%d], Height[%d], PixelType[0x%x], nFrameNum[%d]" % (
stFrameInfo.nWidth, stFrameInfo.nHeight, stFrameInfo.enPixelType, stFrameInfo.nFrameNum))
stConvertParam = MV_SAVE_IMAGE_PARAM_EX()
stConvertParam.nWidth = stFrameInfo.nWidth
stConvertParam.nHeight = stFrameInfo.nHeight
stConvertParam.pData = data_buf
stConvertParam.nDataLen = stFrameInfo.nFrameLen
stConvertParam.enPixelType = stFrameInfo.enPixelType
file_name = str(time.time()).replace(".","_")+".bmp"
file_path = os.path.join("./images/",file_name)
stConvertParam.enImageType = MV_Image_Bmp
bmpsize = stFrameInfo.nWidth * stFrameInfo.nHeight * 3 + 54
stConvertParam.nBufferSize = bmpsize
bmp_buf = (c_ubyte * bmpsize)()
stConvertParam.pImageBuffer = bmp_buf
ret = cam.MV_CC_SaveImageEx2(stConvertParam)
if ret != 0:
print("save file executed failed0:! ret[0x%x]" % ret)
del data_buf
sys.exit()
file_open = open(file_path.encode('ascii'), 'wb+')
try:
img_buff = (c_ubyte * stConvertParam.nDataLen)()
memmove(byref(img_buff), stConvertParam.pImageBuffer, stConvertParam.nDataLen)
file_open.write(img_buff)
except Exception as e:
raise Exception("save file executed failed1::%s" % e)
finally:
file_open.close()
else:
print("no data[0x%x]" % ret)
if g_bExit == True:
break
def press_any_key_exit():
fd = sys.stdin.fileno()
old_ttyinfo = termios.tcgetattr(fd)
new_ttyinfo = old_ttyinfo[:]
new_ttyinfo[3] &= ~termios.ICANON
new_ttyinfo[3] &= ~termios.ECHO
termios.tcsetattr(fd, termios.TCSANOW, new_ttyinfo)
try:
os.read(fd, 7)
except:
pass
finally:
termios.tcsetattr(fd, termios.TCSANOW, old_ttyinfo)
if __name__ == "__main__":
SDKVersion = MvCamera.MV_CC_GetSDKVersion()
print("SDKVersion[0x%x]" % SDKVersion)
deviceList = MV_CC_DEVICE_INFO_LIST()
tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE
ret = MvCamera.MV_CC_EnumDevices(tlayerType, deviceList)
if ret != 0:
print("enum devices fail! ret[0x%x]" % ret)
sys.exit()
if deviceList.nDeviceNum == 0:
print("find no device!")
sys.exit()
print("Find %d devices!" % deviceList.nDeviceNum)
for i in range(0, deviceList.nDeviceNum):
mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
print("\ngige device: [%d]" % i)
strModeName = ""
for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName:
strModeName = strModeName + chr(per)
print("device model name: %s" % strModeName)
nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24)
nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16)
nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8)
nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff)
print("current ip: %d.%d.%d.%d\n" % (nip1, nip2, nip3, nip4))
elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
print("\nu3v device: [%d]" % i)
strModeName = ""
for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName:
if per == 0:
break
strModeName = strModeName + chr(per)
print("device model name: %s" % strModeName)
strSerialNumber = ""
for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber:
if per == 0:
break
strSerialNumber = strSerialNumber + chr(per)
print("user serial number: %s" % strSerialNumber)
if sys.version >= '3':
nConnectionNum = input("please input the number of the device to connect:")
else:
nConnectionNum = raw_input("please input the number of the device to connect:")
if int(nConnectionNum) >= deviceList.nDeviceNum:
print("intput error!")
sys.exit()
cam = MvCamera()
stDeviceList = cast(deviceList.pDeviceInfo[int(nConnectionNum)], POINTER(MV_CC_DEVICE_INFO)).contents
ret = cam.MV_CC_CreateHandle(stDeviceList)
if ret != 0:
print("create handle fail! ret[0x%x]" % ret)
sys.exit()
ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
if ret != 0:
print("open device fail! ret[0x%x]" % ret)
sys.exit()
ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)
if ret != 0:
print("set trigger mode fail! ret[0x%x]" % ret)
sys.exit()
ret = cam.MV_CC_FeatureLoad("FeatureFile.ini")
if MV_OK != ret:
print("load feature fail! ret [0x%x]" % ret)
print("finish import the camera properties from the file")
stParam = MVCC_INTVALUE()
memset(byref(stParam), 0, sizeof(MVCC_INTVALUE))
ret = cam.MV_CC_GetIntValue("PayloadSize", stParam)
if ret != 0:
print("get payload size fail! ret[0x%x]" % ret)
sys.exit()
nPayloadSize = stParam.nCurValue
ret = cam.MV_CC_StartGrabbing()
if ret != 0:
print("start grabbing fail! ret[0x%x]" % ret)
sys.exit()
data_buf = (c_ubyte * nPayloadSize)()
try:
hThreadHandle = threading.Thread(target=work_thread, args=(cam, data_buf, nPayloadSize))
hThreadHandle.start()
except:
print("error: unable to start thread")
print("press a key to stop grabbing.")
press_any_key_exit()
g_bExit = True
hThreadHandle.join()
ret = cam.MV_CC_StopGrabbing()
if ret != 0:
print("stop grabbing fail! ret[0x%x]" % ret)
del data_buf
sys.exit()
ret = cam.MV_CC_CloseDevice()
if ret != 0:
print("close deivce fail! ret[0x%x]" % ret)
del data_buf
sys.exit()
ret = cam.MV_CC_DestroyHandle()
if ret != 0:
print("destroy handle fail! ret[0x%x]" % ret)
del data_buf
sys.exit()
del data_buf
|