diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..55944d7 --- /dev/null +++ b/.env.example @@ -0,0 +1,60 @@ +# Crypto ATR Signal v1.3.0 +# 复制为 .env 后按需修改。true=开启,false=关闭。 +# 扫描脚本每次启动都会读取最新配置;网页配置修改后需要重启 FastAPI。 + +# ==================== Crypto 策略 ==================== +# ATR 周期长度(TradingView 默认值为 14) +CRYPTO_ATR_LENGTH=14 +# 信号阈值:K线 High-Low 必须达到 ATR 的多少倍 +CRYPTO_ATR_MULTIPLE=1.5 +# 是否启用实体占比过滤;关闭时只判断波幅与涨跌方向 +CRYPTO_BODY_RATIO_FILTER_ENABLED=false +# 实体占整根 K 线波幅的最低比例,仅在上方开关为 true 时生效 +CRYPTO_MIN_BODY_RATIO=0.5 + +# ==================== TradFi 策略 ==================== +# TradFi 参数与 Crypto 完全独立 +TRADFI_ATR_LENGTH=14 +TRADFI_ATR_MULTIPLE=1.5 +TRADFI_BODY_RATIO_FILTER_ENABLED=false +TRADFI_MIN_BODY_RATIO=0.5 + +# ==================== Discord 聚合推送 ==================== +# 总开关。只在 scanner.py --market all 扫描完成后发送一条汇总消息 +DISCORD_ENABLED=false +# Discord 频道的 Webhook 完整地址;总开关关闭时可以留空 +DISCORD_WEBHOOK_URL= + +# ==================== 页面显示 ==================== +# 首页默认视图:all / crypto / tradfi +PAGE_DEFAULT_VIEW=all +# 是否显示“全部”入口 +PAGE_SHOW_ALL=true +# 是否允许网页显示 Crypto;只影响页面,不停止 Crypto 扫描 +PAGE_SHOW_CRYPTO=true +# 是否允许网页显示 TradFi;只影响页面,不停止 TradFi 扫描 +PAGE_SHOW_TRADFI=true +# 全部视图是否按 Crypto / TradFi 分成两张表;false 时合并为一张表排序 +PAGE_GROUP_BY_MARKET=true +# 是否在页面右上角显示版本号、发布日期和作者 +PAGE_SHOW_VERSION=true + +# ==================== 扫描与网络 ==================== +# 首次初始化每个品种下载的历史 K 线数量 +INIT_KLINES_LIMIT=40 +# Binance K 线请求并发数,2C2G 建议 10-20 +CONCURRENCY=10 +# 单次 HTTP 请求总超时秒数 +HTTP_TIMEOUT=20 +# 单个请求最多重试次数 +MAX_RETRIES=3 +# Binance 返回 429/418 时的基础等待秒数 +RATE_LIMIT_BACKOFF=30 + +# ==================== 数据存储 ==================== +# 每个市场、每个品种最多保留的 K 线数量 +KLINES_RETENTION_PER_SYMBOL=500 +# 信号历史保留天数 +SIGNAL_RETENTION_DAYS=90 +# SQLite 数据库路径 +DB_PATH=data/app.db diff --git a/.gitignore b/.gitignore index 36b13f1..51e319e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,176 +1,27 @@ -# ---> Python -# Byte-compiled / optimized / DLL files +# Secrets and local configuration +.env +.env.* +!.env.example + +# Python +.venv/ +venv/ __pycache__/ *.py[cod] -*$py.class -# C extensions -*.so - -# Distribution / packaging -.Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -share/python-wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.nox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -*.py,cover -.hypothesis/ -.pytest_cache/ -cover/ - -# Translations -*.mo -*.pot - -# Django stuff: +# Runtime data and logs +data/*.db +data/*.db-* +data/*.pid *.log -local_settings.py -db.sqlite3 -db.sqlite3-journal -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -.pybuilder/ -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# IPython -profile_default/ -ipython_config.py - -# pyenv -# For a library or package, you might want to ignore these files since the code is -# intended to run in multiple environments; otherwise, check them in: -# .python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# UV -# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -#uv.lock - -# poetry -# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control -#poetry.lock - -# pdm -# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. -#pdm.lock -# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it -# in version control. -# https://pdm.fming.dev/latest/usage/project/#working-with-version-control -.pdm.toml -.pdm-python -.pdm-build/ - -# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm -__pypackages__/ - -# Celery stuff -celerybeat-schedule -celerybeat.pid - -# SageMath parsed files -*.sage.py - -# Environments -.env -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ - -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json - -# Pyre type checker -.pyre/ - -# pytype static type analyzer -.pytype/ - -# Cython debug symbols -cython_debug/ - -# PyCharm -# JetBrains specific template is maintained in a separate JetBrains.gitignore that can -# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore -# and can be added to the global gitignore or merged into this file. For a more nuclear -# option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ - -# Ruff stuff: -.ruff_cache/ - -# PyPI configuration file -.pypirc +# Local development artifacts +.agents/ +screenshots/ +*.7z +# Editors and operating systems +.idea/ +.vscode/ +.DS_Store +Thumbs.db diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..5f0a497 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,127 @@ +# Changelog + +## v1.3.0 - 2026-06-22 + +### Added + +- 新增 `scanner.py --market all`,依次扫描 Crypto 与 TradFi。 +- 新增 Discord Webhook 聚合推送;两个市场完成后只发送一条汇总消息。 +- 首页新增“全部”视图,可按 Crypto / TradFi 分组展示,也可合并排序。 +- 新增页面入口、默认视图、分组和版本信息的 `.env` 开关。 + +### Changed + +- Discord 推送使用一个总开关,默认关闭。 +- `.env.example` 按功能分区并补充中文注释。 +- 页面默认进入全部市场视图。 + +## v1.2.1 - 2026-06-22 + +### Added + +- 新增 `scan_runs` 表,记录每轮扫描的状态、耗时、失败数量、更新 K 线数量和信号数量。 +- 页面顶部显示最近一次扫描状态。 + +### Changed + +- TradFi 缺口逻辑改为休市友好:按 Binance 实际返回的已收盘 K 线处理,不把休市时间段强行当作漏档。 +- Crypto 保持严格 4H 时间轴检查,发现缺失时间段会写入 warning 日志。 + +### Fixed + +- 首次初始化 TradFi 时,如果当前处于休市阶段,候选信号改为 Binance 返回的最新真实已收盘 K 线。 + +## v1.2.0 - 2026-06-22 + +### Added + +- 增加 TradFi 市场模块。 +- 首页增加 Crypto / TradFi 市场切换。 +- 扫描脚本支持 `--market crypto` 和 `--market tradfi`。 +- Crypto 与 TradFi 使用独立参数: + - `CRYPTO_ATR_LENGTH` + - `CRYPTO_ATR_MULTIPLE` + - `CRYPTO_BODY_RATIO_FILTER_ENABLED` + - `CRYPTO_MIN_BODY_RATIO` + - `TRADFI_ATR_LENGTH` + - `TRADFI_ATR_MULTIPLE` + - `TRADFI_BODY_RATIO_FILTER_ENABLED` + - `TRADFI_MIN_BODY_RATIO` +- 数据库表增加 `market_type` 字段。 +- `symbols` 记录 Binance 返回的 `underlyingType` / `underlyingSubType`,方便区分 TradFi。 + +### Changed + +- `symbols`、`klines`、`signals` 改为按市场隔离存储。 +- 首页查询只展示当前市场的最新已收盘 K 线信号。 +- Cron 推荐拆成 Crypto 和 TradFi 两条任务,避免互相影响。 + +### Migration + +- 从 v1.1 升级时,旧数据会自动迁移为 `CRYPTO` 市场。 +- 迁移后无需删除旧数据库。 + +### Verified + +- Crypto 扫描:528 个品种,0 个失败。 +- TradFi 扫描:100 个品种,0 个失败。 +- Crypto 与 TradFi 页面均可独立展示。 + +## v1.1.0 Hotfix - 2026-06-22 + +### Fixed + +- 修复首页混入历史信号的问题。 +- 首页现在只展示数据库中最新已收盘 4H K 线对应的信号。 +- `ATR倍数` 正序 / 倒序排序保留,但排序范围限制在当前最新 K 线信号内。 + +### Verified + +- 数据库历史信号总数:`68` +- 最新 K 线信号数:`26` +- 首页当前信号显示:`26` + +## v1.1.0 - 2026-06-21 + +### Added + +- 支持 `.env` 配置。 +- 网页显示马德里时间。 +- 顶部显示交易对列表校对时间和最新已收盘 K 线时间。 +- 币种点击跳转 TradingView。 +- `ATR倍数` 表头支持点击切换正序 / 倒序。 +- 扫描日志增加耗时统计和处理速度。 +- 单个品种失败后跳过,不阻塞整轮扫描。 +- Binance 429 / 418 限流时等待后重试。 +- `exchangeInfo` 临时失败时,使用数据库缓存交易对继续扫描。 +- 自动清理历史数据: + - 每个品种保留最近 `KLINES_RETENTION_PER_SYMBOL` 根 K 线。 + - 信号保留最近 `SIGNAL_RETENTION_DAYS` 天。 +- 可选实体占比过滤: + - `BODY_RATIO_FILTER_ENABLED` + - `MIN_BODY_RATIO` + +### Changed + +- 页面改为更紧凑的交易信号看板。 +- 表格默认按 `ATR倍数` 倒序展示。 +- 表格时间列改为 `K线收盘时间`。 +- 顶部说明突出当前策略参数。 + +### Fixed + +- 已收盘 K 线判定不依赖接口数组最后一根,统一使用 `close_time <= 当前 UTC 时间`。 +- 首次初始化只用历史 K 线初始化 ATR,不把 40 根历史全部写成当前信号。 +- 修复排序按钮独立展示造成的页面冗余。 + +## v1.0.0 - 2026-06-21 + +### Added + +- 扫描 Binance USDT 永续合约交易对。 +- 首次初始化拉取最近 40 根 4H 已收盘 K 线。 +- 使用 TradingView 默认 ATR(14, RMA)。 +- 识别大阳 / 大阴信号。 +- SQLite 存储 symbols、klines、signals。 +- FastAPI 网页展示信号。 +- SQLite 开启 WAL。 diff --git a/README.md b/README.md index ff8d682..e441d6f 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,220 @@ -# crypto-atr-signal +# Crypto ATR Signal -Binance 合约 ATR 大阳大阴信号扫描系统 \ No newline at end of file +Version: `v1.3.0` + +扫描 Binance Futures 交易对,在 4 小时周期识别已收盘 K 线的大阳 / 大阴信号,并通过 FastAPI 网页展示。 + +## 核心逻辑 + +- 数据源:Binance Futures +- 市场: + - Crypto:USDT Perpetual crypto 合约 + - TradFi:Binance Futures 传统金融类合约 +- 周期:4H +- ATR:TradingView 默认 `ATR(14, RMA)` +- 默认过滤:`High - Low >= ATR * 1.5` +- 方向: + - `Close > Open`:大阳 + - `Close < Open`:大阴 +- 已收盘判定:只接受 `close_time <= 当前 UTC 时间` 的 K 线 +- 页面时间:按 `Europe/Madrid` 显示 + +## v1.2 变化 + +- 增加 TradFi 市场模块。 +- 首页增加 Crypto / TradFi 市场切换。 +- Crypto 和 TradFi 使用独立参数,不共用 ATR 长度、过滤倍数、实体过滤设置。 +- 数据库增加 `market_type`,旧 Crypto 数据会自动迁移为 `CRYPTO`。 +- 扫描命令增加 `--market crypto` / `--market tradfi`。 + +## v1.2.1 变化 + +- TradFi 使用休市友好逻辑:按 Binance 实际返回的已收盘 K 线处理,不把休市时间段强行当作漏档。 +- Crypto 保持严格 4H 时间轴检查。 +- 新增 `scan_runs` 扫描记录表。 + +## v1.3.0 变化 + +- 增加 `--market all`,一次完成 Crypto 与 TradFi 扫描。 +- 增加 Discord 聚合推送,一个总开关、每轮一条消息。 +- 页面增加“全部”视图,支持按市场分组或合并排序。 +- 页面默认视图、市场入口、分组和版本信息可通过 `.env` 控制。 + +## 项目结构 + +```text +crypto-atr-signal +├── scanner.py # 定时扫描:拉 K 线、补缺口、算 ATR、写 signals +├── webapp.py # FastAPI 网页展示 +├── templates/ +│ └── index.html +├── data/app.db # SQLite 数据库,运行后生成 +├── .env.example +├── requirements.txt +├── VERSION +├── CHANGELOG.md +└── README.md +``` + +## 安装 + +```bash +cd /www/wwwroot/crypto-atr-signal +python3 -m venv .venv +.venv/bin/pip install -r requirements.txt +cp .env.example .env +``` + +## 配置 + +```env +CRYPTO_ATR_LENGTH=14 +CRYPTO_ATR_MULTIPLE=1.5 +CRYPTO_BODY_RATIO_FILTER_ENABLED=false +CRYPTO_MIN_BODY_RATIO=0.5 + +TRADFI_ATR_LENGTH=14 +TRADFI_ATR_MULTIPLE=1.5 +TRADFI_BODY_RATIO_FILTER_ENABLED=false +TRADFI_MIN_BODY_RATIO=0.5 + +DISCORD_ENABLED=false +DISCORD_WEBHOOK_URL= + +PAGE_DEFAULT_VIEW=all +PAGE_SHOW_ALL=true +PAGE_SHOW_CRYPTO=true +PAGE_SHOW_TRADFI=true +PAGE_GROUP_BY_MARKET=true +PAGE_SHOW_VERSION=true + +INIT_KLINES_LIMIT=40 +CONCURRENCY=10 +MAX_RETRIES=3 +RATE_LIMIT_BACKOFF=30 +KLINES_RETENTION_PER_SYMBOL=500 +SIGNAL_RETENTION_DAYS=90 +DB_PATH=data/app.db +``` + +## 扫描 + +首次运行会为对应市场的每个品种下载最近 `INIT_KLINES_LIMIT` 根 4H 已收盘 K 线,用于初始化 ATR。 + +之后再次运行时: + +- 没有历史数据:初始化最近 40 根 K 线 +- 有历史数据且无缺口:只拉最新 2 根 +- 有缺口:只补中间缺失的 K 线 +- 单个品种失败:记录错误并跳过,不阻塞整轮扫描 + +手动扫描: + +```bash +.venv/bin/python scanner.py --market all +.venv/bin/python scanner.py --market crypto +.venv/bin/python scanner.py --market tradfi +``` + +Discord 聚合推送只由 `--market all` 触发。单独扫描某个市场不会发送汇总消息。 + +## Cron + +建议服务器使用 UTC 时区。Binance 4H K 线按 UTC 收盘: + +```text +00:00 / 04:00 / 08:00 / 12:00 / 16:00 / 20:00 UTC +``` + +推荐使用一条聚合 cron: + +```cron +# Binance 4H close + 1 min, UTC;扫描两个市场后发送一条 Discord 汇总 +1 0,4,8,12,16,20 * * * cd /www/wwwroot/crypto-atr-signal && .venv/bin/python scanner.py --market all >> scanner.log 2>&1 +``` + +对应马德里时间: + +- 夏令时:02:01 / 06:01 / 10:01 / 14:01 / 18:01 / 22:01 +- 冬令时:01:01 / 05:01 / 09:01 / 13:01 / 17:01 / 21:01 + +## 启动网页 + +```bash +.venv/bin/python -m uvicorn webapp:app --host 127.0.0.1 --port 8000 +``` + +访问: + +- 全部:`http://127.0.0.1:8000/?market=all&sort=desc` +- Crypto:`http://127.0.0.1:8000/?market=crypto&sort=desc` +- TradFi:`http://127.0.0.1:8000/?market=tradfi&sort=desc` + +## systemd 常驻网页 + +```ini +[Unit] +Description=Crypto ATR Signal Web +After=network.target + +[Service] +WorkingDirectory=/www/wwwroot/crypto-atr-signal +ExecStart=/www/wwwroot/crypto-atr-signal/.venv/bin/python -m uvicorn webapp:app --host 127.0.0.1 --port 8000 +Restart=always +RestartSec=3 +User=www +Group=www + +[Install] +WantedBy=multi-user.target +``` + +启用: + +```bash +sudo systemctl daemon-reload +sudo systemctl enable crypto-atr-signal +sudo systemctl start crypto-atr-signal +sudo systemctl status crypto-atr-signal +``` + +## 页面功能 + +- Crypto / TradFi 市场切换 +- 交易对列表校对时间 +- 最新已收盘 K 线时间 +- 当前信号统计 +- 大阳 / 大阴数量 +- 扫描品种数量 +- ATR 倍数点击切换正序 / 倒序 +- 点击品种跳转 TradingView + +## 数据库 + +使用 SQLite,并开启 WAL: + +```sql +PRAGMA journal_mode = WAL; +``` + +主要表: + +- `symbols`:交易对列表,按 `market_type + symbol` 区分市场 +- `klines`:4H K 线和 ATR,按 `market_type + symbol + open_time` 存储 +- `signals`:大阳 / 大阴信号,按 `market_type + symbol + open_time` 存储 + +## 资源建议 + +独立 `2C2G` VPS 足够运行: + +- FastAPI 常驻 +- SQLite +- cron 每 4 小时分别扫描 Crypto 和 TradFi +- Nginx 反向代理 + +建议: + +- `CONCURRENCY=8~10` +- 服务器时区用 UTC +- 页面显示马德里时间 +- 不需要 MySQL / PostgreSQL diff --git a/V1.1_FIX_REPORT.md b/V1.1_FIX_REPORT.md new file mode 100644 index 0000000..36df6ab --- /dev/null +++ b/V1.1_FIX_REPORT.md @@ -0,0 +1,118 @@ +# Crypto ATR Signal v1.1 修复报告 + +修复日期:`2026-06-22` + +当前页面版本:`v1.1.0` + +## 问题概述 + +首页信号列表原本会展示 `signals` 表中的历史信号,而不是只展示当前最新已收盘 4H K 线的信号。 + +这会导致一个问题: + +```text +历史 K 线中 ATR 倍数更高的信号,可能排在当前最新 K 线信号前面。 +``` + +因此用户在首页看到的并不完全是“当前最新一根 4H K 线”的信号。 + +## 问题原因 + +原首页查询逻辑直接读取 `signals` 表: + +```sql +SELECT symbol, direction, multiple, range, atr14, open_time, created_at +FROM signals +ORDER BY CAST(multiple AS REAL) DESC, open_time DESC, symbol ASC +LIMIT 300 +``` + +该查询没有限制: + +```sql +WHERE open_time = 最新已收盘K线 +``` + +所以历史信号会被一起展示。 + +## 修复方案 + +首页先从 `klines` 表中读取当前数据库里的最新 K 线时间: + +```sql +SELECT MAX(open_time) AS open_time FROM klines +``` + +然后只查询该 `open_time` 对应的信号: + +```sql +SELECT symbol, direction, multiple, range, atr14, open_time, created_at +FROM signals +WHERE open_time = ? +ORDER BY CAST(multiple AS REAL) DESC, open_time DESC, symbol ASC +LIMIT 300 +``` + +ATR 倍数正序 / 倒序排序仍然保留,但排序范围只限于当前最新已收盘 K 线的信号。 + +## 修复结果 + +修复后: + +```text +首页 = 当前最新已收盘 K 线信号 +历史信号 = 继续保留在数据库中,但不混入首页 +``` + +当前本地验证结果: + +```text +数据库历史信号总数:68 +最新 K 线信号数:26 +首页当前信号:26 +``` + +## 影响范围 + +影响文件: + +```text +webapp.py +``` + +不影响: + +```text +scanner.py 扫描逻辑 +ATR 计算逻辑 +缺口补抓逻辑 +SQLite 数据结构 +历史信号保存 +实体占比过滤开关 +数据清理逻辑 +``` + +## 当前状态 + +已完成: + +- 首页只展示最新已收盘 K 线信号。 +- 历史信号继续保留,默认保留 `SIGNAL_RETENTION_DAYS=90` 天。 +- ATR 倍数排序继续可用。 +- 本地服务已重启验证。 + +## 后续建议 + +后续如果需要查看历史信号,建议单独增加: + +```text +/history +``` + +或页面切换: + +```text +当前信号 / 历史信号 +``` + +这样首页保持清爽,历史查询也有独立入口。 diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..18fa8e7 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +v1.3.0 diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..147a160 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,10 @@ +services: + web: + image: python:3.11-slim + working_dir: /app + command: sh -c "pip install -r requirements.txt && uvicorn webapp:app --host 0.0.0.0 --port 8000" + ports: + - "8000:8000" + volumes: + - .:/app + restart: unless-stopped diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..be810b1 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +aiohttp==3.9.5 +fastapi==0.111.0 +jinja2==3.1.4 +uvicorn[standard]==0.30.1 +tzdata==2025.2 +python-dotenv==1.2.2 diff --git a/scanner.py b/scanner.py new file mode 100644 index 0000000..6f95d80 --- /dev/null +++ b/scanner.py @@ -0,0 +1,979 @@ +import argparse +import asyncio +import logging +import os +import sqlite3 +import time +from dataclasses import dataclass +from datetime import datetime, timezone +from decimal import Decimal, InvalidOperation +from pathlib import Path +from typing import Any +from zoneinfo import ZoneInfo + +import aiohttp +from dotenv import load_dotenv + + +load_dotenv() + +BASE_URL = os.getenv("BINANCE_FAPI_BASE_URL", "https://fapi.binance.com") +DB_PATH = Path(os.getenv("DB_PATH", "data/app.db")) +INTERVAL = "4h" +INTERVAL_MS = 4 * 60 * 60 * 1000 +ATR_LENGTH = int(os.getenv("ATR_LENGTH", "14")) +ATR_MULTIPLE = Decimal(os.getenv("ATR_MULTIPLE", "1.5")) +CRYPTO_ATR_LENGTH = int(os.getenv("CRYPTO_ATR_LENGTH", str(ATR_LENGTH))) +CRYPTO_ATR_MULTIPLE = Decimal(os.getenv("CRYPTO_ATR_MULTIPLE", str(ATR_MULTIPLE))) +TRADFI_ATR_LENGTH = int(os.getenv("TRADFI_ATR_LENGTH", str(ATR_LENGTH))) +TRADFI_ATR_MULTIPLE = Decimal(os.getenv("TRADFI_ATR_MULTIPLE", str(ATR_MULTIPLE))) +INIT_KLINES_LIMIT = int(os.getenv("INIT_KLINES_LIMIT", "40")) +CONCURRENCY = int(os.getenv("CONCURRENCY", "15")) +HTTP_TIMEOUT = int(os.getenv("HTTP_TIMEOUT", "20")) +MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3")) +RATE_LIMIT_BACKOFF = int(os.getenv("RATE_LIMIT_BACKOFF", "30")) +BODY_RATIO_FILTER_ENABLED = os.getenv("BODY_RATIO_FILTER_ENABLED", "false").lower() == "true" +MIN_BODY_RATIO = Decimal(os.getenv("MIN_BODY_RATIO", "0.5")) +CRYPTO_BODY_RATIO_FILTER_ENABLED = ( + os.getenv("CRYPTO_BODY_RATIO_FILTER_ENABLED", str(BODY_RATIO_FILTER_ENABLED)).lower() == "true" +) +CRYPTO_MIN_BODY_RATIO = Decimal(os.getenv("CRYPTO_MIN_BODY_RATIO", str(MIN_BODY_RATIO))) +TRADFI_BODY_RATIO_FILTER_ENABLED = ( + os.getenv("TRADFI_BODY_RATIO_FILTER_ENABLED", str(BODY_RATIO_FILTER_ENABLED)).lower() == "true" +) +TRADFI_MIN_BODY_RATIO = Decimal(os.getenv("TRADFI_MIN_BODY_RATIO", str(MIN_BODY_RATIO))) +KLINES_RETENTION_PER_SYMBOL = int(os.getenv("KLINES_RETENTION_PER_SYMBOL", "500")) +SIGNAL_RETENTION_DAYS = int(os.getenv("SIGNAL_RETENTION_DAYS", "90")) +DISCORD_ENABLED = os.getenv("DISCORD_ENABLED", "false").lower() == "true" +DISCORD_WEBHOOK_URL = os.getenv("DISCORD_WEBHOOK_URL", "").strip() + +MARKET_CRYPTO = "CRYPTO" +MARKET_TRADFI = "TRADFI" + + +@dataclass(frozen=True) +class MarketConfig: + market_type: str + label: str + atr_length: int + atr_multiple: Decimal + body_ratio_filter_enabled: bool + min_body_ratio: Decimal + strict_time_grid: bool + + +@dataclass(frozen=True) +class ScanResult: + market_type: str + label: str + status: str + symbols_total: int + symbols_failed: int + updated_klines: int + signals_created: int + elapsed_seconds: float + error_summary: str | None = None + + +MARKET_CONFIGS = { + MARKET_CRYPTO: MarketConfig( + market_type=MARKET_CRYPTO, + label="Binance USDT 永续", + atr_length=CRYPTO_ATR_LENGTH, + atr_multiple=CRYPTO_ATR_MULTIPLE, + body_ratio_filter_enabled=CRYPTO_BODY_RATIO_FILTER_ENABLED, + min_body_ratio=CRYPTO_MIN_BODY_RATIO, + strict_time_grid=True, + ), + MARKET_TRADFI: MarketConfig( + market_type=MARKET_TRADFI, + label="Binance TradFi 永续", + atr_length=TRADFI_ATR_LENGTH, + atr_multiple=TRADFI_ATR_MULTIPLE, + body_ratio_filter_enabled=TRADFI_BODY_RATIO_FILTER_ENABLED, + min_body_ratio=TRADFI_MIN_BODY_RATIO, + strict_time_grid=False, + ), +} + + +logging.basicConfig( + level=os.getenv("LOG_LEVEL", "INFO"), + format="%(asctime)s %(levelname)s %(message)s", +) +logger = logging.getLogger("crypto-atr-scanner") + + +def utc_now_ms() -> int: + return int(datetime.now(timezone.utc).timestamp() * 1000) + + +def latest_theoretical_closed_open_time(now_ms: int | None = None) -> int: + now_ms = utc_now_ms() if now_ms is None else now_ms + return (now_ms // INTERVAL_MS) * INTERVAL_MS - INTERVAL_MS + + +def ms_to_iso(ms: int) -> str: + return datetime.fromtimestamp(ms / 1000, timezone.utc).isoformat() + + +def decimal_from_api(value: Any) -> Decimal: + try: + return Decimal(str(value)) + except (InvalidOperation, ValueError) as exc: + raise ValueError(f"Invalid decimal value from API: {value!r}") from exc + + +def db_connect(path: Path | str = DB_PATH) -> sqlite3.Connection: + path = Path(path) + path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(path) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode = WAL;") + conn.execute("PRAGMA busy_timeout = 5000;") + return conn + + +def init_db(conn: sqlite3.Connection) -> None: + existing_symbols = conn.execute( + "SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'symbols'" + ).fetchone() + if existing_symbols: + symbol_cols = [row["name"] for row in conn.execute("PRAGMA table_info(symbols)")] + if "market_type" not in symbol_cols: + migrate_v11_to_v12(conn) + + conn.executescript( + """ + CREATE TABLE IF NOT EXISTS symbols ( + market_type TEXT NOT NULL DEFAULT 'CRYPTO', + symbol TEXT NOT NULL, + status TEXT NOT NULL, + underlying_type TEXT, + underlying_subtype TEXT, + updated_at TEXT NOT NULL, + PRIMARY KEY (market_type, symbol) + ); + + CREATE TABLE IF NOT EXISTS klines ( + market_type TEXT NOT NULL DEFAULT 'CRYPTO', + symbol TEXT NOT NULL, + open_time INTEGER NOT NULL, + open TEXT NOT NULL, + high TEXT NOT NULL, + low TEXT NOT NULL, + close TEXT NOT NULL, + close_time INTEGER NOT NULL, + atr14 TEXT, + PRIMARY KEY (market_type, symbol, open_time) + ); + + CREATE TABLE IF NOT EXISTS signals ( + market_type TEXT NOT NULL DEFAULT 'CRYPTO', + symbol TEXT NOT NULL, + direction TEXT NOT NULL, + multiple TEXT NOT NULL, + range TEXT NOT NULL, + atr14 TEXT NOT NULL, + open_time INTEGER NOT NULL, + created_at TEXT NOT NULL, + PRIMARY KEY (market_type, symbol, open_time) + ); + + CREATE INDEX IF NOT EXISTS idx_klines_symbol_time + ON klines (market_type, symbol, open_time DESC); + + CREATE INDEX IF NOT EXISTS idx_signals_time + ON signals (market_type, open_time DESC); + + CREATE TABLE IF NOT EXISTS scan_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + market_type TEXT NOT NULL, + started_at TEXT NOT NULL, + finished_at TEXT NOT NULL, + status TEXT NOT NULL, + symbols_total INTEGER NOT NULL DEFAULT 0, + symbols_success INTEGER NOT NULL DEFAULT 0, + symbols_failed INTEGER NOT NULL DEFAULT 0, + updated_klines INTEGER NOT NULL DEFAULT 0, + signals_created INTEGER NOT NULL DEFAULT 0, + elapsed_seconds TEXT NOT NULL, + latest_target_open_time INTEGER, + error_summary TEXT + ); + + CREATE INDEX IF NOT EXISTS idx_scan_runs_market_finished + ON scan_runs (market_type, finished_at DESC); + """ + ) + conn.commit() + + +def migrate_v11_to_v12(conn: sqlite3.Connection) -> None: + logger.info("Migrating database schema to v1.2 market_type layout") + conn.executescript( + """ + ALTER TABLE symbols RENAME TO symbols_v11; + ALTER TABLE klines RENAME TO klines_v11; + ALTER TABLE signals RENAME TO signals_v11; + + CREATE TABLE symbols ( + market_type TEXT NOT NULL DEFAULT 'CRYPTO', + symbol TEXT NOT NULL, + status TEXT NOT NULL, + underlying_type TEXT, + underlying_subtype TEXT, + updated_at TEXT NOT NULL, + PRIMARY KEY (market_type, symbol) + ); + + CREATE TABLE klines ( + market_type TEXT NOT NULL DEFAULT 'CRYPTO', + symbol TEXT NOT NULL, + open_time INTEGER NOT NULL, + open TEXT NOT NULL, + high TEXT NOT NULL, + low TEXT NOT NULL, + close TEXT NOT NULL, + close_time INTEGER NOT NULL, + atr14 TEXT, + PRIMARY KEY (market_type, symbol, open_time) + ); + + CREATE TABLE signals ( + market_type TEXT NOT NULL DEFAULT 'CRYPTO', + symbol TEXT NOT NULL, + direction TEXT NOT NULL, + multiple TEXT NOT NULL, + range TEXT NOT NULL, + atr14 TEXT NOT NULL, + open_time INTEGER NOT NULL, + created_at TEXT NOT NULL, + PRIMARY KEY (market_type, symbol, open_time) + ); + + INSERT INTO symbols (market_type, symbol, status, updated_at) + SELECT 'CRYPTO', symbol, status, updated_at FROM symbols_v11; + + INSERT INTO klines (market_type, symbol, open_time, open, high, low, close, close_time, atr14) + SELECT 'CRYPTO', symbol, open_time, open, high, low, close, close_time, atr14 FROM klines_v11; + + INSERT INTO signals (market_type, symbol, direction, multiple, range, atr14, open_time, created_at) + SELECT 'CRYPTO', symbol, direction, multiple, range, atr14, open_time, created_at FROM signals_v11; + + DROP TABLE symbols_v11; + DROP TABLE klines_v11; + DROP TABLE signals_v11; + """ + ) + conn.commit() + + +async def fetch_json( + session: aiohttp.ClientSession, + path: str, + params: dict[str, Any] | None = None, +) -> Any: + url = f"{BASE_URL}{path}" + last_error: Exception | None = None + for attempt in range(1, MAX_RETRIES + 1): + try: + async with session.get(url, params=params) as response: + response.raise_for_status() + return await response.json() + except aiohttp.ClientResponseError as exc: + last_error = exc + if exc.status in {418, 429}: + retry_after = exc.headers.get("Retry-After") if exc.headers else None + delay = int(retry_after) if retry_after and retry_after.isdigit() else RATE_LIMIT_BACKOFF + logger.warning( + "Rate limited attempt=%s status=%s path=%s sleep=%ss", + attempt, + exc.status, + path, + delay, + ) + else: + delay = min(2**attempt, 10) + logger.warning( + "HTTP error attempt=%s status=%s path=%s error=%s", + attempt, + exc.status, + path, + exc, + ) + await asyncio.sleep(delay) + except Exception as exc: # noqa: BLE001 - log and retry all transport/API errors. + last_error = exc + delay = min(2**attempt, 10) + logger.warning("Request failed attempt=%s path=%s error=%s", attempt, path, exc) + await asyncio.sleep(delay) + raise RuntimeError(f"Request failed after {MAX_RETRIES} attempts: {url}") from last_error + + +def normalize_subtype(value: Any) -> str | None: + if value is None: + return None + if isinstance(value, list): + return ",".join(str(item) for item in value) + return str(value) + + +async def fetch_symbols(session: aiohttp.ClientSession, market_type: str) -> list[dict[str, Any]]: + exchange_info = await fetch_json(session, "/fapi/v1/exchangeInfo") + symbols: list[dict[str, Any]] = [] + for item in exchange_info.get("symbols", []): + if item.get("status") != "TRADING" or item.get("quoteAsset") != "USDT": + continue + subtype = item.get("underlyingSubType") or [] + is_tradfi = item.get("contractType") == "TRADIFI_PERPETUAL" or "TradFi" in subtype + is_crypto = item.get("contractType") == "PERPETUAL" and not is_tradfi + if market_type == MARKET_CRYPTO and not is_crypto: + continue + if market_type == MARKET_TRADFI and not is_tradfi: + continue + symbols.append( + { + "symbol": item["symbol"], + "underlying_type": item.get("underlyingType"), + "underlying_subtype": normalize_subtype(subtype), + } + ) + return sorted(symbols, key=lambda row: row["symbol"]) + + +async def fetch_klines( + session: aiohttp.ClientSession, + symbol: str, + limit: int = 2, + start_time: int | None = None, + end_time: int | None = None, +) -> list[list[Any]]: + params: dict[str, Any] = {"symbol": symbol, "interval": INTERVAL, "limit": limit} + if start_time is not None: + params["startTime"] = start_time + if end_time is not None: + params["endTime"] = end_time + return await fetch_json(session, "/fapi/v1/klines", params) + + +def closed_klines(raw_klines: list[list[Any]], now_ms: int) -> list[dict[str, Any]]: + klines: list[dict[str, Any]] = [] + for item in raw_klines: + close_time = int(item[6]) + if close_time > now_ms: + continue + klines.append( + { + "open_time": int(item[0]), + "open": str(item[1]), + "high": str(item[2]), + "low": str(item[3]), + "close": str(item[4]), + "close_time": close_time, + } + ) + return sorted(klines, key=lambda row: row["open_time"]) + + +def upsert_symbols(conn: sqlite3.Connection, market_type: str, symbols: list[dict[str, Any]]) -> None: + now = datetime.now(timezone.utc).isoformat() + active = {row["symbol"] for row in symbols} + for row in symbols: + conn.execute( + """ + INSERT INTO symbols ( + market_type, symbol, status, underlying_type, underlying_subtype, updated_at + ) + VALUES (?, ?, 'TRADING', ?, ?, ?) + ON CONFLICT(market_type, symbol) DO UPDATE SET + status = excluded.status, + underlying_type = excluded.underlying_type, + underlying_subtype = excluded.underlying_subtype, + updated_at = excluded.updated_at + """, + ( + market_type, + row["symbol"], + row.get("underlying_type"), + row.get("underlying_subtype"), + now, + ), + ) + + existing = conn.execute( + "SELECT symbol FROM symbols WHERE market_type = ?", + (market_type,), + ).fetchall() + for row in existing: + if row["symbol"] not in active: + conn.execute( + """ + UPDATE symbols + SET status = 'INACTIVE', updated_at = ? + WHERE market_type = ? AND symbol = ? + """, + (now, market_type, row["symbol"]), + ) + conn.commit() + + +def latest_kline(conn: sqlite3.Connection, market_type: str, symbol: str) -> sqlite3.Row | None: + return conn.execute( + """ + SELECT * FROM klines + WHERE market_type = ? AND symbol = ? + ORDER BY open_time DESC + LIMIT 1 + """, + (market_type, symbol), + ).fetchone() + + +def active_symbols_from_db(conn: sqlite3.Connection, market_type: str) -> list[str]: + rows = conn.execute( + """ + SELECT symbol FROM symbols + WHERE market_type = ? AND status = 'TRADING' + ORDER BY symbol ASC + """, + (market_type,), + ).fetchall() + return [row["symbol"] for row in rows] + + +def insert_klines( + conn: sqlite3.Connection, + market_type: str, + symbol: str, + klines: list[dict[str, Any]], +) -> None: + if not klines: + return + conn.executemany( + """ + INSERT INTO klines ( + market_type, symbol, open_time, open, high, low, close, close_time, atr14 + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, NULL) + ON CONFLICT(market_type, symbol, open_time) DO UPDATE SET + open = excluded.open, + high = excluded.high, + low = excluded.low, + close = excluded.close, + close_time = excluded.close_time + """, + [ + ( + market_type, + symbol, + row["open_time"], + row["open"], + row["high"], + row["low"], + row["close"], + row["close_time"], + ) + for row in klines + ], + ) + conn.commit() + + +def recompute_atr_and_signals( + conn: sqlite3.Connection, + config: MarketConfig, + symbol: str, + candidate_open_times: set[int], +) -> int: + rows = conn.execute( + """ + SELECT * FROM klines + WHERE market_type = ? AND symbol = ? + ORDER BY open_time ASC + """, + (config.market_type, symbol), + ).fetchall() + if not rows: + return 0 + + prev_close: Decimal | None = None + atr: Decimal | None = None + tr_values: list[Decimal] = [] + signal_count = 0 + + for row in rows: + high = decimal_from_api(row["high"]) + low = decimal_from_api(row["low"]) + close = decimal_from_api(row["close"]) + open_price = decimal_from_api(row["open"]) + + if prev_close is None: + tr = high - low + else: + tr = max(high - low, abs(high - prev_close), abs(low - prev_close)) + + tr_values.append(tr) + if len(tr_values) == config.atr_length: + atr = sum(tr_values) / Decimal(config.atr_length) + elif len(tr_values) > config.atr_length and atr is not None: + atr = ((atr * Decimal(config.atr_length - 1)) + tr) / Decimal(config.atr_length) + + atr_text = str(atr) if atr is not None else None + conn.execute( + """ + UPDATE klines SET atr14 = ? + WHERE market_type = ? AND symbol = ? AND open_time = ? + """, + (atr_text, config.market_type, symbol, row["open_time"]), + ) + + if atr is not None and row["open_time"] in candidate_open_times: + candle_range = high - low + threshold = atr * config.atr_multiple + if candle_range >= threshold and close != open_price: + if config.body_ratio_filter_enabled: + body_ratio = abs(close - open_price) / candle_range if candle_range else Decimal("0") + if body_ratio < config.min_body_ratio: + prev_close = close + continue + + direction = "Bullish" if close > open_price else "Bearish" + multiple = candle_range / atr + conn.execute( + """ + INSERT INTO signals ( + market_type, symbol, direction, multiple, range, atr14, open_time, created_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(market_type, symbol, open_time) DO UPDATE SET + direction = excluded.direction, + multiple = excluded.multiple, + range = excluded.range, + atr14 = excluded.atr14, + created_at = excluded.created_at + """, + ( + config.market_type, + symbol, + direction, + str(multiple), + str(candle_range), + str(atr), + row["open_time"], + datetime.now(timezone.utc).isoformat(), + ), + ) + signal_count += 1 + + prev_close = close + + conn.commit() + return signal_count + + +def cleanup_old_data(conn: sqlite3.Connection) -> tuple[int, int]: + deleted_klines = 0 + deleted_signals = 0 + + if KLINES_RETENTION_PER_SYMBOL > 0: + symbols = conn.execute("SELECT DISTINCT market_type, symbol FROM klines").fetchall() + for row in symbols: + cursor = conn.execute( + """ + DELETE FROM klines + WHERE market_type = ? + AND symbol = ? + AND open_time NOT IN ( + SELECT open_time FROM klines + WHERE market_type = ? + AND symbol = ? + ORDER BY open_time DESC + LIMIT ? + ) + """, + ( + row["market_type"], + row["symbol"], + row["market_type"], + row["symbol"], + KLINES_RETENTION_PER_SYMBOL, + ), + ) + deleted_klines += cursor.rowcount + + if SIGNAL_RETENTION_DAYS > 0: + cutoff = utc_now_ms() - (SIGNAL_RETENTION_DAYS * 24 * 60 * 60 * 1000) + cursor = conn.execute("DELETE FROM signals WHERE open_time < ?", (cutoff,)) + deleted_signals = cursor.rowcount + + conn.commit() + return deleted_klines, deleted_signals + + +def insert_scan_run( + conn: sqlite3.Connection, + market_type: str, + started_at: str, + status: str, + symbols_total: int, + symbols_success: int, + symbols_failed: int, + updated_klines: int, + signals_created: int, + elapsed_seconds: float, + latest_target_open_time: int | None, + error_summary: str | None = None, +) -> None: + conn.execute( + """ + INSERT INTO scan_runs ( + market_type, started_at, finished_at, status, symbols_total, symbols_success, + symbols_failed, updated_klines, signals_created, elapsed_seconds, + latest_target_open_time, error_summary + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + market_type, + started_at, + datetime.now(timezone.utc).isoformat(), + status, + symbols_total, + symbols_success, + symbols_failed, + updated_klines, + signals_created, + f"{elapsed_seconds:.1f}", + latest_target_open_time, + error_summary, + ), + ) + conn.commit() + + +async def fetch_missing_closed_klines( + session: aiohttp.ClientSession, + symbol: str, + start_open_time: int, + expected_open_time: int, + now_ms: int, + strict_time_grid: bool, +) -> list[dict[str, Any]]: + if start_open_time > expected_open_time: + return [] + + needed = ((expected_open_time - start_open_time) // INTERVAL_MS) + 1 + raw = await fetch_klines( + session, + symbol, + limit=max(2, min(1500, needed + 2)), + start_time=start_open_time, + end_time=expected_open_time + INTERVAL_MS - 1, + ) + rows = closed_klines(raw, now_ms) + by_open = {row["open_time"]: row for row in rows} + missing: list[dict[str, Any]] = [] + missed_slots = 0 + cursor = start_open_time + while cursor <= expected_open_time: + row = by_open.get(cursor) + if row is not None: + missing.append(row) + else: + missed_slots += 1 + cursor += INTERVAL_MS + if strict_time_grid and missed_slots: + logger.warning( + "%s missing %s expected 4H slots between %s and %s", + symbol, + missed_slots, + ms_to_iso(start_open_time), + ms_to_iso(expected_open_time), + ) + return missing + + +async def process_symbol( + session: aiohttp.ClientSession, + conn: sqlite3.Connection, + config: MarketConfig, + symbol: str, + semaphore: asyncio.Semaphore, + now_ms: int, + expected_open_time: int, +) -> tuple[str, int, int]: + async with semaphore: + latest = latest_kline(conn, config.market_type, symbol) + new_rows: list[dict[str, Any]] + + if latest is None: + raw = await fetch_klines(session, symbol, limit=INIT_KLINES_LIMIT + 1) + new_rows = closed_klines(raw, now_ms)[-INIT_KLINES_LIMIT:] + if config.strict_time_grid: + candidate_open_times = {expected_open_time} + else: + candidate_open_times = {new_rows[-1]["open_time"]} if new_rows else set() + else: + latest_open_time = int(latest["open_time"]) + if latest_open_time >= expected_open_time: + raw = await fetch_klines(session, symbol, limit=2) + rows = closed_klines(raw, now_ms) + new_rows = [row for row in rows if row["open_time"] > latest_open_time] + candidate_open_times = {row["open_time"] for row in new_rows} + else: + next_expected = latest_open_time + INTERVAL_MS + new_rows = await fetch_missing_closed_klines( + session, + symbol, + next_expected, + expected_open_time, + now_ms, + config.strict_time_grid, + ) + candidate_open_times = {row["open_time"] for row in new_rows} + + insert_klines(conn, config.market_type, symbol, new_rows) + signal_count = recompute_atr_and_signals( + conn, + config, + symbol, + candidate_open_times, + ) + return symbol, len(new_rows), signal_count + + +async def process_symbol_safe( + session: aiohttp.ClientSession, + conn: sqlite3.Connection, + config: MarketConfig, + symbol: str, + semaphore: asyncio.Semaphore, + now_ms: int, + expected_open_time: int, +) -> tuple[str, int, int, bool]: + try: + symbol, inserted, signal_count = await process_symbol( + session, + conn, + config, + symbol, + semaphore, + now_ms, + expected_open_time, + ) + return symbol, inserted, signal_count, True + except Exception as exc: # noqa: BLE001 - one failed symbol must not block the scan. + logger.warning("%s skipped error=%s", symbol, exc) + return symbol, 0, 0, False + + +async def run_scan(market_type: str = MARKET_CRYPTO) -> ScanResult: + config = MARKET_CONFIGS[market_type] + started_perf = time.perf_counter() + started_at = datetime.now(timezone.utc).isoformat() + conn = db_connect() + init_db(conn) + symbols: list[str] = [] + expected_open_time: int | None = None + + timeout = aiohttp.ClientTimeout(total=HTTP_TIMEOUT) + async with aiohttp.ClientSession(timeout=timeout) as session: + try: + symbol_rows = await fetch_symbols(session, market_type) + upsert_symbols(conn, market_type, symbol_rows) + symbols = [row["symbol"] for row in symbol_rows] + except Exception as exc: # noqa: BLE001 - fallback keeps cron useful during API hiccups. + symbols = active_symbols_from_db(conn, market_type) + if not symbols: + elapsed = time.perf_counter() - started_perf + insert_scan_run( + conn, + market_type=market_type, + started_at=started_at, + status="failed", + symbols_total=0, + symbols_success=0, + symbols_failed=0, + updated_klines=0, + signals_created=0, + elapsed_seconds=elapsed, + latest_target_open_time=None, + error_summary=f"symbol refresh failed and no cached symbols: {exc}", + ) + logger.error( + "Scan aborted: symbol refresh failed and database has no cached symbols elapsed=%.1fs error=%s", + elapsed, + exc, + ) + raise + logger.warning( + "Symbol refresh failed, using %s cached symbols error=%s", + len(symbols), + exc, + ) + + now_ms = utc_now_ms() + expected_open_time = latest_theoretical_closed_open_time(now_ms) + semaphore = asyncio.Semaphore(CONCURRENCY) + logger.info( + "Scanning market=%s symbols=%s latest_closed_open=%s atr_length=%s atr_multiple=%s strict_time_grid=%s", + market_type, + len(symbols), + ms_to_iso(expected_open_time), + config.atr_length, + config.atr_multiple, + config.strict_time_grid, + ) + + tasks = [ + process_symbol_safe(session, conn, config, symbol, semaphore, now_ms, expected_open_time) + for symbol in symbols + ] + updated = 0 + signals = 0 + failures = 0 + success = 0 + for task in asyncio.as_completed(tasks): + symbol, inserted, signal_count, ok = await task + if not ok: + failures += 1 + continue + success += 1 + updated += inserted + signals += signal_count + if inserted: + logger.info("%s updated_klines=%s signals=%s", symbol, inserted, signal_count) + + elapsed = time.perf_counter() - started_perf + status = "success" if failures == 0 else "partial" + insert_scan_run( + conn, + market_type=market_type, + started_at=started_at, + status=status, + symbols_total=len(symbols), + symbols_success=success, + symbols_failed=failures, + updated_klines=updated, + signals_created=signals, + elapsed_seconds=elapsed, + latest_target_open_time=expected_open_time, + ) + conn.close() + speed = len(symbols) / elapsed if elapsed > 0 else 0 + logger.info( + "Scan done market=%s symbols=%s updated_klines=%s signals=%s failures=%s elapsed=%.1fs speed=%.1f_symbols/s", + market_type, + len(symbols), + updated, + signals, + failures, + elapsed, + speed, + ) + logger.info("%.1f seconds scan complete", elapsed) + + cleanup_conn = db_connect() + deleted_klines, deleted_signals = cleanup_old_data(cleanup_conn) + cleanup_conn.close() + if deleted_klines or deleted_signals: + logger.info( + "Cleanup done deleted_klines=%s deleted_signals=%s", + deleted_klines, + deleted_signals, + ) + return ScanResult( + market_type=market_type, + label=config.label, + status=status, + symbols_total=len(symbols), + symbols_failed=failures, + updated_klines=updated, + signals_created=signals, + elapsed_seconds=elapsed, + ) + + +async def send_discord_summary(results: list[ScanResult], elapsed_seconds: float) -> None: + if not DISCORD_ENABLED: + return + if not DISCORD_WEBHOOK_URL: + logger.warning("Discord notification skipped: DISCORD_WEBHOOK_URL is empty") + return + + madrid_time = datetime.now(timezone.utc).astimezone(ZoneInfo("Europe/Madrid")) + status_labels = {"success": "成功", "partial": "部分失败", "failed": "失败"} + lines = ["**Crypto ATR Signal · 4H 扫描完成**", ""] + for result in results: + market_name = "Crypto" if result.market_type == MARKET_CRYPTO else "TradFi" + status = status_labels.get(result.status, result.status) + lines.append( + f"{market_name}:{result.signals_created} 个信号 · {status} · " + f"失败 {result.symbols_failed}" + ) + lines.extend( + [ + f"合计:{sum(result.signals_created for result in results)} 个信号", + f"耗时:{elapsed_seconds:.1f} 秒", + f"时间:{madrid_time.strftime('%Y-%m-%d %H:%M')} 马德里", + ] + ) + payload = {"content": "\n".join(lines), "allowed_mentions": {"parse": []}} + timeout = aiohttp.ClientTimeout(total=HTTP_TIMEOUT) + try: + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.post(DISCORD_WEBHOOK_URL, json=payload) as response: + if response.status not in {200, 204}: + body = await response.text() + logger.warning( + "Discord notification failed status=%s body=%s", + response.status, + body[:300], + ) + return + logger.info("Discord aggregate notification sent") + except Exception as exc: # noqa: BLE001 - notification must never fail the scan. + logger.warning("Discord notification skipped error=%s", exc) + + +async def run_all_markets() -> list[ScanResult]: + started = time.perf_counter() + results: list[ScanResult] = [] + for market_type in (MARKET_CRYPTO, MARKET_TRADFI): + try: + results.append(await run_scan(market_type)) + except Exception as exc: # noqa: BLE001 - one market must not block the other. + config = MARKET_CONFIGS[market_type] + logger.error("Market scan failed market=%s error=%s", market_type, exc) + results.append( + ScanResult( + market_type=market_type, + label=config.label, + status="failed", + symbols_total=0, + symbols_failed=0, + updated_klines=0, + signals_created=0, + elapsed_seconds=0, + error_summary=str(exc), + ) + ) + await send_discord_summary(results, time.perf_counter() - started) + return results + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Scan Binance ATR signals") + parser.add_argument( + "--market", + choices=["crypto", "tradfi", "all"], + default="crypto", + help="Market to scan: crypto, tradfi, or all", + ) + args = parser.parse_args() + if args.market == "all": + asyncio.run(run_all_markets()) + else: + selected_market = MARKET_TRADFI if args.market == "tradfi" else MARKET_CRYPTO + asyncio.run(run_scan(selected_market)) diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..521dc19 --- /dev/null +++ b/templates/index.html @@ -0,0 +1,384 @@ + + + + + + Crypto ATR Signal + + + +
+
+
+

