
stock_A_simple_monitor.md
这是一个结合 akshare 制作的轻量化股市盯盘小工具。
话不多说,源代码如下:
import os
import sys
import time
import signal
import threading
import requests
import pandas as pd
import akshare as ak
from datetime import datetime, timedelta
from rich.live import Live
from rich.table import Table
from rich.layout import Layout
from rich.panel import Panel
from rich.console import Console
from rich.align import Align
from rich import box
console = Console()
# Global state shared by the main loop and the background key-listener
# thread (drives keyboard input and on-the-fly symbol switching).
input_buffer = ""  # digits typed so far while the monitor view is shown
target_symbol = ""  # stock code currently being monitored
app_running = True  # cleared to end the outer loop in main()
monitor_running = False  # True while a monitor session is active
# 0. 跨平台后台按键监听器
def _unix_key_listener():
    """Background key listener for POSIX terminals.

    Switches stdin to cbreak mode so single key presses can be read without
    Enter, then polls it until ``monitor_running`` or ``app_running`` becomes
    false.  Key handling:

    * Ctrl+C / Ctrl+D: clear both run flags and send SIGINT to this process
      so the main thread sees a ``KeyboardInterrupt``.
    * ``q`` / ``Q``: clear ``monitor_running`` (leave the monitor view).
    * ASCII digit: append to ``input_buffer``; once six digits are buffered
      they are published as ``target_symbol`` and the buffer is reset.
    * Backspace: drop the last buffered digit.
    * Enter / Esc: discard the buffer.

    The terminal settings captured on entry are always restored on exit.
    If stdin is not a terminal the listener returns immediately.
    """
    global input_buffer, target_symbol, monitor_running, app_running
    import tty
    import termios
    import select

    if not sys.stdin.isatty():
        # termios calls would fail on a pipe/file; there are no keys to read.
        return

    fd = sys.stdin.fileno()
    old_settings = termios.tcgetattr(fd)
    try:
        # cbreak mode: key presses are delivered immediately, without Enter.
        tty.setcbreak(fd)
        while monitor_running and app_running:
            ready, _, _ = select.select([fd], [], [], 0.1)
            if not ready:
                continue
            # Read straight from the descriptor.  sys.stdin.read(1) would pull
            # every pending byte into Python's own buffer, where select()
            # cannot see it: pasted digits would stall until the next key
            # press and could leak into a later console.input() call.
            raw = os.read(fd, 1)
            if not raw:
                # EOF: stdin was closed; stop instead of spinning on select().
                break
            # Only ASCII keys are meaningful here; bytes of multi-byte
            # characters decode to "" and are skipped.
            char = raw.decode("ascii", errors="ignore")
            if not char:
                continue
            if char in ('\x03', '\x04'):  # Ctrl+C or Ctrl+D
                monitor_running = False
                app_running = False
                os.kill(os.getpid(), signal.SIGINT)  # hand over to the main thread
                break
            elif char.lower() == 'q':  # q leaves the current monitor view
                monitor_running = False
                break
            elif char.isdigit():
                input_buffer += char
                if len(input_buffer) == 6:
                    target_symbol = input_buffer
                    input_buffer = ""
            elif char in ('\x7f', '\b'):  # Backspace
                input_buffer = input_buffer[:-1]
            elif char in ('\r', '\n', '\x1b'):  # Enter or Esc: start over
                input_buffer = ""
    finally:
        termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
def _win_key_listener():
    """Background key listener for the Windows console.

    Polls ``msvcrt.kbhit()`` until ``monitor_running`` or ``app_running``
    becomes false.  Key handling:

    * Ctrl+C: clear both run flags and interrupt the main thread.
    * ``q`` / ``Q``: clear ``monitor_running`` (leave the monitor view).
    * digit: append to ``input_buffer``; once six digits are buffered they
      are published as ``target_symbol`` and the buffer is reset.
    * Backspace: drop the last buffered digit.
    * Enter / Esc: discard the buffer.

    Function and arrow keys, and bytes that are not valid UTF-8 on their
    own, are ignored.
    """
    global input_buffer, target_symbol, monitor_running, app_running
    import msvcrt
    import _thread

    while monitor_running and app_running:
        if not msvcrt.kbhit():
            # Nothing pending: sleep briefly so the loop does not spin.
            time.sleep(0.05)
            continue

        char_bytes = msvcrt.getch()
        if char_bytes == b'\x03':  # Ctrl+C
            monitor_running = False
            app_running = False
            # On Windows os.kill(pid, SIGINT) terminates the process via
            # TerminateProcess instead of raising KeyboardInterrupt, so the
            # main thread is interrupted directly.
            _thread.interrupt_main()
            break
        if char_bytes in (b'\x00', b'\xe0'):
            # Special-key prefix: the next getch() returns the key code,
            # which can look like an ordinary letter (Page Down is 'Q').
            # Consume and drop it so it is not taken for a command.
            msvcrt.getch()
            continue
        try:
            char = char_bytes.decode('utf-8')
        except UnicodeDecodeError:
            # A lone byte of a multi-byte character: not a key we handle.
            continue

        if char.lower() == 'q':  # q leaves the current monitor view
            monitor_running = False
            break
        elif char.isdigit():
            input_buffer += char
            if len(input_buffer) == 6:
                target_symbol = input_buffer
                input_buffer = ""
        elif char in ('\x08', '\x7f', '\b'):  # Backspace
            input_buffer = input_buffer[:-1]
        elif char in ('\r', '\n', '\x1b'):  # Enter or Esc: start over
            input_buffer = ""
def start_key_listener():
    """Start the platform-specific key listener on a daemon thread.

    Returns the started ``threading.Thread`` so the caller can join it.
    """
    listener = _win_key_listener if sys.platform == 'win32' else _unix_key_listener
    worker = threading.Thread(target=listener, daemon=True)
    worker.start()
    return worker
# 工具函数
def safe_float(val):
    """Convert *val* to ``float``, returning ``0.0`` when it cannot be parsed.

    The value is stringified and stripped first; an empty string or a lone
    ``"-"`` placeholder yields ``0.0``, as does any text ``float()`` rejects.

    Only conversion errors are swallowed.  ``KeyboardInterrupt`` and
    ``SystemExit`` propagate, so an interrupt that arrives while a value is
    being parsed is not lost.
    """
    try:
        text = str(val).strip()
        return float(text) if text not in ("", "-") else 0.0
    except (TypeError, ValueError):
        return 0.0
def get_prefix(symbol: str) -> str:
    """Return the exchange prefix (``"sh"``, ``"sz"`` or ``"bj"``) for a stock code.

    The mapping is by leading digits:

    * ``920...``: ``"bj"`` (Beijing Stock Exchange's newer code segment;
      checked first so it does not fall through to the ``"sh"`` default)
    * ``6...``: ``"sh"``
    * ``0...`` / ``3...``: ``"sz"``
    * ``4...`` / ``8...``: ``"bj"``
    * anything else: ``"sh"``
    """
    if symbol.startswith("920"):
        return "bj"
    if symbol.startswith("6"):
        return "sh"
    if symbol.startswith(("0", "3")):
        return "sz"
    if symbol.startswith(("4", "8")):
        return "bj"
    return "sh"
def calc_indicators(series: pd.Series) -> dict:
    """Compute MA5/MA10/MA20 and MACD (DIF, DEA, MACD) for a close-price series.

    NaN entries are dropped first.  With fewer than two remaining points
    every indicator is reported as 0.  Moving averages use
    ``min_periods=1``, so short series still produce a value.  Only the most
    recent value of each indicator is returned.
    """
    closes = series.dropna().reset_index(drop=True)
    if len(closes) < 2:
        return dict.fromkeys(("MA5", "MA10", "MA20", "DIF", "DEA", "MACD"), 0)

    # Latest simple moving average for each window.
    latest = {
        f"MA{window}": closes.rolling(window, min_periods=1).mean().iloc[-1]
        for window in (5, 10, 20)
    }

    # MACD with 12/26/9 spans.
    fast_ema = closes.ewm(span=12, adjust=False).mean()
    slow_ema = closes.ewm(span=26, adjust=False).mean()
    dif_line = fast_ema - slow_ema
    dea_line = dif_line.ewm(span=9, adjust=False).mean()
    macd_bar = (dif_line - dea_line) * 2

    latest["DIF"] = dif_line.iloc[-1]
    latest["DEA"] = dea_line.iloc[-1]
    latest["MACD"] = macd_bar.iloc[-1]
    return latest
# ──────────────────────────────────────────
# 1. 多周期历史数据加载
# ──────────────────────────────────────────
def _load_min(symbol, period_str, n_days, label):
    """Load minute-level close prices for *symbol* covering the last *n_days* days.

    Calls ``ak.stock_zh_a_hist_min_em`` with forward-adjusted ("qfq")
    prices and returns a one-column DataFrame named ``close`` with
    non-numeric rows removed.  On any failure the error is printed and an
    empty ``close`` DataFrame is returned instead of raising.
    """
    window_end = datetime.now()
    window_start = window_end - timedelta(days=n_days)
    stamp = "%Y-%m-%d %H:%M:%S"
    try:
        raw = ak.stock_zh_a_hist_min_em(
            symbol=symbol,
            period=period_str,
            start_date=window_start.strftime(stamp),
            end_date=window_end.strftime(stamp),
            adjust="qfq",
        )
        # Prefer the named close column; otherwise fall back to position 4.
        close_col = raw.columns[4] if "收盘" not in raw.columns else "收盘"
        closes = raw[[close_col]].rename(columns={close_col: "close"})
        closes["close"] = pd.to_numeric(closes["close"], errors="coerce")
        closes = closes.dropna().reset_index(drop=True)
        console.print(f" [green]✓ {label}({period_str}min) 载入成功[/green]")
        return closes
    except Exception as e:
        console.print(f" [red]✗ {label} 失败: {e}[/red]")
        return pd.DataFrame(columns=["close"])
def _load_kline(symbol, period, n_bars, label):
    """Load the last *n_bars* close prices of a daily/weekly/monthly K-line.

    Calls ``ak.stock_zh_a_hist`` with forward-adjusted ("qfq") prices and
    returns a one-column DataFrame named ``close`` with non-numeric rows
    removed.  On any failure the error is printed and an empty ``close``
    DataFrame is returned instead of raising.
    """
    try:
        bars = ak.stock_zh_a_hist(symbol=symbol, period=period, adjust="qfq")
        recent = bars.tail(n_bars)
        # Prefer the named close column; otherwise fall back to position 4.
        close_col = recent.columns[4] if "收盘" not in recent.columns else "收盘"
        closes = recent[[close_col]].rename(columns={close_col: "close"})
        closes["close"] = pd.to_numeric(closes["close"], errors="coerce")
        closes = closes.dropna().reset_index(drop=True)
        console.print(f" [green]✓ {label} 载入成功[/green]")
        return closes
    except Exception as e:
        console.print(f" [red]✗ {label} 失败: {e}[/red]")
        return pd.DataFrame(columns=["close"])
def init_history_data(symbol) -> dict:
    """Load the close-price history of *symbol* for all five monitored periods.

    Returns a dict keyed by period label whose values are the ``close``
    DataFrames produced by ``_load_min`` / ``_load_kline``.  Progress is
    printed to the console.
    """
    console.print(f"\n[bold cyan]正在加载 [{symbol}] 历史复权数据...[/bold cyan]")
    # (label, loader, loader arguments between symbol and label), in load order.
    plan = (
        ("分时", _load_min, ("1", 10)),
        ("五日", _load_min, ("5", 30)),
        ("日线", _load_kline, ("daily", 250)),
        ("周线", _load_kline, ("weekly", 120)),
        ("月线", _load_kline, ("monthly", 60)),
    )
    hist = {}
    for label, loader, args in plan:
        hist[label] = loader(symbol, *args, label)
    console.print("[bold green] 数据装载完毕,启动监控 [/bold green]\n")
    return hist
# 2. 实时计算所有周期指标
def calc_all_indicators(hist: dict, price: float) -> dict:
    """Recompute indicators for every period using the latest *price*.

    For each period the last historical close is replaced (on a copy) by
    *price* before calling ``calc_indicators``.  Periods with no history,
    or any call with ``price == 0``, get an all-zero indicator dict.
    """
    indicators = {}
    for period, frame in hist.items():
        if not frame.empty and price != 0:
            closes = frame["close"].copy()
            closes.iloc[-1] = price  # the live price stands in for the latest bar
            indicators[period] = calc_indicators(closes)
        else:
            indicators[period] = {
                "MA5": 0, "MA10": 0, "MA20": 0, "DIF": 0, "DEA": 0, "MACD": 0,
            }
    return indicators
