说明
后端开发时,有时需要对接摄像头资源,并提供实时播放和录制视频的功能。这段代码对接萤石云视频并录制视频,再转码成小程序前端可以直接展示的 MP4 视频返回给前端,在此作为记录。
准备
- 需要注册萤石云账户,配置好摄像头协议;我们使用的是 H.264 编码格式
- 安装ffmpeg 工具
- 安装依赖包 opencv-python(导入名为 cv2)和 requests
代码如下
import subprocess
import os
import time
import cv2
import requests
# Ezviz open-platform credentials. The values below are placeholders and
# must be replaced with a real key/secret pair before running.
APP_KEY = "app_key"
APP_SECRET = "app_secret"

# HTTP headers shared by every request this module sends to open.ys7.com.
HEADERS = {
    "Host": "open.ys7.com",
    "Content-Type": "application/x-www-form-urlencoded",
}
def get_camera_token():
    """Request an access token from the Ezviz open platform.

    Sends a POST to the ``token/get`` endpoint with the module-level
    ``APP_KEY`` / ``APP_SECRET`` and returns ``data.accessToken`` from the
    JSON reply.

    :return: the access token taken from the reply
    :raises requests.RequestException: on network failure, timeout or a
        non-2xx HTTP status
    :raises ValueError: if the reply body is not valid JSON
    :raises RuntimeError: if the reply carries no ``data.accessToken``
    """
    token_url = 'https://open.ys7.com/api/lapp/token/get'
    # Let requests build the query string so the credentials are URL-encoded
    # instead of being pasted raw into the URL.
    response = requests.post(
        token_url,
        params={'appKey': APP_KEY, 'appSecret': APP_SECRET},
        headers=HEADERS,
        timeout=10,  # without a timeout a stalled connection blocks forever
    )
    response.raise_for_status()
    payload = response.json()
    try:
        return payload['data']['accessToken']
    except (KeyError, TypeError) as err:
        # An error reply has no usable "data"; surface the whole reply so the
        # caller can see why the platform refused.
        raise RuntimeError(
            f"failed to get access token, reply: {payload!r}"
        ) from err
def get_video_stream_url(accessToken, deviceSerial, protocol):
    """Fetch a live-stream address for a device from the Ezviz open platform.

    :param accessToken: access token
    :param deviceSerial: device serial number
    :param protocol: protocol number, passed through to the API unchanged
    :return: ``(True, url)`` on success; ``(False, msg)`` when the reply's
        ``msg`` does not contain "Operation succeeded"; ``(False, "")`` when
        the request fails or the reply cannot be parsed
    """
    url = "https://open.ys7.com/api/lapp/v2/live/address/get"
    query = {
        'accessToken': accessToken,
        'deviceSerial': deviceSerial,
        'protocol': protocol,
        'supportH265': 0,
    }
    try:
        # TLS verification is left enabled (no verify=False), matching
        # get_camera_token, which talks to the same host.
        response = requests.post(
            url, params=query, headers=HEADERS, timeout=10)
        payload = response.json()  # parse the body once and reuse it
        msg = payload['msg']
        if "Operation succeeded" not in msg:
            return False, msg
        return True, payload['data']['url']
    except (requests.RequestException, ValueError, KeyError, TypeError) as e:
        # Network errors, invalid JSON and unexpected reply shapes are all
        # reported to the caller as a failed lookup.
        print(e)
        return False, ""
