geekyrakshit committed on
Commit
1a146c5
2 Parent(s): a22d59b 38ff3b5

Merge pull request #14 from soumik12345/feat/secrets-detection

Browse files
benchmarks/secrets_benchmark.py ADDED
@@ -0,0 +1,227 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ from typing import Any
3
+
4
+ import weave
5
+ from guardrails import Guard
6
+ from guardrails.hub import SecretsPresent
7
+ from llm_guard.input_scanners import Secrets
8
+ from llm_guard.util import configure_logger
9
+
10
+ from guardrails_genie.guardrails import GuardrailManager
11
+ from guardrails_genie.guardrails.base import Guardrail
12
+ from guardrails_genie.guardrails.secrets_detection import (
13
+ SecretsDetectionResponse,
14
+ SecretsDetectionSimpleResponse,
15
+ SecretsDetectionGuardrail,
16
+ )
17
+ from guardrails_genie.metrics import AccuracyMetric
18
+
19
+ logger = configure_logger(log_level="ERROR")
20
+
21
+
22
class GuardrailsAISecretsDetector(Guardrail):
    """
    A class to detect secrets using Guardrails AI.

    Attributes:
        validator (Any): The validator used for detecting secrets.
    """

    validator: Any

    def __init__(self):
        """
        Initializes the GuardrailsAISecretsDetector with a validator.
        """
        validator = Guard().use(SecretsPresent, on_fail="fix")
        super().__init__(validator=validator)

    def scan(self, text: str) -> dict:
        """
        Scans the given text for secrets.

        Args:
            text (str): The text to scan for secrets.

        Returns:
            dict: A dictionary with the keys `has_secret`, `detected_secrets`,
                `explanation`, `modified_prompt` and `risk_score`.
        """
        response = self.validator.validate(text)

        # No validation summaries means the validator reported no failure.
        if not response.validation_summaries:
            return {
                "has_secret": False,
                "detected_secrets": None,
                "explanation": "No secrets detected in the text.",
                "modified_prompt": response.validated_output,
                "risk_score": 0.0,
            }

        summary = response.validation_summaries[0]
        # The first line of the failure reason is skipped; the remaining lines
        # are numbered from 1 (presumably one detected secret per line --
        # TODO confirm against the validator's output format).
        reason_lines = summary.failure_reason.splitlines()[1:]
        return {
            "has_secret": True,
            "detected_secrets": {
                str(index): line for index, line in enumerate(reason_lines, start=1)
            },
            "explanation": summary.failure_reason,
            "modified_prompt": response.validated_output,
            # This detector gives no graded score: a failure is always 1.0.
            "risk_score": 1.0,
        }

    @weave.op
    def guard(
        self,
        prompt: str,
        return_detected_secrets: bool = True,
        **kwargs,
    ) -> SecretsDetectionResponse | SecretsDetectionSimpleResponse:
        """
        Guards the given prompt by scanning for secrets.

        Args:
            prompt (str): The prompt to scan for secrets.
            return_detected_secrets (bool): Whether to return detected secrets.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            SecretsDetectionResponse | SecretsDetectionSimpleResponse: The response after scanning for secrets.
        """
        results = self.scan(prompt)

        if return_detected_secrets:
            return SecretsDetectionResponse(
                contains_secrets=results["has_secret"],
                detected_secrets=results["detected_secrets"],
                explanation=results["explanation"],
                redacted_text=results["modified_prompt"],
                risk_score=results["risk_score"],
            )

        # Both response shapes must report the same verdict; the flag is
        # passed through as-is rather than negated.
        return SecretsDetectionSimpleResponse(
            contains_secrets=results["has_secret"],
            explanation=results["explanation"],
            redacted_text=results["modified_prompt"],
            risk_score=results["risk_score"],
        )
107
+
108
+
109
class LLMGuardSecretsDetector(Guardrail):
    """
    A class to detect secrets using LLM Guard.

    Attributes:
        validator (Any): The validator used for detecting secrets.
    """

    validator: Any

    def __init__(self):
        """
        Initializes the LLMGuardSecretsDetector with a validator.
        """
        validator = Secrets(redact_mode="all")
        super().__init__(validator=validator)

    def scan(self, text: str) -> dict:
        """
        Scans the given text for secrets.

        Args:
            text (str): The text to scan for secrets.

        Returns:
            dict: A dictionary with the keys `has_secret`, `detected_secrets`,
                `explanation`, `modified_prompt` and `risk_score`.
        """
        sanitized_prompt, is_valid, risk_score = self.validator.scan(text)

        if is_valid:
            detected_secrets = None
            explanation = "No secrets detected in the text."
        else:
            # The scanner's return value carries no list of secrets, so an
            # empty mapping is reported alongside the positive verdict.
            detected_secrets = {}
            explanation = "This library does not return detected secrets."

        return {
            "has_secret": not is_valid,
            "detected_secrets": detected_secrets,
            "explanation": explanation,
            "modified_prompt": sanitized_prompt,
            "risk_score": risk_score,
        }

    @weave.op
    def guard(
        self,
        prompt: str,
        return_detected_secrets: bool = True,
        **kwargs,
    ) -> SecretsDetectionResponse | SecretsDetectionSimpleResponse:
        """
        Guards the given prompt by scanning for secrets.

        Args:
            prompt (str): The prompt to scan for secrets.
            return_detected_secrets (bool): Whether to return detected secrets.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            SecretsDetectionResponse | SecretsDetectionSimpleResponse: The response after scanning for secrets.
        """
        results = self.scan(prompt)

        if return_detected_secrets:
            return SecretsDetectionResponse(
                contains_secrets=results["has_secret"],
                detected_secrets=results["detected_secrets"],
                explanation=results["explanation"],
                redacted_text=results["modified_prompt"],
                risk_score=results["risk_score"],
            )

        # Both response shapes must report the same verdict; the flag is
        # passed through as-is rather than negated.
        return SecretsDetectionSimpleResponse(
            contains_secrets=results["has_secret"],
            explanation=results["explanation"],
            redacted_text=results["modified_prompt"],
            risk_score=results["risk_score"],
        )
