# Page-capture header (not part of the script): Spaces - Sleeping
# -*- coding: utf-8 -*-
import multiprocessing
import os
from datetime import datetime

import numpy as np
import pandas as pd
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, hamming_loss
from sklearn.model_selection import cross_val_predict, KFold
from sklearn.svm import SVC
from skmultilearn.problem_transform import BinaryRelevance
from tqdm import tqdm

# Directory containing this script; the GO dataset paths are resolved relative to it.
script_dir = os.path.dirname(os.path.abspath(__file__))
# Module-level configuration read by ProtDescModel(). The empty defaults are
# placeholders; presumably a caller assigns real values before running
# (NOTE(review): confirm against the driver script).
aspect_type = ""  # substring matched against dataset file names, or "All_Aspects"
dataset_type = ""  # substring matched against dataset file names, or "All_Data_Sets"
representation_dataframe = ""  # merged on its "Entry" column; its "Vector" column is used as features
representation_name = ""  # prefix of the "<representation>_<dataset>" result labels
detailed_output = False  # not read anywhere in this part of the file
def warn(*args, **kwargs):
    """Accept any arguments and do nothing; used as a silent `warnings.warn`."""
    pass


import warnings

# Replace `warnings.warn` process-wide so that every warning issued through it
# after this point is discarded.
# NOTE(review): this also hides warnings from unrelated code in the process.
warnings.warn = warn
def check_for_at_least_two_class_sample_exits(y):
    """Check that every label column of *y* sums to at least 2.

    Args:
        y: pandas DataFrame with one column per class. Positives are counted
            by summing the column (presumably 0/1 indicators - TODO confirm).

    Returns:
        True when every column sum is >= 2 (also for a frame with no
        columns); False as soon as one column falls short, after printing
        which class failed and how many positives it has.
    """
    for column in y:
        column_sum = np.sum(y[column].array)
        if column_sum < 2:
            print('At least 2 positive samples are required for each class {0} class has {1} positive samples'.format(column, column_sum))
            return False
    return True
def create_valid_kfold_object_for_multilabel_splits(X, y, kf):
    """Return a splitter whose test folds contain positives of every class at least twice.

    A class passes when at least two of the test folds produced by
    ``kf.split(X, y)`` contain a positive sample of it. If some class fails,
    a new shuffled ``KFold`` with a random seed drawn from ``[0, 1000)`` is
    tried, and the check is repeated.

    Args:
        X: samples, passed straight to ``kf.split``.
        y: pandas DataFrame with one label column per class.
        kf: initial splitter exposing ``split(X, y)``.

    Returns:
        The first splitter that passes the check, or None when
        ``check_for_at_least_two_class_sample_exits(y)`` fails.

    Raises:
        RuntimeError: if no valid split is found within 1000 re-draws
            (the recursive original failed with ``RecursionError``, a
            ``RuntimeError`` subclass, at roughly the same depth).
    """
    if not check_for_at_least_two_class_sample_exits(y):
        return None

    # Re-draw with the caller's fold count; 5 is the historical fallback when
    # the splitter does not expose ``n_splits``.
    n_splits = getattr(kf, "n_splits", 5)
    max_redraws = 1000
    redraws = 0

    while True:
        # Materialise the test folds once per candidate splitter instead of
        # re-running kf.split() for every label column.
        fold_labels = [y.iloc[fold_test_index, :] for _, fold_test_index in kf.split(X, y)]

        # First class (in column order) with positives in fewer than two test folds.
        sparse_class = next(
            (
                column
                for column in y
                if sum(np.sum(fold[column].array) > 0 for fold in fold_labels) < 2
            ),
            None,
        )
        if sparse_class is None:
            return kf

        if redraws >= max_redraws:
            raise RuntimeError(
                f"No valid K-fold split found after {max_redraws} random re-draws "
                f"(class {sparse_class} still has positives in fewer than two test folds)"
            )
        redraws += 1

        random_state = np.random.randint(1000)
        print(f"Random state is changed since at least two positive samples are required in different train/test folds. "
              f"However, only one fold exists with positive samples for class {sparse_class}")
        print(f"Selected random state is {random_state}")
        kf = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)