def record_video(deviceSerial, save_path, fps=24, seconds=10):
    """Record a clip from a device's live stream into a temporary MP4 file.

    Creates ``<save_path>/<deviceSerial>`` and ``<save_path>/tmp`` if needed,
    fetches a stream address, then writes frames (scaled to one third of the
    stream's width and height) to ``<save_path>/tmp/<title>.mp4`` until
    ``seconds`` have elapsed or the stream closes.

    :param deviceSerial: device serial number
    :param save_path: base directory for recorded videos
    :param fps: frame rate passed to the video writer
    :param seconds: recording duration in seconds
    :return: ``(video_tmp, video_result, title)`` -- the path of the recorded
        temporary file, the intended path of the final video (not created
        here), and the title (integer start timestamp as a string)
    :raises Exception: if the stream address cannot be obtained or the
        stream cannot be opened
    """
    file_dir = os.path.join(save_path, deviceSerial)
    tmp_dir = os.path.join(save_path, 'tmp')
    # exist_ok avoids the check-then-create race between concurrent recordings.
    os.makedirs(file_dir, exist_ok=True)
    os.makedirs(tmp_dir, exist_ok=True)

    accessToken = get_camera_token()
    status, stream_url = get_video_stream_url(accessToken, deviceSerial, "2")
    if not status:
        raise Exception("获取stream_url失败")

    cap = cv2.VideoCapture(stream_url)
    video = None
    try:
        # Fail early: an unopened capture reports a 0x0 frame size and would
        # otherwise produce an empty, unusable file.
        if not cap.isOpened():
            raise Exception("打开视频流失败")

        fourcc = cv2.VideoWriter_fourcc('M', 'P', '4', 'v')
        size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) / 3),
                int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) / 3))
        start_time = time.time()
        title = f'{int(start_time)}'
        video_tmp = os.path.join(tmp_dir, f'{title}.mp4')
        video_result = os.path.join(file_dir, f'{title}.mp4')
        video = cv2.VideoWriter(video_tmp, fourcc, fps, size)

        while cap.isOpened():
            is_success, frame = cap.read()
            if is_success:
                img = cv2.resize(frame, size, interpolation=cv2.INTER_LINEAR)
                video.write(img)
            # Checked on every pass, so failed reads cannot extend the
            # recording beyond the requested duration.
            if time.time() - start_time >= seconds:
                break
    finally:
        # Release the writer and the capture even if recording raised.
        # cv2.destroyAllWindows() is intentionally not called: this function
        # never opens a window, and the call can raise on GUI-less OpenCV
        # builds.
        if video is not None:
            video.release()
        cap.release()
    return video_tmp, video_result, title
def mp4_video_record(deviceSerial, save_path):
    """Record a clip from a device and transcode it with ffmpeg.

    Calls ``record_video`` to capture a temporary file, then runs ffmpeg to
    re-encode it into ``<save_path>/<deviceSerial>/<title>.mp4``. The outcome
    is reported with ``print``; nothing is returned.

    :param deviceSerial: device serial number
    :param save_path: base directory for recorded videos
    :return: None
    """
    video_tmp, video_path, _ = record_video(
        deviceSerial=deviceSerial,
        save_path=save_path
    )
    # An argument list without a shell passes the paths verbatim, so spaces
    # or shell metacharacters in save_path/deviceSerial cannot break or
    # inject into the command. "-y" plus a closed stdin keeps ffmpeg from
    # blocking on an interactive overwrite prompt.
    cmd = ['ffmpeg', '-y', '-i', video_tmp, '-strict', '-2', video_path]
    try:
        res = subprocess.run(
            cmd,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE)
    except OSError as err:
        # Raised when the ffmpeg executable cannot be started at all.
        print(err)
        print("录制视频失败")
        return

    # Judge success by the exit status and the presence of the output file
    # rather than by matching the text of ffmpeg's last stderr line.
    if res.returncode != 0 or not os.path.exists(video_path):
        stderr_lines = res.stderr.decode('utf-8', errors='replace').splitlines()
        if stderr_lines:
            print(stderr_lines[-1])
        print("录制视频失败")
        return

    # The intermediate recording is no longer needed once transcoding worked.
    try:
        os.remove(video_tmp)
    except OSError as err:
        print(err)
    print("保存视频路径到数据库")
if __name__ == "__main__":
    # Script entry point: record one clip for device serial "123" and store
    # the result under /root/.
    mp4_video_record(deviceSerial="123", save_path="/root/")
|