目录
效果:
代码:
使用:
数据:
效果:
1. 点击左边的点,右边显示左边点对应时点该股票在行业中所处位置的散点图,并红星标出
2. 点击右边的点,左边显示右边点对应股票该指标的时间序列数据,并红星标出当前的时点
代码:
需要的包和字符串横坐标控件
import sys
import pandas as pd
from typing import Any,Dict
from PyQt5 import QtCore,QtWidgets,QtGui
from PyQt5.QtCore import Qt
import pyqtgraph as pg
import pyqtgraph.examples
pg.setConfigOption('background','w')
pg.setConfigOption('foreground','k')
class RotateAxisItem(pg.AxisItem):
def drawPicture(self, p, axisSpec, tickSpecs, textSpecs):
p.setRenderHint(p.Antialiasing,False)
p.setRenderHint(p.TextAntialiasing,True)
## draw long line along axis
pen,p1,p2 = axisSpec
p.setPen(pen)
p.drawLine(p1,p2)
p.translate(0.5,0) ## resolves some damn pixel ambiguity
## draw ticks
for pen,p1,p2 in tickSpecs:
p.setPen(pen)
p.drawLine(p1,p2)
## draw all text
# if self.tickFont is not None:
# p.setFont(self.tickFont)
p.setPen(self.pen())
for rect,flags,text in textSpecs:
# this is the important part
p.save()
p.translate(rect.x(),rect.y())
p.rotate(-90)
p.drawText(-rect.width(),rect.height()*2,rect.width(),rect.height(),flags,text)
# restoring the painter is *required*!!!
p.restore()
折线图控件和散点图控件
class PyQtGraphScatterWidget(QtWidgets.QWidget):
sinout_signal = QtCore.pyqtSignal(str)
def __init__(self):
super().__init__()
self.init_data()
self.init_ui()
def init_data(self):
# https://www.sioe.cn/yingyong/yanse-rgb-16/
self.current_point_color = (220,20,60) # 猩红
self.point_color = (0,0,255) # 纯蓝
self.color_star = (220, 20, 60)
pass
def init_ui(self):
self.title_label = QtWidgets.QLabel('散点图')
self.title_label.setAlignment(QtCore.Qt.AlignCenter)
xax = RotateAxisItem(orientation='bottom')
xax.setHeight(h=80)
self.pw = pg.PlotWidget(axisItems={'bottom': xax})
self.pw.setMouseEnabled(x=True, y=False)
self.pw.setAutoVisible(x=False, y=True)
layout = QtWidgets.QVBoxLayout()
layout.addWidget(self.title_label)
layout.addWidget(self.pw)
self.setLayout(layout)
pass
def set_data(self,data:Dict[str,Any]):
self.pw.clear()
if data is None:
return
title_str = data['title_str']
self.title_label.setText(title_str)
x = data['x']
self.y = data['y']
self.x_ticks = data['x_ticks']
self.cur_pos = data['cur_pos']
scatters = pg.ScatterPlotItem(
hoverable=True,
hoverPen=pg.mkPen('g'),
tip=None
)
spots3 = []
xax = self.pw.getAxis('bottom')
xax.setTicks([self.x_ticks])
for x0,y0 in zip(x,self.y):
spots3.append({
'pos':(x0,y0),
'size':5,
'pen':{'color':self.point_color,'width':2},
'brush':pg.mkBrush(color=self.point_color)
})
scatters.addPoints(spots3)
self.pw.addItem(scatters)
self.label = pg.TextItem()
self.pw.addItem(self.label,ignoreBounds=True)
self.cur_targetItem = pg.TargetItem(
pos=self.cur_pos,
movable=False,
size=12,
symbol='star',
pen=self.color_star,
brush=self.color_star
)
self.pw.addItem(self.cur_targetItem)
scatters.sigClicked.connect(self.scatter_clicked)
scatters.sigHovered.connect(self.scatter_hovered)
pass
def scatter_clicked(self,plot,points):
# 单击获取当前单击的点,发射事件
if len(points)<=0:
return
index_val = points[0].index()
if index_val == self.cur_pos[0]:
return
self.cur_targetItem.setPos(index_val,self.y[index_val])
res_str = self.x_ticks[index_val][1]
self.sinout_signal.emit(res_str)
pass
def scatter_hovered(self,plot,points):
if len(points)<=0:
return
cur_x = points[0].pos()[0]
cur_y = points[0].pos()[1]
index_val = points[0].index()
x_str = self.x_ticks[index_val][1]
y_val = self.y[index_val]
html_str = '<p style="color:black;font-size:12px;">'+x_str+' '+f"{y_val:,}"+'</p>'
self.label.setHtml(html_str)
self.label.setPos(cur_x,cur_y)
class PyQtGraphLineWidget(QtWidgets.QWidget):
sinout_signal = QtCore.pyqtSignal(str)
def __init__(self):
super().__init__()
self.init_data()
self.init_ui()
pass
def init_data(self):
self.color_line = (30, 144, 255)
self.color_scatter = (0, 0, 128)
self.color_star = (220, 20, 60)
self.color_hover = (0, 191, 255)
self.color_circle = (255, 0, 255)
pass
def init_ui(self):
self.title_label = QtWidgets.QLabel('指标时间序列')
self.title_label.setAlignment(Qt.AlignCenter)
xax = RotateAxisItem(orientation='bottom')
xax.setHeight(h=80)
self.pw = pg.PlotWidget(axisItems={'bottom': xax})
self.pw.setMouseEnabled(x=True, y=False)
self.pw.setAutoVisible(x=False, y=True)
layout = QtWidgets.QVBoxLayout()
layout.addWidget(self.title_label)
layout.addWidget(self.pw)
self.setLayout(layout)
pass
def set_data(self,data:Dict[str,Any]):
self.pw.clear()
if data is None:
return
title_str = data['title_str']
x = data['x']
y = data['y']
x_ticks = data['x_ticks']
cur_pos = data['cur_pos']
center_circle = data['center_circle']
cover_xy = data['cover_xy']
self.title_label.setText(title_str)
self.x_ticks = x_ticks
self.y = y
self.center_circle = center_circle
self.cover_xy = cover_xy
xax = self.pw.getAxis('bottom')
xax.setTicks([x_ticks])
self.label = pg.TextItem()
self.pw.addItem(self.label,ignoreBounds=True)
self.pw.plot(x,y,connect='finite',pen=pg.mkPen({'color':self.color_line,'width':2}),symbol='o',symbolSize=5)
self.cur_targetItem = pg.TargetItem(
pos=cur_pos,
movable=False,
size=12,
symbol='star',
pen=self.color_star,
brush=self.color_star
)
self.pw.addItem(self.cur_targetItem)
self.vb = self.pw.getViewBox()
self.proxy = pg.SignalProxy(self.pw.scene().sigMouseMoved,rateLimit=60, slot=self.mouseMoved)
self.proxy_clicked = pg.SignalProxy(self.pw.scene().sigMouseClicked, rateLimit=60, slot=self.mouseClicked)
self.pw.enableAutoRange()
pass
def mouseMoved(self,evt):
pos = evt[0]
if self.pw.sceneBoundingRect().contains(pos):
mousePoint = self.vb.mapSceneToView(pos)
cur_x = mousePoint.x()
cur_y = mousePoint.y()
for xy_i,item in enumerate(self.cover_xy):
cur_x_radius = item[0]
cur_y_radius = item[1]
if cur_x>=cur_x_radius[0] and cur_x<=cur_x_radius[1]:
if cur_y>=cur_y_radius[0] and cur_y<=cur_y_radius[1]:
cur_center_circle = self.center_circle[xy_i]
html_str = '<p style="color:black;font-size:12px;">' + str(self.x_ticks[xy_i][1]) + ' ' + f"{self.y[xy_i]:,}" + '</p>'
self.label.setHtml(html_str)
self.label.setPos(cur_center_circle[0],cur_center_circle[1])
pass
pass
pass
pass
def mouseClicked(self,evt):
pos = evt[0].pos()
cur_x = pos[0]
cur_y = pos[1]
for xy_i,item in enumerate(self.cover_xy):
cur_x_radius = item[0]
cur_y_radius = item[1]
if cur_x>=cur_x_radius[0] and cur_x<=cur_x_radius[1]:
if cur_y>=cur_y_radius[0] and cur_y<=cur_y_radius[1]:
cur_center_circle = self.center_circle[xy_i]
self.cur_targetItem.setPos(cur_center_circle[0],cur_center_circle[1])
res_str = self.x_ticks[xy_i][1]
self.sinout_signal.emit(res_str)
break
pass
pass
pass
?联动显示的控件
class ExampleWidget(QtWidgets.QWidget):
def __init__(self):
super().__init__()
self.init_data()
self.init_ui()
def init_data(self):
self.ori_df: pd.DataFrame = None
self.current_secID: str = ''
self.current_secShortName: str = ''
self.current_indicator_code: str = 'current_ratio'
self.current_endDate: str = None
pass
def init_ui(self):
self.title_label = QtWidgets.QLabel('流动比率_股票_计算机行业')
self.title_label.setAlignment(QtCore.Qt.AlignCenter)
self.title_label.setStyleSheet('QLabel{font-size:18px;font-weight:bold;}')
self.line_widget = PyQtGraphLineWidget()
self.line_widget.sinout_signal.connect(self.process_line_change)
self.scatter_widget = PyQtGraphScatterWidget()
self.scatter_widget.sinout_signal.connect(self.process_scatter_change)
layout_center = QtWidgets.QHBoxLayout()
layout_center.addWidget(self.line_widget)
layout_center.addWidget(self.scatter_widget)
layout = QtWidgets.QVBoxLayout()
layout.addWidget(self.title_label)
layout.addLayout(layout_center)
self.setLayout(layout)
pass
def init_start(self,df:pd.DataFrame):
self.ori_df = df
# 默认取第一条的股票
first_row = df.iloc[0]
self.current_secID = first_row['secID']
self.current_secShortName = first_row['secShortName']
line_map = self.caculate_line_data()
scatter_map = self.caculate_scatter_data()
self.line_widget.set_data(line_map)
self.scatter_widget.set_data(scatter_map)
self.fill_content()
pass
def fill_content(self):
self.title_label.setText(f"流动比率_{self.current_secShortName}_计算机行业")
def caculate_line_data(self):
df = self.ori_df.copy()
df_line = df[df['secID'] == self.current_secID].copy()
df_line['count'] = [i for i in range(len(df_line))]
if self.current_endDate is None:
self.current_endDate = df_line.iloc[-1]['endDate']
cur_x = df_line.iloc[-1]['count']
cur_y = df_line.iloc[-1][self.current_indicator_code]
endDate_list = df_line['endDate'].values.tolist()
x_ticks = [(i,item) for i,item in enumerate(endDate_list)]
# 计算圆半径
line_x_min = df_line['count'].min()
line_x_max = df_line['count'].max()
line_x_len = len(df_line)
line_x_radius = (line_x_max - line_x_min) / (line_x_len * 2)
line_y_min = df_line[self.current_indicator_code].min()
line_y_max = df_line[self.current_indicator_code].max()
line_y_radius = (line_y_max - line_y_min) / (line_x_len * 2)
line_center_circle = []
line_cover_xy = []
line_x = df_line['count'].values.tolist()
line_y = df_line[self.current_indicator_code].values.tolist()
for x, y in zip(line_x, line_y):
line_center_circle.append([x, y])
line_cover_xy.append([[x - line_x_radius, x + line_x_radius], [y - line_y_radius, y + line_y_radius]])
line_map = {
'title_str': self.current_secShortName,
'x':line_x,
'y':line_y,
'x_ticks':x_ticks,
'cur_pos':[cur_x,cur_y],
'center_circle':line_center_circle,
'cover_xy':line_cover_xy
}
return line_map
def caculate_scatter_data(self):
df = self.ori_df.copy()
df_scatter = df[df['endDate']==self.current_endDate].copy()
df_scatter['count'] = [i for i in range(len(df_scatter))]
cur_x = df_scatter[df_scatter['secID']==self.current_secID].iloc[0]['count']
cur_y = df_scatter[df_scatter['secID']==self.current_secID].iloc[0][self.current_indicator_code]
x_ticks = [(i,item) for i,item in enumerate(df_scatter['secShortName'].values.tolist())]
scatter_map = {
'title_str':self.current_endDate,
'x':df_scatter['count'].values.tolist(),
'y':df_scatter[self.current_indicator_code].values.tolist(),
'x_ticks':x_ticks,
'cur_pos':[cur_x,cur_y]
}
return scatter_map
def process_line_change(self,res_str:str):
# 返回 endDate
self.current_endDate = res_str
scatter_map = self.caculate_scatter_data()
self.scatter_widget.set_data(scatter_map)
pass
def process_scatter_change(self,res_str:str):
# 返回 secShortName
self.current_secShortName = res_str
self.current_secID = self.ori_df[self.ori_df['secShortName']==res_str].iloc[0]['secID']
line_map = self.caculate_line_data()
self.line_widget.set_data(line_map)
self.fill_content()
pass
def closeEvent(self, a0: QtGui.QCloseEvent) -> None:
self.close()
使用:
if __name__ == '__main__':
df = pd.read_csv('D:/temp003/example_df.csv',encoding='utf-8')
app = QtWidgets.QApplication(sys.argv)
t_win = ExampleWidget()
t_win.showMaximized()
t_win.init_start(df)
sys.exit(app.exec_())
?点击左侧的点,右侧散点图变动
?点击右侧的点,左侧折现图变动
数据:
链接:https://pan.baidu.com/s/1O6KkgP-v0S99Xu4U2PdJIQ? 提取码:6zm0
|