def MultiLabelSVC_cross_val_predict(representation_name, dataset, X, y, classifier):
    """Cross-validate *classifier* on a multi-label dataset and score every fold.

    Args:
        representation_name: prefix of the result label.
        dataset: dataset name; the label is ``representation_name + "_" + dataset``.
        X: object with ``tolist()`` yielding one numeric feature vector per
            sample (converted to a float array for the classifier).
        y: pandas DataFrame with one label column per class.
        classifier: estimator handed to ``cross_val_predict``.

    Returns:
        None when no valid fold split can be built; otherwise a dict with
        - "cv_results": [label, acc, f1 micro/macro/weighted, precision
          micro/macro/weighted, recall micro/macro/weighted, hamming loss],
          each metric being a list of per-fold scores rounded to 5 decimals;
        - "means" / "stds": [label] followed by the per-metric mean / standard
          deviation over folds, in the same metric order, rounded to 5 decimals;
        - "predictions": the output of ``cross_val_predict``.
    """
    Xn = np.array(X.tolist(), dtype=float)
    kf = create_valid_kfold_object_for_multilabel_splits(
        X, y, KFold(n_splits=5, shuffle=True, random_state=42)
    )
    if kf is None:
        return None

    y_pred = cross_val_predict(classifier, Xn, y, cv=kf)

    averages = ("micro", "macro", "weighted")
    acc_cv, hamm_cv = [], []
    f1_cv = {average: [] for average in averages}
    pr_cv = {average: [] for average in averages}
    rc_cv = {average: [] for average in averages}

    # Re-run the split to recover each fold's test rows; the splitter is a
    # KFold with a fixed integer seed, so this is expected to reproduce the
    # folds used by cross_val_predict above.
    for _, fold_test_index in kf.split(X, y):
        # Slice once per fold rather than once per metric call.
        y_true = y.iloc[fold_test_index, :]
        y_fold_pred = y_pred[fold_test_index]

        acc_cv.append(np.round(accuracy_score(y_true, y_fold_pred), decimals=5))
        for average in averages:
            f1_cv[average].append(np.round(f1_score(y_true, y_fold_pred, average=average), decimals=5))
            pr_cv[average].append(np.round(precision_score(y_true, y_fold_pred, average=average), decimals=5))
            rc_cv[average].append(np.round(recall_score(y_true, y_fold_pred, average=average), decimals=5))
        hamm_cv.append(np.round(hamming_loss(y_true, y_fold_pred), decimals=5))

    # Fixed metric order shared by "cv_results", "means" and "stds".
    per_fold_scores = [acc_cv, *f1_cv.values(), *pr_cv.values(), *rc_cv.values(), hamm_cv]
    means = [np.round(value, decimals=5) for value in np.mean(per_fold_scores, axis=1)]
    stds = [np.round(value, decimals=5) for value in np.std(per_fold_scores, axis=1)]

    label = representation_name + "_" + dataset
    return {
        "cv_results": [label, *per_fold_scores],
        "means": [label] + means,
        "stds": [label] + stds,
        "predictions": y_pred,
    }
def ProtDescModel():
    """Run the cross-validated prediction on every selected GO dataset.

    Dataset files are listed from ``../data/auxilary_input/GO_datasets``
    (relative to this script) and kept when their file name contains the
    module-level ``aspect_type`` and ``dataset_type``; the special values
    "All_Aspects" / "All_Data_Sets" disable the respective filter. Each file
    is read as tab-separated text and joined to ``representation_dataframe``
    on ``Protein_Id == Entry``.

    Returns:
        dict with "cv_results", "cv_mean_results" and "cv_std_results", each
        a list holding one entry per dataset that could be evaluated.
    """
    dataset_dir = os.path.join(script_dir, r"../data/auxilary_input/GO_datasets")
    # os.listdir returns names in arbitrary order; sort for reproducible output order.
    datasets = sorted(os.listdir(dataset_dir))

    filtered_datasets = [
        dataset
        for dataset in datasets
        if (aspect_type == "All_Aspects" or aspect_type in dataset)
        and (dataset_type == "All_Data_Sets" or dataset_type in dataset)
    ]

    # Loop-invariant: the core count is the same for every dataset.
    cpu_number = multiprocessing.cpu_count()

    cv_results = []
    cv_mean_results = []
    cv_std_results = []
    for dt in tqdm(filtered_datasets, total=len(filtered_datasets)):
        dataset_name = dt.split(".")[0]
        print(f"Protein function prediction is started for the dataset: {dataset_name}")

        dt_file = pd.read_csv(os.path.join(dataset_dir, dt), sep="\t")
        dt_merge = dt_file.merge(representation_dataframe, left_on="Protein_Id", right_on="Entry")
        dt_X = dt_merge['Vector']
        # Drops the first and the last two columns; presumably these are
        # Protein_Id, Entry and Vector, leaving only label columns - TODO confirm.
        dt_y = dt_merge.iloc[:, 1:-2]

        if not check_for_at_least_two_class_sample_exits(dt_y):
            print(f"No function will be predicted for the dataset: {dataset_name}")
            continue

        model = MultiLabelSVC_cross_val_predict(
            representation_name,
            dataset_name,
            dt_X,
            dt_y,
            classifier=BinaryRelevance(SGDClassifier(n_jobs=cpu_number, random_state=42)),
        )
        if model is not None:
            cv_results.append(model["cv_results"])
            cv_mean_results.append(model["means"])
            cv_std_results.append(model["stds"])

    return {
        "cv_results": cv_results,
        "cv_mean_results": cv_mean_results,
        "cv_std_results": cv_std_results,
    }
def pred_output():
    """Run ProtDescModel() and return its results under singular key names.

    Returns:
        dict with "cv_result", "cv_mean_result" and "cv_std_result", taken
        from the model's "cv_results", "cv_mean_results" and "cv_std_results".
    """
    model = ProtDescModel()
    return {
        "cv_result": model["cv_results"],
        "cv_mean_result": model["cv_mean_results"],
        "cv_std_result": model["cv_std_results"],
    }
# Example call to the function
# result = pred_output()

# Print the current local time whenever the module is executed or imported.
print(datetime.now())