187
+
188
+
189
def main():
    """
    Main function to initialize and evaluate the secrets detectors.

    Each detector is wrapped in its own GuardrailManager and evaluated
    against the same dataset with the AccuracyMetric scorer; the results of
    every run are printed.
    """
    # Called for its side effect only; the returned client is not needed.
    weave.init("parambharat/secrets-detection")
    dataset = weave.ref("secrets-detection-benchmark:latest").get()

    all_guards = [
        LLMGuardSecretsDetector(),
        GuardrailsAISecretsDetector(),
        SecretsDetectionGuardrail(),
    ]
    evaluation = weave.Evaluation(
        dataset=dataset.rows,
        scorers=[AccuracyMetric()],
    )

    for guard in all_guards:
        # The class name doubles as the display name of the evaluation run.
        name = guard.__class__.__name__
        guardrail_manager = GuardrailManager(guardrails=[guard])

        results = asyncio.run(
            evaluation.evaluate(
                guardrail_manager,
                __weave={"display_name": name},
            )
        )
        print(results)


if __name__ == "__main__":
    main()
guardrails_genie/guardrails/secrets_detection/__init__.py CHANGED
@@ -1,17 +1,15 @@
1
  from guardrails_genie.guardrails.secrets_detection.secrets_detection import (
2
- DEFAULT_SECRETS_PATTERNS,
3
- REDACTION,
4
  SecretsDetectionGuardrail,
5
- SecretsDetectionResponse,
6
  SecretsDetectionSimpleResponse,
7
- redact,
 
 
8
  )
9
 
10
  __all__ = [
11
- "DEFAULT_SECRETS_PATTERNS",
12
  "SecretsDetectionGuardrail",
13
  "SecretsDetectionSimpleResponse",
14
  "SecretsDetectionResponse",
15
  "REDACTION",
16
- "redact",
17
  ]
 
1
  from guardrails_genie.guardrails.secrets_detection.secrets_detection import (
 
 
2
  SecretsDetectionGuardrail,
 
3
  SecretsDetectionSimpleResponse,
4
+ SecretsDetectionResponse,
5
+ REDACTION,
6
+ redact_value,
7
  )
8
 
9
  __all__ = [
 
10
  "SecretsDetectionGuardrail",
11
  "SecretsDetectionSimpleResponse",
12
  "SecretsDetectionResponse",
13
  "REDACTION",
14
+ "redact_value",
15
  ]
guardrails_genie/guardrails/secrets_detection/secrets_detection.py CHANGED
@@ -1,41 +1,30 @@
1
  import hashlib
2
  import json
 
3
  import pathlib
 
4
  from enum import Enum
5
- from typing import Optional, Union
6
 
7
  import weave
8
- from pydantic import BaseModel
9
 
10
  from guardrails_genie.guardrails.base import Guardrail
11
- from guardrails_genie.regex_model import RegexModel
12
 
13
-
14
- def load_secrets_patterns() -> dict[str, list[str]]:
15
- """
16
- Load secret patterns from a JSONL file and return them as a dictionary.
17
-
18
- Returns:
19
- dict: A dictionary where keys are pattern names and values are lists of regex patterns.
20
- """
21
- default_patterns = {}
22
- patterns = (
23
- pathlib.Path(__file__).parent.absolute() / "secrets_patterns.jsonl"
24
- ).read_text()
25
-
26
- for pattern in patterns.splitlines():
27
- pattern = json.loads(pattern)
28
- default_patterns[pattern["name"]] = [rf"{pat}" for pat in pattern["patterns"]]
29
- return default_patterns
30
-
31
-
32
- # Load default secret patterns from the JSONL file
33
- DEFAULT_SECRETS_PATTERNS = load_secrets_patterns()
34
 
35
 
36
  class REDACTION(str, Enum):
37
  """
38
- Enum for different types of redaction methods.
39
  """
40
 
41
  REDACT_PARTIAL = "REDACT_PARTIAL"
@@ -44,31 +33,31 @@ class REDACTION(str, Enum):
44
  REDACT_NONE = "REDACT_NONE"
45
 
46
 
