Spaces:
Running
Running
import hashlib | |
import json | |
import os | |
import pathlib | |
import tempfile | |
from enum import Enum | |
from typing import Optional, Any | |
import weave | |
from pydantic import BaseModel, PrivateAttr | |
from guardrails_genie.guardrails.base import Guardrail | |
try: | |
from detect_secrets import SecretsCollection | |
from detect_secrets.settings import default_settings | |
import hyperscan | |
except ImportError: | |
raise ImportError( | |
"The `detect-secrets` and the `hyperscan` packages are required for using the SecretsGuardrail. " | |
"Please install then by running `pip install detect-secrets hyperscan`." | |
) | |
class REDACTION(str, Enum): | |
""" | |
Enum for different types of redaction modes. | |
""" | |
REDACT_PARTIAL = "REDACT_PARTIAL" | |
REDACT_ALL = "REDACT_ALL" | |
REDACT_HASH = "REDACT_HASH" | |
REDACT_NONE = "REDACT_NONE" | |
def redact_value(value: str, mode: str) -> str: | |
""" | |
Redacts the given value based on the specified redaction mode. | |
Args: | |
value (str): The string value to be redacted. | |
mode (str): The redaction mode to be applied. It can be one of the following: | |
- REDACTION.REDACT_PARTIAL: Partially redacts the value. | |
- REDACTION.REDACT_ALL: Fully redacts the value. | |
- REDACTION.REDACT_HASH: Redacts the value by hashing it. | |
- REDACTION.REDACT_NONE: No redaction is applied. | |
Returns: | |
str: The redacted value based on the specified mode. | |
""" | |
replacement = value | |
if mode == REDACTION.REDACT_PARTIAL: | |
replacement = "[REDACTED:]" + value[:2] + ".." + value[-2:] + "[:REDACTED]" | |
elif mode == REDACTION.REDACT_ALL: | |
replacement = "[REDACTED:]" + ("*" * len(value)) + "[:REDACTED]" | |
elif mode == REDACTION.REDACT_HASH: | |
replacement = ( | |
"[REDACTED:]" + hashlib.md5(value.encode()).hexdigest() + "[:REDACTED]" | |
) | |
return replacement | |
class SecretsDetectionSimpleResponse(BaseModel): | |
""" | |
A simple response model for secrets detection. | |
Attributes: | |
contains_secrets (bool): Indicates if secrets were detected. | |
explanation (str): Explanation of the detection result. | |
redacted_text (Optional[str]): The redacted text if secrets were found. | |
risk_score (float): The risk score of the detection result. (0.0, 0.5, 1.0) | |
""" | |
contains_secrets: bool | |
explanation: str | |
redacted_text: Optional[str] = None | |
risk_score: float = 0.0 | |
def safe(self) -> bool: | |
""" | |
Property to check if the text is safe (no secrets detected). | |
Returns: | |
bool: True if no secrets were detected, False otherwise. | |
""" | |
return not self.contains_secrets | |
class SecretsDetectionResponse(SecretsDetectionSimpleResponse): | |
""" | |
A detailed response model for secrets detection. | |
Attributes: | |
detected_secrets (dict[str, list[str]]): Dictionary of detected secrets. | |
""" | |
detected_secrets: dict[str, Any] | None = None | |
class SecretsInfo(BaseModel): | |
""" | |
Model representing information about a detected secret. | |
Attributes: | |
secret (str): The detected secret value. | |
line_number (int): The line number where the secret was found. | |
""" | |
secret: str | |
line_number: int | |
class ScanResult(BaseModel): | |
""" | |
Model representing the result of a secrets scan. | |
Attributes: | |
detected_secrets (dict[str, Any] | None): Dictionary of detected secrets, or None if no secrets were found. | |
modified_prompt (str): The modified prompt with secrets redacted. | |
has_secret (bool): Indicates if any secrets were detected. | |
risk_score (float): The risk score of the detection result. | |
""" | |
detected_secrets: dict[str, Any] | None = None | |
modified_prompt: str | |
has_secret: bool | |
risk_score: float | |
class DetectSecretsModel(weave.Model): | |
""" | |
Model for detecting secrets using the detect-secrets library. | |
""" | |
def scan(text: str) -> dict[str, list[SecretsInfo]]: | |
""" | |
Scans the given text for secrets using the detect-secrets library. | |
Args: | |
text (str): The text to scan for secrets. | |
Returns: | |
dict[str, list[SecretsInfo]]: A dictionary where the keys are secret types and the values are lists of SecretsInfo objects. | |
""" | |
secrets = SecretsCollection() | |
temp_file = tempfile.NamedTemporaryFile(delete=False) | |
temp_file.write(text.encode("utf-8")) | |
temp_file.close() | |
with default_settings(): | |
secrets.scan_file(str(temp_file.name)) | |
unique_secrets = {} | |
for file in secrets.files: | |
for found_secret in secrets[file]: | |
if found_secret.secret_value is None: | |
continue | |
secret_type = found_secret.type | |
actual_secret = found_secret.secret_value | |
line_number = found_secret.line_number | |
if secret_type not in unique_secrets: | |
unique_secrets[secret_type] = [] | |
unique_secrets[secret_type].append( | |
SecretsInfo(secret=actual_secret, line_number=line_number) | |
) | |
os.remove(temp_file.name) | |
return unique_secrets | |
def invoke(self, text: str) -> dict[str, list[SecretsInfo]]: | |
""" | |
Invokes the scan method to detect secrets in the given text. | |
Args: | |
text (str): The text to scan for secrets. | |
Returns: | |
dict[str, list[SecretsInfo]]: A dictionary where the keys are secret types and the values are lists of SecretsInfo objects. | |
""" | |
return self.scan(text) | |
class HyperScanModel(weave.Model): | |
""" | |
Model for detecting secrets using the Hyperscan library. | |
We use the Hyperscan library to scan for secrets using regex patterns. | |
The patterns are mined from https://github.com/mazen160/secrets-patterns-db | |
This model is used in conjunction with the DetectSecretsModel to improve the detection of secrets. | |
""" | |
_db: Any = PrivateAttr() | |
_pattern_map: dict[str, str] = PrivateAttr() | |
only_high_confidence: bool = False | |
ids: list[str] = [] | |
def _load_patterns(self) -> dict[str, str]: | |
""" | |
Loads the patterns from a JSONL file. | |
Returns: | |
dict[str, str]: A dictionary where the keys are pattern names and the values are regex patterns. | |
""" | |
patterns = ( | |
pathlib.Path(__file__).parent.resolve() / "secrets_patterns.jsonl" | |
).open() | |
patterns_list = [json.loads(line) for line in patterns] | |
if self.only_high_confidence: | |
patterns_list = [ | |
pattern for pattern in patterns_list if pattern["confidence"] == "high" | |
] | |
return {pattern["name"]: pattern["regex"] for pattern in patterns_list} | |
def __init__(self, **kwargs: Any): | |
""" | |
Initializes the HyperScanModel instance. | |
""" | |
super().__init__(**kwargs) | |
def model_post_init(self, __context: Any) -> None: | |
""" | |
Post-initialization method to load patterns and compile the Hyperscan database. | |
""" | |
self._pattern_map = self._load_patterns() | |
self.ids = list(self._pattern_map.keys()) | |
expressions = [pattern.encode() for pattern in self._pattern_map.values()] | |
self._db = hyperscan.Database() | |
self._db.compile(expressions=expressions, ids=list(range(len(expressions)))) | |
def scan(self, text: str) -> dict[str, list[SecretsInfo]]: | |
""" | |
Scans the given text for secrets using the Hyperscan library. | |
Args: | |
text (str): The text to scan for secrets. | |
Returns: | |
dict[str, list[SecretsInfo]]: A dictionary where the keys are secret types and the values are lists of SecretsInfo objects. | |
""" | |
unique_secrets = {} | |
def on_match(idx, start, end, flags, context): | |
""" | |
Callback function for handling matches found by Hyperscan. | |
Args: | |
idx: The index of the matched pattern. | |
start: The start position of the match. | |
end: The end position of the match. | |
flags: The flags associated with the match. | |
context: The context provided to the scan method. | |
""" | |
secret = context["text"][start:end] | |
line_number = context["line_number"] | |
current_match = unique_secrets.setdefault(self.ids[idx], []) | |
if not current_match or len(secret) > len(current_match[0].secret): | |
unique_secrets[self.ids[idx]] = [ | |
SecretsInfo(line_number=line_number, secret=secret) | |
] | |
for line_no, line in enumerate(text.splitlines(), start=1): | |
self._db.scan( | |
line.encode(), | |
match_event_handler=on_match, | |
context={"text": line, "line_number": line_no}, | |
) | |
return unique_secrets | |
def invoke(self, text: str) -> dict[str, list[SecretsInfo]]: | |
""" | |
Invokes the scan method to detect secrets in the given text. | |
Args: | |
text (str): The text to scan for secrets. | |
Returns: | |
dict[str, list[SecretsInfo]]: A dictionary where the keys are secret types and the values are lists of SecretsInfo objects. | |
""" | |
return self.scan(text) | |
class SecretsDetectionGuardrail(Guardrail): | |
""" | |
Guardrail class for secrets detection using both detect-secrets and Hyperscan models. | |
Attributes: | |
redaction (REDACTION): The redaction mode to be applied. | |
_detect_secrets_model (Any): Instance of the DetectSecretsModel. | |
_hyperscan_model (Any): Instance of the HyperScanModel. | |
""" | |
redaction: REDACTION | |
_detect_secrets_model: Any = PrivateAttr() | |
_hyperscan_model: Any = PrivateAttr() | |
def model_post_init(self, __context: Any) -> None: | |
""" | |
Post-initialization method to initialize the detect-secrets and Hyperscan models. | |
""" | |
self._detect_secrets_model = DetectSecretsModel() | |
self._hyperscan_model = HyperScanModel() | |
def __init__( | |
self, | |
redaction: REDACTION = REDACTION.REDACT_ALL, | |
**kwargs, | |
): | |
""" | |
Initializes the SecretsDetectionGuardrail instance. | |
Args: | |
redaction (REDACTION): The redaction mode to be applied. Defaults to REDACTION.REDACT_ALL. | |
**kwargs: Additional keyword arguments. | |
""" | |
super().__init__( | |
redaction=redaction, | |
) | |
def get_modified_value( | |
self, unique_secrets: dict[str, Any], lines: list[str] | |
) -> str: | |
""" | |
Redacts the detected secrets in the given lines of text. | |
Args: | |
unique_secrets (dict[str, Any]): Dictionary of detected secrets. | |
lines (list[str]): List of lines of text. | |
Returns: | |
str: The modified text with secrets redacted. | |
""" | |
for _, secrets_list in unique_secrets.items(): | |
for secret_info in secrets_list: | |
secret = secret_info.secret | |
line_number = secret_info.line_number | |
lines[line_number - 1] = lines[line_number - 1].replace( | |
secret, redact_value(secret, self.redaction) | |
) | |
modified_value = "\n".join(lines) | |
return modified_value | |
def get_scan_result( | |
self, unique_secrets: dict[str, list[SecretsInfo]], lines: list[str] | |
) -> ScanResult | None: | |
""" | |
Generates a ScanResult based on the detected secrets. | |
Args: | |
unique_secrets (dict[str, list[SecretsInfo]]): Dictionary of detected secrets. | |
lines (list[str]): List of lines of text. | |
Returns: | |
ScanResult | None: The scan result if secrets are detected, otherwise None. | |
""" | |
if unique_secrets: | |
modified_value = self.get_modified_value(unique_secrets, lines) | |
detected_secrets = { | |
k: [i.secret for i in v] for k, v in unique_secrets.items() | |
} | |
return ScanResult( | |
**{ | |
"detected_secrets": detected_secrets, | |
"modified_prompt": modified_value, | |
"has_secret": True, | |
"risk_score": 1.0, | |
} | |
) | |
return None | |
def scan(self, prompt: str) -> ScanResult: | |
""" | |
Scans the given prompt for secrets using both detect-secrets and Hyperscan models. | |
Args: | |
prompt (str): The text to scan for secrets. | |
Returns: | |
ScanResult: The scan result with detected secrets and redacted text. | |
""" | |
if prompt.strip() == "": | |
return ScanResult( | |
**{ | |
"detected_secrets": None, | |
"modified_prompt": prompt, | |
"has_secret": False, | |
"risk_score": 0.0, | |
} | |
) | |
unique_secrets = self._detect_secrets_model.invoke(text=prompt) | |
results = self.get_scan_result(unique_secrets, prompt.splitlines()) | |
if results: | |
return results | |
unique_secrets = self._hyperscan_model.invoke(text=prompt) | |
results = self.get_scan_result(unique_secrets, prompt.splitlines()) | |
if results: | |
results.risk_score = 0.5 | |
return results | |
return ScanResult( | |
**{ | |
"detected_secrets": None, | |
"modified_prompt": prompt, | |
"has_secret": False, | |
"risk_score": 0.0, | |
} | |
) | |
def guard( | |
self, | |
prompt: str, | |
return_detected_secrets: bool = True, | |
**kwargs, | |
) -> SecretsDetectionResponse | SecretsDetectionResponse: | |
""" | |
Guards the given prompt by scanning for secrets and optionally returning detected secrets. | |
Args: | |
prompt (str): The text to scan for secrets. | |
return_detected_secrets (bool): Whether to return detected secrets in the response. Defaults to True. | |
**kwargs: Additional keyword arguments. | |
Returns: | |
SecretsDetectionResponse | SecretsDetectionSimpleResponse: The response with scan results and redacted text. | |
""" | |
results = self.scan(prompt) | |
explanation_parts = [] | |
if results.has_secret: | |
explanation_parts.append("Found the following secrets in the text:") | |
for secret_type, matches in results.detected_secrets.items(): | |
explanation_parts.append(f"- {secret_type}: {len(matches)} instance(s)") | |
else: | |
explanation_parts.append("No secrets detected in the text.") | |
if return_detected_secrets: | |
return SecretsDetectionResponse( | |
contains_secrets=results.has_secret, | |
detected_secrets=results.detected_secrets, | |
explanation="\n".join(explanation_parts), | |
redacted_text=results.modified_prompt, | |
risk_score=results.risk_score, | |
) | |
else: | |
return SecretsDetectionSimpleResponse( | |
contains_secrets=not results.has_secret, | |
explanation="\n".join(explanation_parts), | |
redacted_text=results.modified_prompt, | |
risk_score=results.risk_score, | |
) | |