IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> Pythony应用(01)-学习监控(06) -> 正文阅读

[Python知识库]Pythony应用(01)-学习监控(06)

完整的代码


#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
文件名: capture_web.py
功 能:XXX类,该类主要涉及XXX功能
版权信息:版本所有(C) 2019-2022
修改记录:
    2022-04-30 11:37:15 创建
    2022-04-30 11:37:15 修改
环境:
    cd d:\temp
    python -m venv py3104venv
    Scripts\activate

    pip install flask==2.1.2
    pip install numpy==1.22.3
    pip install opencv-python==4.5.5.64
    pip install pillow==9.0.1
    pip install pyinstaller==5.0.1

    cd <path_to_capture_web.py>

打包:
    pyinstaller --noconsole --add-data logs;logs --add-data static;static  capture_web.py
功能:
    1. 能抓屏,并保存为图片
    2. 支持实时视频流
    3. 支持历史图片保存
    4. 支持当前最新图片查看
    5. 随机采样屏幕,并保存位图片(1分钟随机时间间隔采集N次)
    6. 支持web查看
    7. 支持迁移到其他电脑上:使用pyinstaller打包
    8. 支持多进程处理
    9. 支持日志记录
    10. 部署:支持自动启动: shell:startup
    11. pyinstaller打包的程序最小化,隐藏到托盘
    12. 做成window服务
    13. 安全:限制访问的客户端ip
    14. 使用说明:
        (1)自动启动 : 【Win】+【R】 输入:shell:startup ,
            然后在打开的目录中创建关于capture_web.exe的快捷方式
        (2)空间需要大约5GB(3天的图片)
        (3)第一次运行,网络应该许可
        (4)被监控端最好固定ip,采用ipconfig查看ip地址

    支持屏幕和摄像头查看