47
- def redact(text: str, matches: list[str], redaction_type: REDACTION) -> str:
48
  """
49
- Redact the given matches in the text based on the redaction type.
50
 
51
  Args:
52
- text (str): The input text to redact.
53
- matches (list[str]): List of strings to be redacted.
54
- redaction_type (REDACTION): The type of redaction to apply.
 
 
 
55
 
56
  Returns:
57
- str: The redacted text.
58
- """
59
- for match in matches:
60
- if redaction_type == REDACTION.REDACT_PARTIAL:
61
- replacement = "[REDACTED:]" + match[:2] + ".." + match[-2:] + "[:REDACTED]"
62
- elif redaction_type == REDACTION.REDACT_ALL:
63
- replacement = "[REDACTED:]" + ("*" * len(match)) + "[:REDACTED]"
64
- elif redaction_type == REDACTION.REDACT_HASH:
65
- replacement = (
66
- "[REDACTED:]" + hashlib.md5(match.encode()).hexdigest() + "[:REDACTED]"
67
- )
68
- else:
69
- replacement = match
70
- text = text.replace(match, replacement)
71
- return text
72
 
73
 
74
  class SecretsDetectionSimpleResponse(BaseModel):
@@ -79,11 +68,13 @@ class SecretsDetectionSimpleResponse(BaseModel):
79
  contains_secrets (bool): Indicates if secrets were detected.
80
  explanation (str): Explanation of the detection result.
81
  redacted_text (Optional[str]): The redacted text if secrets were found.
 
82
  """
83
 
84
  contains_secrets: bool
85
  explanation: str
86
  redacted_text: Optional[str] = None
 
87
 
88
  @property
89
  def safe(self) -> bool:
@@ -104,54 +95,329 @@ class SecretsDetectionResponse(SecretsDetectionSimpleResponse):
104
  detected_secrets (dict[str, list[str]]): Dictionary of detected secrets.
105
  """
106
 
107
- detected_secrets: dict[str, list[str]]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
108
 
109
 
110
  class SecretsDetectionGuardrail(Guardrail):
111
  """
112
- A guardrail for detecting secrets in text using regex patterns.
113
- reference: SecretBench: A Dataset of Software Secrets
114
- https://arxiv.org/abs/2303.06729
115
 
116
  Attributes:
117
- regex_model (RegexModel): The regex model used for detection.
118
- patterns (Union[dict[str, str], dict[str, list[str]]]): The patterns used for detection.
119
- redaction (REDACTION): The type of redaction to apply.
120
  """
121
 
122
- regex_model: RegexModel
123
- patterns: Union[dict[str, str], dict[str, list[str]]] = {}
124
  redaction: REDACTION
 
 
 
 
 
 
 
 
 
125
 
126
  def __init__(
127
  self,
128
- use_defaults: bool = True,
129
  redaction: REDACTION = REDACTION.REDACT_ALL,
130
  **kwargs,
131
  ):
132
  """
133
- Initialize the SecretsDetectionGuardrail.
134
 
135
  Args:
136
- use_defaults (bool): Whether to use default patterns.
137
- redaction (REDACTION): The type of redaction to apply.
138
  **kwargs: Additional keyword arguments.
139
  """
140
- patterns = {}
141
- if use_defaults:
142
- patterns = DEFAULT_SECRETS_PATTERNS.copy()
143
- if kwargs.get("patterns"):
144
- patterns.update(kwargs["patterns"])
145
-
146
- regex_model = RegexModel(patterns=patterns)
147
-
148
  super().__init__(
149
- regex_model=regex_model,
150
- patterns=patterns,
151
  redaction=redaction,
152
  )
153
 
154
- @weave.op()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
155
  def guard(
156
  self,
157
  prompt: str,
@@ -159,40 +425,38 @@ class SecretsDetectionGuardrail(Guardrail):
159
  **kwargs,
160
  ) -> SecretsDetectionResponse | SecretsDetectionResponse:
161
  """
162
- Check if the input prompt contains any secrets based on the regex patterns.
163
 
164
  Args:
165
- prompt (str): Input text to check for secrets.
166
- return_detected_secrets (bool): If True, returns detailed secrets type information.
 
167
 
168
  Returns:
169
- SecretsDetectionResponse or SecretsDetectionResponse: Detection results.
170
  """
171
- result = self.regex_model.check(prompt)
172
 
173
  explanation_parts = []
174
- if result.matched_patterns:
175
  explanation_parts.append("Found the following secrets in the text:")
176
- for secret_type, matches in result.matched_patterns.items():
177
  explanation_parts.append(f"- {secret_type}: {len(matches)} instance(s)")
178
  else:
179
  explanation_parts.append("No secrets detected in the text.")
180
 
181
- redacted_text = prompt
182
- if result.matched_patterns:
183
- for secret_type, matches in result.matched_patterns.items():
184
- redacted_text = redact(redacted_text, matches, self.redaction)
185
-
186
  if return_detected_secrets:
187
  return SecretsDetectionResponse(
188
- contains_secrets=not result.passed,
189
- detected_secrets=result.matched_patterns,
190
  explanation="\n".join(explanation_parts),
191
- redacted_text=redacted_text,
 
192
  )
193
  else:
194
  return SecretsDetectionSimpleResponse(
195
- contains_secrets=not result.passed,
196
  explanation="\n".join(explanation_parts),
197
- redacted_text=redacted_text,
 
198
  )
 
1
  import hashlib
2
  import json
3
+ import os
4
  import pathlib
5
+ import tempfile
6
  from enum import Enum
7
+ from typing import Optional, Any
8
 
9
  import weave
10
+ from pydantic import BaseModel, PrivateAttr
11
 
12
  from guardrails_genie.guardrails.base import Guardrail
 
13
 
14
# detect-secrets and hyperscan are optional extras; fail early with an
# actionable message when either one is missing.
try:
    from detect_secrets import SecretsCollection
    from detect_secrets.settings import default_settings
    import hyperscan
