import json import logging from functools import lru_cache from pathlib import Path from typing import Any, Dict, List, Tuple import joblib import numpy as np import pandas as pd import xgboost as xgb from pandas.api.types import ( is_bool_dtype, is_categorical_dtype, is_float_dtype, is_integer_dtype, is_object_dtype, ) logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Paths & constants # --------------------------------------------------------------------------- try: BASE_DIR = Path(__file__).resolve().parent except NameError: # Fallback for environments where __file__ is not defined (e.g. some REPLs / notebooks) BASE_DIR = Path.cwd() A_MODEL_PATH = BASE_DIR / "xgboost_model_A.joblib" A_CATEGORY_ORDERS_PATH = BASE_DIR / "category_orders_train_A.json" B_MODEL_PATH = BASE_DIR / "xgboost_model_B.joblib" B_CATEGORY_ORDERS_PATH = BASE_DIR / "category_orders_train_B.json" T_MODEL_PATH = BASE_DIR / "xgboost_model_T.joblib" T_CATEGORY_ORDERS_PATH = BASE_DIR / "category_orders_train_T.json" # --------------------------------------------------------------------------- # Loaders # --------------------------------------------------------------------------- def _load_category_orders(path: Path) -> Dict[str, Any]: """Load category orders JSON from disk.""" with open(path, "r") as f: return json.load(f) @lru_cache(maxsize=1) def _load_a_model(): """Load and cache model A.""" logger.info("Loading model A from %s", A_MODEL_PATH) return joblib.load(A_MODEL_PATH) @lru_cache(maxsize=1) def _load_b_model(): """Load and cache model B.""" logger.info("Loading model B from %s", B_MODEL_PATH) return joblib.load(B_MODEL_PATH) @lru_cache(maxsize=1) def _load_t_model(): """Load and cache model T.""" logger.info("Loading model T from %s", T_MODEL_PATH) return joblib.load(T_MODEL_PATH) @lru_cache(maxsize=None) def _load_category_orders_cached(path: Path) -> Dict[str, Any]: """ Cache category orders per path to avoid disk I/O on each scoring. """ logger.info("Loading category orders from %s", path) return _load_category_orders(path) def _get_expected_features(model: Any, df: pd.DataFrame) -> List[str]: """ Get the expected feature names from the model. If the model has no 'feature_names' attribute, fall back to df columns. This is a defensive measure; ideally, feature names should always be stored with the model. """ feature_names = getattr(model, "feature_names", None) if feature_names is None: logger.warning( "Model has no attribute 'feature_names'; using DataFrame columns order." ) feature_names = list(df.columns) return list(feature_names) # --------------------------------------------------------------------------- # Preprocessing helpers # --------------------------------------------------------------------------- MISSING_SENTINELS = [None, "", "null", np.nan, "nan", " "] def _to_string_category(series: pd.Series) -> pd.Series: """ Force a categorical series whose categories are strings (not floats), backed by NumPy object dtype (not pandas StringDtype) for XGBoost compatibility. """ s = series.copy() s.replace(MISSING_SENTINELS, np.nan, inplace=True) # Use classic Python strings (object dtype), not pandas' StringDtype, # so that XGBoost's numpy-based dtype checks work correctly. s = s.astype(str) return s.astype("category") def _sanitize_expected_feature_dtypes( df: pd.DataFrame, expected_features: List[str], categorical_feature_names: List[str], ) -> pd.DataFrame: """ XGBoost DMatrix does NOT allow object dtype. For each expected feature column: - If dtype is numeric or bool, keep as-is. - If categorical with float categories, convert to string categories. - If object: * try numeric coercion * if still not usable, cast to string category. This mirrors the safety checks needed to satisfy the XGBoost 3.x pandas backend (`enable_categorical=True`). """ df = df.copy() categorical_set = set(categorical_feature_names) for col in expected_features: if col not in df.columns: # Ensure column exists so downstream checks don't fail here. df[col] = np.nan dtype = df[col].dtype # If this feature is known to be categorical from training-time # category_orders, assume _prepare_* already produced a proper # pandas Categorical with the training categories and leave it # untouched so that category codes match training. if col in categorical_set: continue # For non-categorical features, XGBoost expects numeric or bool. if is_bool_dtype(dtype) or is_integer_dtype(dtype) or is_float_dtype(dtype): continue # Anything else (object, string, unexpected categorical) -> numeric coercion. numeric = pd.to_numeric(df[col], errors="coerce") df[col] = numeric return df def _prepare_a(df: pd.DataFrame, category_orders: Dict[str, List[Any]]) -> pd.DataFrame: """ Prepare features for model A. For each column with category orders: - If all category labels are numeric-like, coerce both labels and data to floats and build a numeric categorical. - Otherwise, treat as string categories. """ df = df.copy() for col, raw_categories in category_orders.items(): if col not in df.columns: df[col] = np.nan df[col].replace(MISSING_SENTINELS, np.nan, inplace=True) # Detect whether category labels are numeric-like and, if so, # map numeric values onto the canonical string labels used # during training (e.g. -4 -> "-4.0"). numeric_like = True numeric_label_map: Dict[float, str] = {} for v in raw_categories: try: numeric_label_map[float(v)] = str(v) except (TypeError, ValueError): numeric_like = False break if numeric_like: def _map_value(val: Any) -> Any: if pd.isna(val): return np.nan try: key = float(val) except (TypeError, ValueError): return np.nan return numeric_label_map.get(key, np.nan) df[col] = df[col].map(_map_value) df[col] = pd.Categorical(df[col], categories=raw_categories, ordered=True) else: # Pure string categories: coerce to plain strings df[col] = df[col].astype(str) df[col] = pd.Categorical(df[col], categories=raw_categories, ordered=True) return df def _prepare_with_lower(df: pd.DataFrame, category_orders: Dict[str, List[Any]]) -> pd.DataFrame: """ Shared preparation logic for models B and T where categorical values are lowercased strings. """ df = df.copy() for col, raw_categories in category_orders.items(): if col not in df.columns: df[col] = np.nan # Normalize missing-like representations df[col].replace(MISSING_SENTINELS, np.nan, inplace=True) # Detect whether category labels are numeric-like and, if so, # map numeric values onto the canonical string labels used # during training (e.g. -4 -> "-4.0"). Otherwise, treat # them as lowercased string categories. numeric_like = True numeric_label_map: Dict[float, str] = {} for v in raw_categories: try: numeric_label_map[float(v)] = str(v) except (TypeError, ValueError): numeric_like = False break if numeric_like: def _map_value(val: Any) -> Any: if pd.isna(val): return np.nan try: key = float(val) except (TypeError, ValueError): return np.nan return numeric_label_map.get(key, np.nan) df[col] = df[col].map(_map_value) df[col] = pd.Categorical(df[col], categories=raw_categories, ordered=True) else: # String categories: lower-case string representation df[col] = df[col].astype(str).str.lower() df[col] = pd.Categorical(df[col], categories=raw_categories, ordered=True) return df def _prepare_b(df: pd.DataFrame, category_orders: Dict[str, List[Any]]) -> pd.DataFrame: """ Prepare features for model B (lowercased categorical values). """ return _prepare_with_lower(df, category_orders) def _prepare_t(df: pd.DataFrame, category_orders: Dict[str, List[Any]]) -> pd.DataFrame: """ Prepare features for model T (lowercased categorical values). """ return _prepare_with_lower(df, category_orders) # --------------------------------------------------------------------------- # Per-model processing functions # --------------------------------------------------------------------------- def processing_a(input_data: pd.DataFrame) -> float: """ Run model A on input_data and return the first prediction as float. """ df = pd.DataFrame(input_data) if df.empty: raise ValueError("Input DataFrame for model A is empty.") model = _load_a_model() category_orders = _load_category_orders_cached(A_CATEGORY_ORDERS_PATH) df = _prepare_a(df, category_orders) expected_features = _get_expected_features(model, df) df = _sanitize_expected_feature_dtypes(df, expected_features, list(category_orders.keys())) # Ensure all expected features exist in df missing_features = set(expected_features) - set(df.columns) if missing_features: raise KeyError( f"Missing expected features for model A: {sorted(missing_features)}" ) dmatrix = xgb.DMatrix(df[expected_features], enable_categorical=True, missing=np.nan) predictions = model.predict(dmatrix) if len(predictions) == 0: raise RuntimeError("Model A returned no predictions.") pd_a = float(predictions[0]) return pd_a def processing_b(input_data: pd.DataFrame) -> float: """ Run model B on input_data and return the first prediction as float. """ df = pd.DataFrame(input_data) if df.empty: raise ValueError("Input DataFrame for model B is empty.") model = _load_b_model() category_orders = _load_category_orders_cached(B_CATEGORY_ORDERS_PATH) df = _prepare_b(df, category_orders) expected_features = _get_expected_features(model, df) df = _sanitize_expected_feature_dtypes(df, expected_features, list(category_orders.keys())) missing_features = set(expected_features) - set(df.columns) if missing_features: raise KeyError( f"Missing expected features for model B: {sorted(missing_features)}" ) dmatrix = xgb.DMatrix(df[expected_features], enable_categorical=True, missing=np.nan) predictions = model.predict(dmatrix) if len(predictions) == 0: raise RuntimeError("Model B returned no predictions.") pd_b = float(predictions[0]) return pd_b def processing_t(input_data: pd.DataFrame) -> float: """ Run model T on input_data and return the first prediction as float. """ df = pd.DataFrame(input_data) if df.empty: raise ValueError("Input DataFrame for model T is empty.") model = _load_t_model() category_orders = _load_category_orders_cached(T_CATEGORY_ORDERS_PATH) df = _prepare_t(df, category_orders) expected_features = _get_expected_features(model, df) df = _sanitize_expected_feature_dtypes(df, expected_features, list(category_orders.keys())) missing_features = set(expected_features) - set(df.columns) if missing_features: raise KeyError( f"Missing expected features for model T: {sorted(missing_features)}" ) dmatrix = xgb.DMatrix(df[expected_features], enable_categorical=True, missing=np.nan) predictions = model.predict(dmatrix) if len(predictions) == 0: raise RuntimeError("Model T returned no predictions.") pd_t = float(predictions[0]) return pd_t def processing_all( df_a: pd.DataFrame, df_b: pd.DataFrame, df_t: pd.DataFrame, ) -> Tuple[float, float, float]: """ Convenience function to run all three models and return their predictions. """ return ( processing_a(df_a), processing_b(df_b), processing_t(df_t), ) # --------------------------------------------------------------------------- # Main entrypoint for batch-style input # --------------------------------------------------------------------------- def __main__(results: List[Dict[str, Any]]) -> Tuple[float, float, float]: """ Main entrypoint for processing a list of results dicts. Expected shape of each element in `results`: { "model_a_features": { ... feature_name: value ... }, "model_b_features": { ... feature_name: value ... }, "model_t_features": { ... feature_name: value ... }, } """ logger.info("Data received in processing block: %s", results) df = pd.DataFrame(results) if df.empty: raise ValueError("Input results list is empty.") if not {"model_a_features", "model_b_features", "model_t_features"}.issubset(df.columns): missing = { "model_a_features", "model_b_features", "model_t_features", } - set(df.columns) raise KeyError( f"Missing expected keys in results: {sorted(missing)}" ) # Each cell of these columns is expected to be a dict-like object df_a = pd.DataFrame(list(df["model_a_features"])) df_b = pd.DataFrame(list(df["model_b_features"])) df_t = pd.DataFrame(list(df["model_t_features"])) pd_a, pd_b, pd_t = processing_all(df_a, df_b, df_t) return { "pd_a": pd_a, "pd_b": pd_b, "pd_t": pd_t, }