#!/usr/bin/env python3
"""
统一抓取 A股 + 港股日线数据：
- A股：AKShare (stock_zh_a_hist)
- 港股：yfinance (.HK)

输出：data/market/daily_quotes.csv
字段：date, market, symbol, open, high, low, close, volume, amount, source
"""

from __future__ import annotations

from pathlib import Path
from typing import List, Dict

import pandas as pd
import yaml

import akshare as ak
import yfinance as yf

# Directory two levels above the folder that contains this script
# (presumably the project root — confirm against the repository layout).
ROOT = Path(__file__).resolve().parents[2]
# YAML file listing the symbols to fetch; read by load_config().
CONFIG_PATH = ROOT / "config" / "market_symbols.yaml"
# Destination CSV for the merged daily quotes; written by main().
OUT_PATH = ROOT / "data" / "market" / "daily_quotes.csv"


def load_config() -> Dict[str, List[str]]:
    """Read the symbol lists from ``CONFIG_PATH``.

    The YAML file may define two keys, ``a_share`` and ``hk``; each holds a
    list of stock codes (a single scalar is accepted as a one-item list).
    A missing key, a key with no value, or an empty file yields an empty list.

    Returns:
        ``{"a_share": [...], "hk": [...]}`` where A-share codes are
        left-padded with zeros to 6 characters and Hong Kong codes to 4.
    """
    with CONFIG_PATH.open("r", encoding="utf-8") as f:
        # BaseLoader keeps every scalar as a plain string and constructs no
        # arbitrary objects. The default YAML 1.1 typing used by safe_load
        # would read an unquoted code such as 0700 or 000010 as an octal
        # integer (448 / 8), silently turning it into a different stock code.
        cfg = yaml.load(f, Loader=yaml.BaseLoader) or {}

    def _codes(key: str, width: int) -> List[str]:
        # "key:" with no value loads as an empty scalar; treat it as no codes.
        raw = cfg.get(key) or []
        if isinstance(raw, str):
            # A lone scalar would otherwise be iterated character by character.
            raw = [raw]
        return [str(item).strip().zfill(width) for item in raw]

    return {
        "a_share": _codes("a_share", 6),
        "hk": _codes("hk", 4),
    }


def _finalize_a(df: pd.DataFrame, symbol: str, source: str) -> pd.DataFrame:
    if df is None or df.empty:
        return pd.DataFrame()
    df = df.copy()
    df["market"] = "A"
    df["symbol"] = symbol
    df["source"] = source
    return df[["date", "market", "symbol", "open", "high", "low", "close", "volume", "amount", "source"]]


def _a_symbol_to_sina(symbol: str) -> str:
    return f"sh{symbol}" if symbol.startswith("6") else f"sz{symbol}"


def _a_symbol_to_yf(symbol: str) -> str:
    return f"{symbol}.SS" if symbol.startswith("6") else f"{symbol}.SZ"


# Columns every raw A-share frame must provide before it is finalized.
_A_RAW_COLUMNS = ["date", "open", "high", "low", "close", "volume", "amount"]


def _load_a_hist(symbol: str) -> pd.DataFrame:
    """Load unadjusted daily bars via ``ak.stock_zh_a_hist`` with English column names."""
    df = ak.stock_zh_a_hist(symbol=symbol, period="daily", adjust="")
    if df is None or df.empty:
        return pd.DataFrame()
    return df.rename(
        columns={
            "日期": "date",
            "开盘": "open",
            "最高": "high",
            "最低": "low",
            "收盘": "close",
            "成交量": "volume",
            "成交额": "amount",
        }
    )


def _load_a_sina(symbol: str) -> pd.DataFrame:
    """Load unadjusted daily bars via ``ak.stock_zh_a_daily`` with ``date`` as a column."""
    df = ak.stock_zh_a_daily(symbol=_a_symbol_to_sina(symbol), adjust="")
    if df is None or df.empty:
        return pd.DataFrame()
    if isinstance(df.index, pd.DatetimeIndex):
        # The dates may arrive as the index; an unnamed index becomes "index".
        df = df.reset_index().rename(columns={"index": "date"})
    return df