# 3. 行情抓取
def _parse_tencent(symbol: str):
    """Fetch one quote from qt.gtimg.cn and unpack it into a flat dict.

    The response is a ``~``-separated record.  Numeric fields are read by
    position through ``safe_float``, so missing or blank fields become
    ``0.0``.  Order-book prices and volumes are kept as raw strings.
    Returns the string ``"解析失败"`` when the body is empty or has no
    ``~`` separator.  Network errors from ``requests`` propagate.
    """
    response = requests.get(
        f"http://qt.gtimg.cn/q={get_prefix(symbol)}{symbol}",
        headers={"User-Agent": "Mozilla/5.0", "Referer": "https://gu.qq.com/"},
        timeout=3,
    )
    response.encoding = "gbk"
    payload = response.text.strip()
    if not payload or "~" not in payload:
        return "解析失败"

    # The record is wrapped in double quotes when present: v_xxx="...~...~...";
    record = payload.split('"')[1] if '"' in payload else payload
    fields = record.split("~")

    def field(idx):
        """Raw text of field *idx*, or "" when the record is too short."""
        return fields[idx] if idx < len(fields) else ""

    def num(idx):
        """Field *idx* parsed as a float (0.0 when blank or invalid)."""
        return safe_float(field(idx))

    price = num(3)
    pb = num(46)
    circ_mv = num(44)
    total_mv = num(45)
    has_price = price > 0

    # Total bid / ask volume over the five order-book levels.
    bid_total = sum(num(i) for i in range(10, 19, 2))
    ask_total = sum(num(i) for i in range(20, 29, 2))
    if (bid_total + ask_total) > 0:
        weibi = (bid_total - ask_total) / (bid_total + ask_total) * 100
    else:
        weibi = 0.0

    quote = {
        "name": field(1),
        "price": price,
        "prev_close": num(4),
        "open": num(5),
        "volume": num(6),
        "turnover_amt": num(37),
        "outer": num(7),
        "inner": num(8),
        "turnover_rate": num(38),
        "pe_dynamic": num(52),
        "pe_static": num(53),
        "pb_ratio": pb,
        "net_asset": price / pb if pb > 0 else 0.0,
        "amplitude": num(43),
        "limit_up": num(47),
        "limit_down": num(48),
        "vol_ratio": num(49),
        "weibi": weibi,
        "circ_mv": circ_mv,
        "total_mv": total_mv,
        "circ_shares": circ_mv / price if has_price else 0.0,
        "total_shares": total_mv / price if has_price else 0.0,
    }

    # Order book: price/volume pairs in consecutive fields, asks starting at
    # field 19 and bids at field 9, five levels each.
    for side, first_idx in (("sell", 19), ("buy", 9)):
        for level in range(1, 6):
            price_idx = first_idx + 2 * (level - 1)
            quote[f"{side}_price_{level}"] = field(price_idx)
            quote[f"{side}_vol_{level}"] = field(price_idx + 1)
    return quote
def fetch_realtime_tick(symbol):
    """Return the latest quote dict for *symbol*, or an error message string.

    Never raises for ordinary errors: a failed request or parse is reported
    as text so the caller can display it.
    """
    try:
        parsed = _parse_tencent(symbol)
    except Exception as e:
        return f"接口异常: {e}"
    return parsed if isinstance(parsed, dict) else "数据不足或股票代码无效"
# 4. UI rendering
# Period labels in display order; each one has a matching "p_<label>" panel
# in the dashboard layout.
PERIODS = ["分时", "五日", "日线", "周线", "月线"]
def _ind_table(ind: dict, price: float) -> Table:
    """Render one period's indicators as a borderless two-column table.

    Each moving average is coloured red when *price* is above it, green
    otherwise, and white when the average is 0.  The MACD value is red
    when positive, green otherwise.
    """
    table = Table(show_header=False, box=None, padding=(0, 2))
    table.add_column("指标", style="dim", no_wrap=True)
    table.add_column("数值", justify="right", no_wrap=True)

    def ma_color(level):
        """Colour of a moving-average value relative to the current price."""
        if level == 0:
            return "white"
        if price > level:
            return "red"
        return "green"

    for key, label_color in (("MA5", "white"), ("MA10", "yellow"), ("MA20", "magenta")):
        table.add_row(
            f"[{label_color}]{key}[/]",
            f"[{ma_color(ind[key])}]{ind[key]:.3f}[/]",
        )

    table.add_row("", "")  # spacer between the MA and MACD groups

    if ind["MACD"] > 0:
        macd_color = "red"
    else:
        macd_color = "green"
    table.add_row("MACD", f"[bold {macd_color}]{ind['MACD']:.4f}[/]")
    for key, label_color in (("DIF", "white"), ("DEA", "yellow")):
        table.add_row(f"[{label_color}]{key}[/]", f"{ind[key]:.4f}")
    return table
