From fe25d6834b71a34f828c34fb742f97d639df7adb Mon Sep 17 00:00:00 2001 From: Koudai Aono Date: Fri, 2 Jan 2026 11:09:04 +0000 Subject: [PATCH 1/9] Add dynamic model generation support --- src/datamodel_code_generator/__init__.py | 80 ++++ .../dynamic/__init__.py | 25 + .../dynamic/constraints.py | 94 ++++ .../dynamic/creator.py | 263 +++++++++++ .../dynamic/exceptions.py | 52 +++ .../dynamic/type_resolver.py | 174 +++++++ src/datamodel_code_generator/parser/base.py | 36 ++ tests/dynamic/__init__.py | 1 + tests/dynamic/test_creator.py | 432 ++++++++++++++++++ 9 files changed, 1157 insertions(+) create mode 100644 src/datamodel_code_generator/dynamic/__init__.py create mode 100644 src/datamodel_code_generator/dynamic/constraints.py create mode 100644 src/datamodel_code_generator/dynamic/creator.py create mode 100644 src/datamodel_code_generator/dynamic/exceptions.py create mode 100644 src/datamodel_code_generator/dynamic/type_resolver.py create mode 100644 tests/dynamic/__init__.py create mode 100644 tests/dynamic/test_creator.py diff --git a/src/datamodel_code_generator/__init__.py b/src/datamodel_code_generator/__init__.py index a75f5da13..4ac6e5f32 100644 --- a/src/datamodel_code_generator/__init__.py +++ b/src/datamodel_code_generator/__init__.py @@ -896,6 +896,85 @@ def get_header_and_first_line(csv_file: IO[str]) -> dict[str, Any]: return None +def generate_dynamic_models( + input_: Path | str | ParseResult | Mapping[str, Any], + *, + input_file_type: InputFileType = InputFileType.Auto, +) -> dict[str, type]: + """Generate actual Python model classes from schema at runtime. + + This function creates real Python classes using Pydantic's create_model(), + allowing you to instantiate and validate data directly without generating code. + + Args: + input_: The input source (file path, string content, URL, or dict). + input_file_type: The type of input file. Defaults to Auto-detection. + + Returns: + Dictionary mapping class names to actual Python model classes. + + Raises: + DynamicModelError: If model generation fails. + TypeResolutionError: If a field type cannot be resolved. + + Example: + >>> schema = { + ... "type": "object", + ... "properties": {"name": {"type": "string"}, "age": {"type": "integer"}}, + ... "required": ["name"], + ... } + >>> models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + >>> User = models["Model"] + >>> user = User(name="John", age=30) + >>> user.model_dump() + {'name': 'John', 'age': 30} + + Note: + - Only Pydantic v2 models are supported + - Circular references are handled via model_rebuild() + - Custom validators/methods are not included + - For code generation, use generate() instead + """ + from datamodel_code_generator.parser.jsonschema import JsonSchemaParser # noqa: PLC0415 + from datamodel_code_generator.parser.openapi import OpenAPIParser # noqa: PLC0415 + + # Convert input to source and determine file path for base_path resolution + source: str | Path | dict[str, Any] | ParseResult + if isinstance(input_, Mapping): + source = dict(input_) + elif isinstance(input_, Path): + source = input_ # Keep Path object for base_path resolution + elif isinstance(input_, str): + # Check if it's a file path - keep Path object for base_path resolution + path = Path(input_) + source = path if path.exists() and path.is_file() else input_ + else: + source = input_ + + # Infer input type if not specified + if input_file_type == InputFileType.Auto: + if isinstance(source, Mapping): + input_file_type = InputFileType.JsonSchema + elif isinstance(source, Path): + input_file_type = infer_input_type(source.read_text()) + elif isinstance(source, str): + input_file_type = infer_input_type(source) + else: + # ParseResult - default to JsonSchema + input_file_type = InputFileType.JsonSchema + + if input_file_type == InputFileType.OpenAPI: + parser = OpenAPIParser(source=source) + elif input_file_type == InputFileType.GraphQL: + msg = "GraphQL is not yet supported for dynamic model generation" + raise Error(msg) + else: + parser = JsonSchemaParser(source=source) + + parser.parse_raw() + return parser.create_dynamic_models() + + def infer_input_type(text: str) -> InputFileType: """Automatically detect the input file type from text content.""" import yaml.parser # noqa: PLC0415 @@ -955,4 +1034,5 @@ def infer_input_type(text: str) -> InputFileType: "SchemaParseError", "TargetPydanticVersion", "generate", + "generate_dynamic_models", ] diff --git a/src/datamodel_code_generator/dynamic/__init__.py b/src/datamodel_code_generator/dynamic/__init__.py new file mode 100644 index 000000000..1ee75360e --- /dev/null +++ b/src/datamodel_code_generator/dynamic/__init__.py @@ -0,0 +1,25 @@ +"""Dynamic model generation module. + +This module provides functionality to generate actual Python model classes +at runtime using Pydantic's create_model(), instead of generating text code. +""" + +from __future__ import annotations + +from datamodel_code_generator.dynamic.creator import DynamicModelCreator +from datamodel_code_generator.dynamic.exceptions import ( + CircularReferenceError, + ConstraintConversionError, + DynamicModelError, + TypeResolutionError, + UnsupportedModelTypeError, +) + +__all__ = [ + "CircularReferenceError", + "ConstraintConversionError", + "DynamicModelCreator", + "DynamicModelError", + "TypeResolutionError", + "UnsupportedModelTypeError", +] diff --git a/src/datamodel_code_generator/dynamic/constraints.py b/src/datamodel_code_generator/dynamic/constraints.py new file mode 100644 index 000000000..9c874b0b3 --- /dev/null +++ b/src/datamodel_code_generator/dynamic/constraints.py @@ -0,0 +1,94 @@ +"""Constraint conversion utilities for dynamic model generation.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from datamodel_code_generator.model.base import ConstraintsBase + + +def constraints_to_field_kwargs( # noqa: PLR0912, PLR0914, PLR0915 + constraints: ConstraintsBase | None, +) -> dict[str, Any]: + """Convert DataModel constraints to Pydantic Field kwargs. + + Handles all constraint types from ConstraintsBase. + + Note: + - In JSON Schema, minItems/maxItems are for arrays while minLength/maxLength + are for strings. These constraints are mutually exclusive by schema type. + - Pydantic uses min_length/max_length for both strings and sequences. + - Constraints like unique_items are stored in json_schema_extra for + documentation only and are not enforced by Pydantic v2. + """ + if constraints is None: + return {} + + kwargs: dict[str, Any] = {} + json_schema_extra: dict[str, Any] = {} + + min_length = getattr(constraints, "min_length", None) + if min_length is not None: + kwargs["min_length"] = min_length + + max_length = getattr(constraints, "max_length", None) + if max_length is not None: + kwargs["max_length"] = max_length + + pattern_value = getattr(constraints, "regex", None) or getattr(constraints, "pattern", None) + if pattern_value is not None: + kwargs["pattern"] = pattern_value + + ge = getattr(constraints, "ge", None) + if ge is not None: + kwargs["ge"] = ge + + gt = getattr(constraints, "gt", None) + if gt is not None: + kwargs["gt"] = gt + + le = getattr(constraints, "le", None) + if le is not None: + kwargs["le"] = le + + lt = getattr(constraints, "lt", None) + if lt is not None: + kwargs["lt"] = lt + + multiple_of = getattr(constraints, "multiple_of", None) + if multiple_of is not None: + kwargs["multiple_of"] = multiple_of + + min_items = getattr(constraints, "min_items", None) + if min_items is not None: + kwargs["min_length"] = min_items + + max_items = getattr(constraints, "max_items", None) + if max_items is not None: + kwargs["max_length"] = max_items + + unique_items = getattr(constraints, "unique_items", None) + if unique_items is not None: + json_schema_extra["uniqueItems"] = unique_items + + min_properties = getattr(constraints, "min_properties", None) + if min_properties is not None: + json_schema_extra["minProperties"] = min_properties + + max_properties = getattr(constraints, "max_properties", None) + if max_properties is not None: + json_schema_extra["maxProperties"] = max_properties + + exclusive_minimum = getattr(constraints, "exclusive_minimum", None) + if exclusive_minimum is not None: + kwargs["gt"] = exclusive_minimum + + exclusive_maximum = getattr(constraints, "exclusive_maximum", None) + if exclusive_maximum is not None: + kwargs["lt"] = exclusive_maximum + + if json_schema_extra: + kwargs["json_schema_extra"] = json_schema_extra + + return kwargs diff --git a/src/datamodel_code_generator/dynamic/creator.py b/src/datamodel_code_generator/dynamic/creator.py new file mode 100644 index 000000000..fd314100e --- /dev/null +++ b/src/datamodel_code_generator/dynamic/creator.py @@ -0,0 +1,263 @@ +"""Dynamic model creator for generating Python classes at runtime.""" + +from __future__ import annotations + +import importlib +import logging +from enum import Enum +from typing import TYPE_CHECKING, Any, cast + +from pydantic import ConfigDict, Field, create_model + +try: + from pydantic.errors import PydanticUndefinedAnnotation +except ImportError: + PydanticUndefinedAnnotation = NameError # type: ignore[misc,assignment] + +from datamodel_code_generator.dynamic.constraints import constraints_to_field_kwargs +from datamodel_code_generator.dynamic.exceptions import ( + TypeResolutionError, +) +from datamodel_code_generator.dynamic.type_resolver import TypeResolver + +if TYPE_CHECKING: + from pydantic.fields import FieldInfo + + from datamodel_code_generator.model.base import DataModel, DataModelFieldBase + from datamodel_code_generator.parser.base import Parser + +logger = logging.getLogger(__name__) + + +class DynamicModelCreator: + """Creates actual Python classes from DataModel objects.""" + + def __init__(self, parser: Parser) -> None: + """Initialize with a parser instance.""" + self.parser = parser + self._models: dict[str, type[Any]] = {} + self._short_name_lookup: dict[str, type[Any]] = {} + self._type_resolver = TypeResolver(self._short_name_lookup) + + def create_models(self) -> dict[str, type]: + """Create all models from parser results. + + Returns: + Dictionary mapping class names to actual Python classes. + + Raises: + TypeResolutionError: If a type cannot be resolved. + DynamicModelError: If model generation fails. + """ + from datamodel_code_generator.model.enum import Enum as EnumModel # noqa: PLC0415 + from datamodel_code_generator.parser.base import sort_data_models # noqa: PLC0415 + + if not self.parser.results: + return {} + + _, sorted_models_dict, _ = sort_data_models(self.parser.results) + + for data_model in sorted_models_dict.values(): + if isinstance(data_model, EnumModel): + self._create_enum_model(data_model) + else: + self._create_pydantic_model(data_model) + + self._rebuild_models() + + return self._models + + def _create_pydantic_model(self, data_model: DataModel) -> type[Any]: + """Create a single Pydantic model from DataModel.""" + field_definitions: dict[str, tuple[Any, FieldInfo]] = {} + + for field in data_model.fields: + if field.name is None: + continue + + try: + field_type, type_constraints = self._type_resolver.resolve_with_constraints(field.data_type) + except Exception as e: + raise TypeResolutionError(field.data_type, data_model.class_name, field.name or "") from e + + field_info = self._create_field_info(field, type_constraints) + field_definitions[field.name] = (field_type, field_info) + + base_classes = self._resolve_base_classes(data_model) + model_config = self._build_model_config(data_model) + module_name = self._get_module_name(data_model) + + from pydantic import BaseModel as BaseModelCls # noqa: PLC0415 + + if len(base_classes) > 1: + combined_base_name = f"_{data_model.class_name}Base" + combined_base = type(combined_base_name, base_classes, {}) + effective_base: type[Any] = combined_base + else: + effective_base = base_classes[0] if base_classes else BaseModelCls + + create_kwargs: dict[str, Any] = { + "__base__": effective_base, + "__module__": module_name, + } + + if model_config: + create_kwargs["__config__"] = model_config + + model = cast( + "type[Any]", + create_model( + data_model.class_name, + **create_kwargs, + **cast("dict[str, Any]", field_definitions), + ), + ) + + model_key = self._get_model_key(data_model) + self._models[model_key] = model + self._short_name_lookup[data_model.class_name] = model + + if model_key != data_model.class_name: + self._models[data_model.class_name] = model + + return model + + def _get_model_key(self, data_model: DataModel) -> str: + """Get module-qualified key for model storage.""" + module_name = self._get_module_name(data_model) + if module_name and module_name != "__dynamic__": + return f"{module_name}.{data_model.class_name}" + return data_model.class_name + + def _create_field_info( # noqa: PLR6301 + self, field: DataModelFieldBase, type_constraints: dict[str, Any] | None = None + ) -> FieldInfo: + """Convert DataModelFieldBase to Pydantic FieldInfo.""" + kwargs = constraints_to_field_kwargs(field.constraints) + + if type_constraints: + kwargs.update(type_constraints) + + if (hasattr(field, "has_default") and field.has_default) or field.default is not None: + kwargs["default"] = field.default + elif (default_factory := getattr(field, "default_factory", None)) is not None: + kwargs["default_factory"] = default_factory + elif field.required: + kwargs["default"] = ... + else: + kwargs["default"] = None + + if field.alias: + kwargs["alias"] = field.alias + + description = getattr(field, "description", None) + if description: + kwargs["description"] = description + + return Field(**kwargs) + + def _resolve_base_classes(self, data_model: DataModel) -> tuple[type, ...]: + """Resolve base classes for the model, including custom base classes.""" + from pydantic import BaseModel # noqa: PLC0415 + + custom_base = getattr(data_model, "custom_base_class", None) + if custom_base and isinstance(custom_base, str): + if "." in custom_base: + try: + module_path, class_name = custom_base.rsplit(".", 1) + module = importlib.import_module(module_path) + custom_base_cls = getattr(module, class_name) + return (custom_base_cls,) # noqa: TRY300 + except (ValueError, ImportError, AttributeError) as e: + logger.warning("Failed to import custom_base_class %s: %s", custom_base, e) + else: + logger.warning("Invalid custom_base_class format: %s. Expected 'module.ClassName'", custom_base) + + if not data_model.base_classes: + return (BaseModel,) + + bases = [] + for base_class in data_model.base_classes: + if base_class.reference and base_class.reference.short_name in self._short_name_lookup: + bases.append(self._short_name_lookup[base_class.reference.short_name]) + elif hasattr(base_class, "type") and base_class.type: + type_name = base_class.type + if type_name in self._short_name_lookup: + bases.append(self._short_name_lookup[type_name]) + else: + bases.append(BaseModel) + else: + bases.append(BaseModel) + + return tuple(bases) if bases else (BaseModel,) + + def _build_model_config(self, data_model: DataModel) -> ConfigDict | None: # noqa: PLR6301 + """Build Pydantic ConfigDict from DataModel settings.""" + config_dict: dict[str, Any] = {} + + extra_data = getattr(data_model, "extra_template_data", {}) or {} + + if "config" in extra_data: + existing_config = extra_data["config"] + if isinstance(existing_config, dict): + config_dict.update(existing_config) + + if "model_config" in extra_data: + config_dict.update(extra_data["model_config"]) + + if getattr(data_model, "allow_extra", False): + config_dict["extra"] = "allow" + + if getattr(data_model, "use_enum_values", False): + config_dict["use_enum_values"] = True + + if getattr(data_model, "populate_by_name", False): + config_dict["populate_by_name"] = True + + return ConfigDict(**config_dict) if config_dict else None + + def _get_module_name(self, data_model: DataModel) -> str: # noqa: PLR6301 + """Determine module name for the dynamic model.""" + if data_model.reference and data_model.reference.path: + parts = data_model.reference.path.split("/") + return ".".join(p for p in parts if p and p != "#") + return "__dynamic__" + + def _create_enum_model(self, data_model: DataModel) -> type[Any]: + """Create an Enum class from DataModel.""" + members = {} + for field in data_model.fields: + if field.name and field.default is not None: + value = field.default + if isinstance(value, str): + value = value.strip("'\"") + members[field.name] = value + + enum_class: type[Any] = Enum(data_model.class_name, members) # type: ignore[assignment] + + model_key = self._get_model_key(data_model) + self._models[model_key] = enum_class + self._short_name_lookup[data_model.class_name] = enum_class + + if model_key != data_model.class_name: + self._models[data_model.class_name] = enum_class + + return enum_class + + def _rebuild_models(self) -> None: + """Resolve forward references by calling model_rebuild() on all models.""" + from pydantic import BaseModel # noqa: PLC0415 + + namespace = {**self._short_name_lookup} + + for model_key, model in self._models.items(): + if isinstance(model, type) and issubclass(model, BaseModel): + try: + model.model_rebuild(_types_namespace=namespace) + except PydanticUndefinedAnnotation: + pass + except (NameError, AttributeError) as e: + logger.debug("Model %s rebuild skipped: %s", model_key, e) + except Exception as e: + logger.warning("Unexpected error rebuilding model %s: %s: %s", model_key, type(e).__name__, e) + raise diff --git a/src/datamodel_code_generator/dynamic/exceptions.py b/src/datamodel_code_generator/dynamic/exceptions.py new file mode 100644 index 000000000..62cbecdbe --- /dev/null +++ b/src/datamodel_code_generator/dynamic/exceptions.py @@ -0,0 +1,52 @@ +"""Custom exceptions for dynamic model generation.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from datamodel_code_generator.types import DataType + + +class DynamicModelError(Exception): + """Base exception for dynamic model generation.""" + + +class TypeResolutionError(DynamicModelError): + """Failed to resolve a type to a Python type object.""" + + def __init__(self, type_info: DataType, model_name: str, field_name: str) -> None: + """Initialize with type info and context.""" + self.type_info = type_info + self.model_name = model_name + self.field_name = field_name + super().__init__(f"Cannot resolve type for field '{field_name}' in model '{model_name}': {type_info}") + + +class CircularReferenceError(DynamicModelError): + """Circular reference that cannot be resolved.""" + + def __init__(self, cycle_path: list[str]) -> None: + """Initialize with the cycle path.""" + self.cycle_path = cycle_path + super().__init__(f"Unresolvable circular reference: {' -> '.join(cycle_path)}") + + +class ConstraintConversionError(DynamicModelError): + """Failed to convert schema constraints to Pydantic Field.""" + + def __init__(self, constraint_name: str, constraint_value: Any, reason: str) -> None: + """Initialize with constraint details and reason.""" + self.constraint_name = constraint_name + self.constraint_value = constraint_value + self.reason = reason + super().__init__(f"Cannot convert constraint '{constraint_name}={constraint_value}': {reason}") + + +class UnsupportedModelTypeError(DynamicModelError): + """Model type not supported for dynamic generation.""" + + def __init__(self, model_type: str) -> None: + """Initialize with the unsupported model type.""" + self.model_type = model_type + super().__init__(f"Dynamic generation not supported for model type: {model_type}") diff --git a/src/datamodel_code_generator/dynamic/type_resolver.py b/src/datamodel_code_generator/dynamic/type_resolver.py new file mode 100644 index 000000000..e50509dca --- /dev/null +++ b/src/datamodel_code_generator/dynamic/type_resolver.py @@ -0,0 +1,174 @@ +"""Type resolution utilities for dynamic model generation.""" + +from __future__ import annotations + +import datetime +import decimal +import uuid +from typing import TYPE_CHECKING, Any, ForwardRef, Literal, Union + +if TYPE_CHECKING: + from datamodel_code_generator.types import DataType + +FORMAT_TYPE_MAP: dict[str, type[Any]] = { + "date": datetime.date, + "date-time": datetime.datetime, + "time": datetime.time, + "uuid": uuid.UUID, + "uuid1": uuid.UUID, + "uuid2": uuid.UUID, + "uuid3": uuid.UUID, + "uuid4": uuid.UUID, + "uuid5": uuid.UUID, + "decimal": decimal.Decimal, + "email": str, + "uri": str, + "hostname": str, + "ipv4": str, + "ipv6": str, + "binary": bytes, + "byte": bytes, +} + +PRIMITIVE_TYPE_MAP: dict[str, type[Any]] = { + "str": str, + "string": str, + "int": int, + "integer": int, + "float": float, + "number": float, + "bool": bool, + "boolean": bool, + "bytes": bytes, + "None": type(None), + "Any": Any, # type: ignore[dict-item] +} + +CONSTRAINED_TYPE_MAP: dict[str, type[Any]] = { + "conint": int, + "confloat": float, + "constr": str, + "conbytes": bytes, + "condecimal": decimal.Decimal, + "conlist": list, + "conset": set, + "confrozenset": frozenset, +} + +_DICT_MIN_TYPE_ARGS = 2 + + +class TypeResolver: + """Resolves DataType objects to actual Python types.""" + + def __init__(self, models: dict[str, type[Any]]) -> None: + """Initialize with a models lookup dictionary.""" + self._models = models + self._forward_refs: set[str] = set() + + def resolve(self, data_type: DataType) -> Any: + """Resolve a DataType to a Python type. + + Uses the structured DataType object directly rather than parsing strings. + """ + resolved_type, _ = self.resolve_with_constraints(data_type) + return resolved_type + + def resolve_with_constraints( # noqa: PLR0911 + self, data_type: DataType + ) -> tuple[Any, dict[str, Any]]: + """Resolve a DataType to a Python type and extract constraints. + + Returns: + A tuple of (resolved_type, constraints_kwargs). + """ + constraints: dict[str, Any] = {} + + if data_type.reference is not None: + model_name = data_type.reference.short_name + if model_name in self._models: + return self._models[model_name], constraints + self._forward_refs.add(model_name) + return ForwardRef(model_name), constraints + + if data_type.literals: + return Literal[tuple(data_type.literals)], constraints # type: ignore[valid-type] + + if data_type.is_list: + inner = self._resolve_inner_types(data_type.data_types) + return (list[inner] if inner else list), constraints # type: ignore[valid-type] + + if data_type.is_set: + inner = self._resolve_inner_types(data_type.data_types) + return (set[inner] if inner else set), constraints # type: ignore[valid-type] + + if data_type.is_dict: + if len(data_type.data_types) >= _DICT_MIN_TYPE_ARGS: + key_type = self.resolve(data_type.data_types[0]) + value_type = self.resolve(data_type.data_types[1]) + return dict[key_type, value_type], constraints # type: ignore[valid-type] + return dict, constraints + + if data_type.is_tuple: + inner_types = tuple(self.resolve(dt) for dt in data_type.data_types) + return (tuple.__class_getitem__(inner_types) if inner_types else tuple), constraints + + if len(data_type.data_types) > 1: + inner_types = tuple(self.resolve(dt) for dt in data_type.data_types) + return Union[inner_types], constraints # type: ignore[valid-type] # noqa: UP007 + + if data_type.is_optional and data_type.data_types: + inner = self.resolve(data_type.data_types[0]) + return inner | None, constraints # type: ignore[operator] + + return self._resolve_type_string(data_type, constraints) + + def _resolve_type_string(self, data_type: DataType, constraints: dict[str, Any]) -> tuple[Any, dict[str, Any]]: + """Resolve type from type string.""" + if not data_type.type: + return Any, constraints # type: ignore[return-value] + + type_str = data_type.type + + if type_str in CONSTRAINED_TYPE_MAP: + base_type = CONSTRAINED_TYPE_MAP[type_str] + self._extract_constraints(data_type, constraints) + return base_type, constraints + + if (fmt := getattr(data_type, "format", None)) and fmt in FORMAT_TYPE_MAP: + return FORMAT_TYPE_MAP[fmt], constraints + + if type_str in PRIMITIVE_TYPE_MAP: + return PRIMITIVE_TYPE_MAP[type_str], constraints + + if type_str in self._models: + return self._models[type_str], constraints + + return Any, constraints # type: ignore[return-value] + + def _extract_constraints(self, data_type: DataType, constraints: dict[str, Any]) -> None: # noqa: PLR6301 + """Extract constraints from DataType kwargs.""" + if not data_type.kwargs: + return + for key, kwarg_value in data_type.kwargs.items(): + if key == "regex": + pattern_value = kwarg_value + if isinstance(pattern_value, str) and pattern_value.startswith("r'") and pattern_value.endswith("'"): + pattern_value = pattern_value[2:-1] + constraints["pattern"] = pattern_value + else: + constraints[key] = kwarg_value + + def _resolve_inner_types(self, data_types: list[DataType]) -> Any: + """Resolve inner types for container types.""" + if not data_types: + return None + if len(data_types) == 1: + return self.resolve(data_types[0]) + inner_types = tuple(self.resolve(dt) for dt in data_types) + return Union[inner_types] # type: ignore[valid-type] # noqa: UP007 + + @property + def forward_refs(self) -> set[str]: + """Return the set of forward references encountered during resolution.""" + return self._forward_refs diff --git a/src/datamodel_code_generator/parser/base.py b/src/datamodel_code_generator/parser/base.py index bfcaaa93a..c7ec1d5b2 100644 --- a/src/datamodel_code_generator/parser/base.py +++ b/src/datamodel_code_generator/parser/base.py @@ -3132,3 +3132,39 @@ def parse( # noqa: PLR0913, PLR0914, PLR0917 for k, v in results.items() } ) + + def create_dynamic_models(self) -> dict[str, type]: + """Create actual Python model classes from parsed schema. + + This method should be called after parse_raw() has been executed. + It uses Pydantic's create_model() to generate real Python classes + that can be instantiated and used for validation. + + Returns: + Dictionary mapping class names to Pydantic model classes. + + Raises: + DynamicModelError: Base exception for any generation failure. + TypeResolutionError: If a field type cannot be resolved. + + Example: + parser = JsonSchemaParser(source=schema) + parser.parse_raw() + models = parser.create_dynamic_models() + + User = models['User'] + user = User(name='John', age=30) + print(user.model_dump()) + + Note: + - Only Pydantic v2 models are supported + - Circular references are resolved via model_rebuild() + - Custom validators/methods are not included + """ + from datamodel_code_generator.dynamic.creator import DynamicModelCreator # noqa: PLC0415 + + if not self.results: + return {} + + creator = DynamicModelCreator(self) + return creator.create_models() diff --git a/tests/dynamic/__init__.py b/tests/dynamic/__init__.py new file mode 100644 index 000000000..0bc4701ff --- /dev/null +++ b/tests/dynamic/__init__.py @@ -0,0 +1 @@ +"""Tests for dynamic model generation.""" diff --git a/tests/dynamic/test_creator.py b/tests/dynamic/test_creator.py new file mode 100644 index 000000000..bfe50a9fd --- /dev/null +++ b/tests/dynamic/test_creator.py @@ -0,0 +1,432 @@ +# ruff: noqa: D101, D102 +"""Tests for dynamic model generation.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest +from pydantic import VERSION, ValidationError + +from datamodel_code_generator import InputFileType, generate_dynamic_models + +if TYPE_CHECKING: + from pathlib import Path + +PYDANTIC_V2 = VERSION.startswith("2.") + + +@pytest.mark.skipif(not PYDANTIC_V2, reason="Dynamic models require Pydantic v2") +class TestDynamicModelCreation: + def test_simple_model(self) -> None: + schema = { + "type": "object", + "properties": {"name": {"type": "string"}, "age": {"type": "integer"}}, + "required": ["name"], + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + assert "Model" in models + instance = models["Model"](name="John", age=30) + assert instance.name == "John" + assert instance.age == 30 + + def test_nested_models(self) -> None: + schema = { + "type": "object", + "properties": {"user": {"$ref": "#/$defs/User"}}, + "$defs": { + "User": { + "type": "object", + "properties": {"name": {"type": "string"}}, + } + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + assert "User" in models + assert "Model" in models + user = models["User"](name="Alice") + model = models["Model"](user=user) + assert model.user.name == "Alice" + + def test_string_field(self) -> None: + schema = { + "type": "object", + "properties": { + "email": { + "type": "string", + } + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + instance = models["Model"](email="test@example.com") + assert instance.email == "test@example.com" + + def test_numeric_constraints(self) -> None: + schema = { + "type": "object", + "properties": { + "count": { + "type": "integer", + "minimum": 0, + "maximum": 100, + "multipleOf": 5, + } + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + models["Model"](count=50) + + with pytest.raises(ValidationError): + models["Model"](count=-1) + + with pytest.raises(ValidationError): + models["Model"](count=7) + + def test_array_constraints(self) -> None: + schema = { + "type": "object", + "properties": {"tags": {"type": "array", "items": {"type": "string"}, "minItems": 1, "maxItems": 5}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + models["Model"](tags=["a", "b", "c"]) + + with pytest.raises(ValidationError): + models["Model"](tags=[]) + + with pytest.raises(ValidationError): + models["Model"](tags=["a", "b", "c", "d", "e", "f"]) + + def test_circular_reference(self) -> None: + schema = { + "$defs": { + "Node": { + "type": "object", + "properties": { + "value": {"type": "string"}, + "children": {"type": "array", "items": {"$ref": "#/$defs/Node"}}, + }, + } + }, + "$ref": "#/$defs/Node", + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + node_class = models["Node"] + node = node_class(value="root", children=[node_class(value="child", children=[])]) + assert node.children[0].value == "child" + + def test_enum_values(self) -> None: + schema = { + "type": "object", + "properties": {"status": {"enum": ["pending", "active", "completed"]}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + models["Model"](status="active") + + with pytest.raises(ValidationError): + models["Model"](status="invalid") + + def test_optional_field(self) -> None: + schema = { + "type": "object", + "properties": {"name": {"type": "string"}, "nickname": {"type": "string"}}, + "required": ["name"], + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + instance = models["Model"](name="John") + assert instance.name == "John" + assert instance.nickname is None + + def test_default_value(self) -> None: + schema = { + "type": "object", + "properties": {"count": {"type": "integer", "default": 10}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + instance = models["Model"]() + assert instance.count == 10 + + def test_all_of_inheritance(self) -> None: + schema = { + "$defs": { + "Base": {"type": "object", "properties": {"id": {"type": "integer"}}}, + "Child": { + "allOf": [ + {"$ref": "#/$defs/Base"}, + {"properties": {"name": {"type": "string"}}}, + ] + }, + }, + "$ref": "#/$defs/Child", + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + child = models["Child"](id=1, name="Test") + assert child.id == 1 + assert child.name == "Test" + + def test_openapi_schema(self) -> None: + schema = """ +openapi: "3.0.0" +info: + title: Test API + version: "1.0" +paths: {} +components: + schemas: + User: + type: object + properties: + name: + type: string + email: + type: string + required: + - name +""" + models = generate_dynamic_models(schema, input_file_type=InputFileType.OpenAPI) + + assert "User" in models + user = models["User"](name="John", email="john@example.com") + assert user.name == "John" + assert user.email == "john@example.com" + + def test_empty_schema(self) -> None: + schema = {"type": "object", "properties": {}} + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + assert "Model" in models + instance = models["Model"]() + assert instance is not None + + def test_string_constraints(self) -> None: + schema = { + "type": "object", + "properties": { + "code": { + "type": "string", + "minLength": 2, + "maxLength": 10, + "pattern": "^[A-Z]+$", + } + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + models["Model"](code="ABC") + + with pytest.raises(ValidationError): + models["Model"](code="A") + + with pytest.raises(ValidationError): + models["Model"](code="abc") + + def test_exclusive_numeric_constraints(self) -> None: + schema = { + "type": "object", + "properties": { + "value": { + "type": "number", + "exclusiveMinimum": 0, + "exclusiveMaximum": 100, + } + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + models["Model"](value=50) + + with pytest.raises(ValidationError): + models["Model"](value=0) + + with pytest.raises(ValidationError): + models["Model"](value=100) + + def test_date_format(self) -> None: + schema = { + "type": "object", + "properties": { + "created": {"type": "string", "format": "date"}, + "updated": {"type": "string", "format": "date-time"}, + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + from datetime import date, datetime, timezone + + instance = models["Model"](created=date(2024, 1, 1), updated=datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)) + assert instance.created == date(2024, 1, 1) + + def test_uuid_format(self) -> None: + schema = { + "type": "object", + "properties": {"id": {"type": "string", "format": "uuid"}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + import uuid + + test_uuid = uuid.uuid4() + instance = models["Model"](id=test_uuid) + assert instance.id == test_uuid + + def test_dict_type(self) -> None: + schema = { + "type": "object", + "properties": { + "metadata": { + "type": "object", + "additionalProperties": {"type": "string"}, + } + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + instance = models["Model"](metadata={"key": "value"}) + assert instance.metadata == {"key": "value"} + + def test_set_type(self) -> None: + schema = { + "type": "object", + "properties": { + "tags": { + "type": "array", + "items": {"type": "string"}, + "uniqueItems": True, + } + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + instance = models["Model"](tags=["a", "b", "c"]) + assert "a" in instance.tags + + def test_ref_type(self) -> None: + schema = { + "$defs": { + "Address": { + "type": "object", + "properties": {"city": {"type": "string"}}, + } + }, + "type": "object", + "properties": {"address": {"$ref": "#/$defs/Address"}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + address = models["Address"](city="Tokyo") + instance = models["Model"](address=address) + assert instance.address.city == "Tokyo" + + def test_literal_type(self) -> None: + schema = { + "type": "object", + "properties": {"status": {"const": "active"}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + instance = models["Model"](status="active") + assert instance.status == "active" + + def test_nullable_field(self) -> None: + schema = { + "type": "object", + "properties": {"name": {"type": ["string", "null"]}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + instance1 = models["Model"](name="test") + assert instance1.name == "test" + + instance2 = models["Model"](name=None) + assert instance2.name is None + + def test_integer_enum(self) -> None: + schema = { + "type": "object", + "properties": {"priority": {"enum": [1, 2, 3]}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + models["Model"](priority=2) + + with pytest.raises(ValidationError): + models["Model"](priority=5) + + def test_boolean_field(self) -> None: + schema = { + "type": "object", + "properties": {"active": {"type": "boolean"}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + instance = models["Model"](active=True) + assert instance.active is True + + def test_float_field(self) -> None: + schema = { + "type": "object", + "properties": {"price": {"type": "number"}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + instance = models["Model"](price=19.99) + assert instance.price == 19.99 + + def test_empty_results(self) -> None: + schema = {} + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + assert isinstance(models, dict) + + def test_file_path_input(self, tmp_path: Path) -> None: + schema_file = tmp_path / "schema.json" + schema_file.write_text('{"type": "object", "properties": {"name": {"type": "string"}}}') + + models = generate_dynamic_models(str(schema_file), input_file_type=InputFileType.JsonSchema) + + assert "Model" in models + instance = models["Model"](name="Test") + assert instance.name == "Test" + + def test_path_object_input(self, tmp_path: Path) -> None: + schema_file = tmp_path / "schema.json" + schema_file.write_text('{"type": "object", "properties": {"value": {"type": "integer"}}}') + + models = generate_dynamic_models(schema_file, input_file_type=InputFileType.JsonSchema) + + assert "Model" in models + instance = models["Model"](value=42) + assert instance.value == 42 From 018c6af8a833b6a274b6f3eaef44190b9d7f58b4 Mon Sep 17 00:00:00 2001 From: Koudai Aono Date: Fri, 2 Jan 2026 11:42:33 +0000 Subject: [PATCH 2/9] Simplify dynamic module and improve coverage --- .../dynamic/constraints.py | 111 ++-- .../dynamic/creator.py | 81 +-- .../dynamic/type_resolver.py | 55 +- tests/dynamic/test_creator.py | 513 ++++++++++++++++++ 4 files changed, 562 insertions(+), 198 deletions(-) diff --git a/src/datamodel_code_generator/dynamic/constraints.py b/src/datamodel_code_generator/dynamic/constraints.py index 9c874b0b3..570a57a06 100644 --- a/src/datamodel_code_generator/dynamic/constraints.py +++ b/src/datamodel_code_generator/dynamic/constraints.py @@ -4,91 +4,60 @@ from typing import TYPE_CHECKING, Any +from datamodel_code_generator.util import model_dump + if TYPE_CHECKING: from datamodel_code_generator.model.base import ConstraintsBase - -def constraints_to_field_kwargs( # noqa: PLR0912, PLR0914, PLR0915 +CONSTRAINT_FIELD_MAP: dict[str, str] = { + "ge": "ge", + "gt": "gt", + "le": "le", + "lt": "lt", + "multiple_of": "multiple_of", + "min_length": "min_length", + "max_length": "max_length", + "regex": "pattern", + "pattern": "pattern", + "min_items": "min_length", + "max_items": "max_length", + "exclusive_minimum": "gt", + "exclusive_maximum": "lt", +} + +JSON_SCHEMA_EXTRA_FIELDS: frozenset[str] = frozenset({ + "unique_items", + "min_properties", + "max_properties", +}) + + +def constraints_to_field_kwargs( constraints: ConstraintsBase | None, ) -> dict[str, Any]: - """Convert DataModel constraints to Pydantic Field kwargs. - - Handles all constraint types from ConstraintsBase. - - Note: - - In JSON Schema, minItems/maxItems are for arrays while minLength/maxLength - are for strings. These constraints are mutually exclusive by schema type. - - Pydantic uses min_length/max_length for both strings and sequences. - - Constraints like unique_items are stored in json_schema_extra for - documentation only and are not enforced by Pydantic v2. - """ + """Convert DataModel constraints to Pydantic Field kwargs.""" if constraints is None: return {} kwargs: dict[str, Any] = {} json_schema_extra: dict[str, Any] = {} - min_length = getattr(constraints, "min_length", None) - if min_length is not None: - kwargs["min_length"] = min_length - - max_length = getattr(constraints, "max_length", None) - if max_length is not None: - kwargs["max_length"] = max_length - - pattern_value = getattr(constraints, "regex", None) or getattr(constraints, "pattern", None) - if pattern_value is not None: - kwargs["pattern"] = pattern_value - - ge = getattr(constraints, "ge", None) - if ge is not None: - kwargs["ge"] = ge - - gt = getattr(constraints, "gt", None) - if gt is not None: - kwargs["gt"] = gt - - le = getattr(constraints, "le", None) - if le is not None: - kwargs["le"] = le - - lt = getattr(constraints, "lt", None) - if lt is not None: - kwargs["lt"] = lt + for field_name, value in model_dump(constraints).items(): + if value is None: + continue - multiple_of = getattr(constraints, "multiple_of", None) - if multiple_of is not None: - kwargs["multiple_of"] = multiple_of - - min_items = getattr(constraints, "min_items", None) - if min_items is not None: - kwargs["min_length"] = min_items - - max_items = getattr(constraints, "max_items", None) - if max_items is not None: - kwargs["max_length"] = max_items - - unique_items = getattr(constraints, "unique_items", None) - if unique_items is not None: - json_schema_extra["uniqueItems"] = unique_items - - min_properties = getattr(constraints, "min_properties", None) - if min_properties is not None: - json_schema_extra["minProperties"] = min_properties - - max_properties = getattr(constraints, "max_properties", None) - if max_properties is not None: - json_schema_extra["maxProperties"] = max_properties - - exclusive_minimum = getattr(constraints, "exclusive_minimum", None) - if exclusive_minimum is not None: - kwargs["gt"] = exclusive_minimum - - exclusive_maximum = getattr(constraints, "exclusive_maximum", None) - if exclusive_maximum is not None: - kwargs["lt"] = exclusive_maximum + if field_name in CONSTRAINT_FIELD_MAP: + kwargs[CONSTRAINT_FIELD_MAP[field_name]] = value + elif field_name in JSON_SCHEMA_EXTRA_FIELDS: + json_schema_extra[_to_camel_case(field_name)] = value if json_schema_extra: kwargs["json_schema_extra"] = json_schema_extra return kwargs + + +def _to_camel_case(snake_str: str) -> str: + """Convert snake_case to camelCase.""" + components = snake_str.split("_") + return components[0] + "".join(x.title() for x in components[1:]) diff --git a/src/datamodel_code_generator/dynamic/creator.py b/src/datamodel_code_generator/dynamic/creator.py index fd314100e..0ce4e2094 100644 --- a/src/datamodel_code_generator/dynamic/creator.py +++ b/src/datamodel_code_generator/dynamic/creator.py @@ -2,17 +2,10 @@ from __future__ import annotations -import importlib -import logging from enum import Enum from typing import TYPE_CHECKING, Any, cast -from pydantic import ConfigDict, Field, create_model - -try: - from pydantic.errors import PydanticUndefinedAnnotation -except ImportError: - PydanticUndefinedAnnotation = NameError # type: ignore[misc,assignment] +from pydantic import Field, create_model from datamodel_code_generator.dynamic.constraints import constraints_to_field_kwargs from datamodel_code_generator.dynamic.exceptions import ( @@ -26,8 +19,6 @@ from datamodel_code_generator.model.base import DataModel, DataModelFieldBase from datamodel_code_generator.parser.base import Parser -logger = logging.getLogger(__name__) - class DynamicModelCreator: """Creates actual Python classes from DataModel objects.""" @@ -84,7 +75,6 @@ def _create_pydantic_model(self, data_model: DataModel) -> type[Any]: field_definitions[field.name] = (field_type, field_info) base_classes = self._resolve_base_classes(data_model) - model_config = self._build_model_config(data_model) module_name = self._get_module_name(data_model) from pydantic import BaseModel as BaseModelCls # noqa: PLC0415 @@ -96,19 +86,12 @@ def _create_pydantic_model(self, data_model: DataModel) -> type[Any]: else: effective_base = base_classes[0] if base_classes else BaseModelCls - create_kwargs: dict[str, Any] = { - "__base__": effective_base, - "__module__": module_name, - } - - if model_config: - create_kwargs["__config__"] = model_config - model = cast( "type[Any]", create_model( data_model.class_name, - **create_kwargs, + __base__=effective_base, + __module__=module_name, **cast("dict[str, Any]", field_definitions), ), ) @@ -157,22 +140,9 @@ def _create_field_info( # noqa: PLR6301 return Field(**kwargs) def _resolve_base_classes(self, data_model: DataModel) -> tuple[type, ...]: - """Resolve base classes for the model, including custom base classes.""" + """Resolve base classes for the model.""" from pydantic import BaseModel # noqa: PLC0415 - custom_base = getattr(data_model, "custom_base_class", None) - if custom_base and isinstance(custom_base, str): - if "." in custom_base: - try: - module_path, class_name = custom_base.rsplit(".", 1) - module = importlib.import_module(module_path) - custom_base_cls = getattr(module, class_name) - return (custom_base_cls,) # noqa: TRY300 - except (ValueError, ImportError, AttributeError) as e: - logger.warning("Failed to import custom_base_class %s: %s", custom_base, e) - else: - logger.warning("Invalid custom_base_class format: %s. Expected 'module.ClassName'", custom_base) - if not data_model.base_classes: return (BaseModel,) @@ -180,42 +150,11 @@ def _resolve_base_classes(self, data_model: DataModel) -> tuple[type, ...]: for base_class in data_model.base_classes: if base_class.reference and base_class.reference.short_name in self._short_name_lookup: bases.append(self._short_name_lookup[base_class.reference.short_name]) - elif hasattr(base_class, "type") and base_class.type: - type_name = base_class.type - if type_name in self._short_name_lookup: - bases.append(self._short_name_lookup[type_name]) - else: - bases.append(BaseModel) else: bases.append(BaseModel) return tuple(bases) if bases else (BaseModel,) - def _build_model_config(self, data_model: DataModel) -> ConfigDict | None: # noqa: PLR6301 - """Build Pydantic ConfigDict from DataModel settings.""" - config_dict: dict[str, Any] = {} - - extra_data = getattr(data_model, "extra_template_data", {}) or {} - - if "config" in extra_data: - existing_config = extra_data["config"] - if isinstance(existing_config, dict): - config_dict.update(existing_config) - - if "model_config" in extra_data: - config_dict.update(extra_data["model_config"]) - - if getattr(data_model, "allow_extra", False): - config_dict["extra"] = "allow" - - if getattr(data_model, "use_enum_values", False): - config_dict["use_enum_values"] = True - - if getattr(data_model, "populate_by_name", False): - config_dict["populate_by_name"] = True - - return ConfigDict(**config_dict) if config_dict else None - def _get_module_name(self, data_model: DataModel) -> str: # noqa: PLR6301 """Determine module name for the dynamic model.""" if data_model.reference and data_model.reference.path: @@ -250,14 +189,6 @@ def _rebuild_models(self) -> None: namespace = {**self._short_name_lookup} - for model_key, model in self._models.items(): + for model in self._models.values(): if isinstance(model, type) and issubclass(model, BaseModel): - try: - model.model_rebuild(_types_namespace=namespace) - except PydanticUndefinedAnnotation: - pass - except (NameError, AttributeError) as e: - logger.debug("Model %s rebuild skipped: %s", model_key, e) - except Exception as e: - logger.warning("Unexpected error rebuilding model %s: %s: %s", model_key, type(e).__name__, e) - raise + model.model_rebuild(_types_namespace=namespace) diff --git a/src/datamodel_code_generator/dynamic/type_resolver.py b/src/datamodel_code_generator/dynamic/type_resolver.py index e50509dca..e056ee462 100644 --- a/src/datamodel_code_generator/dynamic/type_resolver.py +++ b/src/datamodel_code_generator/dynamic/type_resolver.py @@ -55,8 +55,6 @@ "confrozenset": frozenset, } -_DICT_MIN_TYPE_ARGS = 2 - class TypeResolver: """Resolves DataType objects to actual Python types.""" @@ -64,55 +62,25 @@ class TypeResolver: def __init__(self, models: dict[str, type[Any]]) -> None: """Initialize with a models lookup dictionary.""" self._models = models - self._forward_refs: set[str] = set() def resolve(self, data_type: DataType) -> Any: - """Resolve a DataType to a Python type. - - Uses the structured DataType object directly rather than parsing strings. - """ + """Resolve a DataType to a Python type.""" resolved_type, _ = self.resolve_with_constraints(data_type) return resolved_type - def resolve_with_constraints( # noqa: PLR0911 - self, data_type: DataType - ) -> tuple[Any, dict[str, Any]]: - """Resolve a DataType to a Python type and extract constraints. - - Returns: - A tuple of (resolved_type, constraints_kwargs). - """ + def resolve_with_constraints(self, data_type: DataType) -> tuple[Any, dict[str, Any]]: + """Resolve a DataType to a Python type and extract constraints.""" constraints: dict[str, Any] = {} if data_type.reference is not None: model_name = data_type.reference.short_name if model_name in self._models: return self._models[model_name], constraints - self._forward_refs.add(model_name) return ForwardRef(model_name), constraints if data_type.literals: return Literal[tuple(data_type.literals)], constraints # type: ignore[valid-type] - if data_type.is_list: - inner = self._resolve_inner_types(data_type.data_types) - return (list[inner] if inner else list), constraints # type: ignore[valid-type] - - if data_type.is_set: - inner = self._resolve_inner_types(data_type.data_types) - return (set[inner] if inner else set), constraints # type: ignore[valid-type] - - if data_type.is_dict: - if len(data_type.data_types) >= _DICT_MIN_TYPE_ARGS: - key_type = self.resolve(data_type.data_types[0]) - value_type = self.resolve(data_type.data_types[1]) - return dict[key_type, value_type], constraints # type: ignore[valid-type] - return dict, constraints - - if data_type.is_tuple: - inner_types = tuple(self.resolve(dt) for dt in data_type.data_types) - return (tuple.__class_getitem__(inner_types) if inner_types else tuple), constraints - if len(data_type.data_types) > 1: inner_types = tuple(self.resolve(dt) for dt in data_type.data_types) return Union[inner_types], constraints # type: ignore[valid-type] # noqa: UP007 @@ -135,9 +103,6 @@ def _resolve_type_string(self, data_type: DataType, constraints: dict[str, Any]) self._extract_constraints(data_type, constraints) return base_type, constraints - if (fmt := getattr(data_type, "format", None)) and fmt in FORMAT_TYPE_MAP: - return FORMAT_TYPE_MAP[fmt], constraints - if type_str in PRIMITIVE_TYPE_MAP: return PRIMITIVE_TYPE_MAP[type_str], constraints @@ -158,17 +123,3 @@ def _extract_constraints(self, data_type: DataType, constraints: dict[str, Any]) constraints["pattern"] = pattern_value else: constraints[key] = kwarg_value - - def _resolve_inner_types(self, data_types: list[DataType]) -> Any: - """Resolve inner types for container types.""" - if not data_types: - return None - if len(data_types) == 1: - return self.resolve(data_types[0]) - inner_types = tuple(self.resolve(dt) for dt in data_types) - return Union[inner_types] # type: ignore[valid-type] # noqa: UP007 - - @property - def forward_refs(self) -> set[str]: - """Return the set of forward references encountered during resolution.""" - return self._forward_refs diff --git a/tests/dynamic/test_creator.py b/tests/dynamic/test_creator.py index bfe50a9fd..4e919c871 100644 --- a/tests/dynamic/test_creator.py +++ b/tests/dynamic/test_creator.py @@ -430,3 +430,516 @@ def test_path_object_input(self, tmp_path: Path) -> None: assert "Model" in models instance = models["Model"](value=42) assert instance.value == 42 + + def test_field_with_alias(self) -> None: + schema = """ +openapi: "3.0.0" +info: + title: Test API + version: "1.0" +paths: {} +components: + schemas: + User: + type: object + properties: + user_name: + type: string + x-alias: + user_name: userName +""" + models = generate_dynamic_models(schema, input_file_type=InputFileType.OpenAPI) + assert "User" in models + + def test_field_with_description(self) -> None: + schema = { + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "The user's name", + } + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + assert "Model" in models + instance = models["Model"](name="Test") + assert instance.name == "Test" + + def test_standalone_enum_type(self) -> None: + schema = { + "$defs": { + "Status": { + "type": "string", + "enum": ["active", "inactive", "pending"], + } + }, + "type": "object", + "properties": {"status": {"$ref": "#/$defs/Status"}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + assert "Status" in models + assert "Model" in models + + def test_any_of_union(self) -> None: + schema = { + "type": "object", + "properties": {"value": {"anyOf": [{"type": "string"}, {"type": "integer"}]}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + instance1 = models["Model"](value="test") + assert instance1.value == "test" + + instance2 = models["Model"](value=42) + assert instance2.value == 42 + + def test_one_of_union(self) -> None: + schema = { + "type": "object", + "properties": {"data": {"oneOf": [{"type": "boolean"}, {"type": "number"}]}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + instance = models["Model"](data=True) + assert instance.data is True + + def test_time_format(self) -> None: + schema = { + "type": "object", + "properties": {"time_value": {"type": "string", "format": "time"}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + from datetime import time + + instance = models["Model"](time_value=time(12, 30, 0)) + assert instance.time_value == time(12, 30, 0) + + def test_decimal_format(self) -> None: + schema = { + "type": "object", + "properties": {"amount": {"type": "string", "format": "decimal"}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + from decimal import Decimal + + instance = models["Model"](amount=Decimal("123.45")) + assert instance.amount == Decimal("123.45") + + def test_bytes_type(self) -> None: + schema = { + "type": "object", + "properties": {"data": {"type": "string", "format": "binary"}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + instance = models["Model"](data=b"hello") + assert instance.data == b"hello" + + def test_array_without_items(self) -> None: + schema = { + "type": "object", + "properties": {"items": {"type": "array"}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + instance = models["Model"](items=[1, "two", 3.0]) + assert instance.items == [1, "two", 3.0] + + def test_object_without_properties(self) -> None: + schema = { + "type": "object", + "properties": {"data": {"type": "object"}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + instance = models["Model"](data={"any": "value"}) + assert instance.data == {"any": "value"} + + def test_nested_allof_multiple_refs(self) -> None: + schema = { + "$defs": { + "Named": { + "type": "object", + "properties": {"name": {"type": "string"}}, + }, + "Aged": { + "type": "object", + "properties": {"age": {"type": "integer"}}, + }, + "Person": { + "allOf": [ + {"$ref": "#/$defs/Named"}, + {"$ref": "#/$defs/Aged"}, + ] + }, + }, + "$ref": "#/$defs/Person", + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + assert "Person" in models + person = models["Person"](name="John", age=30) + assert person.name == "John" + assert person.age == 30 + + def test_deep_nested_refs(self) -> None: + schema = { + "$defs": { + "Inner": { + "type": "object", + "properties": {"value": {"type": "string"}}, + }, + "Middle": { + "type": "object", + "properties": {"inner": {"$ref": "#/$defs/Inner"}}, + }, + "Outer": { + "type": "object", + "properties": {"middle": {"$ref": "#/$defs/Middle"}}, + }, + }, + "$ref": "#/$defs/Outer", + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + inner = models["Inner"](value="test") + middle = models["Middle"](inner=inner) + outer = models["Outer"](middle=middle) + assert outer.middle.inner.value == "test" + + def test_additional_properties_typed(self) -> None: + schema = { + "type": "object", + "properties": { + "meta": { + "type": "object", + "additionalProperties": {"type": "integer"}, + } + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + instance = models["Model"](meta={"count": 10, "total": 100}) + assert instance.meta["count"] == 10 + + def test_required_field(self) -> None: + schema = { + "type": "object", + "properties": { + "name": {"type": "string"}, + "email": {"type": "string"}, + }, + "required": ["name", "email"], + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + with pytest.raises(ValidationError): + models["Model"](name="John") + + instance = models["Model"](name="John", email="john@test.com") + assert instance.name == "John" + + def test_ge_constraint(self) -> None: + schema = { + "type": "object", + "properties": { + "value": {"type": "integer", "minimum": 10}, + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + models["Model"](value=10) + models["Model"](value=15) + + with pytest.raises(ValidationError): + models["Model"](value=9) + + def test_le_constraint(self) -> None: + schema = { + "type": "object", + "properties": { + "value": {"type": "integer", "maximum": 100}, + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + models["Model"](value=100) + models["Model"](value=50) + + with pytest.raises(ValidationError): + models["Model"](value=101) + + def test_forward_ref_resolution(self) -> None: + schema = { + "type": "object", + "properties": {"item": {"$ref": "#/$defs/Item"}}, + "$defs": { + "Item": { + "type": "object", + "properties": {"name": {"type": "string"}}, + } + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + item = models["Item"](name="Widget") + model = models["Model"](item=item) + assert model.item.name == "Widget" + + def test_auto_input_type_detection(self) -> None: + schema = {"type": "object", "properties": {"name": {"type": "string"}}} + + models = generate_dynamic_models(schema) + + assert "Model" in models + instance = models["Model"](name="Test") + assert instance.name == "Test" + + def test_graphql_not_supported(self) -> None: + schema = """ + type Query { + hello: String + } + """ + + from datamodel_code_generator import Error + + with pytest.raises(Error, match="GraphQL is not yet supported"): + generate_dynamic_models(schema, input_file_type=InputFileType.GraphQL) + + def test_tuple_type(self) -> None: + schema = { + "type": "object", + "properties": { + "coords": { + "type": "array", + "prefixItems": [{"type": "number"}, {"type": "number"}], + "items": False, + } + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + assert "Model" in models + + def test_typed_dict_additional_properties(self) -> None: + schema = { + "type": "object", + "properties": { + "data": { + "type": "object", + "additionalProperties": {"type": "integer"}, + } + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + instance = models["Model"](data={"a": 1, "b": 2}) + assert instance.data["a"] == 1 + + def test_json_schema_unique_items(self) -> None: + schema = { + "type": "object", + "properties": { + "items": { + "type": "array", + "items": {"type": "string"}, + "uniqueItems": True, + "minItems": 1, + "maxItems": 10, + } + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + assert "Model" in models + + def test_min_max_properties(self) -> None: + schema = { + "type": "object", + "properties": { + "data": { + "type": "object", + "minProperties": 1, + "maxProperties": 5, + } + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + + assert "Model" in models + + +@pytest.mark.skipif(not PYDANTIC_V2, reason="Dynamic models require Pydantic v2") +class TestCreatorEdgeCases: + def test_empty_parser_results(self) -> None: + schema = {} + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + assert models == {} or isinstance(models, dict) + + def test_field_with_pattern_constraint(self) -> None: + schema = { + "type": "object", + "properties": { + "code": {"type": "string", "pattern": "^[A-Z]{3}$"}, + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + instance = models["Model"](code="ABC") + assert instance.code == "ABC" + + with pytest.raises(ValidationError): + models["Model"](code="abc") + + +@pytest.mark.skipif(not PYDANTIC_V2, reason="Dynamic models require Pydantic v2") +class TestTypeResolver: + def test_set_with_typed_items(self) -> None: + schema = { + "type": "object", + "properties": { + "tags": { + "type": "array", + "items": {"type": "string"}, + "uniqueItems": True, + } + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + instance = models["Model"](tags=["a", "b"]) + assert "a" in instance.tags + + def test_dict_with_key_value_types(self) -> None: + schema = { + "type": "object", + "properties": { + "mapping": { + "type": "object", + "additionalProperties": {"type": "string"}, + } + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + instance = models["Model"](mapping={"key": "value"}) + assert instance.mapping["key"] == "value" + + def test_format_date_type(self) -> None: + from datetime import date + + schema = { + "type": "object", + "properties": {"birthday": {"type": "string", "format": "date"}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + instance = models["Model"](birthday=date(1990, 1, 1)) + assert instance.birthday == date(1990, 1, 1) + + def test_forward_ref_already_resolved(self) -> None: + schema = { + "$defs": { + "Address": { + "type": "object", + "properties": {"city": {"type": "string"}}, + }, + "Person": { + "type": "object", + "properties": { + "home": {"$ref": "#/$defs/Address"}, + "work": {"$ref": "#/$defs/Address"}, + }, + }, + }, + "$ref": "#/$defs/Person", + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + addr = models["Address"](city="NYC") + person = models["Person"](home=addr, work=addr) + assert person.home.city == "NYC" + + def test_union_with_multiple_types(self) -> None: + schema = { + "type": "object", + "properties": {"value": {"anyOf": [{"type": "string"}, {"type": "integer"}, {"type": "boolean"}]}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + assert models["Model"](value="test").value == "test" + assert models["Model"](value=42).value == 42 + assert models["Model"](value=True).value is True + + +@pytest.mark.skipif(not PYDANTIC_V2, reason="Dynamic models require Pydantic v2") +class TestExceptions: + def test_type_resolution_error(self) -> None: + from datamodel_code_generator.dynamic.exceptions import TypeResolutionError + from datamodel_code_generator.types import DataType + + dt = DataType(type="unknown_type") + error = TypeResolutionError(dt, "TestModel", "test_field") + assert "test_field" in str(error) + assert "TestModel" in str(error) + assert error.type_info == dt + assert error.model_name == "TestModel" + assert error.field_name == "test_field" + + def test_circular_reference_error(self) -> None: + from datamodel_code_generator.dynamic.exceptions import CircularReferenceError + + error = CircularReferenceError(["A", "B", "C", "A"]) + assert "A -> B -> C -> A" in str(error) + assert error.cycle_path == ["A", "B", "C", "A"] + + def test_constraint_conversion_error(self) -> None: + from datamodel_code_generator.dynamic.exceptions import ConstraintConversionError + + error = ConstraintConversionError("min_value", -1, "must be positive") + assert "min_value" in str(error) + assert "-1" in str(error) + assert "must be positive" in str(error) + assert error.constraint_name == "min_value" + assert error.constraint_value == -1 + assert error.reason == "must be positive" + + def test_unsupported_model_type_error(self) -> None: + from datamodel_code_generator.dynamic.exceptions import UnsupportedModelTypeError + + error = UnsupportedModelTypeError("CustomType") + assert "CustomType" in str(error) + assert error.model_type == "CustomType" + + def test_dynamic_model_error_base(self) -> None: + from datamodel_code_generator.dynamic.exceptions import DynamicModelError + + error = DynamicModelError("base error message") + assert str(error) == "base error message" From e739d425bc9cc42890bd212f76bb9867fee76643 Mon Sep 17 00:00:00 2001 From: Koudai Aono Date: Fri, 2 Jan 2026 14:18:45 +0000 Subject: [PATCH 3/9] Add more tests for dynamic module coverage --- tests/dynamic/test_creator.py | 246 ++++++++++++++++++++++++++++++++++ 1 file changed, 246 insertions(+) diff --git a/tests/dynamic/test_creator.py b/tests/dynamic/test_creator.py index 4e919c871..0f929ac66 100644 --- a/tests/dynamic/test_creator.py +++ b/tests/dynamic/test_creator.py @@ -943,3 +943,249 @@ def test_dynamic_model_error_base(self) -> None: error = DynamicModelError("base error message") assert str(error) == "base error message" + + +@pytest.mark.skipif(not PYDANTIC_V2, reason="Dynamic models require Pydantic v2") +class TestConstraints: + def test_constraints_none_input(self) -> None: + from datamodel_code_generator.dynamic.constraints import constraints_to_field_kwargs + + result = constraints_to_field_kwargs(None) + assert result == {} + + def test_constraints_with_all_none_values(self) -> None: + from datamodel_code_generator.dynamic.constraints import constraints_to_field_kwargs + from datamodel_code_generator.model.base import ConstraintsBase + + constraints = ConstraintsBase() + result = constraints_to_field_kwargs(constraints) + assert result == {} + + def test_constraints_with_ge_gt_le_lt(self) -> None: + from datamodel_code_generator.dynamic.constraints import constraints_to_field_kwargs + from datamodel_code_generator.model.pydantic.base_model import Constraints + + constraints = Constraints(minimum=1, maximum=10, exclusiveMinimum=0, exclusiveMaximum=11) + result = constraints_to_field_kwargs(constraints) + assert result["ge"] == 1 + assert result["le"] == 10 + assert result["gt"] == 0 + assert result["lt"] == 11 + + def test_constraints_with_multiple_of(self) -> None: + from datamodel_code_generator.dynamic.constraints import constraints_to_field_kwargs + from datamodel_code_generator.model.pydantic.base_model import Constraints + + constraints = Constraints(multipleOf=5) + result = constraints_to_field_kwargs(constraints) + assert result["multiple_of"] == 5 + + def test_constraints_with_regex_pattern(self) -> None: + from datamodel_code_generator.dynamic.constraints import constraints_to_field_kwargs + from datamodel_code_generator.model.pydantic.base_model import Constraints + + constraints = Constraints(pattern="^[A-Z]+$") + result = constraints_to_field_kwargs(constraints) + assert result["pattern"] == "^[A-Z]+$" + + def test_constraints_with_min_max_items(self) -> None: + from datamodel_code_generator.dynamic.constraints import constraints_to_field_kwargs + from datamodel_code_generator.model.pydantic.base_model import Constraints + + constraints = Constraints(minItems=1, maxItems=10) + result = constraints_to_field_kwargs(constraints) + assert result["min_length"] == 1 + assert result["max_length"] == 10 + + +@pytest.mark.skipif(not PYDANTIC_V2, reason="Dynamic models require Pydantic v2") +class TestTypeResolverInternal: + def test_resolve_constrained_type_with_kwargs(self) -> None: + from datamodel_code_generator.dynamic.type_resolver import TypeResolver + from datamodel_code_generator.types import DataType + + resolver = TypeResolver({}) + data_type = DataType(type="constr", kwargs={"min_length": 1, "max_length": 10}) + result_type, constraints = resolver.resolve_with_constraints(data_type) + assert result_type is str + assert constraints.get("min_length") == 1 + assert constraints.get("max_length") == 10 + + def test_resolve_constrained_type_with_regex(self) -> None: + from datamodel_code_generator.dynamic.type_resolver import TypeResolver + from datamodel_code_generator.types import DataType + + resolver = TypeResolver({}) + data_type = DataType(type="constr", kwargs={"regex": "r'^[A-Z]+$'"}) + result_type, constraints = resolver.resolve_with_constraints(data_type) + assert result_type is str + assert constraints.get("pattern") == "^[A-Z]+$" + + def test_resolve_constrained_int(self) -> None: + from datamodel_code_generator.dynamic.type_resolver import TypeResolver + from datamodel_code_generator.types import DataType + + resolver = TypeResolver({}) + data_type = DataType(type="conint", kwargs={"ge": 0, "le": 100}) + result_type, constraints = resolver.resolve_with_constraints(data_type) + assert result_type is int + assert constraints.get("ge") == 0 + assert constraints.get("le") == 100 + + def test_resolve_model_in_lookup(self) -> None: + from datamodel_code_generator.dynamic.type_resolver import TypeResolver + from datamodel_code_generator.types import DataType + + class MockModel: + pass + + resolver = TypeResolver({"MockModel": MockModel}) + data_type = DataType(type="MockModel") + result_type, constraints = resolver.resolve_with_constraints(data_type) + assert result_type == MockModel + assert constraints == {} + + def test_resolve_forward_ref_not_in_lookup(self) -> None: + from typing import ForwardRef + + from datamodel_code_generator.dynamic.type_resolver import TypeResolver + from datamodel_code_generator.reference import Reference + from datamodel_code_generator.types import DataType + + resolver = TypeResolver({}) + ref = Reference(path="#/$defs/UnknownModel", original_name="UnknownModel", name="UnknownModel") + data_type = DataType(reference=ref) + result_type, _constraints = resolver.resolve_with_constraints(data_type) + assert isinstance(result_type, ForwardRef) + assert result_type.__forward_arg__ == "UnknownModel" + + def test_resolve_empty_type(self) -> None: + from typing import Any + + from datamodel_code_generator.dynamic.type_resolver import TypeResolver + from datamodel_code_generator.types import DataType + + resolver = TypeResolver({}) + data_type = DataType(type=None) + result_type, _constraints = resolver.resolve_with_constraints(data_type) + assert result_type == Any + + def test_resolve_unknown_type_string(self) -> None: + from typing import Any + + from datamodel_code_generator.dynamic.type_resolver import TypeResolver + from datamodel_code_generator.types import DataType + + resolver = TypeResolver({}) + data_type = DataType(type="totally_unknown_type_xyz") + result_type, _constraints = resolver.resolve_with_constraints(data_type) + assert result_type == Any + + def test_resolve_constrained_without_kwargs(self) -> None: + from datamodel_code_generator.dynamic.type_resolver import TypeResolver + from datamodel_code_generator.types import DataType + + resolver = TypeResolver({}) + data_type = DataType(type="constr", kwargs=None) + result_type, constraints = resolver.resolve_with_constraints(data_type) + assert result_type is str + assert constraints == {} + + +@pytest.mark.skipif(not PYDANTIC_V2, reason="Dynamic models require Pydantic v2") +class TestCreatorInternal: + def test_enum_with_path_reference(self) -> None: + schema = { + "$defs": { + "types": { + "Status": { + "type": "string", + "enum": ["active", "inactive"], + } + } + }, + "type": "object", + "properties": {"status": {"$ref": "#/$defs/types/Status"}}, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + assert "Status" in models + + def test_model_with_default_factory_like_list(self) -> None: + schema = { + "type": "object", + "properties": { + "items": { + "type": "array", + "items": {"type": "string"}, + "default": [], + } + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + instance = models["Model"]() + assert instance.items == [] + + def test_base_class_without_reference(self) -> None: + schema = { + "$defs": { + "Base": { + "type": "object", + "properties": {"id": {"type": "integer"}}, + }, + "Child": { + "allOf": [ + {"$ref": "#/$defs/Base"}, + {"type": "object", "properties": {"name": {"type": "string"}}}, + ] + }, + }, + "$ref": "#/$defs/Child", + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + assert "Child" in models + child = models["Child"](id=1, name="Test") + assert child.id == 1 + assert child.name == "Test" + + def test_field_without_name_skipped(self) -> None: + schema = { + "type": "object", + "properties": { + "valid_field": {"type": "string"}, + }, + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + assert "Model" in models + + def test_multiple_inheritance_bases(self) -> None: + schema = { + "$defs": { + "Nameable": { + "type": "object", + "properties": {"name": {"type": "string"}}, + }, + "Identifiable": { + "type": "object", + "properties": {"id": {"type": "integer"}}, + }, + "Entity": { + "allOf": [ + {"$ref": "#/$defs/Nameable"}, + {"$ref": "#/$defs/Identifiable"}, + {"type": "object", "properties": {"active": {"type": "boolean"}}}, + ] + }, + }, + "$ref": "#/$defs/Entity", + } + + models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) + assert "Entity" in models + entity = models["Entity"](name="Test", id=1, active=True) + assert entity.name == "Test" + assert entity.id == 1 + assert entity.active is True From 242d46e70cf1d86480ab51aefcf10514680dc14d Mon Sep 17 00:00:00 2001 From: Koudai Aono Date: Fri, 2 Jan 2026 14:31:18 +0000 Subject: [PATCH 4/9] Remove unused code and fix function-level imports --- .../dynamic/__init__.py | 6 - .../dynamic/creator.py | 10 +- .../dynamic/exceptions.py | 31 +- .../dynamic/type_resolver.py | 22 -- tests/dynamic/test_creator.py | 313 ++---------------- 5 files changed, 27 insertions(+), 355 deletions(-) diff --git a/src/datamodel_code_generator/dynamic/__init__.py b/src/datamodel_code_generator/dynamic/__init__.py index 1ee75360e..7565eca6a 100644 --- a/src/datamodel_code_generator/dynamic/__init__.py +++ b/src/datamodel_code_generator/dynamic/__init__.py @@ -8,18 +8,12 @@ from datamodel_code_generator.dynamic.creator import DynamicModelCreator from datamodel_code_generator.dynamic.exceptions import ( - CircularReferenceError, - ConstraintConversionError, DynamicModelError, TypeResolutionError, - UnsupportedModelTypeError, ) __all__ = [ - "CircularReferenceError", - "ConstraintConversionError", "DynamicModelCreator", "DynamicModelError", "TypeResolutionError", - "UnsupportedModelTypeError", ] diff --git a/src/datamodel_code_generator/dynamic/creator.py b/src/datamodel_code_generator/dynamic/creator.py index 0ce4e2094..9a789ade3 100644 --- a/src/datamodel_code_generator/dynamic/creator.py +++ b/src/datamodel_code_generator/dynamic/creator.py @@ -5,7 +5,7 @@ from enum import Enum from typing import TYPE_CHECKING, Any, cast -from pydantic import Field, create_model +from pydantic import BaseModel, Field, create_model from datamodel_code_generator.dynamic.constraints import constraints_to_field_kwargs from datamodel_code_generator.dynamic.exceptions import ( @@ -77,14 +77,12 @@ def _create_pydantic_model(self, data_model: DataModel) -> type[Any]: base_classes = self._resolve_base_classes(data_model) module_name = self._get_module_name(data_model) - from pydantic import BaseModel as BaseModelCls # noqa: PLC0415 - if len(base_classes) > 1: combined_base_name = f"_{data_model.class_name}Base" combined_base = type(combined_base_name, base_classes, {}) effective_base: type[Any] = combined_base else: - effective_base = base_classes[0] if base_classes else BaseModelCls + effective_base = base_classes[0] if base_classes else BaseModel model = cast( "type[Any]", @@ -141,8 +139,6 @@ def _create_field_info( # noqa: PLR6301 def _resolve_base_classes(self, data_model: DataModel) -> tuple[type, ...]: """Resolve base classes for the model.""" - from pydantic import BaseModel # noqa: PLC0415 - if not data_model.base_classes: return (BaseModel,) @@ -185,8 +181,6 @@ def _create_enum_model(self, data_model: DataModel) -> type[Any]: def _rebuild_models(self) -> None: """Resolve forward references by calling model_rebuild() on all models.""" - from pydantic import BaseModel # noqa: PLC0415 - namespace = {**self._short_name_lookup} for model in self._models.values(): diff --git a/src/datamodel_code_generator/dynamic/exceptions.py b/src/datamodel_code_generator/dynamic/exceptions.py index 62cbecdbe..5777ee141 100644 --- a/src/datamodel_code_generator/dynamic/exceptions.py +++ b/src/datamodel_code_generator/dynamic/exceptions.py @@ -2,7 +2,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING if TYPE_CHECKING: from datamodel_code_generator.types import DataType @@ -21,32 +21,3 @@ def __init__(self, type_info: DataType, model_name: str, field_name: str) -> Non self.model_name = model_name self.field_name = field_name super().__init__(f"Cannot resolve type for field '{field_name}' in model '{model_name}': {type_info}") - - -class CircularReferenceError(DynamicModelError): - """Circular reference that cannot be resolved.""" - - def __init__(self, cycle_path: list[str]) -> None: - """Initialize with the cycle path.""" - self.cycle_path = cycle_path - super().__init__(f"Unresolvable circular reference: {' -> '.join(cycle_path)}") - - -class ConstraintConversionError(DynamicModelError): - """Failed to convert schema constraints to Pydantic Field.""" - - def __init__(self, constraint_name: str, constraint_value: Any, reason: str) -> None: - """Initialize with constraint details and reason.""" - self.constraint_name = constraint_name - self.constraint_value = constraint_value - self.reason = reason - super().__init__(f"Cannot convert constraint '{constraint_name}={constraint_value}': {reason}") - - -class UnsupportedModelTypeError(DynamicModelError): - """Model type not supported for dynamic generation.""" - - def __init__(self, model_type: str) -> None: - """Initialize with the unsupported model type.""" - self.model_type = model_type - super().__init__(f"Dynamic generation not supported for model type: {model_type}") diff --git a/src/datamodel_code_generator/dynamic/type_resolver.py b/src/datamodel_code_generator/dynamic/type_resolver.py index e056ee462..9d7d064e4 100644 --- a/src/datamodel_code_generator/dynamic/type_resolver.py +++ b/src/datamodel_code_generator/dynamic/type_resolver.py @@ -2,34 +2,12 @@ from __future__ import annotations -import datetime import decimal -import uuid from typing import TYPE_CHECKING, Any, ForwardRef, Literal, Union if TYPE_CHECKING: from datamodel_code_generator.types import DataType -FORMAT_TYPE_MAP: dict[str, type[Any]] = { - "date": datetime.date, - "date-time": datetime.datetime, - "time": datetime.time, - "uuid": uuid.UUID, - "uuid1": uuid.UUID, - "uuid2": uuid.UUID, - "uuid3": uuid.UUID, - "uuid4": uuid.UUID, - "uuid5": uuid.UUID, - "decimal": decimal.Decimal, - "email": str, - "uri": str, - "hostname": str, - "ipv4": str, - "ipv6": str, - "binary": bytes, - "byte": bytes, -} - PRIMITIVE_TYPE_MAP: dict[str, type[Any]] = { "str": str, "string": str, diff --git a/tests/dynamic/test_creator.py b/tests/dynamic/test_creator.py index 0f929ac66..09e0c8d18 100644 --- a/tests/dynamic/test_creator.py +++ b/tests/dynamic/test_creator.py @@ -900,292 +900,27 @@ def test_union_with_multiple_types(self) -> None: @pytest.mark.skipif(not PYDANTIC_V2, reason="Dynamic models require Pydantic v2") -class TestExceptions: - def test_type_resolution_error(self) -> None: - from datamodel_code_generator.dynamic.exceptions import TypeResolutionError - from datamodel_code_generator.types import DataType - - dt = DataType(type="unknown_type") - error = TypeResolutionError(dt, "TestModel", "test_field") - assert "test_field" in str(error) - assert "TestModel" in str(error) - assert error.type_info == dt - assert error.model_name == "TestModel" - assert error.field_name == "test_field" - - def test_circular_reference_error(self) -> None: - from datamodel_code_generator.dynamic.exceptions import CircularReferenceError - - error = CircularReferenceError(["A", "B", "C", "A"]) - assert "A -> B -> C -> A" in str(error) - assert error.cycle_path == ["A", "B", "C", "A"] - - def test_constraint_conversion_error(self) -> None: - from datamodel_code_generator.dynamic.exceptions import ConstraintConversionError - - error = ConstraintConversionError("min_value", -1, "must be positive") - assert "min_value" in str(error) - assert "-1" in str(error) - assert "must be positive" in str(error) - assert error.constraint_name == "min_value" - assert error.constraint_value == -1 - assert error.reason == "must be positive" - - def test_unsupported_model_type_error(self) -> None: - from datamodel_code_generator.dynamic.exceptions import UnsupportedModelTypeError - - error = UnsupportedModelTypeError("CustomType") - assert "CustomType" in str(error) - assert error.model_type == "CustomType" - - def test_dynamic_model_error_base(self) -> None: - from datamodel_code_generator.dynamic.exceptions import DynamicModelError - - error = DynamicModelError("base error message") - assert str(error) == "base error message" - - -@pytest.mark.skipif(not PYDANTIC_V2, reason="Dynamic models require Pydantic v2") -class TestConstraints: - def test_constraints_none_input(self) -> None: - from datamodel_code_generator.dynamic.constraints import constraints_to_field_kwargs - - result = constraints_to_field_kwargs(None) - assert result == {} - - def test_constraints_with_all_none_values(self) -> None: - from datamodel_code_generator.dynamic.constraints import constraints_to_field_kwargs - from datamodel_code_generator.model.base import ConstraintsBase - - constraints = ConstraintsBase() - result = constraints_to_field_kwargs(constraints) - assert result == {} - - def test_constraints_with_ge_gt_le_lt(self) -> None: - from datamodel_code_generator.dynamic.constraints import constraints_to_field_kwargs - from datamodel_code_generator.model.pydantic.base_model import Constraints - - constraints = Constraints(minimum=1, maximum=10, exclusiveMinimum=0, exclusiveMaximum=11) - result = constraints_to_field_kwargs(constraints) - assert result["ge"] == 1 - assert result["le"] == 10 - assert result["gt"] == 0 - assert result["lt"] == 11 - - def test_constraints_with_multiple_of(self) -> None: - from datamodel_code_generator.dynamic.constraints import constraints_to_field_kwargs - from datamodel_code_generator.model.pydantic.base_model import Constraints - - constraints = Constraints(multipleOf=5) - result = constraints_to_field_kwargs(constraints) - assert result["multiple_of"] == 5 - - def test_constraints_with_regex_pattern(self) -> None: - from datamodel_code_generator.dynamic.constraints import constraints_to_field_kwargs - from datamodel_code_generator.model.pydantic.base_model import Constraints - - constraints = Constraints(pattern="^[A-Z]+$") - result = constraints_to_field_kwargs(constraints) - assert result["pattern"] == "^[A-Z]+$" - - def test_constraints_with_min_max_items(self) -> None: - from datamodel_code_generator.dynamic.constraints import constraints_to_field_kwargs - from datamodel_code_generator.model.pydantic.base_model import Constraints - - constraints = Constraints(minItems=1, maxItems=10) - result = constraints_to_field_kwargs(constraints) - assert result["min_length"] == 1 - assert result["max_length"] == 10 - - -@pytest.mark.skipif(not PYDANTIC_V2, reason="Dynamic models require Pydantic v2") -class TestTypeResolverInternal: - def test_resolve_constrained_type_with_kwargs(self) -> None: - from datamodel_code_generator.dynamic.type_resolver import TypeResolver - from datamodel_code_generator.types import DataType - - resolver = TypeResolver({}) - data_type = DataType(type="constr", kwargs={"min_length": 1, "max_length": 10}) - result_type, constraints = resolver.resolve_with_constraints(data_type) - assert result_type is str - assert constraints.get("min_length") == 1 - assert constraints.get("max_length") == 10 - - def test_resolve_constrained_type_with_regex(self) -> None: - from datamodel_code_generator.dynamic.type_resolver import TypeResolver - from datamodel_code_generator.types import DataType - - resolver = TypeResolver({}) - data_type = DataType(type="constr", kwargs={"regex": "r'^[A-Z]+$'"}) - result_type, constraints = resolver.resolve_with_constraints(data_type) - assert result_type is str - assert constraints.get("pattern") == "^[A-Z]+$" - - def test_resolve_constrained_int(self) -> None: - from datamodel_code_generator.dynamic.type_resolver import TypeResolver - from datamodel_code_generator.types import DataType - - resolver = TypeResolver({}) - data_type = DataType(type="conint", kwargs={"ge": 0, "le": 100}) - result_type, constraints = resolver.resolve_with_constraints(data_type) - assert result_type is int - assert constraints.get("ge") == 0 - assert constraints.get("le") == 100 - - def test_resolve_model_in_lookup(self) -> None: - from datamodel_code_generator.dynamic.type_resolver import TypeResolver - from datamodel_code_generator.types import DataType - - class MockModel: - pass - - resolver = TypeResolver({"MockModel": MockModel}) - data_type = DataType(type="MockModel") - result_type, constraints = resolver.resolve_with_constraints(data_type) - assert result_type == MockModel - assert constraints == {} - - def test_resolve_forward_ref_not_in_lookup(self) -> None: - from typing import ForwardRef - - from datamodel_code_generator.dynamic.type_resolver import TypeResolver - from datamodel_code_generator.reference import Reference - from datamodel_code_generator.types import DataType - - resolver = TypeResolver({}) - ref = Reference(path="#/$defs/UnknownModel", original_name="UnknownModel", name="UnknownModel") - data_type = DataType(reference=ref) - result_type, _constraints = resolver.resolve_with_constraints(data_type) - assert isinstance(result_type, ForwardRef) - assert result_type.__forward_arg__ == "UnknownModel" - - def test_resolve_empty_type(self) -> None: - from typing import Any - - from datamodel_code_generator.dynamic.type_resolver import TypeResolver - from datamodel_code_generator.types import DataType - - resolver = TypeResolver({}) - data_type = DataType(type=None) - result_type, _constraints = resolver.resolve_with_constraints(data_type) - assert result_type == Any - - def test_resolve_unknown_type_string(self) -> None: - from typing import Any - - from datamodel_code_generator.dynamic.type_resolver import TypeResolver - from datamodel_code_generator.types import DataType - - resolver = TypeResolver({}) - data_type = DataType(type="totally_unknown_type_xyz") - result_type, _constraints = resolver.resolve_with_constraints(data_type) - assert result_type == Any - - def test_resolve_constrained_without_kwargs(self) -> None: - from datamodel_code_generator.dynamic.type_resolver import TypeResolver - from datamodel_code_generator.types import DataType - - resolver = TypeResolver({}) - data_type = DataType(type="constr", kwargs=None) - result_type, constraints = resolver.resolve_with_constraints(data_type) - assert result_type is str - assert constraints == {} - - -@pytest.mark.skipif(not PYDANTIC_V2, reason="Dynamic models require Pydantic v2") -class TestCreatorInternal: - def test_enum_with_path_reference(self) -> None: - schema = { - "$defs": { - "types": { - "Status": { - "type": "string", - "enum": ["active", "inactive"], - } - } - }, - "type": "object", - "properties": {"status": {"$ref": "#/$defs/types/Status"}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - assert "Status" in models - - def test_model_with_default_factory_like_list(self) -> None: - schema = { - "type": "object", - "properties": { - "items": { - "type": "array", - "items": {"type": "string"}, - "default": [], - } - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - instance = models["Model"]() - assert instance.items == [] - - def test_base_class_without_reference(self) -> None: - schema = { - "$defs": { - "Base": { - "type": "object", - "properties": {"id": {"type": "integer"}}, - }, - "Child": { - "allOf": [ - {"$ref": "#/$defs/Base"}, - {"type": "object", "properties": {"name": {"type": "string"}}}, - ] - }, - }, - "$ref": "#/$defs/Child", - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - assert "Child" in models - child = models["Child"](id=1, name="Test") - assert child.id == 1 - assert child.name == "Test" - - def test_field_without_name_skipped(self) -> None: - schema = { - "type": "object", - "properties": { - "valid_field": {"type": "string"}, - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - assert "Model" in models - - def test_multiple_inheritance_bases(self) -> None: - schema = { - "$defs": { - "Nameable": { - "type": "object", - "properties": {"name": {"type": "string"}}, - }, - "Identifiable": { - "type": "object", - "properties": {"id": {"type": "integer"}}, - }, - "Entity": { - "allOf": [ - {"$ref": "#/$defs/Nameable"}, - {"$ref": "#/$defs/Identifiable"}, - {"type": "object", "properties": {"active": {"type": "boolean"}}}, - ] - }, - }, - "$ref": "#/$defs/Entity", - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - assert "Entity" in models - entity = models["Entity"](name="Test", id=1, active=True) - assert entity.name == "Test" - assert entity.id == 1 - assert entity.active is True +def test_type_resolution_error() -> None: + """Test TypeResolutionError is raised when type resolution fails.""" + from unittest.mock import MagicMock, patch + + from datamodel_code_generator.dynamic.creator import DynamicModelCreator + from datamodel_code_generator.dynamic.exceptions import TypeResolutionError + + mock_parser = MagicMock() + creator = DynamicModelCreator(mock_parser) + + mock_field = MagicMock() + mock_field.name = "test_field" + mock_field.data_type = MagicMock() + + mock_data_model = MagicMock() + mock_data_model.class_name = "TestModel" + mock_data_model.fields = [mock_field] + mock_data_model.base_classes = [] + mock_data_model.reference = None + + with patch.object(creator._type_resolver, "resolve_with_constraints", side_effect=ValueError("test error")): + with pytest.raises(TypeResolutionError) as exc_info: + creator._create_pydantic_model(mock_data_model) + assert "test_field" in str(exc_info.value) From 8ff7e05d50dd15cf27e050ddd64adc5df35696ca Mon Sep 17 00:00:00 2001 From: Koudai Aono Date: Fri, 2 Jan 2026 14:34:49 +0000 Subject: [PATCH 5/9] Remove inline comments and move imports to top level --- .../dynamic/creator.py | 19 +++++++---------- .../dynamic/type_resolver.py | 21 +++++++++++-------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/datamodel_code_generator/dynamic/creator.py b/src/datamodel_code_generator/dynamic/creator.py index 9a789ade3..3a720104d 100644 --- a/src/datamodel_code_generator/dynamic/creator.py +++ b/src/datamodel_code_generator/dynamic/creator.py @@ -8,10 +8,10 @@ from pydantic import BaseModel, Field, create_model from datamodel_code_generator.dynamic.constraints import constraints_to_field_kwargs -from datamodel_code_generator.dynamic.exceptions import ( - TypeResolutionError, -) +from datamodel_code_generator.dynamic.exceptions import TypeResolutionError from datamodel_code_generator.dynamic.type_resolver import TypeResolver +from datamodel_code_generator.model.enum import Enum as EnumModel +from datamodel_code_generator.parser.base import sort_data_models if TYPE_CHECKING: from pydantic.fields import FieldInfo @@ -40,9 +40,6 @@ def create_models(self) -> dict[str, type]: TypeResolutionError: If a type cannot be resolved. DynamicModelError: If model generation fails. """ - from datamodel_code_generator.model.enum import Enum as EnumModel # noqa: PLC0415 - from datamodel_code_generator.parser.base import sort_data_models # noqa: PLC0415 - if not self.parser.results: return {} @@ -110,9 +107,8 @@ def _get_model_key(self, data_model: DataModel) -> str: return f"{module_name}.{data_model.class_name}" return data_model.class_name - def _create_field_info( # noqa: PLR6301 - self, field: DataModelFieldBase, type_constraints: dict[str, Any] | None = None - ) -> FieldInfo: + @staticmethod + def _create_field_info(field: DataModelFieldBase, type_constraints: dict[str, Any] | None = None) -> FieldInfo: """Convert DataModelFieldBase to Pydantic FieldInfo.""" kwargs = constraints_to_field_kwargs(field.constraints) @@ -151,7 +147,8 @@ def _resolve_base_classes(self, data_model: DataModel) -> tuple[type, ...]: return tuple(bases) if bases else (BaseModel,) - def _get_module_name(self, data_model: DataModel) -> str: # noqa: PLR6301 + @staticmethod + def _get_module_name(data_model: DataModel) -> str: """Determine module name for the dynamic model.""" if data_model.reference and data_model.reference.path: parts = data_model.reference.path.split("/") @@ -168,7 +165,7 @@ def _create_enum_model(self, data_model: DataModel) -> type[Any]: value = value.strip("'\"") members[field.name] = value - enum_class: type[Any] = Enum(data_model.class_name, members) # type: ignore[assignment] + enum_class: type[Any] = Enum(data_model.class_name, members) model_key = self._get_model_key(data_model) self._models[model_key] = enum_class diff --git a/src/datamodel_code_generator/dynamic/type_resolver.py b/src/datamodel_code_generator/dynamic/type_resolver.py index 9d7d064e4..5cd796ba8 100644 --- a/src/datamodel_code_generator/dynamic/type_resolver.py +++ b/src/datamodel_code_generator/dynamic/type_resolver.py @@ -3,7 +3,9 @@ from __future__ import annotations import decimal -from typing import TYPE_CHECKING, Any, ForwardRef, Literal, Union +import operator +from functools import reduce +from typing import TYPE_CHECKING, Any, ForwardRef, Literal if TYPE_CHECKING: from datamodel_code_generator.types import DataType @@ -19,7 +21,7 @@ "boolean": bool, "bytes": bytes, "None": type(None), - "Any": Any, # type: ignore[dict-item] + "Any": Any, } CONSTRAINED_TYPE_MAP: dict[str, type[Any]] = { @@ -57,22 +59,22 @@ def resolve_with_constraints(self, data_type: DataType) -> tuple[Any, dict[str, return ForwardRef(model_name), constraints if data_type.literals: - return Literal[tuple(data_type.literals)], constraints # type: ignore[valid-type] + return Literal[tuple(data_type.literals)], constraints if len(data_type.data_types) > 1: - inner_types = tuple(self.resolve(dt) for dt in data_type.data_types) - return Union[inner_types], constraints # type: ignore[valid-type] # noqa: UP007 + inner_types = [self.resolve(dt) for dt in data_type.data_types] + return reduce(operator.or_, inner_types), constraints if data_type.is_optional and data_type.data_types: inner = self.resolve(data_type.data_types[0]) - return inner | None, constraints # type: ignore[operator] + return inner | None, constraints return self._resolve_type_string(data_type, constraints) def _resolve_type_string(self, data_type: DataType, constraints: dict[str, Any]) -> tuple[Any, dict[str, Any]]: """Resolve type from type string.""" if not data_type.type: - return Any, constraints # type: ignore[return-value] + return Any, constraints type_str = data_type.type @@ -87,9 +89,10 @@ def _resolve_type_string(self, data_type: DataType, constraints: dict[str, Any]) if type_str in self._models: return self._models[type_str], constraints - return Any, constraints # type: ignore[return-value] + return Any, constraints - def _extract_constraints(self, data_type: DataType, constraints: dict[str, Any]) -> None: # noqa: PLR6301 + @staticmethod + def _extract_constraints(data_type: DataType, constraints: dict[str, Any]) -> None: """Extract constraints from DataType kwargs.""" if not data_type.kwargs: return From 9db77353ccd09d4684a8b79c3f3af011a81a3f62 Mon Sep 17 00:00:00 2001 From: Koudai Aono Date: Fri, 2 Jan 2026 14:42:31 +0000 Subject: [PATCH 6/9] Remove line comments and fix description access bug - Remove line comments from generate_dynamic_models function - Remove unused exclusive_minimum/exclusive_maximum from CONSTRAINT_FIELD_MAP - Fix description access: use field.extras.get() instead of getattr() --- src/datamodel_code_generator/__init__.py | 6 +----- src/datamodel_code_generator/dynamic/constraints.py | 2 -- src/datamodel_code_generator/dynamic/creator.py | 2 +- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/src/datamodel_code_generator/__init__.py b/src/datamodel_code_generator/__init__.py index 4ac6e5f32..876451ac0 100644 --- a/src/datamodel_code_generator/__init__.py +++ b/src/datamodel_code_generator/__init__.py @@ -938,20 +938,17 @@ def generate_dynamic_models( from datamodel_code_generator.parser.jsonschema import JsonSchemaParser # noqa: PLC0415 from datamodel_code_generator.parser.openapi import OpenAPIParser # noqa: PLC0415 - # Convert input to source and determine file path for base_path resolution source: str | Path | dict[str, Any] | ParseResult if isinstance(input_, Mapping): source = dict(input_) elif isinstance(input_, Path): - source = input_ # Keep Path object for base_path resolution + source = input_ elif isinstance(input_, str): - # Check if it's a file path - keep Path object for base_path resolution path = Path(input_) source = path if path.exists() and path.is_file() else input_ else: source = input_ - # Infer input type if not specified if input_file_type == InputFileType.Auto: if isinstance(source, Mapping): input_file_type = InputFileType.JsonSchema @@ -960,7 +957,6 @@ def generate_dynamic_models( elif isinstance(source, str): input_file_type = infer_input_type(source) else: - # ParseResult - default to JsonSchema input_file_type = InputFileType.JsonSchema if input_file_type == InputFileType.OpenAPI: diff --git a/src/datamodel_code_generator/dynamic/constraints.py b/src/datamodel_code_generator/dynamic/constraints.py index 570a57a06..a908594e7 100644 --- a/src/datamodel_code_generator/dynamic/constraints.py +++ b/src/datamodel_code_generator/dynamic/constraints.py @@ -21,8 +21,6 @@ "pattern": "pattern", "min_items": "min_length", "max_items": "max_length", - "exclusive_minimum": "gt", - "exclusive_maximum": "lt", } JSON_SCHEMA_EXTRA_FIELDS: frozenset[str] = frozenset({ diff --git a/src/datamodel_code_generator/dynamic/creator.py b/src/datamodel_code_generator/dynamic/creator.py index 3a720104d..3163573a6 100644 --- a/src/datamodel_code_generator/dynamic/creator.py +++ b/src/datamodel_code_generator/dynamic/creator.py @@ -127,7 +127,7 @@ def _create_field_info(field: DataModelFieldBase, type_constraints: dict[str, An if field.alias: kwargs["alias"] = field.alias - description = getattr(field, "description", None) + description = field.extras.get("description") if description: kwargs["description"] = description From 8ab5951b7df2fd98428e62c8c3dde30d909dac3a Mon Sep 17 00:00:00 2001 From: Koudai Aono Date: Fri, 2 Jan 2026 14:46:34 +0000 Subject: [PATCH 7/9] Refactor tests: use external files and inline-snapshot - Move all inline schemas to tests/data/dynamic/ - Rewrite tests as functions instead of classes - Use inline-snapshot for assertions - Consolidate similar tests into grouped test functions --- tests/data/dynamic/allof.json | 12 + tests/data/dynamic/array_constraints.json | 11 + tests/data/dynamic/circular.json | 12 + tests/data/dynamic/default.json | 6 + tests/data/dynamic/description.json | 9 + tests/data/dynamic/enum.json | 6 + tests/data/dynamic/exclusive_constraints.json | 10 + tests/data/dynamic/formats.json | 11 + tests/data/dynamic/min_max_properties.json | 10 + tests/data/dynamic/multiple_allof.json | 19 + tests/data/dynamic/nested.json | 12 + tests/data/dynamic/numeric_constraints.json | 11 + tests/data/dynamic/objects.json | 15 + tests/data/dynamic/openapi.yaml | 16 + tests/data/dynamic/optional.json | 8 + tests/data/dynamic/refs.json | 25 + tests/data/dynamic/required.json | 8 + tests/data/dynamic/simple.json | 8 + tests/data/dynamic/standalone_enum.json | 12 + tests/data/dynamic/string_constraints.json | 11 + tests/data/dynamic/tuple.json | 10 + tests/data/dynamic/types.json | 10 + tests/data/dynamic/unions.json | 14 + tests/data/dynamic/unique_items.json | 17 + tests/dynamic/test_creator.py | 1090 ++++------------- 25 files changed, 492 insertions(+), 881 deletions(-) create mode 100644 tests/data/dynamic/allof.json create mode 100644 tests/data/dynamic/array_constraints.json create mode 100644 tests/data/dynamic/circular.json create mode 100644 tests/data/dynamic/default.json create mode 100644 tests/data/dynamic/description.json create mode 100644 tests/data/dynamic/enum.json create mode 100644 tests/data/dynamic/exclusive_constraints.json create mode 100644 tests/data/dynamic/formats.json create mode 100644 tests/data/dynamic/min_max_properties.json create mode 100644 tests/data/dynamic/multiple_allof.json create mode 100644 tests/data/dynamic/nested.json create mode 100644 tests/data/dynamic/numeric_constraints.json create mode 100644 tests/data/dynamic/objects.json create mode 100644 tests/data/dynamic/openapi.yaml create mode 100644 tests/data/dynamic/optional.json create mode 100644 tests/data/dynamic/refs.json create mode 100644 tests/data/dynamic/required.json create mode 100644 tests/data/dynamic/simple.json create mode 100644 tests/data/dynamic/standalone_enum.json create mode 100644 tests/data/dynamic/string_constraints.json create mode 100644 tests/data/dynamic/tuple.json create mode 100644 tests/data/dynamic/types.json create mode 100644 tests/data/dynamic/unions.json create mode 100644 tests/data/dynamic/unique_items.json diff --git a/tests/data/dynamic/allof.json b/tests/data/dynamic/allof.json new file mode 100644 index 000000000..7ae171713 --- /dev/null +++ b/tests/data/dynamic/allof.json @@ -0,0 +1,12 @@ +{ + "$defs": { + "Base": {"type": "object", "properties": {"id": {"type": "integer"}}}, + "Child": { + "allOf": [ + {"$ref": "#/$defs/Base"}, + {"properties": {"name": {"type": "string"}}} + ] + } + }, + "$ref": "#/$defs/Child" +} diff --git a/tests/data/dynamic/array_constraints.json b/tests/data/dynamic/array_constraints.json new file mode 100644 index 000000000..cd40f86e3 --- /dev/null +++ b/tests/data/dynamic/array_constraints.json @@ -0,0 +1,11 @@ +{ + "type": "object", + "properties": { + "tags": { + "type": "array", + "items": {"type": "string"}, + "minItems": 1, + "maxItems": 5 + } + } +} diff --git a/tests/data/dynamic/circular.json b/tests/data/dynamic/circular.json new file mode 100644 index 000000000..8540a2194 --- /dev/null +++ b/tests/data/dynamic/circular.json @@ -0,0 +1,12 @@ +{ + "$defs": { + "Node": { + "type": "object", + "properties": { + "value": {"type": "string"}, + "children": {"type": "array", "items": {"$ref": "#/$defs/Node"}} + } + } + }, + "$ref": "#/$defs/Node" +} diff --git a/tests/data/dynamic/default.json b/tests/data/dynamic/default.json new file mode 100644 index 000000000..b3ae41222 --- /dev/null +++ b/tests/data/dynamic/default.json @@ -0,0 +1,6 @@ +{ + "type": "object", + "properties": { + "count": {"type": "integer", "default": 10} + } +} diff --git a/tests/data/dynamic/description.json b/tests/data/dynamic/description.json new file mode 100644 index 000000000..5b7f14edd --- /dev/null +++ b/tests/data/dynamic/description.json @@ -0,0 +1,9 @@ +{ + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "The user's name" + } + } +} diff --git a/tests/data/dynamic/enum.json b/tests/data/dynamic/enum.json new file mode 100644 index 000000000..cfaba9d33 --- /dev/null +++ b/tests/data/dynamic/enum.json @@ -0,0 +1,6 @@ +{ + "type": "object", + "properties": { + "status": {"enum": ["pending", "active", "completed"]} + } +} diff --git a/tests/data/dynamic/exclusive_constraints.json b/tests/data/dynamic/exclusive_constraints.json new file mode 100644 index 000000000..2aa691081 --- /dev/null +++ b/tests/data/dynamic/exclusive_constraints.json @@ -0,0 +1,10 @@ +{ + "type": "object", + "properties": { + "value": { + "type": "number", + "exclusiveMinimum": 0, + "exclusiveMaximum": 100 + } + } +} diff --git a/tests/data/dynamic/formats.json b/tests/data/dynamic/formats.json new file mode 100644 index 000000000..a701d5d6c --- /dev/null +++ b/tests/data/dynamic/formats.json @@ -0,0 +1,11 @@ +{ + "type": "object", + "properties": { + "date_field": {"type": "string", "format": "date"}, + "datetime_field": {"type": "string", "format": "date-time"}, + "uuid_field": {"type": "string", "format": "uuid"}, + "time_field": {"type": "string", "format": "time"}, + "decimal_field": {"type": "string", "format": "decimal"}, + "binary_field": {"type": "string", "format": "binary"} + } +} diff --git a/tests/data/dynamic/min_max_properties.json b/tests/data/dynamic/min_max_properties.json new file mode 100644 index 000000000..7e34c166c --- /dev/null +++ b/tests/data/dynamic/min_max_properties.json @@ -0,0 +1,10 @@ +{ + "type": "object", + "properties": { + "data": { + "type": "object", + "minProperties": 1, + "maxProperties": 5 + } + } +} diff --git a/tests/data/dynamic/multiple_allof.json b/tests/data/dynamic/multiple_allof.json new file mode 100644 index 000000000..9e8e2a8b7 --- /dev/null +++ b/tests/data/dynamic/multiple_allof.json @@ -0,0 +1,19 @@ +{ + "$defs": { + "Named": { + "type": "object", + "properties": {"name": {"type": "string"}} + }, + "Aged": { + "type": "object", + "properties": {"age": {"type": "integer"}} + }, + "Person": { + "allOf": [ + {"$ref": "#/$defs/Named"}, + {"$ref": "#/$defs/Aged"} + ] + } + }, + "$ref": "#/$defs/Person" +} diff --git a/tests/data/dynamic/nested.json b/tests/data/dynamic/nested.json new file mode 100644 index 000000000..c59fe5ae0 --- /dev/null +++ b/tests/data/dynamic/nested.json @@ -0,0 +1,12 @@ +{ + "type": "object", + "properties": { + "user": {"$ref": "#/$defs/User"} + }, + "$defs": { + "User": { + "type": "object", + "properties": {"name": {"type": "string"}} + } + } +} diff --git a/tests/data/dynamic/numeric_constraints.json b/tests/data/dynamic/numeric_constraints.json new file mode 100644 index 000000000..7af6f6cac --- /dev/null +++ b/tests/data/dynamic/numeric_constraints.json @@ -0,0 +1,11 @@ +{ + "type": "object", + "properties": { + "count": { + "type": "integer", + "minimum": 0, + "maximum": 100, + "multipleOf": 5 + } + } +} diff --git a/tests/data/dynamic/objects.json b/tests/data/dynamic/objects.json new file mode 100644 index 000000000..11698a0f2 --- /dev/null +++ b/tests/data/dynamic/objects.json @@ -0,0 +1,15 @@ +{ + "type": "object", + "properties": { + "dict_str": { + "type": "object", + "additionalProperties": {"type": "string"} + }, + "dict_int": { + "type": "object", + "additionalProperties": {"type": "integer"} + }, + "plain_object": {"type": "object"}, + "plain_array": {"type": "array"} + } +} diff --git a/tests/data/dynamic/openapi.yaml b/tests/data/dynamic/openapi.yaml new file mode 100644 index 000000000..1003f2ec8 --- /dev/null +++ b/tests/data/dynamic/openapi.yaml @@ -0,0 +1,16 @@ +openapi: "3.0.0" +info: + title: Test API + version: "1.0" +paths: {} +components: + schemas: + User: + type: object + properties: + name: + type: string + email: + type: string + required: + - name diff --git a/tests/data/dynamic/optional.json b/tests/data/dynamic/optional.json new file mode 100644 index 000000000..b2d86c099 --- /dev/null +++ b/tests/data/dynamic/optional.json @@ -0,0 +1,8 @@ +{ + "type": "object", + "properties": { + "name": {"type": "string"}, + "nickname": {"type": "string"} + }, + "required": ["name"] +} diff --git a/tests/data/dynamic/refs.json b/tests/data/dynamic/refs.json new file mode 100644 index 000000000..639858505 --- /dev/null +++ b/tests/data/dynamic/refs.json @@ -0,0 +1,25 @@ +{ + "$defs": { + "Address": { + "type": "object", + "properties": {"city": {"type": "string"}} + }, + "Inner": { + "type": "object", + "properties": {"value": {"type": "string"}} + }, + "Middle": { + "type": "object", + "properties": {"inner": {"$ref": "#/$defs/Inner"}} + }, + "Outer": { + "type": "object", + "properties": {"middle": {"$ref": "#/$defs/Middle"}} + } + }, + "type": "object", + "properties": { + "address": {"$ref": "#/$defs/Address"}, + "outer": {"$ref": "#/$defs/Outer"} + } +} diff --git a/tests/data/dynamic/required.json b/tests/data/dynamic/required.json new file mode 100644 index 000000000..def2d36a1 --- /dev/null +++ b/tests/data/dynamic/required.json @@ -0,0 +1,8 @@ +{ + "type": "object", + "properties": { + "name": {"type": "string"}, + "email": {"type": "string"} + }, + "required": ["name", "email"] +} diff --git a/tests/data/dynamic/simple.json b/tests/data/dynamic/simple.json new file mode 100644 index 000000000..878e9ed51 --- /dev/null +++ b/tests/data/dynamic/simple.json @@ -0,0 +1,8 @@ +{ + "type": "object", + "properties": { + "name": {"type": "string"}, + "age": {"type": "integer"} + }, + "required": ["name"] +} diff --git a/tests/data/dynamic/standalone_enum.json b/tests/data/dynamic/standalone_enum.json new file mode 100644 index 000000000..d11a5c962 --- /dev/null +++ b/tests/data/dynamic/standalone_enum.json @@ -0,0 +1,12 @@ +{ + "$defs": { + "Status": { + "type": "string", + "enum": ["active", "inactive", "pending"] + } + }, + "type": "object", + "properties": { + "status": {"$ref": "#/$defs/Status"} + } +} diff --git a/tests/data/dynamic/string_constraints.json b/tests/data/dynamic/string_constraints.json new file mode 100644 index 000000000..2142656eb --- /dev/null +++ b/tests/data/dynamic/string_constraints.json @@ -0,0 +1,11 @@ +{ + "type": "object", + "properties": { + "code": { + "type": "string", + "minLength": 2, + "maxLength": 10, + "pattern": "^[A-Z]+$" + } + } +} diff --git a/tests/data/dynamic/tuple.json b/tests/data/dynamic/tuple.json new file mode 100644 index 000000000..28678c998 --- /dev/null +++ b/tests/data/dynamic/tuple.json @@ -0,0 +1,10 @@ +{ + "type": "object", + "properties": { + "coords": { + "type": "array", + "prefixItems": [{"type": "number"}, {"type": "number"}], + "items": false + } + } +} diff --git a/tests/data/dynamic/types.json b/tests/data/dynamic/types.json new file mode 100644 index 000000000..e3ef17f35 --- /dev/null +++ b/tests/data/dynamic/types.json @@ -0,0 +1,10 @@ +{ + "type": "object", + "properties": { + "bool_field": {"type": "boolean"}, + "float_field": {"type": "number"}, + "string_field": {"type": "string"}, + "nullable_field": {"type": ["string", "null"]}, + "const_field": {"const": "active"} + } +} diff --git a/tests/data/dynamic/unions.json b/tests/data/dynamic/unions.json new file mode 100644 index 000000000..d8063a1a8 --- /dev/null +++ b/tests/data/dynamic/unions.json @@ -0,0 +1,14 @@ +{ + "type": "object", + "properties": { + "anyof_field": { + "anyOf": [{"type": "string"}, {"type": "integer"}] + }, + "oneof_field": { + "oneOf": [{"type": "boolean"}, {"type": "number"}] + }, + "multi_union": { + "anyOf": [{"type": "string"}, {"type": "integer"}, {"type": "boolean"}] + } + } +} diff --git a/tests/data/dynamic/unique_items.json b/tests/data/dynamic/unique_items.json new file mode 100644 index 000000000..e0c2e4cde --- /dev/null +++ b/tests/data/dynamic/unique_items.json @@ -0,0 +1,17 @@ +{ + "type": "object", + "properties": { + "tags": { + "type": "array", + "items": {"type": "string"}, + "uniqueItems": true + }, + "constrained_tags": { + "type": "array", + "items": {"type": "string"}, + "uniqueItems": true, + "minItems": 1, + "maxItems": 10 + } + } +} diff --git a/tests/dynamic/test_creator.py b/tests/dynamic/test_creator.py index 09e0c8d18..82fbb8fa0 100644 --- a/tests/dynamic/test_creator.py +++ b/tests/dynamic/test_creator.py @@ -1,907 +1,237 @@ -# ruff: noqa: D101, D102 +# ruff: noqa: D103 """Tests for dynamic model generation.""" from __future__ import annotations -from typing import TYPE_CHECKING +from pathlib import Path import pytest +from inline_snapshot import snapshot from pydantic import VERSION, ValidationError from datamodel_code_generator import InputFileType, generate_dynamic_models -if TYPE_CHECKING: - from pathlib import Path - PYDANTIC_V2 = VERSION.startswith("2.") +DATA_DIR = Path(__file__).parent.parent / "data" / "dynamic" + +pytestmark = pytest.mark.skipif(not PYDANTIC_V2, reason="Dynamic models require Pydantic v2") + + +def test_simple_model() -> None: + models = generate_dynamic_models(DATA_DIR / "simple.json") + assert "Model" in models + instance = models["Model"](name="John", age=30) + assert instance.name == snapshot("John") + assert instance.age == snapshot(30) + + +def test_nested_models() -> None: + models = generate_dynamic_models(DATA_DIR / "nested.json") + assert "Model" in models + assert "User" in models + user = models["User"](name="Alice") + model = models["Model"](user=user) + assert model.user.name == snapshot("Alice") + + +def test_numeric_constraints() -> None: + models = generate_dynamic_models(DATA_DIR / "numeric_constraints.json") + models["Model"](count=50) + with pytest.raises(ValidationError): + models["Model"](count=-1) + with pytest.raises(ValidationError): + models["Model"](count=7) + + +def test_array_constraints() -> None: + models = generate_dynamic_models(DATA_DIR / "array_constraints.json") + models["Model"](tags=["a", "b", "c"]) + with pytest.raises(ValidationError): + models["Model"](tags=[]) + with pytest.raises(ValidationError): + models["Model"](tags=["a", "b", "c", "d", "e", "f"]) + +def test_circular_reference() -> None: + models = generate_dynamic_models(DATA_DIR / "circular.json") + assert "Node" in models + node_class = models["Node"] + node = node_class(value="root", children=[node_class(value="child", children=[])]) + assert node.children[0].value == snapshot("child") -@pytest.mark.skipif(not PYDANTIC_V2, reason="Dynamic models require Pydantic v2") -class TestDynamicModelCreation: - def test_simple_model(self) -> None: - schema = { - "type": "object", - "properties": {"name": {"type": "string"}, "age": {"type": "integer"}}, - "required": ["name"], - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - assert "Model" in models - instance = models["Model"](name="John", age=30) - assert instance.name == "John" - assert instance.age == 30 - - def test_nested_models(self) -> None: - schema = { - "type": "object", - "properties": {"user": {"$ref": "#/$defs/User"}}, - "$defs": { - "User": { - "type": "object", - "properties": {"name": {"type": "string"}}, - } - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - assert "User" in models - assert "Model" in models - user = models["User"](name="Alice") - model = models["Model"](user=user) - assert model.user.name == "Alice" - - def test_string_field(self) -> None: - schema = { - "type": "object", - "properties": { - "email": { - "type": "string", - } - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - instance = models["Model"](email="test@example.com") - assert instance.email == "test@example.com" - - def test_numeric_constraints(self) -> None: - schema = { - "type": "object", - "properties": { - "count": { - "type": "integer", - "minimum": 0, - "maximum": 100, - "multipleOf": 5, - } - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - models["Model"](count=50) - - with pytest.raises(ValidationError): - models["Model"](count=-1) - - with pytest.raises(ValidationError): - models["Model"](count=7) - - def test_array_constraints(self) -> None: - schema = { - "type": "object", - "properties": {"tags": {"type": "array", "items": {"type": "string"}, "minItems": 1, "maxItems": 5}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - models["Model"](tags=["a", "b", "c"]) - - with pytest.raises(ValidationError): - models["Model"](tags=[]) - - with pytest.raises(ValidationError): - models["Model"](tags=["a", "b", "c", "d", "e", "f"]) - - def test_circular_reference(self) -> None: - schema = { - "$defs": { - "Node": { - "type": "object", - "properties": { - "value": {"type": "string"}, - "children": {"type": "array", "items": {"$ref": "#/$defs/Node"}}, - }, - } - }, - "$ref": "#/$defs/Node", - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - node_class = models["Node"] - node = node_class(value="root", children=[node_class(value="child", children=[])]) - assert node.children[0].value == "child" - - def test_enum_values(self) -> None: - schema = { - "type": "object", - "properties": {"status": {"enum": ["pending", "active", "completed"]}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - models["Model"](status="active") - - with pytest.raises(ValidationError): - models["Model"](status="invalid") - - def test_optional_field(self) -> None: - schema = { - "type": "object", - "properties": {"name": {"type": "string"}, "nickname": {"type": "string"}}, - "required": ["name"], - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - instance = models["Model"](name="John") - assert instance.name == "John" - assert instance.nickname is None - - def test_default_value(self) -> None: - schema = { - "type": "object", - "properties": {"count": {"type": "integer", "default": 10}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - instance = models["Model"]() - assert instance.count == 10 - - def test_all_of_inheritance(self) -> None: - schema = { - "$defs": { - "Base": {"type": "object", "properties": {"id": {"type": "integer"}}}, - "Child": { - "allOf": [ - {"$ref": "#/$defs/Base"}, - {"properties": {"name": {"type": "string"}}}, - ] - }, - }, - "$ref": "#/$defs/Child", - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - child = models["Child"](id=1, name="Test") - assert child.id == 1 - assert child.name == "Test" - - def test_openapi_schema(self) -> None: - schema = """ -openapi: "3.0.0" -info: - title: Test API - version: "1.0" -paths: {} -components: - schemas: - User: - type: object - properties: - name: - type: string - email: - type: string - required: - - name -""" - models = generate_dynamic_models(schema, input_file_type=InputFileType.OpenAPI) - - assert "User" in models - user = models["User"](name="John", email="john@example.com") - assert user.name == "John" - assert user.email == "john@example.com" - - def test_empty_schema(self) -> None: - schema = {"type": "object", "properties": {}} - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - assert "Model" in models - instance = models["Model"]() - assert instance is not None - - def test_string_constraints(self) -> None: - schema = { - "type": "object", - "properties": { - "code": { - "type": "string", - "minLength": 2, - "maxLength": 10, - "pattern": "^[A-Z]+$", - } - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - models["Model"](code="ABC") - - with pytest.raises(ValidationError): - models["Model"](code="A") - - with pytest.raises(ValidationError): - models["Model"](code="abc") - - def test_exclusive_numeric_constraints(self) -> None: - schema = { - "type": "object", - "properties": { - "value": { - "type": "number", - "exclusiveMinimum": 0, - "exclusiveMaximum": 100, - } - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - models["Model"](value=50) - - with pytest.raises(ValidationError): - models["Model"](value=0) - - with pytest.raises(ValidationError): - models["Model"](value=100) - - def test_date_format(self) -> None: - schema = { - "type": "object", - "properties": { - "created": {"type": "string", "format": "date"}, - "updated": {"type": "string", "format": "date-time"}, - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - from datetime import date, datetime, timezone - - instance = models["Model"](created=date(2024, 1, 1), updated=datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)) - assert instance.created == date(2024, 1, 1) - - def test_uuid_format(self) -> None: - schema = { - "type": "object", - "properties": {"id": {"type": "string", "format": "uuid"}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - import uuid - - test_uuid = uuid.uuid4() - instance = models["Model"](id=test_uuid) - assert instance.id == test_uuid - - def test_dict_type(self) -> None: - schema = { - "type": "object", - "properties": { - "metadata": { - "type": "object", - "additionalProperties": {"type": "string"}, - } - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - instance = models["Model"](metadata={"key": "value"}) - assert instance.metadata == {"key": "value"} - def test_set_type(self) -> None: - schema = { - "type": "object", - "properties": { - "tags": { - "type": "array", - "items": {"type": "string"}, - "uniqueItems": True, - } - }, - } +def test_enum_values() -> None: + models = generate_dynamic_models(DATA_DIR / "enum.json") + models["Model"](status="active") + with pytest.raises(ValidationError): + models["Model"](status="invalid") - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - instance = models["Model"](tags=["a", "b", "c"]) - assert "a" in instance.tags +def test_optional_field() -> None: + models = generate_dynamic_models(DATA_DIR / "optional.json") + instance = models["Model"](name="John") + assert instance.name == snapshot("John") + assert instance.nickname is None - def test_ref_type(self) -> None: - schema = { - "$defs": { - "Address": { - "type": "object", - "properties": {"city": {"type": "string"}}, - } - }, - "type": "object", - "properties": {"address": {"$ref": "#/$defs/Address"}}, - } - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - address = models["Address"](city="Tokyo") - instance = models["Model"](address=address) - assert instance.address.city == "Tokyo" - - def test_literal_type(self) -> None: - schema = { - "type": "object", - "properties": {"status": {"const": "active"}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - instance = models["Model"](status="active") - assert instance.status == "active" - - def test_nullable_field(self) -> None: - schema = { - "type": "object", - "properties": {"name": {"type": ["string", "null"]}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - instance1 = models["Model"](name="test") - assert instance1.name == "test" - - instance2 = models["Model"](name=None) - assert instance2.name is None - - def test_integer_enum(self) -> None: - schema = { - "type": "object", - "properties": {"priority": {"enum": [1, 2, 3]}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - models["Model"](priority=2) - - with pytest.raises(ValidationError): - models["Model"](priority=5) - - def test_boolean_field(self) -> None: - schema = { - "type": "object", - "properties": {"active": {"type": "boolean"}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - instance = models["Model"](active=True) - assert instance.active is True - - def test_float_field(self) -> None: - schema = { - "type": "object", - "properties": {"price": {"type": "number"}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - instance = models["Model"](price=19.99) - assert instance.price == 19.99 - - def test_empty_results(self) -> None: - schema = {} - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - assert isinstance(models, dict) - - def test_file_path_input(self, tmp_path: Path) -> None: - schema_file = tmp_path / "schema.json" - schema_file.write_text('{"type": "object", "properties": {"name": {"type": "string"}}}') - - models = generate_dynamic_models(str(schema_file), input_file_type=InputFileType.JsonSchema) - - assert "Model" in models - instance = models["Model"](name="Test") - assert instance.name == "Test" - - def test_path_object_input(self, tmp_path: Path) -> None: - schema_file = tmp_path / "schema.json" - schema_file.write_text('{"type": "object", "properties": {"value": {"type": "integer"}}}') - - models = generate_dynamic_models(schema_file, input_file_type=InputFileType.JsonSchema) - - assert "Model" in models - instance = models["Model"](value=42) - assert instance.value == 42 - - def test_field_with_alias(self) -> None: - schema = """ -openapi: "3.0.0" -info: - title: Test API - version: "1.0" -paths: {} -components: - schemas: - User: - type: object - properties: - user_name: - type: string - x-alias: - user_name: userName -""" - models = generate_dynamic_models(schema, input_file_type=InputFileType.OpenAPI) - assert "User" in models +def test_default_value() -> None: + models = generate_dynamic_models(DATA_DIR / "default.json") + instance = models["Model"]() + assert instance.count == snapshot(10) - def test_field_with_description(self) -> None: - schema = { - "type": "object", - "properties": { - "name": { - "type": "string", - "description": "The user's name", - } - }, - } - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) +def test_allof_inheritance() -> None: + models = generate_dynamic_models(DATA_DIR / "allof.json") + child = models["Child"](id=1, name="Test") + assert child.id == snapshot(1) + assert child.name == snapshot("Test") - assert "Model" in models - instance = models["Model"](name="Test") - assert instance.name == "Test" - def test_standalone_enum_type(self) -> None: - schema = { - "$defs": { - "Status": { - "type": "string", - "enum": ["active", "inactive", "pending"], - } - }, - "type": "object", - "properties": {"status": {"$ref": "#/$defs/Status"}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - assert "Status" in models - assert "Model" in models - - def test_any_of_union(self) -> None: - schema = { - "type": "object", - "properties": {"value": {"anyOf": [{"type": "string"}, {"type": "integer"}]}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - instance1 = models["Model"](value="test") - assert instance1.value == "test" - - instance2 = models["Model"](value=42) - assert instance2.value == 42 - - def test_one_of_union(self) -> None: - schema = { - "type": "object", - "properties": {"data": {"oneOf": [{"type": "boolean"}, {"type": "number"}]}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - instance = models["Model"](data=True) - assert instance.data is True - - def test_time_format(self) -> None: - schema = { - "type": "object", - "properties": {"time_value": {"type": "string", "format": "time"}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - from datetime import time - - instance = models["Model"](time_value=time(12, 30, 0)) - assert instance.time_value == time(12, 30, 0) - - def test_decimal_format(self) -> None: - schema = { - "type": "object", - "properties": {"amount": {"type": "string", "format": "decimal"}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - from decimal import Decimal - - instance = models["Model"](amount=Decimal("123.45")) - assert instance.amount == Decimal("123.45") - - def test_bytes_type(self) -> None: - schema = { - "type": "object", - "properties": {"data": {"type": "string", "format": "binary"}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - instance = models["Model"](data=b"hello") - assert instance.data == b"hello" - - def test_array_without_items(self) -> None: - schema = { - "type": "object", - "properties": {"items": {"type": "array"}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - instance = models["Model"](items=[1, "two", 3.0]) - assert instance.items == [1, "two", 3.0] - - def test_object_without_properties(self) -> None: - schema = { - "type": "object", - "properties": {"data": {"type": "object"}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - instance = models["Model"](data={"any": "value"}) - assert instance.data == {"any": "value"} - - def test_nested_allof_multiple_refs(self) -> None: - schema = { - "$defs": { - "Named": { - "type": "object", - "properties": {"name": {"type": "string"}}, - }, - "Aged": { - "type": "object", - "properties": {"age": {"type": "integer"}}, - }, - "Person": { - "allOf": [ - {"$ref": "#/$defs/Named"}, - {"$ref": "#/$defs/Aged"}, - ] - }, - }, - "$ref": "#/$defs/Person", - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - assert "Person" in models - person = models["Person"](name="John", age=30) - assert person.name == "John" - assert person.age == 30 - - def test_deep_nested_refs(self) -> None: - schema = { - "$defs": { - "Inner": { - "type": "object", - "properties": {"value": {"type": "string"}}, - }, - "Middle": { - "type": "object", - "properties": {"inner": {"$ref": "#/$defs/Inner"}}, - }, - "Outer": { - "type": "object", - "properties": {"middle": {"$ref": "#/$defs/Middle"}}, - }, - }, - "$ref": "#/$defs/Outer", - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - inner = models["Inner"](value="test") - middle = models["Middle"](inner=inner) - outer = models["Outer"](middle=middle) - assert outer.middle.inner.value == "test" - - def test_additional_properties_typed(self) -> None: - schema = { - "type": "object", - "properties": { - "meta": { - "type": "object", - "additionalProperties": {"type": "integer"}, - } - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - instance = models["Model"](meta={"count": 10, "total": 100}) - assert instance.meta["count"] == 10 - - def test_required_field(self) -> None: - schema = { - "type": "object", - "properties": { - "name": {"type": "string"}, - "email": {"type": "string"}, - }, - "required": ["name", "email"], - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - with pytest.raises(ValidationError): - models["Model"](name="John") - - instance = models["Model"](name="John", email="john@test.com") - assert instance.name == "John" - - def test_ge_constraint(self) -> None: - schema = { - "type": "object", - "properties": { - "value": {"type": "integer", "minimum": 10}, - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - models["Model"](value=10) - models["Model"](value=15) - - with pytest.raises(ValidationError): - models["Model"](value=9) - - def test_le_constraint(self) -> None: - schema = { - "type": "object", - "properties": { - "value": {"type": "integer", "maximum": 100}, - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) +def test_openapi_schema() -> None: + models = generate_dynamic_models(DATA_DIR / "openapi.yaml", input_file_type=InputFileType.OpenAPI) + assert "User" in models + user = models["User"](name="John", email="john@example.com") + assert user.name == snapshot("John") + assert user.email == snapshot("john@example.com") + +def test_string_constraints() -> None: + models = generate_dynamic_models(DATA_DIR / "string_constraints.json") + models["Model"](code="ABC") + with pytest.raises(ValidationError): + models["Model"](code="A") + with pytest.raises(ValidationError): + models["Model"](code="abc") + + +def test_exclusive_constraints() -> None: + models = generate_dynamic_models(DATA_DIR / "exclusive_constraints.json") + models["Model"](value=50) + with pytest.raises(ValidationError): + models["Model"](value=0) + with pytest.raises(ValidationError): models["Model"](value=100) - models["Model"](value=50) - - with pytest.raises(ValidationError): - models["Model"](value=101) - - def test_forward_ref_resolution(self) -> None: - schema = { - "type": "object", - "properties": {"item": {"$ref": "#/$defs/Item"}}, - "$defs": { - "Item": { - "type": "object", - "properties": {"name": {"type": "string"}}, - } - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - item = models["Item"](name="Widget") - model = models["Model"](item=item) - assert model.item.name == "Widget" - - def test_auto_input_type_detection(self) -> None: - schema = {"type": "object", "properties": {"name": {"type": "string"}}} - - models = generate_dynamic_models(schema) - - assert "Model" in models - instance = models["Model"](name="Test") - assert instance.name == "Test" - - def test_graphql_not_supported(self) -> None: - schema = """ - type Query { - hello: String - } - """ - - from datamodel_code_generator import Error - - with pytest.raises(Error, match="GraphQL is not yet supported"): - generate_dynamic_models(schema, input_file_type=InputFileType.GraphQL) - - def test_tuple_type(self) -> None: - schema = { - "type": "object", - "properties": { - "coords": { - "type": "array", - "prefixItems": [{"type": "number"}, {"type": "number"}], - "items": False, - } - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - assert "Model" in models - - def test_typed_dict_additional_properties(self) -> None: - schema = { - "type": "object", - "properties": { - "data": { - "type": "object", - "additionalProperties": {"type": "integer"}, - } - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - instance = models["Model"](data={"a": 1, "b": 2}) - assert instance.data["a"] == 1 - - def test_json_schema_unique_items(self) -> None: - schema = { - "type": "object", - "properties": { - "items": { - "type": "array", - "items": {"type": "string"}, - "uniqueItems": True, - "minItems": 1, - "maxItems": 10, - } - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - assert "Model" in models - - def test_min_max_properties(self) -> None: - schema = { - "type": "object", - "properties": { - "data": { - "type": "object", - "minProperties": 1, - "maxProperties": 5, - } - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - - assert "Model" in models - - -@pytest.mark.skipif(not PYDANTIC_V2, reason="Dynamic models require Pydantic v2") -class TestCreatorEdgeCases: - def test_empty_parser_results(self) -> None: - schema = {} - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - assert models == {} or isinstance(models, dict) - - def test_field_with_pattern_constraint(self) -> None: - schema = { - "type": "object", - "properties": { - "code": {"type": "string", "pattern": "^[A-Z]{3}$"}, - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - instance = models["Model"](code="ABC") - assert instance.code == "ABC" - - with pytest.raises(ValidationError): - models["Model"](code="abc") - - -@pytest.mark.skipif(not PYDANTIC_V2, reason="Dynamic models require Pydantic v2") -class TestTypeResolver: - def test_set_with_typed_items(self) -> None: - schema = { - "type": "object", - "properties": { - "tags": { - "type": "array", - "items": {"type": "string"}, - "uniqueItems": True, - } - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - instance = models["Model"](tags=["a", "b"]) - assert "a" in instance.tags - - def test_dict_with_key_value_types(self) -> None: - schema = { - "type": "object", - "properties": { - "mapping": { - "type": "object", - "additionalProperties": {"type": "string"}, - } - }, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - instance = models["Model"](mapping={"key": "value"}) - assert instance.mapping["key"] == "value" - - def test_format_date_type(self) -> None: - from datetime import date - - schema = { - "type": "object", - "properties": {"birthday": {"type": "string", "format": "date"}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - instance = models["Model"](birthday=date(1990, 1, 1)) - assert instance.birthday == date(1990, 1, 1) - - def test_forward_ref_already_resolved(self) -> None: - schema = { - "$defs": { - "Address": { - "type": "object", - "properties": {"city": {"type": "string"}}, - }, - "Person": { - "type": "object", - "properties": { - "home": {"$ref": "#/$defs/Address"}, - "work": {"$ref": "#/$defs/Address"}, - }, - }, - }, - "$ref": "#/$defs/Person", - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - addr = models["Address"](city="NYC") - person = models["Person"](home=addr, work=addr) - assert person.home.city == "NYC" - - def test_union_with_multiple_types(self) -> None: - schema = { - "type": "object", - "properties": {"value": {"anyOf": [{"type": "string"}, {"type": "integer"}, {"type": "boolean"}]}}, - } - - models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema) - assert models["Model"](value="test").value == "test" - assert models["Model"](value=42).value == 42 - assert models["Model"](value=True).value is True - - -@pytest.mark.skipif(not PYDANTIC_V2, reason="Dynamic models require Pydantic v2") + + +def test_formats() -> None: + from datetime import date, datetime, time, timezone + from decimal import Decimal + from uuid import uuid4 + + models = generate_dynamic_models(DATA_DIR / "formats.json") + test_uuid = uuid4() + instance = models["Model"]( + date_field=date(2024, 1, 1), + datetime_field=datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc), + uuid_field=test_uuid, + time_field=time(12, 30), + decimal_field=Decimal("123.45"), + binary_field=b"hello", + ) + assert instance.date_field == snapshot(date(2024, 1, 1)) + assert instance.uuid_field == test_uuid + assert instance.time_field == snapshot(time(12, 30)) + assert instance.decimal_field == snapshot(Decimal("123.45")) + assert instance.binary_field == snapshot(b"hello") + + +def test_types() -> None: + models = generate_dynamic_models(DATA_DIR / "types.json") + instance = models["Model"]( + bool_field=True, float_field=19.99, string_field="test", nullable_field=None, const_field="active" + ) + assert instance.bool_field is True + assert instance.float_field == snapshot(19.99) + assert instance.nullable_field is None + assert instance.const_field == snapshot("active") + + +def test_objects() -> None: + models = generate_dynamic_models(DATA_DIR / "objects.json") + instance = models["Model"]( + dict_str={"key": "value"}, + dict_int={"count": 10}, + plain_object={"any": "value"}, + plain_array=[1, "two", 3.0], + ) + assert instance.dict_str == snapshot({"key": "value"}) + assert instance.dict_int == snapshot({"count": 10}) + + +def test_unions() -> None: + models = generate_dynamic_models(DATA_DIR / "unions.json") + assert models["Model"](anyof_field="test").anyof_field == snapshot("test") + assert models["Model"](anyof_field=42).anyof_field == snapshot(42) + assert models["Model"](oneof_field=True).oneof_field is True + assert models["Model"](multi_union="str").multi_union == snapshot("str") + + +def test_refs() -> None: + models = generate_dynamic_models(DATA_DIR / "refs.json") + for name in ["Address", "Inner", "Middle", "Model", "Outer"]: + assert name in models + address = models["Address"](city="Tokyo") + inner = models["Inner"](value="test") + middle = models["Middle"](inner=inner) + outer = models["Outer"](middle=middle) + instance = models["Model"](address=address, outer=outer) + assert instance.address.city == snapshot("Tokyo") + assert instance.outer.middle.inner.value == snapshot("test") + + +def test_standalone_enum() -> None: + models = generate_dynamic_models(DATA_DIR / "standalone_enum.json") + assert "Model" in models + assert "Status" in models + + +def test_multiple_allof() -> None: + models = generate_dynamic_models(DATA_DIR / "multiple_allof.json") + person = models["Person"](name="John", age=30) + assert person.name == snapshot("John") + assert person.age == snapshot(30) + + +def test_required_field() -> None: + models = generate_dynamic_models(DATA_DIR / "required.json") + with pytest.raises(ValidationError): + models["Model"](name="John") + instance = models["Model"](name="John", email="john@test.com") + assert instance.email == snapshot("john@test.com") + + +def test_tuple_type() -> None: + models = generate_dynamic_models(DATA_DIR / "tuple.json") + assert "Model" in models + + +def test_unique_items() -> None: + models = generate_dynamic_models(DATA_DIR / "unique_items.json") + instance = models["Model"](tags=["a", "b"], constrained_tags=["x"]) + assert "a" in instance.tags + + +def test_min_max_properties() -> None: + models = generate_dynamic_models(DATA_DIR / "min_max_properties.json") + assert "Model" in models + + +def test_description() -> None: + models = generate_dynamic_models(DATA_DIR / "description.json") + instance = models["Model"](name="Test") + assert instance.name == snapshot("Test") + field_info = models["Model"].model_fields["name"] + assert field_info.description == snapshot("The user's name") + + +def test_graphql_not_supported() -> None: + from datamodel_code_generator import Error + + with pytest.raises(Error, match="GraphQL is not yet supported"): + generate_dynamic_models("type Query { hello: String }", input_file_type=InputFileType.GraphQL) + + def test_type_resolution_error() -> None: - """Test TypeResolutionError is raised when type resolution fails.""" from unittest.mock import MagicMock, patch from datamodel_code_generator.dynamic.creator import DynamicModelCreator @@ -909,11 +239,9 @@ def test_type_resolution_error() -> None: mock_parser = MagicMock() creator = DynamicModelCreator(mock_parser) - mock_field = MagicMock() mock_field.name = "test_field" mock_field.data_type = MagicMock() - mock_data_model = MagicMock() mock_data_model.class_name = "TestModel" mock_data_model.fields = [mock_field] From 0e0a9e4da05326a756a49e2f2ad89f99d0d38ce8 Mon Sep 17 00:00:00 2001 From: Koudai Aono Date: Fri, 2 Jan 2026 14:49:31 +0000 Subject: [PATCH 8/9] Simplify type_resolver: use builtins for primitive types - Replace PRIMITIVE_TYPE_MAP with builtins lookup - Use TYPE_ALIASES for schema type names (string->str, etc.) - Rename CONSTRAINED_TYPE_MAP to CONSTRAINED_TYPE_BASE --- .../dynamic/type_resolver.py | 29 ++++++++----------- tests/data/dynamic/alias.yaml | 12 ++++++++ tests/data/dynamic/array_default.json | 10 +++++++ 3 files changed, 34 insertions(+), 17 deletions(-) create mode 100644 tests/data/dynamic/alias.yaml create mode 100644 tests/data/dynamic/array_default.json diff --git a/src/datamodel_code_generator/dynamic/type_resolver.py b/src/datamodel_code_generator/dynamic/type_resolver.py index 5cd796ba8..eff66b6a9 100644 --- a/src/datamodel_code_generator/dynamic/type_resolver.py +++ b/src/datamodel_code_generator/dynamic/type_resolver.py @@ -2,6 +2,7 @@ from __future__ import annotations +import builtins import decimal import operator from functools import reduce @@ -10,21 +11,14 @@ if TYPE_CHECKING: from datamodel_code_generator.types import DataType -PRIMITIVE_TYPE_MAP: dict[str, type[Any]] = { - "str": str, - "string": str, - "int": int, - "integer": int, - "float": float, - "number": float, - "bool": bool, - "boolean": bool, - "bytes": bytes, - "None": type(None), - "Any": Any, +TYPE_ALIASES: dict[str, str] = { + "string": "str", + "integer": "int", + "number": "float", + "boolean": "bool", } -CONSTRAINED_TYPE_MAP: dict[str, type[Any]] = { +CONSTRAINED_TYPE_BASE: dict[str, type[Any]] = { "conint": int, "confloat": float, "constr": str, @@ -78,13 +72,14 @@ def _resolve_type_string(self, data_type: DataType, constraints: dict[str, Any]) type_str = data_type.type - if type_str in CONSTRAINED_TYPE_MAP: - base_type = CONSTRAINED_TYPE_MAP[type_str] + if type_str in CONSTRAINED_TYPE_BASE: + base_type = CONSTRAINED_TYPE_BASE[type_str] self._extract_constraints(data_type, constraints) return base_type, constraints - if type_str in PRIMITIVE_TYPE_MAP: - return PRIMITIVE_TYPE_MAP[type_str], constraints + normalized = TYPE_ALIASES.get(type_str, type_str) + if builtin_type := getattr(builtins, normalized, None): + return builtin_type, constraints if type_str in self._models: return self._models[type_str], constraints diff --git a/tests/data/dynamic/alias.yaml b/tests/data/dynamic/alias.yaml new file mode 100644 index 000000000..8f8743a95 --- /dev/null +++ b/tests/data/dynamic/alias.yaml @@ -0,0 +1,12 @@ +openapi: "3.0.0" +info: + title: Test API + version: "1.0" +paths: {} +components: + schemas: + User: + type: object + properties: + userName: + type: string diff --git a/tests/data/dynamic/array_default.json b/tests/data/dynamic/array_default.json new file mode 100644 index 000000000..8c679d8cf --- /dev/null +++ b/tests/data/dynamic/array_default.json @@ -0,0 +1,10 @@ +{ + "type": "object", + "properties": { + "items": { + "type": "array", + "items": {"type": "string"}, + "default": [] + } + } +} From e7940065d425e82ad7e110385a1204fced4e1950 Mon Sep 17 00:00:00 2001 From: Koudai Aono Date: Fri, 2 Jan 2026 14:56:34 +0000 Subject: [PATCH 9/9] Add container type handling in TypeResolver - Add handling for list, set, and dict container types - Fix circular reference resolution (now returns correct list[Model] type) - Refactor resolve_with_constraints to reduce return statements --- .../dynamic/type_resolver.py | 30 ++++++++++++++----- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/src/datamodel_code_generator/dynamic/type_resolver.py b/src/datamodel_code_generator/dynamic/type_resolver.py index eff66b6a9..58816139f 100644 --- a/src/datamodel_code_generator/dynamic/type_resolver.py +++ b/src/datamodel_code_generator/dynamic/type_resolver.py @@ -47,10 +47,7 @@ def resolve_with_constraints(self, data_type: DataType) -> tuple[Any, dict[str, constraints: dict[str, Any] = {} if data_type.reference is not None: - model_name = data_type.reference.short_name - if model_name in self._models: - return self._models[model_name], constraints - return ForwardRef(model_name), constraints + return self._resolve_reference(data_type.reference.short_name), constraints if data_type.literals: return Literal[tuple(data_type.literals)], constraints @@ -59,12 +56,31 @@ def resolve_with_constraints(self, data_type: DataType) -> tuple[Any, dict[str, inner_types = [self.resolve(dt) for dt in data_type.data_types] return reduce(operator.or_, inner_types), constraints - if data_type.is_optional and data_type.data_types: - inner = self.resolve(data_type.data_types[0]) - return inner | None, constraints + if data_type.data_types: + return self._resolve_container(data_type), constraints return self._resolve_type_string(data_type, constraints) + def _resolve_reference(self, model_name: str) -> Any: + """Resolve a model reference to a type or ForwardRef.""" + if model_name in self._models: + return self._models[model_name] + return ForwardRef(model_name) + + def _resolve_container(self, data_type: DataType) -> Any: + """Resolve container types (list, set, dict, optional).""" + inner = self.resolve(data_type.data_types[0]) + if data_type.is_optional: + return inner | None + if data_type.is_list: + return list[inner] + if data_type.is_set: + return set[inner] + if data_type.is_dict: + key_type = self.resolve(data_type.dict_key) if data_type.dict_key else str + return dict[key_type, inner] + return inner + def _resolve_type_string(self, data_type: DataType, constraints: dict[str, Any]) -> tuple[Any, dict[str, Any]]: """Resolve type from type string.""" if not data_type.type: