diff --git a/README.md b/README.md index 62f7e10d..e176586e 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,24 @@ Use the engine that matches your accuracy and dependency constraints: - Cascades regex with optional NER engines. - If optional deps are missing, it degrades gracefully and warns. +### Locale-specific regex patterns + +German regex patterns (DE_*) are locale-specific and disabled by default to avoid +false positives on non-German text. Enable them explicitly via `locales`: + +```python +import datafog + +result = datafog.scan( + "Steuer-ID 12345678903", + engine="regex", + locales=["de"], +) +print(result.entities) +``` + +Some German DE_* patterns include additional checksum or context validation to reduce noise (for example, `DE_TAX_ID` and `DE_RESIDENCE_PERMIT_NUMBER`). + ## Backward-Compatible APIs The existing public API remains available. diff --git a/datafog/__init__.py b/datafog/__init__.py index e3974ad7..d29adb99 100644 --- a/datafog/__init__.py +++ b/datafog/__init__.py @@ -163,6 +163,7 @@ def scan( text: str, engine: str = "regex", entity_types: list[str] | None = None, + locales: list[str] | str | None = None, ) -> ScanResult: """ v5-preview scan entrypoint. @@ -170,7 +171,9 @@ def scan( Defaults to the lightweight regex engine so the core install works without optional dependency fallback warnings. """ - return _scan(text=text, engine=engine, entity_types=entity_types) + return _scan( + text=text, engine=engine, entity_types=entity_types, locales=locales + ) def redact( @@ -180,6 +183,7 @@ def redact( entity_types: list[str] | None = None, strategy: str = "token", preset: str | None = None, + locales: list[str] | str | None = None, ) -> RedactResult: """ v5-preview redaction entrypoint. @@ -201,6 +205,7 @@ def redact( text=text, engine=engine, entity_types=entity_types, + locales=locales, strategy=strategy, ) @@ -210,6 +215,7 @@ def protect( engine: str = "regex", strategy: str = "token", on_detect: str = "redact", + locales: list[str] | str | None = None, ): """ v5-preview guardrail factory. @@ -219,11 +225,12 @@ def protect( engine=engine, strategy=strategy, on_detect=on_detect, + locales=locales, ) # Simple API for core functionality (backward compatibility) -def detect(text: str) -> list: +def detect(text: str, locales: list[str] | str | None = None) -> list: """ Detect PII in text using regex patterns. @@ -240,16 +247,16 @@ def detect(text: str) -> list: """ _warn_v5_replacement("detect", "datafog.scan()") - return _detect_impl(text) + return _detect_impl(text, locales=locales) -def _detect_impl(text: str) -> list: +def _detect_impl(text: str, locales: list[str] | str | None = None) -> list: import time as _time _start = _time.monotonic() _lazy_import_regex_annotator() - annotator = RegexAnnotator() + annotator = RegexAnnotator(locales=locales) # Use the structured output to get proper positions _, result = annotator.annotate_with_spans(text) @@ -290,7 +297,12 @@ def _detect_impl(text: str) -> list: return entities -def process(text: str, anonymize: bool = False, method: str = "redact") -> dict: +def process( + text: str, + anonymize: bool = False, + method: str = "redact", + locales: list[str] | str | None = None, +) -> dict: """ Process text to detect and optionally anonymize PII. @@ -317,7 +329,7 @@ def process(text: str, anonymize: bool = False, method: str = "redact") -> dict: _start = _time.monotonic() - findings = _detect_impl(text) + findings = _detect_impl(text, locales=locales) result = {"original": text, "findings": findings} diff --git a/datafog/__init___lean.py b/datafog/__init___lean.py index 40a3f530..50f2c7ed 100644 --- a/datafog/__init___lean.py +++ b/datafog/__init___lean.py @@ -79,7 +79,7 @@ def _missing_dependency(*args, **kwargs): # Simple API for core functionality -def detect(text: str) -> list: +def detect(text: str, locales: list[str] | None = None) -> list: """ Detect PII in text using regex patterns. @@ -94,7 +94,7 @@ def detect(text: str) -> list: >>> detect("Contact john@example.com") [{'type': 'EMAIL', 'value': 'john@example.com', 'start': 8, 'end': 24}] """ - annotator = RegexAnnotator() + annotator = RegexAnnotator(locales=locales) result = annotator.annotate(text) # Convert to simple format @@ -113,7 +113,12 @@ def detect(text: str) -> list: return entities -def process(text: str, anonymize: bool = False, method: str = "redact") -> dict: +def process( + text: str, + anonymize: bool = False, + method: str = "redact", + locales: list[str] | None = None, +) -> dict: """ Process text to detect and optionally anonymize PII. @@ -134,7 +139,7 @@ def process(text: str, anonymize: bool = False, method: str = "redact") -> dict: 'findings': [{'type': 'EMAIL', 'value': 'john@example.com', ...}] } """ - findings = detect(text) + findings = detect(text, locales=locales) result = {"original": text, "findings": findings} diff --git a/datafog/agent.py b/datafog/agent.py index 58a84ed7..9bf88dac 100644 --- a/datafog/agent.py +++ b/datafog/agent.py @@ -6,7 +6,7 @@ from contextlib import contextmanager from dataclasses import dataclass from functools import wraps -from typing import Any, Callable, Iterator, Optional, TypeVar +from typing import Any, Callable, Iterable, Iterator, Optional, TypeVar from .engine import Entity, RedactResult, ScanResult, scan, scan_and_redact @@ -31,6 +31,7 @@ def scan(self, text: str) -> ScanResult: text=text, engine=self.guardrail.engine, entity_types=self.guardrail.entity_types, + locales=self.guardrail.locales, ) if result.entities: self.detections += len(result.entities) @@ -54,6 +55,7 @@ class Guardrail: engine: str = "smart" strategy: str = "token" on_detect: str = "redact" + locales: Optional[Iterable[str] | str] = None def __post_init__(self) -> None: if self.on_detect not in {"redact", "block", "warn"}: @@ -61,7 +63,12 @@ def __post_init__(self) -> None: def scan(self, text: str) -> ScanResult: """Scan a text value for entities.""" - return scan(text=text, engine=self.engine, entity_types=self.entity_types) + return scan( + text=text, + engine=self.engine, + entity_types=self.entity_types, + locales=self.locales, + ) def filter(self, text: str) -> RedactResult: """Scan then enforce configured behavior.""" @@ -69,6 +76,7 @@ def filter(self, text: str) -> RedactResult: text=text, engine=self.engine, entity_types=self.entity_types, + locales=self.locales, strategy=self.strategy, ) if not result.entities: @@ -140,6 +148,7 @@ def create_guardrail( engine: str = "smart", strategy: str = "token", on_detect: str = "redact", + locales: Optional[Iterable[str] | str] = None, ) -> Guardrail: """ Create a reusable guardrail object for wrapping LLM calls. @@ -149,6 +158,7 @@ def create_guardrail( engine=engine, strategy=strategy, on_detect=on_detect, + locales=locales, ) diff --git a/datafog/core.py b/datafog/core.py index f4e17850..633a5fd5 100644 --- a/datafog/core.py +++ b/datafog/core.py @@ -5,10 +5,11 @@ without requiring heavy dependencies like spaCy or PyTorch. """ -from typing import Dict, List, Union +from typing import Dict, Iterable, List, Optional, Union from datafog.engine import scan, scan_and_redact from datafog.models.anonymizer import AnonymizerType +from datafog.processing.text_processing.regex_annotator import RegexAnnotator # Engine types as constants REGEX_ENGINE = "regex" @@ -16,12 +17,15 @@ AUTO_ENGINE = "auto" -def detect_pii(text: str) -> Dict[str, List[str]]: +def detect_pii( + text: str, locales: Optional[Iterable[str] | str] = None +) -> Dict[str, List[str]]: """ Simple PII detection using lightweight regex engine. Args: text: Text to scan for PII + locales: Optional list of locale codes that enable locale-specific labels Returns: Dictionary mapping entity types to lists of detected values @@ -37,7 +41,7 @@ def detect_pii(text: str) -> Dict[str, List[str]]: try: # Use engine boundary for canonical scan behavior. - scan_result = scan(text=text, engine=REGEX_ENGINE) + scan_result = scan(text=text, engine=REGEX_ENGINE, locales=locales) pii_dict: Dict[str, List[str]] = {} for entity in scan_result.entities: if not entity.text.strip(): @@ -81,13 +85,18 @@ def detect_pii(text: str) -> Dict[str, List[str]]: ) from e -def anonymize_text(text: str, method: Union[str, AnonymizerType] = "redact") -> str: +def anonymize_text( + text: str, + method: Union[str, AnonymizerType] = "redact", + locales: Optional[Iterable[str] | str] = None, +) -> str: """ Simple text anonymization using lightweight regex engine. Args: text: Text to anonymize method: Anonymization method ('redact', 'replace', or 'hash') + locales: Optional list of locale codes that enable locale-specific labels Returns: Anonymized text string @@ -119,6 +128,7 @@ def anonymize_text(text: str, method: Union[str, AnonymizerType] = "redact") -> result = scan_and_redact( text=text, engine=REGEX_ENGINE, + locales=locales, strategy=strategy_map[method], ) @@ -155,7 +165,9 @@ def anonymize_text(text: str, method: Union[str, AnonymizerType] = "redact") -> def scan_text( - text: str, return_entities: bool = False + text: str, + return_entities: bool = False, + locales: Optional[Iterable[str] | str] = None, ) -> Union[bool, Dict[str, List[str]]]: """ Quick scan to check if text contains any PII. @@ -163,6 +175,7 @@ def scan_text( Args: text: Text to scan return_entities: If True, return detected entities; if False, return boolean + locales: Optional list of locale codes that enable locale-specific labels Returns: Boolean indicating PII presence, or dictionary of detected entities @@ -180,7 +193,7 @@ def scan_text( _start = _time.monotonic() - entities = detect_pii(text) + entities = detect_pii(text, locales=locales) result = entities if return_entities else len(entities) > 0 @@ -200,19 +213,21 @@ def scan_text( return result -def get_supported_entities() -> List[str]: +def get_supported_entities(locales: Optional[Iterable[str] | str] = None) -> List[str]: """ Get list of PII entity types supported by the regex engine. + Locale-specific labels (e.g., DE_*) are only included when locales include "de". + Returns: List of supported entity type names Example: >>> entities = get_supported_entities() >>> print(entities) - ['EMAIL', 'PHONE', 'SSN', 'CREDIT_CARD', 'IP_ADDRESS', 'DOB', 'ZIP'] + ['EMAIL', 'PHONE', 'SSN', 'CREDIT_CARD', 'IP_ADDRESS', 'DATE', 'ZIP_CODE'] """ - result = [ + base = [ "EMAIL", "PHONE", "SSN", @@ -222,6 +237,14 @@ def get_supported_entities() -> List[str]: "ZIP_CODE", ] + normalized_locales = RegexAnnotator._normalize_locales(locales) + locale_labels = [ + label + for locale in normalized_locales + for label in RegexAnnotator.LOCALE_LABELS.get(locale, []) + ] + result = base + locale_labels if locale_labels else base + try: from datafog.telemetry import track_function_call diff --git a/datafog/engine.py b/datafog/engine.py index 1a94e634..5c51e17b 100644 --- a/datafog/engine.py +++ b/datafog/engine.py @@ -6,7 +6,7 @@ import warnings from dataclasses import dataclass from functools import lru_cache -from typing import Optional +from typing import Iterable, Optional from .exceptions import EngineNotAvailable from .processing.text_processing.regex_annotator import RegexAnnotator @@ -31,6 +31,13 @@ "SSN", "CREDIT_CARD", "IP_ADDRESS", + "DE_VAT_ID", + "DE_IBAN", + "DE_TAX_ID", + "DE_SOCIAL_SECURITY_NUMBER", + "DE_POSTAL_CODE", + "DE_PASSPORT_NUMBER", + "DE_RESIDENCE_PERMIT_NUMBER", "DATE", "ZIP_CODE", "PERSON", @@ -131,8 +138,21 @@ def _entities_from_dict( return entities -def _regex_entities(text: str) -> list[Entity]: - annotator = RegexAnnotator() +def _normalize_regex_locales(locales: Optional[Iterable[str] | str]) -> tuple[str, ...]: + normalized = RegexAnnotator._normalize_locales(locales) + supported_locales = set(RegexAnnotator.LOCALE_LABELS) + return tuple(sorted(normalized & supported_locales)) + + +@lru_cache(maxsize=32) +def _get_regex_annotator(locales_key: tuple[str, ...]) -> RegexAnnotator: + return RegexAnnotator(locales=locales_key) + + +def _regex_entities( + text: str, locales: Optional[Iterable[str] | str] = None +) -> list[Entity]: + annotator = _get_regex_annotator(_normalize_regex_locales(locales)) _, structured = annotator.annotate_with_spans(text) entities: list[Entity] = [] for span in structured.spans: @@ -235,6 +255,7 @@ def scan( text: str, engine: str = "smart", entity_types: Optional[list[str]] = None, + locales: Optional[Iterable[str] | str] = None, ) -> ScanResult: """Scan text for PII entities.""" if not isinstance(text, str): @@ -243,7 +264,7 @@ def scan( if engine not in {"regex", "spacy", "gliner", "smart"}: raise ValueError("engine must be one of: regex, spacy, gliner, smart") - regex_entities = _regex_entities(text) + regex_entities = _regex_entities(text, locales=locales) if engine == "regex": filtered = _filter_entity_types(regex_entities, entity_types) @@ -378,7 +399,10 @@ def scan_and_redact( engine: str = "smart", entity_types: Optional[list[str]] = None, strategy: str = "token", + locales: Optional[Iterable[str] | str] = None, ) -> RedactResult: """Convenience wrapper: scan then redact.""" - scan_result = scan(text=text, engine=engine, entity_types=entity_types) + scan_result = scan( + text=text, engine=engine, entity_types=entity_types, locales=locales + ) return redact(text=text, entities=scan_result.entities, strategy=strategy) diff --git a/datafog/main.py b/datafog/main.py index 31ac22e5..c045cf0d 100644 --- a/datafog/main.py +++ b/datafog/main.py @@ -10,7 +10,7 @@ import json import logging -from typing import List +from typing import List, Optional from .config import OperationType from .engine import scan, scan_and_redact @@ -39,8 +39,10 @@ def __init__( operations: List[OperationType] = [OperationType.SCAN], hash_type: HashType = HashType.SHA256, anonymizer_type: AnonymizerType = AnonymizerType.REPLACE, + locales: Optional[List[str]] = None, ): - self.regex_annotator = RegexAnnotator() + self.regex_annotator = RegexAnnotator(locales=locales) + self.locales = locales normalized_ops: List[OperationType] = [] for op in operations: if isinstance(op, OperationType): @@ -181,7 +183,7 @@ def detect(self, text: str) -> dict: _start = _time.monotonic() - scan_result = scan(text=text, engine="regex") + scan_result = scan(text=text, engine="regex", locales=self.locales) result = {label: [] for label in RegexAnnotator.LABELS} legacy_map = {"DATE": "DOB", "ZIP_CODE": "ZIP"} for entity in scan_result.entities: @@ -245,6 +247,7 @@ def process( redact_result = scan_and_redact( text=text, engine="regex", + locales=self.locales, strategy=strategy, ) result["anonymized"] = redact_result.redacted_text @@ -288,8 +291,9 @@ class TextPIIAnnotator: regex_annotator: RegexAnnotator instance for text annotation. """ - def __init__(self): - self.regex_annotator = RegexAnnotator() + def __init__(self, locales: Optional[List[str]] = None): + self.regex_annotator = RegexAnnotator(locales=locales) + self.locales = locales def run(self, text, output_path=None): """ diff --git a/datafog/main_lean.py b/datafog/main_lean.py index af61559e..4a260ff9 100644 --- a/datafog/main_lean.py +++ b/datafog/main_lean.py @@ -10,7 +10,7 @@ import json import logging -from typing import List +from typing import List, Optional from .config import OperationType from .models.anonymizer import Anonymizer, AnonymizerType, HashType @@ -38,8 +38,10 @@ def __init__( operations: List[OperationType] = [OperationType.SCAN], hash_type: HashType = HashType.SHA256, anonymizer_type: AnonymizerType = AnonymizerType.REPLACE, + locales: Optional[List[str]] = None, ): - self.regex_annotator = RegexAnnotator() + self.regex_annotator = RegexAnnotator(locales=locales) + self.locales = locales self.operations: List[OperationType] = operations self.anonymizer = Anonymizer( hash_type=hash_type, anonymizer_type=anonymizer_type @@ -161,8 +163,9 @@ class TextPIIAnnotator: regex_annotator: RegexAnnotator instance for text annotation. """ - def __init__(self): - self.regex_annotator = RegexAnnotator() + def __init__(self, locales: Optional[List[str]] = None): + self.regex_annotator = RegexAnnotator(locales=locales) + self.locales = locales def run(self, text, output_path=None): """ diff --git a/datafog/processing/text_processing/regex_annotator/regex_annotator.py b/datafog/processing/text_processing/regex_annotator/regex_annotator.py index a843a8d8..464cc34d 100644 --- a/datafog/processing/text_processing/regex_annotator/regex_annotator.py +++ b/datafog/processing/text_processing/regex_annotator/regex_annotator.py @@ -1,5 +1,5 @@ import re -from typing import Dict, List, Pattern, Tuple +from typing import Callable, Dict, Iterable, List, Optional, Pattern, Set, Tuple from pydantic import BaseModel @@ -25,14 +25,85 @@ class RegexAnnotator: This annotator serves as a fallback to the SpaCy annotator and is optimized for performance, targeting ≤ 20 µs / kB on a MacBook M-series. + + Locale notes: + German-specific entity types (DE_*) are disabled by default. Enable them by + passing locales=["de"]. This avoids false positives on non-German text. """ # Labels for PII entities - LABELS = ["EMAIL", "PHONE", "SSN", "CREDIT_CARD", "IP_ADDRESS", "DOB", "ZIP"] + BASE_LABELS = [ + "EMAIL", + "PHONE", + "SSN", + "CREDIT_CARD", + "IP_ADDRESS", + "DOB", + "ZIP", + ] + + LOCALE_LABELS = { + "de": [ + "DE_VAT_ID", + "DE_IBAN", + "DE_TAX_ID", + "DE_SOCIAL_SECURITY_NUMBER", + "DE_POSTAL_CODE", + "DE_PASSPORT_NUMBER", + "DE_RESIDENCE_PERMIT_NUMBER", + ], + } + + LABELS = BASE_LABELS + [ + label for locale_labels in LOCALE_LABELS.values() for label in locale_labels + ] + + _DE_PASSPORT_PREFIXES = "CFGHJKLMNPRTVWXYZ" + _DE_RESIDENCE_CONTEXT_RE = re.compile( + r"\b(aufenthaltstitel|aufenthaltserlaubnis|aufenthaltskarte|residence permit|residence card)\b", + re.IGNORECASE, + ) + + def __init__(self, locales: Optional[Iterable[str] | str] = None): + self.locales = self._normalize_locales(locales) + self.active_labels = self._labels_for_locales(self.locales) - def __init__(self): # Compile all patterns once at initialization - self.patterns: Dict[str, Pattern] = { + self.patterns = self._compile_patterns() + self.validators = self._build_validators() + + @staticmethod + def _normalize_locales(locales: Optional[Iterable[str] | str]) -> Set[str]: + if locales is None: + return set() + if isinstance(locales, str): + values = [locales] + else: + values = list(locales) + normalized = { + value.strip().lower() + for value in values + if isinstance(value, str) and value.strip() + } + return normalized + + @classmethod + def labels_for_locales( + cls, locales: Optional[Iterable[str] | str] = None + ) -> List[str]: + normalized = cls._normalize_locales(locales) + return cls._labels_for_locales(normalized) + + @classmethod + def _labels_for_locales(cls, locales: Set[str]) -> List[str]: + labels = list(cls.BASE_LABELS) + for locale, locale_labels in cls.LOCALE_LABELS.items(): + if locale in locales: + labels.extend(locale_labels) + return labels + + def _compile_patterns(self) -> Dict[str, Pattern]: + patterns: Dict[str, Pattern] = { # Email pattern - RFC 5322 subset # Intentionally permissive to favor false positives over false negatives # Allows for multiple dots, special characters in local part, and subdomains @@ -177,10 +248,138 @@ def __init__(self): ), } + if "de" in self.locales: + patterns.update( + { + # German VAT ID (USt-IdNr) - DE followed by 9 digits + "DE_VAT_ID": re.compile( + r""" + (? Dict[str, Callable[[re.Match, str], bool]]: + validators: Dict[str, Callable[[re.Match, str], bool]] = {} + if "DE_TAX_ID" in self.active_labels: + validators["DE_TAX_ID"] = self._validate_de_tax_id + if "DE_RESIDENCE_PERMIT_NUMBER" in self.active_labels: + validators["DE_RESIDENCE_PERMIT_NUMBER"] = self._validate_de_residence_permit + return validators + + @staticmethod + def _digits_only(value: str) -> str: + return "".join(ch for ch in value if ch.isdigit()) + + @staticmethod + def _de_tax_id_check_digit(digits10: str) -> int: + product = 10 + for ch in digits10: + sum_ = (int(ch) + product) % 10 + if sum_ == 0: + sum_ = 10 + product = (sum_ * 2) % 11 + return (11 - product) % 10 + + def _validate_de_tax_id(self, match: re.Match, text: str) -> bool: + digits = self._digits_only(match.group()) + if len(digits) != 11: + return False + if digits[0] == "0": + return False + return digits[-1] == str(self._de_tax_id_check_digit(digits[:10])) + + def _validate_de_residence_permit(self, match: re.Match, text: str) -> bool: + window = 40 + start = max(match.start() - window, 0) + end = min(match.end() + window, len(text)) + context = text[start:end] + return bool(self._DE_RESIDENCE_CONTEXT_RE.search(context)) + @classmethod - def create(cls) -> "RegexAnnotator": + def create(cls, locales: Optional[Iterable[str]] = None) -> "RegexAnnotator": """Factory method to create a new RegexAnnotator instance.""" - return cls() + return cls(locales=locales) def annotate(self, text: str) -> Dict[str, List[str]]: """Annotate text with PII entities using regex patterns. @@ -199,7 +398,10 @@ def annotate(self, text: str) -> Dict[str, List[str]]: # Process with each pattern for label, pattern in self.patterns.items(): + validator = self.validators.get(label) for match in pattern.finditer(text): + if validator and not validator(match, text): + continue result[label].append(match.group()) return result @@ -224,7 +426,10 @@ def annotate_with_spans( return spans_by_label, AnnotationResult(text=text, spans=all_spans) for label, pattern in self.patterns.items(): + validator = self.validators.get(label) for match in pattern.finditer(text): + if validator and not validator(match, text): + continue span = Span( label=label, start=match.start(), diff --git a/datafog/services/text_service.py b/datafog/services/text_service.py index 0956256f..7ed4298d 100644 --- a/datafog/services/text_service.py +++ b/datafog/services/text_service.py @@ -7,7 +7,7 @@ import asyncio import warnings -from typing import TYPE_CHECKING, Dict, List, Union +from typing import TYPE_CHECKING, Dict, List, Optional, Union if TYPE_CHECKING: from datafog.processing.text_processing.regex_annotator.regex_annotator import Span @@ -43,6 +43,7 @@ def __init__( text_chunk_length: int = 1000, engine: str = "regex", gliner_model: str = "urchade/gliner_multi_pii-v1", + locales: Optional[List[str]] = None, ): """ Initialize the TextService with specified chunk length and annotation engine. @@ -56,6 +57,7 @@ def __init__( - "auto": Try RegexAnnotator first and fall back to SpacyPIIAnnotator if no entities found - "smart": Try RegexAnnotator → GLiNER → SpaCy cascade (requires nlp-advanced extra) gliner_model: GLiNER model name to use when engine is "gliner" or "smart" + locales: Optional list of locale codes that enable locale-specific regex labels Raises: AssertionError: If an invalid engine type is provided @@ -65,6 +67,7 @@ def __init__( self.engine = engine self.text_chunk_length = text_chunk_length self.gliner_model = gliner_model + self.locales = locales # Lazy initialization - annotators created only when needed self._regex_annotator = None @@ -90,6 +93,7 @@ def __init__( engine=engine, text_chunk_length=text_chunk_length, gliner_model=gliner_model if engine in ("gliner", "smart") else None, + locales=locales, ) except Exception: pass @@ -102,7 +106,7 @@ def regex_annotator(self): RegexAnnotator, ) - self._regex_annotator = RegexAnnotator() + self._regex_annotator = RegexAnnotator(locales=self.locales) return self._regex_annotator @property diff --git a/datafog/services/text_service_lean.py b/datafog/services/text_service_lean.py index ce9203ec..50d110cd 100644 --- a/datafog/services/text_service_lean.py +++ b/datafog/services/text_service_lean.py @@ -6,7 +6,7 @@ """ import asyncio -from typing import Dict, List, Union +from typing import Dict, List, Optional, Union from datafog.processing.text_processing.regex_annotator.regex_annotator import ( RegexAnnotator, @@ -26,7 +26,12 @@ class TextService: pip install datafog[nlp] """ - def __init__(self, text_chunk_length: int = 1000, engine: str = "regex"): + def __init__( + self, + text_chunk_length: int = 1000, + engine: str = "regex", + locales: Optional[List[str]] = None, + ): """ Initialize the TextService with specified chunk length and annotation engine. @@ -36,6 +41,7 @@ def __init__(self, text_chunk_length: int = 1000, engine: str = "regex"): - "regex": (Default) Use RegexAnnotator for fast pattern-based entity detection - "spacy": Use SpacyPIIAnnotator for NLP-based entity detection (requires nlp extra) - "auto": Try RegexAnnotator first and fall back to SpacyPIIAnnotator if no entities found + locales: Optional list of locale codes that enable locale-specific regex labels Raises: AssertionError: If an invalid engine type is provided @@ -43,8 +49,9 @@ def __init__(self, text_chunk_length: int = 1000, engine: str = "regex"): """ assert engine in {"regex", "spacy", "auto"}, "Invalid engine" self.engine = engine - self.regex_annotator = RegexAnnotator() + self.regex_annotator = RegexAnnotator(locales=locales) self.text_chunk_length = text_chunk_length + self.locales = locales # Only initialize spacy if needed and available self.spacy_annotator = None diff --git a/datafog/services/text_service_original.py b/datafog/services/text_service_original.py index 6d5dde1b..e8ea4ab3 100644 --- a/datafog/services/text_service_original.py +++ b/datafog/services/text_service_original.py @@ -4,7 +4,7 @@ """ import asyncio -from typing import Dict, List, Union +from typing import Dict, List, Optional, Union from datafog.processing.text_processing.regex_annotator.regex_annotator import ( RegexAnnotator, @@ -22,7 +22,12 @@ class TextService: and combining annotations from multiple chunks. """ - def __init__(self, text_chunk_length: int = 1000, engine: str = "auto"): + def __init__( + self, + text_chunk_length: int = 1000, + engine: str = "auto", + locales: Optional[List[str]] = None, + ): """ Initialize the TextService with specified chunk length and annotation engine. @@ -32,6 +37,7 @@ def __init__(self, text_chunk_length: int = 1000, engine: str = "auto"): - "regex": Use only the RegexAnnotator for pattern-based entity detection - "spacy": Use only the SpacyPIIAnnotator for NLP-based entity detection - "auto": (Default) Try RegexAnnotator first and fall back to SpacyPIIAnnotator if no entities are found + locales: Optional list of locale codes that enable locale-specific regex labels Raises: AssertionError: If an invalid engine type is provided @@ -39,8 +45,9 @@ def __init__(self, text_chunk_length: int = 1000, engine: str = "auto"): assert engine in {"regex", "spacy", "auto"}, "Invalid engine" self.engine = engine self.spacy_annotator = SpacyPIIAnnotator.create() - self.regex_annotator = RegexAnnotator() + self.regex_annotator = RegexAnnotator(locales=locales) self.text_chunk_length = text_chunk_length + self.locales = locales def _chunk_text(self, text: str) -> List[str]: """Split the text into chunks of specified length.""" diff --git a/tests/corpus/structured_pii.json b/tests/corpus/structured_pii.json index 672e7483..1b169f33 100644 --- a/tests/corpus/structured_pii.json +++ b/tests/corpus/structured_pii.json @@ -733,5 +733,96 @@ "end": 5 } ] + }, + { + "id": "de-vat-id-simple", + "locales": ["de"], + "input": "VAT number: DE123456789 for invoices.", + "expected_entities": [ + { + "type": "DE_VAT_ID", + "text": "DE123456789", + "start": 12, + "end": 23 + } + ] + }, + { + "id": "de-iban-formatted", + "locales": ["de"], + "input": "IBAN: DE89 3704 0044 0532 0130 00 for payments.", + "expected_entities": [ + { + "type": "DE_IBAN", + "text": "DE89 3704 0044 0532 0130 00", + "start": 6, + "end": 33 + } + ] + }, + { + "id": "de-tax-id-simple", + "locales": ["de"], + "input": "Steuer-ID 12345678903 liegt vor.", + "expected_entities": [ + { + "type": "DE_TAX_ID", + "text": "12345678903", + "start": 10, + "end": 21 + } + ] + }, + { + "id": "de-social-security-number", + "locales": ["de"], + "input": "Rentenversicherungsnummer 65150804A123 liegt vor.", + "expected_entities": [ + { + "type": "DE_SOCIAL_SECURITY_NUMBER", + "text": "65150804A123", + "start": 26, + "end": 38 + } + ] + }, + { + "id": "de-postal-code-prefixed", + "locales": ["de"], + "input": "PLZ10115 Berlin.", + "expected_entities": [ + { + "type": "DE_POSTAL_CODE", + "text": "PLZ10115", + "start": 0, + "end": 8 + } + ] + }, + { + "id": "de-passport-number", + "locales": ["de"], + "input": "Passnummer C12345678 wurde geprueft.", + "expected_entities": [ + { + "type": "DE_PASSPORT_NUMBER", + "text": "C12345678", + "start": 11, + "end": 20 + } + ] + }, + { + "id": "de-residence-permit-number", + "locales": ["de"], + "input": "Aufenthaltstitel AT1234567 gueltig.", + "expected_entities": [ + { + "type": "DE_RESIDENCE_PERMIT_NUMBER", + "text": "AT1234567", + "start": 17, + "end": 26 + } + ] } ] diff --git a/tests/test_agent_api.py b/tests/test_agent_api.py index ff72e9fa..cba4f128 100644 --- a/tests/test_agent_api.py +++ b/tests/test_agent_api.py @@ -67,6 +67,16 @@ def test_create_guardrail_warn_mode_warns_and_returns_original() -> None: assert result.mapping == {} +def test_create_guardrail_with_locales_enables_de_patterns() -> None: + guard = datafog.create_guardrail(engine="regex", locales=["de"]) + + result = guard.filter("Steuer-ID 12 345 678 903 liegt vor.") + + assert result.redacted_text != "Steuer-ID 12 345 678 903 liegt vor." + assert any(entity.type == "DE_TAX_ID" for entity in result.entities) + assert "[DE_TAX_ID_1]" in result.redacted_text + + def test_guardrail_watch_context_manager_tracks_activity() -> None: guard = datafog.create_guardrail(engine="regex") diff --git a/tests/test_de_pii_regex.py b/tests/test_de_pii_regex.py new file mode 100644 index 00000000..4df16639 --- /dev/null +++ b/tests/test_de_pii_regex.py @@ -0,0 +1,125 @@ +import pytest + +from datafog.processing.text_processing.regex_annotator import RegexAnnotator + + +VALID_DE_TAX_ID = "12345678903" +VALID_DE_TAX_ID_SPACED = "12 345 678 903" +INVALID_DE_TAX_ID = ( + VALID_DE_TAX_ID[:-1] + + str((int(VALID_DE_TAX_ID[-1]) + 1) % 10) +) + + +@pytest.mark.parametrize( + "label,text,expected", + [ + ( + "DE_VAT_ID", + "USt-IdNr DE 123456789 ist gesetzt.", + "DE 123456789", + ), + ( + "DE_VAT_ID", + "USt-IdNr DE-123456789 liegt vor.", + "DE-123456789", + ), + ( + "DE_IBAN", + "IBAN DE44500105175407324931 ist gueltig.", + "DE44500105175407324931", + ), + ( + "DE_IBAN", + "IBAN DE44 5001 0517 5407 3249 31 ist gueltig.", + "DE44 5001 0517 5407 3249 31", + ), + ( + "DE_TAX_ID", + f"Steuer-ID {VALID_DE_TAX_ID} liegt vor.", + VALID_DE_TAX_ID, + ), + ( + "DE_TAX_ID", + f"Steuer-ID {VALID_DE_TAX_ID_SPACED} ist gesetzt.", + VALID_DE_TAX_ID_SPACED, + ), + ( + "DE_SOCIAL_SECURITY_NUMBER", + "Rentenversicherungsnummer 65150804A123 liegt vor.", + "65150804A123", + ), + ( + "DE_SOCIAL_SECURITY_NUMBER", + "Rentenversicherungsnummer 65 150804 A123 liegt vor.", + "65 150804 A123", + ), + ( + "DE_POSTAL_CODE", + "PLZ10115 Berlin.", + "PLZ10115", + ), + ( + "DE_POSTAL_CODE", + "PLZ 10115 Berlin.", + "PLZ 10115", + ), + ( + "DE_PASSPORT_NUMBER", + "Passnummer C12345678 wurde geprueft.", + "C12345678", + ), + ( + "DE_RESIDENCE_PERMIT_NUMBER", + "Aufenthaltstitel AT1234567 gueltig.", + "AT1234567", + ), + ], +) +def test_de_regex_positive_cases(label: str, text: str, expected: str) -> None: + annotator = RegexAnnotator(locales=["de"]) + result = annotator.annotate(text) + assert expected in result[label] + + +@pytest.mark.parametrize( + "label,text", + [ + ("DE_VAT_ID", "USt-IdNr DE12345678 liegt vor."), + ("DE_VAT_ID", "USt-IdNr DE1234567890 liegt vor."), + ("DE_IBAN", "IBAN DE4450010517540732493 ist gueltig."), + ("DE_IBAN", "IBAN DE44 5001 0517 5407 3249 3X ist gueltig."), + ("DE_TAX_ID", "Steuer-ID 1234567890 liegt vor."), + ("DE_TAX_ID", "Steuer-ID 123456789012 liegt vor."), + ("DE_TAX_ID", f"Steuer-ID {INVALID_DE_TAX_ID} liegt vor."), + ("DE_TAX_ID", "Steuer-ID 12345678901 liegt vor."), + ("DE_TAX_ID", "Steuer-ID 01234567890 liegt vor."), + ( + "DE_SOCIAL_SECURITY_NUMBER", + "Rentenversicherungsnummer 65150804123 liegt vor.", + ), + ( + "DE_SOCIAL_SECURITY_NUMBER", + "Rentenversicherungsnummer 65150804AA23 liegt vor.", + ), + ("DE_POSTAL_CODE", "10115 Berlin."), + ("DE_POSTAL_CODE", "D12345"), + ("DE_POSTAL_CODE", "DE12345"), + ("DE_POSTAL_CODE", "DE10115 Berlin."), + ("DE_POSTAL_CODE", "D10115 Berlin."), + ("DE_PASSPORT_NUMBER", "Passnummer 12345678 wurde geprueft."), + ("DE_PASSPORT_NUMBER", "Bestellung A12345678 liegt vor."), + ( + "DE_RESIDENCE_PERMIT_NUMBER", + "Aufenthaltstitel AT12345678 gueltig.", + ), + ( + "DE_RESIDENCE_PERMIT_NUMBER", + "AT1234567 ohne Kontext.", + ), + ], +) +def test_de_regex_negative_cases(label: str, text: str) -> None: + annotator = RegexAnnotator(locales=["de"]) + result = annotator.annotate(text) + assert not result[label] diff --git a/tests/test_detection_accuracy.py b/tests/test_detection_accuracy.py index 852a7937..1f6fac01 100644 --- a/tests/test_detection_accuracy.py +++ b/tests/test_detection_accuracy.py @@ -22,6 +22,13 @@ "SSN", "CREDIT_CARD", "IP_ADDRESS", + "DE_VAT_ID", + "DE_IBAN", + "DE_TAX_ID", + "DE_SOCIAL_SECURITY_NUMBER", + "DE_POSTAL_CODE", + "DE_PASSPORT_NUMBER", + "DE_RESIDENCE_PERMIT_NUMBER", "DATE", "ZIP_CODE", } @@ -278,9 +285,12 @@ def _canon_type(entity_type: str) -> str: return TYPE_ALIASES.get(raw, raw) -def _extract_entities(text: str, engine: str) -> list[dict[str, Any]]: +def _extract_entities( + text: str, engine: str, locales: Iterable[str] | str | None = None +) -> list[dict[str, Any]]: try: - result = scan(text=text, engine=engine) + locale_values = [locales] if isinstance(locales, str) else list(locales or []) + result = scan(text=text, engine=engine, locales=locale_values) except (ImportError, EngineNotAvailable) as exc: pytest.skip(f"{engine} engine unavailable in this environment: {exc}") @@ -345,7 +355,7 @@ def _assert_expected_found( case: dict[str, Any], engine: str, corpus_kind: str ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: text = case["input"] - actual = _extract_entities(text, engine) + actual = _extract_entities(text, engine, case.get("locales")) expected = _required_expected(case["expected_entities"], engine, corpus_kind) for exp in expected: @@ -401,7 +411,7 @@ def _compute_metrics( for engine in engines: for corpus_kind, cases in corpora: for case in cases: - actual = _extract_entities(case["input"], engine) + actual = _extract_entities(case["input"], engine, case.get("locales")) expected = _required_expected( case["expected_entities"], engine, corpus_kind ) @@ -488,7 +498,7 @@ def test_structured_pii_detection_slow(case: dict[str, Any], engine: str) -> Non @pytest.mark.parametrize("engine", FAST_ENGINES) def test_negative_cases_fast(case: dict[str, Any], engine: str) -> None: _xfail_if_known_limitation(case, engine, "negative") - actual = _extract_entities(case["input"], engine) + actual = _extract_entities(case["input"], engine, case.get("locales")) assert not actual, f"{case['id']} ({engine}) false positives: {actual}" @@ -499,7 +509,7 @@ def test_negative_cases_fast(case: dict[str, Any], engine: str) -> None: @pytest.mark.parametrize("engine", SLOW_ENGINES) def test_negative_cases_slow(case: dict[str, Any], engine: str) -> None: _xfail_if_known_limitation(case, engine, "negative") - actual = _extract_entities(case["input"], engine) + actual = _extract_entities(case["input"], engine, case.get("locales")) assert not actual, f"{case['id']} ({engine}) false positives: {actual}" diff --git a/tests/test_no_network_core.py b/tests/test_no_network_core.py index 905984f4..fa17393c 100644 --- a/tests/test_no_network_core.py +++ b/tests/test_no_network_core.py @@ -43,6 +43,28 @@ def blocked(*_args, **_kwargs): ) +def test_redact_positional_strategy_remains_compatible() -> None: + import datafog + from datafog.engine import scan_and_redact + + public_result = datafog.redact( + "Email jane@example.com", + None, + "regex", + None, + "mask", + ) + engine_result = scan_and_redact( + "Email jane@example.com", + "regex", + None, + "mask", + ) + + assert public_result.redacted_text == engine_result.redacted_text + assert public_result.redacted_text != "Email jane@example.com" + + def test_core_defaults_do_not_initialize_optional_engines(monkeypatch) -> None: import datafog import datafog.engine as engine diff --git a/tests/test_regex_annotator.py b/tests/test_regex_annotator.py index 5916bfae..600d80e6 100644 --- a/tests/test_regex_annotator.py +++ b/tests/test_regex_annotator.py @@ -40,9 +40,23 @@ def test_regex_annotator_initialization(): """Test that the RegexAnnotator can be initialized.""" annotator = RegexAnnotator() assert annotator is not None - assert ( - len(annotator.LABELS) == 7 - ) # EMAIL, PHONE, SSN, CREDIT_CARD, IP_ADDRESS, DOB, ZIP + required_labels = { + "EMAIL", + "PHONE", + "SSN", + "CREDIT_CARD", + "IP_ADDRESS", + "DOB", + "ZIP", + "DE_VAT_ID", + "DE_IBAN", + "DE_TAX_ID", + "DE_SOCIAL_SECURITY_NUMBER", + "DE_POSTAL_CODE", + "DE_PASSPORT_NUMBER", + "DE_RESIDENCE_PERMIT_NUMBER", + } + assert required_labels.issubset(set(annotator.LABELS)) def test_regex_annotator_create_method(): @@ -52,6 +66,20 @@ def test_regex_annotator_create_method(): assert isinstance(annotator, RegexAnnotator) +def test_de_labels_inactive_without_locale(): + """German DE_ labels should be inactive unless locales include 'de'.""" + annotator = RegexAnnotator() + result = annotator.annotate("Passnummer C12345678 wurde geprueft.") + assert not result["DE_PASSPORT_NUMBER"] + + +def test_de_labels_active_with_locale(): + """German DE_ labels should activate when locales include 'de'.""" + annotator = RegexAnnotator(locales=["de"]) + result = annotator.annotate("Passnummer C12345678 wurde geprueft.") + assert "C12345678" in result["DE_PASSPORT_NUMBER"] + + def test_empty_text_annotation(): """Test that annotating empty text returns empty results.""" annotator = RegexAnnotator()