except ImportError as err:
    # Chain the original error so the missing module name stays visible.
    raise ImportError(
        "The `detect-secrets` and the `hyperscan` packages are required for using the SecretsDetectionGuardrail. "
        "Please install them by running `pip install detect-secrets hyperscan`."
    ) from err
 
 
 
 
 
 
 
 
 
 
 
 
23
 
24
 
25
  class REDACTION(str, Enum):
26
  """
27
+ Enum for different types of redaction modes.
28
  """
29
 
30
  REDACT_PARTIAL = "REDACT_PARTIAL"
 
33
  REDACT_NONE = "REDACT_NONE"
34
 
35
 
36
def redact_value(value: str, mode: str) -> str:
    """
    Return `value` masked according to the requested redaction mode.

    Args:
        value (str): The string value to be redacted.
        mode (str): The redaction mode to be applied. It can be one of the following:
            - REDACTION.REDACT_PARTIAL: Keeps the first and last two characters.
            - REDACTION.REDACT_ALL: Replaces every character with `*`.
            - REDACTION.REDACT_HASH: Replaces the value with its MD5 hex digest.
            - REDACTION.REDACT_NONE: No redaction is applied.

    Returns:
        str: The masked value wrapped in `[REDACTED:]...[:REDACTED]` markers,
            or `value` unchanged for any other mode.
    """
    if mode == REDACTION.REDACT_PARTIAL:
        masked = value[:2] + ".." + value[-2:]
    elif mode == REDACTION.REDACT_ALL:
        masked = "*" * len(value)
    elif mode == REDACTION.REDACT_HASH:
        masked = hashlib.md5(value.encode()).hexdigest()
    else:
        # Unrecognised modes (including REDACT_NONE) leave the value as-is.
        return value

    return "[REDACTED:]" + masked + "[:REDACTED]"
 
 
 
61
 
62
 
63
  class SecretsDetectionSimpleResponse(BaseModel):
 
68
  contains_secrets (bool): Indicates if secrets were detected.
69
  explanation (str): Explanation of the detection result.
70
  redacted_text (Optional[str]): The redacted text if secrets were found.
71
+ risk_score (float): The risk score of the detection result. (0.0, 0.5, 1.0)
72
  """
73
 
74
  contains_secrets: bool
75
  explanation: str
76
  redacted_text: Optional[str] = None
77
+ risk_score: float = 0.0
78
 
79
  @property
80
  def safe(self) -> bool:
 
95
  detected_secrets (dict[str, list[str]]): Dictionary of detected secrets.
96
  """
97
 
98
+ detected_secrets: dict[str, Any] | None = None
99
+
100
+
101
class SecretsInfo(BaseModel):
    """
    A single detected secret together with where it was found.

    Attributes:
        secret (str): The detected secret value.
        line_number (int): The line number where the secret was found.
    """

    secret: str
    line_number: int
112
+
113
+
114
class ScanResult(BaseModel):
    """
    Outcome of scanning a prompt for secrets.

    Attributes:
        detected_secrets (dict[str, Any] | None): Dictionary of detected secrets, or None if no secrets were found.
        modified_prompt (str): The prompt text after redaction has been applied.
        has_secret (bool): True when at least one secret was detected.
        risk_score (float): The risk score assigned to the scan.
    """

    detected_secrets: dict[str, Any] | None = None
    modified_prompt: str
    has_secret: bool
    risk_score: float
129
+
130
+
131
class DetectSecretsModel(weave.Model):
    """
    Model for detecting secrets using the detect-secrets library.
    """

    @staticmethod
    def scan(text: str) -> dict[str, list[SecretsInfo]]:
        """
        Scans the given text for secrets using the detect-secrets library.

        The text is written to a temporary file because the library is driven
        through `SecretsCollection.scan_file`. The file is always removed,
        even when scanning raises, so the raw prompt is not left on disk.

        Args:
            text (str): The text to scan for secrets.

        Returns:
            dict[str, list[SecretsInfo]]: A dictionary where the keys are secret types and the values are lists of SecretsInfo objects.
        """
        secrets = SecretsCollection()
        unique_secrets = {}

        # delete=False so the file can be closed and then re-opened by path.
        temp_file = tempfile.NamedTemporaryFile(delete=False)
        try:
            temp_file.write(text.encode("utf-8"))
            temp_file.close()

            with default_settings():
                secrets.scan_file(str(temp_file.name))

            for file in secrets.files:
                for found_secret in secrets[file]:
                    # Findings without a concrete value cannot be redacted.
                    if found_secret.secret_value is None:
                        continue

                    unique_secrets.setdefault(found_secret.type, []).append(
                        SecretsInfo(
                            secret=found_secret.secret_value,
                            line_number=found_secret.line_number,
                        )
                    )
        finally:
            # close() is idempotent; it covers the case where write() failed
            # before the explicit close above was reached.
            temp_file.close()
            os.remove(temp_file.name)

        return unique_secrets

    @weave.op
    def invoke(self, text: str) -> dict[str, list[SecretsInfo]]:
        """
        Invokes the scan method to detect secrets in the given text.

        Args:
            text (str): The text to scan for secrets.

        Returns:
            dict[str, list[SecretsInfo]]: A dictionary where the keys are secret types and the values are lists of SecretsInfo objects.
        """
        return self.scan(text)