Crypto ATR Signal

+
+ {{ strategy.market }} · {{ strategy.timeframe }} / 大阳 / 大阴 / {{ strategy.atr }} / {{ strategy.threshold }}{% if strategy.body_filter %} / {{ strategy.body_filter }}{% endif %} +
+ +
+
+
交易对列表校对:
+
{{ symbol_checked_at }}
+
最新已收盘K线:
+
{{ latest_closed_kline }}
+ {% if page_config.show_version %} +
Version:
+
{{ app_meta.version }} · {{ app_meta.release_date }} · by {{ app_meta.author }}
+ {% endif %} +
+
+ +
+
+
当前信号
+
{{ signal_count }}
+
+
+
大阳
+
{{ bullish_count }}
+
+
+
大阴
+
{{ bearish_count }}
+
+
+
扫描品种
+
{{ active_symbols }}
+
+
+ + {% for group in groups %} +
+ {% if market == 'all' and page_config.group_by_market %} +

{{ group.label }} {{ group.signals|length }} 个信号

+ {% endif %} + {% if group.signals %} +
+ + + + + + + + + + + + + {% for signal in group.signals %} + + + + + + + + + {% endfor %} + +
币种方向ATR波幅 + + ATR倍数 {{ sort_indicator }} + + K线收盘时间
+ + {{ signal.symbol }} + + {{ signal.direction }}{{ signal.atr14 }}{{ signal.range }}{{ signal.multiple }}{{ signal.close_time_madrid }}
+
+ {% else %} +
暂无信号
+ {% endif %} +
+ {% endfor %} + +
+ + diff --git a/webapp.py b/webapp.py new file mode 100644 index 0000000..f703c2f --- /dev/null +++ b/webapp.py @@ -0,0 +1,276 @@ +import os +from datetime import datetime, timezone +from urllib.parse import quote +from zoneinfo import ZoneInfo + +from fastapi import FastAPI, Request +from fastapi.responses import HTMLResponse +from fastapi.templating import Jinja2Templates + +from scanner import ( + DB_PATH, + INTERVAL_MS, + MARKET_CONFIGS, + MARKET_CRYPTO, + MARKET_TRADFI, + db_connect, + init_db, +) + + +app = FastAPI(title="Crypto ATR Signal") +templates = Jinja2Templates(directory="templates") +MADRID_TZ = ZoneInfo("Europe/Madrid") +APP_VERSION = "v1.3.0" +APP_RELEASE_DATE = "2026-06-22" +APP_AUTHOR = "Z" + + +def env_bool(name: str, default: bool) -> bool: + return os.getenv(name, str(default).lower()).lower() == "true" + + +PAGE_DEFAULT_VIEW = os.getenv("PAGE_DEFAULT_VIEW", "all").lower() +PAGE_SHOW_ALL = env_bool("PAGE_SHOW_ALL", True) +PAGE_SHOW_CRYPTO = env_bool("PAGE_SHOW_CRYPTO", True) +PAGE_SHOW_TRADFI = env_bool("PAGE_SHOW_TRADFI", True) +PAGE_GROUP_BY_MARKET = env_bool("PAGE_GROUP_BY_MARKET", True) +PAGE_SHOW_VERSION = env_bool("PAGE_SHOW_VERSION", True) + + +def fmt_decimal(value: str | None, places: int = 6) -> str: + if value is None: + return "-" + number = float(value) + if abs(number) >= 100: + return f"{number:,.2f}" + if abs(number) >= 1: + return f"{number:,.4f}" + return f"{number:,.{places}f}".rstrip("0").rstrip(".") + + +def fmt_madrid_time(ms: int) -> str: + return datetime.fromtimestamp(ms / 1000, timezone.utc).astimezone(MADRID_TZ).strftime( + "%Y-%m-%d %H:%M" + ) + + +def fmt_utc_time(ms: int) -> str: + return datetime.fromtimestamp(ms / 1000, timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + + +def fmt_scan_time(value: str | None) -> str: + if not value: + return "-" + dt = datetime.fromisoformat(value) + return dt.astimezone(MADRID_TZ).strftime("%Y-%m-%d %H:%M 马德里") + + +def fmt_candle_interval_title(open_time: int) -> str: + close_time = open_time + INTERVAL_MS + return ( + f"K线区间:{fmt_madrid_time(open_time)} - {fmt_madrid_time(close_time)} 马德里\n" + f"UTC:{fmt_utc_time(open_time)} - {fmt_utc_time(close_time)}" + ) + + +def tradingview_url(symbol: str) -> str: + tv_symbol = quote(f"BINANCE:{symbol}.P", safe="") + return f"https://www.tradingview.com/chart/?symbol={tv_symbol}" + + +def scan_status_label(status: str | None) -> str: + labels = { + "success": "成功", + "partial": "部分失败", + "failed": "失败", + } + return labels.get(status or "", "-") + + +def multiple_level(value: str) -> str: + number = float(value) + if number >= 3: + return "extreme" + if number >= 2: + return "strong" + return "normal" + + +@app.on_event("startup") +def startup() -> None: + conn = db_connect(DB_PATH) + init_db(conn) + conn.close() + + +def load_market_view( + conn, market_type: str, sort_sql: str +) -> dict: + config = MARKET_CONFIGS[market_type] + latest_kline_time = conn.execute( + "SELECT MAX(open_time) AS open_time FROM klines WHERE market_type = ?", + (market_type,), + ).fetchone() + latest_open_time = latest_kline_time["open_time"] if latest_kline_time else None + rows = conn.execute( + f""" + SELECT symbol, direction, multiple, range, atr14, open_time, created_at + FROM signals + WHERE market_type = ? AND open_time = ? + ORDER BY CAST(multiple AS REAL) {sort_sql}, open_time DESC, symbol ASC + LIMIT 300 + """, + (market_type, latest_open_time), + ).fetchall() + latest_scan = conn.execute( + "SELECT MAX(updated_at) AS updated_at FROM symbols WHERE market_type = ?", + (market_type,), + ).fetchone() + active_symbols = conn.execute( + """ + SELECT COUNT(*) AS count FROM symbols + WHERE market_type = ? AND status = 'TRADING' + """, + (market_type,), + ).fetchone() + + signals = [ + { + "symbol": row["symbol"], + "tradingview_url": tradingview_url(row["symbol"]), + "direction": "大阳" if row["direction"] == "Bullish" else "大阴", + "direction_class": "bullish" if row["direction"] == "Bullish" else "bearish", + "multiple": f"{float(row['multiple']):.2f}x", + "multiple_level": multiple_level(row["multiple"]), + "range": fmt_decimal(row["range"]), + "atr14": fmt_decimal(row["atr14"]), + "close_time_madrid": f"{fmt_madrid_time(row['open_time'] + INTERVAL_MS)} 马德里", + "time_title": fmt_candle_interval_title(row["open_time"]), + "created_at": row["created_at"], + } + for row in rows + ] + + return { + "market_type": market_type, + "market": "crypto" if market_type == MARKET_CRYPTO else "tradfi", + "label": "Crypto" if market_type == MARKET_CRYPTO else "TradFi", + "signals": signals, + "latest_open_time": latest_open_time, + "symbol_checked_at_raw": latest_scan["updated_at"] if latest_scan else None, + "symbol_checked_at": fmt_scan_time(latest_scan["updated_at"] if latest_scan else None), + "latest_closed_kline": ( + f"{fmt_madrid_time(latest_open_time + INTERVAL_MS)} 马德里" + if latest_open_time is not None + else "-" + ), + "active_symbols": active_symbols["count"] if active_symbols else 0, + "config": config, + } + + +@app.get("/", response_class=HTMLResponse) +def index(request: Request, market: str | None = None, sort: str = "desc") -> HTMLResponse: + visible_markets = [] + if PAGE_SHOW_CRYPTO: + visible_markets.append("crypto") + if PAGE_SHOW_TRADFI: + visible_markets.append("tradfi") + if not visible_markets: + visible_markets = ["crypto"] + + allowed_views = (["all"] if PAGE_SHOW_ALL else []) + visible_markets + default_view = PAGE_DEFAULT_VIEW if PAGE_DEFAULT_VIEW in allowed_views else allowed_views[0] + selected_view = market.lower() if market else default_view + if selected_view not in allowed_views: + selected_view = default_view + + sort_order = "asc" if sort == "asc" else "desc" + sort_sql = "ASC" if sort_order == "asc" else "DESC" + selected_market_types = ( + [MARKET_CRYPTO if slug == "crypto" else MARKET_TRADFI for slug in visible_markets] + if selected_view == "all" + else [MARKET_TRADFI if selected_view == "tradfi" else MARKET_CRYPTO] + ) + + conn = db_connect(DB_PATH) + market_views = [load_market_view(conn, market_type, sort_sql) for market_type in selected_market_types] + conn.close() + + if selected_view == "all" and not PAGE_GROUP_BY_MARKET: + combined_signals = [signal for view in market_views for signal in view["signals"]] + combined_signals.sort( + key=lambda item: (float(item["multiple"].rstrip("x")), item["symbol"]), + reverse=sort_order == "desc", + ) + groups = [{"label": "全部市场", "market": "all", "signals": combined_signals}] + else: + groups = [ + {"label": view["label"], "market": view["market"], "signals": view["signals"]} + for view in market_views + ] + + signals = [signal for group in groups for signal in group["signals"]] + if selected_view == "all": + symbol_checked_at = " · ".join( + f"{view['label']} {view['symbol_checked_at']}" for view in market_views + ) + latest_closed_kline = " · ".join( + f"{view['label']} {view['latest_closed_kline']}" for view in market_views + ) + strategy = { + "market": "Crypto + TradFi", + "timeframe": "4H", + "atr": "独立 ATR 参数", + "threshold": "独立倍数过滤", + "body_filter": "", + } + else: + view = market_views[0] + config = view["config"] + symbol_checked_at = view["symbol_checked_at"] + latest_closed_kline = view["latest_closed_kline"] + strategy = { + "market": config.label, + "timeframe": "4H", + "atr": f"ATR {config.atr_length} · RMA", + "threshold": f"{config.atr_multiple}倍过滤", + "body_filter": ( + f"实体占比≥{config.min_body_ratio}" + if config.body_ratio_filter_enabled + else "" + ), + } + + return templates.TemplateResponse( + "index.html", + { + "request": request, + "groups": groups, + "signals": signals, + "signal_count": len(signals), + "bullish_count": sum(1 for item in signals if item["direction_class"] == "bullish"), + "bearish_count": sum(1 for item in signals if item["direction_class"] == "bearish"), + "active_symbols": sum(view["active_symbols"] for view in market_views), + "market": selected_view, + "sort_order": sort_order, + "next_sort_order": "asc" if sort_order == "desc" else "desc", + "sort_indicator": "↓" if sort_order == "desc" else "↑", + "symbol_checked_at": symbol_checked_at, + "latest_closed_kline": latest_closed_kline, + "strategy": strategy, + "page_config": { + "show_all": PAGE_SHOW_ALL, + "show_crypto": PAGE_SHOW_CRYPTO, + "show_tradfi": PAGE_SHOW_TRADFI, + "show_version": PAGE_SHOW_VERSION, + "group_by_market": PAGE_GROUP_BY_MARKET, + }, + "app_meta": { + "version": APP_VERSION, + "release_date": APP_RELEASE_DATE, + "author": APP_AUTHOR, + }, + }, + )