Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,24 @@ Use the engine that matches your accuracy and dependency constraints:
- Cascades regex with optional NER engines.
- If optional deps are missing, it degrades gracefully and warns.

### Locale-specific regex patterns

German regex patterns (`DE_*`) are locale-specific and disabled by default to avoid
false positives on non-German text. Enable them explicitly via the `locales` argument:

```python
import datafog

result = datafog.scan(
"Steuer-ID 12345678903",
engine="regex",
locales=["de"],
)
print(result.entities)
```

Some German `DE_*` patterns apply additional checksum or context validation to reduce false positives (for example, `DE_TAX_ID` and `DE_RESIDENCE_PERMIT_NUMBER`).

## Backward-Compatible APIs

The existing public API remains available.
Expand Down
26 changes: 19 additions & 7 deletions datafog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,17 @@ def scan(
text: str,
engine: str = "regex",
entity_types: list[str] | None = None,
locales: list[str] | str | None = None,
) -> ScanResult:
"""
v5-preview scan entrypoint.

Defaults to the lightweight regex engine so the core install works without
optional dependency fallback warnings.
"""
return _scan(text=text, engine=engine, entity_types=entity_types)
return _scan(
text=text, engine=engine, entity_types=entity_types, locales=locales
)


def redact(
Expand All @@ -180,6 +183,7 @@ def redact(
entity_types: list[str] | None = None,
strategy: str = "token",
preset: str | None = None,
locales: list[str] | str | None = None,
) -> RedactResult:
"""
v5-preview redaction entrypoint.
Expand All @@ -201,6 +205,7 @@ def redact(
text=text,
engine=engine,
entity_types=entity_types,
locales=locales,
strategy=strategy,
)

Expand All @@ -210,6 +215,7 @@ def protect(
engine: str = "regex",
strategy: str = "token",
on_detect: str = "redact",
locales: list[str] | str | None = None,
):
"""
v5-preview guardrail factory.
Expand All @@ -219,11 +225,12 @@ def protect(
engine=engine,
strategy=strategy,
on_detect=on_detect,
locales=locales,
)


# Simple API for core functionality (backward compatibility)
def detect(text: str) -> list:
def detect(text: str, locales: list[str] | str | None = None) -> list:
"""
Detect PII in text using regex patterns.

Expand All @@ -240,16 +247,16 @@ def detect(text: str) -> list:
"""
_warn_v5_replacement("detect", "datafog.scan()")

return _detect_impl(text)
return _detect_impl(text, locales=locales)


def _detect_impl(text: str) -> list:
def _detect_impl(text: str, locales: list[str] | str | None = None) -> list:
import time as _time

_start = _time.monotonic()

_lazy_import_regex_annotator()
annotator = RegexAnnotator()
annotator = RegexAnnotator(locales=locales)
# Use the structured output to get proper positions
_, result = annotator.annotate_with_spans(text)

Expand Down Expand Up @@ -290,7 +297,12 @@ def _detect_impl(text: str) -> list:
return entities


def process(text: str, anonymize: bool = False, method: str = "redact") -> dict:
def process(
text: str,
anonymize: bool = False,
method: str = "redact",
locales: list[str] | str | None = None,
) -> dict:
"""
Process text to detect and optionally anonymize PII.

Expand All @@ -317,7 +329,7 @@ def process(text: str, anonymize: bool = False, method: str = "redact") -> dict:

_start = _time.monotonic()

findings = _detect_impl(text)
findings = _detect_impl(text, locales=locales)

result = {"original": text, "findings": findings}

Expand Down
13 changes: 9 additions & 4 deletions datafog/__init___lean.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def _missing_dependency(*args, **kwargs):


# Simple API for core functionality
def detect(text: str) -> list:
def detect(text: str, locales: list[str] | None = None) -> list:
"""
Detect PII in text using regex patterns.

Expand All @@ -94,7 +94,7 @@ def detect(text: str) -> list:
>>> detect("Contact john@example.com")
[{'type': 'EMAIL', 'value': 'john@example.com', 'start': 8, 'end': 24}]
"""
annotator = RegexAnnotator()
annotator = RegexAnnotator(locales=locales)
result = annotator.annotate(text)

# Convert to simple format
Expand All @@ -113,7 +113,12 @@ def detect(text: str) -> list:
return entities


def process(text: str, anonymize: bool = False, method: str = "redact") -> dict:
def process(
text: str,
anonymize: bool = False,
method: str = "redact",
locales: list[str] | None = None,
) -> dict:
"""
Process text to detect and optionally anonymize PII.

Expand All @@ -134,7 +139,7 @@ def process(text: str, anonymize: bool = False, method: str = "redact") -> dict:
'findings': [{'type': 'EMAIL', 'value': 'john@example.com', ...}]
}
"""
findings = detect(text)
findings = detect(text, locales=locales)

result = {"original": text, "findings": findings}

Expand Down
14 changes: 12 additions & 2 deletions datafog/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from contextlib import contextmanager
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable, Iterator, Optional, TypeVar
from typing import Any, Callable, Iterable, Iterator, Optional, TypeVar

from .engine import Entity, RedactResult, ScanResult, scan, scan_and_redact

Expand All @@ -31,6 +31,7 @@ def scan(self, text: str) -> ScanResult:
text=text,
engine=self.guardrail.engine,
entity_types=self.guardrail.entity_types,
locales=self.guardrail.locales,
)
if result.entities:
self.detections += len(result.entities)
Expand All @@ -54,21 +55,28 @@ class Guardrail:
engine: str = "smart"
strategy: str = "token"
on_detect: str = "redact"
locales: Optional[Iterable[str] | str] = None

def __post_init__(self) -> None:
if self.on_detect not in {"redact", "block", "warn"}:
raise ValueError("on_detect must be one of: redact, block, warn")

def scan(self, text: str) -> ScanResult:
"""Scan a text value for entities."""
return scan(text=text, engine=self.engine, entity_types=self.entity_types)
return scan(
text=text,
engine=self.engine,
entity_types=self.entity_types,
locales=self.locales,
)

def filter(self, text: str) -> RedactResult:
"""Scan then enforce configured behavior."""
result = scan_and_redact(
text=text,
engine=self.engine,
entity_types=self.entity_types,
locales=self.locales,
strategy=self.strategy,
)
if not result.entities:
Expand Down Expand Up @@ -140,6 +148,7 @@ def create_guardrail(
engine: str = "smart",
strategy: str = "token",
on_detect: str = "redact",
locales: Optional[Iterable[str] | str] = None,
) -> Guardrail:
"""
Create a reusable guardrail object for wrapping LLM calls.
Expand All @@ -149,6 +158,7 @@ def create_guardrail(
engine=engine,
strategy=strategy,
on_detect=on_detect,
locales=locales,
)


