Add pd v3 post processing block
All checks were successful
Build and Push Docker Image / test (push) Successful in 17s
Build and Push Docker Image / build_and_push (push) Successful in 2m58s

This commit is contained in:
Ankur Malik 2025-12-04 10:59:02 -05:00
parent 70590db6b5
commit d167609fad
10 changed files with 186 additions and 23 deletions

View File

@ -1 +1,13 @@
**Hello world!!!** # PD V3 Post-Processing
- **Inputs:** Raw/isotonic PD outputs and model T weight from the processing block.
- **Outputs:** Final weighted PD and assigned grade.
- **Artifacts:** `grade_cutoffs.csv` generated from the Weighted Grades Cutoff workbook tab.
- **Tests:** `python -m unittest sequence-3.pd_v3_post_processing.test_block`.
- **Signature:** Sequence-3 convention: `__main__` must keep an explicit typed parameter list covering every input (int/float/str) and build the record from those args before weighting/grades; keep aligned with the block schemas.
- **UAT tolerance:** Downstream sequence-3 UAT scripts treat post-processing mismatches within `1e-4` as equivalent to expected values.
- **PD inputs:** `pd_a`, `pd_b`, and `pd_t` must be provided as non-null numbers (per schema); post-processing raises if any are missing/null.
## Schema notes
- The request and response schemas for post-processing are treated as immutable contracts. They already describe flat scalar fields (plus the array `pd_scores`), so keep them as-is and never refactor them into dict-of-dicts/object-of-dicts—arrays of dicts remain acceptable.

1
__init__.py Normal file
View File

@ -0,0 +1 @@
__all__ = ["__main__"]

133
block.py
View File

@ -1,21 +1,116 @@
@flowx_block from __future__ import annotations
def example_function(request: dict) -> dict:
# Processing logic here... import math
from pathlib import Path
from typing import Dict, List, Sequence, Tuple, TypedDict
return { import joblib
"meta_info": [
{
"name": "created_date", MODELS_DIR = Path(__file__).parent
"type": "string", GRADE_FILE = Path(__file__).parent / "grade_cutoffs.csv"
"value": "2024-11-05"
} _GRADE_TABLE: List[Tuple[str, float, float]] | None = None
], _ISOTONIC_MODELS: Dict[str, object] = {}
"fields": [ ISOTONIC_FILES = {
{ "a": MODELS_DIR / "isotonic_model_A.joblib",
"name": "", "b": MODELS_DIR / "isotonic_model_B.joblib",
"type": "", }
"value": ""
}
] class ScoreEntry(TypedDict):
} name: str
value: float
def _load_grade_table() -> None:
global _GRADE_TABLE
if _GRADE_TABLE is not None:
return
table: List[Tuple[str, float, float]] = []
with GRADE_FILE.open("r", encoding="utf-8") as handle:
next(handle) # skip header
for line in handle:
grade, min_pd, max_pd = line.strip().split(",")
table.append((grade, float(min_pd), float(max_pd)))
table.sort(key=lambda row: row[1])
_GRADE_TABLE = table
def _ensure_isotonic_models_loaded() -> None:
for key, path in ISOTONIC_FILES.items():
if key in _ISOTONIC_MODELS:
continue
_ISOTONIC_MODELS[key] = joblib.load(path)
def _clamp_probability(value: float) -> float:
return min(1.0, max(0.0, float(value)))
def _determine_grade(final_pd: float) -> str | None:
if final_pd is None or _GRADE_TABLE is None:
return None
for grade, min_pd, max_pd in _GRADE_TABLE:
if min_pd <= final_pd < max_pd:
return grade
# Allow equality with the top boundary to fall into the final grade.
last_grade, min_pd, max_pd = _GRADE_TABLE[-1]
if math.isclose(final_pd, max_pd):
return last_grade
return None
def _apply_isotonic(model_key: str, raw_pd: float) -> float:
calibrator = _ISOTONIC_MODELS.get(model_key)
if calibrator is None:
return _clamp_probability(raw_pd)
calibrated = calibrator.predict([raw_pd])[0]
return _clamp_probability(calibrated)
def __main__(
pd_a: float,
pd_b: float,
pd_t: float
) -> Dict[str, float | str]:
"""
Inputs (request schema):
- pd_scores: ordered list of {"name": "pd_a"|"pd_b"|"pd_t", "value": <float>} entries
- pd_scores_pd_a / pd_scores_pd_b / pd_scores_pd_t: explicit, non-null PD inputs; must match pd_scores when provided
Outputs (response schema):
- pd_a: raw PD A clamped to [0,1]
- pd_b: raw PD B clamped to [0,1]
- pd_t: model T probability clamped to [0,1]
- pd_iso_a: isotonic-calibrated PD A
- pd_iso_b: isotonic-calibrated PD B
- final_pd: weighted final PD using pd_t as weight
- grade: assigned grade from the cutoff table
"""
_load_grade_table()
_ensure_isotonic_models_loaded()
weight = _clamp_probability(pd_t)
pd_iso_a = _apply_isotonic("a", pd_a)
pd_iso_b = _apply_isotonic("b", pd_b)
final_pd = (pd_iso_a * weight) + (pd_iso_b * (1 - weight))
grade = _determine_grade(final_pd)
return {
"pd_a": _clamp_probability(pd_a),
"pd_b": _clamp_probability(pd_b),
"pd_t": weight,
"pd_iso_a": pd_iso_a,
"pd_iso_b": pd_iso_b,
"final_pd": final_pd,
"grade": grade if grade is not None else "",
}

