from __future__ import annotations

import argparse
import html
import json
import shutil
from pathlib import Path
from typing import Any

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window


def safe_div(numerator: F.Column, denominator: F.Column) -> F.Column:
    return F.when(denominator.isNull() | (denominator == 0), F.lit(None)).otherwise(numerator / denominator)


def add_financial_features(df: DataFrame) -> DataFrame:
    """Create financial ratios, temporal deterioration signals and explainable risk drivers."""
    w = Window.partitionBy("company_id").orderBy("year", "quarter")
    base = (
        df
        .withColumn("period", F.concat_ws("Q", F.col("year").cast("string"), F.col("quarter").cast("string")))
        .withColumn("net_debt_m", F.col("debt_m") - F.col("cash_m"))
        .withColumn("free_cash_flow_m", F.col("operating_cash_flow_m") - F.col("capex_m"))
        .withColumn("ebitda_margin_pct_raw", safe_div(F.col("ebitda_m"), F.col("revenue_m")) * 100)
        .withColumn("net_debt_to_ebitda_raw", safe_div(F.col("net_debt_m"), F.col("ebitda_m")))
        .withColumn("debt_to_ebitda_raw", safe_div(F.col("debt_m"), F.col("ebitda_m")))
        .withColumn("interest_coverage_raw", safe_div(F.col("ebitda_m"), F.col("interest_expense_m")))
        .withColumn("current_ratio_raw", safe_div(F.col("current_assets_m"), F.col("current_liabilities_m")))
        .withColumn("fcf_margin_pct_raw", safe_div(F.col("free_cash_flow_m"), F.col("revenue_m")) * 100)
        .withColumn("prev_revenue_m", F.lag("revenue_m").over(w))
        .withColumn("prev_ebitda_margin_pct", F.lag("ebitda_margin_pct_raw").over(w))
        .withColumn("prev_debt_to_ebitda", F.lag("debt_to_ebitda_raw").over(w))
        .withColumn("prev_interest_coverage", F.lag("interest_coverage_raw").over(w))
    )

    featured = (
        base
        .withColumn("revenue_growth_pct_raw", safe_div(F.col("revenue_m") - F.col("prev_revenue_m"), F.col("prev_revenue_m")) * 100)
        .withColumn("revenue_growth_pct_raw", F.coalesce(F.col("revenue_growth_pct_raw"), F.lit(0.0)))
        .withColumn("margin_delta_qoq", F.col("ebitda_margin_pct_raw") - F.col("prev_ebitda_margin_pct"))
        .withColumn("leverage_delta_qoq", F.col("debt_to_ebitda_raw") - F.col("prev_debt_to_ebitda"))
        .withColumn("coverage_delta_qoq", F.col("interest_coverage_raw") - F.col("prev_interest_coverage"))
        .withColumn("leverage_score", F.least(F.lit(35.0), F.greatest(F.lit(0.0), (F.col("net_debt_to_ebitda_raw") - 2.0) * 8.0)))
        .withColumn("liquidity_score", F.least(F.lit(25.0), F.greatest(F.lit(0.0), (F.lit(1.45) - F.col("current_ratio_raw")) * 28.0)))
        .withColumn("profitability_score", F.least(F.lit(20.0), F.greatest(F.lit(0.0), (F.lit(18.0) - F.col("ebitda_margin_pct_raw")) * 1.3)))
        .withColumn("coverage_score", F.least(F.lit(20.0), F.greatest(F.lit(0.0), (F.lit(3.2) - F.col("interest_coverage_raw")) * 6.0)))
        .withColumn("growth_score", F.least(F.lit(15.0), F.greatest(F.lit(0.0), -F.col("revenue_growth_pct_raw") * 1.4)))
        .withColumn(
            "financial_stress_score",
            F.round(F.least(F.lit(100.0), F.col("leverage_score") + F.col("liquidity_score") + F.col("profitability_score") + F.col("coverage_score") + F.col("growth_score")), 1),
        )
        .withColumn(
            "risk_tier",
            F.when(F.col("financial_stress_score") >= 50, "High")
            .when(F.col("financial_stress_score") >= 25, "Medium")
            .otherwise("Low"),
        )
        .withColumn(
            "deterioration_flag",
            (
                (F.coalesce(F.col("margin_delta_qoq"), F.lit(0.0)) < -2.0)
                | (F.coalesce(F.col("leverage_delta_qoq"), F.lit(0.0)) > 0.65)
                | (F.coalesce(F.col("coverage_delta_qoq"), F.lit(0.0)) < -0.8)
            ).cast("int"),
        )
        .withColumn("high_leverage_driver", F.when(F.col("net_debt_to_ebitda_raw") > 5.0, "High leverage"))
        .withColumn("weak_coverage_driver", F.when(F.col("interest_coverage_raw") < 2.0, "Weak interest coverage"))
        .withColumn("low_liquidity_driver", F.when(F.col("current_ratio_raw") < 1.0, "Low liquidity"))
        .withColumn("negative_growth_driver", F.when(F.col("revenue_growth_pct_raw") < -2.0, "Negative revenue growth"))
        .withColumn("margin_deterioration_driver", F.when(F.coalesce(F.col("margin_delta_qoq"), F.lit(0.0)) < -2.0, "Margin deterioration"))
        .withColumn(
            "risk_drivers",
            F.coalesce(
                F.concat_ws(", ", "high_leverage_driver", "weak_coverage_driver", "low_liquidity_driver", "negative_growth_driver", "margin_deterioration_driver"),
                F.lit("Stable financial profile"),
            ),
        )
        .withColumn("risk_drivers", F.when(F.col("risk_drivers") == "", "Stable financial profile").otherwise(F.col("risk_drivers")))
    )

    return (
        featured
        .withColumn("revenue_growth_pct", F.round("revenue_growth_pct_raw", 2))
        .withColumn("ebitda_margin_pct", F.round("ebitda_margin_pct_raw", 2))
        .withColumn("net_debt_to_ebitda", F.round("net_debt_to_ebitda_raw", 2))
        .withColumn("debt_to_ebitda", F.round("debt_to_ebitda_raw", 2))
        .withColumn("interest_coverage", F.round("interest_coverage_raw", 2))
        .withColumn("current_ratio", F.round("current_ratio_raw", 2))
        .withColumn("fcf_margin_pct", F.round("fcf_margin_pct_raw", 2))
        .withColumn("margin_delta_qoq", F.round("margin_delta_qoq", 2))
        .withColumn("leverage_delta_qoq", F.round("leverage_delta_qoq", 2))
        .withColumn("coverage_delta_qoq", F.round("coverage_delta_qoq", 2))
    )