187
+
188
+
189
class HyperScanModel(weave.Model):
    """
    Model for detecting secrets using the Hyperscan library.
    We use the Hyperscan library to scan for secrets using regex patterns.
    The patterns are mined from https://github.com/mazen160/secrets-patterns-db
    This model is used in conjunction with the DetectSecretsModel to improve the detection of secrets.

    Attributes:
        only_high_confidence (bool): When True, only patterns whose
            `confidence` field equals "high" are loaded.
        ids (list[str]): Pattern names, positionally aligned with the integer
            ids given to the compiled Hyperscan database.
    """

    _db: Any = PrivateAttr()
    _pattern_map: dict[str, str] = PrivateAttr()
    only_high_confidence: bool = False
    ids: list[str] = []

    def _load_patterns(self) -> dict[str, str]:
        """
        Loads the patterns from the `secrets_patterns.jsonl` file that sits
        next to this module.

        Returns:
            dict[str, str]: A dictionary where the keys are pattern names and the values are regex patterns.
        """
        patterns_path = (
            pathlib.Path(__file__).parent.resolve() / "secrets_patterns.jsonl"
        )
        # Context manager so the file handle is closed once parsing is done.
        with patterns_path.open() as patterns_file:
            patterns_list = [json.loads(line) for line in patterns_file]

        if self.only_high_confidence:
            patterns_list = [
                pattern for pattern in patterns_list if pattern["confidence"] == "high"
            ]
        return {pattern["name"]: pattern["regex"] for pattern in patterns_list}

    def __init__(self, **kwargs: Any):
        """
        Initializes the HyperScanModel instance.
        """
        super().__init__(**kwargs)

    def model_post_init(self, __context: Any) -> None:
        """
        Post-initialization method to load patterns and compile the Hyperscan database.
        """
        self._pattern_map = self._load_patterns()
        self.ids = list(self._pattern_map.keys())
        expressions = [pattern.encode() for pattern in self._pattern_map.values()]
        self._db = hyperscan.Database()
        # Integer ids are list positions, so `self.ids[idx]` maps a match back
        # to its pattern name.
        self._db.compile(expressions=expressions, ids=list(range(len(expressions))))

    def scan(self, text: str) -> dict[str, list[SecretsInfo]]:
        """
        Scans the given text for secrets using the Hyperscan library.

        The text is scanned line by line. For every pattern only the longest
        match seen so far is kept.

        Args:
            text (str): The text to scan for secrets.

        Returns:
            dict[str, list[SecretsInfo]]: A dictionary where the keys are secret types and the values are lists of SecretsInfo objects.
        """
        unique_secrets = {}

        def on_match(idx, start, end, flags, context):
            """
            Callback function for handling matches found by Hyperscan.

            Args:
                idx: The index of the matched pattern.
                start: The start position of the match.
                end: The end position of the match.
                flags: The flags associated with the match.
                context: The context provided to the scan method.
            """
            # The offsets refer to the encoded bytes handed to the scanner, so
            # slice the bytes (not the str) and decode the slice afterwards.
            # NOTE(review): offsets are assumed to be byte offsets -- confirm
            # against the python-hyperscan documentation.
            secret = context["data"][start:end].decode("utf-8", errors="ignore")
            line_number = context["line_number"]
            current_match = unique_secrets.setdefault(self.ids[idx], [])

            if not current_match or len(secret) > len(current_match[0].secret):
                unique_secrets[self.ids[idx]] = [
                    SecretsInfo(line_number=line_number, secret=secret)
                ]

        for line_no, line in enumerate(text.splitlines(), start=1):
            encoded_line = line.encode()
            self._db.scan(
                encoded_line,
                match_event_handler=on_match,
                context={
                    "text": line,
                    "data": encoded_line,
                    "line_number": line_no,
                },
            )

        return unique_secrets

    @weave.op
    def invoke(self, text: str) -> dict[str, list[SecretsInfo]]:
        """
        Invokes the scan method to detect secrets in the given text.

        Args:
            text (str): The text to scan for secrets.

        Returns:
            dict[str, list[SecretsInfo]]: A dictionary where the keys are secret types and the values are lists of SecretsInfo objects.
        """
        return self.scan(text)
288
 
289
 