def _dash_build_layout() -> Layout:
    """Create the empty dashboard skeleton (header, left pane, right pane)."""
    layout = Layout()
    layout.split_column(
        Layout(name="header", size=3),
        Layout(name="main")
    )
    layout["main"].split_row(
        Layout(name="left_pane", ratio=32),
        Layout(name="right_pane", ratio=68)
    )
    layout["left_pane"].split_column(
        Layout(name="order_book", ratio=35),
        Layout(name="detailed_stats", ratio=45),
        Layout(name="fund_flow", ratio=20)
    )
    layout["right_pane"].split_column(
        Layout(name="intraday_row", ratio=1),
        Layout(name="kline_row", ratio=1)
    )
    layout["intraday_row"].split_row(Layout(name="p_分时"), Layout(name="p_五日"))
    layout["kline_row"].split_row(Layout(name="p_日线"), Layout(name="p_周线"), Layout(name="p_月线"))
    return layout


def _dash_header_panel(tick: dict, in_buffer: str) -> Panel:
    """Build the top bar: name, price, change, volume, refresh time, key input."""
    # Fall back to the current price when the previous close is missing, so
    # the change is shown as zero instead of a bogus jump.
    prev = tick["prev_close"] if tick["prev_close"] > 0 else tick["price"]
    chg = tick["price"] - prev
    pct = chg / prev * 100 if prev > 0 else 0
    color = "red" if chg > 0 else ("green" if chg < 0 else "white")
    vol_str = f"{tick['volume'] / 10000:.2f}万手" if tick['volume'] >= 10000 else f"{tick['volume']:.0f}手"
    amt_str = f"{tick['turnover_amt'] / 10000:.2f}亿" if tick[
        'turnover_amt'] >= 10000 else f"{tick['turnover_amt']:.1f}万"
    # Show the digits typed so far, or the usage hint when nothing is buffered.
    input_display = f"[bold yellow]键盘输入中: {in_buffer}_[/bold yellow]" if in_buffer else "[dim]键入6位代码切换 │ 按 'q' 返回搜索[/dim]"
    header_text = (
        f"[bold {color}]【{tick['name']}】 "
        f"现价: {tick['price']:.2f} {chg:+.2f} ({pct:+.2f}%) "
        f"[dim white]今开: {tick['open']:.2f} │ 昨收: {prev:.2f} │ "
        f"成交量: {vol_str} │ 成交额: [bold yellow]{amt_str}[/bold yellow][/dim white] "
        f"[dim]刷新: {time.strftime('%H:%M:%S')}[/dim] │ {input_display}"
    )
    return Panel(header_text, style="white on dark_blue", box=box.ROUNDED)


def _dash_order_book_panel(tick: dict) -> Panel:
    """Build the five-level order book: asks 5..1 above a divider, bids 1..5 below."""
    ob = Table(show_header=False, box=None, expand=True)
    ob.add_column("档", justify="left")
    ob.add_column("价", justify="right")
    ob.add_column("量", justify="right", style="cyan")
    for i in range(5, 0, -1):
        ob.add_row(f"卖{i}", f"[green]{tick[f'sell_price_{i}']}[/]", tick[f"sell_vol_{i}"])
    ob.add_row("[dim]──[/]", "[dim]───────[/]", "[dim]─────[/]")
    for i in range(1, 6):
        ob.add_row(f"买{i}", f"[red]{tick[f'buy_price_{i}']}[/]", tick[f"buy_vol_{i}"])
    return Panel(ob, title="[bold]五档盘口[/]", border_style="cyan", box=box.ROUNDED)