def latest_period_features(featured: DataFrame) -> DataFrame:
    latest = featured.agg(F.max(F.struct("year", "quarter")).alias("latest")).first()["latest"]
    return featured.filter((F.col("year") == latest["year"]) & (F.col("quarter") == latest["quarter"]))


def compute_sector_summary(latest: DataFrame) -> list[dict[str, Any]]:
    """Use Spark SQL to aggregate sector-level exposure and risk insight."""
    latest.createOrReplaceTempView("latest_features")
    rows = latest.sparkSession.sql(
        """
        SELECT
          sector,
          COUNT(*) AS entities,
          ROUND(SUM(exposure_m), 1) AS total_exposure_m,
          ROUND(SUM(CASE WHEN risk_tier = 'High' THEN exposure_m ELSE 0 END), 1) AS high_risk_exposure_m,
          ROUND(AVG(financial_stress_score), 1) AS avg_stress_score,
          ROUND(SUM(exposure_m * financial_stress_score) / SUM(exposure_m), 1) AS exposure_weighted_score,
          ROUND(AVG(net_debt_to_ebitda), 2) AS avg_net_debt_to_ebitda,
          ROUND(AVG(interest_coverage), 2) AS avg_interest_coverage,
          ROUND(100 * AVG(distressed), 1) AS distressed_rate_pct
        FROM latest_features
        GROUP BY sector
        ORDER BY avg_stress_score DESC, exposure_weighted_score DESC
        """
    ).collect()
    return [row.asDict() for row in rows]


def compute_rating_summary(latest: DataFrame) -> list[dict[str, Any]]:
    latest.createOrReplaceTempView("latest_features")
    rows = latest.sparkSession.sql(
        """
        SELECT
          credit_rating,
          COUNT(*) AS entities,
          ROUND(AVG(financial_stress_score), 1) AS avg_stress_score,
          ROUND(AVG(ml_risk_probability), 3) AS avg_ml_probability,
          ROUND(SUM(exposure_m), 1) AS exposure_m
        FROM latest_features
        GROUP BY credit_rating
        ORDER BY avg_stress_score DESC
        """
    ).collect()
    return [row.asDict() for row in rows]


