Spaces:
Running
Running
import os | |
import shutil | |
from glob import glob | |
from typing import Optional | |
# import torch.optim as optim | |
import bitsandbytes.optim as optim | |
import plotly.graph_objects as go | |
import streamlit as st | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
from datasets import load_dataset | |
from pydantic import BaseModel | |
from rich.progress import track | |
from safetensors.torch import load_model, save_model | |
from sklearn.metrics import roc_auc_score, roc_curve | |
from torch.utils.data import DataLoader | |
from transformers import AutoModelForSequenceClassification, AutoTokenizer | |
import wandb | |
class DatasetArgs(BaseModel):
    """Describes which dataset to load and how many rows of each split to keep."""

    # Address handed to `datasets.load_dataset` (e.g. "wandb/synthetic-prompt-injections").
    dataset_address: str
    # Number of leading rows of the "train" split to keep. `LlamaGuardFineTuner.load_dataset`
    # uses the entire split when this is <= 0 or larger than the split.
    train_dataset_range: int
    # Number of leading rows of the "test" split to keep; same convention as above.
    test_dataset_range: int
class LlamaGuardFineTuner:
    """
    `LlamaGuardFineTuner` fine-tunes and evaluates the
    [Prompt Guard model by Meta Llama](meta-llama/Prompt-Guard-86M) for prompt
    classification tasks, specifically for detecting prompt injection attacks. It
    integrates with Weights & Biases for experiment tracking and optionally
    displays progress in a Streamlit app.

    !!! example "Sample Usage"
        ```python
        from guardrails_genie.train.llama_guard import LlamaGuardFineTuner, DatasetArgs

        fine_tuner = LlamaGuardFineTuner(
            wandb_project="guardrails-genie",
            wandb_entity="geekyrakshit",
            streamlit_mode=False,
        )
        fine_tuner.load_dataset(
            DatasetArgs(
                dataset_address="wandb/synthetic-prompt-injections",
                train_dataset_range=-1,
                test_dataset_range=-1,
            )
        )
        fine_tuner.load_model()
        fine_tuner.train(save_interval=100)
        ```

    Args:
        wandb_project (str): The name of the Weights & Biases project.
        wandb_entity (str): The Weights & Biases entity (user or team).
        streamlit_mode (bool): If True, integrates with Streamlit to display progress.
    """

    def __init__(
        self, wandb_project: str, wandb_entity: str, streamlit_mode: bool = False
    ):
        self.wandb_project = wandb_project
        self.wandb_entity = wandb_entity
        self.streamlit_mode = streamlit_mode

    @staticmethod
    def _select_range(split, dataset_range: int):
        """Return the first `dataset_range` rows of `split`, or all of it when the range is unusable."""
        if dataset_range <= 0 or dataset_range > len(split):
            return split
        return split.select(range(dataset_range))

    @staticmethod
    def _split_scores_by_label(scores, labels, max_samples: Optional[int] = None):
        """Split `scores` into (positive, negative) lists using the parallel `labels` (1 / 0)."""
        if max_samples is not None and max_samples < 0:
            raise ValueError("max_samples must be non-negative or None")
        # zip() stops at the shorter sequence, so short inputs can never
        # trigger an out-of-range index.
        pairs = list(zip(scores, labels))
        if max_samples is not None:
            pairs = pairs[:max_samples]
        positive_scores = [score for score, label in pairs if label == 1]
        negative_scores = [score for score, label in pairs if label == 0]
        return positive_scores, negative_scores

    def load_dataset(self, dataset_args: "DatasetArgs"):
        """
        Loads the training and testing datasets based on the provided dataset arguments.

        This function uses the `load_dataset` function from the `datasets` library to load
        the dataset specified by the `dataset_address` attribute of the `dataset_args` parameter.
        It then selects a subset of the training and testing datasets based on the specified
        ranges in `train_dataset_range` and `test_dataset_range` attributes of `dataset_args`.
        If the specified range is less than or equal to 0 or exceeds the length of the dataset,
        the entire dataset is used.

        Args:
            dataset_args (DatasetArgs): An instance of the `DatasetArgs` class containing
                the dataset address and the ranges for training and testing datasets.

        Attributes:
            dataset_args: The arguments the datasets were loaded with.
            train_dataset: The selected training dataset.
            test_dataset: The selected testing dataset.
        """
        self.dataset_args = dataset_args
        dataset = load_dataset(dataset_args.dataset_address)
        self.train_dataset = self._select_range(
            dataset["train"], dataset_args.train_dataset_range
        )
        self.test_dataset = self._select_range(
            dataset["test"], dataset_args.test_dataset_range
        )

    def load_model(
        self,
        model_name: str = "meta-llama/Prompt-Guard-86M",
        checkpoint: Optional[str] = None,
    ):
        """
        Loads the specified pre-trained model and tokenizer for sequence classification tasks.

        This function sets the device to GPU if available, otherwise defaults to CPU. It then
        loads the tokenizer and model from the Hugging Face model hub using the provided model name.
        When `checkpoint` is given, the classifier head is replaced by a 2-class linear layer
        and the weights are restored from the `model-*.safetensors` file of that W&B artifact.
        The model is finally moved to the selected device (GPU or CPU).

        Args:
            model_name (str): The name of the pre-trained model to load.
            checkpoint (Optional[str]): Address of a W&B model artifact holding fine-tuned
                weights; a leading `wandb://` prefix is stripped. If None, the pre-trained
                weights are used unchanged.

        Raises:
            FileNotFoundError: If the downloaded artifact contains no `model-*.safetensors` file.

        Attributes:
            device (str): The device to run the model on, either "cuda" for GPU or "cpu".
            model_name (str): The name of the loaded pre-trained model.
            tokenizer (AutoTokenizer): The tokenizer associated with the pre-trained model.
            model (AutoModelForSequenceClassification): The loaded pre-trained model for sequence classification.
        """
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model_name = model_name
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
        if checkpoint is not None:
            api = wandb.Api()
            artifact = api.artifact(checkpoint.removeprefix("wandb://"))
            artifact_dir = artifact.download()
            # Sorted so the choice is deterministic if several files match.
            checkpoint_files = sorted(
                glob(os.path.join(artifact_dir, "model-*.safetensors"))
            )
            if not checkpoint_files:
                raise FileNotFoundError(
                    f"No 'model-*.safetensors' file found in artifact directory "
                    f"{artifact_dir!r}"
                )
            # The head must match the 2-class shape used here before the
            # checkpoint weights are loaded into the model.
            self.model.classifier = nn.Linear(self.model.classifier.in_features, 2)
            self.model.num_labels = 2
            load_model(self.model, checkpoint_files[0])
        self.model = self.model.to(self.device)

    def show_dataset_sample(self):
        """
        Displays a sample of the training and testing datasets using Streamlit.

        This function checks if the `streamlit_mode` attribute is enabled. If it is,
        it converts the training and testing datasets to pandas DataFrames and displays
        the first few rows of each dataset using Streamlit's `dataframe` function. The
        training dataset sample is displayed under the heading "Train Dataset Sample",
        and the testing dataset sample is displayed under the heading "Test Dataset Sample".

        Note:
            This function does nothing unless the `streamlit_mode` attribute is True.
        """
        if not self.streamlit_mode:
            return
        st.markdown("### Train Dataset Sample")
        st.dataframe(self.train_dataset.to_pandas().head())
        st.markdown("### Test Dataset Sample")
        st.dataframe(self.test_dataset.to_pandas().head())

    def evaluate_batch(
        self,
        texts,
        batch_size: int = 32,
        positive_label: int = 2,
        temperature: float = 1.0,
        truncation: bool = True,
        max_length: int = 512,
    ) -> list[float]:
        """
        Scores `texts` with the loaded model and returns the positive-class probabilities.

        The texts are tokenized in one call, run through the model in batches of
        `batch_size` without gradients, and the logits are divided by `temperature`
        before the softmax.

        Args:
            texts: The prompts to score (a string or an iterable of strings).
            batch_size (int, optional): The number of samples to process in each batch.
            positive_label (int, optional): Index of the class whose probability is returned.
            temperature (float, optional): The temperature parameter for scaling logits.
            truncation (bool, optional): Whether to truncate sequences to the maximum length.
            max_length (int, optional): The maximum length of sequences after truncation.

        Returns:
            list[float]: One probability per input text, in input order.

        Raises:
            IndexError: If `positive_label` is not a valid class index for the model output.
        """
        self.model.eval()
        # A dataset column may be a lazy column object rather than a list;
        # materialise it so the tokenizer always receives a plain list.
        if not isinstance(texts, (str, list)):
            texts = list(texts)
        if len(texts) == 0:
            return []
        encoded_texts = self.tokenizer(
            texts,
            padding=True,
            truncation=truncation,
            max_length=max_length,
            return_tensors="pt",
        )
        dataset = torch.utils.data.TensorDataset(
            encoded_texts["input_ids"], encoded_texts["attention_mask"]
        )
        data_loader = DataLoader(dataset, batch_size=batch_size)
        num_batches = len(data_loader)
        scores: list[float] = []
        progress_bar = (
            st.progress(0, text="Evaluating") if self.streamlit_mode else None
        )
        for i, batch in track(
            enumerate(data_loader), description="Evaluating", total=num_batches
        ):
            input_ids, attention_mask = [b.to(self.device) for b in batch]
            with torch.no_grad():
                logits = self.model(
                    input_ids=input_ids, attention_mask=attention_mask
                ).logits
            num_classes = logits.shape[-1]
            if not -num_classes <= positive_label < num_classes:
                # e.g. the default positive_label=2 used with a 2-class head.
                raise IndexError(
                    f"positive_label={positive_label} is out of range for a model "
                    f"with {num_classes} output classes"
                )
            probabilities = F.softmax(logits / temperature, dim=-1)
            # .tolist() yields plain Python floats, matching the declared return type.
            scores.extend(probabilities[:, positive_label].cpu().tolist())
            if progress_bar is not None:
                progress_bar.progress(
                    (i + 1) * 100 // num_batches,
                    text=f"Evaluating batch {i + 1}/{num_batches}",
                )
        return scores

    def visualize_roc_curve(self, test_scores: list[float]):
        """
        Plots the ROC curve of `test_scores` against the labels of the test dataset.

        The figure is rendered through Streamlit when `streamlit_mode` is enabled,
        otherwise with Plotly's `fig.show()`.

        Args:
            test_scores (list[float]): One score per row of `self.test_dataset`.
        """
        test_labels = [int(elt) for elt in self.test_dataset["label"]]
        fpr, tpr, _ = roc_curve(test_labels, test_scores)
        roc_auc = roc_auc_score(test_labels, test_scores)
        fig = go.Figure()
        fig.add_trace(
            go.Scatter(
                x=fpr,
                y=tpr,
                mode="lines",
                name=f"ROC curve (area = {roc_auc:.3f})",
                line=dict(color="darkorange", width=2),
            )
        )
        # Diagonal reference line.
        fig.add_trace(
            go.Scatter(
                x=[0, 1],
                y=[0, 1],
                mode="lines",
                name="Random Guess",
                line=dict(color="navy", width=2, dash="dash"),
            )
        )
        fig.update_layout(
            title="Receiver Operating Characteristic",
            xaxis_title="False Positive Rate",
            yaxis_title="True Positive Rate",
            xaxis=dict(range=[0.0, 1.0]),
            yaxis=dict(range=[0.0, 1.05]),
            legend=dict(x=0.8, y=0.2),
        )
        if self.streamlit_mode:
            st.plotly_chart(fig)
        else:
            fig.show()

    def visualize_score_distribution(
        self, scores: list[float], max_samples: Optional[int] = 500
    ):
        """
        Plots overlaid histograms of the scores of positive and negative test examples.

        Args:
            scores (list[float]): One score per row of `self.test_dataset`.
            max_samples (Optional[int]): Only the first `max_samples` examples are
                plotted; `None` plots all of them. Datasets smaller than this limit
                are plotted in full.
        """
        test_labels = [int(elt) for elt in self.test_dataset["label"]]
        positive_scores, negative_scores = self._split_scores_by_label(
            scores, test_labels, max_samples
        )
        fig = go.Figure()
        fig.add_trace(
            go.Histogram(
                x=positive_scores,
                histnorm="probability density",
                name="Positive",
                marker_color="darkblue",
                opacity=0.75,
            )
        )
        fig.add_trace(
            go.Histogram(
                x=negative_scores,
                histnorm="probability density",
                name="Negative",
                marker_color="darkred",
                opacity=0.75,
            )
        )
        fig.update_layout(
            title="Score Distribution for Positive and Negative Examples",
            xaxis_title="Score",
            yaxis_title="Density",
            barmode="overlay",
            legend_title="Scores",
        )
        if self.streamlit_mode:
            st.plotly_chart(fig)
        else:
            fig.show()

    def evaluate_model(
        self,
        batch_size: int = 32,
        positive_label: int = 2,
        temperature: float = 3.0,
        truncation: bool = True,
        max_length: int = 512,
    ):
        """
        Evaluates the fine-tuned model on the test dataset and visualizes the results.

        This function evaluates the model by processing the test dataset in batches.
        It computes the test scores using the `evaluate_batch` method, which takes
        several parameters to control the evaluation process, such as batch size,
        positive label, temperature, truncation, and maximum sequence length.

        After obtaining the test scores, it visualizes the performance of the model
        using two methods:

        1. `visualize_roc_curve`: Plots the Receiver Operating Characteristic (ROC) curve
            to show the trade-off between the true positive rate and false positive rate.
        2. `visualize_score_distribution`: Plots the distribution of scores for positive
            and negative examples to provide insights into the model's performance.

        Args:
            batch_size (int, optional): The number of samples to process in each batch.
            positive_label (int, optional): The label considered as positive for evaluation.
            temperature (float, optional): The temperature parameter for scaling logits.
            truncation (bool, optional): Whether to truncate sequences to the maximum length.
            max_length (int, optional): The maximum length of sequences after truncation.

        Returns:
            list[float]: The test scores obtained from the evaluation.
        """
        test_scores = self.evaluate_batch(
            self.test_dataset["prompt"],
            batch_size=batch_size,
            positive_label=positive_label,
            temperature=temperature,
            truncation=truncation,
            max_length=max_length,
        )
        self.visualize_roc_curve(test_scores)
        self.visualize_score_distribution(test_scores)
        return test_scores

    def collate_fn(self, batch):
        """
        Turns a list of dataset rows into model-ready tensors.

        Args:
            batch: Rows exposing a "prompt" text and a "label" convertible to int.

        Returns:
            A tuple `(input_ids, attention_mask, labels)`; prompts are padded to the
            longest one in the batch and truncated to 512 tokens.
        """
        texts = [item["prompt"] for item in batch]
        labels = torch.tensor([int(item["label"]) for item in batch])
        encodings = self.tokenizer(
            texts, padding=True, truncation=True, max_length=512, return_tensors="pt"
        )
        return encodings.input_ids, encodings.attention_mask, labels

    def train(
        self,
        batch_size: int = 16,
        starting_lr: float = 1e-7,
        num_classes: int = 2,
        log_interval: int = 1,
        save_interval: int = 50,
    ):
        """
        Fine-tunes the pre-trained LlamaGuard model on the training dataset for a single epoch.

        This function sets up and executes the training loop for the LlamaGuard model.
        It initializes the Weights & Biases (wandb) logging, replaces the model's
        classifier layer with a fresh linear layer matching the specified number of
        classes, and sets the model to training mode. The function uses the `Lion`
        optimizer from `bitsandbytes` together with a `OneCycleLR` schedule to update
        the model parameters based on the computed loss.

        The training process involves iterating over the shuffled training dataset in
        batches, computing the loss for each batch, and updating the model parameters.
        The function logs the loss and learning rate to wandb at specified intervals
        and optionally displays a progress bar using Streamlit if `streamlit_mode` is
        enabled. Model checkpoints are written to a local `checkpoints` directory and
        logged to wandb at specified intervals and after the last batch; the directory
        is removed once the run has finished.

        Args:
            batch_size (int, optional): The number of samples per batch during training.
            starting_lr (float, optional): The learning rate given to the optimizer; it is
                also used as `max_lr` of the one-cycle schedule.
            num_classes (int, optional): The number of output classes for the classifier.
            log_interval (int, optional): The interval (in batches) at which to log the loss.
            save_interval (int, optional): The interval (in batches) at which to save model checkpoints.

        Note:
            This function requires `load_dataset` and `load_model` to have been called
            first, and the `wandb` and `streamlit` libraries to be installed and
            configured appropriately.
        """
        os.makedirs("checkpoints", exist_ok=True)
        wandb.init(
            project=self.wandb_project,
            entity=self.wandb_entity,
            name=f"{self.model_name}-{self.dataset_args.dataset_address.split('/')[-1]}",
            job_type="fine-tune-llama-guard",
        )
        wandb.config.dataset_args = self.dataset_args.model_dump()
        wandb.config.model_name = self.model_name
        wandb.config.batch_size = batch_size
        wandb.config.starting_lr = starting_lr
        wandb.config.num_classes = num_classes
        wandb.config.log_interval = log_interval
        wandb.config.save_interval = save_interval

        self.model.classifier = nn.Linear(
            self.model.classifier.in_features, num_classes
        )
        self.model.num_labels = num_classes
        self.model = self.model.to(self.device)
        self.model.train()

        data_loader = DataLoader(
            self.train_dataset,
            batch_size=batch_size,
            shuffle=True,
            collate_fn=self.collate_fn,
        )
        num_batches = len(data_loader)
        optimizer = optim.Lion(
            self.model.parameters(), lr=starting_lr, weight_decay=0.01
        )
        # Size the schedule from the real number of batches so the cycle ends
        # exactly on the last step; the scheduler needs at least one step.
        scheduler = torch.optim.lr_scheduler.OneCycleLR(
            optimizer,
            max_lr=starting_lr,
            steps_per_epoch=max(num_batches, 1),
            epochs=1,
        )
        progress_bar = st.progress(0, text="Training") if self.streamlit_mode else None
        for i, batch in track(
            enumerate(data_loader), description="Training", total=num_batches
        ):
            step = i + 1
            input_ids, attention_mask, labels = [x.to(self.device) for x in batch]
            outputs = self.model(
                input_ids=input_ids, attention_mask=attention_mask, labels=labels
            )
            loss = outputs.loss
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            scheduler.step()
            loss_value = loss.item()
            if step % log_interval == 0:
                wandb.log(
                    {
                        "loss": loss_value,
                        "learning_rate": scheduler.get_last_lr()[0],
                    },
                    step=step,
                )
            if progress_bar is not None:
                progress_bar.progress(
                    step * 100 // num_batches,
                    text=f"Training batch {step}/{num_batches}, Loss: {loss_value}",
                )
            if step % save_interval == 0 or step == num_batches:
                checkpoint_path = f"checkpoints/model-{step}.safetensors"
                save_model(self.model, checkpoint_path)
                wandb.log_model(
                    checkpoint_path,
                    name=f"{wandb.run.id}-model",
                    # `aliases` is documented as a list of strings.
                    aliases=[f"step-{step}"],
                )
        wandb.finish()
        shutil.rmtree("checkpoints")