"""
import logging
from logging.handlers import RotatingFileHandler
import multiprocessing as mp
import os
import random
import signal
import sys
import time

import cv2
from flask import abort, Flask, jsonify, request, Response
from flask import render_template_string, send_file, url_for
import numpy as np
from PIL import ImageGrab


__all__ = ("MyLogger", "VideoCamera", "MonitorPageWebService",
           "Capture", "MainProcess")


def __dir__():
    return __all__


class MyLogger:
    log_path = os.path.join(os.path.dirname(__file__), "logs", "my.log")

    @classmethod
    def get(cls, log_name=__name__, level=logging.INFO,
            fmt_simple=True, logger=None):
        if not logger:
            logger = logging.getLogger(log_name)
            logger.setLevel(level=level)

        fmt_str = ("%(asctime)s.%(msecs)03d {}|%(name)s|"
                   "%(threadName)s:%(process)d-%(thread)d|"
                   "%(filename)s:%(lineno)s|"
                   "%(levelname)s|%(message)s").format(time.strftime("%z"))
        if fmt_simple:
            fmt_str = ("%(asctime)s.%(msecs)03d::%(name)s::"
                       "%(levelname)5s:: %(message)s")

        date_fmt = "%Y-%m-%d %H:%M:%S"
        log_format = logging.Formatter(fmt_str, datefmt=date_fmt)

        # 循环日志
        log_file_name = os.path.join(os.path.dirname(cls.log_path),
                                     f"{log_name}.log")
        handle = RotatingFileHandler(log_file_name, maxBytes=5 * 1024 * 1024,
                                     backupCount=3, encoding="utf-8")
        handle.setLevel(level)
        handle.setFormatter(log_format)
        # 设置日志名称
        handle.namer = cls.log_file_name

        # 给logger添加handler
        logger.addHandler(handle)

        # 控制台日志
        console = logging.StreamHandler()
        console.setLevel(level)
        console.setFormatter(log_format)
        logger.addHandler(console)

        return logger

    @staticmethod
    def log_file_name(default_filename):
        dir_name = os.path.dirname(default_filename)
        file_name = os.path.basename(default_filename)

        fields = file_name.split(".")
        if len(fields) < 3:
            return default_filename
        fields[-1], fields[-2] = fields[-2], fields[-1]
        file_name = ".".join(fields)
        file_name = os.path.join(dir_name, file_name)
        return file_name


class VideoCamera(object):
    def __init__(self):
        self.video = cv2.VideoCapture(0)

    def __del__(self):
        self.video.release()

    def get_frame(self):
        success, image = self.video.read()
        ret, jpeg = cv2.imencode('.jpg', image)
        return jpeg.tobytes()


class MonitorPageWebService:
    logger = None  # 在main_run中初始化
    save_capture_pic_path = os.path.join(
        os.path.dirname(__file__), "static", "capture_pic")

    def __init__(self):
        """"""

    @classmethod
    def get_capture_his_pic(cls):
        his_filename_list = os.listdir(cls.save_capture_pic_path)
        his_filename_list.sort(reverse=True)
        return his_filename_list

    @classmethod
    def get_html(cls, form_field):
        html = """
            <!DOCTYPE html>
            <html lang="en">
            <head>
                <meta charset="UTF-8">
                <title>学习屏幕监控器</title>
                <link rel="icon" href="{{ url_for('static', filename='favicon.ico') }}">
            </head>
            <body>
            <form id=main_from action="." method="POST">
                <table border = 0 cellpadding="20">
                    <tr>
                        <td></td>
                    {% for handle in handlers.values() %}
                        <td>
                            <input type="radio" name="operator" 
                            value="{{handle['value']}}" 
                            οnclick="main_from.submit();" 
                            {{handle['checked']}}>{{handle['text']}}
                        </td>
                    {% endfor %}
                    </tr>
                </table>
                <table border = 0>
                <tr>
                <td colspan="2">
                <div id="displayFileName" style="text-align:center">
                
                </div>
                </td>
                </tr>
                <tr>
                    <td>
                    <div style="height:600px; width:100%; overflow:auto">
                    <div>
                    {% for filename in his_filename_list %}
                        <a href="javascript:void(0);" οnclick="fa_click(this)"> 
                        {{ filename }} </a> <BR>
                    {% endfor %}
                    </div>
                    </div>
                    </td>
                    
                    <td>
                        <img id="displayPic" src="{{ cur_handler.imgSrc }}" 
                        width="100%" height="100%" alt="monitor capture" 
                        οnclick="img_display_pic_onclick(this)">
                    </td>
                    
                </tr>
                
                </table>
            </form>
                <script type="text/javascript">
                    function fa_click(elem){ //file_a_link_on_click
                        var fileName = elem.innerText;
                        var img = document.getElementById('displayPic');
                        img.src = './his_capture_pic/' + fileName + '/';
                        var div = document.getElementById('displayFileName');
                        div.innerHTML = '<H2>' + fileName + '</H2>';
                    }
                    function img_display_pic_onclick(elem){
                        var src = elem.src.split('?')[0];
                        src = src + '?' + (new Date().getTime());
                        elem.src = src;
                    }
                </script>            
            </body>
            </html>
        """

        handlers = {
            "realTimeScreen": {
                "value": "realTimeScreen", "text": "实时屏幕画面",
                "checked": "checked", "imgSrc": url_for('screen_video_feed')
            },
            "curPic": {"value": "curPic", "text": "当前屏幕图片",
                       "checked": "", "imgSrc": url_for('new_capture_pic')},
            "hisPic": {"value": "hisPic", "text": "历史屏幕图片",
                       "checked": "", "imgSrc": url_for('new_capture_pic')},
            "realTimeCamera": {
                "value": "realTimeCamera", "text": "实时摄像头画面",
                "checked": "", "imgSrc": url_for('camera_video_feed')},
        }

        cur_operator = ""
        if form_field.get("operator", None):
            for item in handlers.values():
                if item["value"] == form_field["operator"]:
                    cur_operator = form_field["operator"]
                    item["checked"] = "checked"
                else:
                    item["checked"] = ""

        his_filename_list = []
        if form_field["operator"] == "hisPic":
            cur_operator = "hisPic"
            his_filename_list = cls.get_capture_his_pic()
            handlers["hisPic"]["imgSrc"] = url_for('new_capture_pic')

        return render_template_string(html, form_field=form_field,
                                      handlers=handlers,
                                      cur_handler=handlers[cur_operator],
                                      his_filename_list=his_filename_list)

    @staticmethod
    def camera_gen(camera):
        while True:
            frame = camera.get_frame()
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')

    @classmethod
    def camera_video_feed(cls):
        """视频流的路线。将其放在img标记的src属性中。"""
        return Response(cls.camera_gen(VideoCamera()),
                        mimetype='multipart/x-mixed-replace; boundary=frame')

    @staticmethod
    def screen_gen():
        while True:
            pil_image = ImageGrab.grab()
            image = cv2.cvtColor(np.asarray(pil_image), cv2.COLOR_RGB2BGR)
            ret, jpeg = cv2.imencode('.jpg', image)
            frame = jpeg.tobytes()
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')

    @classmethod
    def screen_video_feed(cls):
        """视频流的路线。将其放在img标记的src属性中。"""
        return Response(cls.screen_gen(),
                        mimetype='multipart/x-mixed-replace; boundary=frame')

    @classmethod
    def new_capture_pic(cls):
        filenames = os.listdir(cls.save_capture_pic_path)
        filenames.sort(reverse=True)
        new_filename = os.path.join(cls.save_capture_pic_path, filenames[0])
        return send_file(new_filename, mimetype='image/png')

    @classmethod
    def his_capture_pic(cls, filename):
        if not filename:
            filename_list = os.listdir(cls.save_capture_pic_path)
            if filename_list:
                filename = filename_list[0]
            else:
                filename = os.path.join(cls.save_capture_pic_path,
                                        "..", "favicon.ico")
        new_filename = os.path.join(cls.save_capture_pic_path, filename)
        return send_file(new_filename, mimetype='image/png')

    @classmethod
    def main_page(cls):
        """主页面"""
        if request.method not in ('GET', 'POST'):
            return jsonify({'code': 500, 'msg': '不支持该请求'})

        form_field = {"operator": "realTimeScreen"}
        if request.method == 'POST':
            form_field = request.form
        return cls.get_html(form_field)

    @staticmethod
    def args_proc():
        allowed_remote_address = ""  # "ip1;ip2;ip3;"
        port = 22430

        if len(sys.argv) > 1:
            port = sys.argv[1]

        if len(sys.argv) > 2:
            allowed_remote_address = f";{sys.argv[2]};"

        return allowed_remote_address, port

    @classmethod
    def main_run(cls):
        app = Flask(__name__)
        cls.logger = MyLogger.get("WebService", level=logging.INFO,
                                  logger=app.logger)
        allowed_remote_address, port = cls.args_proc()

        # 主页面
        @app.route('/', methods=['GET', 'POST'])
        def main_page():
            return cls.main_page()

        @app.route('/screen_video_feed')
        def screen_video_feed():
            return cls.screen_video_feed()

        @app.route('/new_capture_pic')
        def new_capture_pic():
            return cls.new_capture_pic()

        @app.route('/his_capture_pic/<filename>/')
        def his_capture_pic(filename=""):
            return cls.his_capture_pic(filename)

        @app.route('/camera_video_feed')
        def camera_video_feed():
            return cls.camera_video_feed()

        @app.before_request
        def limit_remote_addr():
            if not allowed_remote_address:
                return

            if allowed_remote_address.find(request.remote_addr) < 0:
                abort(403)  # Forbidden

        app.run(host="0.0.0.0", port=port)


class Capture:
    sampling_times_minute = 1  # 每分钟采样次数
    save_pic_path = os.path.join(os.path.dirname(__file__),
                                 "static", "capture_pic")

    logger = MyLogger.get("Capture", level=logging.DEBUG)

    @classmethod
    def capture(cls):
        while True:
            secs = sorted(cls.random_nums())
            cur_time = time.localtime()
            cls.logger.info("采集日志的时刻(秒):%s", str(secs))
            while secs[-1] < cur_time.tm_sec:
                time.sleep(0.4)
                cur_time = time.localtime()

            for sec in secs:
                cur_time = time.localtime()
                if sec < cur_time.tm_sec:
                    continue
                while sec > cur_time.tm_sec:
                    time.sleep(0.4)
                    cur_time = time.localtime()

                filename = cls.get_filename()
                im = ImageGrab.grab()
                im.save(filename)
                cls.logger.info("生成文件:%s", filename)

    @classmethod
    def get_filename(cls):
        now_str = time.strftime("%Y%m%d_%H%M%S")
        filename = os.path.join(cls.save_pic_path, f"capture_{now_str}.png")
        return filename

    @classmethod
    def random_nums(cls):
        return random.sample(range(0, 60), cls.sampling_times_minute)

    @classmethod
    def del_his_file(cls, del_secs=3 * 24 * 3600):
        """"""
        for filename in os.listdir(cls.save_pic_path):
            filename = os.path.join(cls.save_pic_path, filename)
            if not os.path.isfile(filename):
                continue
            file_stat = os.stat(filename)
            if file_stat.st_ctime + del_secs < time.time():
                # os.remove(filename)
                cls.logger.info("删除文件:%s", filename)


class MainProcess:
    logger = MyLogger.get("MainProcess", level=logging.INFO)
    child_process = []

    @classmethod
    def break_signal_func(cls, signal_value, frame):
        cls.logger.info("信号:%s %s", str(signal_value), str(frame))
        for child in cls.child_process:
            child.terminate()
            # os.kill(child.pid, signal.SIGTERM)
            cls.logger.info("终止:%s", str(child))

        cls.logger.info("等待子进程退出")
        time.sleep(1)
        sys.exit(0)

    @classmethod
    def signal_proc(cls):
        signal.signal(signal.SIGINT, cls.break_signal_func)
        signal.signal(signal.SIGTERM, cls.break_signal_func)
        signal.signal(signal.SIGILL, cls.break_signal_func)

    @classmethod
    def main_run(cls):
        cls.logger.info("主进程启动,主进程pid:%d", os.getpid())

        # 启动抓屏进程
        capture_proc = mp.Process(target=cls.capture, name="抓屏进程")
        cls.child_process.append(capture_proc)
        capture_proc.start()
        cls.logger.info("已启动进程:%s", str(capture_proc))

        # 启动web service进程
        web_proc = mp.Process(target=cls.web, name="Web服务进程")
        cls.child_process.append(web_proc)
        web_proc.start()
        cls.logger.info("已启动进程:%s", str(web_proc))

        # 主进程
        cls.signal_proc()
        while True:
            cls.logger.info("主程序运行中")
            time.sleep(30)

    @classmethod
    def capture(cls):
        Capture.sampling_times_minute = 2
        Capture.del_his_file()
        Capture.capture()

    @classmethod
    def web(cls):
        MonitorPageWebService.main_run()


if __name__ == '__main__':
    mp.freeze_support()  # 如果不添加该语句,pyinstaller打包后会启动很多进程,造成报错
    MainProcess.main_run()

  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2022-05-05 11:14:27  更:2022-05-05 11:16:30 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/28 9:12:48-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计