Files
crypto-atr-signal/webapp.py
T

348 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
from datetime import datetime, timezone
from urllib.parse import quote
from zoneinfo import ZoneInfo
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from scanner import (
DB_PATH,
INTERVAL_MS,
MARKET_CONFIGS,
MARKET_CRYPTO,
MARKET_TRADFI,
count_current_signals,
db_connect,
init_db,
)
# ASGI application object; the startup hook and the route handlers in this
# module register themselves on it through decorators.
app = FastAPI(title="Crypto ATR Signal")
# Template loader used to render "index.html".
# NOTE(review): "templates" is a relative path — presumably resolved against
# the process working directory; confirm how the app is launched.
templates = Jinja2Templates(directory="templates")
# Time zone applied by the Madrid-time formatting helpers in this module.
MADRID_TZ = ZoneInfo("Europe/Madrid")
# Release metadata: the version is reported by /health, and all three values
# are handed to the page template as "app_meta".
APP_VERSION = "v1.3.1"
APP_RELEASE_DATE = "2026-06-23"
APP_AUTHOR = "Z"
def env_bool(name: str, default: bool) -> bool:
    """Read a boolean flag from the environment.

    Args:
        name: Environment variable to look up.
        default: Value returned when the variable is unset or blank.

    Returns:
        ``True`` when the variable holds one of ``1``, ``true``, ``yes`` or
        ``on`` (case-insensitive, surrounding whitespace ignored); ``False``
        for any other non-blank value; ``default`` otherwise.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    text = raw.strip().lower()
    if not text:
        # A blank value (e.g. `FLAG=` in an env file) is treated as "unset"
        # instead of silently switching the feature off.
        return default
    # Accept the common truthy spellings, not only the literal "true", so that
    # `FLAG=1` or `FLAG=yes` do not unexpectedly disable a feature.
    return text in {"1", "true", "yes", "on"}
# Page configuration, read from the environment once when the module is imported.
# View shown when a request supplies no `market` value; index() falls back to
# the first allowed view when this one is not allowed.
PAGE_DEFAULT_VIEW = os.getenv("PAGE_DEFAULT_VIEW", "all").lower()
# Switches for the "all", "crypto" and "tradfi" views respectively.
PAGE_SHOW_ALL = env_bool("PAGE_SHOW_ALL", True)
PAGE_SHOW_CRYPTO = env_bool("PAGE_SHOW_CRYPTO", True)
PAGE_SHOW_TRADFI = env_bool("PAGE_SHOW_TRADFI", True)
# When false, the "all" view merges every market into a single signal group.
PAGE_GROUP_BY_MARKET = env_bool("PAGE_GROUP_BY_MARKET", True)
# Forwarded to the template as page_config["show_version"].
PAGE_SHOW_VERSION = env_bool("PAGE_SHOW_VERSION", True)
# /health reports a market as unhealthy when its latest scan finished more
# than this many hours ago. A non-numeric value raises ValueError at import.
HEALTH_MAX_SCAN_AGE_HOURS = float(os.getenv("HEALTH_MAX_SCAN_AGE_HOURS", "8"))
def fmt_decimal(value: str | None, places: int = 6) -> str:
if value is None:
return "-"
number = float(value)
if abs(number) >= 100:
return f"{number:,.2f}"
if abs(number) >= 1:
return f"{number:,.4f}"
return f"{number:,.{places}f}".rstrip("0").rstrip(".")
def fmt_madrid_time(ms: int) -> str:
    """Render an epoch-millisecond timestamp as Madrid wall-clock time.

    Returns a ``YYYY-MM-DD HH:MM`` string without any zone suffix.
    """
    utc_moment = datetime.fromtimestamp(ms / 1000, timezone.utc)
    madrid_moment = utc_moment.astimezone(MADRID_TZ)
    return madrid_moment.strftime("%Y-%m-%d %H:%M")
def fmt_utc_time(ms: int) -> str:
    """Render an epoch-millisecond timestamp as ``YYYY-MM-DD HH:MM UTC``."""
    seconds = ms / 1000
    moment = datetime.fromtimestamp(seconds, timezone.utc)
    stamp = moment.strftime("%Y-%m-%d %H:%M UTC")
    return stamp
def fmt_scan_time(value: str | None) -> str:
    """Convert an ISO-8601 timestamp string to a Madrid-time display label.

    Returns ``"-"`` when ``value`` is ``None`` or empty. Raises ``ValueError``
    (from ``datetime.fromisoformat``) when the text is not valid ISO format.
    """
    if value:
        parsed = datetime.fromisoformat(value)
        # NOTE(review): astimezone() treats a naive datetime as system-local
        # time, so this relies on stored timestamps carrying a UTC offset.
        localized = parsed.astimezone(MADRID_TZ)
        return localized.strftime("%Y-%m-%d %H:%M 马德里")
    return "-"
def fmt_candle_interval_title(open_time: int) -> str:
    """Build the two-line description of one candle's time span.

    The first line gives the open/close times in Madrid time, the second the
    same span in UTC. The close time is ``open_time + INTERVAL_MS``; both
    values are epoch milliseconds.
    """
    close_time = open_time + INTERVAL_MS
    madrid_line = (
        f"K线区间:{fmt_madrid_time(open_time)} - {fmt_madrid_time(close_time)} 马德里"
    )
    utc_line = f"UTC{fmt_utc_time(open_time)} - {fmt_utc_time(close_time)}"
    return madrid_line + "\n" + utc_line
def tradingview_url(symbol: str) -> str:
    """Return the TradingView chart link for ``symbol``.

    The symbol is wrapped as ``BINANCE:<symbol>.P`` and fully percent-encoded
    (no characters are treated as safe) before being placed in the query.
    """
    ticker = f"BINANCE:{symbol}.P"
    encoded_ticker = quote(ticker, safe="")
    return "https://www.tradingview.com/chart/?symbol=" + encoded_ticker
def scan_status_label(status: str | None) -> str:
labels = {
"success": "成功",
"partial": "部分失败",
"failed": "失败",
}
return labels.get(status or "", "-")
def multiple_level(value: str) -> str:
    """Classify a multiple into a severity bucket.

    Returns ``"extreme"`` for values of at least 3, ``"strong"`` for values of
    at least 2, and ``"normal"`` otherwise. ``value`` is parsed with
    ``float()``, which raises ``ValueError`` on non-numeric text.
    """
    number = float(value)
    # Thresholds are checked from highest to lowest; the first hit wins.
    for threshold, level in ((3, "extreme"), (2, "strong")):
        if number >= threshold:
            return level
    return "normal"
@app.on_event("startup")
def startup() -> None:
    """Initialise the database when the application starts.

    Opens a connection to ``DB_PATH``, runs ``init_db`` on it and always
    closes the connection afterwards, even when initialisation fails.
    """
    conn = db_connect(DB_PATH)
    try:
        init_db(conn)
    finally:
        # Release the handle even if init_db raises, so a failed start-up
        # does not leave the database connection open.
        conn.close()
def _market_health(conn, market_type: str, checked_at: datetime) -> tuple[bool, dict]:
    """Summarise the most recent scan run of one market.

    Args:
        conn: Open database connection whose rows support access by column name.
        market_type: Market identifier stored in ``scan_runs.market_type``.
        checked_at: Timezone-aware time of the health check, used to compute
            the age of the latest scan.

    Returns:
        ``(healthy, report)`` where ``healthy`` is true only when the latest
        run has status ``"success"`` and finished no more than
        ``HEALTH_MAX_SCAN_AGE_HOURS`` hours ago, and ``report`` is the
        JSON-serialisable entry for this market.
    """
    latest_run = conn.execute(
        """
        SELECT status, finished_at, symbols_failed, error_summary
        FROM scan_runs
        WHERE market_type = ?
        ORDER BY finished_at DESC
        LIMIT 1
        """,
        (market_type,),
    ).fetchone()
    current_signals = count_current_signals(conn, market_type)

    if not latest_run:
        return False, {
            "status": "unavailable",
            "last_scan_at": None,
            "age_hours": None,
            "failed_symbols": None,
            "current_signals": current_signals,
            "error": "no scan history",
        }

    # A run row without a finish time cannot be aged. Report it as unhealthy
    # with null timing fields instead of letting fromisoformat(None) raise and
    # be misreported as a database error.
    finished_raw = latest_run["finished_at"]
    finished_at = datetime.fromisoformat(finished_raw) if finished_raw else None
    if finished_at is None:
        age_hours = None
    else:
        # Clamp at zero so a slightly-future timestamp does not give a negative age.
        # NOTE(review): the subtraction requires finished_at to be
        # timezone-aware — confirm the scanner stores an offset.
        age_hours = max(0.0, (checked_at - finished_at).total_seconds() / 3600)

    market_healthy = (
        latest_run["status"] == "success"
        and age_hours is not None
        and age_hours <= HEALTH_MAX_SCAN_AGE_HOURS
    )
    return market_healthy, {
        "status": latest_run["status"],
        "last_scan_at": finished_at.isoformat() if finished_at is not None else None,
        "age_hours": round(age_hours, 2) if age_hours is not None else None,
        "failed_symbols": latest_run["symbols_failed"],
        "current_signals": current_signals,
        "error": latest_run["error_summary"],
    }


@app.get("/health", response_class=JSONResponse)
def health() -> JSONResponse:
    """Report database integrity and scan freshness for every market.

    Returns:
        A JSON response with status code 200 when the database passes
        ``PRAGMA quick_check`` and both markets are healthy, otherwise 503
        with ``"status": "degraded"``. Any exception raised while checking is
        caught and reported in the payload rather than propagated.
    """
    checked_at = datetime.now(timezone.utc)
    payload = {
        "status": "ok",
        "version": APP_VERSION,
        "checked_at": checked_at.isoformat(),
        "database": "ok",
        "markets": {},
    }
    healthy = True
    conn = None
    try:
        conn = db_connect(DB_PATH)
        integrity = conn.execute("PRAGMA quick_check;").fetchone()[0]
        if integrity != "ok":
            payload["database"] = integrity
            healthy = False
        for market_type, slug in ((MARKET_CRYPTO, "crypto"), (MARKET_TRADFI, "tradfi")):
            market_healthy, report = _market_health(conn, market_type, checked_at)
            payload["markets"][slug] = report
            healthy = healthy and market_healthy
    except Exception as exc:  # noqa: BLE001 - health endpoint must report failures as JSON.
        healthy = False
        payload["database"] = "error"
        payload["error"] = str(exc)
    finally:
        if conn is not None:
            conn.close()
    if not healthy:
        payload["status"] = "degraded"
    return JSONResponse(payload, status_code=200 if healthy else 503)
def _signal_view(row) -> dict:
    """Convert one ``signals`` row into the display dictionary used by the page."""
    is_bullish = row["direction"] == "Bullish"
    return {
        "symbol": row["symbol"],
        "tradingview_url": tradingview_url(row["symbol"]),
        "direction": "大阳" if is_bullish else "大阴",
        "direction_class": "bullish" if is_bullish else "bearish",
        "multiple": f"{float(row['multiple']):.2f}x",
        "multiple_level": multiple_level(row["multiple"]),
        "range": fmt_decimal(row["range"]),
        "atr14": fmt_decimal(row["atr14"]),
        "close_time_madrid": f"{fmt_madrid_time(row['open_time'] + INTERVAL_MS)} 马德里",
        "time_title": fmt_candle_interval_title(row["open_time"]),
        "created_at": row["created_at"],
    }


def load_market_view(
    conn, market_type: str, sort_sql: str
) -> dict:
    """Load the signals and summary data shown for one market.

    Only signals whose ``open_time`` equals the newest ``open_time`` in
    ``klines`` for this market are returned (at most 300).

    Args:
        conn: Open database connection whose rows support access by column name.
        market_type: Key into ``MARKET_CONFIGS`` and value of the
            ``market_type`` column.
        sort_sql: Sort direction for the multiple column. ``"ASC"``
            (case-insensitive) sorts ascending; any other value sorts
            descending.

    Returns:
        A dictionary with the formatted signals, the latest candle and symbol
        check times, the number of symbols with status ``TRADING`` and the
        market's config object.

    Raises:
        KeyError: If ``market_type`` is not present in ``MARKET_CONFIGS``.
    """
    config = MARKET_CONFIGS[market_type]
    # The direction is spliced into the SQL text, so restrict it to the two
    # valid keywords instead of trusting the caller's string.
    direction_sql = "ASC" if str(sort_sql).strip().upper() == "ASC" else "DESC"
    is_crypto = market_type == MARKET_CRYPTO

    latest_kline_time = conn.execute(
        "SELECT MAX(open_time) AS open_time FROM klines WHERE market_type = ?",
        (market_type,),
    ).fetchone()
    latest_open_time = latest_kline_time["open_time"] if latest_kline_time else None

    if latest_open_time is None:
        # `open_time = NULL` never matches in SQL, so skip the query entirely.
        rows = []
    else:
        rows = conn.execute(
            f"""
            SELECT symbol, direction, multiple, range, atr14, open_time, created_at
            FROM signals
            WHERE market_type = ? AND open_time = ?
            ORDER BY CAST(multiple AS REAL) {direction_sql}, open_time DESC, symbol ASC
            LIMIT 300
            """,
            (market_type, latest_open_time),
        ).fetchall()

    latest_scan = conn.execute(
        "SELECT MAX(updated_at) AS updated_at FROM symbols WHERE market_type = ?",
        (market_type,),
    ).fetchone()
    active_symbols = conn.execute(
        """
        SELECT COUNT(*) AS count FROM symbols
        WHERE market_type = ? AND status = 'TRADING'
        """,
        (market_type,),
    ).fetchone()

    symbol_checked_at_raw = latest_scan["updated_at"] if latest_scan else None
    return {
        "market_type": market_type,
        "market": "crypto" if is_crypto else "tradfi",
        "label": "Crypto" if is_crypto else "TradFi",
        "signals": [_signal_view(row) for row in rows],
        "latest_open_time": latest_open_time,
        "symbol_checked_at_raw": symbol_checked_at_raw,
        "symbol_checked_at": fmt_scan_time(symbol_checked_at_raw),
        "latest_closed_kline": (
            f"{fmt_madrid_time(latest_open_time + INTERVAL_MS)} 马德里"
            if latest_open_time is not None
            else "-"
        ),
        "active_symbols": active_symbols["count"] if active_symbols else 0,
        "config": config,
    }
@app.get("/", response_class=HTMLResponse)
def index(request: Request, market: str | None = None, sort: str = "desc") -> HTMLResponse:
    """Render the signal overview page.

    Args:
        request: Incoming request, forwarded to the template context.
        market: Requested view (``all``, ``crypto`` or ``tradfi``,
            case-insensitive). Missing or disallowed values fall back to the
            configured default view.
        sort: ``asc`` (case-insensitive) sorts signals by multiple ascending;
            any other value sorts descending.

    Returns:
        The rendered ``index.html`` template.
    """
    visible_markets = []
    if PAGE_SHOW_CRYPTO:
        visible_markets.append("crypto")
    if PAGE_SHOW_TRADFI:
        visible_markets.append("tradfi")
    if not visible_markets:
        # Never render an empty page: fall back to crypto when both are disabled.
        visible_markets = ["crypto"]

    allowed_views = (["all"] if PAGE_SHOW_ALL else []) + visible_markets
    default_view = PAGE_DEFAULT_VIEW if PAGE_DEFAULT_VIEW in allowed_views else allowed_views[0]
    selected_view = market.lower() if market else default_view
    if selected_view not in allowed_views:
        selected_view = default_view

    # Normalise case the same way `market` is normalised above.
    sort_order = "asc" if sort.strip().lower() == "asc" else "desc"
    sort_sql = "ASC" if sort_order == "asc" else "DESC"

    selected_market_types = (
        [MARKET_CRYPTO if slug == "crypto" else MARKET_TRADFI for slug in visible_markets]
        if selected_view == "all"
        else [MARKET_TRADFI if selected_view == "tradfi" else MARKET_CRYPTO]
    )

    conn = db_connect(DB_PATH)
    try:
        market_views = [
            load_market_view(conn, market_type, sort_sql)
            for market_type in selected_market_types
        ]
    finally:
        # Close the connection even when a query fails.
        conn.close()

    if selected_view == "all" and not PAGE_GROUP_BY_MARKET:
        combined_signals = [signal for view in market_views for signal in view["signals"]]
        # Two stable passes: symbol ascending first, then the multiple in the
        # requested direction. This keeps ties ordered by symbol ascending in
        # both directions, matching the per-market SQL ordering; a single
        # reversed sort on (multiple, symbol) would flip the tie-break.
        combined_signals.sort(key=lambda item: item["symbol"])
        combined_signals.sort(
            key=lambda item: float(item["multiple"].rstrip("x")),
            reverse=sort_order == "desc",
        )
        groups = [{"label": "全部市场", "market": "all", "signals": combined_signals}]
    else:
        groups = [
            {"label": view["label"], "market": view["market"], "signals": view["signals"]}
            for view in market_views
        ]
    signals = [signal for group in groups for signal in group["signals"]]

    if selected_view == "all":
        symbol_checked_at = " · ".join(
            f"{view['label']} {view['symbol_checked_at']}" for view in market_views
        )
        latest_closed_kline = " · ".join(
            f"{view['label']} {view['latest_closed_kline']}" for view in market_views
        )
        strategy = {
            # List only the markets actually shown; with both markets visible
            # this is still "Crypto + TradFi".
            "market": " + ".join(view["label"] for view in market_views),
            "timeframe": "4H",
            "atr": "独立 ATR 参数",
            "threshold": "独立倍数过滤",
            "body_filter": "",
        }
    else:
        view = market_views[0]
        config = view["config"]
        symbol_checked_at = view["symbol_checked_at"]
        latest_closed_kline = view["latest_closed_kline"]
        strategy = {
            "market": config.label,
            "timeframe": "4H",
            "atr": f"ATR {config.atr_length} · RMA",
            "threshold": f"{config.atr_multiple}倍过滤",
            "body_filter": (
                f"实体占比≥{config.min_body_ratio}"
                if config.body_ratio_filter_enabled
                else ""
            ),
        }

    return templates.TemplateResponse(
        "index.html",
        {
            "request": request,
            "groups": groups,
            "signals": signals,
            "signal_count": len(signals),
            "bullish_count": sum(1 for item in signals if item["direction_class"] == "bullish"),
            "bearish_count": sum(1 for item in signals if item["direction_class"] == "bearish"),
            "active_symbols": sum(view["active_symbols"] for view in market_views),
            "market": selected_view,
            "sort_order": sort_order,
            "next_sort_order": "asc" if sort_order == "desc" else "desc",
            "sort_indicator": "↓" if sort_order == "desc" else "↑",
            "symbol_checked_at": symbol_checked_at,
            "latest_closed_kline": latest_closed_kline,
            "strategy": strategy,
            "page_config": {
                "show_all": PAGE_SHOW_ALL,
                "show_crypto": PAGE_SHOW_CRYPTO,
                "show_tradfi": PAGE_SHOW_TRADFI,
                "show_version": PAGE_SHOW_VERSION,
                "group_by_market": PAGE_GROUP_BY_MARKET,
            },
            "app_meta": {
                "version": APP_VERSION,
                "release_date": APP_RELEASE_DATE,
                "author": APP_AUTHOR,
            },
        },
    )