完整的代码
"""
文件名: capture_web.py
功 能:XXX类,该类主要涉及XXX功能
版权信息:版本所有(C) 2019-2022
修改记录:
2022-04-30 11:37:15 创建
2022-04-30 11:37:15 修改
环境:
cd d:\temp
python -m venv py3104venv
Scripts\activate
pip install flask==2.1.2
pip install numpy==1.22.3
pip install opencv-python==4.5.5.64
pip install pillow==9.0.1
pip install pyinstaller==5.0.1
cd <path_to_capture_web.py>
打包:
pyinstaller --noconsole --add-data logs;logs --add-data static;static capture_web.py
功能:
1. 能抓屏,并保存为图片
2. 支持实时视频流
3. 支持历史图片保存
4. 支持当前最新图片查看
5. 随机采样屏幕,并保存位图片(1分钟随机时间间隔采集N次)
6. 支持web查看
7. 支持迁移到其他电脑上:使用pyinstaller打包
8. 支持多进程处理
9. 支持日志记录
10. 部署:支持自动启动: shell:startup
11. pyinstaller打包的程序最小化,隐藏到托盘
12. 做成window服务
13. 安全:限制访问的客户端ip
14. 使用说明:
(1)自动启动 : 【Win】+【R】 输入:shell:startup ,
然后在打开的目录中创建关于capture_web.exe的快捷方式
(2)空间需要大约5GB(3天的图片)
(3)第一次运行,网络应该许可
(4)被监控端最好固定ip,采用ipconfig查看ip地址
支持屏幕和摄像头查看
"""
import logging
from logging.handlers import RotatingFileHandler
import multiprocessing as mp
import os
import random
import signal
import sys
import time
import cv2
from flask import abort, Flask, jsonify, request, Response
from flask import render_template_string, send_file, url_for
import numpy as np
from PIL import ImageGrab
__all__ = ("MyLogger", "VideoCamera", "MonitorPageWebService",
"Capture", "MainProcess")
def __dir__():
return __all__
class MyLogger:
log_path = os.path.join(os.path.dirname(__file__), "logs", "my.log")
@classmethod
def get(cls, log_name=__name__, level=logging.INFO,
fmt_simple=True, logger=None):
if not logger:
logger = logging.getLogger(log_name)
logger.setLevel(level=level)
fmt_str = ("%(asctime)s.%(msecs)03d {}|%(name)s|"
"%(threadName)s:%(process)d-%(thread)d|"
"%(filename)s:%(lineno)s|"
"%(levelname)s|%(message)s").format(time.strftime("%z"))
if fmt_simple:
fmt_str = ("%(asctime)s.%(msecs)03d::%(name)s::"
"%(levelname)5s:: %(message)s")
date_fmt = "%Y-%m-%d %H:%M:%S"
log_format = logging.Formatter(fmt_str, datefmt=date_fmt)
log_file_name = os.path.join(os.path.dirname(cls.log_path),
f"{log_name}.log")
handle = RotatingFileHandler(log_file_name, maxBytes=5 * 1024 * 1024,
backupCount=3, encoding="utf-8")
handle.setLevel(level)
handle.setFormatter(log_format)
handle.namer = cls.log_file_name
logger.addHandler(handle)
console = logging.StreamHandler()
console.setLevel(level)
console.setFormatter(log_format)
logger.addHandler(console)
return logger
@staticmethod
def log_file_name(default_filename):
dir_name = os.path.dirname(default_filename)
file_name = os.path.basename(default_filename)
fields = file_name.split(".")
if len(fields) < 3:
return default_filename
fields[-1], fields[-2] = fields[-2], fields[-1]
file_name = ".".join(fields)
file_name = os.path.join(dir_name, file_name)
return file_name
class VideoCamera(object):
def __init__(self):
self.video = cv2.VideoCapture(0)
def __del__(self):
self.video.release()
def get_frame(self):
success, image = self.video.read()
ret, jpeg = cv2.imencode('.jpg', image)
return jpeg.tobytes()
class MonitorPageWebService:
logger = None
save_capture_pic_path = os.path.join(
os.path.dirname(__file__), "static", "capture_pic")
def __init__(self):
""""""
@classmethod
def get_capture_his_pic(cls):
his_filename_list = os.listdir(cls.save_capture_pic_path)
his_filename_list.sort(reverse=True)
return his_filename_list
@classmethod
def get_html(cls, form_field):
html = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>学习屏幕监控器</title>
<link rel="icon" href="{{ url_for('static', filename='favicon.ico') }}">
</head>
<body>
<form id=main_from action="." method="POST">
<table border = 0 cellpadding="20">
<tr>
<td></td>
{% for handle in handlers.values() %}
<td>
<input type="radio" name="operator"
value="{{handle['value']}}"
οnclick="main_from.submit();"
{{handle['checked']}}>{{handle['text']}}
</td>
{% endfor %}
</tr>
</table>
<table border = 0>
<tr>
<td colspan="2">
<div id="displayFileName" style="text-align:center">
</div>
</td>
</tr>
<tr>
<td>
<div style="height:600px; width:100%; overflow:auto">
<div>
{% for filename in his_filename_list %}
<a href="javascript:void(0);" οnclick="fa_click(this)">
{{ filename }} </a> <BR>
{% endfor %}
</div>
</div>
</td>
<td>
<img id="displayPic" src="{{ cur_handler.imgSrc }}"
width="100%" height="100%" alt="monitor capture"
οnclick="img_display_pic_onclick(this)">
</td>
</tr>
</table>
</form>
<script type="text/javascript">
function fa_click(elem){ //file_a_link_on_click
var fileName = elem.innerText;
var img = document.getElementById('displayPic');
img.src = './his_capture_pic/' + fileName + '/';
var div = document.getElementById('displayFileName');
div.innerHTML = '<H2>' + fileName + '</H2>';
}
function img_display_pic_onclick(elem){
var src = elem.src.split('?')[0];
src = src + '?' + (new Date().getTime());
elem.src = src;
}
</script>
</body>
</html>
"""
handlers = {
"realTimeScreen": {
"value": "realTimeScreen", "text": "实时屏幕画面",
"checked": "checked", "imgSrc": url_for('screen_video_feed')
},
"curPic": {"value": "curPic", "text": "当前屏幕图片",
"checked": "", "imgSrc": url_for('new_capture_pic')},
"hisPic": {"value": "hisPic", "text": "历史屏幕图片",
"checked": "", "imgSrc": url_for('new_capture_pic')},
"realTimeCamera": {
"value": "realTimeCamera", "text": "实时摄像头画面",
"checked": "", "imgSrc": url_for('camera_video_feed')},
}
cur_operator = ""
if form_field.get("operator", None):
for item in handlers.values():
if item["value"] == form_field["operator"]:
cur_operator = form_field["operator"]
item["checked"] = "checked"
else:
item["checked"] = ""
his_filename_list = []
if form_field["operator"] == "hisPic":
cur_operator = "hisPic"
his_filename_list = cls.get_capture_his_pic()
handlers["hisPic"]["imgSrc"] = url_for('new_capture_pic')
return render_template_string(html, form_field=form_field,
handlers=handlers,
cur_handler=handlers[cur_operator],
his_filename_list=his_filename_list)
@staticmethod
def camera_gen(camera):
while True:
frame = camera.get_frame()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')
@classmethod
def camera_video_feed(cls):
"""视频流的路线。将其放在img标记的src属性中。"""
return Response(cls.camera_gen(VideoCamera()),
mimetype='multipart/x-mixed-replace; boundary=frame')
@staticmethod
def screen_gen():
while True:
pil_image = ImageGrab.grab()
image = cv2.cvtColor(np.asarray(pil_image), cv2.COLOR_RGB2BGR)
ret, jpeg = cv2.imencode('.jpg', image)
frame = jpeg.tobytes()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')
@classmethod
def screen_video_feed(cls):
"""视频流的路线。将其放在img标记的src属性中。"""
return Response(cls.screen_gen(),
mimetype='multipart/x-mixed-replace; boundary=frame')
@classmethod
def new_capture_pic(cls):
filenames = os.listdir(cls.save_capture_pic_path)
filenames.sort(reverse=True)
new_filename = os.path.join(cls.save_capture_pic_path, filenames[0])
return send_file(new_filename, mimetype='image/png')
@classmethod
def his_capture_pic(cls, filename):
if not filename:
filename_list = os.listdir(cls.save_capture_pic_path)
if filename_list:
filename = filename_list[0]
else:
filename = os.path.join(cls.save_capture_pic_path,
"..", "favicon.ico")
new_filename = os.path.join(cls.save_capture_pic_path, filename)
return send_file(new_filename, mimetype='image/png')
@classmethod
def main_page(cls):
"""主页面"""
if request.method not in ('GET', 'POST'):
return jsonify({'code': 500, 'msg': '不支持该请求'})
form_field = {"operator": "realTimeScreen"}
if request.method == 'POST':
form_field = request.form
return cls.get_html(form_field)
@staticmethod
def args_proc():
allowed_remote_address = ""
port = 22430
if len(sys.argv) > 1:
port = sys.argv[1]
if len(sys.argv) > 2:
allowed_remote_address = f";{sys.argv[2]};"
return allowed_remote_address, port
@classmethod
def main_run(cls):
app = Flask(__name__)
cls.logger = MyLogger.get("WebService", level=logging.INFO,
logger=app.logger)
allowed_remote_address, port = cls.args_proc()
@app.route('/', methods=['GET', 'POST'])
def main_page():
return cls.main_page()
@app.route('/screen_video_feed')
def screen_video_feed():
return cls.screen_video_feed()
@app.route('/new_capture_pic')
def new_capture_pic():
return cls.new_capture_pic()
@app.route('/his_capture_pic/<filename>/')
def his_capture_pic(filename=""):
return cls.his_capture_pic(filename)
@app.route('/camera_video_feed')
def camera_video_feed():
return cls.camera_video_feed()
@app.before_request
def limit_remote_addr():
if not allowed_remote_address:
return
if allowed_remote_address.find(request.remote_addr) < 0:
abort(403)
app.run(host="0.0.0.0", port=port)
class Capture:
sampling_times_minute = 1
save_pic_path = os.path.join(os.path.dirname(__file__),
"static", "capture_pic")
logger = MyLogger.get("Capture", level=logging.DEBUG)
@classmethod
def capture(cls):
while True:
secs = sorted(cls.random_nums())
cur_time = time.localtime()
cls.logger.info("采集日志的时刻(秒):%s", str(secs))
while secs[-1] < cur_time.tm_sec:
time.sleep(0.4)
cur_time = time.localtime()
for sec in secs:
cur_time = time.localtime()
if sec < cur_time.tm_sec:
continue
while sec > cur_time.tm_sec:
time.sleep(0.4)
cur_time = time.localtime()
filename = cls.get_filename()
im = ImageGrab.grab()
im.save(filename)
cls.logger.info("生成文件:%s", filename)
@classmethod
def get_filename(cls):
now_str = time.strftime("%Y%m%d_%H%M%S")
filename = os.path.join(cls.save_pic_path, f"capture_{now_str}.png")
return filename
@classmethod
def random_nums(cls):
return random.sample(range(0, 60), cls.sampling_times_minute)
@classmethod
def del_his_file(cls, del_secs=3 * 24 * 3600):
""""""
for filename in os.listdir(cls.save_pic_path):
filename = os.path.join(cls.save_pic_path, filename)
if not os.path.isfile(filename):
continue
file_stat = os.stat(filename)
if file_stat.st_ctime + del_secs < time.time():
cls.logger.info("删除文件:%s", filename)
class MainProcess:
logger = MyLogger.get("MainProcess", level=logging.INFO)
child_process = []
@classmethod
def break_signal_func(cls, signal_value, frame):
cls.logger.info("信号:%s %s", str(signal_value), str(frame))
for child in cls.child_process:
child.terminate()
cls.logger.info("终止:%s", str(child))
cls.logger.info("等待子进程退出")
time.sleep(1)
sys.exit(0)
@classmethod
def signal_proc(cls):
signal.signal(signal.SIGINT, cls.break_signal_func)
signal.signal(signal.SIGTERM, cls.break_signal_func)
signal.signal(signal.SIGILL, cls.break_signal_func)
@classmethod
def main_run(cls):
cls.logger.info("主进程启动,主进程pid:%d", os.getpid())
capture_proc = mp.Process(target=cls.capture, name="抓屏进程")
cls.child_process.append(capture_proc)
capture_proc.start()
cls.logger.info("已启动进程:%s", str(capture_proc))
web_proc = mp.Process(target=cls.web, name="Web服务进程")
cls.child_process.append(web_proc)
web_proc.start()
cls.logger.info("已启动进程:%s", str(web_proc))
cls.signal_proc()
while True:
cls.logger.info("主程序运行中")
time.sleep(30)
@classmethod
def capture(cls):
Capture.sampling_times_minute = 2
Capture.del_his_file()
Capture.capture()
@classmethod
def web(cls):
MonitorPageWebService.main_run()
if __name__ == '__main__':
mp.freeze_support()
MainProcess.main_run()
|