Python Backend Frameworks (2): Building a Stock Display/Analysis System with Vue and Tornado

Contents

Adapting torndb to Python 3

Login endpoints

Data query endpoint

main entry point


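With the vue-element-admin front end already up and running, this post builds the stock-analysis back end on Python's Tornado framework. The back end fetches data from akshare, a scheduled job writes it to MySQL, and the front end shows it in tables. The stack can combine vue, echarts, tornado, pandas, seaborn, and similar tools.

Front end: yezonggang/vue-for-block (stock analysis system), https://github.com/yezonggang/vue-for-block

Back end: yezonggang/stockleeks-new (stock analysis system back end, based on Python and the Tornado framework)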

[Screenshot of the running system]

Adapting torndb to Python 3

Tornado's torndb module has not been ported to Python 3, so the first step is to adapt it. Modify torndb.py as follows:

#!/usr/bin/env python
#
# Copyright 2009 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""A lightweight wrapper around MySQLdb.

Originally part of the Tornado framework.  The tornado.database module
is slated for removal in Tornado 3.0, and it is now available separately
as torndb.
"""

from __future__ import absolute_import, division, with_statement

import copy
import itertools
import logging
import os
import time
# pymysql replaces MySQLdb, which does not support Python 3.
try:
    import pymysql
    import pymysql.connections
    import pymysql.converters
    import pymysql.cursors
except ImportError:
    # If pymysql isn't available this module won't actually be usable,
    # but we want it to at least be importable on readthedocs.org,
    # which has limitations on third-party modules.
    if 'READTHEDOCS' in os.environ:
        pymysql = None
    else:
        raise

version = "0.3"
version_info = (0, 3, 0, 0)

class Connection(object):
    """A lightweight wrapper around MySQLdb DB-API connections.

    The main value we provide is wrapping rows in a dict/object so that
    columns can be accessed by name. Typical usage::

        db = torndb.Connection("localhost", "mydatabase")
        for article in db.query("SELECT * FROM articles"):
            print(article.title)

    Cursors are hidden by the implementation, but other than that, the methods
    are very similar to the DB-API.

    We explicitly set the timezone to UTC and assume the character encoding to
    UTF-8 (can be changed) on all connections to avoid time zone and encoding errors.

    The sql_mode parameter is set by default to "traditional", which "gives an error instead of a warning"
    (http://dev.mysql.com/doc/refman/5.0/en/server-sql-mode.html). However, it can be set to
    any other mode including blank (None) thereby explicitly clearing the SQL mode.
    """
    def __init__(self, host, database, user=None, password=None,
                 max_idle_time=7 * 3600, connect_timeout=10, 
                 time_zone="+0:00", charset = "utf8", sql_mode="TRADITIONAL"):
        self.host = host
        self.database = database
        self.max_idle_time = float(max_idle_time)

        # pymysql treats "db" and "passwd" as deprecated aliases of
        # "database" and "password", so the newer names are used here.
        args = dict(conv=CONVERSIONS, use_unicode=True, charset=charset,
                    database=database, init_command=('SET time_zone = "%s"' % time_zone),
                    connect_timeout=connect_timeout, sql_mode=sql_mode)
        if user is not None:
            args["user"] = user
        if password is not None:
            args["password"] = password

        # We accept a path to a MySQL socket file or a host(:port) string
        if "/" in host:
            args["unix_socket"] = host
        else:
            self.socket = None
            pair = host.split(":")
            if len(pair) == 2:
                args["host"] = pair[0]
                args["port"] = int(pair[1])
            else:
                args["host"] = host
                args["port"] = 3306

        self._db = None
        self._db_args = args
        self._last_use_time = time.time()
        try:
            self.reconnect()
        except Exception:
            logging.error("Cannot connect to MySQL on %s", self.host,
                          exc_info=True)

    def __del__(self):
        self.close()

    def close(self):
        """Closes this database connection."""
        if getattr(self, "_db", None) is not None:
            self._db.close()
            self._db = None

    def reconnect(self):
        """Closes the existing database connection and re-opens it."""
        self.close()
        # self._db = MySQLdb.connect(**self._db_args)
        self._db = pymysql.connect(**self._db_args)
        self._db.autocommit(True)

    def iter(self, query, *parameters, **kwparameters):
        """Returns an iterator for the given query and parameters."""
        self._ensure_connected()
        cursor = pymysql.cursors.SSCursor(self._db)
        try:
            self._execute(cursor, query, parameters, kwparameters)
            column_names = [d[0] for d in cursor.description]
            for row in cursor:
                yield Row(zip(column_names, row))
        finally:
            cursor.close()

    def query(self, query, *parameters, **kwparameters):
        """Returns a row list for the given query and parameters."""
        cursor = self._cursor()
        try:
            self._execute(cursor, query, parameters, kwparameters)
            column_names = [d[0] for d in cursor.description]
            return [Row(zip(column_names, row)) for row in cursor]
        finally:
            cursor.close()

    def get(self, query, *parameters, **kwparameters):
        """Returns the (singular) row returned by the given query.

        If the query has no results, returns None.  If it has
        more than one result, raises an exception.
        """
        rows = self.query(query, *parameters, **kwparameters)
        if not rows:
            return None
        elif len(rows) > 1:
            raise Exception("Multiple rows returned for Database.get() query")
        else:
            return rows[0]

    # rowcount is a more reasonable default return value than lastrowid,
    # but for historical compatibility execute() must return lastrowid.
    def execute(self, query, *parameters, **kwparameters):
        """Executes the given query, returning the lastrowid from the query."""
        return self.execute_lastrowid(query, *parameters, **kwparameters)

    def execute_lastrowid(self, query, *parameters, **kwparameters):
        """Executes the given query, returning the lastrowid from the query."""
        cursor = self._cursor()
        try:
            self._execute(cursor, query, parameters, kwparameters)
            return cursor.lastrowid
        finally:
            cursor.close()

    def execute_rowcount(self, query, *parameters, **kwparameters):
        """Executes the given query, returning the rowcount from the query."""
        cursor = self._cursor()
        try:
            self._execute(cursor, query, parameters, kwparameters)
            return cursor.rowcount
        finally:
            cursor.close()

    def executemany(self, query, parameters):
        """Executes the given query against all the given param sequences.

        We return the lastrowid from the query.
        """
        return self.executemany_lastrowid(query, parameters)

    def executemany_lastrowid(self, query, parameters):
        """Executes the given query against all the given param sequences.

        We return the lastrowid from the query.
        """
        cursor = self._cursor()
        try:
            cursor.executemany(query, parameters)
            return cursor.lastrowid
        finally:
            cursor.close()

    def executemany_rowcount(self, query, parameters):
        """Executes the given query against all the given param sequences.

        We return the rowcount from the query.
        """
        cursor = self._cursor()
        try:
            cursor.executemany(query, parameters)
            return cursor.rowcount
        finally:
            cursor.close()

    update = execute_rowcount
    updatemany = executemany_rowcount

    insert = execute_lastrowid
    insertmany = executemany_lastrowid

    def _ensure_connected(self):
        # Mysql by default closes client connections that are idle for
        # 8 hours, but the client library does not report this fact until
        # you try to perform a query and it fails.  Protect against this
        # case by preemptively closing and reopening the connection
        # if it has been idle for too long (7 hours by default).
        if (self._db is None or
            (time.time() - self._last_use_time > self.max_idle_time)):
            self.reconnect()
        self._last_use_time = time.time()

    def _cursor(self):
        self._ensure_connected()
        return self._db.cursor()

    def _execute(self, cursor, query, parameters, kwparameters):
        try:
            return cursor.execute(query, kwparameters or parameters)
        except OperationalError:
            logging.error("Error connecting to MySQL on %s", self.host)
            self.close()
            raise


class Row(dict):
    """A dict that allows for object-like property access syntax."""
    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)

if pymysql is not None:
    # MySQLdb needed per-field conversion lists such as [(FLAG.BINARY, str)]
    # to return text columns as str. pymysql expects each converter to be a
    # plain callable and already decodes text columns using the connection
    # charset, so the default conversions are copied unchanged.
    CONVERSIONS = copy.copy(pymysql.converters.conversions)

    # Alias some common MySQL exceptions
    IntegrityError = pymysql.IntegrityError
    OperationalError = pymysql.OperationalError
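
To make the return value of query() more concrete, here is a minimal standalone sketch of the row wrapping. It needs no database: the column names and values are made up for illustration, and wrap_rows is a hypothetical helper that mirrors the list comprehension inside query().

```python
class Row(dict):
    """A dict that also allows attribute-style access, as in torndb."""
    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)


def wrap_rows(column_names, raw_rows):
    """Pair each raw tuple with the column names, like Connection.query()."""
    return [Row(zip(column_names, row)) for row in raw_rows]


# Hypothetical cursor output: column names plus two raw tuples.
column_names = ["code", "name", "latest_price"]
raw_rows = [("600000", "Example A", 10.5), ("000001", "Example B", 12.3)]

rows = wrap_rows(column_names, raw_rows)
for row in rows:
    # Both access styles work on the same object.
    print(row.code, row["name"], row.latest_price)
```

Because each Row is still a plain dict, it can be passed straight to json.dumps, which is what the handlers later in this post rely on.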

Login endpoints

The user and token data are copied directly from the definitions in mockjs.

#!/usr/local/bin/python3
# -*- coding: utf-8 -*-

import json
from tornado import gen
import logging
import datetime
import sys
from os.path import dirname,abspath


project_path = dirname(dirname(abspath(__file__)))
# __file__ gives this file's path; abspath(__file__) turns it into an absolute path.
# dirname() returns the parent directory, so two dirname() calls give the parent's parent (project2 in the example).
sys.path.append(project_path)
import libs.common as common
import web.base as webBase

tokens = {"admin": {"token": 'admin-token'},"editor": {"token": 'editor-token'}}
users = {'admin-token': {"roles": ['admin'],  "introduction": 'I am a super administrator',  "avatar": 'https://wpimg.wallstcn.com/f778738c-e4f8-4870-b634-56703b4acafe.gif',  "name": 'Super Admin'},'editor-token': {  "roles": ['editor'],  "introduction": 'I am an editor',  "avatar": 'https://wpimg.wallstcn.com/f778738c-e4f8-4870-b634-56703b4acafe.gif',  "name": 'Normal Editor'}}

# Login endpoint. The request body is parsed as JSON and must contain a username field.
class LoginHandler(webBase.BaseHandler):
    @gen.coroutine
    def post(self):
        data = json.loads(self.request.body)
        logging.info(data)
        obj={
            "code": 20000,
            "data": tokens[data["username"]]
        }
        logging.info(obj)
        self.write(json.dumps(obj))
        

# User permission endpoint: looks up the user info by the token passed in the query parameters.
class LoginInfoHandler(webBase.BaseHandler):
    @gen.coroutine
    def get(self):
        token = self.get_argument("token", default=0, strip=False)
        logging.info("get data####################")
        obj={
            "code": 20000,
            "data": users[token]
        }
        logging.info(obj)
        self.write(json.dumps(obj))
        
        
# Logout endpoint: a POST method, but it takes no parameters.
class LogoutHandler(webBase.BaseHandler):
    @gen.coroutine
    def post(self):
        logging.info("get data####################")
        obj={
            "code": 20000,
            "data": "success"
        }
        logging.info(obj)
        self.write(json.dumps(obj))
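
The handlers above index the mock dictionaries directly, so an unknown username or token raises KeyError. The sketch below shows the same lookup flow as plain functions with that case handled. It is only an illustration under assumptions: login and user_info are hypothetical helper names, the mock data is shortened, and FAILURE_CODE is a made-up value, not one taken from the original project.

```python
# Mock data in the same shape as the handlers above (shortened for the example).
TOKENS = {"admin": {"token": "admin-token"}, "editor": {"token": "editor-token"}}
USERS = {
    "admin-token": {"roles": ["admin"], "name": "Super Admin"},
    "editor-token": {"roles": ["editor"], "name": "Normal Editor"},
}

SUCCESS_CODE = 20000  # code used by the handlers above
FAILURE_CODE = 50000  # hypothetical error code, chosen only for this sketch


def login(username):
    """Return the token payload for a known user, or an error payload."""
    token = TOKENS.get(username)
    if token is None:
        return {"code": FAILURE_CODE, "message": "unknown user"}
    return {"code": SUCCESS_CODE, "data": token}


def user_info(token):
    """Return the user info for a known token, or an error payload."""
    info = USERS.get(token)
    if info is None:
        return {"code": FAILURE_CODE, "message": "invalid token"}
    return {"code": SUCCESS_CODE, "data": info}


# The front end logs in first, then asks for the user info with the token.
login_reply = login("admin")
info_reply = user_info(login_reply["data"]["token"])
print(info_reply["data"]["roles"])
print(login("nobody")["code"])
```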

Data query endpoint

After the akshare data has been written to the database, this endpoint queries the database directly.

#!/usr/local/bin/python3
# -*- coding: utf-8 -*-

import json
from tornado import gen
import logging
import datetime
import sys
from os.path import dirname,abspath


project_path = dirname(dirname(abspath(__file__)))
# __file__ gives this file's path; abspath(__file__) turns it into an absolute path.
# dirname() returns the parent directory, so two dirname() calls give the parent's parent (project2 in the example).
sys.path.append(project_path)
import libs.common as common
import web.base as webBase

# Must match the string used in the dictionary. Strings are not specially declared with a u"" prefix.
eastmoney_name = "查看股票"


# Returns the stock data.
class GetStockDataHandler(webBase.BaseHandler):
    def get(self):
        print("get start")

        # Read the paging parameters.
        page_param = self.get_argument("page", default=1, strip=False)
        limit_param = self.get_argument("limit", default=10, strip=False)
        page_param_int=int(page_param)
        limit_param_int=int(limit_param)
        # MySQL's "limit offset,count" takes a row count as its second value, not an end index.
        limit_param_sql=' limit %s,%s '%((page_param_int-1)*limit_param_int,limit_param_int)
        print("page param:", page_param, limit_param)

        name_param = self.get_argument("name", default=None, strip=False)
        code_param = self.get_argument("code", default=None, strip=False)
        orderby_param = self.get_argument("orderby", default=None, strip=False)

        print("get stock data")
        print("name param:", name_param)

        # Query the database. Filter values are bound as query parameters instead of
        # being formatted into the SQL string, which avoids quoting errors and injection.
        columns = ("date,code,name,latest_price,quote_change,ups_downs,volume,turnover,"
                   "amplitude,high,low,open,closed,quantity_ratio,turnover_rate,pe_dynamic,pb")
        conditions = []
        sql_params = []
        if name_param:
            conditions.append("name = %s")
            sql_params.append(name_param)
        if code_param:
            conditions.append("code = %s")
            sql_params.append(code_param)
        where_sql = (" where " + " and ".join(conditions) + " ") if conditions else " "

        # A column name cannot be bound as a parameter, so only sort by a known column.
        order_by_sql = " "
        if orderby_param and orderby_param[1:] in columns.split(","):
            order_by_sql = " order by " + orderby_param[1:] + (" desc " if orderby_param.startswith('+') else " asc ")
        search_sql = "select " + columns + " from stock_data_dev.guess_indicators_daily"

        sql = search_sql + where_sql + order_by_sql + limit_param_sql

        logging.info("select sql : " + sql)
        stock_web_list = self.db.query(sql, *sql_params)
        

        for tmp_obj in (stock_web_list):
            try:
                code_tmp = tmp_obj["code"]
                # Decide between Shanghai and Shenzhen, as the Eastmoney URLs require.
                if code_tmp.startswith("6"):
                    code_tmp = "SH" + code_tmp
                else:
                    code_tmp = "SZ" + code_tmp
                dongcai_URL='http://quote.eastmoney.com/%s.html'%(code_tmp)
                zhibiao_URL='/data/indicators?code='+code_tmp
                dongyan_URL='https://emweb.eastmoney.com/PC_HSF10/ShareholderResearch/Index?type=soft&code=%s'%(code_tmp)                
                tmp_obj["dongcai_URL"] = dongcai_URL
                tmp_obj["zhibiao_URL"] = zhibiao_URL
                tmp_obj["dongyan_URL"] = dongyan_URL
            except Exception as e:
                print("error :", e)
                        
        stock_web_list_all_sql="select count(*) from stock_data_dev.guess_indicators_daily;"
        stock_web_list_all= self.db.query(stock_web_list_all_sql)[0]['count(*)']
        logging.info("select stock_web_list_all : " + str(stock_web_list_all))
        obj = {
        "code":20000,
        "data": {"draw": 0,"items": stock_web_list,"recordsTotal":stock_web_list_all}
        }

        logging.info("get data####################")
        logging.info(obj)
        # default=str keeps json.dumps from failing on date or Decimal column values.
        self.write(json.dumps(obj, default=str))
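
The paging and filtering logic in this handler is easier to check in isolation. The following is a minimal sketch, assuming the usual MySQL meaning of "limit offset, count" and %s placeholders that the driver fills in. build_stock_query and exchange_prefix are hypothetical helper names introduced here, and the column list is shortened.

```python
def build_stock_query(page=1, limit=10, name=None, code=None):
    """Return (sql, params) for one page of results."""
    conditions, params = [], []
    if name:
        conditions.append("name = %s")
        params.append(name)
    if code:
        conditions.append("code = %s")
        params.append(code)
    where_sql = (" where " + " and ".join(conditions)) if conditions else ""

    # The offset is the number of rows to skip; the count is the page size.
    offset = (page - 1) * limit
    sql = ("select code, name from stock_data_dev.guess_indicators_daily"
           + where_sql + " limit %s, %s")
    params.extend([offset, limit])
    return sql, params


def exchange_prefix(code):
    """Prefix a stock code with SH or SZ, using the same rule as the handler."""
    return ("SH" if code.startswith("6") else "SZ") + code


sql, params = build_stock_query(page=3, limit=10, code="600000")
print(sql)
print(params)
print(exchange_prefix("600000"), exchange_prefix("000001"))
```

With torndb, the pair would be passed on as self.db.query(sql, *params), so the values never become part of the SQL text.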

main entry point

#!/usr/local/bin/python3
# -*- coding: utf-8 -*-
import logging
import os.path
from logging.handlers import TimedRotatingFileHandler
import torndb
import tornado.escape
from tornado import gen
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado.log import access_log
import dataTableHandler as dataTableHandler
import loginHandler as loginHandler
import base as webBase
import pandas as pd
import numpy as np
import akshare as ak
import bokeh as bh
from tornado.options import define, options
import sys
from os.path import dirname,abspath

project_path = dirname(dirname(abspath(__file__)))
# __file__ gives this file's path; abspath(__file__) turns it into an absolute path.
# dirname() returns the parent directory, so two dirname() calls give the parent's parent (project2 in the example).
sys.path.append(project_path)
import libs.common as common



define('port', default=8888, help='run on the given port', type=int)
define('mode', default="info", help='run on info', type=str)


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            # Routes
            # Report data shown in the front-end data table.
            (r"/stock/api_data", dataTableHandler.GetStockDataHandler),
            (r"/stock/login", loginHandler.LoginHandler),
            (r"/stock/login_info", loginHandler.LoginInfoHandler),
            (r"/stock/logout", loginHandler.LogoutHandler),
            # Data editing (dataEditor).
            # Stock indicator data.
        ]

        # Settings
        settings = dict(
            template_path=os.path.join(os.path.dirname(__file__), "templates"),
            static_path=os.path.join(os.path.dirname(__file__), "static"),
            xsrf_cookies=False,  # True,
            # Secret for signed cookies
            cookie_secret="027bb1b670eddf0392cdda8709268a17b58b7",
            debug=True,
            compress_response=True,
        )
        settings["log_function"] = log_func
        super(Application, self).__init__(handlers, **settings)

        # Have one global connection to the blog DB across all handlers
        self.db = torndb.Connection(
            charset="utf8", max_idle_time=3600, connect_timeout=1000,
            host=common.MYSQL_HOST, database=common.MYSQL_DB,
            user=common.MYSQL_USER, password=common.MYSQL_PWD)


# Log formatting
class LogFormatter(tornado.log.LogFormatter):

    def __init__(self):
        super(LogFormatter, self).__init__(
            fmt='%(color)s[%(asctime)s %(filename)s:%(funcName)s:%(lineno)d %(levelname)s]%(end_color)s %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )


def log_func(handler):
    if handler.get_status() < 400:
        log_method = access_log.info
    elif handler.get_status() < 500:
        log_method = access_log.warning
    else:
        log_method = access_log.error
    request_time = 1000.0 * handler.request.request_time()
    log_method("%d %s %s (%s) %s %s %.2fms",
               handler.get_status(), handler.request.method,
               handler.request.uri, handler.request.remote_ip,
               handler.request.headers.get("User-Agent", "-"),
               handler.request.arguments,
               request_time)


def init_logging(log_file):
    # Use a TimedRotatingFileHandler
    file_handler = TimedRotatingFileHandler(log_file, when="d", interval=1, backupCount=30)
    # Output format
    log_formatter = logging.Formatter(
        "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] [%(lineno)d]  %(message)s"
    )
    file_handler.setFormatter(log_formatter)
    # Attach the handler to the root logger
    root_logger = logging.getLogger()
    root_logger.addHandler(file_handler)


# Home page handler.
class HomeHandler(webBase.BaseHandler):
    @gen.coroutine
    def get(self):
        print("################## index.html ##################")
        pandasVersion = pd.__version__
        numpyVersion = np.__version__
        akshareVersion = ak.__version__
        bokehVersion = bh.__version__
        # talibVersion = talib.__version__
        # jupyterVersion = jupyter.__version__
        # stockstatsVersion = ss.__version__ # no such attribute, and the package has not been updated for a long time
        # https://github.com/jealous/stockstats
        self.render("index.html", pandasVersion=pandasVersion,
                    numpyVersion=numpyVersion,
                    akshareVersion=akshareVersion,
                    bokehVersion=bokehVersion,
                    stockstatsVersion="0.3.2",
                    pythonStockVersion=common.__version__,
                    leftMenu=webBase.GetLeftMenu(self.request.uri))


class TestHandler(webBase.BaseHandler):
    @gen.coroutine
    def get(self):
        self.render("test_akshare.html", entries="hello",
                    pythonStockVersion=common.__version__,
                    leftMenu=webBase.GetLeftMenu(self.request.uri))


class Test2Handler(webBase.BaseHandler):
    @gen.coroutine
    def get(self):
        name = self.get_argument("table_name", default=None, strip=False)
        print("table name:", name)
        # stock_web_dic is not imported in this file, so this line raises NameError as written.
        stockWeb = stock_web_dic.STOCK_WEB_DATA_MAP[name]

        self.render("stock_livemore_guess.html", entries="hello",
                    stockWeb=stockWeb,
                    pythonStockVersion=common.__version__,
                    leftMenu=webBase.GetLeftMenu(self.request.uri))


def main():
    tornado.options.parse_command_line()
    # [i.setFormatter(LogFormatter()) for i in logging.getLogger().handlers]
    http_server = tornado.httpserver.HTTPServer(Application())
    port = 9904
    http_server.listen(port)
    # tornado.options.options.logging = "debug"
    log_path = '/data/logs'
    init_logging("%s/web.%s.%s.log" % (log_path, tornado.options.options.mode, tornado.options.options.port))
    # Save logs to a file
    # tornado.options.define("log_file_prefix", default="/data/logs/web.log")
    # Rotate log files by time
    # tornado.options.define("log_rotate_mode", default='time')  # rotation mode: time or size
    # tornado.options.define("log_rotate_when", default='S')  # unit: S / M / H / D / W0 - W6
    # tornado.options.define("log_rotate_interval", default=60)  # interval: 60 seconds

    tornado.ioloop.IOLoop.current().start()


if __name__ == "__main__":
    main()
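
The file logging set up by init_logging can be tried without starting Tornado. The sketch below uses only the standard library and writes to a temporary directory instead of /data/logs. The logger name "demo" and the file name are assumptions made for the example, and the handler is attached to a named logger rather than the root logger so the example does not affect other logging.

```python
import logging
import os
import tempfile
from logging.handlers import TimedRotatingFileHandler


def init_logging(log_file, logger_name="demo"):
    """Attach a daily-rotating file handler that keeps 30 old files."""
    handler = TimedRotatingFileHandler(log_file, when="d", interval=1, backupCount=30)
    handler.setFormatter(logging.Formatter(
        "%(asctime)s [%(levelname)-5.5s] [%(lineno)d]  %(message)s"
    ))
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    return logger, handler


# Write one record to a temporary log file and read it back.
log_dir = tempfile.mkdtemp()
log_file = os.path.join(log_dir, "web.info.9904.log")
logger, handler = init_logging(log_file)
logger.info("server started")
handler.flush()

with open(log_file, encoding="utf-8") as f:
    contents = f.read()
print(contents)

# Release the file once the example is done.
logger.removeHandler(handler)
handler.close()
```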

Added: 2022-05-27 17:17:41  Updated: 2022-05-27 17:17:43
 