7
grade_cutoffs.csv Normal file
View File

@ -0,0 +1,7 @@
grade,min_pd,max_pd
A1,0.0000000000,0.05
A2,0.0500000001,0.1
B1,0.1000000001,0.15
B2,0.1500000001,0.3
C1,0.3000000001,0.35
C2,0.3500000001,1
1 grade min_pd max_pd
2 A1 0.0000000000 0.05
3 A2 0.0500000001 0.1
4 B1 0.1000000001 0.15
5 B2 0.1500000001 0.3
6 C1 0.3000000001 0.35
7 C2 0.3500000001 1

BIN
isotonic_model_A.joblib Normal file

Binary file not shown.

BIN
isotonic_model_B.joblib Normal file

Binary file not shown.

View File

@ -1 +1,11 @@
{} {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"pd_a": { "type": "number" },
"pd_b": { "type": "number" },
"pd_t": { "type": "number" }
},
"required": ["pd_a", "pd_b", "pd_t"],
"additionalProperties": false
}

View File

@ -1 +1,2 @@
{} joblib==1.5.2
scikit-learn==1.7.2

View File

@ -1 +1,16 @@
{} {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"pd_a": { "type": "number" },
"pd_b": { "type": "number" },
"pd_t": { "type": "number" },
"pd_iso_a": {"type": "number" },
"pd_iso_b": {"type": "number" },
"final_pd": {"type": "number" },
"grade": {"type": "string"}
},
"required": ["final_pd", "grade", "pd_a", "pd_b", "pd_t", "pd_iso_a", "pd_iso_b"],
"additionalProperties": false
}

22
test_block.py Normal file
View File

@ -0,0 +1,22 @@
import unittest
from block import __main__
data = {'pd_a': 0.030282551422715187, 'pd_b': 0.07098247110843658, 'pd_t': 0.6349245309829712}
class TestBlock(unittest.TestCase):
def test_main_returns_scores(self):
block_result = __main__(**data)
print(block_result)
self.assertIsInstance(block_result, dict)
self.assertIn("pd_a", block_result)
self.assertIn("pd_b", block_result)
self.assertIn("pd_t", block_result)
self.assertIn("pd_iso_a", block_result)
self.assertIn("pd_iso_b", block_result)
self.assertIn("final_pd", block_result)
self.assertIn("grade", block_result)
if __name__ == "__main__": # pragma: no cover
unittest.main()