def _dash_stats_panel(tick: dict) -> Panel:
    """Build the four-column key/value panel of valuation and trading statistics."""
    ds = Table(show_header=False, box=None, padding=(0, 1), expand=True)
    ds.add_column("K1", style="dim")
    ds.add_column("V1", justify="right", style="bold")
    ds.add_column("K2", style="dim")
    ds.add_column("V2", justify="right", style="bold")
    weibi_color = "red" if tick['weibi'] > 0 else "green" if tick['weibi'] < 0 else "white"
    ds.add_row("换手率", f"{tick['turnover_rate']:.2f}%", "量比", f"{tick['vol_ratio']:.2f}")
    ds.add_row("委比", f"[{weibi_color}]{tick['weibi']:+.2f}%[/]", "振幅", f"{tick['amplitude']:.2f}%")
    ds.add_row("市盈(动)", f"{tick['pe_dynamic']:.1f}", "市盈(静)", f"{tick['pe_static']:.1f}")
    ds.add_row("市净率", f"{tick['pb_ratio']:.2f}", "每股净资", f"{tick['net_asset']:.2f}")
    ds.add_row("流通市值", f"{tick['circ_mv']:.1f}亿", "总市值", f"{tick['total_mv']:.1f}亿")
    ds.add_row("流通股本", f"{tick['circ_shares']:.2f}亿", "总股本", f"{tick['total_shares']:.2f}亿")
    ds.add_row("涨停", f"[red]{tick['limit_up']:.2f}[/]", "跌停", f"[green]{tick['limit_down']:.2f}[/]")
    return Panel(ds, title="[bold]基本面数据[/]", border_style="magenta", box=box.ROUNDED)


def _dash_flow_panel(tick: dict) -> Panel:
    """Build the outer/inner volume comparison panel with a red/green ratio bar."""
    ff = Table(show_header=False, box=None, expand=True)
    ff.add_column("项")
    ff.add_column("值", justify="right")
    outer, inner = tick["outer"], tick["inner"]
    total = outer + inner
    if total > 0:
        r_out = outer / total * 100
        bar_len = 16
        # Red cells are the outer-volume share; the rest of the bar is green.
        red_len = int(bar_len * (r_out / 100))
        bar = "[red]" + "█" * red_len + "[/][green]" + "█" * (bar_len - red_len) + "[/]"
        ff.add_row("外盘(主动买)", f"[red]{outer:.0f}[/]")
        ff.add_row("内盘(主动卖)", f"[green]{inner:.0f}[/]")
        ff.add_row("力量对比", bar)
        ff.add_row("多方占比", f"[red]{r_out:.1f}%[/]")
    else:
        # No outer/inner volume reported yet.
        ff.add_row("状态", "[yellow]竞价中/未开盘[/]")
    return Panel(ff, title="[bold]盘口博弈[/]", border_style="yellow", box=box.ROUNDED)


def generate_dashboard(tick: dict, all_ind: dict, in_buffer: str) -> Layout:
    """Assemble the full-screen dashboard for one refresh.

    Args:
        tick: quote dict as returned by ``_parse_tencent``.
        all_ind: indicator dicts keyed by period label; missing periods are
            rendered as all zeros.
        in_buffer: digits the user has typed so far (shown in the header).

    Returns:
        A ``rich`` ``Layout`` with the header, the three left-hand panels and
        one indicator panel per entry of ``PERIODS``.
    """
    layout = _dash_build_layout()
    layout["header"].update(_dash_header_panel(tick, in_buffer))
    layout["order_book"].update(_dash_order_book_panel(tick))
    layout["detailed_stats"].update(_dash_stats_panel(tick))
    layout["fund_flow"].update(_dash_flow_panel(tick))

    # Right side: one indicator panel per period.
    price = tick["price"]
    border_map = {"分时": "light_sea_green", "五日": "deep_sky_blue1", "日线": "gold3", "周线": "dark_orange",
                  "月线": "indian_red"}
    for period in PERIODS:
        ind = all_ind.get(period, {"MA5": 0, "MA10": 0, "MA20": 0, "DIF": 0, "DEA": 0, "MACD": 0})
        layout[f"p_{period}"].update(Panel(_ind_table(ind, price), title=f"[bold {border_map[period]}] ◉ {period} [/]",
                                           border_style=border_map[period], box=box.ROUNDED))
    return layout
