From df52170727ba45a4f85b3bd0ef6e48297021fe5c Mon Sep 17 00:00:00 2001 From: Koudai Aono Date: Mon, 29 Dec 2025 04:35:53 +0000 Subject: [PATCH 1/2] Fix DataType deepcopy infinite recursion with circular references --- src/datamodel_code_generator/types.py | 48 ++++++++++ tests/test_types.py | 121 ++++++++++++++++++++++++++ 2 files changed, 169 insertions(+) diff --git a/src/datamodel_code_generator/types.py b/src/datamodel_code_generator/types.py index 99d520bdd..948dfc1dd 100644 --- a/src/datamodel_code_generator/types.py +++ b/src/datamodel_code_generator/types.py @@ -354,6 +354,54 @@ class Config: _exclude_fields: ClassVar[set[str]] = {"parent", "children"} _pass_fields: ClassVar[set[str]] = {"parent", "children", "data_types", "reference"} + def __deepcopy__(self, memo: dict[int, Any] | None = None) -> DataType: + """Handle circular references during deepcopy. + + The parent and children fields can create circular references that cause + infinite recursion during deepcopy. This method excludes them from deep + copying while properly copying all other fields. + """ + from copy import deepcopy # noqa: PLC0415 + + from datamodel_code_generator.util import is_pydantic_v2 # noqa: PLC0415 + + if memo is None: + memo = {} + + # Check if we've already copied this object + obj_id = id(self) + if obj_id in memo: + return memo[obj_id] + + # Access model_fields from the class (v2) or __fields__ (v1) + cls = self.__class__ + model_fields = cls.model_fields if is_pydantic_v2() else cls.__fields__ # type: ignore[attr-defined] + + # First pass: collect shallow values and excluded fields + shallow_kwargs: dict[str, Any] = {} + for field_name in model_fields: + value = getattr(self, field_name) + if field_name in self._exclude_fields: + shallow_kwargs[field_name] = None + else: + shallow_kwargs[field_name] = value + + # Create the new instance and add to memo BEFORE deepcopying nested objects + # This prevents infinite recursion when data_types reference back to this object + new_obj = ( + cls.model_construct(**shallow_kwargs) if is_pydantic_v2() else cls.construct(**shallow_kwargs) # type: ignore[attr-defined] + ) + memo[obj_id] = new_obj + + # Second pass: deepcopy non-excluded fields and update the object + for field_name in model_fields: + if field_name not in self._exclude_fields: + value = getattr(self, field_name) + copied_value = deepcopy(value, memo) + object.__setattr__(new_obj, field_name, copied_value) + + return new_obj + @classmethod def from_import( # noqa: PLR0913 cls: builtins.type[DataTypeT], diff --git a/tests/test_types.py b/tests/test_types.py index 6d32e31f1..065db5cfa 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -152,3 +152,124 @@ def test_remove_none_from_union(type_str: str, use_union_operator: bool, expecte def test_remove_none_from_union_short_strings(type_str: str, use_union_operator: bool, expected: str) -> None: """Test _remove_none_from_union with short strings to verify index bounds safety.""" assert _remove_none_from_union(type_str, use_union_operator=use_union_operator) == expected + + +def test_datatype_deepcopy_with_circular_references() -> None: + """Test that DataType.__deepcopy__ handles circular references via parent/children. + + This test verifies the fix for the recursion error that occurred when deepcopying + DataType objects with circular references through parent and children fields. + """ + from copy import deepcopy + + # Import DataModelFieldBase first to trigger model_rebuild + from datamodel_code_generator.model.base import DataModelFieldBase # noqa: F401 + from datamodel_code_generator.types import DataType + + # Create parent and child DataTypes with circular references + parent = DataType(type="ParentType") + child1 = DataType(type="ChildType1", parent=parent) + child2 = DataType(type="ChildType2", parent=parent) + parent.children = [child1, child2] + + # This should not cause infinite recursion + copied_parent = deepcopy(parent) + + # Verify the copy was successful + assert copied_parent.type == "ParentType" + # parent and children should be None in the copy (excluded from deepcopy) + assert copied_parent.parent is None + assert copied_parent.children is None + + +def test_datatype_deepcopy_with_nested_data_types() -> None: + """Test that DataType.__deepcopy__ properly copies nested data_types.""" + from copy import deepcopy + + # Import DataModelFieldBase first to trigger model_rebuild + from datamodel_code_generator.model.base import DataModelFieldBase # noqa: F401 + from datamodel_code_generator.types import DataType + + # Create nested DataTypes + inner = DataType(type="InnerType", is_optional=True) + outer = DataType(type="OuterType", data_types=[inner], is_list=True) + + # Deepcopy should work and create independent copies + copied_outer = deepcopy(outer) + + # Verify the structure is preserved + assert copied_outer.type == "OuterType" + assert copied_outer.is_list is True + assert len(copied_outer.data_types) == 1 + assert copied_outer.data_types[0].type == "InnerType" + assert copied_outer.data_types[0].is_optional is True + + # Verify it's a deep copy (modifying original doesn't affect copy) + inner.type = "ModifiedInnerType" + assert copied_outer.data_types[0].type == "InnerType" + + +def test_datatype_deepcopy_memo_prevents_duplicate_copies() -> None: + """Test that the memo dictionary prevents duplicate copies of the same object.""" + from copy import deepcopy + + # Import DataModelFieldBase first to trigger model_rebuild + from datamodel_code_generator.model.base import DataModelFieldBase # noqa: F401 + from datamodel_code_generator.types import DataType + + # Create a shared DataType referenced by multiple parents + shared = DataType(type="SharedType") + container1 = DataType(type="Container1", data_types=[shared]) + container2 = DataType(type="Container2", data_types=[shared]) + root = DataType(type="Root", data_types=[container1, container2]) + + # Deepcopy should handle the shared reference + copied_root = deepcopy(root) + + # Verify structure is correct + assert copied_root.type == "Root" + assert len(copied_root.data_types) == 2 + assert copied_root.data_types[0].type == "Container1" + assert copied_root.data_types[1].type == "Container2" + + # Both containers should have copies of the shared type + assert copied_root.data_types[0].data_types[0].type == "SharedType" + assert copied_root.data_types[1].data_types[0].type == "SharedType" + + # Verify that the same object is returned from memo (memoization behavior) + assert copied_root.data_types[0].data_types[0] is copied_root.data_types[1].data_types[0] + + +def test_datatype_deepcopy_with_none_memo() -> None: + """Test __deepcopy__ when called with memo=None (covers memo initialization).""" + # Import DataModelFieldBase first to trigger model_rebuild + from datamodel_code_generator.model.base import DataModelFieldBase # noqa: F401 + from datamodel_code_generator.types import DataType + + data_type = DataType(type="TestType", is_optional=True) + + # Call __deepcopy__ directly with None memo to cover the `if memo is None` branch + copied = data_type.__deepcopy__(None) # noqa: PLC2801 + + assert copied.type == "TestType" + assert copied.is_optional is True + assert copied is not data_type + + +def test_datatype_deepcopy_memo_cache_hit() -> None: + """Test that memo cache returns the same object for repeated references.""" + # Import DataModelFieldBase first to trigger model_rebuild + from datamodel_code_generator.model.base import DataModelFieldBase # noqa: F401 + from datamodel_code_generator.types import DataType + + data_type = DataType(type="TestType") + memo: dict[int, DataType] = {} + + # First call - should create new object and store in memo + copied1 = data_type.__deepcopy__(memo) # noqa: PLC2801 + assert copied1 is not data_type + assert id(data_type) in memo + + # Second call with same memo - should return cached object (covers memo hit branch) + copied2 = data_type.__deepcopy__(memo) # noqa: PLC2801 + assert copied2 is copied1 # Same object from memo From d039eb7eb2111b98377d5c76cf25e30ee2ab145a Mon Sep 17 00:00:00 2001 From: Koudai Aono Date: Mon, 29 Dec 2025 09:22:30 +0000 Subject: [PATCH 2/2] Remove inline comments from __deepcopy__ method --- src/datamodel_code_generator/types.py | 25 +++++-------------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/src/datamodel_code_generator/types.py b/src/datamodel_code_generator/types.py index 948dfc1dd..4bf0d9314 100644 --- a/src/datamodel_code_generator/types.py +++ b/src/datamodel_code_generator/types.py @@ -9,6 +9,7 @@ import re from abc import ABC, abstractmethod +from copy import deepcopy from enum import Enum, auto from functools import lru_cache from itertools import chain @@ -355,29 +356,17 @@ class Config: _pass_fields: ClassVar[set[str]] = {"parent", "children", "data_types", "reference"} def __deepcopy__(self, memo: dict[int, Any] | None = None) -> DataType: - """Handle circular references during deepcopy. - - The parent and children fields can create circular references that cause - infinite recursion during deepcopy. This method excludes them from deep - copying while properly copying all other fields. - """ - from copy import deepcopy # noqa: PLC0415 - - from datamodel_code_generator.util import is_pydantic_v2 # noqa: PLC0415 - + """Create a deep copy handling circular references in parent/children fields.""" if memo is None: memo = {} - # Check if we've already copied this object obj_id = id(self) if obj_id in memo: return memo[obj_id] - # Access model_fields from the class (v2) or __fields__ (v1) cls = self.__class__ - model_fields = cls.model_fields if is_pydantic_v2() else cls.__fields__ # type: ignore[attr-defined] + model_fields = getattr(cls, "model_fields" if is_pydantic_v2() else "__fields__") - # First pass: collect shallow values and excluded fields shallow_kwargs: dict[str, Any] = {} for field_name in model_fields: value = getattr(self, field_name) @@ -386,14 +375,10 @@ def __deepcopy__(self, memo: dict[int, Any] | None = None) -> DataType: else: shallow_kwargs[field_name] = value - # Create the new instance and add to memo BEFORE deepcopying nested objects - # This prevents infinite recursion when data_types reference back to this object - new_obj = ( - cls.model_construct(**shallow_kwargs) if is_pydantic_v2() else cls.construct(**shallow_kwargs) # type: ignore[attr-defined] - ) + constructor = getattr(cls, "model_construct" if is_pydantic_v2() else "construct") + new_obj: DataType = constructor(**shallow_kwargs) memo[obj_id] = new_obj - # Second pass: deepcopy non-excluded fields and update the object for field_name in model_fields: if field_name not in self._exclude_fields: value = getattr(self, field_name)