290
class SecretsDetectionGuardrail(Guardrail):
    """
    Guardrail class for secrets detection using both detect-secrets and Hyperscan models.

    Attributes:
        redaction (REDACTION): The redaction mode to be applied.
        _detect_secrets_model (Any): Instance of the DetectSecretsModel.
        _hyperscan_model (Any): Instance of the HyperScanModel.
    """

    redaction: REDACTION
    _detect_secrets_model: Any = PrivateAttr()
    _hyperscan_model: Any = PrivateAttr()

    def model_post_init(self, __context: Any) -> None:
        """
        Post-initialization method to initialize the detect-secrets and Hyperscan models.
        """
        self._detect_secrets_model = DetectSecretsModel()
        self._hyperscan_model = HyperScanModel()

    def __init__(
        self,
        redaction: REDACTION = REDACTION.REDACT_ALL,
        **kwargs,
    ):
        """
        Initializes the SecretsDetectionGuardrail instance.

        Args:
            redaction (REDACTION): The redaction mode to be applied. Defaults to REDACTION.REDACT_ALL.
            **kwargs: Additional keyword arguments. They are accepted but not
                forwarded to the base class.
        """
        super().__init__(
            redaction=redaction,
        )

    def get_modified_value(
        self, unique_secrets: dict[str, Any], lines: list[str]
    ) -> str:
        """
        Redacts the detected secrets in the given lines of text.

        Note that `lines` is modified in place.

        Args:
            unique_secrets (dict[str, Any]): Dictionary of detected secrets.
            lines (list[str]): List of lines of text.

        Returns:
            str: The modified text with secrets redacted, joined with newlines.
        """
        for secrets_list in unique_secrets.values():
            for secret_info in secrets_list:
                secret = secret_info.secret
                # Line numbers are 1-based; list positions are 0-based.
                position = secret_info.line_number - 1
                lines[position] = lines[position].replace(
                    secret, redact_value(secret, self.redaction)
                )

        return "\n".join(lines)

    def get_scan_result(
        self, unique_secrets: dict[str, list[SecretsInfo]], lines: list[str]
    ) -> ScanResult | None:
        """
        Generates a ScanResult based on the detected secrets.

        Args:
            unique_secrets (dict[str, list[SecretsInfo]]): Dictionary of detected secrets.
            lines (list[str]): List of lines of text.

        Returns:
            ScanResult | None: The scan result if secrets are detected, otherwise None.
        """
        if not unique_secrets:
            return None

        modified_value = self.get_modified_value(unique_secrets, lines)
        detected_secrets = {
            secret_type: [info.secret for info in infos]
            for secret_type, infos in unique_secrets.items()
        }
        return ScanResult(
            detected_secrets=detected_secrets,
            modified_prompt=modified_value,
            has_secret=True,
            risk_score=1.0,
        )

    def scan(self, prompt: str) -> ScanResult:
        """
        Scans the given prompt for secrets using both detect-secrets and Hyperscan models.

        detect-secrets runs first and its findings get a risk score of 1.0.
        Hyperscan is consulted only when detect-secrets finds nothing, and its
        findings get a risk score of 0.5.

        Args:
            prompt (str): The text to scan for secrets.

        Returns:
            ScanResult: The scan result with detected secrets and redacted text.
        """
        # Whitespace-only prompts are reported as clean without scanning.
        if prompt.strip() == "":
            return ScanResult(
                detected_secrets=None,
                modified_prompt=prompt,
                has_secret=False,
                risk_score=0.0,
            )

        unique_secrets = self._detect_secrets_model.invoke(text=prompt)
        results = self.get_scan_result(unique_secrets, prompt.splitlines())
        if results:
            return results

        unique_secrets = self._hyperscan_model.invoke(text=prompt)
        results = self.get_scan_result(unique_secrets, prompt.splitlines())
        if results:
            results.risk_score = 0.5
            return results

        return ScanResult(
            detected_secrets=None,
            modified_prompt=prompt,
            has_secret=False,
            risk_score=0.0,
        )

    @weave.op
    def guard(
        self,
        prompt: str,
        return_detected_secrets: bool = True,
        **kwargs,
    ) -> SecretsDetectionResponse | SecretsDetectionSimpleResponse:
        """
        Guards the given prompt by scanning for secrets and optionally returning detected secrets.

        Args:
            prompt (str): The text to scan for secrets.
            return_detected_secrets (bool): Whether to return detected secrets in the response. Defaults to True.
            **kwargs: Additional keyword arguments.

        Returns:
            SecretsDetectionResponse | SecretsDetectionSimpleResponse: The response with scan results and redacted text.
        """
        results = self.scan(prompt)

        explanation_parts = []
        if results.has_secret:
            explanation_parts.append("Found the following secrets in the text:")
            for secret_type, matches in results.detected_secrets.items():
                explanation_parts.append(f"- {secret_type}: {len(matches)} instance(s)")
        else:
            explanation_parts.append("No secrets detected in the text.")
        explanation = "\n".join(explanation_parts)

        if return_detected_secrets:
            return SecretsDetectionResponse(
                contains_secrets=results.has_secret,
                detected_secrets=results.detected_secrets,
                explanation=explanation,
                redacted_text=results.modified_prompt,
                risk_score=results.risk_score,
            )

        # Both response shapes must report the same verdict; the flag is
        # passed through as-is rather than negated.
        return SecretsDetectionSimpleResponse(
            contains_secrets=results.has_secret,
            explanation=explanation,
            redacted_text=results.modified_prompt,
            risk_score=results.risk_score,
        )
guardrails_genie/guardrails/secrets_detection/secrets_patterns.jsonl CHANGED
The diff for this file is too large to render. See raw diff
 
pyproject.toml CHANGED
@@ -25,6 +25,13 @@ presidio = [
25
  "presidio-analyzer>=2.2.355",
26
  "presidio-anonymizer>=2.2.355",
27
  ]
 
 
 
 
 
 
 
28
  dev = [
29
  "isort>=5.13.2",
30
  "black>=24.10.0",
 
25
  "presidio-analyzer>=2.2.355",
26
  "presidio-anonymizer>=2.2.355",
27
  ]