def train_risk_model(featured: DataFrame) -> tuple[DataFrame, dict[str, float]]:
    """Spark ML example: train/test split, metrics and risk probabilities."""
    feature_cols = [
        "revenue_growth_pct",
        "ebitda_margin_pct",
        "net_debt_to_ebitda",
        "interest_coverage",
        "current_ratio",
        "fcf_margin_pct",
        "margin_delta_qoq",
        "leverage_delta_qoq",
    ]
    model_df = featured.select(
        "company_id", "year", "quarter", *(F.coalesce(F.col(c), F.lit(0.0)).alias(c) for c in feature_cols), F.col("distressed").cast("double").alias("label")
    )

    assembler = VectorAssembler(inputCols=feature_cols, outputCol="raw_features")
    scaler = StandardScaler(inputCol="raw_features", outputCol="features", withMean=True, withStd=True)
    lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=40, regParam=0.03)
    pipeline = Pipeline(stages=[assembler, scaler, lr])

    train, test = model_df.randomSplit([0.8, 0.2], seed=42)
    fitted = pipeline.fit(train)
    predictions = fitted.transform(test)
    auc = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC").evaluate(predictions)
    accuracy = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy").evaluate(predictions)
    precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision").evaluate(predictions)
    recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall").evaluate(predictions)

    probability_to_float = F.udf(lambda vector: float(vector[1]), DoubleType())
    all_predictions = fitted.transform(model_df).select(
        "company_id", "year", "quarter", probability_to_float("probability").alias("ml_risk_probability")
    )
    scored = featured.join(all_predictions, on=["company_id", "year", "quarter"], how="left")
    metrics = {"auc": round(float(auc), 3), "accuracy": round(float(accuracy), 3), "precision": round(float(precision), 3), "recall": round(float(recall), 3)}
    return scored, metrics


def build_payload(sector_summary: list[dict[str, Any]], rating_summary: list[dict[str, Any]], watchlist_rows: list[dict[str, Any]], model_metrics: dict[str, float]) -> dict[str, Any]:
    entities = len(watchlist_rows)
    total_exposure = round(sum(float(row["exposure_m"]) for row in watchlist_rows), 1) if entities else 0
    exposure_at_risk = round(sum(float(row["exposure_m"]) for row in watchlist_rows if row.get("risk_tier") == "High"), 1) if entities else 0
    avg_score = round(sum(float(row["financial_stress_score"]) for row in watchlist_rows) / entities, 1) if entities else 0
    high_risk = sum(1 for row in watchlist_rows if row.get("risk_tier") == "High")
    deteriorating = sum(1 for row in watchlist_rows if row.get("deterioration_flag") == 1)
    top_sector = sector_summary[0]["sector"] if sector_summary else "n/a"
    top_exposure_sector = sector_summary[0]["high_risk_exposure_m"] if sector_summary else 0
    executive_summary = [
        f"{high_risk} entities are flagged as high risk in the latest quarter, representing €{exposure_at_risk:,.1f}m of exposure.",
        f"{top_sector} is the main concentration to review, with €{float(top_exposure_sector):,.1f}m of high-risk exposure.",
        f"{deteriorating} entities show recent deterioration in margin, leverage or debt-service capacity.",
    ]
    return {
        "headline_metrics": {
            "entities_analyzed": entities,
            "sectors": len({row["sector"] for row in watchlist_rows}),
            "total_exposure_m": total_exposure,
            "exposure_at_risk_m": exposure_at_risk,
            "high_risk_entities": high_risk,
            "avg_stress_score": avg_score,
            "model_auc": model_metrics.get("auc", 0),
        },
        "executive_summary": executive_summary,
        "sector_summary": sector_summary,
        "rating_summary": rating_summary,
        "watchlist": watchlist_rows,
        "model_metrics": model_metrics,
    }


def _fmt(value: Any) -> str:
    if value is None:
        return "n/a"
    if isinstance(value, float):
        return f"{value:,.1f}" if abs(value) >= 10 else f"{value:,.2f}"
    return str(value)


