from transformers import AutoConfig, AutoModelForSequenceClassification, AutoTokenizer
from typing import Optional, Dict, Sequence, List
import transformers
from peft import PeftModel
import torch
from torch.nn.utils.rnn import pad_sequence
from dataclasses import dataclass
import pandas as pd
from datasets import Dataset
from tqdm import tqdm
import numpy as np
from huggingface_hub import hf_hub_download
import os
import pickle
from sklearn import preprocessing
import json
import spaces
import time
class calculateDuration:
    """Context manager that reports the wall-clock time of the wrapped block."""
    def __init__(self, activity_name=""):
        self.activity_name = activity_name

    def __enter__(self):
        self.start_time = time.time()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.end_time = time.time()
        self.elapsed_time = self.end_time - self.start_time
        if self.activity_name:
            print(f"Elapsed time for {self.activity_name}: {self.elapsed_time:.6f} seconds")
        else:
            print(f"Elapsed time: {self.elapsed_time:.6f} seconds")
from rdkit import RDLogger, Chem

# Suppress RDKit INFO messages
RDLogger.DisableLog('rdApp.*')
# A hard-coded dictionary of model task types is kept here for reference only;
# task types are now read from dataset_descriptions.json below.
#task_types = {
#    "admet_bioavailability_ma": "classification",
#    "admet_ppbr_az": "regression",
#    "admet_half_life_obach": "regression",
#}
# read the dataset descriptions
with open("dataset_descriptions.json", "r") as f:
    dataset_description_temp = json.load(f)

dataset_descriptions = dict()
dataset_property_names = dict()
dataset_task_types = dict()
dataset_property_names_to_dataset = dict()

for dataset in dataset_description_temp:
    dataset_name = dataset.lower()
    dataset_descriptions[dataset_name] = \
        f"{dataset_description_temp[dataset]['task_name']} is a {dataset_description_temp[dataset]['task_type']} task, " + \
        f"where the goal is to {dataset_description_temp[dataset]['description']}. \n" + \
        f"More information can be found at {dataset_description_temp[dataset]['url']}."
    dataset_property_names[dataset_name] = dataset_description_temp[dataset]['task_name']
    dataset_property_names_to_dataset[dataset_description_temp[dataset]['task_name']] = dataset_name
    dataset_task_types[dataset_name] = dataset_description_temp[dataset]['task_type']
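
# For reference, one illustrative entry of dataset_descriptions.json, inferred from the keys read
# above (task_name, task_type, description, url); the concrete values and the URL are assumptions:
#   {
#       "admet_ppbr_az": {
#           "task_name": "PPBR (AZ)",
#           "task_type": "regression",
#           "description": "predict the plasma protein binding rate of a compound",
#           "url": "https://example.com/dataset-info"
#       }
#   }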
class Scaler:
    """Standardize regression targets, optionally in log10 space, after shifting them to be non-negative."""
    def __init__(self, log=False):
        self.log = log
        self.offset = None
        self.scaler = None

    def fit(self, y):
        # make the values non-negative
        self.offset = np.min([np.min(y), 0.0])
        y = y.reshape(-1, 1) - self.offset

        # scale the input data
        if self.log:
            y = np.log10(y + 1.0)
        self.scaler = preprocessing.StandardScaler().fit(y)

    def transform(self, y):
        y = y.reshape(-1, 1) - self.offset

        # scale the input data
        if self.log:
            y = np.log10(y + 1.0)
        y_scale = self.scaler.transform(y)
        return y_scale

    def inverse_transform(self, y_scale):
        y = self.scaler.inverse_transform(y_scale.reshape(-1, 1))
        if self.log:
            y = 10.0**y - 1.0
        y = y + self.offset
        return y
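
# A minimal round-trip sketch of Scaler on made-up values (the numbers are illustrative only):
#   scaler = Scaler(log=True)
#   y = np.array([-2.0, 0.0, 10.0, 100.0])
#   scaler.fit(y)
#   y_scaled = scaler.transform(y)                # standardized log10(y - offset + 1)
#   y_back = scaler.inverse_transform(y_scaled)   # recovers the original values up to float error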
def smart_tokenizer_and_embedding_resize(
    special_tokens_dict: Dict,
    tokenizer: transformers.PreTrainedTokenizer,
    model: transformers.PreTrainedModel,
    non_special_tokens=None,
):
    """Resize tokenizer and embedding.

    Note: This is the unoptimized version that may make your embedding size not be divisible by 64.
    """
    num_new_tokens = tokenizer.add_special_tokens(special_tokens_dict) + tokenizer.add_tokens(non_special_tokens)
    num_old_tokens = model.get_input_embeddings().weight.shape[0]
    num_new_tokens = len(tokenizer) - num_old_tokens
    if num_new_tokens == 0:
        return

    model.resize_token_embeddings(len(tokenizer))

    if num_new_tokens > 0:
        # initialize the new embedding rows with the mean of the existing embeddings
        input_embeddings_data = model.get_input_embeddings().weight.data
        input_embeddings_avg = input_embeddings_data[:-num_new_tokens].mean(dim=0, keepdim=True)
        input_embeddings_data[-num_new_tokens:] = input_embeddings_avg

    print(f"Resized tokenizer and embedding from {num_old_tokens} to {len(tokenizer)} tokens.")
@dataclass
class DataCollator(object):
    tokenizer: transformers.PreTrainedTokenizer
    source_max_len: int
    molecule_start_str: str
    end_str: str

    def augment_molecule(self, molecule: str) -> str:
        # unused here; requires a SMILES augmenter to be attached as `self.sme`
        return self.sme.augment([molecule])[0]

    def __call__(self, instances: Sequence[Dict]) -> Dict[str, torch.Tensor]:
        with calculateDuration("DataCollator"):
            sources = []
            for example in instances:
                smiles = example['smiles'].strip()
                # canonicalize the SMILES string
                smiles = Chem.MolToSmiles(Chem.MolFromSmiles(smiles))
                # get the properties except the smiles and mol_id cols
                #props = [example[col] if example[col] is not None else np.nan for col in sorted(example.keys()) if col not in ['smiles', 'is_aug']]
                source = f"{self.molecule_start_str}{smiles}{self.end_str}"
                sources.append(source)

            # Tokenize
            tokenized_sources_with_prompt = self.tokenizer(
                sources,
                max_length=self.source_max_len,
                truncation=True,
                add_special_tokens=False,
            )
            input_ids = [torch.tensor(tokenized_source) for tokenized_source in tokenized_sources_with_prompt['input_ids']]
            input_ids = pad_sequence(input_ids, batch_first=True, padding_value=self.tokenizer.pad_token_id)

            data_dict = {
                'input_ids': input_ids,
                'attention_mask': input_ids.ne(self.tokenizer.pad_token_id),
            }
            return data_dict
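
# Roughly what the collator produces, sketched on a single made-up SMILES string
# (the tokenizer and special-token strings match the ones configured further below):
#   collator = DataCollator(tokenizer=tokenizer, source_max_len=512,
#                           molecule_start_str="<molstart>", end_str="<eos>")
#   batch = collator([{"smiles": "CCO"}])
#   # batch["input_ids"]      -> LongTensor of shape (1, seq_len), right-padded with pad_token_id
#   # batch["attention_mask"] -> BoolTensor marking the non-padding positions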
class MolecularPropertyPredictionModel():
    def __init__(self, candidate_models):
        self.adapter_name = None

        # keep track of the paths of adapter scalers
        # so that we don't download the same scaler multiple times
        self.apapter_scaler_path = dict()

        DEFAULT_PAD_TOKEN = "[PAD]"

        # load the base model
        config = AutoConfig.from_pretrained(
            "ChemFM/ChemFM-3B",
            num_labels=1,
            finetuning_task="classification",  # placeholder; the actual task type is handled per adapter
            trust_remote_code=True,
            token=os.environ.get("TOKEN")
        )
        self.base_model = AutoModelForSequenceClassification.from_pretrained(
            "ChemFM/ChemFM-3B",
            config=config,
            device_map="cuda",
            trust_remote_code=True,
            token=os.environ.get("TOKEN")
        )

        # load the tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(
            "ChemFM/admet_ppbr_az",
            trust_remote_code=True,
            token=os.environ.get("TOKEN")
        )
        special_tokens_dict = dict(pad_token=DEFAULT_PAD_TOKEN)
        smart_tokenizer_and_embedding_resize(
            special_tokens_dict=special_tokens_dict,
            tokenizer=self.tokenizer,
            model=self.base_model
        )
        self.base_model.config.pad_token_id = self.tokenizer.pad_token_id
        self.data_collator = DataCollator(
            tokenizer=self.tokenizer,
            source_max_len=512,
            molecule_start_str="<molstart>",
            end_str="<eos>",
        )

        # load the adapters first
        for adapter_name in candidate_models:
            adapter_id = candidate_models[adapter_name]
            print(f"loading {adapter_name} from {adapter_id}...")
            self.base_model.load_adapter(adapter_id, adapter_name=adapter_name, token=os.environ.get("TOKEN"))

            # regression adapters ship a scaler.pkl; classification adapters do not
            try:
                self.apapter_scaler_path[adapter_name] = hf_hub_download(adapter_id, filename="scaler.pkl", token=os.environ.get("TOKEN"))
            except Exception:
                self.apapter_scaler_path[adapter_name] = None
                assert dataset_task_types[adapter_name] == "classification", \
                    f"{adapter_name} is a regression task but has no scaler."

        self.base_model.to("cuda")
    def swith_adapter(self, adapter_name, adapter_id):
        # return flag:
        #   keep: adapter is the same as the current one
        #   switched: adapter is switched successfully
        #   error: adapter is not found
        with calculateDuration("switching adapter"):
            if adapter_name == self.adapter_name:
                return "keep"
            # switch adapter
            try:
                #self.adapter_name = adapter_name
                #print(self.adapter_name, adapter_id)
                #self.lora_model = PeftModel.from_pretrained(self.base_model, adapter_id, token=os.environ.get("TOKEN"))
                #self.lora_model.to("cuda")
                #print(self.lora_model)
                self.base_model.set_adapter(adapter_name)
                self.base_model.eval()

                #if adapter_name not in self.apapter_scaler_path:
                #    self.apapter_scaler_path[adapter_name] = hf_hub_download(adapter_id, filename="scaler.pkl", token=os.environ.get("TOKEN"))
                if self.apapter_scaler_path[adapter_name] and os.path.exists(self.apapter_scaler_path[adapter_name]):
                    self.scaler = pickle.load(open(self.apapter_scaler_path[adapter_name], "rb"))
                else:
                    self.scaler = None

                self.adapter_name = adapter_name
                return "switched"
            except Exception as e:
                # report the error and signal the caller that the switch failed
                print(f"Failed to switch to adapter {adapter_name}: {e}")
                return "error"
    def predict(self, valid_df, task_type):
        with calculateDuration("predicting"):
            with calculateDuration("construct dataloader"):
                test_dataset = Dataset.from_pandas(valid_df)
                # construct the dataloader
                test_loader = torch.utils.data.DataLoader(
                    test_dataset,
                    batch_size=16,
                    collate_fn=self.data_collator,
                )

            # predict
            y_pred = []
            for i, batch in tqdm(enumerate(test_loader), total=len(test_loader), desc="Evaluating"):
                with torch.no_grad():
                    batch = {k: v.to(self.base_model.device) for k, v in batch.items()}
                    outputs = self.base_model(**batch)

                    if task_type == "regression":  # TODO: check if the model is regression or classification
                        y_pred.append(outputs.logits.cpu().detach().numpy())
                    else:
                        y_pred.append(torch.sigmoid(outputs.logits).cpu().detach().numpy())
            y_pred = np.concatenate(y_pred, axis=0)

            # regression outputs are produced in scaled space; map them back to the original units
            if task_type == "regression" and self.scaler is not None:
                y_pred = self.scaler.inverse_transform(y_pred)

            return y_pred
    def predict_single_smiles(self, smiles, task_type):
        with calculateDuration("predicting a single SMILES"):
            assert task_type in ["regression", "classification"]

            # check that the SMILES string is valid
            if not Chem.MolFromSmiles(smiles):
                return None

            # predict
            valid_df = pd.DataFrame([smiles], columns=['smiles'])
            results = self.predict(valid_df, task_type)
            return results.item()
    def predict_file(self, df, task_type):
        with calculateDuration("predicting a file"):
            # add an index first so that predictions can be written back to the right rows
            df = df.reset_index()

            with calculateDuration("pre-checking SMILES"):
                # check that the SMILES strings are valid; invalid ones get a NaN prediction below
                valid_idx = []
                invalid_idx = []
                for idx, smiles in enumerate(df['smiles']):
                    if Chem.MolFromSmiles(smiles):
                        valid_idx.append(idx)
                    else:
                        invalid_idx.append(idx)
            valid_df = df.loc[valid_idx]

            # get the smiles list
            valid_df_smiles = valid_df['smiles'].tolist()

            input_df = pd.DataFrame(valid_df_smiles, columns=['smiles'])
            results = self.predict(input_df, task_type)

            # add the results to the dataframe
            df.loc[valid_idx, 'prediction'] = results.flatten()
            df.loc[invalid_idx, 'prediction'] = np.nan

            # drop the index column and return the parsed file
            df = df.drop(columns=['index'])
            return df
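

# A minimal end-to-end sketch of how this class might be driven, kept under a __main__ guard so it
# never runs when the module is imported by the Space app. The adapter repo id and the demo SMILES
# are illustrative assumptions; a valid TOKEN environment variable and a CUDA device are assumed.
if __name__ == "__main__":
    candidate_models = {"admet_ppbr_az": "ChemFM/admet_ppbr_az"}  # illustrative mapping
    model = MolecularPropertyPredictionModel(candidate_models)

    # activate one adapter and look up its task type from the dataset descriptions
    adapter_name = "admet_ppbr_az"
    status = model.swith_adapter(adapter_name, candidate_models[adapter_name])
    print(f"adapter switch status: {status}")

    task_type = dataset_task_types[adapter_name]
    print(model.predict_single_smiles("CCO", task_type))  # ethanol, used only as a demo molecule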