from transformers import AutoConfig, AutoModelForSequenceClassification, AutoTokenizer from typing import Optional, Dict, Sequence, List import transformers from peft import PeftModel import torch from torch.nn.utils.rnn import pad_sequence from dataclasses import dataclass import pandas as pd from datasets import Dataset from tqdm import tqdm import numpy as np from huggingface_hub import hf_hub_download import os import pickle from sklearn import preprocessing import json from rdkit import RDLogger, Chem # Suppress RDKit INFO messages RDLogger.DisableLog('rdApp.*') # we have a dictionary to store the task types of the models task_types = { "admet_ppbr_az": "regression", "admet_half_life_obach": "regression", } # read the dataset descriptions with open("dataset_descriptions.json", "r") as f: dataset_description_temp = json.load(f) dataset_descriptions = dict() for dataset in dataset_description_temp: dataset_name = dataset.lower() dataset_descriptions[dataset_name] = \ f"{dataset_name} is a {dataset_description_temp[dataset]['task_type']} task, " + \ f"where the goal is to {dataset_description_temp[dataset]['description']}." class Scaler: def __init__(self, log=False): self.log = log self.offset = None self.scaler = None def fit(self, y): # make the values non-negative self.offset = np.min([np.min(y), 0.0]) y = y.reshape(-1, 1) - self.offset # scale the input data if self.log: y = np.log10(y + 1.0) self.scaler = preprocessing.StandardScaler().fit(y) def transform(self, y): y = y.reshape(-1, 1) - self.offset # scale the input data if self.log: y = np.log10(y + 1.0) y_scale = self.scaler.transform(y) return y_scale def inverse_transform(self, y_scale): y = self.scaler.inverse_transform(y_scale.reshape(-1, 1)) if self.log: y = 10.0**y - 1.0 y = y + self.offset return y def smart_tokenizer_and_embedding_resize( special_tokens_dict: Dict, tokenizer: transformers.PreTrainedTokenizer, model: transformers.PreTrainedModel, non_special_tokens = None, ): """Resize tokenizer and embedding. Note: This is the unoptimized version that may make your embedding size not be divisible by 64. """ num_new_tokens = tokenizer.add_special_tokens(special_tokens_dict) + tokenizer.add_tokens(non_special_tokens) num_old_tokens = model.get_input_embeddings().weight.shape[0] num_new_tokens = len(tokenizer) - num_old_tokens if num_new_tokens == 0: return model.resize_token_embeddings(len(tokenizer)) if num_new_tokens > 0: input_embeddings_data = model.get_input_embeddings().weight.data input_embeddings_avg = input_embeddings_data[:-num_new_tokens].mean(dim=0, keepdim=True) input_embeddings_data[-num_new_tokens:] = input_embeddings_avg print(f"Resized tokenizer and embedding from {num_old_tokens} to {len(tokenizer)} tokens.") @dataclass class DataCollator(object): tokenizer: transformers.PreTrainedTokenizer source_max_len: int molecule_start_str: str end_str: str def augment_molecule(self, molecule: str) -> str: return self.sme.augment([molecule])[0] def __call__(self, instances: Sequence[Dict]) -> Dict[str, torch.Tensor]: sources = [] targets = [] for example in instances: smiles = example['smiles'].strip() smiles = Chem.MolToSmiles(Chem.MolFromSmiles(smiles)) # get the properties except the smiles and mol_id cols #props = [example[col] if example[col] is not None else np.nan for col in sorted(example.keys()) if col not in ['smiles', 'is_aug']] source = f"{self.molecule_start_str}{smiles}{self.end_str}" sources.append(source) # Tokenize tokenized_sources_with_prompt = self.tokenizer( sources, max_length=self.source_max_len, truncation=True, add_special_tokens=False, ) input_ids = [torch.tensor(tokenized_source) for tokenized_source in tokenized_sources_with_prompt['input_ids']] input_ids = pad_sequence(input_ids, batch_first=True, padding_value=self.tokenizer.pad_token_id) data_dict = { 'input_ids': input_ids, 'attention_mask': input_ids.ne(self.tokenizer.pad_token_id), } return data_dict class MolecularPropertyPredictionModel(): def __init__(self): self.adapter_name = None # we need to keep track of the paths of adapter scalers # we don't want to download the same scaler multiple times self.apapter_scaler_path = dict() DEFAULT_PAD_TOKEN = "[PAD]" # load the base model config = AutoConfig.from_pretrained( "ChemFM/ChemFM-3B", num_labels=1, finetuning_task="classification", # this is not about our task type trust_remote_code=True, ) self.base_model = AutoModelForSequenceClassification.from_pretrained( "ChemFM/ChemFM-3B", config=config, device_map="cpu", trust_remote_code=True, ) # load the tokenizer self.tokenizer = AutoTokenizer.from_pretrained( "ChemFM/admet_ppbr_az", trust_remote_code=True, ) special_tokens_dict = dict(pad_token=DEFAULT_PAD_TOKEN) smart_tokenizer_and_embedding_resize( special_tokens_dict=special_tokens_dict, tokenizer=self.tokenizer, model=self.base_model ) self.base_model.config.pad_token_id = self.tokenizer.pad_token_id self.data_collator = DataCollator( tokenizer=self.tokenizer, source_max_len=512, molecule_start_str="", end_str="", ) def swith_adapter(self, adapter_name, adapter_id): # return flag: # keep: adapter is the same as the current one # switched: adapter is switched successfully # error: adapter is not found if adapter_name == self.adapter_name: return "keep" # switch adapter try: self.adapter_name = adapter_name self.lora_model = PeftModel.from_pretrained(self.base_model, adapter_id) if adapter_name not in self.apapter_scaler_path: self.apapter_scaler_path[adapter_name] = hf_hub_download(adapter_id, filename="scaler.pkl") if os.path.exists(self.apapter_scaler_path[adapter_name]): self.scaler = pickle.load(open(self.apapter_scaler_path[adapter_name], "rb")) else: self.scaler = None return "switched" except Exception as e: # handle error return "error" def predict(self, valid_df, task_type): test_dataset = Dataset.from_pandas(valid_df) # construct the dataloader test_loader = torch.utils.data.DataLoader( test_dataset, batch_size=4, collate_fn=self.data_collator, ) # predict y_pred = [] for i, batch in tqdm(enumerate(test_loader), total=len(test_loader), desc="Evaluating"): with torch.no_grad(): batch = {k: v.to(self.lora_model.device) for k, v in batch.items()} outputs = self.lora_model(**batch) if task_type == "regression": # TODO: check if the model is regression or classification y_pred.append(outputs.logits.cpu().detach().numpy()) else: y_pred.append((torch.sigmoid(outputs.logits) > 0.5).cpu().detach().numpy()) y_pred = np.concatenate(y_pred, axis=0) if task_type=="regression" and self.scaler is not None: y_pred = self.scaler.inverse_transform(y_pred) return y_pred def predict_single_smiles(self, smiles, task_type): assert task_type in ["regression", "classification"] # check the SMILES string is valid if not Chem.MolFromSmiles(smiles): return None valid_df = pd.DataFrame([smiles], columns=['smiles']) results = self.predict(valid_df, task_type) # predict return results.item() def predict_file(self, df, task_type): # we should add the index first df = df.reset_index() # we need to check the SMILES strings are valid, the invalid ones will be moved to the last valid_idx = [] invalid_idx = [] for idx, smiles in enumerate(df['smiles']): if Chem.MolFromSmiles(smiles): valid_idx.append(idx) else: invalid_idx.append(idx) valid_df = df.loc[valid_idx] # get the smiles list valid_df_smiles = valid_df['smiles'].tolist() input_df = pd.DataFrame(valid_df_smiles, columns=['smiles']) results = self.predict(input_df, task_type) # add the results to the dataframe df.loc[valid_idx, 'prediction'] = results df.loc[invalid_idx, 'prediction'] = np.nan # drop the index column df = df.drop(columns=['index']) # phrase file return df