from transformers import AutoConfig, AutoModelForSequenceClassification, AutoTokenizer
from typing import Optional, Dict, Sequence, List
import transformers
from peft import PeftModel
import torch
from torch.nn.utils.rnn import pad_sequence
from dataclasses import dataclass
import pandas as pd
from datasets import Dataset
from tqdm import tqdm
import numpy as np
from huggingface_hub import hf_hub_download
import os
import pickle
from sklearn import preprocessing
import json
import spaces
import time


class calculateDuration:
    """Context manager that prints the wall-clock time spent inside a `with` block."""

    def __init__(self, activity_name=""):
        self.activity_name = activity_name

    def __enter__(self):
        self.start_time = time.time()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.end_time = time.time()
        self.elapsed_time = self.end_time - self.start_time
        if self.activity_name:
            print(f"Elapsed time for {self.activity_name}: {self.elapsed_time:.6f} seconds")
        else:
            print(f"Elapsed time: {self.elapsed_time:.6f} seconds")


from rdkit import RDLogger, Chem
# Suppress RDKit INFO messages
RDLogger.DisableLog('rdApp.*')

# we have a dictionary to store the task types of the models
#task_types = {
#    "admet_bioavailability_ma": "classification",
#    "admet_ppbr_az": "regression",
#    "admet_half_life_obach": "regression",
#}

# read the dataset descriptions
with open("dataset_descriptions.json", "r") as f:
    dataset_description_temp = json.load(f)

dataset_descriptions = dict()
dataset_property_names = dict()
dataset_task_types = dict()
dataset_property_names_to_dataset = dict()

for dataset in dataset_description_temp:
    dataset_name = dataset.lower()
    dataset_descriptions[dataset_name] = \
        f"{dataset_description_temp[dataset]['task_name']} is a {dataset_description_temp[dataset]['task_type']} task, " + \
        f"where the goal is to {dataset_description_temp[dataset]['description']}. \n" + \
        f"More information can be found at {dataset_description_temp[dataset]['url']}."
    dataset_property_names[dataset_name] = dataset_description_temp[dataset]['task_name']
    dataset_property_names_to_dataset[dataset_description_temp[dataset]['task_name']] = dataset_name
    dataset_task_types[dataset_name] = dataset_description_temp[dataset]['task_type']


class Scaler:
    """Standardizes regression targets, optionally in log10 space, and inverts the transform for predictions."""

    def __init__(self, log=False):
        self.log = log
        self.offset = None
        self.scaler = None

    def fit(self, y):
        # make the values non-negative
        self.offset = np.min([np.min(y), 0.0])
        y = y.reshape(-1, 1) - self.offset

        # scale the input data
        if self.log:
            y = np.log10(y + 1.0)

        self.scaler = preprocessing.StandardScaler().fit(y)

    def transform(self, y):
        y = y.reshape(-1, 1) - self.offset

        # scale the input data
        if self.log:
            y = np.log10(y + 1.0)

        y_scale = self.scaler.transform(y)
        return y_scale

    def inverse_transform(self, y_scale):
        y = self.scaler.inverse_transform(y_scale.reshape(-1, 1))

        if self.log:
            y = 10.0**y - 1.0

        y = y + self.offset
        return y
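
# Illustrative sketch (not used by the app): how a Scaler would typically be fit on
# training labels and later used to map model outputs back to the original scale.
# The values below are made-up examples, not data from any ChemFM dataset.
def _scaler_roundtrip_example():
    y_train = np.array([0.5, 1.2, 3.4, 7.8])          # hypothetical regression targets
    scaler = Scaler(log=True)                          # log-scale before standardizing
    scaler.fit(y_train)
    y_scaled = scaler.transform(y_train)               # what the model is trained against
    y_recovered = scaler.inverse_transform(y_scaled)   # back to the original units
    return np.allclose(y_recovered.flatten(), y_train)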
""" num_new_tokens = tokenizer.add_special_tokens(special_tokens_dict) + tokenizer.add_tokens(non_special_tokens) num_old_tokens = model.get_input_embeddings().weight.shape[0] num_new_tokens = len(tokenizer) - num_old_tokens if num_new_tokens == 0: return model.resize_token_embeddings(len(tokenizer)) if num_new_tokens > 0: input_embeddings_data = model.get_input_embeddings().weight.data input_embeddings_avg = input_embeddings_data[:-num_new_tokens].mean(dim=0, keepdim=True) input_embeddings_data[-num_new_tokens:] = input_embeddings_avg print(f"Resized tokenizer and embedding from {num_old_tokens} to {len(tokenizer)} tokens.") @dataclass class DataCollator(object): tokenizer: transformers.PreTrainedTokenizer source_max_len: int molecule_start_str: str end_str: str def augment_molecule(self, molecule: str) -> str: return self.sme.augment([molecule])[0] def __call__(self, instances: Sequence[Dict]) -> Dict[str, torch.Tensor]: with calculateDuration("DataCollator"): sources = [] for example in instances: smiles = example['smiles'].strip() smiles = Chem.MolToSmiles(Chem.MolFromSmiles(smiles)) # get the properties except the smiles and mol_id cols #props = [example[col] if example[col] is not None else np.nan for col in sorted(example.keys()) if col not in ['smiles', 'is_aug']] source = f"{self.molecule_start_str}{smiles}{self.end_str}" sources.append(source) # Tokenize tokenized_sources_with_prompt = self.tokenizer( sources, max_length=self.source_max_len, truncation=True, add_special_tokens=False, ) input_ids = [torch.tensor(tokenized_source) for tokenized_source in tokenized_sources_with_prompt['input_ids']] input_ids = pad_sequence(input_ids, batch_first=True, padding_value=self.tokenizer.pad_token_id) data_dict = { 'input_ids': input_ids, 'attention_mask': input_ids.ne(self.tokenizer.pad_token_id), } return data_dict class MolecularPropertyPredictionModel(): def __init__(self, candidate_models): self.adapter_name = None # we need to keep track of the paths of adapter scalers # we don't want to download the same scaler multiple times self.apapter_scaler_path = dict() DEFAULT_PAD_TOKEN = "[PAD]" # load the base model config = AutoConfig.from_pretrained( "ChemFM/ChemFM-3B", num_labels=1, finetuning_task="classification", # this is not about our task type trust_remote_code=True, token = os.environ.get("TOKEN") ) self.base_model = AutoModelForSequenceClassification.from_pretrained( "ChemFM/ChemFM-3B", config=config, device_map="cuda", trust_remote_code=True, token = os.environ.get("TOKEN") ) # # load the tokenizer self.tokenizer = AutoTokenizer.from_pretrained( "ChemFM/admet_ppbr_az", trust_remote_code=True, token = os.environ.get("TOKEN") ) special_tokens_dict = dict(pad_token=DEFAULT_PAD_TOKEN) smart_tokenizer_and_embedding_resize( special_tokens_dict=special_tokens_dict, tokenizer=self.tokenizer, model=self.base_model ) self.base_model.config.pad_token_id = self.tokenizer.pad_token_id self.data_collator = DataCollator( tokenizer=self.tokenizer, source_max_len=512, molecule_start_str="", end_str="", ) # load the adapters firstly for adapter_name in candidate_models: adapter_id = candidate_models[adapter_name] print(f"loading {adapter_name} from {adapter_id}...") self.base_model.load_adapter(adapter_id, adapter_name=adapter_name, token = os.environ.get("TOKEN")) try: self.apapter_scaler_path[adapter_name] = hf_hub_download(adapter_id, filename="scaler.pkl", token = os.environ.get("TOKEN")) except: self.apapter_scaler_path[adapter_name] = None assert dataset_task_types[adapter_name] == 
"classification", f"{adapter_name} is not a regression task." self.base_model.to("cuda") def swith_adapter(self, adapter_name, adapter_id): # return flag: # keep: adapter is the same as the current one # switched: adapter is switched successfully # error: adapter is not found with calculateDuration("switching adapter"): if adapter_name == self.adapter_name: return "keep" # switch adapter try: #self.adapter_name = adapter_name #print(self.adapter_name, adapter_id) #self.lora_model = PeftModel.from_pretrained(self.base_model, adapter_id, token = os.environ.get("TOKEN")) #self.lora_model.to("cuda") #print(self.lora_model) self.base_model.set_adapter(adapter_name) self.base_model.eval() #if adapter_name not in self.apapter_scaler_path: # self.apapter_scaler_path[adapter_name] = hf_hub_download(adapter_id, filename="scaler.pkl", token = os.environ.get("TOKEN")) if self.apapter_scaler_path[adapter_name] and os.path.exists(self.apapter_scaler_path[adapter_name]): self.scaler = pickle.load(open(self.apapter_scaler_path[adapter_name], "rb")) else: self.scaler = None self.adapter_name = adapter_name return "switched" except Exception as e: # handle error return "error" def predict(self, valid_df, task_type): with calculateDuration("predicting"): with calculateDuration("construct dataloader"): test_dataset = Dataset.from_pandas(valid_df) # construct the dataloader test_loader = torch.utils.data.DataLoader( test_dataset, batch_size=16, collate_fn=self.data_collator, ) # predict y_pred = [] for i, batch in tqdm(enumerate(test_loader), total=len(test_loader), desc="Evaluating"): with torch.no_grad(): batch = {k: v.to(self.base_model.device) for k, v in batch.items()} print(self.base_model.device) print(batch) outputs = self.base_model(**batch) print(outputs) if task_type == "regression": # TODO: check if the model is regression or classification y_pred.append(outputs.logits.cpu().detach().numpy()) else: y_pred.append((torch.sigmoid(outputs.logits)).cpu().detach().numpy()) y_pred = np.concatenate(y_pred, axis=0) if task_type=="regression" and self.scaler is not None: y_pred = self.scaler.inverse_transform(y_pred) return y_pred def predict_single_smiles(self, smiles, task_type): with calculateDuration("predicting a single SMILES"): assert task_type in ["regression", "classification"] # check the SMILES string is valid if not Chem.MolFromSmiles(smiles): return None valid_df = pd.DataFrame([smiles], columns=['smiles']) results = self.predict(valid_df, task_type) # predict return results.item() def predict_file(self, df, task_type): with calculateDuration("predicting a file"): # we should add the index first df = df.reset_index() with calculateDuration("pre-checking SMILES"): # we need to check the SMILES strings are valid, the invalid ones will be moved to the last valid_idx = [] invalid_idx = [] for idx, smiles in enumerate(df['smiles']): if Chem.MolFromSmiles(smiles): valid_idx.append(idx) else: invalid_idx.append(idx) valid_df = df.loc[valid_idx] # get the smiles list valid_df_smiles = valid_df['smiles'].tolist() input_df = pd.DataFrame(valid_df_smiles, columns=['smiles']) results = self.predict(input_df, task_type) # add the results to the dataframe df.loc[valid_idx, 'prediction'] = results df.loc[invalid_idx, 'prediction'] = np.nan # drop the index column df = df.drop(columns=['index']) # phrase file return df