def render_site(payload: dict[str, Any], output_dir: str | Path) -> None:
    output = Path(output_dir)
    output.mkdir(parents=True, exist_ok=True)
    source_dir = output / "source"
    source_dir.mkdir(exist_ok=True)
    (output / "insights-data.json").write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
    source_path = Path(__file__).resolve()
    (source_dir / "pyspark_pipeline.py").write_text(source_path.read_text(encoding="utf-8"), encoding="utf-8")
    generator_path = source_path.with_name("generate_synthetic_data.py")
    if generator_path.exists():
        shutil.copyfile(generator_path, source_dir / "generate_synthetic_data.py")

    metrics = payload["headline_metrics"]
    summary_items = "".join(f"<li>{html.escape(item)}</li>" for item in payload["executive_summary"])
    sector_cards = "\n".join(
        f"""
        <article class="sector-card">
          <div class="sector-top"><span>{html.escape(row['sector'])}</span><strong>Stress {_fmt(row['avg_stress_score'])}/100</strong></div>
          <div class="bar" aria-label="Stress score {_fmt(row['avg_stress_score'])} out of 100"><i style="width:{float(row['avg_stress_score']):.0f}%"></i></div>
          <dl>
            <div><dt>Entities</dt><dd>{row['entities']}</dd></div>
            <div><dt>Total exposure</dt><dd>€{_fmt(row['total_exposure_m'])}m</dd></div>
            <div><dt>High-risk exposure</dt><dd>€{_fmt(row['high_risk_exposure_m'])}m</dd></div>
            <div><dt>Coverage</dt><dd>{_fmt(row['avg_interest_coverage'])}x</dd></div>
          </dl>
        </article>
        """ for row in payload["sector_summary"]
    )

    sorted_watchlist = sorted(payload["watchlist"], key=lambda r: (-float(r["financial_stress_score"]), -float(r["exposure_m"])))[:25]
    watchlist_rows = "\n".join(
        f"""
        <tr>
          <td><strong>{html.escape(row['company'])}</strong><span>{html.escape(row['sector'])} · {row['period']} · rating {row['credit_rating']}</span></td>
          <td>€{_fmt(row['exposure_m'])}m</td>
          <td>{_fmt(row['financial_stress_score'])}/100</td>
          <td>{_fmt(float(row.get('ml_risk_probability') or 0) * 100)}%</td>
          <td><em class="tier {row['risk_tier'].lower()}">{row['risk_tier']}</em></td>
          <td>{html.escape(row['risk_drivers'])}</td>
        </tr>
        """ for row in sorted_watchlist
    )

    rating_rows = "\n".join(
        f"<tr><td>{row['credit_rating']}</td><td>{row['entities']}</td><td>€{_fmt(row['exposure_m'])}m</td><td>{_fmt(row['avg_stress_score'])}/100</td><td>{_fmt(float(row.get('avg_ml_probability') or 0) * 100)}%</td></tr>"
        for row in payload["rating_summary"]
    )

    why_steps = [
        ("Theme", "Finance was chosen because the target role asks for data science in a financial environment, not only generic modelling."),
        ("Historical data", "Real teams monitor evolution over time: margin compression, rising leverage and worsening coverage matter more than one isolated number."),
        ("Exposure", "Risk only becomes business-relevant when combined with financial impact: where is the money at risk?"),
        ("Explainable score", "The score translates several financial signals into a simple 0-100 view that a non-technical stakeholder can prioritise."),
        ("PySpark + SQL", "The workflow mirrors production analytics: scalable transformations first, business cuts and summaries with SQL afterwards."),
        ("ML baseline", "The predictive layer adds prioritisation while the rule-based drivers keep the analysis understandable and auditable."),
    ]
    why_cards = "".join(f"<article><h3>{title}</h3><p>{body}</p></article>" for title, body in why_steps)

    feature_code = "\n".join([
        "w = Window.partitionBy('company_id').orderBy('year', 'quarter')",
        "df = (df",
        "  .withColumn('net_debt_to_ebitda', net_debt / ebitda)",
        "  .withColumn('current_ratio', current_assets / current_liabilities)",
        "  .withColumn('prev_revenue_m', lag('revenue_m').over(w))",
        "  .withColumn('revenue_growth_pct', (revenue - prev_revenue) / prev_revenue * 100)",
        "  .withColumn('financial_stress_score', leverage + liquidity + coverage + growth)",
        ")",
    ])
    sql_code = "\n".join([
        "SELECT sector,",
        "       SUM(exposure_m) AS total_exposure_m,",
        "       SUM(CASE WHEN risk_tier = 'High' THEN exposure_m ELSE 0 END) AS high_risk_exposure_m,",
        "       AVG(financial_stress_score) AS avg_stress_score",
        "FROM latest_features",
        "GROUP BY sector",
        "ORDER BY high_risk_exposure_m DESC",
    ])
    ml_code = "\n".join([
        "train, test = model_df.randomSplit([0.8, 0.2], seed=42)",
        "pipeline = Pipeline(stages=[VectorAssembler(...), StandardScaler(...), LogisticRegression(...)])",
        "model = pipeline.fit(train)",
        "predictions = model.transform(test)",
        "auc = BinaryClassificationEvaluator(metricName='areaUnderROC').evaluate(predictions)",
    ])

    css = """
    :root {--purple:#533afd;--navy:#061b31;--body:#64748d;--border:#e5edf5;--dark:#151943;--ruby:#ea2261;--green:#15be53;--bg:#f7faff;}
    * { box-sizing:border-box; } body { margin:0; font-family:'Source Sans 3',system-ui,sans-serif; color:var(--navy); background:#fff; } a { color:inherit; }
    header { position:sticky; top:0; z-index:5; backdrop-filter:blur(14px); background:rgba(255,255,255,.9); border-bottom:1px solid var(--border); }
    nav,.wrap { max-width:1160px; margin:0 auto; padding:0 24px; } nav { height:68px; display:flex; align-items:center; justify-content:space-between; } .brand { font-weight:700; text-decoration:none; }
    .navlinks { display:flex; gap:20px; align-items:center; } .navlinks a { text-decoration:none; font-size:14px; } .btn,.ghost { display:inline-block; padding:10px 16px; border-radius:6px; text-decoration:none; } .btn { background:var(--purple); color:white!important; box-shadow:rgba(50,50,93,.25) 0 20px 30px -20px; } .ghost { border:1px solid #b9b9f9; color:var(--purple); }
    .hero { padding:78px 0 48px; background:radial-gradient(circle at 80% 10%,#f3e8ff 0,transparent 32%),linear-gradient(180deg,#fff 0,#f8fbff 100%); } .hero-grid { display:grid; grid-template-columns:1fr 1fr; gap:42px; align-items:center; }
    .eyebrow { color:var(--purple); font-weight:700; font-size:14px; letter-spacing:.02em; } h1 { font-size:54px; line-height:1.03; letter-spacing:-1.4px; font-weight:300; margin:16px 0; } .lead { color:var(--body); font-size:20px; line-height:1.45; max-width:720px; } .actions { display:flex; gap:14px; margin-top:26px; flex-wrap:wrap; }
    .dashboard,.note,.sector-card,.step,.why-card article { background:#fff; border:1px solid var(--border); border-radius:10px; padding:20px; box-shadow:rgba(50,50,93,.13) 0 20px 36px -28px; } .mini-title { display:flex; justify-content:space-between; color:#273951; font-size:13px; margin-bottom:14px; }
    .metrics { display:grid; grid-template-columns:repeat(2,1fr); gap:12px; } .metric { border:1px solid var(--border); padding:16px; border-radius:8px; background:#fdfefe; } .metric span { color:var(--body); font-size:13px; } .metric strong { display:block; font-size:28px; font-weight:300; letter-spacing:-.4px; }
    section { padding:62px 0; } h2 { font-size:36px; line-height:1.1; letter-spacing:-.64px; font-weight:300; margin:0 0 14px; } .section-lead { color:var(--body); font-size:18px; max-width:820px; }
    .split { display:grid; grid-template-columns:.9fr 1.1fr; gap:28px; align-items:start; } .summary-list { margin:12px 0 0; padding-left:20px; color:var(--body); font-size:18px; line-height:1.55; } .note h3 { margin:0 0 8px; font-weight:500; } .note p { color:var(--body); margin:0 0 12px; }
    .why-card { display:grid; grid-template-columns:repeat(3,1fr); gap:16px; margin-top:28px; } .why-card h3 { margin:0 0 8px; font-size:18px; } .why-card p { margin:0; color:var(--body); line-height:1.45; }
    .cards { display:grid; grid-template-columns:repeat(3,1fr); gap:16px; margin-top:28px; } .sector-top { display:flex; justify-content:space-between; gap:10px; font-size:14px; } .sector-top strong { color:var(--ruby); font-weight:600; } .bar { height:7px; background:#f1f5fb; border-radius:4px; margin:16px 0; overflow:hidden; } .bar i { display:block; height:100%; background:linear-gradient(90deg,var(--purple),#f96bee); }
    dl { display:grid; grid-template-columns:repeat(2,1fr); gap:10px; margin:0; } dt { color:var(--body); font-size:12px; } dd { margin:0; font-size:17px; font-weight:500; }
    .dark { background:var(--dark); color:white; } .dark .section-lead,.dark .concepts p { color:rgba(255,255,255,.72); } .concepts,.pipeline { display:grid; grid-template-columns:repeat(5,1fr); gap:14px; margin-top:24px; } .concepts article,.step { border:1px solid rgba(255,255,255,.14); border-radius:8px; padding:16px; background:rgba(255,255,255,.06); } .concepts h3,.step b { margin:0 0 6px; font-weight:500; color:inherit; } .step { background:#fff; } .step b { color:var(--purple); display:block; } .step p { color:var(--body); margin:0; }
    .table-wrap { overflow:auto; border:1px solid var(--border); border-radius:10px; margin-top:28px; box-shadow:rgba(50,50,93,.18) 0 24px 38px -28px; } table { width:100%; border-collapse:collapse; min-width:900px; } th,td { text-align:left; padding:14px 16px; border-bottom:1px solid var(--border); vertical-align:top; } th { font-size:12px; color:#273951; background:#f8fbff; } td span { display:block; color:var(--body); font-size:12px; } .tier { font-style:normal; border-radius:999px; padding:3px 9px; font-size:12px; font-weight:600; } .tier.high { background:#fee2e2; color:#991b1b; } .tier.medium { background:#fef3c7; color:#92400e; } .tier.low { background:rgba(21,190,83,.18); color:#108c3d; }
    .code-grid { display:grid; grid-template-columns:1fr; gap:18px; margin-top:28px; } .code-card { background:#0d253d; border:1px solid rgba(255,255,255,.13); border-radius:8px; overflow:hidden; } .code-card h3 { margin:0; padding:14px 18px; font-weight:500; font-size:16px; border-bottom:1px solid rgba(255,255,255,.12); } pre { margin:0; overflow:auto; padding:18px; color:#dbeafe; font-family:'Source Code Pro',monospace; font-size:12px; line-height:1.7; }
    footer { padding:34px 0; color:var(--body); border-top:1px solid var(--border); } footer a { color:var(--purple); }
    @media (max-width:980px) { .hero-grid,.split { grid-template-columns:1fr; } .cards,.why-card { grid-template-columns:repeat(2,1fr); } .concepts,.pipeline { grid-template-columns:repeat(2,1fr); } h1 { font-size:40px; } }
    @media (max-width:640px) { .cards,.metrics,.why-card,.concepts,.pipeline { grid-template-columns:1fr; } .navlinks a:not(.btn) { display:none; } section { padding:44px 0; } }
    """

    index_html = f"""<!doctype html>
<html lang="en">
<head>
  <meta charset="utf-8"><meta name="viewport" content="width=device-width, initial-scale=1">
  <title>Financial Risk Intelligence</title>
  <meta name="description" content="Applied PySpark portfolio demo for financial risk analytics.">
  <link href="https://fonts.googleapis.com/css2?family=Source+Sans+3:wght@300;400;500;600;700&family=Source+Code+Pro:wght@400;500;700&display=swap" rel="stylesheet">
  <style>{css}</style>
</head>
<body>
<header><nav><a class="brand" href="/">Financial Risk Intelligence</a><div class="navlinks"><a href="#why">Why this case</a><a href="#watchlist">Watchlist</a><a href="/code.html">Method</a><a class="btn" href="/insights-data.json">Data</a></div></nav></header>
<main>
  <section class="hero"><div class="wrap hero-grid"><div><div class="eyebrow">PySpark · Spark SQL · Financial analytics</div><h1>Financial risk signals from company statement data.</h1><p class="lead">A compact portfolio project that simulates how a data scientist can turn financial data into risk prioritisation, business insights and a simple predictive model.</p><div class="actions"><a class="btn" href="#watchlist">Explore the analysis</a><a class="ghost" href="/code.html">See the method</a></div></div><aside class="dashboard"><div class="mini-title"><span>Latest quarter</span><span>synthetic financial-services sample</span></div><div class="metrics"><div class="metric"><span>Entities</span><strong>{metrics['entities_analyzed']}</strong></div><div class="metric"><span>Total exposure</span><strong>€{_fmt(metrics['total_exposure_m'])}m</strong></div><div class="metric"><span>Exposure at risk</span><strong>€{_fmt(metrics['exposure_at_risk_m'])}m</strong></div><div class="metric"><span>Avg. stress score</span><strong>{_fmt(metrics['avg_stress_score'])}</strong></div><div class="metric"><span>High-risk entities</span><strong>{metrics['high_risk_entities']}</strong></div><div class="metric"><span>Model AUC</span><strong>{metrics['model_auc']}</strong></div></div></aside></div></section>
  <section><div class="wrap split"><div><h2>Executive summary</h2><p class="section-lead">This is the type of output a business user would need first: what requires attention, where the exposure is concentrated and why the alert exists.</p></div><div class="note"><h3>Current findings</h3><ul class="summary-list">{summary_items}</ul></div></div></section>
  <section id="why"><div class="wrap"><h2>Why this portfolio case</h2><p class="section-lead">The topic was selected because it connects technical PySpark work with a realistic financial-services decision: prioritising entities for review based on risk and exposure. Each step mirrors a common workflow in analytics teams.</p><div class="why-card">{why_cards}</div></div></section>
  <section><div class="wrap"><h2>Sector concentration</h2><p class="section-lead">Spark SQL aggregates the latest quarter by sector. The coloured bar is the average stress score on a 0-100 scale, so the riskiest sectors are visible at a glance.</p><div class="cards">{sector_cards}</div></div></section>
  <section id="watchlist"><div class="wrap"><h2>Entity watchlist</h2><p class="section-lead">The watchlist combines exposure, an explainable financial stress score, model probability and plain-language risk drivers.</p><div class="table-wrap"><table><thead><tr><th>Entity</th><th>Exposure</th><th>Stress score</th><th>ML probability</th><th>Tier</th><th>Main drivers</th></tr></thead><tbody>{watchlist_rows}</tbody></table></div></div></section>
  <section><div class="wrap"><h2>Risk by credit rating</h2><p class="section-lead">This adds another business lens: whether lower ratings also concentrate higher exposure and model-estimated distress probability.</p><div class="table-wrap"><table><thead><tr><th>Rating</th><th>Entities</th><th>Exposure</th><th>Avg. stress score</th><th>Avg. ML probability</th></tr></thead><tbody>{rating_rows}</tbody></table></div></div></section>
  <section class="dark"><div class="wrap"><h2>How it resembles real work</h2><p class="section-lead">The project keeps the stack simple, but the flow is realistic: clean data, create financial features, detect trend deterioration, aggregate impact, explain alerts and validate a predictive baseline.</p><div class="concepts"><article><h3>Data engineering</h3><p>Columnar PySpark transformations prepare scalable features.</p></article><article><h3>Financial context</h3><p>Ratios link the data to leverage, liquidity and debt-service capacity.</p></article><article><h3>Business impact</h3><p>Exposure at risk helps prioritise what matters economically.</p></article><article><h3>Explainability</h3><p>Drivers explain why an entity appears in the watchlist.</p></article><article><h3>Validation</h3><p>Train/test metrics avoid presenting the model as a black box.</p></article></div><div class="actions"><a class="btn" href="/code.html">Open implementation notes</a><a class="ghost" href="/source/pyspark_pipeline.py">View source</a></div></div></section>
</main><footer><div class="wrap">Generated by a PySpark analytical pipeline · synthetic data for portfolio demonstration · <a href="/code.html">implementation notes</a></div></footer>
</body></html>"""

    code_html = f"""<!doctype html>
<html lang="en">
<head>
  <meta charset="utf-8"><meta name="viewport" content="width=device-width, initial-scale=1">
  <title>Implementation | Financial Risk Intelligence</title>
  <meta name="description" content="PySpark implementation notes for the financial risk intelligence demo.">
  <link href="https://fonts.googleapis.com/css2?family=Source+Sans+3:wght@300;400;500;600;700&family=Source+Code+Pro:wght@400;500;700&display=swap" rel="stylesheet">
  <style>{css}</style>
</head>
<body>
<header><nav><a class="brand" href="/">Financial Risk Intelligence</a><div class="navlinks"><a href="/">Analysis</a><a href="#steps">Steps</a><a class="btn" href="/source/pyspark_pipeline.py">Source</a></div></nav></header>
<main>
  <section class="hero"><div class="wrap"><div class="eyebrow">Implementation notes</div><h1>The PySpark workflow behind the page.</h1><p class="lead">The goal is not to build a heavy dashboard, but to show an end-to-end analytical pipeline: synthetic data, transformations, SQL summaries, ML metrics and a business-readable output.</p><div class="pipeline" id="steps"><div class="step"><b>1 · Data</b><p>Historical quarterly records simulate a financial-services portfolio.</p></div><div class="step"><b>2 · Features</b><p>Ratios convert statements into risk signals.</p></div><div class="step"><b>3 · Trends</b><p>Window functions detect quarter-on-quarter deterioration.</p></div><div class="step"><b>4 · SQL</b><p>Sector and rating summaries translate rows into business cuts.</p></div><div class="step"><b>5 · ML</b><p>A train/test Spark ML baseline estimates distress probability.</p></div></div></div></section>
  <section><div class="wrap split"><div><h2>Why each step matters</h2><p class="section-lead">These choices were made to resemble day-to-day analytics work: start with data quality and context, engineer features, summarise impact, then add modelling only where it helps prioritisation.</p></div><div class="note"><h3>For non-technical readers</h3><p>The page answers three practical questions: who needs attention, why, and how much exposure is involved.</p></div></div></section>
  <section><div class="wrap"><h2>PySpark feature engineering</h2><p class="section-lead">Window functions compare each entity with its own previous quarter, which is closer to monitoring work than a one-off spreadsheet snapshot.</p><div class="code-grid"><article class="code-card"><h3>DataFrame + Window functions</h3><pre>{html.escape(feature_code)}</pre></article></div></div></section>
  <section class="dark"><div class="wrap"><h2>Spark SQL business summaries</h2><p class="section-lead">SQL is used after feature engineering because it is readable for analysts and practical for stakeholder cuts such as sector concentration.</p><div class="code-grid"><article class="code-card"><h3>Exposure at risk by sector</h3><pre>{html.escape(sql_code)}</pre></article></div></div></section>
  <section><div class="wrap"><h2>Spark ML validation</h2><p class="section-lead">The model is deliberately simple and interpretable. The important point is the workflow: split data, train, evaluate, then publish metrics alongside the business rules.</p><div class="code-grid"><article class="code-card"><h3>Train/test ML baseline</h3><pre>{html.escape(ml_code)}</pre></article></div><div class="actions"><a class="btn" href="/source/pyspark_pipeline.py">Open full source</a><a class="ghost" href="/insights-data.json">Open generated JSON</a></div></div></section>
</main><footer><div class="wrap"><a href="/">Back to analysis</a> · Source available under <a href="/source/pyspark_pipeline.py">/source/pyspark_pipeline.py</a></div></footer>
</body></html>"""

    output.joinpath("index.html").write_text(index_html, encoding="utf-8")
    output.joinpath("code.html").write_text(code_html, encoding="utf-8")


