Files
crypto-atr-signal/scanner.py
T
2026-06-22 22:52:21 +02:00

980 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import argparse
import asyncio
import logging
import os
import sqlite3
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal, InvalidOperation
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo
import aiohttp
from dotenv import load_dotenv
load_dotenv()
BASE_URL = os.getenv("BINANCE_FAPI_BASE_URL", "https://fapi.binance.com")
DB_PATH = Path(os.getenv("DB_PATH", "data/app.db"))
INTERVAL = "4h"
INTERVAL_MS = 4 * 60 * 60 * 1000
ATR_LENGTH = int(os.getenv("ATR_LENGTH", "14"))
ATR_MULTIPLE = Decimal(os.getenv("ATR_MULTIPLE", "1.5"))
CRYPTO_ATR_LENGTH = int(os.getenv("CRYPTO_ATR_LENGTH", str(ATR_LENGTH)))
CRYPTO_ATR_MULTIPLE = Decimal(os.getenv("CRYPTO_ATR_MULTIPLE", str(ATR_MULTIPLE)))
TRADFI_ATR_LENGTH = int(os.getenv("TRADFI_ATR_LENGTH", str(ATR_LENGTH)))
TRADFI_ATR_MULTIPLE = Decimal(os.getenv("TRADFI_ATR_MULTIPLE", str(ATR_MULTIPLE)))
INIT_KLINES_LIMIT = int(os.getenv("INIT_KLINES_LIMIT", "40"))
CONCURRENCY = int(os.getenv("CONCURRENCY", "15"))
HTTP_TIMEOUT = int(os.getenv("HTTP_TIMEOUT", "20"))
MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3"))
RATE_LIMIT_BACKOFF = int(os.getenv("RATE_LIMIT_BACKOFF", "30"))
BODY_RATIO_FILTER_ENABLED = os.getenv("BODY_RATIO_FILTER_ENABLED", "false").lower() == "true"
MIN_BODY_RATIO = Decimal(os.getenv("MIN_BODY_RATIO", "0.5"))
CRYPTO_BODY_RATIO_FILTER_ENABLED = (
os.getenv("CRYPTO_BODY_RATIO_FILTER_ENABLED", str(BODY_RATIO_FILTER_ENABLED)).lower() == "true"
)
CRYPTO_MIN_BODY_RATIO = Decimal(os.getenv("CRYPTO_MIN_BODY_RATIO", str(MIN_BODY_RATIO)))
TRADFI_BODY_RATIO_FILTER_ENABLED = (
os.getenv("TRADFI_BODY_RATIO_FILTER_ENABLED", str(BODY_RATIO_FILTER_ENABLED)).lower() == "true"
)
TRADFI_MIN_BODY_RATIO = Decimal(os.getenv("TRADFI_MIN_BODY_RATIO", str(MIN_BODY_RATIO)))
KLINES_RETENTION_PER_SYMBOL = int(os.getenv("KLINES_RETENTION_PER_SYMBOL", "500"))
SIGNAL_RETENTION_DAYS = int(os.getenv("SIGNAL_RETENTION_DAYS", "90"))
DISCORD_ENABLED = os.getenv("DISCORD_ENABLED", "false").lower() == "true"
DISCORD_WEBHOOK_URL = os.getenv("DISCORD_WEBHOOK_URL", "").strip()
MARKET_CRYPTO = "CRYPTO"
MARKET_TRADFI = "TRADFI"
@dataclass(frozen=True)
class MarketConfig:
market_type: str
label: str
atr_length: int
atr_multiple: Decimal
body_ratio_filter_enabled: bool
min_body_ratio: Decimal
strict_time_grid: bool
@dataclass(frozen=True)
class ScanResult:
market_type: str
label: str
status: str
symbols_total: int
symbols_failed: int
updated_klines: int
signals_created: int
elapsed_seconds: float
error_summary: str | None = None
MARKET_CONFIGS = {
MARKET_CRYPTO: MarketConfig(
market_type=MARKET_CRYPTO,
label="Binance USDT 永续",
atr_length=CRYPTO_ATR_LENGTH,
atr_multiple=CRYPTO_ATR_MULTIPLE,
body_ratio_filter_enabled=CRYPTO_BODY_RATIO_FILTER_ENABLED,
min_body_ratio=CRYPTO_MIN_BODY_RATIO,
strict_time_grid=True,
),
MARKET_TRADFI: MarketConfig(
market_type=MARKET_TRADFI,
label="Binance TradFi 永续",
atr_length=TRADFI_ATR_LENGTH,
atr_multiple=TRADFI_ATR_MULTIPLE,
body_ratio_filter_enabled=TRADFI_BODY_RATIO_FILTER_ENABLED,
min_body_ratio=TRADFI_MIN_BODY_RATIO,
strict_time_grid=False,
),
}
logging.basicConfig(
level=os.getenv("LOG_LEVEL", "INFO"),
format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger("crypto-atr-scanner")
def utc_now_ms() -> int:
return int(datetime.now(timezone.utc).timestamp() * 1000)
def latest_theoretical_closed_open_time(now_ms: int | None = None) -> int:
now_ms = utc_now_ms() if now_ms is None else now_ms
return (now_ms // INTERVAL_MS) * INTERVAL_MS - INTERVAL_MS
def ms_to_iso(ms: int) -> str:
return datetime.fromtimestamp(ms / 1000, timezone.utc).isoformat()
def decimal_from_api(value: Any) -> Decimal:
try:
return Decimal(str(value))
except (InvalidOperation, ValueError) as exc:
raise ValueError(f"Invalid decimal value from API: {value!r}") from exc
def db_connect(path: Path | str = DB_PATH) -> sqlite3.Connection:
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(path)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode = WAL;")
conn.execute("PRAGMA busy_timeout = 5000;")
return conn
def init_db(conn: sqlite3.Connection) -> None:
existing_symbols = conn.execute(
"SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'symbols'"
).fetchone()
if existing_symbols:
symbol_cols = [row["name"] for row in conn.execute("PRAGMA table_info(symbols)")]
if "market_type" not in symbol_cols:
migrate_v11_to_v12(conn)
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS symbols (
market_type TEXT NOT NULL DEFAULT 'CRYPTO',
symbol TEXT NOT NULL,
status TEXT NOT NULL,
underlying_type TEXT,
underlying_subtype TEXT,
updated_at TEXT NOT NULL,
PRIMARY KEY (market_type, symbol)
);
CREATE TABLE IF NOT EXISTS klines (
market_type TEXT NOT NULL DEFAULT 'CRYPTO',
symbol TEXT NOT NULL,
open_time INTEGER NOT NULL,
open TEXT NOT NULL,
high TEXT NOT NULL,
low TEXT NOT NULL,
close TEXT NOT NULL,
close_time INTEGER NOT NULL,
atr14 TEXT,
PRIMARY KEY (market_type, symbol, open_time)
);
CREATE TABLE IF NOT EXISTS signals (
market_type TEXT NOT NULL DEFAULT 'CRYPTO',
symbol TEXT NOT NULL,
direction TEXT NOT NULL,
multiple TEXT NOT NULL,
range TEXT NOT NULL,
atr14 TEXT NOT NULL,
open_time INTEGER NOT NULL,
created_at TEXT NOT NULL,
PRIMARY KEY (market_type, symbol, open_time)
);
CREATE INDEX IF NOT EXISTS idx_klines_symbol_time
ON klines (market_type, symbol, open_time DESC);
CREATE INDEX IF NOT EXISTS idx_signals_time
ON signals (market_type, open_time DESC);
CREATE TABLE IF NOT EXISTS scan_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
market_type TEXT NOT NULL,
started_at TEXT NOT NULL,
finished_at TEXT NOT NULL,
status TEXT NOT NULL,
symbols_total INTEGER NOT NULL DEFAULT 0,
symbols_success INTEGER NOT NULL DEFAULT 0,
symbols_failed INTEGER NOT NULL DEFAULT 0,
updated_klines INTEGER NOT NULL DEFAULT 0,
signals_created INTEGER NOT NULL DEFAULT 0,
elapsed_seconds TEXT NOT NULL,
latest_target_open_time INTEGER,
error_summary TEXT
);
CREATE INDEX IF NOT EXISTS idx_scan_runs_market_finished
ON scan_runs (market_type, finished_at DESC);
"""
)
conn.commit()
def migrate_v11_to_v12(conn: sqlite3.Connection) -> None:
logger.info("Migrating database schema to v1.2 market_type layout")
conn.executescript(
"""
ALTER TABLE symbols RENAME TO symbols_v11;
ALTER TABLE klines RENAME TO klines_v11;
ALTER TABLE signals RENAME TO signals_v11;
CREATE TABLE symbols (
market_type TEXT NOT NULL DEFAULT 'CRYPTO',
symbol TEXT NOT NULL,
status TEXT NOT NULL,
underlying_type TEXT,
underlying_subtype TEXT,
updated_at TEXT NOT NULL,
PRIMARY KEY (market_type, symbol)
);
CREATE TABLE klines (
market_type TEXT NOT NULL DEFAULT 'CRYPTO',
symbol TEXT NOT NULL,
open_time INTEGER NOT NULL,
open TEXT NOT NULL,
high TEXT NOT NULL,
low TEXT NOT NULL,
close TEXT NOT NULL,
close_time INTEGER NOT NULL,
atr14 TEXT,
PRIMARY KEY (market_type, symbol, open_time)
);
CREATE TABLE signals (
market_type TEXT NOT NULL DEFAULT 'CRYPTO',
symbol TEXT NOT NULL,
direction TEXT NOT NULL,
multiple TEXT NOT NULL,
range TEXT NOT NULL,
atr14 TEXT NOT NULL,
open_time INTEGER NOT NULL,
created_at TEXT NOT NULL,
PRIMARY KEY (market_type, symbol, open_time)
);
INSERT INTO symbols (market_type, symbol, status, updated_at)
SELECT 'CRYPTO', symbol, status, updated_at FROM symbols_v11;
INSERT INTO klines (market_type, symbol, open_time, open, high, low, close, close_time, atr14)
SELECT 'CRYPTO', symbol, open_time, open, high, low, close, close_time, atr14 FROM klines_v11;
INSERT INTO signals (market_type, symbol, direction, multiple, range, atr14, open_time, created_at)
SELECT 'CRYPTO', symbol, direction, multiple, range, atr14, open_time, created_at FROM signals_v11;
DROP TABLE symbols_v11;
DROP TABLE klines_v11;
DROP TABLE signals_v11;
"""
)
conn.commit()
async def fetch_json(
session: aiohttp.ClientSession,
path: str,
params: dict[str, Any] | None = None,
) -> Any:
url = f"{BASE_URL}{path}"
last_error: Exception | None = None
for attempt in range(1, MAX_RETRIES + 1):
try:
async with session.get(url, params=params) as response:
response.raise_for_status()
return await response.json()
except aiohttp.ClientResponseError as exc:
last_error = exc
if exc.status in {418, 429}:
retry_after = exc.headers.get("Retry-After") if exc.headers else None
delay = int(retry_after) if retry_after and retry_after.isdigit() else RATE_LIMIT_BACKOFF
logger.warning(
"Rate limited attempt=%s status=%s path=%s sleep=%ss",
attempt,
exc.status,
path,
delay,
)
else:
delay = min(2**attempt, 10)
logger.warning(
"HTTP error attempt=%s status=%s path=%s error=%s",
attempt,
exc.status,
path,
exc,
)
await asyncio.sleep(delay)
except Exception as exc: # noqa: BLE001 - log and retry all transport/API errors.
last_error = exc
delay = min(2**attempt, 10)
logger.warning("Request failed attempt=%s path=%s error=%s", attempt, path, exc)
await asyncio.sleep(delay)
raise RuntimeError(f"Request failed after {MAX_RETRIES} attempts: {url}") from last_error
def normalize_subtype(value: Any) -> str | None:
if value is None:
return None
if isinstance(value, list):
return ",".join(str(item) for item in value)
return str(value)
async def fetch_symbols(session: aiohttp.ClientSession, market_type: str) -> list[dict[str, Any]]:
exchange_info = await fetch_json(session, "/fapi/v1/exchangeInfo")
symbols: list[dict[str, Any]] = []
for item in exchange_info.get("symbols", []):
if item.get("status") != "TRADING" or item.get("quoteAsset") != "USDT":
continue
subtype = item.get("underlyingSubType") or []
is_tradfi = item.get("contractType") == "TRADIFI_PERPETUAL" or "TradFi" in subtype
is_crypto = item.get("contractType") == "PERPETUAL" and not is_tradfi
if market_type == MARKET_CRYPTO and not is_crypto:
continue
if market_type == MARKET_TRADFI and not is_tradfi:
continue
symbols.append(
{
"symbol": item["symbol"],
"underlying_type": item.get("underlyingType"),
"underlying_subtype": normalize_subtype(subtype),
}
)
return sorted(symbols, key=lambda row: row["symbol"])
async def fetch_klines(
session: aiohttp.ClientSession,
symbol: str,
limit: int = 2,
start_time: int | None = None,
end_time: int | None = None,
) -> list[list[Any]]:
params: dict[str, Any] = {"symbol": symbol, "interval": INTERVAL, "limit": limit}
if start_time is not None:
params["startTime"] = start_time
if end_time is not None:
params["endTime"] = end_time
return await fetch_json(session, "/fapi/v1/klines", params)
def closed_klines(raw_klines: list[list[Any]], now_ms: int) -> list[dict[str, Any]]:
klines: list[dict[str, Any]] = []
for item in raw_klines:
close_time = int(item[6])
if close_time > now_ms:
continue
klines.append(
{
"open_time": int(item[0]),
"open": str(item[1]),
"high": str(item[2]),
"low": str(item[3]),
"close": str(item[4]),
"close_time": close_time,
}
)
return sorted(klines, key=lambda row: row["open_time"])
def upsert_symbols(conn: sqlite3.Connection, market_type: str, symbols: list[dict[str, Any]]) -> None:
now = datetime.now(timezone.utc).isoformat()
active = {row["symbol"] for row in symbols}
for row in symbols:
conn.execute(
"""
INSERT INTO symbols (
market_type, symbol, status, underlying_type, underlying_subtype, updated_at
)
VALUES (?, ?, 'TRADING', ?, ?, ?)
ON CONFLICT(market_type, symbol) DO UPDATE SET
status = excluded.status,
underlying_type = excluded.underlying_type,
underlying_subtype = excluded.underlying_subtype,
updated_at = excluded.updated_at
""",
(
market_type,
row["symbol"],
row.get("underlying_type"),
row.get("underlying_subtype"),
now,
),
)
existing = conn.execute(
"SELECT symbol FROM symbols WHERE market_type = ?",
(market_type,),
).fetchall()
for row in existing:
if row["symbol"] not in active:
conn.execute(
"""
UPDATE symbols
SET status = 'INACTIVE', updated_at = ?
WHERE market_type = ? AND symbol = ?
""",
(now, market_type, row["symbol"]),
)
conn.commit()
def latest_kline(conn: sqlite3.Connection, market_type: str, symbol: str) -> sqlite3.Row | None:
return conn.execute(
"""
SELECT * FROM klines
WHERE market_type = ? AND symbol = ?
ORDER BY open_time DESC
LIMIT 1
""",
(market_type, symbol),
).fetchone()
def active_symbols_from_db(conn: sqlite3.Connection, market_type: str) -> list[str]:
rows = conn.execute(
"""
SELECT symbol FROM symbols
WHERE market_type = ? AND status = 'TRADING'
ORDER BY symbol ASC
""",
(market_type,),
).fetchall()
return [row["symbol"] for row in rows]
def insert_klines(
conn: sqlite3.Connection,
market_type: str,
symbol: str,
klines: list[dict[str, Any]],
) -> None:
if not klines:
return
conn.executemany(
"""
INSERT INTO klines (
market_type, symbol, open_time, open, high, low, close, close_time, atr14
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, NULL)
ON CONFLICT(market_type, symbol, open_time) DO UPDATE SET
open = excluded.open,
high = excluded.high,
low = excluded.low,
close = excluded.close,
close_time = excluded.close_time
""",
[
(
market_type,
symbol,
row["open_time"],
row["open"],
row["high"],
row["low"],
row["close"],
row["close_time"],
)
for row in klines
],
)
conn.commit()
def recompute_atr_and_signals(
conn: sqlite3.Connection,
config: MarketConfig,
symbol: str,
candidate_open_times: set[int],
) -> int:
rows = conn.execute(
"""
SELECT * FROM klines
WHERE market_type = ? AND symbol = ?
ORDER BY open_time ASC
""",
(config.market_type, symbol),
).fetchall()
if not rows:
return 0
prev_close: Decimal | None = None
atr: Decimal | None = None
tr_values: list[Decimal] = []
signal_count = 0
for row in rows:
high = decimal_from_api(row["high"])
low = decimal_from_api(row["low"])
close = decimal_from_api(row["close"])
open_price = decimal_from_api(row["open"])
if prev_close is None:
tr = high - low
else:
tr = max(high - low, abs(high - prev_close), abs(low - prev_close))
tr_values.append(tr)
if len(tr_values) == config.atr_length:
atr = sum(tr_values) / Decimal(config.atr_length)
elif len(tr_values) > config.atr_length and atr is not None:
atr = ((atr * Decimal(config.atr_length - 1)) + tr) / Decimal(config.atr_length)
atr_text = str(atr) if atr is not None else None
conn.execute(
"""
UPDATE klines SET atr14 = ?
WHERE market_type = ? AND symbol = ? AND open_time = ?
""",
(atr_text, config.market_type, symbol, row["open_time"]),
)
if atr is not None and row["open_time"] in candidate_open_times:
candle_range = high - low
threshold = atr * config.atr_multiple
if candle_range >= threshold and close != open_price:
if config.body_ratio_filter_enabled:
body_ratio = abs(close - open_price) / candle_range if candle_range else Decimal("0")
if body_ratio < config.min_body_ratio:
prev_close = close
continue
direction = "Bullish" if close > open_price else "Bearish"
multiple = candle_range / atr
conn.execute(
"""
INSERT INTO signals (
market_type, symbol, direction, multiple, range, atr14, open_time, created_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(market_type, symbol, open_time) DO UPDATE SET
direction = excluded.direction,
multiple = excluded.multiple,
range = excluded.range,
atr14 = excluded.atr14,
created_at = excluded.created_at
""",
(
config.market_type,
symbol,
direction,
str(multiple),
str(candle_range),
str(atr),
row["open_time"],
datetime.now(timezone.utc).isoformat(),
),
)
signal_count += 1
prev_close = close
conn.commit()
return signal_count
def cleanup_old_data(conn: sqlite3.Connection) -> tuple[int, int]:
deleted_klines = 0
deleted_signals = 0
if KLINES_RETENTION_PER_SYMBOL > 0:
symbols = conn.execute("SELECT DISTINCT market_type, symbol FROM klines").fetchall()
for row in symbols:
cursor = conn.execute(
"""
DELETE FROM klines
WHERE market_type = ?
AND symbol = ?
AND open_time NOT IN (
SELECT open_time FROM klines
WHERE market_type = ?
AND symbol = ?
ORDER BY open_time DESC
LIMIT ?
)
""",
(
row["market_type"],
row["symbol"],
row["market_type"],
row["symbol"],
KLINES_RETENTION_PER_SYMBOL,
),
)
deleted_klines += cursor.rowcount
if SIGNAL_RETENTION_DAYS > 0:
cutoff = utc_now_ms() - (SIGNAL_RETENTION_DAYS * 24 * 60 * 60 * 1000)
cursor = conn.execute("DELETE FROM signals WHERE open_time < ?", (cutoff,))
deleted_signals = cursor.rowcount
conn.commit()
return deleted_klines, deleted_signals
def insert_scan_run(
conn: sqlite3.Connection,
market_type: str,
started_at: str,
status: str,
symbols_total: int,
symbols_success: int,
symbols_failed: int,
updated_klines: int,
signals_created: int,
elapsed_seconds: float,
latest_target_open_time: int | None,
error_summary: str | None = None,
) -> None:
conn.execute(
"""
INSERT INTO scan_runs (
market_type, started_at, finished_at, status, symbols_total, symbols_success,
symbols_failed, updated_klines, signals_created, elapsed_seconds,
latest_target_open_time, error_summary
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
market_type,
started_at,
datetime.now(timezone.utc).isoformat(),
status,
symbols_total,
symbols_success,
symbols_failed,
updated_klines,
signals_created,
f"{elapsed_seconds:.1f}",
latest_target_open_time,
error_summary,
),
)
conn.commit()
async def fetch_missing_closed_klines(
session: aiohttp.ClientSession,
symbol: str,
start_open_time: int,
expected_open_time: int,
now_ms: int,
strict_time_grid: bool,
) -> list[dict[str, Any]]:
if start_open_time > expected_open_time:
return []
needed = ((expected_open_time - start_open_time) // INTERVAL_MS) + 1
raw = await fetch_klines(
session,
symbol,
limit=max(2, min(1500, needed + 2)),
start_time=start_open_time,
end_time=expected_open_time + INTERVAL_MS - 1,
)
rows = closed_klines(raw, now_ms)
by_open = {row["open_time"]: row for row in rows}
missing: list[dict[str, Any]] = []
missed_slots = 0
cursor = start_open_time
while cursor <= expected_open_time:
row = by_open.get(cursor)
if row is not None:
missing.append(row)
else:
missed_slots += 1
cursor += INTERVAL_MS
if strict_time_grid and missed_slots:
logger.warning(
"%s missing %s expected 4H slots between %s and %s",
symbol,
missed_slots,
ms_to_iso(start_open_time),
ms_to_iso(expected_open_time),
)
return missing
async def process_symbol(
session: aiohttp.ClientSession,
conn: sqlite3.Connection,
config: MarketConfig,
symbol: str,
semaphore: asyncio.Semaphore,
now_ms: int,
expected_open_time: int,
) -> tuple[str, int, int]:
async with semaphore:
latest = latest_kline(conn, config.market_type, symbol)
new_rows: list[dict[str, Any]]
if latest is None:
raw = await fetch_klines(session, symbol, limit=INIT_KLINES_LIMIT + 1)
new_rows = closed_klines(raw, now_ms)[-INIT_KLINES_LIMIT:]
if config.strict_time_grid:
candidate_open_times = {expected_open_time}
else:
candidate_open_times = {new_rows[-1]["open_time"]} if new_rows else set()
else:
latest_open_time = int(latest["open_time"])
if latest_open_time >= expected_open_time:
raw = await fetch_klines(session, symbol, limit=2)
rows = closed_klines(raw, now_ms)
new_rows = [row for row in rows if row["open_time"] > latest_open_time]
candidate_open_times = {row["open_time"] for row in new_rows}
else:
next_expected = latest_open_time + INTERVAL_MS
new_rows = await fetch_missing_closed_klines(
session,
symbol,
next_expected,
expected_open_time,
now_ms,
config.strict_time_grid,
)
candidate_open_times = {row["open_time"] for row in new_rows}
insert_klines(conn, config.market_type, symbol, new_rows)
signal_count = recompute_atr_and_signals(
conn,
config,
symbol,
candidate_open_times,
)
return symbol, len(new_rows), signal_count
async def process_symbol_safe(
session: aiohttp.ClientSession,
conn: sqlite3.Connection,
config: MarketConfig,
symbol: str,
semaphore: asyncio.Semaphore,
now_ms: int,
expected_open_time: int,
) -> tuple[str, int, int, bool]:
try:
symbol, inserted, signal_count = await process_symbol(
session,
conn,
config,
symbol,
semaphore,
now_ms,
expected_open_time,
)
return symbol, inserted, signal_count, True
except Exception as exc: # noqa: BLE001 - one failed symbol must not block the scan.
logger.warning("%s skipped error=%s", symbol, exc)
return symbol, 0, 0, False
async def run_scan(market_type: str = MARKET_CRYPTO) -> ScanResult:
config = MARKET_CONFIGS[market_type]
started_perf = time.perf_counter()
started_at = datetime.now(timezone.utc).isoformat()
conn = db_connect()
init_db(conn)
symbols: list[str] = []
expected_open_time: int | None = None
timeout = aiohttp.ClientTimeout(total=HTTP_TIMEOUT)
async with aiohttp.ClientSession(timeout=timeout) as session:
try:
symbol_rows = await fetch_symbols(session, market_type)
upsert_symbols(conn, market_type, symbol_rows)
symbols = [row["symbol"] for row in symbol_rows]
except Exception as exc: # noqa: BLE001 - fallback keeps cron useful during API hiccups.
symbols = active_symbols_from_db(conn, market_type)
if not symbols:
elapsed = time.perf_counter() - started_perf
insert_scan_run(
conn,
market_type=market_type,
started_at=started_at,
status="failed",
symbols_total=0,
symbols_success=0,
symbols_failed=0,
updated_klines=0,
signals_created=0,
elapsed_seconds=elapsed,
latest_target_open_time=None,
error_summary=f"symbol refresh failed and no cached symbols: {exc}",
)
logger.error(
"Scan aborted: symbol refresh failed and database has no cached symbols elapsed=%.1fs error=%s",
elapsed,
exc,
)
raise
logger.warning(
"Symbol refresh failed, using %s cached symbols error=%s",
len(symbols),
exc,
)
now_ms = utc_now_ms()
expected_open_time = latest_theoretical_closed_open_time(now_ms)
semaphore = asyncio.Semaphore(CONCURRENCY)
logger.info(
"Scanning market=%s symbols=%s latest_closed_open=%s atr_length=%s atr_multiple=%s strict_time_grid=%s",
market_type,
len(symbols),
ms_to_iso(expected_open_time),
config.atr_length,
config.atr_multiple,
config.strict_time_grid,
)
tasks = [
process_symbol_safe(session, conn, config, symbol, semaphore, now_ms, expected_open_time)
for symbol in symbols
]
updated = 0
signals = 0
failures = 0
success = 0
for task in asyncio.as_completed(tasks):
symbol, inserted, signal_count, ok = await task
if not ok:
failures += 1
continue
success += 1
updated += inserted
signals += signal_count
if inserted:
logger.info("%s updated_klines=%s signals=%s", symbol, inserted, signal_count)
elapsed = time.perf_counter() - started_perf
status = "success" if failures == 0 else "partial"
insert_scan_run(
conn,
market_type=market_type,
started_at=started_at,
status=status,
symbols_total=len(symbols),
symbols_success=success,
symbols_failed=failures,
updated_klines=updated,
signals_created=signals,
elapsed_seconds=elapsed,
latest_target_open_time=expected_open_time,
)
conn.close()
speed = len(symbols) / elapsed if elapsed > 0 else 0
logger.info(
"Scan done market=%s symbols=%s updated_klines=%s signals=%s failures=%s elapsed=%.1fs speed=%.1f_symbols/s",
market_type,
len(symbols),
updated,
signals,
failures,
elapsed,
speed,
)
logger.info("%.1f seconds scan complete", elapsed)
cleanup_conn = db_connect()
deleted_klines, deleted_signals = cleanup_old_data(cleanup_conn)
cleanup_conn.close()
if deleted_klines or deleted_signals:
logger.info(
"Cleanup done deleted_klines=%s deleted_signals=%s",
deleted_klines,
deleted_signals,
)
return ScanResult(
market_type=market_type,
label=config.label,
status=status,
symbols_total=len(symbols),
symbols_failed=failures,
updated_klines=updated,
signals_created=signals,
elapsed_seconds=elapsed,
)
async def send_discord_summary(results: list[ScanResult], elapsed_seconds: float) -> None:
if not DISCORD_ENABLED:
return
if not DISCORD_WEBHOOK_URL:
logger.warning("Discord notification skipped: DISCORD_WEBHOOK_URL is empty")
return
madrid_time = datetime.now(timezone.utc).astimezone(ZoneInfo("Europe/Madrid"))
status_labels = {"success": "成功", "partial": "部分失败", "failed": "失败"}
lines = ["**Crypto ATR Signal · 4H 扫描完成**", ""]
for result in results:
market_name = "Crypto" if result.market_type == MARKET_CRYPTO else "TradFi"
status = status_labels.get(result.status, result.status)
lines.append(
f"{market_name}{result.signals_created} 个信号 · {status} · "
f"失败 {result.symbols_failed}"
)
lines.extend(
[
f"合计:{sum(result.signals_created for result in results)} 个信号",
f"耗时:{elapsed_seconds:.1f} 秒",
f"时间:{madrid_time.strftime('%Y-%m-%d %H:%M')} 马德里",
]
)
payload = {"content": "\n".join(lines), "allowed_mentions": {"parse": []}}
timeout = aiohttp.ClientTimeout(total=HTTP_TIMEOUT)
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(DISCORD_WEBHOOK_URL, json=payload) as response:
if response.status not in {200, 204}:
body = await response.text()
logger.warning(
"Discord notification failed status=%s body=%s",
response.status,
body[:300],
)
return
logger.info("Discord aggregate notification sent")
except Exception as exc: # noqa: BLE001 - notification must never fail the scan.
logger.warning("Discord notification skipped error=%s", exc)
async def run_all_markets() -> list[ScanResult]:
started = time.perf_counter()
results: list[ScanResult] = []
for market_type in (MARKET_CRYPTO, MARKET_TRADFI):
try:
results.append(await run_scan(market_type))
except Exception as exc: # noqa: BLE001 - one market must not block the other.
config = MARKET_CONFIGS[market_type]
logger.error("Market scan failed market=%s error=%s", market_type, exc)
results.append(
ScanResult(
market_type=market_type,
label=config.label,
status="failed",
symbols_total=0,
symbols_failed=0,
updated_klines=0,
signals_created=0,
elapsed_seconds=0,
error_summary=str(exc),
)
)
await send_discord_summary(results, time.perf_counter() - started)
return results
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Scan Binance ATR signals")
parser.add_argument(
"--market",
choices=["crypto", "tradfi", "all"],
default="crypto",
help="Market to scan: crypto, tradfi, or all",
)
args = parser.parse_args()
if args.market == "all":
asyncio.run(run_all_markets())
else:
selected_market = MARKET_TRADFI if args.market == "tradfi" else MARKET_CRYPTO
asyncio.run(run_scan(selected_market))