Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions src/datamodel_code_generator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,81 @@ def get_header_and_first_line(csv_file: IO[str]) -> dict[str, Any]:
return None


def generate_dynamic_models(
input_: Path | str | ParseResult | Mapping[str, Any],
*,
input_file_type: InputFileType = InputFileType.Auto,
) -> dict[str, type]:
"""Generate actual Python model classes from schema at runtime.

This function creates real Python classes using Pydantic's create_model(),
allowing you to instantiate and validate data directly without generating code.

Args:
input_: The input source (file path, string content, URL, or dict).
input_file_type: The type of input file. Defaults to Auto-detection.

Returns:
Dictionary mapping class names to actual Python model classes.

Raises:
DynamicModelError: If model generation fails.
TypeResolutionError: If a field type cannot be resolved.

Example:
>>> schema = {
... "type": "object",
... "properties": {"name": {"type": "string"}, "age": {"type": "integer"}},
... "required": ["name"],
... }
>>> models = generate_dynamic_models(schema, input_file_type=InputFileType.JsonSchema)
>>> User = models["Model"]
>>> user = User(name="John", age=30)
>>> user.model_dump()
{'name': 'John', 'age': 30}

Note:
- Only Pydantic v2 models are supported
- Circular references are handled via model_rebuild()
- Custom validators/methods are not included
- For code generation, use generate() instead
"""
from datamodel_code_generator.parser.jsonschema import JsonSchemaParser # noqa: PLC0415
from datamodel_code_generator.parser.openapi import OpenAPIParser # noqa: PLC0415

source: str | Path | dict[str, Any] | ParseResult
if isinstance(input_, Mapping):
source = dict(input_)
elif isinstance(input_, Path):
source = input_
elif isinstance(input_, str):
path = Path(input_)
source = path if path.exists() and path.is_file() else input_
else:
source = input_

if input_file_type == InputFileType.Auto:
if isinstance(source, Mapping):
input_file_type = InputFileType.JsonSchema
elif isinstance(source, Path):
input_file_type = infer_input_type(source.read_text())
elif isinstance(source, str):
input_file_type = infer_input_type(source)
else:
input_file_type = InputFileType.JsonSchema

if input_file_type == InputFileType.OpenAPI:
parser = OpenAPIParser(source=source)
elif input_file_type == InputFileType.GraphQL:
msg = "GraphQL is not yet supported for dynamic model generation"
raise Error(msg)
else:
parser = JsonSchemaParser(source=source)

parser.parse_raw()
return parser.create_dynamic_models()


def infer_input_type(text: str) -> InputFileType:
"""Automatically detect the input file type from text content."""
import yaml.parser # noqa: PLC0415
Expand Down Expand Up @@ -955,4 +1030,5 @@ def infer_input_type(text: str) -> InputFileType:
"SchemaParseError",
"TargetPydanticVersion",
"generate",
"generate_dynamic_models",
]
19 changes: 19 additions & 0 deletions src/datamodel_code_generator/dynamic/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Dynamic model generation module.

