|
import pandas as pd
|
|
import numpy as np
|
|
import os
|
|
import joblib
|
|
from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold
|
|
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
|
|
from sklearn.impute import SimpleImputer
|
|
from sklearn.metrics import classification_report, accuracy_score, roc_curve, auc
|
|
from sklearn.feature_selection import SelectFromModel
|
|
import matplotlib.pyplot as plt
|
|
import seaborn as sns
|
|
from scipy.stats import zscore
|
|
from imblearn.over_sampling import SMOTE
|
|
import os
|
|
os.environ["LOKY_MAX_CPU_COUNT"] = "4"
|
|
|
|
|
|
|
|
def load_data(file_path):
|
|
df = pd.read_excel(file_path, header=1)
|
|
return df
|
|
|
|
|
|
|
|
def preprocess_data_with_categoricals(df):
|
|
|
|
df.replace(-9, np.nan, inplace=True)
|
|
|
|
|
|
missing_percentage = df.isnull().sum() / len(df) * 100
|
|
df = df.drop(columns=missing_percentage[missing_percentage > 50].index)
|
|
|
|
|
|
imputer = SimpleImputer(strategy='median')
|
|
numeric_cols = df.select_dtypes(include=['number']).columns
|
|
df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
|
|
|
|
|
|
if 'Binary diagnosis' in df.columns:
|
|
df['Binary diagnosis'] = df['Binary diagnosis'].apply(
|
|
lambda x: 1 if str(x).strip().lower() == "ipf" else 0
|
|
)
|
|
|
|
if 'Death' in df.columns:
|
|
df['Death'] = df['Death'].apply(
|
|
lambda x: 1 if str(x).strip().lower() == "yes" else 0
|
|
)
|
|
|
|
|
|
df = apply_one_hot_encoding(df)
|
|
|
|
|
|
categorical_cols = df.select_dtypes(include=['object']).columns
|
|
numeric_cols = df.select_dtypes(include=['number']).columns
|
|
print("Categorical Variables:", categorical_cols.tolist())
|
|
print("Numerical Variables:", numeric_cols.tolist())
|
|
return df, numeric_cols, categorical_cols
|
|
|
|
|
|
def apply_one_hot_encoding(df):
|
|
categorical_cols = df.select_dtypes(include=['object']).columns
|
|
df = pd.get_dummies(df, columns=categorical_cols, drop_first=True)
|
|
return df
|
|
|
|
def remove_outliers(df, numeric_cols, z_threshold=4):
|
|
for col in numeric_cols:
|
|
z_scores = zscore(df[col])
|
|
df = df[(np.abs(z_scores) < z_threshold) | (pd.isnull(z_scores))]
|
|
return df
|
|
|
|
|
|
def select_important_features(X, y, threshold=0.03):
|
|
model = RandomForestClassifier(random_state=42)
|
|
model.fit(X, y)
|
|
selector = SelectFromModel(model, threshold=threshold, prefit=True)
|
|
selected_mask = selector.get_support()
|
|
selected_features = X.columns[selected_mask]
|
|
X_reduced = X.loc[:, selected_features]
|
|
return X_reduced, selected_features
|
|
|
|
|
|
def plot_feature_importance(model, features, target):
|
|
importance = model.feature_importances_
|
|
sorted_idx = np.argsort(importance)[::-1]
|
|
plt.figure(figsize=(10, 6))
|
|
sns.barplot(x=importance[sorted_idx], y=np.array(features)[sorted_idx])
|
|
plt.title(f'Feature Importance for {target}')
|
|
plt.xlabel('Importance')
|
|
plt.ylabel('Feature')
|
|
plt.tight_layout()
|
|
plt.show()
|
|
|
|
|
|
def plot_model_performance(cv_scores, train_scores, test_scores, target ,metric_name="Accuracy"):
|
|
plt.figure(figsize=(12, 6))
|
|
|
|
|
|
plt.subplot(1, 2, 1)
|
|
plt.plot(cv_scores, label='Cross-validation scores', marker='o')
|
|
plt.title(f'Cross-validation {metric_name} for {target}')
|
|
plt.xlabel('Fold')
|
|
plt.ylabel(metric_name)
|
|
plt.grid(True)
|
|
plt.legend()
|
|
|
|
|
|
plt.subplot(1, 2, 2)
|
|
plt.bar(['Train', 'Test'], [train_scores.mean(), test_scores], color=['blue', 'orange'])
|
|
plt.title(f'{metric_name}: Train vs Test')
|
|
plt.ylabel(metric_name)
|
|
plt.grid(True)
|
|
|
|
plt.tight_layout()
|
|
plt.show()
|
|
|
|
|
|
def plot_roc_auc(model, X_test, y_test, target):
|
|
y_prob = model.predict_proba(X_test)[:, 1]
|
|
fpr, tpr, thresholds = roc_curve(y_test, y_prob)
|
|
roc_auc = auc(fpr, tpr)
|
|
|
|
plt.figure(figsize=(8, 6))
|
|
plt.plot(fpr, tpr, color='blue', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
|
|
plt.plot([0, 1], [0, 1], color='gray', linestyle='--')
|
|
plt.xlabel('False Positive Rate')
|
|
plt.ylabel('True Positive Rate')
|
|
plt.title(f'ROC-AUC Curve for {target}')
|
|
plt.legend(loc="lower right")
|
|
plt.grid(True)
|
|
plt.show()
|
|
|
|
|
|
def save_model(model, target, selected_features):
|
|
if not os.path.exists("models"):
|
|
os.makedirs("models")
|
|
file_name = f"models/{target}_random_forest_model.pkl"
|
|
joblib.dump({'model': model, 'features': selected_features}, file_name)
|
|
print(f"Model and features saved to {file_name}")
|
|
|
|
|
|
def main():
|
|
file_path = 'FibroPredCODIFICADA.xlsx'
|
|
df = load_data(file_path)
|
|
|
|
|
|
target_columns = ['Death', 'Binary diagnosis', 'Necessity of transplantation', 'Progressive disease']
|
|
|
|
|
|
predictors_to_remove_dict = {
|
|
'Death': ['Final diagnosis', 'Transplantation date', 'Cause of death', 'Date of death', 'COD NUMBER','FVC (L) 1 year after diagnosis',
|
|
'FVC (%) 1 year after diagnosis','DLCO (%) 1 year after diagnosis'],
|
|
'Binary diagnosis': ['ProgressiveDisease', 'Final diagnosis', 'Transplantation date', 'Cause of death', 'Date of death', 'COD NUMBER','Pirfenidone','Nintedanib',
|
|
'Antifibrotic Drug','Prednisone','Mycophenolate','FVC (L) 1 year after diagnosis','FVC (%) 1 year after diagnosis',
|
|
'DLCO (%) 1 year after diagnosis','RadioWorsening2y'],
|
|
'Necessity of transplantation': ['ProgressiveDisease', 'Final diagnosis', 'Transplantation date', 'Cause of death', 'Date of death', 'COD NUMBER','Age at diagnosis'],
|
|
'Progressive disease': ['ProgressiveDisease', 'Final diagnosis', 'Transplantation date', 'Cause of death', 'Date of death', 'COD NUMBER', 'FVC (L) 1 year after diagnosis',
|
|
'FVC (%) 1 year after diagnosis','DLCO (%) 1 year after diagnosis','RadioWorsening2y']
|
|
}
|
|
|
|
|
|
df, numeric_cols, categorical_cols = preprocess_data_with_categoricals(df)
|
|
|
|
for target in target_columns:
|
|
print(f"Processing target: {target}")
|
|
|
|
if target in ['Necessity of transplantation', 'Progressive disease']:
|
|
print(f"Removing outliers for target: {target}")
|
|
df = remove_outliers(df, numeric_cols)
|
|
|
|
|
|
predictors_to_remove = predictors_to_remove_dict.get(target, [])
|
|
|
|
X = df[numeric_cols].drop(columns=target_columns + predictors_to_remove, errors='ignore')
|
|
y = df[target]
|
|
|
|
|
|
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
|
|
|
|
|
|
if target in ['Binary diagnosis', 'Necessity of transplantation']:
|
|
print(f"Applying SMOTE to balance the training set for target: {target}")
|
|
smote = SMOTE(random_state=42)
|
|
X_train, y_train = smote.fit_resample(X_train, y_train)
|
|
|
|
X_train_selected, selected_features = select_important_features(X_train, y_train)
|
|
X_test_selected = X_test[selected_features]
|
|
|
|
print(f"Selected predictors for training {target} ({len(selected_features)} predictors): {selected_features.tolist()}")
|
|
|
|
|
|
model = RandomForestClassifier(n_estimators=300,
|
|
max_depth=4,
|
|
min_samples_split=10,
|
|
min_samples_leaf=10,
|
|
class_weight='balanced',
|
|
max_features='sqrt',
|
|
random_state=42)
|
|
model.fit(X_train_selected, y_train)
|
|
|
|
|
|
cv = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)
|
|
cv_scores = cross_val_score(model, X_train_selected, y_train, cv=cv, scoring='accuracy')
|
|
train_scores = cross_val_score(model, X_train_selected, y_train, cv=10, scoring='accuracy')
|
|
y_pred_test = model.predict(X_test_selected)
|
|
test_score = accuracy_score(y_test, y_pred_test)
|
|
|
|
print(f"Cross-validation accuracy for {target}: {cv_scores.mean():.4f} (+/- {cv_scores.std():.4f})")
|
|
print(f"Test accuracy for {target}: {test_score:.4f}")
|
|
print(classification_report(y_test, y_pred_test))
|
|
|
|
|
|
plot_model_performance(cv_scores, train_scores, test_score, target, metric_name="Accuracy")
|
|
|
|
|
|
print(f"Feature importance for {target}:")
|
|
plot_feature_importance(model, selected_features, target)
|
|
|
|
|
|
plot_roc_auc(model, X_test_selected, y_test, target)
|
|
|
|
|
|
save_model(model, target, selected_features.tolist())
|
|
|
|
print("Pipeline completed.")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|