Spaces:
Running
Running
param-bharat
commited on
Commit
·
993988f
1
Parent(s):
46466a5
feat: implement secrets detection guardrail
Browse files
guardrails_genie/guardrails/secrets_detection/__init__.py
ADDED
@@ -0,0 +1,17 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
from guardrails_genie.guardrails.secrets_detection.secrets_detection import (
|
2 |
+
DEFAULT_SECRETS_PATTERNS,
|
3 |
+
SecretsDetectionGuardrail,
|
4 |
+
SecretsDetectionSimpleResponse,
|
5 |
+
SecretsDetectionResponse,
|
6 |
+
REDACTION,
|
7 |
+
redact,
|
8 |
+
)
|
9 |
+
|
10 |
+
__all__ = [
|
11 |
+
"DEFAULT_SECRETS_PATTERNS",
|
12 |
+
"SecretsDetectionGuardrail",
|
13 |
+
"SecretsDetectionSimpleResponse",
|
14 |
+
"SecretsDetectionResponse",
|
15 |
+
"REDACTION",
|
16 |
+
"redact",
|
17 |
+
]
|
guardrails_genie/guardrails/secrets_detection/secrets_detection.py
ADDED
@@ -0,0 +1,183 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
import hashlib
|
2 |
+
import json
|
3 |
+
import pathlib
|
4 |
+
from enum import Enum
|
5 |
+
from typing import Union, Optional
|
6 |
+
|
7 |
+
import weave
|
8 |
+
from pydantic import BaseModel
|
9 |
+
|
10 |
+
from guardrails_genie.guardrails.base import Guardrail
|
11 |
+
from guardrails_genie.regex_model import RegexModel
|
12 |
+
|
13 |
+
|
14 |
+
def load_secrets_patterns():
|
15 |
+
default_patterns = {}
|
16 |
+
patterns = (
|
17 |
+
pathlib.Path(__file__).parent.absolute() / "secrets_patterns.jsonl"
|
18 |
+
).read_text()
|
19 |
+
|
20 |
+
for pattern in patterns.splitlines():
|
21 |
+
pattern = json.loads(pattern)
|
22 |
+
default_patterns[pattern["name"]] = [rf"{pat}" for pat in pattern["patterns"]]
|
23 |
+
return default_patterns
|
24 |
+
|
25 |
+
|
26 |
+
DEFAULT_SECRETS_PATTERNS = load_secrets_patterns()
|
27 |
+
|
28 |
+
|
29 |
+
class REDACTION(str, Enum):
|
30 |
+
REDACT_PARTIAL = "REDACT_PARTIAL"
|
31 |
+
REDACT_ALL = "REDACT_ALL"
|
32 |
+
REDACT_HASH = "REDACT_HASH"
|
33 |
+
REDACT_NONE = "REDACT_NONE"
|
34 |
+
|
35 |
+
|
36 |
+
def redact(text: str, matches: list[str], redaction_type: REDACTION) -> str:
|
37 |
+
for match in matches:
|
38 |
+
if redaction_type == REDACTION.REDACT_PARTIAL:
|
39 |
+
replacement = "[REDACTED:]" + match[:2] + ".." + match[-2:] + "[:REDACTED]"
|
40 |
+
elif redaction_type == REDACTION.REDACT_ALL:
|
41 |
+
replacement = "[REDACTED:]" + ("*" * len(match)) + "[:REDACTED]"
|
42 |
+
elif redaction_type == REDACTION.REDACT_HASH:
|
43 |
+
replacement = (
|
44 |
+
"[REDACTED:]" + hashlib.md5(match.encode()).hexdigest() + "[:REDACTED]"
|
45 |
+
)
|
46 |
+
else:
|
47 |
+
replacement = match
|
48 |
+
text = text.replace(match, replacement)
|
49 |
+
return text
|
50 |
+
|
51 |
+
|
52 |
+
class SecretsDetectionSimpleResponse(BaseModel):
|
53 |
+
contains_secrets: bool
|
54 |
+
explanation: str
|
55 |
+
redacted_text: Optional[str] = None
|
56 |
+
|
57 |
+
@property
|
58 |
+
def safe(self) -> bool:
|
59 |
+
return not self.contains_entities
|
60 |
+
|
61 |
+
|
62 |
+
class SecretsDetectionResponse(SecretsDetectionSimpleResponse):
|
63 |
+
detected_secrets: dict[str, list[str]]
|
64 |
+
|
65 |
+
|
66 |
+
class SecretsDetectionGuardrail(Guardrail):
|
67 |
+
regex_model: RegexModel
|
68 |
+
patterns: Union[dict[str, str], dict[str, list[str]]] = {}
|
69 |
+
redaction: REDACTION
|
70 |
+
|
71 |
+
def __init__(
|
72 |
+
self,
|
73 |
+
use_defaults: bool = True,
|
74 |
+
redaction: REDACTION = REDACTION.REDACT_ALL,
|
75 |
+
**kwargs,
|
76 |
+
):
|
77 |
+
patterns = {}
|
78 |
+
if use_defaults:
|
79 |
+
patterns = DEFAULT_SECRETS_PATTERNS.copy()
|
80 |
+
if kwargs.get("patterns"):
|
81 |
+
patterns.update(kwargs["patterns"])
|
82 |
+
|
83 |
+
# Create the RegexModel instance
|
84 |
+
regex_model = RegexModel(patterns=patterns)
|
85 |
+
|
86 |
+
# Initialize the base class with both the regex_model and patterns
|
87 |
+
super().__init__(
|
88 |
+
regex_model=regex_model,
|
89 |
+
patterns=patterns,
|
90 |
+
redaction=redaction,
|
91 |
+
)
|
92 |
+
|
93 |
+
@weave.op()
|
94 |
+
def guard(
|
95 |
+
self,
|
96 |
+
prompt: str,
|
97 |
+
return_detected_types: bool = True,
|
98 |
+
**kwargs,
|
99 |
+
) -> SecretsDetectionResponse | SecretsDetectionResponse:
|
100 |
+
"""
|
101 |
+
Check if the input prompt contains any entities based on the regex patterns.
|
102 |
+
|
103 |
+
Args:
|
104 |
+
prompt: Input text to check for entities
|
105 |
+
return_detected_types: If True, returns detailed entity type information
|
106 |
+
|
107 |
+
Returns:
|
108 |
+
SecretsDetectionResponse or SecretsDetectionResponse containing detection results
|
109 |
+
"""
|
110 |
+
result = self.regex_model.check(prompt)
|
111 |
+
|
112 |
+
# Create detailed explanation
|
113 |
+
explanation_parts = []
|
114 |
+
if result.matched_patterns:
|
115 |
+
explanation_parts.append("Found the following secrets in the text:")
|
116 |
+
for secret_type, matches in result.matched_patterns.items():
|
117 |
+
explanation_parts.append(f"- {secret_type}: {len(matches)} instance(s)")
|
118 |
+
else:
|
119 |
+
explanation_parts.append("No secrets detected in the text.")
|
120 |
+
|
121 |
+
redacted_text = prompt
|
122 |
+
if result.matched_patterns:
|
123 |
+
for secret_type, matches in result.matched_patterns.items():
|
124 |
+
redacted_text = redact(redacted_text, matches, self.redaction)
|
125 |
+
|
126 |
+
if return_detected_types:
|
127 |
+
return SecretsDetectionResponse(
|
128 |
+
contains_secrets=not result.passed,
|
129 |
+
detected_secrets=result.matched_patterns,
|
130 |
+
explanation="\n".join(explanation_parts),
|
131 |
+
redacted_text=redacted_text,
|
132 |
+
)
|
133 |
+
else:
|
134 |
+
return SecretsDetectionSimpleResponse(
|
135 |
+
contains_entities=not result.passed,
|
136 |
+
explanation="\n".join(explanation_parts),
|
137 |
+
redacted_text=redacted_text,
|
138 |
+
)
|
139 |
+
|
140 |
+
|
141 |
+
def main():
|
142 |
+
weave.init(project_name="parambharat/guardrails-genie")
|
143 |
+
|
144 |
+
guardrail = SecretsDetectionGuardrail(redaction=REDACTION.REDACT_ALL)
|
145 |
+
dataset = [
|
146 |
+
{
|
147 |
+
"input": 'I need to pass a key\naws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"',
|
148 |
+
},
|
149 |
+
{
|
150 |
+
"input": "My github token is: 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",
|
151 |
+
},
|
152 |
+
{
|
153 |
+
"input": "My JWT token is: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c",
|
154 |
+
},
|
155 |
+
]
|
156 |
+
|
157 |
+
for item in dataset:
|
158 |
+
# Check text for entities
|
159 |
+
result = guardrail.guard(prompt=item["input"])
|
160 |
+
|
161 |
+
# Access results
|
162 |
+
print(f"Contains entities: {result.contains_secrets}")
|
163 |
+
print(f"Detected entities: {result.detected_secrets}")
|
164 |
+
print(f"Explanation: {result.explanation}")
|
165 |
+
print(f"Anonymized text: {result.redacted_text}")
|
166 |
+
# import regex as re
|
167 |
+
#
|
168 |
+
# sample_input = "My JWT token is: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
|
169 |
+
# jwt_pattern = DEFAULT_SECRETS_PATTERNS["JwtToken"][0]
|
170 |
+
# print(jwt_pattern)
|
171 |
+
# pattern = re.compile(jwt_pattern)
|
172 |
+
# print(pattern)
|
173 |
+
# print(pattern.findall(sample_input))
|
174 |
+
|
175 |
+
# import pandas as pd
|
176 |
+
#
|
177 |
+
# df = pd.read_json("secrets_patterns_bak.jsonl", lines=True)
|
178 |
+
# df.loc[:, "patterns"] = df["patterns"].map(lambda x: [i[2:-1] for i in x])
|
179 |
+
# df.to_json("secrets_patterns.jsonl", orient="records", lines=True)
|
180 |
+
|
181 |
+
|
182 |
+
if __name__ == "__main__":
|
183 |
+
main()
|