This module provides functionality to generate actual Python model classes
at runtime using Pydantic's create_model(), instead of generating text code.
"""

from __future__ import annotations

from datamodel_code_generator.dynamic.creator import DynamicModelCreator
from datamodel_code_generator.dynamic.exceptions import (
DynamicModelError,
TypeResolutionError,
)

__all__ = [
"DynamicModelCreator",
"DynamicModelError",
"TypeResolutionError",
]
61 changes: 61 additions & 0 deletions src/datamodel_code_generator/dynamic/constraints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Constraint conversion utilities for dynamic model generation."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from datamodel_code_generator.util import model_dump

if TYPE_CHECKING:
from datamodel_code_generator.model.base import ConstraintsBase
Comment thread Fixed
Comment thread Dismissed

CONSTRAINT_FIELD_MAP: dict[str, str] = {
"ge": "ge",
"gt": "gt",
"le": "le",
"lt": "lt",
"multiple_of": "multiple_of",
"min_length": "min_length",
"max_length": "max_length",
"regex": "pattern",
"pattern": "pattern",
"min_items": "min_length",
"max_items": "max_length",
}

JSON_SCHEMA_EXTRA_FIELDS: frozenset[str] = frozenset({
"unique_items",
"min_properties",
"max_properties",
})


def constraints_to_field_kwargs(
constraints: ConstraintsBase | None,
) -> dict[str, Any]:
"""Convert DataModel constraints to Pydantic Field kwargs."""
if constraints is None:
return {}

kwargs: dict[str, Any] = {}
json_schema_extra: dict[str, Any] = {}

for field_name, value in model_dump(constraints).items():
if value is None:
continue

if field_name in CONSTRAINT_FIELD_MAP:
kwargs[CONSTRAINT_FIELD_MAP[field_name]] = value
elif field_name in JSON_SCHEMA_EXTRA_FIELDS:
json_schema_extra[_to_camel_case(field_name)] = value

if json_schema_extra:
kwargs["json_schema_extra"] = json_schema_extra

return kwargs


def _to_camel_case(snake_str: str) -> str:
"""Convert snake_case to camelCase."""
components = snake_str.split("_")
return components[0] + "".join(x.title() for x in components[1:])
185 changes: 185 additions & 0 deletions src/datamodel_code_generator/dynamic/creator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
"""Dynamic model creator for generating Python classes at runtime."""

from __future__ import annotations

from enum import Enum
from typing import TYPE_CHECKING, Any, cast

from pydantic import BaseModel, Field, create_model

from datamodel_code_generator.dynamic.constraints import constraints_to_field_kwargs
Comment thread Dismissed
from datamodel_code_generator.dynamic.exceptions import TypeResolutionError
Comment thread Dismissed
from datamodel_code_generator.dynamic.type_resolver import TypeResolver
Comment thread Fixed
Comment thread Dismissed
from datamodel_code_generator.model.enum import Enum as EnumModel
Comment thread Dismissed
from datamodel_code_generator.parser.base import sort_data_models
Comment thread Dismissed

if TYPE_CHECKING:
from pydantic.fields import FieldInfo

from datamodel_code_generator.model.base import DataModel, DataModelFieldBase
Comment thread Dismissed
from datamodel_code_generator.parser.base import Parser
Comment thread Fixed
Comment thread Dismissed


class DynamicModelCreator:
"""Creates actual Python classes from DataModel objects."""

def __init__(self, parser: Parser) -> None:
"""Initialize with a parser instance."""
self.parser = parser
self._models: dict[str, type[Any]] = {}
self._short_name_lookup: dict[str, type[Any]] = {}
self._type_resolver = TypeResolver(self._short_name_lookup)

def create_models(self) -> dict[str, type]:
"""Create all models from parser results.

Returns:
Dictionary mapping class names to actual Python classes.