def _load_a_yf(symbol: str) -> pd.DataFrame:
    """Load unadjusted daily bars via ``yf.download``; ``amount`` is filled with NA."""
    df = yf.download(
        _a_symbol_to_yf(symbol),
        period="max",
        interval="1d",
        auto_adjust=False,
        progress=False,
    )
    if df is None or df.empty:
        return pd.DataFrame()
    if isinstance(df.columns, pd.MultiIndex):
        # Keep only the first level (the field name) of each column tuple.
        df.columns = [c[0] for c in df.columns]
    df = df.reset_index().rename(
        columns={
            "Date": "date",
            "Open": "open",
            "High": "high",
            "Low": "low",
            "Close": "close",
            "Volume": "volume",
        }
    )
    df["amount"] = pd.NA
    return df


def fetch_a_share(symbol: str) -> pd.DataFrame:
    """Fetch the daily history of one A-share, trying three sources in order.

    Sources are tried in this order and the first usable one wins:

    1. ``akshare:hist`` — ``ak.stock_zh_a_hist``
    2. ``akshare:sina`` — ``ak.stock_zh_a_daily``
    3. ``yfinance``     — ``yf.download`` with a ``.SS`` / ``.SZ`` ticker

    A source is skipped when it raises, returns no rows, or lacks one of the
    required columns; the reason is collected for the final error message.

    NOTE(review): the sources may not report ``volume`` in the same unit
    (AKShare's documentation appears to give lots for the Eastmoney endpoint
    and shares for the Sina endpoint) — confirm before comparing volumes
    across rows with different ``source`` values.

    Args:
        symbol: Six-digit A-share code without exchange prefix or suffix.

    Returns:
        Frame with columns ``date, market, symbol, open, high, low, close,
        volume, amount, source``.

    Raises:
        RuntimeError: If no source produced usable data. The message lists
            one ``label=reason`` entry per source.
    """
    loaders = (
        ("hist", "akshare:hist", _load_a_hist),
        ("sina", "akshare:sina", _load_a_sina),
        ("yf", "yfinance", _load_a_yf),
    )
    errors: List[str] = []

    for label, source, loader in loaders:
        # Deliberately broad: any failure of one source must fall through to
        # the next instead of aborting the whole symbol.
        try:
            raw = loader(symbol)
            if raw.empty:
                errors.append(f"{label}=empty")
                continue
            missing = [c for c in _A_RAW_COLUMNS if c not in raw.columns]
            if missing:
                # Name the missing columns here rather than letting the column
                # selection fail later with a less specific KeyError.
                errors.append(f"{label}=missing columns {missing}")
                continue
            return _finalize_a(raw[_A_RAW_COLUMNS], symbol, source)
        except Exception as e:
            errors.append(f"{label}={e}")

    raise RuntimeError("; ".join(errors))