28
+
29
+ secrets = [
30
+ "gibberish-detector>=0.1.1",
31
+ "detect-secrets>=1.5.0",
32
+ "hyperscan>=0.7.8"
33
+ ]
34
+
35
  dev = [
36
  "isort>=5.13.2",
37
  "black>=24.10.0",
tests/guardrails_genie/guardrails/test_secrets_detection.py CHANGED
@@ -2,16 +2,14 @@ import hashlib
2
  import re
3
 
4
  import pytest
5
- from hypothesis import given, settings
6
- from hypothesis import strategies as st
7
 
 
8
  from guardrails_genie.guardrails.secrets_detection import (
9
- DEFAULT_SECRETS_PATTERNS,
10
- REDACTION,
11
- SecretsDetectionGuardrail,
12
- SecretsDetectionResponse,
13
  SecretsDetectionSimpleResponse,
14
- redact,
 
 
15
  )
16
 
17
 
@@ -19,7 +17,7 @@ from guardrails_genie.guardrails.secrets_detection import (
19
  def mock_secrets_guard(monkeypatch):
20
  def _mock_guard(*args, **kwargs):
21
  prompt = kwargs.get("prompt")
22
- return_detected_types = kwargs.get("return_detected_types")
23
 
24
  if "safe text" in prompt:
25
  if return_detected_types:
@@ -28,12 +26,14 @@ def mock_secrets_guard(monkeypatch):
28
  explanation="No secrets detected in the text.",
29
  detected_secrets={},
30
  redacted_text=prompt,
 
31
  )
32
  else:
33
  return SecretsDetectionSimpleResponse(
34
  contains_secrets=False,
35
  explanation="No secrets detected in the text.",
36
  redacted_text=prompt,
 
37
  )
38
  else:
39
  if return_detected_types:
@@ -42,12 +42,14 @@ def mock_secrets_guard(monkeypatch):
42
  explanation="The output contains secrets.",
43
  detected_secrets={"secrets": ["API_KEY"]},
44
  redacted_text="My secret key is [REDACTED:]************[:REDACTED]",
 
45
  )
46
  else:
47
  return SecretsDetectionSimpleResponse(
48
  contains_secrets=True,
49
  explanation="The output contains secrets.",
50
  redacted_text="My secret key is [REDACTED:]************[:REDACTED]",
 
51
  )
52
 
53
  monkeypatch.setattr(
@@ -57,38 +59,28 @@ def mock_secrets_guard(monkeypatch):
57
 
58
 
59
  def test_redact_partial():
60
- text = "My secret key is ABCDEFGHIJKL"
61
- matches = ["ABCDEFGHIJKL"]
62
- redacted_text = redact(text, matches, REDACTION.REDACT_PARTIAL)
63
- assert redacted_text == "My secret key is [REDACTED:]AB..KL[:REDACTED]"
64
 
65
 
66
  def test_redact_all():
67
- text = "My secret key is ABCDEFGHIJKL"
68
- matches = ["ABCDEFGHIJKL"]
69
- redacted_text = redact(text, matches, REDACTION.REDACT_ALL)
70
- assert redacted_text == "My secret key is [REDACTED:]************[:REDACTED]"
71
 
72
 
73
  def test_redact_hash():
74
- text = "My secret key is ABCDEFGHIJKL"
75
- matches = ["ABCDEFGHIJKL"]
76
- hashed_value = hashlib.md5("ABCDEFGHIJKL".encode()).hexdigest()
77
- redacted_text = redact(text, matches, REDACTION.REDACT_HASH)
78
- assert redacted_text == f"My secret key is [REDACTED:]{hashed_value}[:REDACTED]"
79
-
80
-
81
def test_redact_no_match():
    """Check that text is returned unchanged when no match occurs in it."""
    sentence = "My secret key is ABCDEFGHIJKL"
    # "XYZ" does not appear anywhere in the sentence.
    absent_matches = ["XYZ"]
    assert redact(sentence, absent_matches, REDACTION.REDACT_ALL) == sentence
86
 
87
 
88
  def test_secrets_detection_guardrail_detect_types(mock_secrets_guard):
89
  from guardrails_genie.guardrails.secrets_detection import (
90
- REDACTION,
91
  SecretsDetectionGuardrail,
 
92
  )
93
 
94
  guardrail = SecretsDetectionGuardrail(redaction=REDACTION.REDACT_ALL)
@@ -104,8 +96,8 @@ def test_secrets_detection_guardrail_detect_types(mock_secrets_guard):
104
 
105
  def test_secrets_detection_guardrail_simple_response(mock_secrets_guard):
106
  from guardrails_genie.guardrails.secrets_detection import (
107
- REDACTION,
108
  SecretsDetectionGuardrail,
 
109
  )
110
 
111
  guardrail = SecretsDetectionGuardrail(redaction=REDACTION.REDACT_ALL)
@@ -120,8 +112,8 @@ def test_secrets_detection_guardrail_simple_response(mock_secrets_guard):
120
 
121
  def test_secrets_detection_guardrail_no_secrets(mock_secrets_guard):
122
  from guardrails_genie.guardrails.secrets_detection import (
123
- REDACTION,
124
  SecretsDetectionGuardrail,
 
125
  )
126
 
127
  guardrail = SecretsDetectionGuardrail(redaction=REDACTION.REDACT_ALL)
@@ -135,16 +127,15 @@ def test_secrets_detection_guardrail_no_secrets(mock_secrets_guard):
135
  assert result.redacted_text == prompt
136
 
137
 
138
- # Create a strategy to generate strings that match the patterns
139
  def pattern_strategy(pattern):
140
  return st.from_regex(re.compile(pattern), fullmatch=True)
141
 
142
 
143
@settings(deadline=1000)  # 1000 ms (1 second) per generated example
@given(pattern_strategy(DEFAULT_SECRETS_PATTERNS["JwtToken"][0]))
def test_specific_pattern_guardrail(text):
    """Property test: text generated from the first JwtToken pattern is flagged.

    Args:
        text: Hypothesis-generated string built from the first entry of
            ``DEFAULT_SECRETS_PATTERNS["JwtToken"]``.
    """
    detector = SecretsDetectionGuardrail(redaction=REDACTION.REDACT_ALL)
    outcome = detector.guard(prompt=text, return_detected_secrets=True)

    assert outcome.contains_secrets is True
    assert "JwtToken" in outcome.detected_secrets
 
2
  import re
3
 
4
  import pytest
5
+ from hypothesis import strategies as st, given, settings
 
6
 
7
+ from guardrails_genie.guardrails import SecretsDetectionGuardrail
8
  from guardrails_genie.guardrails.secrets_detection import (
 
 
 
 
9
  SecretsDetectionSimpleResponse,
10
+ SecretsDetectionResponse,
11
+ REDACTION,
12
+ redact_value,
13
  )
14
 
15
 
 
17
  def mock_secrets_guard(monkeypatch):
18
  def _mock_guard(*args, **kwargs):
19
  prompt = kwargs.get("prompt")
20
+ return_detected_types = kwargs.get("return_detected_secrets")
21
 
22
  if "safe text" in prompt:
23
  if return_detected_types:
 
26
  explanation="No secrets detected in the text.",
27
  detected_secrets={},
28
  redacted_text=prompt,
29
+ risk_score=0.0,
30
  )