def run_pipeline(data_path: str | Path, output_dir: str | Path) -> dict[str, Any]:
    spark = (
        SparkSession.builder
        .master("local[*]")
        .appName("financial-risk-intelligence")
        .config("spark.ui.enabled", "false")
        .getOrCreate()
    )
    try:
        raw = spark.read.csv(str(data_path), header=True, inferSchema=True)
        featured = add_financial_features(raw).cache()
        scored, model_metrics = train_risk_model(featured)
        latest = latest_period_features(scored).cache()
        sector_summary = compute_sector_summary(latest)
        rating_summary = compute_rating_summary(latest)
        watchlist_rows = [row.asDict() for row in latest.orderBy(F.desc("financial_stress_score"), F.desc("exposure_m")).collect()]
        payload = build_payload(sector_summary, rating_summary, watchlist_rows, model_metrics)
        render_site(payload, output_dir)
        return payload
    finally:
        spark.stop()


def main() -> None:
    parser = argparse.ArgumentParser(description="Build PySpark financial risk intelligence demo")
    parser.add_argument("--data", default="data/raw/financial_timeseries.csv")
    parser.add_argument("--out", default="dist")
    args = parser.parse_args()
    payload = run_pipeline(args.data, args.out)
    print(json.dumps(payload["headline_metrics"], indent=2))


if __name__ == "__main__":
    main()