def fetch_hk(symbol: str) -> pd.DataFrame:
    """Fetch the daily history of one Hong Kong stock through yfinance.

    Args:
        symbol: Numeric Hong Kong stock code, e.g. ``"0700"``. A five-digit
            code with a leading zero such as ``"00700"`` is also accepted:
            the ticker is built from the code with leading zeros stripped and
            re-padded to four digits (``0700.HK``). The ``symbol`` column of
            the result keeps the value exactly as passed in.

    Returns:
        Frame with columns ``date, market, symbol, open, high, low, close,
        volume, amount, source`` (``market`` is ``"HK"``, ``source`` is
        ``"yfinance"``, ``amount`` is NA), or an empty ``DataFrame`` when the
        download returns nothing.

    Raises:
        KeyError: If the downloaded frame lacks one of the expected columns.
    """
    # Yahoo lists Hong Kong tickers with four-digit zero-padded codes, so a
    # five-digit exchange code like "00700" has to become "0700".
    yahoo_code = symbol.lstrip("0").zfill(4)
    ticker = f"{yahoo_code}.HK"
    df = yf.download(ticker, period="max", interval="1d", auto_adjust=False, progress=False)
    if df is None or df.empty:
        return pd.DataFrame()

    # yfinance may return MultiIndex columns; keep only the field-name level.
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = [c[0] for c in df.columns]

    df = df.reset_index().rename(
        columns={
            "Date": "date",
            "Open": "open",
            "High": "high",
            "Low": "low",
            "Close": "close",
            "Volume": "volume",
        }
    )
    # No turnover value is taken from the download, so the column is left NA.
    df["amount"] = pd.NA
    df["market"] = "HK"
    df["symbol"] = symbol
    df["source"] = "yfinance"
    return df[["date", "market", "symbol", "open", "high", "low", "close", "volume", "amount", "source"]]


def normalize(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce the date and numeric columns of a quotes frame to uniform types.

    ``date`` is parsed with ``pd.to_datetime`` and rendered as a date string;
    each of ``open``, ``high``, ``low``, ``close``, ``volume`` and ``amount``
    that is present is converted with ``pd.to_numeric``, unparseable values
    becoming NaN. Other columns are left as they are.

    Args:
        df: Frame with at least a ``date`` column (unless it is empty).

    Returns:
        A converted copy of ``df``; an empty ``df`` is returned as is.
    """
    if df.empty:
        return df

    date_text = pd.to_datetime(df["date"]).dt.date.astype(str)
    numeric = {
        name: pd.to_numeric(df[name], errors="coerce")
        for name in ("open", "high", "low", "close", "volume", "amount")
        if name in df.columns
    }
    # assign() builds the result on a copy, leaving the input frame unchanged.
    return df.assign(date=date_text, **numeric)


def main() -> None:
    """Fetch every configured symbol and write the merged table to ``OUT_PATH``.

    A-shares are processed first, then Hong Kong stocks. A symbol that fails
    or comes back empty is reported on stdout and skipped. The collected
    frames are concatenated, de-duplicated on ``(date, market, symbol)``
    keeping the last occurrence, sorted by market, symbol and date, and
    saved as UTF-8 CSV.

    Raises:
        RuntimeError: If not a single symbol produced data.
    """
    cfg = load_config()
    collected: List[pd.DataFrame] = []

    for s in cfg["a_share"]:
        try:
            df = fetch_a_share(s)
            if df.empty:
                print(f"[WARN] A-share {s}: empty")
                continue
            collected.append(normalize(df))
            print(f"[OK] A-share {s}: {len(df)} rows")
        except Exception as e:
            # One bad symbol must not stop the remaining downloads.
            print(f"[ERR] A-share {s}: {e}")

    for s in cfg["hk"]:
        try:
            df = fetch_hk(s)
            if df.empty:
                print(f"[WARN] HK {s}: empty")
                continue
            collected.append(normalize(df))
            print(f"[OK] HK {s}: {len(df)} rows")
        except Exception as e:
            print(f"[ERR] HK {s}: {e}")

    if len(collected) == 0:
        raise RuntimeError("No data fetched.")

    stacked = pd.concat(collected, ignore_index=True)
    merged = (
        stacked[["date", "market", "symbol", "open", "high", "low", "close", "volume", "amount", "source"]]
        .drop_duplicates(subset=["date", "market", "symbol"], keep="last")
        .sort_values(["market", "symbol", "date"])
        .reset_index(drop=True)
    )

    out_dir = OUT_PATH.parent
    out_dir.mkdir(parents=True, exist_ok=True)
    merged.to_csv(OUT_PATH, index=False, encoding="utf-8")
    print(f"\nSaved: {OUT_PATH} ({len(merged)} rows)")


if __name__ == "__main__":
    main()
