File size: 4,090 Bytes
072a327 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
import torch
from transformers import PreTrainedModel, AutoTokenizer, AutoModelForCausalLM, AutoModelForSequenceClassification, TextClassificationPipeline
from configuration_kraken import KrakenConfig
import tokenizer_template_switch
class KrakenForCausalLM(PreTrainedModel):
config_class = KrakenConfig
def __init__(self, config):
super().__init__(config)
self.tokenizers = {key: AutoTokenizer.from_pretrained(name, device_map="auto") for key, name in config.config_dict['tokenizers'].items()}
self.models = self.load_expert_models(config.config_dict['models'], config.config_dict['quantization'])
self.router_model = AutoModelForSequenceClassification.from_pretrained(config.config_dict['router'], trust_remote_code=True,device_map="auto")
self.tokenizer = AutoTokenizer.from_pretrained(config.config_dict['router'], trust_remote_code=True,device_map="auto")
self.router = TextClassificationPipeline(model=self.router_model, tokenizer=self.tokenizer)
self.models_indices = config.config_dict['class_indices']
def load_expert_models(self, models_dict, quantization_dict):
models = {}
for key, name in models_dict.items():
quantization = quantization_dict.get(key)
if quantization == "8bit":
models[key] = AutoModelForCausalLM.from_pretrained(name, trust_remote_code=True, device_map="auto", load_in_8bit=True, torch_dtype="auto")
elif quantization == "4bit":
models[key] = AutoModelForCausalLM.from_pretrained(name, trust_remote_code=True, device_map="auto", load_in_4bit=True, torch_dtype="auto")
elif quantization == "awq":
models[key] = self.load_awq_model(name)
else:
models[key] = AutoModelForCausalLM.from_pretrained(name, trust_remote_code=True, device_map="auto", torch_dtype="auto")
return models
def load_awq_model(self, name):
return AutoModelForCausalLM.from_pretrained(name, trust_remote_code=True, device_map="auto")
def tokenize_inputs(self, text, model_key):
return self.tokenizers[model_key](text, return_tensors="pt")
def determine_model(self, text):
prediction = self.router(text)[0]["label"]
model_decision_index = self.models_indices[prediction]
model_keys = ['expert1', 'expert2', 'expert3', 'expert4','expert5']
return model_keys[model_decision_index]
def expert_tokenizer(self, text):
model_key = self.determine_model(text)
return self.tokenizers[model_key]
def generate(self, input_ids, **generate_kwargs):
# Tokenize the input_ids
text = self.tokenizer.batch_decode(input_ids, skip_special_tokens=False)[0]
msgs = tokenizer_template_switch.recover_chat_messages(text, self.tokenizer)
if msgs and msgs[0]['role'] == 'system' and msgs[0]['content']=='<|im_start|>system':
# Delete the first element
msgs.pop(0)
# Check if the last element has the role 'assistant'
if msgs and msgs[-1]['role'] == 'assistant':
# Delete the last element
msgs.pop()
# Determine the model key using the existing routing logic
model_key = self.determine_model(text)
# Show the routing result
print(f"Choosing {model_key} ..")
# Retrieve the model from the dictionary
model = self.models[model_key]
mod_txt = self.tokenizers[model_key].apply_chat_template(msgs, tokenize=False, add_generation_prompt=True)
current_device = input_ids.device if isinstance(input_ids, torch.Tensor) else 'cpu'
# Tokenize accordingly to the best model
tok = self.tokenizers[model_key](mod_txt, return_tensors="pt")
tok_input_ids = tok.input_ids.to(current_device)
tok_attention_mask = tok.attention_mask.to(current_device)
# Generate text using the retrieved model
return model.generate(tok_input_ids, attention_mask=tok_attention_mask, **generate_kwargs)
|