Expand Down
41 changes: 32 additions & 9 deletions datafog/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,27 @@
without requiring heavy dependencies like spaCy or PyTorch.
"""

from typing import Dict, List, Union
from typing import Dict, Iterable, List, Optional, Union

from datafog.engine import scan, scan_and_redact
from datafog.models.anonymizer import AnonymizerType
from datafog.processing.text_processing.regex_annotator import RegexAnnotator

# Engine types as constants
REGEX_ENGINE = "regex"
SPACY_ENGINE = "spacy"
AUTO_ENGINE = "auto"


def detect_pii(text: str) -> Dict[str, List[str]]:
def detect_pii(
text: str, locales: Optional[Iterable[str] | str] = None
) -> Dict[str, List[str]]:
"""
Simple PII detection using lightweight regex engine.

Args:
text: Text to scan for PII
locales: Optional list of locale codes that enable locale-specific labels

Returns:
Dictionary mapping entity types to lists of detected values
Expand All @@ -37,7 +41,7 @@ def detect_pii(text: str) -> Dict[str, List[str]]:

try:
# Use engine boundary for canonical scan behavior.
scan_result = scan(text=text, engine=REGEX_ENGINE)
scan_result = scan(text=text, engine=REGEX_ENGINE, locales=locales)
pii_dict: Dict[str, List[str]] = {}
for entity in scan_result.entities:
if not entity.text.strip():
Expand Down Expand Up @@ -81,13 +85,18 @@ def detect_pii(text: str) -> Dict[str, List[str]]:
) from e


def anonymize_text(text: str, method: Union[str, AnonymizerType] = "redact") -> str:
def anonymize_text(
text: str,
method: Union[str, AnonymizerType] = "redact",
locales: Optional[Iterable[str] | str] = None,
) -> str:
"""
Simple text anonymization using lightweight regex engine.

Args:
text: Text to anonymize
method: Anonymization method ('redact', 'replace', or 'hash')
locales: Optional list of locale codes that enable locale-specific labels

Returns:
Anonymized text string
Expand Down Expand Up @@ -119,6 +128,7 @@ def anonymize_text(text: str, method: Union[str, AnonymizerType] = "redact") ->
result = scan_and_redact(
text=text,
engine=REGEX_ENGINE,
locales=locales,
strategy=strategy_map[method],
)

Expand Down Expand Up @@ -155,14 +165,17 @@ def anonymize_text(text: str, method: Union[str, AnonymizerType] = "redact") ->


def scan_text(
text: str, return_entities: bool = False
text: str,
return_entities: bool = False,
locales: Optional[Iterable[str] | str] = None,
) -> Union[bool, Dict[str, List[str]]]:
"""
Quick scan to check if text contains any PII.

Args:
text: Text to scan
return_entities: If True, return detected entities; if False, return boolean
locales: Optional list of locale codes that enable locale-specific labels

Returns:
Boolean indicating PII presence, or dictionary of detected entities
Expand All @@ -180,7 +193,7 @@ def scan_text(

_start = _time.monotonic()

entities = detect_pii(text)
entities = detect_pii(text, locales=locales)

result = entities if return_entities else len(entities) > 0

Expand All @@ -200,19 +213,21 @@ def scan_text(
return result


def get_supported_entities() -> List[str]:
def get_supported_entities(locales: Optional[Iterable[str] | str] = None) -> List[str]:
"""
Get list of PII entity types supported by the regex engine.

Locale-specific labels (e.g., DE_*) are only included when locales include "de".

Returns:
List of supported entity type names

Example:
>>> entities = get_supported_entities()
>>> print(entities)
['EMAIL', 'PHONE', 'SSN', 'CREDIT_CARD', 'IP_ADDRESS', 'DOB', 'ZIP']
['EMAIL', 'PHONE', 'SSN', 'CREDIT_CARD', 'IP_ADDRESS', 'DATE', 'ZIP_CODE']
"""
result = [
base = [
"EMAIL",
"PHONE",
"SSN",
Expand All @@ -222,6 +237,14 @@ def get_supported_entities() -> List[str]:
"ZIP_CODE",
]

normalized_locales = RegexAnnotator._normalize_locales(locales)
locale_labels = [
label
for locale in normalized_locales
for label in RegexAnnotator.LOCALE_LABELS.get(locale, [])
]
result = base + locale_labels if locale_labels else base

try:
from datafog.telemetry import track_function_call

Expand Down
Loading