diff --git a/docs/dynamic-model-generation.md b/docs/dynamic-model-generation.md new file mode 100644 index 000000000..d172e68d0 --- /dev/null +++ b/docs/dynamic-model-generation.md @@ -0,0 +1,301 @@ +# Dynamic Model Generation + +Generate real Python model classes from JSON Schema or OpenAPI at runtime without writing files. + +## Overview + +While `generate()` produces source code as strings, `generate_dynamic_models()` creates actual Python classes that you can use immediately for validation and data processing. This is useful for: + +- Runtime schema validation without a code generation step +- Dynamic API clients that adapt to schema changes +- Testing and prototyping +- Plugin systems with dynamic schemas + +## Quick Start + +```python +from datamodel_code_generator import generate_dynamic_models + +schema = { + "type": "object", + "properties": { + "name": {"type": "string"}, + "age": {"type": "integer"} + }, + "required": ["name"] +} + +models = generate_dynamic_models(schema) +User = models["Model"] + +# Use the model for validation +user = User(name="Alice", age=30) +print(user.model_dump()) # {'name': 'Alice', 'age': 30} + +# Validation errors are raised +try: + User(age="not a number") # Missing required 'name', wrong type for 'age' +except Exception as e: + print(e) +``` + +## API Reference + +### `generate_dynamic_models()` + +```python +def generate_dynamic_models( + input_: Mapping[str, Any], + *, + config: GenerateConfig | None = None, + cache_size: int = 128, +) -> dict[str, type]: +``` + +**Parameters:** + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `input_` | `Mapping[str, Any]` | required | JSON Schema or OpenAPI schema as dict | +| `config` | `GenerateConfig \| None` | `None` | Generation options (same as `generate()`) | +| `cache_size` | `int` | `128` | Maximum number of cached schemas. Set to `0` to disable caching | + +**Returns:** `dict[str, type]` - Dictionary mapping class names to model classes.
+ +### `clear_dynamic_models_cache()` + +```python +def clear_dynamic_models_cache() -> int: +``` + +Clears the internal cache and returns the number of entries cleared. + +## Examples + +### JSON Schema with Nested Models + +```python +from datamodel_code_generator import generate_dynamic_models + +schema = { + "$defs": { + "Address": { + "type": "object", + "properties": { + "street": {"type": "string"}, + "city": {"type": "string"} + }, + "required": ["street", "city"] + }, + "Person": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "address": {"$ref": "#/$defs/Address"} + }, + "required": ["name"] + } + }, + "$ref": "#/$defs/Person" +} + +models = generate_dynamic_models(schema) + +# Both models are available +Person = models["Person"] +Address = models["Address"] + +person = Person( + name="Bob", + address={"street": "123 Main St", "city": "NYC"} +) +print(person.model_dump()) +# {'name': 'Bob', 'address': {'street': '123 Main St', 'city': 'NYC'}} +``` + +### OpenAPI Schema + +OpenAPI schemas are auto-detected: + +```python +from datamodel_code_generator import generate_dynamic_models + +openapi_schema = { + "openapi": "3.0.0", + "info": {"title": "User API", "version": "1.0.0"}, + "paths": {}, + "components": { + "schemas": { + "User": { + "type": "object", + "properties": { + "id": {"type": "integer"}, + "email": {"type": "string", "format": "email"} + }, + "required": ["id", "email"] + } + } + } +} + +models = generate_dynamic_models(openapi_schema) +User = models["User"] + +user = User(id=1, email="alice@example.com") +``` + +### With Custom Configuration + +```python +from datamodel_code_generator import generate_dynamic_models, GenerateConfig, DataModelType + +schema = {"type": "object", "properties": {"name": {"type": "string"}}} + +config = GenerateConfig( + class_name="Customer", + output_model_type=DataModelType.PydanticV2BaseModel, +) + +models = generate_dynamic_models(schema, config=config) +Customer = models["Customer"] 
+``` + +### Enum Models + +```python +from datamodel_code_generator import generate_dynamic_models + +schema = { + "type": "object", + "properties": { + "status": { + "type": "string", + "enum": ["pending", "approved", "rejected"] + } + } +} + +models = generate_dynamic_models(schema) +Model = models["Model"] +Status = models["Status"] + +# Enum validation +item = Model(status="approved") +print(item.status) # Status.approved +print(item.status.value) # 'approved' + +# An invalid enum value raises an error +try: + Model(status="invalid") +except Exception as e: + print(e) +``` + +### Circular References + +```python +from datamodel_code_generator import generate_dynamic_models + +schema = { + "$defs": { + "Node": { + "type": "object", + "properties": { + "value": {"type": "string"}, + "children": { + "type": "array", + "items": {"$ref": "#/$defs/Node"} + } + } + } + }, + "$ref": "#/$defs/Node" +} + +models = generate_dynamic_models(schema) +Node = models["Node"] + +tree = Node( + value="root", + children=[ + Node(value="child1", children=[]), + Node(value="child2", children=[ + Node(value="grandchild", children=[]) + ]) + ] +) +``` + +## Caching + +Models are cached by schema content and configuration to avoid regeneration: + +```python +from datamodel_code_generator import generate_dynamic_models, clear_dynamic_models_cache + +schema = {"type": "object", "properties": {"x": {"type": "integer"}}} + +# First call generates models +models1 = generate_dynamic_models(schema) + +# Second call returns the cached dict (same object; treat it as read-only) +models2 = generate_dynamic_models(schema) +assert models1 is models2 # True + +# Disable caching for a specific call +models3 = generate_dynamic_models(schema, cache_size=0) +assert models1 is not models3 # True + +# Clear all cached models +cleared = clear_dynamic_models_cache() +print(f"Cleared {cleared} cached schemas") +``` + +## Thread Safety + +`generate_dynamic_models()` is thread-safe.
Multiple threads can safely call it concurrently: + +```python +import threading +from datamodel_code_generator import generate_dynamic_models + +schema = {"type": "object", "properties": {"x": {"type": "integer"}}} +results = [] + +def worker(): + models = generate_dynamic_models(schema) + results.append(models) + +threads = [threading.Thread(target=worker) for _ in range(10)] +for t in threads: + t.start() +for t in threads: + t.join() + +# All threads get the same cached models +assert all(r is results[0] for r in results) +``` + +## Limitations + +| Limitation | Details | +|------------|---------| +| Pydantic v2 only | Pydantic v1 is not supported | +| Not pickle-able | Use `model_dump()` to serialize instances | +| Mapping input only | Schema must be an in-memory mapping such as a `dict`, not a file path or string | + +## Comparison with `generate()` + +| Feature | `generate()` | `generate_dynamic_models()` | +|---------|-------------|----------------------------| +| Output | Source code string | Actual Python classes | +| Use case | Code generation, file output | Runtime validation | +| Caching | No | Yes (configurable) | +| Thread-safe | Yes | Yes | +| Pydantic v1 | Yes | No | + +## See Also + +- [Using as Module](using_as_module.md) - `generate()` function reference +- [JSON Schema](jsonschema.md) - JSON Schema examples +- [OpenAPI](openapi.md) - OpenAPI examples diff --git a/docs/using_as_module.md b/docs/using_as_module.md index 903933f20..cdfa104d7 100644 --- a/docs/using_as_module.md +++ b/docs/using_as_module.md @@ -251,5 +251,6 @@ class Model(BaseModel): ## 📖 See Also -- 🖥️ [CLI Reference](cli-reference/index.md) - Complete CLI options (same parameters as module) -- 📋 [Generate from JSON Schema](jsonschema.md) - JSON Schema examples +- [Dynamic Model Generation](dynamic-model-generation.md) - Generate Python classes at runtime +- [CLI Reference](cli-reference/index.md) - Complete CLI options (same parameters as module) +- [Generate from JSON Schema](jsonschema.md) - JSON Schema
examples diff --git a/pyproject.toml b/pyproject.toml index bd9548174..d2b219503 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -184,6 +184,7 @@ lint.per-file-ignores."scripts/*.py" = [ lint.per-file-ignores."tests/**/*.py" = [ "FBT", # don't care about booleans as positional arguments in tests "INP001", # no implicit namespace + "N806", # variable names for model classes can be uppercase "PLC0415", # local imports in tests are fine "PLC2701", # private import is fine "PLR0904", # too many public methods in test classes is fine diff --git a/src/datamodel_code_generator/__init__.py b/src/datamodel_code_generator/__init__.py index 5f51fad18..1c84597f0 100644 --- a/src/datamodel_code_generator/__init__.py +++ b/src/datamodel_code_generator/__init__.py @@ -258,7 +258,7 @@ def chdir(path: Path | None) -> Iterator[None]: os.chdir(prev_cwd) -def is_openapi(data: dict) -> bool: +def is_openapi(data: Mapping[str, Any]) -> bool: """Check if the data dict is an OpenAPI specification.""" return "openapi" in data @@ -927,6 +927,23 @@ def infer_input_type(text: str) -> InputFileType: "`--input-file-type` option." 
)
+
+_LAZY_IMPORTS = {
+    "clear_dynamic_models_cache": "datamodel_code_generator.dynamic",
+    "generate_dynamic_models": "datamodel_code_generator.dynamic",
+}
+
+
+def __getattr__(name: str) -> Any:
+    if name in _LAZY_IMPORTS:
+        import importlib  # noqa: PLC0415
+
+        module = importlib.import_module(_LAZY_IMPORTS[name])
+        return getattr(module, name)
+    msg = f"module {__name__!r} has no attribute {name!r}"
+    raise AttributeError(msg)
+
+
 __all__ = [
     "DEFAULT_FORMATTERS",
     "DEFAULT_SHARED_MODULE_NAME",
@@ -959,5 +976,7 @@ def infer_input_type(text: str) -> InputFileType:
     "ReuseScope",
     "SchemaParseError",
     "TargetPydanticVersion",
+    "clear_dynamic_models_cache",  # noqa: F822
     "generate",
+    "generate_dynamic_models",  # noqa: F822
 ]
diff --git a/src/datamodel_code_generator/dynamic.py b/src/datamodel_code_generator/dynamic.py
new file mode 100644
index 000000000..28b5378bc
--- /dev/null
+++ b/src/datamodel_code_generator/dynamic.py
@@ -0,0 +1,280 @@
+"""Dynamic model generation module for datamodel-code-generator.
+
+This module provides runtime generation of Pydantic v2 models from JSON Schema
+or OpenAPI schemas.
+"""
+
+from __future__ import annotations
+
+import ast
+import builtins
+import itertools
+import json
+import sys
+import threading
+import types
+from enum import Enum
+from pathlib import PurePath
+from typing import TYPE_CHECKING, Any
+
+import pydantic
+from pydantic import BaseModel
+
+from datamodel_code_generator import Error, generate, is_openapi
+from datamodel_code_generator.config import GenerateConfig
+from datamodel_code_generator.enums import DataModelType, InputFileType
+from datamodel_code_generator.model.pydantic_v2 import UnionMode
+from datamodel_code_generator.parser._graph import stable_toposort
+from datamodel_code_generator.types import StrictTypes
+
+if TYPE_CHECKING:
+    from collections.abc import Mapping
+
+_dynamic_models_cache: dict[str, dict[str, type]] = {}
+_dynamic_models_lock = threading.Lock()
+_dynamic_module_counter = itertools.count(1)
+# Oldest pydantic major version that generate_dynamic_models() supports.
+_MIN_PYDANTIC_MAJOR = 2
+
+
+def _is_init_file(path_tuple: tuple[str, ...]) -> bool:
+    """Check if path tuple represents an __init__.py file."""
+    return PurePath(path_tuple[-1]).stem == "__init__"
+
+
+def _path_to_module_name(package_name: str, path_tuple: tuple[str, ...]) -> str:
+    """Convert path tuple to module name."""
+    parts = [package_name, *path_tuple[:-1]]
+    stem = PurePath(path_tuple[-1]).stem
+    if stem != "__init__":
+        parts.append(stem)
+    return ".".join(parts)
+
+
+def _execute_single_module(code: str) -> dict[str, type]:
+    """Execute single module code and extract models."""
+    namespace: dict[str, Any] = {"__builtins__": builtins.__dict__}
+    exec(code, namespace)  # noqa: S102
+
+    models = _extract_models(namespace)
+
+    for obj in models.values():
+        if issubclass(obj, BaseModel) and hasattr(obj, "__pydantic_generic_metadata__"):
+            obj.model_rebuild(_types_namespace=namespace)
+
+    return models
+
+
+def _get_relative_imports(code: str) -> set[str]:
+    """Extract relative import module names from code using AST.
+
+    Only single-dot imports (``from .x import y`` / ``from . import x``) are
+    collected; deeper relative levels (``from ..x``) are ignored.
+    """
+    imports: set[str] = set()
+    tree = ast.parse(code)
+    for node in ast.walk(tree):
+        if isinstance(node, ast.ImportFrom) and node.level == 1:
+            if node.module:
+                imports.add(node.module.split(".")[0])
+            else:
+                imports.update(alias.name for alias in node.names)
+    return imports
+
+
+def _build_module_edges(modules: dict[tuple[str, ...], str]) -> dict[tuple[str, ...], set[tuple[str, ...]]]:
+    """Build dependency edges for topological sort.
+
+    Returns edges where edges[u] contains v means u must come before v.
+    """
+    name_to_path: dict[str, tuple[str, ...]] = {}
+    for path in modules:
+        if (filepath := PurePath(path[-1])).suffix == ".py" and (name := filepath.stem) != "__init__":
+            name_to_path[name] = path
+
+    edges: dict[tuple[str, ...], set[tuple[str, ...]]] = {path: set() for path in modules}
+    for path, code in modules.items():
+        for imported in _get_relative_imports(code):
+            if dep_path := name_to_path.get(imported):
+                edges[dep_path].add(path)
+    return edges
+
+
+def _execute_multi_module(modules: dict[tuple[str, ...], str]) -> dict[str, type]:
+    """Execute multiple modules and extract models."""
+    package_name = f"_dcg_dynamic_{next(_dynamic_module_counter)}"
+
+    created_modules: list[str] = []
+    all_namespaces: dict[str, dict[str, Any]] = {}
+
+    try:
+        nodes = list(modules.keys())
+        nodes.sort(key=lambda p: (_is_init_file(p), p))
+        node_index = {node: i for i, node in enumerate(nodes)}
+        edges = _build_module_edges(modules)
+        sorted_paths = stable_toposort(nodes, edges, key=node_index.__getitem__)
+
+        for path_tuple in sorted_paths:
+            module_name = _path_to_module_name(package_name, path_tuple)
+            module = types.ModuleType(module_name)
+            module.__dict__["__builtins__"] = builtins.__dict__
+            # A package's __package__ is its own dotted name, while a plain
+            # module's is its parent's.  Using the root package name for every
+            # __init__ would break relative imports inside nested packages.
+            module.__package__ = module_name if _is_init_file(path_tuple) else module_name.rpartition(".")[0]
+            sys.modules[module_name] = module
+            created_modules.append(module_name)
+            all_namespaces[module_name] = module.__dict__
+
+        if package_name not in sys.modules:
+            pkg = types.ModuleType(package_name)
+            pkg.__path__ = []
+            pkg.__package__ = package_name
+            sys.modules[package_name] = pkg
+            created_modules.insert(0, package_name)
+
+        for path_tuple in sorted_paths:
+            module_name = _path_to_module_name(package_name, path_tuple)
+            exec(modules[path_tuple], all_namespaces[module_name])  # noqa: S102
+
+        models: dict[str, type] = {}
+        combined_namespace: dict[str, Any] = {}
+        for ns in all_namespaces.values():
+            combined_namespace.update(ns)
+            models.update(_extract_models(ns))
+
+        for obj in models.values():
+            if issubclass(obj, BaseModel) and hasattr(obj, "__pydantic_generic_metadata__"):
+                obj.model_rebuild(_types_namespace=combined_namespace)
+
+        return models
+    finally:
+        # Unregister the temporary modules whether or not execution succeeded.
+        for module_name in reversed(created_modules):
+            sys.modules.pop(module_name, None)
+
+
+def _extract_models(namespace: dict[str, Any]) -> dict[str, type]:
+    """Extract model and enum classes from namespace."""
+    return {
+        k: v
+        for k, v in namespace.items()
+        if isinstance(v, type)
+        and not k.startswith("_")
+        and ((issubclass(v, BaseModel) and v is not BaseModel) or (issubclass(v, Enum) and v is not Enum))
+    }
+
+
+def _make_cache_key(schema: Mapping[str, Any], config: GenerateConfig) -> str | None:
+    """Create cache key from schema and config.
+
+    Returns None if the schema is not JSON-serializable.
+    """
+    try:
+        key_data = {"schema": dict(schema), "config": config.model_dump(mode="json", exclude_defaults=True)}
+        return json.dumps(key_data, sort_keys=True, separators=(",", ":"))
+    except (TypeError, ValueError):
+        return None
+
+
+def generate_dynamic_models(
+    input_: Mapping[str, Any],
+    *,
+    config: GenerateConfig | None = None,
+    cache_size: int = 128,
+) -> dict[str, type]:
+    """Generate actual Python model classes from schema at runtime.
+
+    This function creates real Python classes from JSON Schema or OpenAPI schemas
+    by generating source code with ``generate()`` and executing it in memory. The
+    resulting models can be used immediately for validation and data processing.
+
+    Args:
+        input_: JSON Schema or OpenAPI schema as dict.
+        config: A GenerateConfig object with generation options. If None, uses defaults.
+        cache_size: Maximum number of schemas to cache. Set to 0 to disable caching.
+
+    Returns:
+        Dictionary mapping class names to model classes.
+
+    Raises:
+        Error: If Pydantic v1 is installed or ``generate()`` returns None.
+
+    Note:
+        - Thread-safe (uses internal lock and cache)
+        - Pydantic v2 only (v1 is not supported)
+        - Not pickle-able (use model_dump() to serialize instances)
+        - Cached by serialized schema + config, with FIFO eviction when cache_size is exceeded
+        - Supports both single-module and multi-module output
+        - Generated source is run with ``exec``; only pass schemas you trust
+        - A cache hit returns the same dict object; treat it as read-only
+
+    Example:
+        >>> schema = {
+        ...     "type": "object",
+        ...     "properties": {"name": {"type": "string"}, "age": {"type": "integer"}},
+        ...     "required": ["name"],
+        ... }
+        >>> models = generate_dynamic_models(schema)
+        >>> User = models["Model"]
+        >>> user = User(name="John", age=30)
+        >>> user.model_dump()
+        {'name': 'John', 'age': 30}
+    """
+    # Compare the numeric major version: a plain string comparison is
+    # lexicographic and would treat e.g. "10.0.0" as older than "2.0.0".
+    if int(pydantic.VERSION.split(".", 1)[0]) < _MIN_PYDANTIC_MAJOR:  # pragma: no cover
+        msg = f"generate_dynamic_models requires Pydantic v2, found v{pydantic.VERSION}"
+        raise Error(msg)
+
+    GenerateConfig.model_rebuild(_types_namespace={"StrictTypes": StrictTypes, "UnionMode": UnionMode})
+
+    if config is None:
+        if is_openapi(input_):
+            config = GenerateConfig(
+                input_file_type=InputFileType.OpenAPI,
+                output_model_type=DataModelType.PydanticV2BaseModel,
+            )
+        else:
+            config = GenerateConfig(
+                input_file_type=InputFileType.JsonSchema,
+                output_model_type=DataModelType.PydanticV2BaseModel,
+            )
+    elif config.input_file_type == InputFileType.Auto:
+        detected_type = InputFileType.OpenAPI if is_openapi(input_) else InputFileType.JsonSchema
+        config = config.model_copy(update={"input_file_type": detected_type})
+
+    cache_key = _make_cache_key(input_, config)
+    use_cache = cache_size > 0 and cache_key is not None
+
+    # Generation runs under the lock so that concurrent callers passing the
+    # same schema all receive the one cached result object.
+    with _dynamic_models_lock:
+        if use_cache and cache_key in _dynamic_models_cache:
+            return _dynamic_models_cache[cache_key]
+
+        result = generate(input_=input_, config=config)
+        if result is None:  # pragma: no cover
+            msg = "generate() returned None"
+            raise Error(msg)
+        models = _execute_single_module(result) if isinstance(result, str) else _execute_multi_module(result)
+
+        if use_cache:
+            while len(_dynamic_models_cache) >= cache_size:
+                oldest_key = next(iter(_dynamic_models_cache))
+                del _dynamic_models_cache[oldest_key]
+            _dynamic_models_cache[cache_key] = models  # type: ignore[index]
+
+        return models
+
+
+def clear_dynamic_models_cache() -> int:
+    """Clear the dynamic models cache.
+
+    Returns:
+        Number of cached entries that were cleared.
+    """
+    with _dynamic_models_lock:
+        count = len(_dynamic_models_cache)
+        _dynamic_models_cache.clear()
+        return count
diff --git a/src/datamodel_code_generator/types.py b/src/datamodel_code_generator/types.py
index 9890afa61..42805501c 100644
--- a/src/datamodel_code_generator/types.py
+++ b/src/datamodel_code_generator/types.py
@@ -92,14 +92,14 @@ READ_ONLY_PREFIX = f"{READ_ONLY}["
-def __getattr__(name: str) -> Any:
-    """Provide lazy access to StrictTypes for backwards compatibility."""
-    if name == "StrictTypes":
-        from datamodel_code_generator.enums import StrictTypes  # noqa: PLC0415
-
-        return StrictTypes
-    msg = f"module {__name__!r} has no attribute {name!r}"
-    raise AttributeError(msg)
+from datamodel_code_generator.util import create_module_getattr  # noqa: E402
+
+__getattr__ = create_module_getattr(
+    __name__,
+    {
+        "StrictTypes": ("datamodel_code_generator.enums", "StrictTypes"),
+    },
+)
 if TYPE_CHECKING:
diff --git a/src/datamodel_code_generator/util.py b/src/datamodel_code_generator/util.py
index 9117a0ed1..9d0e250b0 100644
--- a/src/datamodel_code_generator/util.py
+++ b/src/datamodel_code_generator/util.py
@@ -227,6 +227,37 @@ class _BaseModelV2(_PydanticBaseModel):
 _BaseModel: type | None = None
+def create_module_getattr(
+    module_name: str,
+    lazy_imports: dict[str, tuple[str, str]],
+) -> Callable[[str], Any]:
+    """Create a __getattr__ function for lazy module imports.
+
+    Args:
+        module_name: The name of the module (typically __name__).
+        lazy_imports: Mapping of attribute name to (module_path, attribute_name).
+
+    Returns:
+        A __getattr__ function that lazily imports the specified attributes.
+
+    Example:
+        __getattr__ = create_module_getattr(__name__, {
+            "MyClass": ("mypackage.mymodule", "MyClass"),
+        })
+    """
+    from importlib import import_module  # noqa: PLC0415
+
+    def _getattr(name: str) -> Any:
+        if name in lazy_imports:
+            module_path, attr_name = lazy_imports[name]
+            module = import_module(module_path)
+            return getattr(module, attr_name)
+        msg = f"module {module_name!r} has no attribute {name!r}"
+        raise AttributeError(msg)
+
+    return _getattr
+
+
 def __getattr__(name: str) -> Any:
     """Provide lazy access to BaseModel and SafeLoader."""
     global _BaseModel  # noqa: PLW0603
diff --git a/tests/data/dynamic_models/openapi_schema.json b/tests/data/dynamic_models/openapi_schema.json
new file mode 100644
index 000000000..f39ea4e88
--- /dev/null
+++ b/tests/data/dynamic_models/openapi_schema.json
@@ -0,0 +1,14 @@
+{
+  "openapi": "3.0.0",
+  "info": {"title": "Test API", "version": "1.0.0"},
+  "paths": {},
+  "components": {
+    "schemas": {
+      "User": {
+        "type": "object",
+        "properties": {"id": {"type": "integer"}, "name": {"type": "string"}},
+        "required": ["id", "name"]
+      }
+    }
+  }
+}
diff --git a/tests/data/expected/dynamic_models/allof_inheritance.json b/tests/data/expected/dynamic_models/allof_inheritance.json
new file mode 100644
index 000000000..e46a185fd
--- /dev/null
+++ b/tests/data/expected/dynamic_models/allof_inheritance.json
@@ -0,0 +1,6 @@
+{
+  "Extended": {
+    "id": 42,
+    "name": "test"
+  }
+}
\ No newline at end of file
diff --git a/tests/data/expected/dynamic_models/circular_reference.json b/tests/data/expected/dynamic_models/circular_reference.json
new file mode 100644
index 000000000..4c5cf0a63
--- /dev/null
+++ b/tests/data/expected/dynamic_models/circular_reference.json
@@ -0,0 +1,9 @@
+{
+  "Node": {
+    "value":
"root", + "children": [ + {"value": "child1", "children": []}, + {"value": "child2", "children": []} + ] + } +} \ No newline at end of file diff --git a/tests/data/expected/dynamic_models/complex_schema.py b/tests/data/expected/dynamic_models/complex_schema.py new file mode 100644 index 000000000..729771e0b --- /dev/null +++ b/tests/data/expected/dynamic_models/complex_schema.py @@ -0,0 +1,21 @@ +# generated by datamodel-codegen: +# filename: +# timestamp: 2019-07-26T00:00:00+00:00 + +from __future__ import annotations + +from pydantic import BaseModel, RootModel + + +class Address(BaseModel): + street: str + city: str + + +class Person(BaseModel): + name: str + address: Address | None = None + + +class Model(RootModel[Person]): + root: Person \ No newline at end of file diff --git a/tests/data/expected/dynamic_models/enum_model.json b/tests/data/expected/dynamic_models/enum_model.json new file mode 100644 index 000000000..59ec4a83a --- /dev/null +++ b/tests/data/expected/dynamic_models/enum_model.json @@ -0,0 +1,5 @@ +{ + "Model": { + "status": "active" + } +} \ No newline at end of file diff --git a/tests/data/expected/dynamic_models/generated_code_validation.json b/tests/data/expected/dynamic_models/generated_code_validation.json new file mode 100644 index 000000000..4373fe2cb --- /dev/null +++ b/tests/data/expected/dynamic_models/generated_code_validation.json @@ -0,0 +1,13 @@ +{ + "Person": { + "name": "John", + "address": { + "street": "123 Main St", + "city": "NYC" + } + }, + "Address": { + "street": "456 Oak Ave", + "city": "LA" + } +} \ No newline at end of file diff --git a/tests/data/expected/dynamic_models/multi_module_output.json b/tests/data/expected/dynamic_models/multi_module_output.json new file mode 100644 index 000000000..0080d11f7 --- /dev/null +++ b/tests/data/expected/dynamic_models/multi_module_output.json @@ -0,0 +1,13 @@ +{ + "User": { + "name": "Alice", + "age": 25 + }, + "Order": { + "id": 1, + "user": { + "name": "Bob", + "age": 30 + } + 
} +} \ No newline at end of file diff --git a/tests/data/expected/dynamic_models/nested_models.json b/tests/data/expected/dynamic_models/nested_models.json new file mode 100644 index 000000000..ae3111417 --- /dev/null +++ b/tests/data/expected/dynamic_models/nested_models.json @@ -0,0 +1,7 @@ +{ + "Model": { + "user": { + "name": "Alice" + } + } +} \ No newline at end of file diff --git a/tests/data/expected/dynamic_models/numeric_constraints.json b/tests/data/expected/dynamic_models/numeric_constraints.json new file mode 100644 index 000000000..f27b7f479 --- /dev/null +++ b/tests/data/expected/dynamic_models/numeric_constraints.json @@ -0,0 +1,5 @@ +{ + "Model": { + "age": 30 + } +} \ No newline at end of file diff --git a/tests/data/expected/dynamic_models/openapi_auto_detection.json b/tests/data/expected/dynamic_models/openapi_auto_detection.json new file mode 100644 index 000000000..53f5cbe61 --- /dev/null +++ b/tests/data/expected/dynamic_models/openapi_auto_detection.json @@ -0,0 +1,6 @@ +{ + "User": { + "id": 1, + "name": "Alice" + } +} \ No newline at end of file diff --git a/tests/data/expected/dynamic_models/simple_model.json b/tests/data/expected/dynamic_models/simple_model.json new file mode 100644 index 000000000..77a4415f7 --- /dev/null +++ b/tests/data/expected/dynamic_models/simple_model.json @@ -0,0 +1,6 @@ +{ + "Model": { + "name": "John", + "age": 30 + } +} \ No newline at end of file diff --git a/tests/data/expected/dynamic_models/simple_model_optional.json b/tests/data/expected/dynamic_models/simple_model_optional.json new file mode 100644 index 000000000..cc0fd39d7 --- /dev/null +++ b/tests/data/expected/dynamic_models/simple_model_optional.json @@ -0,0 +1,6 @@ +{ + "Model": { + "name": "Jane", + "age": null + } +} \ No newline at end of file diff --git a/tests/data/expected/dynamic_models/string_constraints.json b/tests/data/expected/dynamic_models/string_constraints.json new file mode 100644 index 000000000..de2f94024 --- /dev/null +++ 
b/tests/data/expected/dynamic_models/string_constraints.json @@ -0,0 +1,5 @@ +{ + "Model": { + "email": "test@example.com" + } +} \ No newline at end of file diff --git a/tests/main/test_dynamic_models.py b/tests/main/test_dynamic_models.py new file mode 100644 index 000000000..3cd2ff6ef --- /dev/null +++ b/tests/main/test_dynamic_models.py @@ -0,0 +1,473 @@ +"""Tests for generate_dynamic_models function.""" + +from __future__ import annotations + +import json +import threading +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import TYPE_CHECKING + +import pydantic +import pytest +from inline_snapshot import external_file + +from datamodel_code_generator import ( + DataModelType, + InputFileType, + clear_dynamic_models_cache, + generate, + generate_dynamic_models, +) +from datamodel_code_generator.config import GenerateConfig +from datamodel_code_generator.enums import ModuleSplitMode +from datamodel_code_generator.model.pydantic_v2 import UnionMode +from datamodel_code_generator.types import StrictTypes +from tests.conftest import assert_output + +if TYPE_CHECKING: + from typing import Any + + +pytestmark = pytest.mark.skipif(pydantic.VERSION < "2.0.0", reason="generate_dynamic_models requires Pydantic v2") + +DATA_PATH = Path(__file__).parent.parent / "data" / "dynamic_models" +EXPECTED_PATH = Path(__file__).parent.parent / "data" / "expected" / "dynamic_models" + + +def make_object_schema(properties: dict[str, Any], required: list[str] | None = None) -> dict[str, Any]: + """Create a simple object schema.""" + schema: dict[str, Any] = {"type": "object", "properties": properties} + if required: + schema["required"] = required + return schema + + +def make_config( + class_name: str | None = None, + module_split_mode: ModuleSplitMode | None = None, +) -> GenerateConfig: + """Create a GenerateConfig with common defaults.""" + return GenerateConfig( + input_file_type=InputFileType.JsonSchema, + 
output_model_type=DataModelType.PydanticV2BaseModel, + class_name=class_name, + module_split_mode=module_split_mode, + ) + + +def assert_dynamic_models( + schema: dict[str, Any], + validations: dict[str, dict[str, Any]], + expected_path: Path, + *, + config: GenerateConfig | None = None, +) -> None: + """Generate dynamic models, validate data, and assert with external file.""" + models = generate_dynamic_models(schema, config=config) + assert { + name: models[name].model_validate(data).model_dump(mode="json") for name, data in validations.items() + } == external_file(expected_path) + + +@pytest.fixture(autouse=True) +def _setup_and_clear_cache() -> None: + """Rebuild GenerateConfig and clear cache before each test.""" + GenerateConfig.model_rebuild(_types_namespace={"StrictTypes": StrictTypes, "UnionMode": UnionMode}) + clear_dynamic_models_cache() + + +def test_simple_model() -> None: + """Test generating a simple model and validating data.""" + schema = make_object_schema({"name": {"type": "string"}, "age": {"type": "integer"}}, required=["name"]) + assert_dynamic_models(schema, {"Model": {"name": "John", "age": 30}}, EXPECTED_PATH / "simple_model.json") + assert_dynamic_models(schema, {"Model": {"name": "Jane"}}, EXPECTED_PATH / "simple_model_optional.json") + + +def test_nested_models() -> None: + """Test generating nested models and validating nested data.""" + schema = make_object_schema({"user": {"type": "object", "properties": {"name": {"type": "string"}}}}) + assert_dynamic_models(schema, {"Model": {"user": {"name": "Alice"}}}, EXPECTED_PATH / "nested_models.json") + + +def test_enum_model() -> None: + """Test generating model with enum and validating enum values.""" + schema = make_object_schema({"status": {"type": "string", "enum": ["active", "inactive"]}}) + assert_dynamic_models(schema, {"Model": {"status": "active"}}, EXPECTED_PATH / "enum_model.json") + + models = generate_dynamic_models(schema) + with pytest.raises(pydantic.ValidationError): + 
models["Model"].model_validate({"status": "invalid"}) + + +def test_circular_reference() -> None: + """Test generating models with circular references.""" + schema: dict[str, Any] = { + "$defs": { + "Node": { + "type": "object", + "properties": { + "value": {"type": "string"}, + "children": {"type": "array", "items": {"$ref": "#/$defs/Node"}}, + }, + }, + }, + "$ref": "#/$defs/Node", + } + assert_dynamic_models( + schema, + { + "Node": { + "value": "root", + "children": [{"value": "child1", "children": []}, {"value": "child2", "children": []}], + } + }, + EXPECTED_PATH / "circular_reference.json", + ) + + +def test_allof_inheritance() -> None: + """Test generating models with allOf inheritance.""" + schema: dict[str, Any] = { + "$defs": { + "Base": {"type": "object", "properties": {"id": {"type": "integer"}}}, + "Extended": { + "allOf": [{"$ref": "#/$defs/Base"}, {"type": "object", "properties": {"name": {"type": "string"}}}] + }, + }, + "$ref": "#/$defs/Extended", + } + assert_dynamic_models(schema, {"Extended": {"id": 42, "name": "test"}}, EXPECTED_PATH / "allof_inheritance.json") + + +def test_validation_error() -> None: + """Test that validation errors are raised for invalid data.""" + schema = make_object_schema({"count": {"type": "integer"}}, required=["count"]) + models = generate_dynamic_models(schema) + Model = models["Model"] + + with pytest.raises(pydantic.ValidationError): + Model.model_validate({"count": "not_an_integer"}) + + with pytest.raises(pydantic.ValidationError): + Model.model_validate({}) + + +def test_cache_hit() -> None: + """Test that cached models are returned.""" + schema = make_object_schema({"name": {"type": "string"}}) + models1 = generate_dynamic_models(schema) + models2 = generate_dynamic_models(schema) + assert models1 is models2 + + +def test_cache_miss_different_schema() -> None: + """Test that different schemas create different models.""" + models1 = generate_dynamic_models(make_object_schema({"name": {"type": "string"}})) + 
models2 = generate_dynamic_models(make_object_schema({"age": {"type": "integer"}})) + assert models1 is not models2 + + +def test_cache_miss_different_config() -> None: + """Test that different configs create different cache entries.""" + schema = make_object_schema({"name": {"type": "string"}}) + models1 = generate_dynamic_models(schema, config=make_config(class_name="User")) + models2 = generate_dynamic_models(schema, config=make_config(class_name="Person")) + assert models1 is not models2 + assert sorted(models1.keys()) == ["User"] + assert sorted(models2.keys()) == ["Person"] + + +def test_cache_disabled() -> None: + """Test that caching can be disabled.""" + schema = make_object_schema({"name": {"type": "string"}}) + models1 = generate_dynamic_models(schema, cache_size=0) + models2 = generate_dynamic_models(schema, cache_size=0) + assert models1 is not models2 + + +def test_cache_eviction() -> None: + """Test that old entries are evicted when cache is full.""" + for i in range(5): + generate_dynamic_models(make_object_schema({f"field{i}": {"type": "string"}}), cache_size=3) + assert clear_dynamic_models_cache() == 3 + + +def test_cache_shrinks_when_smaller_size_requested() -> None: + """Test that cache shrinks when a smaller cache_size is used.""" + schemas = [make_object_schema({f"field{i}": {"type": "string"}}) for i in range(5)] + for schema in schemas: + generate_dynamic_models(schema, cache_size=10) + assert clear_dynamic_models_cache() == 5 + + for schema in schemas: + generate_dynamic_models(schema, cache_size=10) + generate_dynamic_models(make_object_schema({"new_field": {"type": "string"}}), cache_size=2) + assert clear_dynamic_models_cache() == 2 + + +def test_clear_cache() -> None: + """Test clearing the cache.""" + generate_dynamic_models(make_object_schema({"name": {"type": "string"}})) + assert clear_dynamic_models_cache() == 1 + assert clear_dynamic_models_cache() == 0 + + +def test_concurrent_same_schema() -> None: + """Test concurrent access 
with the same schema.""" + schema = make_object_schema({"name": {"type": "string"}}) + results: list[dict[str, type]] = [] + + def worker() -> None: + results.append(generate_dynamic_models(schema)) + + threads = [threading.Thread(target=worker) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert len(results) == 10 + assert all(r is results[0] for r in results) + + +def test_concurrent_different_schemas() -> None: + """Test concurrent access with different schemas.""" + schemas = [make_object_schema({f"field{i}": {"type": "string"}}) for i in range(5)] + results: list[dict[str, type]] = [] + + def worker(schema: dict[str, Any]) -> None: + results.append(generate_dynamic_models(schema)) + + with ThreadPoolExecutor(max_workers=5) as executor: + executor.map(worker, schemas) + + assert len(results) == 5 + + +def test_numeric_constraints() -> None: + """Test models with numeric constraints validate properly.""" + schema = make_object_schema({"age": {"type": "integer", "minimum": 0, "maximum": 150}}) + assert_dynamic_models(schema, {"Model": {"age": 30}}, EXPECTED_PATH / "numeric_constraints.json") + + models = generate_dynamic_models(schema) + with pytest.raises(pydantic.ValidationError): + models["Model"].model_validate({"age": -1}) + with pytest.raises(pydantic.ValidationError): + models["Model"].model_validate({"age": 200}) + + +def test_string_constraints() -> None: + """Test models with string constraints validate properly.""" + schema = make_object_schema({"email": {"type": "string", "pattern": r"^[\w\.-]+@[\w\.-]+\.\w+$"}}) + assert_dynamic_models(schema, {"Model": {"email": "test@example.com"}}, EXPECTED_PATH / "string_constraints.json") + + models = generate_dynamic_models(schema) + with pytest.raises(pydantic.ValidationError): + models["Model"].model_validate({"email": "invalid-email"}) + + +def test_explicit_input_file_type() -> None: + """Test passing explicit input_file_type via config.""" + schema = 
make_object_schema({"name": {"type": "string"}}) + models = generate_dynamic_models(schema, config=make_config()) + assert sorted(models.keys()) == ["Model"] + + +def test_openapi_auto_detection() -> None: + """Test that OpenAPI schemas are auto-detected and models work.""" + with (DATA_PATH / "openapi_schema.json").open() as f: + openapi_schema = json.load(f) + assert_dynamic_models( + openapi_schema, {"User": {"id": 1, "name": "Alice"}}, EXPECTED_PATH / "openapi_auto_detection.json" + ) + + +def test_config_with_auto_input_type() -> None: + """Test that input_file_type=Auto in config is auto-detected.""" + schema = make_object_schema({"name": {"type": "string"}}) + models = generate_dynamic_models(schema, config=GenerateConfig(class_name="User")) + assert sorted(models.keys()) == ["User"] + + +def test_non_serializable_schema_skips_cache() -> None: + """Test that non-JSON-serializable schemas skip caching.""" + from datamodel_code_generator.dynamic import _make_cache_key + + schema: dict[str, Any] = {"type": "object", "properties": {"name": {"type": "string"}}, "custom": object()} + assert _make_cache_key(schema, make_config()) is None + + +def test_cache_hit_inside_lock() -> None: + """Test cache hit after acquiring lock (double-checked locking).""" + from datamodel_code_generator import dynamic as dcg + from datamodel_code_generator.dynamic import _make_cache_key + + schema = make_object_schema({"name": {"type": "string"}}) + original_lock = dcg._dynamic_models_lock + cached_models: dict[str, type] = {"Model": type("Model", (), {})} + cache_populated, lock_acquired = threading.Event(), threading.Event() + result_holder: list[dict[str, type]] = [] + cache_key: str | None = None + + class InstrumentedLock: + def __enter__(self) -> None: + lock_acquired.set() + cache_populated.wait(timeout=5) + original_lock.__enter__() + + def __exit__(self, *args: object) -> None: + original_lock.__exit__(*args) + + dcg._dynamic_models_lock = InstrumentedLock() # type: 
ignore[assignment] + + try: + thread = threading.Thread(target=lambda: result_holder.append(generate_dynamic_models(schema))) + thread.start() + lock_acquired.wait(timeout=5) + + cache_key = _make_cache_key(schema, make_config()) + assert cache_key is not None + dcg._dynamic_models_cache[cache_key] = cached_models + + cache_populated.set() + thread.join(timeout=10) + + assert len(result_holder) == 1 + assert result_holder[0] is cached_models + finally: + cache_populated.set() + dcg._dynamic_models_lock = original_lock + dcg._dynamic_models_cache.pop(cache_key, None) # type: ignore[arg-type] + + +def test_multi_module_output() -> None: + """Test generating models with multi-module output (module_split_mode=Single).""" + schema: dict[str, Any] = { + "$defs": { + "User": { + "type": "object", + "properties": {"name": {"type": "string"}, "age": {"type": "integer"}}, + "required": ["name"], + }, + "Order": { + "type": "object", + "properties": {"id": {"type": "integer"}, "user": {"$ref": "#/$defs/User"}}, + "required": ["id"], + }, + }, + "$ref": "#/$defs/Order", + } + assert_dynamic_models( + schema, + {"User": {"name": "Alice", "age": 25}, "Order": {"id": 1, "user": {"name": "Bob", "age": 30}}}, + EXPECTED_PATH / "multi_module_output.json", + config=make_config(module_split_mode=ModuleSplitMode.Single), + ) + + +def test_generated_code_matches_expected() -> None: + """Test that generate() produces expected code for a complex schema.""" + schema: dict[str, Any] = { + "$defs": { + "Address": { + "type": "object", + "properties": {"street": {"type": "string"}, "city": {"type": "string"}}, + "required": ["street", "city"], + }, + "Person": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "address": {"$ref": "#/$defs/Address"}, + }, + "required": ["name"], + }, + }, + "$ref": "#/$defs/Person", + } + config = GenerateConfig( + input_file_type=InputFileType.JsonSchema, + output_model_type=DataModelType.PydanticV2BaseModel, + ) + result = 
generate(input_=schema, config=config) + assert isinstance(result, str) + assert_output(result, EXPECTED_PATH / "complex_schema.py") + + assert_dynamic_models( + schema, + { + "Person": {"name": "John", "address": {"street": "123 Main St", "city": "NYC"}}, + "Address": {"street": "456 Oak Ave", "city": "LA"}, + }, + EXPECTED_PATH / "generated_code_validation.json", + ) + + +def test_get_relative_imports_with_module_path() -> None: + """Test _get_relative_imports with 'from .module import X' style imports.""" + from datamodel_code_generator.dynamic import _get_relative_imports + + code = "from .user import User\nfrom .order import Order" + imports = _get_relative_imports(code) + assert imports == {"user", "order"} + + +def test_get_relative_imports_with_dotted_module() -> None: + """Test _get_relative_imports with dotted module path.""" + from datamodel_code_generator.dynamic import _get_relative_imports + + code = "from .models.user import User" + imports = _get_relative_imports(code) + assert imports == {"models"} + + +def test_build_module_edges_no_matching_import() -> None: + """Test _build_module_edges when import doesn't match any module.""" + from datamodel_code_generator.dynamic import _build_module_edges + + modules = { + ("user.py",): "class User: pass", + ("order.py",): "from .nonexistent import Something\nclass Order: pass", + } + edges = _build_module_edges(modules) + assert edges["user.py",] == set() + assert edges["order.py",] == set() + + +def test_execute_multi_module_without_init() -> None: + """Test _execute_multi_module without __init__.py to cover package registration branch.""" + from datamodel_code_generator.dynamic import _execute_multi_module + + modules = { + ("user.py",): "from pydantic import BaseModel\n\nclass User(BaseModel):\n name: str", + } + models = _execute_multi_module(modules) + assert "User" in models + user = models["User"](name="Alice") + assert user.name == "Alice" + + +def test_execute_multi_module_no_models() -> None: + 
"""Test _execute_multi_module with code that has no models.""" + from datamodel_code_generator.dynamic import _execute_multi_module + + modules = { + ("utils.py",): "def helper(): pass", + } + models = _execute_multi_module(modules) + assert models == {} + + +def test_execute_multi_module_enum_only() -> None: + """Test _execute_multi_module with enum only to cover non-BaseModel branch.""" + from datamodel_code_generator.dynamic import _execute_multi_module + + modules = { + ( + "status.py", + ): "from enum import Enum\n\nclass Status(Enum):\n ACTIVE = 'active'\n INACTIVE = 'inactive'", + } + models = _execute_multi_module(modules) + assert "Status" in models + assert models["Status"].ACTIVE.value == "active" diff --git a/zensical.toml b/zensical.toml index dc99fd3a6..999cfbbe4 100644 --- a/zensical.toml +++ b/zensical.toml @@ -47,6 +47,7 @@ nav = [ { "Integration" = [ { "One-liner Usage" = "oneliner.md" }, { "Using as Module" = "using_as_module.md" }, + { "Dynamic Model Generation" = "dynamic-model-generation.md" }, { "pyproject.toml" = "pyproject_toml.md" }, { "CI/CD Integration" = "ci-cd.md" }, { "LLM Integration" = "llm-integration.md" }