Raises:
TypeResolutionError: If a type cannot be resolved.
DynamicModelError: If model generation fails.
"""
if not self.parser.results:
return {}

_, sorted_models_dict, _ = sort_data_models(self.parser.results)

for data_model in sorted_models_dict.values():
if isinstance(data_model, EnumModel):
self._create_enum_model(data_model)
else:
self._create_pydantic_model(data_model)

self._rebuild_models()

return self._models

def _create_pydantic_model(self, data_model: DataModel) -> type[Any]:
"""Create a single Pydantic model from DataModel."""
field_definitions: dict[str, tuple[Any, FieldInfo]] = {}

for field in data_model.fields:
if field.name is None:
continue

try:
field_type, type_constraints = self._type_resolver.resolve_with_constraints(field.data_type)
except Exception as e:
raise TypeResolutionError(field.data_type, data_model.class_name, field.name or "") from e

field_info = self._create_field_info(field, type_constraints)
field_definitions[field.name] = (field_type, field_info)

base_classes = self._resolve_base_classes(data_model)
module_name = self._get_module_name(data_model)

if len(base_classes) > 1:
combined_base_name = f"_{data_model.class_name}Base"
combined_base = type(combined_base_name, base_classes, {})
effective_base: type[Any] = combined_base
else:
effective_base = base_classes[0] if base_classes else BaseModel

model = cast(
"type[Any]",
create_model(
data_model.class_name,
__base__=effective_base,
__module__=module_name,
**cast("dict[str, Any]", field_definitions),
),
)

model_key = self._get_model_key(data_model)
self._models[model_key] = model
self._short_name_lookup[data_model.class_name] = model

if model_key != data_model.class_name:
self._models[data_model.class_name] = model

return model

def _get_model_key(self, data_model: DataModel) -> str:
"""Get module-qualified key for model storage."""
module_name = self._get_module_name(data_model)
if module_name and module_name != "__dynamic__":
return f"{module_name}.{data_model.class_name}"
return data_model.class_name

@staticmethod
def _create_field_info(field: DataModelFieldBase, type_constraints: dict[str, Any] | None = None) -> FieldInfo:
"""Convert DataModelFieldBase to Pydantic FieldInfo."""
kwargs = constraints_to_field_kwargs(field.constraints)

if type_constraints:
kwargs.update(type_constraints)

if (hasattr(field, "has_default") and field.has_default) or field.default is not None:
kwargs["default"] = field.default
elif (default_factory := getattr(field, "default_factory", None)) is not None:
kwargs["default_factory"] = default_factory
elif field.required:
kwargs["default"] = ...
else:
kwargs["default"] = None

if field.alias:
kwargs["alias"] = field.alias

description = field.extras.get("description")
if description:
kwargs["description"] = description

return Field(**kwargs)

def _resolve_base_classes(self, data_model: DataModel) -> tuple[type, ...]:
"""Resolve base classes for the model."""
if not data_model.base_classes:
return (BaseModel,)

bases = []
for base_class in data_model.base_classes:
if base_class.reference and base_class.reference.short_name in self._short_name_lookup:
bases.append(self._short_name_lookup[base_class.reference.short_name])
else:
bases.append(BaseModel)

return tuple(bases) if bases else (BaseModel,)

@staticmethod
def _get_module_name(data_model: DataModel) -> str:
"""Determine module name for the dynamic model."""
if data_model.reference and data_model.reference.path:
parts = data_model.reference.path.split("/")
return ".".join(p for p in parts if p and p != "#")
return "__dynamic__"

def _create_enum_model(self, data_model: DataModel) -> type[Any]:
"""Create an Enum class from DataModel."""
members = {}
for field in data_model.fields:
if field.name and field.default is not None:
value = field.default
if isinstance(value, str):
value = value.strip("'\"")
members[field.name] = value

enum_class: type[Any] = Enum(data_model.class_name, members)

model_key = self._get_model_key(data_model)
self._models[model_key] = enum_class
self._short_name_lookup[data_model.class_name] = enum_class

if model_key != data_model.class_name:
self._models[data_model.class_name] = enum_class

return enum_class

def _rebuild_models(self) -> None:
"""Resolve forward references by calling model_rebuild() on all models."""
namespace = {**self._short_name_lookup}

for model in self._models.values():
if isinstance(model, type) and issubclass(model, BaseModel):
model.model_rebuild(_types_namespace=namespace)
23 changes: 23 additions & 0 deletions src/datamodel_code_generator/dynamic/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Custom exceptions for dynamic model generation."""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from datamodel_code_generator.types import DataType
Comment thread Dismissed


class DynamicModelError(Exception):
"""Base exception for dynamic model generation."""


class TypeResolutionError(DynamicModelError):
"""Failed to resolve a type to a Python type object."""

def __init__(self, type_info: DataType, model_name: str, field_name: str) -> None:
"""Initialize with type info and context."""
self.type_info = type_info
self.model_name = model_name
self.field_name = field_name
super().__init__(f"Cannot resolve type for field '{field_name}' in model '{model_name}': {type_info}")
Loading
Loading