import os
import shutil
from glob import glob
from typing import Optional

# import torch.optim as optim
import bitsandbytes.optim as optim
import plotly.graph_objects as go
import streamlit as st
import torch
import torch.nn as nn
import torch.nn.functional as F
from datasets import load_dataset
from pydantic import BaseModel
from rich.progress import track
from safetensors.torch import load_model, save_model
from sklearn.metrics import roc_auc_score, roc_curve
from torch.utils.data import DataLoader
from transformers import AutoModelForSequenceClassification, AutoTokenizer

import wandb


class DatasetArgs(BaseModel):
    """Describes which dataset to load and how many rows of each split to keep."""

    # Address passed verbatim to `datasets.load_dataset`.
    dataset_address: str
    # Number of leading rows of the "train" split to keep; a value <= 0 or
    # larger than the split keeps the whole split.
    train_dataset_range: int
    # Same as `train_dataset_range`, applied to the "test" split.
    test_dataset_range: int


class LlamaGuardFineTuner:
    """
    `LlamaGuardFineTuner` is a class designed to fine-tune and evaluate the
    [Prompt Guard model by Meta LLama](meta-llama/Prompt-Guard-86M) for
    prompt classification tasks, specifically for detecting prompt injection
    attacks. It integrates with Weights & Biases for experiment tracking and
    optionally displays progress in a Streamlit app.

    !!! example "Sample Usage"
        ```python
        from guardrails_genie.train.llama_guard import LlamaGuardFineTuner, DatasetArgs

        fine_tuner = LlamaGuardFineTuner(
            wandb_project="guardrails-genie",
            wandb_entity="geekyrakshit",
            streamlit_mode=False,
        )
        fine_tuner.load_dataset(
            DatasetArgs(
                dataset_address="wandb/synthetic-prompt-injections",
                train_dataset_range=-1,
                test_dataset_range=-1,
            )
        )
        fine_tuner.load_model()
        fine_tuner.train(save_interval=100)
        ```

    Args:
        wandb_project (str): The name of the Weights & Biases project.
        wandb_entity (str): The Weights & Biases entity (user or team).
        streamlit_mode (bool): If True, integrates with Streamlit to display
            progress.
    """

    def __init__(
        self, wandb_project: str, wandb_entity: str, streamlit_mode: bool = False
    ):
        self.wandb_project = wandb_project
        self.wandb_entity = wandb_entity
        self.streamlit_mode = streamlit_mode

    @staticmethod
    def _take_prefix(split, limit: int):
        """Return the first `limit` rows of `split`, or all of it when `limit` is out of range."""
        if limit <= 0 or limit > len(split):
            return split
        return split.select(range(limit))

    def load_dataset(self, dataset_args: DatasetArgs):
        """
        Loads the training and testing datasets based on the provided dataset
        arguments.

        This function uses the `load_dataset` function from the `datasets`
        library to load the dataset specified by the `dataset_address`
        attribute of the `dataset_args` parameter. It then selects a subset of
        the "train" and "test" splits based on the `train_dataset_range` and
        `test_dataset_range` attributes of `dataset_args`. If the specified
        range is less than or equal to 0 or exceeds the length of the split,
        the entire split is used.

        Args:
            dataset_args (DatasetArgs): An instance of the `DatasetArgs` class
                containing the dataset address and the ranges for training and
                testing datasets.

        Attributes:
            dataset_args: The arguments the datasets were loaded with.
            train_dataset: The selected training dataset.
            test_dataset: The selected testing dataset.
        """
        self.dataset_args = dataset_args
        dataset = load_dataset(dataset_args.dataset_address)
        self.train_dataset = self._take_prefix(
            dataset["train"], dataset_args.train_dataset_range
        )
        self.test_dataset = self._take_prefix(
            dataset["test"], dataset_args.test_dataset_range
        )

    def load_model(
        self,
        model_name: str = "meta-llama/Prompt-Guard-86M",
        checkpoint: Optional[str] = None,
    ):
        """
        Loads the specified pre-trained model and tokenizer for sequence
        classification tasks.

        This function sets the device to GPU if available, otherwise defaults
        to CPU. It then loads the tokenizer and model using the provided model
        name and moves the model to that device. When `checkpoint` is given,
        the corresponding Weights & Biases artifact is downloaded, the
        classifier head is replaced by a 2-class linear layer, and the weights
        from the artifact's `model-*.safetensors` file are loaded into it.

        Args:
            model_name (str): The name of the pre-trained model to load.
            checkpoint (Optional[str]): Address of a Weights & Biases artifact
                holding fine-tuned weights; an optional `wandb://` prefix is
                stripped. If None, the pre-trained weights are used as-is.

        Raises:
            FileNotFoundError: If the downloaded artifact contains no
                `model-*.safetensors` file.

        Attributes:
            device (str): The device to run the model on, either "cuda" for
                GPU or "cpu".
            model_name (str): The name of the loaded pre-trained model.
            tokenizer (AutoTokenizer): The tokenizer associated with the
                pre-trained model.
            model (AutoModelForSequenceClassification): The loaded model for
                sequence classification.
        """
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model_name = model_name
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)

        if checkpoint is None:
            self.model = AutoModelForSequenceClassification.from_pretrained(
                model_name
            ).to(self.device)
            return

        api = wandb.Api()
        artifact = api.artifact(checkpoint.removeprefix("wandb://"))
        artifact_dir = artifact.download()
        weight_files = glob(os.path.join(artifact_dir, "model-*.safetensors"))
        if not weight_files:
            raise FileNotFoundError(
                f"No 'model-*.safetensors' file found in artifact directory "
                f"{artifact_dir!r} for checkpoint {checkpoint!r}"
            )
        model_file_path = weight_files[0]

        self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
        # The head must have the checkpoint's shape before the weights are
        # loaded; checkpoints are assumed to carry a 2-class head.
        self.model.classifier = nn.Linear(self.model.classifier.in_features, 2)
        self.model.num_labels = 2
        load_model(self.model, model_file_path)
        self.model = self.model.to(self.device)

    def show_dataset_sample(self):
        """
        Displays a sample of the training and testing datasets using Streamlit.

        If the `streamlit_mode` attribute is enabled, the training and testing
        datasets are converted to pandas DataFrames and the first rows of each
        are rendered with Streamlit's `dataframe` function under the headings
        "Train Dataset Sample" and "Test Dataset Sample". Otherwise this is a
        no-op.

        Note:
            `load_dataset` must have been called first so that
            `train_dataset` and `test_dataset` exist.
        """
        if not self.streamlit_mode:
            return
        st.markdown("### Train Dataset Sample")
        st.dataframe(self.train_dataset.to_pandas().head())
        st.markdown("### Test Dataset Sample")
        st.dataframe(self.test_dataset.to_pandas().head())

    def evaluate_batch(
        self,
        texts,
        batch_size: int = 32,
        positive_label: int = 2,
        temperature: float = 1.0,
        truncation: bool = True,
        max_length: int = 512,
    ) -> list[float]:
        """
        Scores `texts` with the loaded model and returns, for each text, the
        softmax probability of the `positive_label` class.

        All texts are tokenized in one call (padded to the longest sequence),
        then run through the model in batches without gradient tracking.

        Args:
            texts: A string or a sequence of strings to score.
            batch_size (int, optional): Number of texts per forward pass.
            positive_label (int, optional): Index of the class whose
                probability is returned. It must be a valid index into the
                model's logits.
            temperature (float, optional): Divisor applied to the logits
                before the softmax.
            truncation (bool, optional): Whether to truncate sequences to
                `max_length`.
            max_length (int, optional): Maximum tokenized sequence length.

        Returns:
            list[float]: One probability per input text, in input order.
        """
        self.model.eval()

        # Non-string inputs are materialised into a plain list; some dataset
        # column objects are presumably not accepted by the tokenizer as-is.
        if not isinstance(texts, str):
            texts = list(texts)

        encoded_texts = self.tokenizer(
            texts,
            padding=True,
            truncation=truncation,
            max_length=max_length,
            return_tensors="pt",
        )
        dataset = torch.utils.data.TensorDataset(
            encoded_texts["input_ids"], encoded_texts["attention_mask"]
        )
        data_loader = DataLoader(dataset, batch_size=batch_size)
        num_batches = len(data_loader)

        scores: list[float] = []
        progress_bar = (
            st.progress(0, text="Evaluating") if self.streamlit_mode else None
        )
        for i, batch in track(
            enumerate(data_loader), description="Evaluating", total=num_batches
        ):
            input_ids, attention_mask = [b.to(self.device) for b in batch]
            with torch.no_grad():
                logits = self.model(
                    input_ids=input_ids, attention_mask=attention_mask
                ).logits
            probabilities = F.softmax(logits / temperature, dim=-1)
            # `.tolist()` yields plain Python floats, matching the annotation.
            scores.extend(probabilities[:, positive_label].cpu().tolist())

            if progress_bar is not None:
                progress_bar.progress(
                    (i + 1) * 100 // num_batches,
                    text=f"Evaluating batch {i + 1}/{num_batches}",
                )
        return scores

    def visualize_roc_curve(self, test_scores: list[float]):
        """
        Plots the ROC curve of `test_scores` against the test dataset labels.

        The curve and its AUC are computed with scikit-learn from the `label`
        column of `self.test_dataset`; a dashed diagonal marks random
        guessing. The figure is rendered through Streamlit when
        `streamlit_mode` is enabled, otherwise with `fig.show()`.

        Args:
            test_scores (list[float]): One score per test example, in the
                same order as `self.test_dataset`.
        """
        test_labels = [int(elt) for elt in self.test_dataset["label"]]
        fpr, tpr, _ = roc_curve(test_labels, test_scores)
        roc_auc = roc_auc_score(test_labels, test_scores)

        fig = go.Figure()
        fig.add_trace(
            go.Scatter(
                x=fpr,
                y=tpr,
                mode="lines",
                name=f"ROC curve (area = {roc_auc:.3f})",
                line=dict(color="darkorange", width=2),
            )
        )
        fig.add_trace(
            go.Scatter(
                x=[0, 1],
                y=[0, 1],
                mode="lines",
                name="Random Guess",
                line=dict(color="navy", width=2, dash="dash"),
            )
        )
        fig.update_layout(
            title="Receiver Operating Characteristic",
            xaxis_title="False Positive Rate",
            yaxis_title="True Positive Rate",
            xaxis=dict(range=[0.0, 1.0]),
            yaxis=dict(range=[0.0, 1.05]),
            legend=dict(x=0.8, y=0.2),
        )

        if self.streamlit_mode:
            st.plotly_chart(fig)
        else:
            fig.show()

    def visualize_score_distribution(self, scores: list[float]):
        """
        Plots overlaid histograms of the scores of positive (label 1) and
        negative (label 0) test examples.

        Every score/label pair is used, so the method works for test sets of
        any size. Examples whose label is neither 0 nor 1 are left out of
        both histograms. The figure is rendered through Streamlit when
        `streamlit_mode` is enabled, otherwise with `fig.show()`.

        Args:
            scores (list[float]): One score per test example, in the same
                order as `self.test_dataset`.
        """
        test_labels = [int(elt) for elt in self.test_dataset["label"]]
        labelled_scores = list(zip(scores, test_labels))
        positive_scores = [score for score, label in labelled_scores if label == 1]
        negative_scores = [score for score, label in labelled_scores if label == 0]

        fig = go.Figure()
        fig.add_trace(
            go.Histogram(
                x=positive_scores,
                histnorm="probability density",
                name="Positive",
                marker_color="darkblue",
                opacity=0.75,
            )
        )
        fig.add_trace(
            go.Histogram(
                x=negative_scores,
                histnorm="probability density",
                name="Negative",
                marker_color="darkred",
                opacity=0.75,
            )
        )
        fig.update_layout(
            title="Score Distribution for Positive and Negative Examples",
            xaxis_title="Score",
            yaxis_title="Density",
            barmode="overlay",
            legend_title="Scores",
        )

        if self.streamlit_mode:
            st.plotly_chart(fig)
        else:
            fig.show()

    def evaluate_model(
        self,
        batch_size: int = 32,
        positive_label: int = 2,
        temperature: float = 3.0,
        truncation: bool = True,
        max_length: int = 512,
    ):
        """
        Evaluates the model on the test dataset and visualizes the results.

        The `prompt` column of the test dataset is scored with
        `evaluate_batch`, after which the performance is visualized with:

        1. `visualize_roc_curve`: Plots the Receiver Operating Characteristic
           (ROC) curve to show the trade-off between the true positive rate
           and false positive rate.
        2. `visualize_score_distribution`: Plots the distribution of scores
           for positive and negative examples.

        Args:
            batch_size (int, optional): The number of samples to process in
                each batch.
            positive_label (int, optional): The class index considered as
                positive for evaluation. The default of 2 is out of range for
                the 2-class head installed by `load_model(checkpoint=...)`
                and `train`; pass a valid index for such models.
            temperature (float, optional): The temperature parameter for
                scaling logits.
            truncation (bool, optional): Whether to truncate sequences to the
                maximum length.
            max_length (int, optional): The maximum length of sequences after
                truncation.

        Returns:
            list[float]: The test scores obtained from the evaluation.
        """
        test_scores = self.evaluate_batch(
            self.test_dataset["prompt"],
            batch_size=batch_size,
            positive_label=positive_label,
            temperature=temperature,
            truncation=truncation,
            max_length=max_length,
        )
        self.visualize_roc_curve(test_scores)
        self.visualize_score_distribution(test_scores)
        return test_scores

    def collate_fn(self, batch):
        """
        Turns a list of dataset rows into padded model inputs.

        Args:
            batch: A list of rows, each with a `prompt` text and a `label`
                convertible to int.

        Returns:
            tuple: `(input_ids, attention_mask, labels)` tensors, with the
            prompts padded to the longest one and truncated to 512 tokens.
        """
        texts = [item["prompt"] for item in batch]
        labels = torch.tensor([int(item["label"]) for item in batch])
        encodings = self.tokenizer(
            texts, padding=True, truncation=True, max_length=512, return_tensors="pt"
        )
        return encodings.input_ids, encodings.attention_mask, labels

    def train(
        self,
        batch_size: int = 16,
        starting_lr: float = 1e-7,
        num_classes: int = 2,
        log_interval: int = 1,
        save_interval: int = 50,
    ):
        """
        Fine-tunes the loaded model on the training dataset for a single epoch.

        This function initializes a Weights & Biases (wandb) run, replaces the
        model's classifier layer with a freshly initialized linear layer of
        `num_classes` outputs, and sets the model to training mode. The
        parameters are updated with the `Lion` optimizer from `bitsandbytes`
        under a `OneCycleLR` learning-rate schedule.

        The training loop iterates over the shuffled training dataset in
        batches, computing the loss for each batch and updating the model
        parameters. The loss and learning rate are logged to wandb at the
        specified interval, and a Streamlit progress bar is shown if
        `streamlit_mode` is enabled. Model checkpoints are saved to a local
        `checkpoints` directory and logged to wandb at the specified interval
        and after the final batch; the directory is removed once the run has
        finished.

        Args:
            batch_size (int, optional): The number of samples per batch during
                training.
            starting_lr (float, optional): The learning rate handed to the
                optimizer and used as `max_lr` (the peak) of the one-cycle
                schedule.
            num_classes (int, optional): The number of output classes for the
                classifier.
            log_interval (int, optional): The interval (in batches) at which to
                log the loss.
            save_interval (int, optional): The interval (in batches) at which
                to save model checkpoints.

        Note:
            `load_dataset` and `load_model` must have been called first. This
            function requires the `wandb` and `streamlit` libraries to be
            installed and configured appropriately.
        """
        os.makedirs("checkpoints", exist_ok=True)
        wandb.init(
            project=self.wandb_project,
            entity=self.wandb_entity,
            name=f"{self.model_name}-{self.dataset_args.dataset_address.split('/')[-1]}",
            job_type="fine-tune-llama-guard",
        )
        wandb.config.dataset_args = self.dataset_args.model_dump()
        wandb.config.model_name = self.model_name
        wandb.config.batch_size = batch_size
        wandb.config.starting_lr = starting_lr
        wandb.config.num_classes = num_classes
        wandb.config.log_interval = log_interval
        wandb.config.save_interval = save_interval

        # The classification head is always re-initialized, even when the
        # existing head already has `num_classes` outputs.
        self.model.classifier = nn.Linear(
            self.model.classifier.in_features, num_classes
        )
        self.model.num_labels = num_classes
        self.model = self.model.to(self.device)
        self.model.train()

        optimizer = optim.Lion(
            self.model.parameters(), lr=starting_lr, weight_decay=0.01
        )
        scheduler = torch.optim.lr_scheduler.OneCycleLR(
            optimizer,
            max_lr=starting_lr,
            steps_per_epoch=len(self.train_dataset) // batch_size + 1,
            epochs=1,
        )
        data_loader = DataLoader(
            self.train_dataset,
            batch_size=batch_size,
            shuffle=True,
            collate_fn=self.collate_fn,
        )
        num_batches = len(data_loader)

        progress_bar = st.progress(0, text="Training") if self.streamlit_mode else None
        for i, batch in track(
            enumerate(data_loader), description="Training", total=num_batches
        ):
            step = i + 1
            input_ids, attention_mask, labels = [x.to(self.device) for x in batch]
            outputs = self.model(
                input_ids=input_ids, attention_mask=attention_mask, labels=labels
            )
            loss = outputs.loss

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            scheduler.step()

            loss_value = loss.item()
            if step % log_interval == 0:
                wandb.log(
                    {
                        "loss": loss_value,
                        "learning_rate": scheduler.get_last_lr()[0],
                    },
                    step=step,
                )
            if progress_bar is not None:
                progress_bar.progress(
                    step * 100 // num_batches,
                    text=f"Training batch {step}/{num_batches}, Loss: {loss_value}",
                )

            # Checkpoint periodically and always after the last batch.
            if step % save_interval == 0 or step == num_batches:
                checkpoint_path = f"checkpoints/model-{step}.safetensors"
                with torch.no_grad():
                    save_model(self.model, checkpoint_path)
                wandb.log_model(
                    checkpoint_path,
                    name=f"{wandb.run.id}-model",
                    # `aliases` is documented as a list of strings.
                    aliases=[f"step-{step}"],
                )

        wandb.finish()
        shutil.rmtree("checkpoints")