|
|
|
import os |
|
import time |
|
import wandb |
|
import torch |
|
import argparse |
|
from datasets import load_dataset |
|
from typing import List, Dict, Union |
|
from transformers import ( |
|
AutoTokenizer, |
|
AutoModelForCausalLM, |
|
TrainingArguments, |
|
DataCollatorForLanguageModeling |
|
) |
|
|
|
from src.args import default_args |
|
from src.orpo_trainer import ORPOTrainer |
|
from src.utils import preprocess_logits_for_metrics, dataset_split_selector |
|
|
|
class ORPO(object):
    """Set up and run an ORPO fine-tuning job described by parsed CLI arguments.

    Constructing the object loads the tokenizer, the causal language model and
    the dataset named in ``args``, filters and tokenizes the dataset, and
    creates the checkpoint and log directories. Calling :meth:`run` then builds
    an ``ORPOTrainer``, trains, and saves the model.
    """

    # Fallback chat template installed when the tokenizer does not define one.
    _DEFAULT_CHAT_TEMPLATE = "{% for message in messages %}\n{% if message['role'] == 'user' %}\n{{ '<|user|>\n' + message['content'] + eos_token }}\n{% elif message['role'] == 'system' %}\n{{ '<|system|>\n' + message['content'] + eos_token }}\n{% elif message['role'] == 'assistant' %}\n{{ '<|assistant|>\n' + message['content'] + eos_token }}\n{% endif %}\n{% if loop.last and add_generation_prompt %}\n{{ '<|assistant|>' }}\n{% endif %}\n{% endfor %}"

    def __init__(self, args) -> None:
        """Load tokenizer, model and data, then preprocess the dataset splits.

        Args:
            args: Parsed options object. This constructor reads ``model_name``,
                ``cache_dir``, ``flash_attention_2``, ``data_name`` and
                ``num_proc``; other methods read further attributes.
        """
        self.start = time.gmtime()
        self.args = args

        print(">>> 1. Loading Tokenizer")
        self.tokenizer = AutoTokenizer.from_pretrained(self.args.model_name, cache_dir=self.args.cache_dir)
        if self.tokenizer.chat_template is None:
            self.tokenizer.chat_template = self._DEFAULT_CHAT_TEMPLATE
            print(" 1-1. Chat Template Applied (<|user|> <|assistant|>)")
        # The EOS token doubles as the padding token.
        self.tokenizer.pad_token_id = self.tokenizer.eos_token_id

        print(">>> 2. Loading Model")
        model_kwargs = {'cache_dir': self.args.cache_dir, 'torch_dtype': torch.bfloat16}
        if self.args.flash_attention_2:
            model_kwargs['attn_implementation'] = "flash_attention_2"
        self.model = AutoModelForCausalLM.from_pretrained(self.args.model_name, **model_kwargs)

        print(">>> 3. Loading Dataset")
        self.data = load_dataset(self.args.data_name, cache_dir=self.args.cache_dir)

        print(">>> 4. Filtering and Preprocessing Dataset")
        train_split, test_split = self._resolve_splits(dataset_split_selector(self.data))
        self.is_test = test_split is not None

        if self.is_test:
            test = self.data[test_split].filter(self.filter_dataset)
            self.test = self._tokenize_split(test, test_split)

        train = self.data[train_split].filter(self.filter_dataset)
        print(f"\n\n>>> {len(train)} / {len(self.data[train_split])} rows left after filtering by prompt length.")
        self.train = self._tokenize_split(train, train_split)

        # Run name: <model>-<dataset>-ORPO-<day>-<hour>-<minute>, taken from the UTC start time.
        data_tag = self.args.data_name.split('/')[-1]
        self.run_name = f"{self.args.model_name.split('/')[-1]}-{data_tag}-ORPO-{self.start.tm_mday}-{self.start.tm_hour}-{self.start.tm_min}"
        self.save_dir = os.path.join('./checkpoints/', f"{data_tag}/{self.run_name}")
        self.log_dir = os.path.join('./checkpoints/', f"{data_tag}/{self.run_name}/logs")

        os.makedirs(self.save_dir, exist_ok=True)
        os.makedirs(self.log_dir, exist_ok=True)

    @staticmethod
    def _resolve_splits(data_split):
        """Return ``(train_split, test_split)`` from the selected split names.

        The first entry is the training split. The second entry, when present,
        is the evaluation split; otherwise ``test_split`` is ``None``.
        """
        train_split = data_split[0]
        # Use the second entry for evaluation so it never aliases the training split.
        test_split = data_split[1] if len(data_split) > 1 else None
        return train_split, test_split

    def _tokenize_split(self, filtered, split_name):
        """Map :meth:`preprocess_dataset` over ``filtered``, dropping the split's original columns."""
        return filtered.map(self.preprocess_dataset,
                            batched=True,
                            num_proc=self.args.num_proc,
                            remove_columns=self.data[split_name].column_names)

    def _tokenize(self, texts):
        """Tokenize ``texts`` padded/truncated to ``args.response_max_length`` as PyTorch tensors."""
        return self.tokenizer(texts,
                              max_length=self.args.response_max_length,
                              padding='max_length',
                              truncation=True,
                              return_tensors='pt')

    def preprocess_dataset(self, examples: Union[List, Dict]):
        """Render a batch through the chat template and tokenize it.

        When the batch has an ``'instruction'`` column, the prompt is a single
        user turn and the chosen/rejected texts are user+assistant
        conversations built from the ``'chosen'`` / ``'rejected'`` columns.
        Otherwise each ``'chosen'`` / ``'rejected'`` item is passed to the chat
        template as a message list (presumably a list of role/content dicts),
        and the first ``'chosen'`` message is used as the prompt.

        Returns:
            The prompt encoding, extended with ``positive_input_ids``,
            ``positive_attention_mask``, ``negative_input_ids`` and
            ``negative_attention_mask``.
        """
        render = self.tokenizer.apply_chat_template

        if 'instruction' in examples.keys():
            prompt_key = 'instruction'
            prompt = [render([{'role': 'user', 'content': item}], tokenize=False, add_generation_prompt=True)
                      for item in examples[prompt_key]]
            chosen = [render([{'role': 'user', 'content': item_prompt},
                              {'role': 'assistant', 'content': item_chosen}], tokenize=False)
                      for item_prompt, item_chosen in zip(examples[prompt_key], examples['chosen'])]
            rejected = [render([{'role': 'user', 'content': item_prompt},
                                {'role': 'assistant', 'content': item_rejected}], tokenize=False)
                        for item_prompt, item_rejected in zip(examples[prompt_key], examples['rejected'])]
        else:
            prompt = [render([item[0]], tokenize=False, add_generation_prompt=True) for item in examples['chosen']]
            chosen = [render(item, tokenize=False) for item in examples['chosen']]
            rejected = [render(item, tokenize=False) for item in examples['rejected']]

        # Prompt, chosen and rejected are all padded to the same max length.
        model_inputs = self._tokenize(prompt)
        pos_labels = self._tokenize(chosen)
        neg_labels = self._tokenize(rejected)

        model_inputs['positive_input_ids'] = pos_labels['input_ids']
        model_inputs['positive_attention_mask'] = pos_labels['attention_mask']

        model_inputs['negative_input_ids'] = neg_labels['input_ids']
        model_inputs['negative_attention_mask'] = neg_labels['attention_mask']

        return model_inputs

    def filter_dataset(self, examples: Union[List, Dict]):
        """Return True when the templated prompt is shorter than ``args.prompt_max_length`` tokens.

        The prompt is the ``'instruction'`` field when present, otherwise the
        first message of ``'chosen'``. Its length is the last dimension of the
        tensor returned by ``apply_chat_template``.
        """
        if 'instruction' in examples.keys():
            messages = [{'content': examples['instruction'], 'role': 'user'}]
        else:
            messages = [examples['chosen'][0]]

        prompt_length = self.tokenizer.apply_chat_template(messages,
                                                           tokenize=True,
                                                           add_generation_prompt=True,
                                                           return_tensors='pt').size(-1)
        return prompt_length < self.args.prompt_max_length

    def prepare_trainer(self):
        """Start the wandb run and build ``self.trainer`` (an ``ORPOTrainer``)."""
        wandb.init(name=self.run_name)
        arguments = TrainingArguments(
            torch_compile=self.args.torch_compile,
            output_dir=self.save_dir,
            logging_dir=self.log_dir,
            logging_steps=50,
            learning_rate=self.args.lr,
            overwrite_output_dir=True,
            num_train_epochs=self.args.num_train_epochs,
            per_device_train_batch_size=self.args.per_device_train_batch_size,
            per_device_eval_batch_size=self.args.per_device_eval_batch_size,
            # Without a held-out split there is no eval_dataset, so evaluation must be off.
            evaluation_strategy=self.args.evaluation_strategy if self.is_test else 'no',
            save_strategy=self.args.evaluation_strategy,
            optim=self.args.optim,
            warmup_steps=self.args.warmup_steps,
            gradient_accumulation_steps=self.args.gradient_accumulation_steps,
            gradient_checkpointing=True,
            gradient_checkpointing_kwargs={'use_reentrant': True},
            # Picking the best checkpoint needs evaluation metrics, so it is tied to is_test.
            load_best_model_at_end=self.is_test,
            do_train=True,
            do_eval=self.is_test,
            lr_scheduler_type=self.args.lr_scheduler_type,
            remove_unused_columns=False,
            report_to='wandb',
            run_name=self.run_name,
            bf16=True
        )

        data_collator = DataCollatorForLanguageModeling(tokenizer=self.tokenizer, mlm=False)

        self.trainer = ORPOTrainer(
            model=self.model,
            alpha=self.args.alpha,
            pad=self.tokenizer.pad_token_id,
            args=arguments,
            train_dataset=self.train,
            eval_dataset=self.test if self.is_test else None,
            data_collator=data_collator,
            preprocess_logits_for_metrics=preprocess_logits_for_metrics
        )

    def run(self):
        """Build the trainer, train, and save the resulting model."""
        print(">>> 5. Preparing ORPOTrainer")
        self.prepare_trainer()
        self.trainer.train()

        # Under FSDP, switch the plugin to a full state dict before saving.
        if self.trainer.is_fsdp_enabled:
            self.trainer.accelerator.state.fsdp_plugin.set_state_dict_type("FULL_STATE_DICT")
        self.trainer.save_model()
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: parse options, configure the environment, print a
    # short summary, then construct and run the ORPO job.
    cli_parser = argparse.ArgumentParser("ORPO")
    args = default_args(cli_parser)

    # Export the wandb entity/project only when both were supplied.
    wandb_target_missing = args.wandb_entity is None or args.wandb_project_name is None
    if not wandb_target_missing:
        os.environ["WANDB_ENTITY"] = args.wandb_entity
        os.environ["WANDB_PROJECT"] = args.wandb_project_name
    os.environ["TOKENIZERS_PARALLELISM"] = 'false'

    banner = "================================================================================================"
    print(banner + "\n")
    print(f">>> Fine-tuning {args.model_name} with ORPO on {args.data_name}\n")
    print(banner)

    summary_lines = (
        "\n\n>>> Summary:",
        f" - Lambda : {args.alpha}",
        f" - Training Epochs : {args.num_train_epochs}",
        f" - Prompt Max Length : {args.prompt_max_length}",
        f" - Response Max Length : {args.response_max_length}",
    )
    for summary_line in summary_lines:
        print(summary_line)

    job = ORPO(args=args)
    job.run()
|
|