测试 liveGBS摄像头的可用性
import cv2
import json
import tqdm
import requests
import logging
import pandas as pd
class liveGBS(object):
def __init__(self):
self.url = "http://xx.xx.xx.xx:xxxxx"
class liveGBSStream(object):
def __init__(self, base_url):
self.url = base_url + "/api/v1/stream"
def start(self, serial=None, channel=None, code=None, audio=False):
url = self.url + "/start"
rtmp_url = None
try:
r = requests.get("{}?serial={}&code={}".format(url, serial, code))
res = json.loads(r.text)
if isinstance(res, dict):
rtmp_url = res["RTMP"]
else:
print("Error when start play {}-{}: {}".format(serial, code, res))
if isinstance(res, str):
rtmp_url = res
except (ValueError, Exception) as e:
print("Error when start play {}-{}: {}".format(serial, code, e))
return rtmp_url
def stop(self, serial=None, channel=None, code=None, check_outputs=False):
url = self.url + "/stop"
rtmp_url = None
try:
r = requests.get("{}?serial={}&code={}".format(url))
except (ValueError, Exception) as e:
print("Error when stop play {}-{}: {}".format(serial, code, e))
return rtmp_url
def list(self, start=0, limit=20, serial=None, code=None, sms=None):
url = self.url + "/list"
pass
def info(self, streamid=None, touch=False):
url = self.url + "/info"
pass
with open('xxxx.xxx', 'r', encoding='utf=8') as fp:
data = json.load(fp)
df = pd.DataFrame(data['ChannelList'])
df['rtmp_url'] = ''
df['Avaiable'] = False
livegbs_stream = liveGBSStream("http://xx.xx.xx.xx:xxxxx")
print(livegbs_stream.url)
for i in tqdm.trange(len(df)):
body = df.iloc[i].to_dict()
serial = body["DeviceID"]
code = body["ID"]
url = livegbs_stream.start(serial=serial, code=code)
df.iloc[i, -2] = url
if url is not None:
cap = cv2.VideoCapture(url)
_, img = cap.read()
try:
cv2.imwrite("{}-{}.jpg".format(serial, code), img)
df.iloc[i, -1] = True
except cv2.error:
print("Error when save screenshot to disk: {}-{}".format(serial, code))
else:
print("Error when get stream url: {}-{}".format(serial, code))
if (i != 0) and (i % 100 == 0):
df.to_excel("results.xlsx")
df.to_excel("results.xlsx")
|