from transformers import AutoConfig, AutoModelForSequenceClassification, AutoTokenizer
from typing import Optional, Dict, Sequence, List
import transformers
from peft import PeftModel
import torch
from torch.nn.utils.rnn import pad_sequence
from dataclasses import dataclass
import pandas as pd
from datasets import Dataset
from tqdm import tqdm
import numpy as np
from huggingface_hub import hf_hub_download
import os
import pickle
from sklearn import preprocessing
import json
import spaces
import time
class calculateDuration:
def __init__(self, activity_name=""):
self.activity_name = activity_name
def __enter__(self):
self.start_time = time.time()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.end_time = time.time()
self.elapsed_time = self.end_time - self.start_time
if self.activity_name:
print(f"Elapsed time for {self.activity_name}: {self.elapsed_time:.6f} seconds")
else:
print(f"Elapsed time: {self.elapsed_time:.6f} seconds")
from rdkit import RDLogger, Chem
# Suppress RDKit INFO messages
RDLogger.DisableLog('rdApp.*')
# the task types of the models are read from dataset_descriptions.json below; this hard-coded mapping is kept only for reference:
#task_types = {
# "admet_bioavailability_ma": "classification",
# "admet_ppbr_az": "regression",
# "admet_half_life_obach": "regression",
#}
# read the dataset descriptions
with open("dataset_descriptions.json", "r") as f:
dataset_description_temp = json.load(f)
dataset_descriptions = dict()
dataset_property_names = dict()
dataset_task_types = dict()
dataset_property_names_to_dataset = dict()
for dataset in dataset_description_temp:
dataset_name = dataset.lower()
dataset_descriptions[dataset_name] = \
f"{dataset_description_temp[dataset]['task_name']} is a {dataset_description_temp[dataset]['task_type']} task, " + \
f"where the goal is to {dataset_description_temp[dataset]['description']}. \n" + \
f"More information can be found at {dataset_description_temp[dataset]['url']}."
dataset_property_names[dataset_name] = dataset_description_temp[dataset]['task_name']
dataset_property_names_to_dataset[dataset_description_temp[dataset]['task_name']] = dataset_name
dataset_task_types[dataset_name] = dataset_description_temp[dataset]['task_type']
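# For reference, the loop above only relies on the 'task_name', 'task_type',
# 'description', and 'url' fields, so each entry of dataset_descriptions.json is
# expected to look roughly like this illustrative (not verbatim) sketch:
#
#     {
#         "admet_ppbr_az": {
#             "task_name": "PPBR",
#             "task_type": "regression",
#             "description": "predict the human plasma protein binding rate",
#             "url": "<link to the dataset page>"
#         }
#     }
#
# which would yield, e.g., dataset_task_types["admet_ppbr_az"] == "regression" and
# dataset_property_names_to_dataset["PPBR"] == "admet_ppbr_az".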
class Scaler:
def __init__(self, log=False):
self.log = log
self.offset = None
self.scaler = None
def fit(self, y):
# make the values non-negative
self.offset = np.min([np.min(y), 0.0])
y = y.reshape(-1, 1) - self.offset
# scale the input data
if self.log:
y = np.log10(y + 1.0)
self.scaler = preprocessing.StandardScaler().fit(y)
def transform(self, y):
y = y.reshape(-1, 1) - self.offset
# scale the input data
if self.log:
y = np.log10(y + 1.0)
y_scale = self.scaler.transform(y)
return y_scale
def inverse_transform(self, y_scale):
y = self.scaler.inverse_transform(y_scale.reshape(-1, 1))
if self.log:
y = 10.0**y - 1.0
y = y + self.offset
return y
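# A small round-trip sketch for the Scaler above, assuming 1-D numpy regression
# labels; inverse_transform(transform(y)) recovers y up to floating-point error.
#
#     y = np.array([0.5, 2.0, 10.0])
#     scaler = Scaler(log=True)
#     scaler.fit(y)
#     y_scaled = scaler.transform(y)                    # shape (3, 1), standardized log10 values
#     y_restored = scaler.inverse_transform(y_scaled)   # approximately [[0.5], [2.0], [10.0]]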
def smart_tokenizer_and_embedding_resize(
special_tokens_dict: Dict,
tokenizer: transformers.PreTrainedTokenizer,
model: transformers.PreTrainedModel,
non_special_tokens = None,
):
"""Resize tokenizer and embedding.
Note: This is the unoptimized version that may make your embedding size not be divisible by 64.
"""
    num_new_tokens = tokenizer.add_special_tokens(special_tokens_dict) + tokenizer.add_tokens(non_special_tokens)
    num_old_tokens = model.get_input_embeddings().weight.shape[0]
    # recompute the number of new rows needed against the current embedding size,
    # since the checkpoint's embedding matrix may not match the tokenizer vocabulary exactly
    num_new_tokens = len(tokenizer) - num_old_tokens
if num_new_tokens == 0:
return
model.resize_token_embeddings(len(tokenizer))
if num_new_tokens > 0:
input_embeddings_data = model.get_input_embeddings().weight.data
input_embeddings_avg = input_embeddings_data[:-num_new_tokens].mean(dim=0, keepdim=True)
input_embeddings_data[-num_new_tokens:] = input_embeddings_avg
print(f"Resized tokenizer and embedding from {num_old_tokens} to {len(tokenizer)} tokens.")
@dataclass
class DataCollator(object):
tokenizer: transformers.PreTrainedTokenizer
source_max_len: int
molecule_start_str: str
end_str: str
    def augment_molecule(self, molecule: str) -> str:
        # note: this expects a SMILES augmenter bound to self.sme, which is not set up
        # in this collator; the method is unused during inference
        return self.sme.augment([molecule])[0]
def __call__(self, instances: Sequence[Dict]) -> Dict[str, torch.Tensor]:
with calculateDuration("DataCollator"):
sources = []
for example in instances:
smiles = example['smiles'].strip()
                smiles = Chem.MolToSmiles(Chem.MolFromSmiles(smiles))  # canonicalize (inputs are validated upstream)
# get the properties except the smiles and mol_id cols
#props = [example[col] if example[col] is not None else np.nan for col in sorted(example.keys()) if col not in ['smiles', 'is_aug']]
source = f"{self.molecule_start_str}{smiles}{self.end_str}"
sources.append(source)
# Tokenize
tokenized_sources_with_prompt = self.tokenizer(
sources,
max_length=self.source_max_len,
truncation=True,
add_special_tokens=False,
)
input_ids = [torch.tensor(tokenized_source) for tokenized_source in tokenized_sources_with_prompt['input_ids']]
input_ids = pad_sequence(input_ids, batch_first=True, padding_value=self.tokenizer.pad_token_id)
data_dict = {
'input_ids': input_ids,
'attention_mask': input_ids.ne(self.tokenizer.pad_token_id),
}
return data_dict
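# A collation sketch, assuming `tokenizer` is the ChemFM tokenizer set up in
# MolecularPropertyPredictionModel below (so the pad token and the molecule marker
# strings exist in its vocabulary): each SMILES is canonicalized, wrapped with the
# markers, tokenized, and padded into a batch.
#
#     collator = DataCollator(tokenizer=tokenizer, source_max_len=512,
#                             molecule_start_str="<molstart>", end_str="<eos>")
#     batch = collator([{"smiles": "CCO"}, {"smiles": "c1ccccc1"}])
#     # batch["input_ids"] and batch["attention_mask"] both have shape (2, seq_len)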
class MolecularPropertyPredictionModel():
def __init__(self, candidate_models):
self.adapter_name = None
# we need to keep track of the paths of adapter scalers
# we don't want to download the same scaler multiple times
self.apapter_scaler_path = dict()
DEFAULT_PAD_TOKEN = "[PAD]"
# load the base model
config = AutoConfig.from_pretrained(
"ChemFM/ChemFM-3B",
num_labels=1,
            finetuning_task="classification", # placeholder; the actual task type is handled per adapter
trust_remote_code=True,
token = os.environ.get("TOKEN")
)
self.base_model = AutoModelForSequenceClassification.from_pretrained(
"ChemFM/ChemFM-3B",
config=config,
device_map="cuda",
trust_remote_code=True,
token = os.environ.get("TOKEN")
)
        # load the tokenizer
self.tokenizer = AutoTokenizer.from_pretrained(
"ChemFM/admet_ppbr_az",
trust_remote_code=True,
token = os.environ.get("TOKEN")
)
special_tokens_dict = dict(pad_token=DEFAULT_PAD_TOKEN)
smart_tokenizer_and_embedding_resize(
special_tokens_dict=special_tokens_dict,
tokenizer=self.tokenizer,
model=self.base_model
)
self.base_model.config.pad_token_id = self.tokenizer.pad_token_id
self.data_collator = DataCollator(
tokenizer=self.tokenizer,
source_max_len=512,
molecule_start_str="<molstart>",
end_str="<eos>",
)
        # load all candidate adapters up front
for adapter_name in candidate_models:
adapter_id = candidate_models[adapter_name]
print(f"loading {adapter_name} from {adapter_id}...")
self.base_model.load_adapter(adapter_id, adapter_name=adapter_name, token = os.environ.get("TOKEN"))
try:
                self.apapter_scaler_path[adapter_name] = hf_hub_download(adapter_id, filename="scaler.pkl", token = os.environ.get("TOKEN"))
            except Exception:
                # a missing scaler is only expected for classification tasks
                self.apapter_scaler_path[adapter_name] = None
                assert dataset_task_types[adapter_name] == "classification", f"{adapter_name} is not a classification task."
self.base_model.to("cuda")
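    # The constructor above expects `candidate_models` to map an adapter name (a key
    # of dataset_task_types) to the Hugging Face repo hosting that LoRA adapter.
    # A hypothetical sketch (repo ids are illustrative):
    #
    #     candidate_models = {
    #         "admet_ppbr_az": "ChemFM/admet_ppbr_az",
    #         "admet_bioavailability_ma": "ChemFM/admet_bioavailability_ma",
    #     }
    #     model = MolecularPropertyPredictionModel(candidate_models)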
def swith_adapter(self, adapter_name, adapter_id):
# return flag:
# keep: adapter is the same as the current one
# switched: adapter is switched successfully
# error: adapter is not found
with calculateDuration("switching adapter"):
if adapter_name == self.adapter_name:
return "keep"
# switch adapter
try:
#self.adapter_name = adapter_name
#print(self.adapter_name, adapter_id)
#self.lora_model = PeftModel.from_pretrained(self.base_model, adapter_id, token = os.environ.get("TOKEN"))
#self.lora_model.to("cuda")
#print(self.lora_model)
self.base_model.set_adapter(adapter_name)
self.base_model.eval()
#if adapter_name not in self.apapter_scaler_path:
# self.apapter_scaler_path[adapter_name] = hf_hub_download(adapter_id, filename="scaler.pkl", token = os.environ.get("TOKEN"))
if self.apapter_scaler_path[adapter_name] and os.path.exists(self.apapter_scaler_path[adapter_name]):
self.scaler = pickle.load(open(self.apapter_scaler_path[adapter_name], "rb"))
else:
self.scaler = None
self.adapter_name = adapter_name
return "switched"
except Exception as e:
# handle error
return "error"
def predict(self, valid_df, task_type):
with calculateDuration("predicting"):
with calculateDuration("construct dataloader"):
test_dataset = Dataset.from_pandas(valid_df)
# construct the dataloader
test_loader = torch.utils.data.DataLoader(
test_dataset,
batch_size=16,
collate_fn=self.data_collator,
)
# predict
y_pred = []
for i, batch in tqdm(enumerate(test_loader), total=len(test_loader), desc="Evaluating"):
with torch.no_grad():
                    # move the batch to the model device and run a forward pass
                    batch = {k: v.to(self.base_model.device) for k, v in batch.items()}
                    outputs = self.base_model(**batch)
if task_type == "regression": # TODO: check if the model is regression or classification
y_pred.append(outputs.logits.cpu().detach().numpy())
else:
y_pred.append((torch.sigmoid(outputs.logits)).cpu().detach().numpy())
y_pred = np.concatenate(y_pred, axis=0)
if task_type=="regression" and self.scaler is not None:
y_pred = self.scaler.inverse_transform(y_pred)
return y_pred
def predict_single_smiles(self, smiles, task_type):
with calculateDuration("predicting a single SMILES"):
assert task_type in ["regression", "classification"]
# check the SMILES string is valid
if not Chem.MolFromSmiles(smiles):
return None
valid_df = pd.DataFrame([smiles], columns=['smiles'])
            results = self.predict(valid_df, task_type)
            # a single-row prediction: return it as a plain Python scalar
            return results.item()
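    # Single-molecule sketch (aspirin), assuming the matching adapter is already
    # active; an invalid SMILES string returns None instead of a number.
    #
    #     value = model.predict_single_smiles("CC(=O)Oc1ccccc1C(=O)O", "regression")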
def predict_file(self, df, task_type):
with calculateDuration("predicting a file"):
# we should add the index first
df = df.reset_index()
with calculateDuration("pre-checking SMILES"):
                # validate the SMILES strings; invalid rows will receive NaN predictions
valid_idx = []
invalid_idx = []
for idx, smiles in enumerate(df['smiles']):
if Chem.MolFromSmiles(smiles):
valid_idx.append(idx)
else:
invalid_idx.append(idx)
valid_df = df.loc[valid_idx]
# get the smiles list
valid_df_smiles = valid_df['smiles'].tolist()
input_df = pd.DataFrame(valid_df_smiles, columns=['smiles'])
results = self.predict(input_df, task_type)
# add the results to the dataframe
df.loc[valid_idx, 'prediction'] = results
df.loc[invalid_idx, 'prediction'] = np.nan
# drop the index column
df = df.drop(columns=['index'])
            # return the dataframe with the added 'prediction' column
return df
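# Batch sketch for predict_file, assuming a model instance whose adapter is already
# active and an input frame with a 'smiles' column (the file names are illustrative);
# rows with invalid SMILES come back with NaN in the added 'prediction' column.
#
#     df = pd.read_csv("molecules.csv")
#     out_df = model.predict_file(df, dataset_task_types["admet_ppbr_az"])
#     out_df.to_csv("predictions.csv", index=False)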