# 5. 主事件循环
def main():
    """Run the application: search screen -> monitor session -> search screen.

    The outer loop prompts for a six-digit code.  For each valid code it
    starts the background key listener and runs a monitor session until
    ``monitor_running`` is cleared.  While a session runs, a change of
    ``target_symbol`` reloads the history and restarts the live view.
    ``KeyboardInterrupt`` during a session, or quitting from the search
    screen, ends the program.
    """
    global target_symbol, app_running, monitor_running, input_buffer
    while app_running:
        # Clear the terminal and print the search screen.
        console.clear()
        welcome_text = (
            "[bold cyan]欢迎使用终端实时盯盘系统[/bold cyan]\n\n"
            "1. 请输入 [bold yellow]6位股票代码[/bold yellow] 开始盯盘(例如:600339)\n"
            "2. 盯盘过程中,按 [bold red]'q'[/bold red] 随时退出画布并返回本界面\n\n"
        )
        console.print(Panel(welcome_text, title="🔍 搜索主界面", border_style="cyan", box=box.ROUNDED))
        try:
            symbol = console.input("👉 [bold green]请输入股票代码: [/bold green]").strip()
        except (KeyboardInterrupt, EOFError):
            break
        # Typing q / exit / quit on the search screen leaves the program.
        if symbol.lower() in ('q', 'exit', 'quit'):
            break
        # Basic format check: exactly six digits.
        if not symbol.isdigit() or len(symbol) != 6:
            console.print("\n[red]⚠️ 股票代码格式不正确,请输入 6 位数字代码![/red]")
            time.sleep(1.5)
            continue
        target_symbol = symbol
        monitor_running = True
        input_buffer = ""
        # Start the background key listener.
        listener_thread = start_key_listener()
        try:
            while monitor_running and app_running:
                # Snapshot the symbol; the listener may change target_symbol later.
                current_symbol = target_symbol
                # Load the history outside of Live so that the progress
                # messages do not disturb the full-screen UI.
                hist = init_history_data(current_symbol)
                if not monitor_running: break
                # Refresh at 10 FPS so typed characters show up right away.
                with Live(refresh_per_second=10, screen=True) as live:
                    last_fetch_time = 0
                    tick = None
                    all_ind = None
                    # Runs until the session ends or another symbol is selected.
                    while monitor_running and current_symbol == target_symbol:
                        now = time.time()
                        # Fetch the quote at most once every 2 seconds (the
                        # author's stated reason: avoid getting the IP blocked).
                        if now - last_fetch_time >= 2:
                            tick = fetch_realtime_tick(current_symbol)
                            if isinstance(tick, dict):
                                all_ind = calc_all_indicators(hist, tick["price"])
                            last_fetch_time = now
                        # Redraw on every pass so input_buffer changes are visible.
                        if isinstance(tick, dict) and all_ind:
                            live.update(generate_dashboard(tick, all_ind, input_buffer))
                        elif isinstance(tick, str):
                            # fetch_realtime_tick returned an error message
                            # (for example, an invalid stock code).
                            error_panel = Panel(
                                f"[bold red]获取 [{current_symbol}] 行情失败[/bold red]\n\n"
                                f"原因: {tick}\n\n"
                                f"输入状态: [bold yellow]{input_buffer}_[/bold yellow]\n\n"
                                f"[dim]请直接键入正确的6位代码切换,或按 'q' 返回搜索界面...[/dim]",
                                title="[bold red]数据无效[/bold red]",
                                border_style="red",
                                box=box.ROUNDED,
                                padding=(2, 4)
                            )
                            live.update(Align.center(error_panel, vertical="middle"))
                        time.sleep(0.1)
        except KeyboardInterrupt:
            app_running = False
            monitor_running = False
        # The session ended (e.g. q was pressed): stop the listener, give it a
        # moment to exit, then let the outer loop show the search screen again.
        monitor_running = False
        if listener_thread.is_alive():
            listener_thread.join(timeout=0.5)
    console.print("\n[bold yellow]监控已安全退出,感谢使用。[/bold yellow]")
# Script entry point: start the interactive loop only when run directly.
if __name__ == "__main__":
    main()