31
  else:
32
  return SecretsDetectionSimpleResponse(
33
  contains_secrets=False,
34
  explanation="No secrets detected in the text.",
35
  redacted_text=prompt,
36
+ risk_score=0.0,
37
  )
38
  else:
39
  if return_detected_types:
 
42
  explanation="The output contains secrets.",
43
  detected_secrets={"secrets": ["API_KEY"]},
44
  redacted_text="My secret key is [REDACTED:]************[:REDACTED]",
45
+ risk_score=1.0,
46
  )
47
  else:
48
  return SecretsDetectionSimpleResponse(
49
  contains_secrets=True,
50
  explanation="The output contains secrets.",
51
  redacted_text="My secret key is [REDACTED:]************[:REDACTED]",
52
+ risk_score=1.0,
53
  )
54
 
55
  monkeypatch.setattr(
 
59
 
60
 
61
  def test_redact_partial():
62
+ text = "ABCDEFGHIJKL"
63
+ redacted_text = redact_value(text, REDACTION.REDACT_PARTIAL)
64
+ assert redacted_text == "[REDACTED:]AB..KL[:REDACTED]"
 
65
 
66
 
67
  def test_redact_all():
68
+ text = "ABCDEFGHIJKL"
69
+ redacted_text = redact_value(text, REDACTION.REDACT_ALL)
70
+ assert redacted_text == "[REDACTED:]************[:REDACTED]"
 
71
 
72
 
73
  def test_redact_hash():
74
+ text = "ABCDEFGHIJKL"
75
+ hashed_value = hashlib.md5(text.encode()).hexdigest()
76
+ redacted_text = redact_value(text, REDACTION.REDACT_HASH)
77
+ assert redacted_text == f"[REDACTED:]{hashed_value}[:REDACTED]"
 
 
 
 
 
 
 
 
78
 
79
 
80
  def test_secrets_detection_guardrail_detect_types(mock_secrets_guard):
81
  from guardrails_genie.guardrails.secrets_detection import (
 
82
  SecretsDetectionGuardrail,
83
+ REDACTION,
84
  )
85
 
86
  guardrail = SecretsDetectionGuardrail(redaction=REDACTION.REDACT_ALL)
 
96
 
97
  def test_secrets_detection_guardrail_simple_response(mock_secrets_guard):
98
  from guardrails_genie.guardrails.secrets_detection import (
 
99
  SecretsDetectionGuardrail,
100
+ REDACTION,
101
  )
102
 
103
  guardrail = SecretsDetectionGuardrail(redaction=REDACTION.REDACT_ALL)
 
112
 
113
  def test_secrets_detection_guardrail_no_secrets(mock_secrets_guard):
114
  from guardrails_genie.guardrails.secrets_detection import (
 
115
  SecretsDetectionGuardrail,
116
+ REDACTION,
117
  )
118
 
119
  guardrail = SecretsDetectionGuardrail(redaction=REDACTION.REDACT_ALL)
 
127
  assert result.redacted_text == prompt
128
 
129
 
 
130
  def pattern_strategy(pattern):
131
  return st.from_regex(re.compile(pattern), fullmatch=True)
132
 
133
 
134
@settings(deadline=1000)
@given(pattern_strategy(r"AKIA[0-9A-Z]{16}"))
def test_specific_pattern_guardrail(text):
    """Property test: text generated from the ``AKIA...`` pattern is flagged.

    Args:
        text: Hypothesis-generated string built from the regex
            ``AKIA[0-9A-Z]{16}``.
    """
    detector = SecretsDetectionGuardrail(redaction=REDACTION.REDACT_ALL)
    outcome = detector.guard(prompt=text, return_detected_secrets=True)

    assert outcome.contains_secrets is True
    assert "AWS Access Key" in outcome.detected_secrets