Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/datamodel_code_generator/imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,13 @@ def remove_unused(self, used_names: set[str]) -> None:
for import_ in imports_
if not {self.get_effective_name(from_, import_), import_}.intersection(used_names)
]
# Build reverse lookup dict for O(1) access instead of O(n) linear scan per import
reverse_lookup: dict[tuple[str | None, str], str | None] = {
(imp.from_, imp.import_): path for path, imp in self.reference_paths.items()
}
for from_, import_ in unused:
alias = self.alias.get(from_, {}).get(import_)
reference_path = next(
(p for p, i in self.reference_paths.items() if i.from_ == from_ and i.import_ == import_),
None,
)
reference_path = reverse_lookup.get((from_, import_))
import_obj = Import(from_=from_, import_=import_, alias=alias, reference_path=reference_path)
while self.counter.get((from_, import_), 0) > 0:
self.remove(import_obj)
Expand Down
22 changes: 12 additions & 10 deletions src/datamodel_code_generator/parser/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ def sort_data_models( # noqa: PLR0912, PLR0915
sorted_data_models[model.path] = model
add_model_path_to_list(require_update_action_models, model)
elif (
not model.reference_classes - {model.path} - set(sorted_data_models)
not model.reference_classes - {model.path} - sorted_data_models.keys()
): # reference classes have been resolved
sorted_data_models[model.path] = model
if model.path in model.reference_classes:
Expand All @@ -419,20 +419,21 @@ def sort_data_models( # noqa: PLR0912, PLR0915
# sort on base_class dependency
while True:
ordered_models: list[tuple[int, DataModel]] = []
unresolved_reference_model_names = [m.path for m in unresolved_references]
# Build lookup dict for O(1) index access instead of O(n) list.index()
path_to_index = {m.path: idx for idx, m in enumerate(unresolved_references)}
for model in unresolved_references:
if isinstance(model, pydantic_model_v2.RootModel):
indexes = [
unresolved_reference_model_names.index(ref_path)
path_to_index[ref_path]
for f in model.fields
for t in f.data_type.all_data_types
if t.reference and (ref_path := t.reference.path) in unresolved_reference_model_names
if t.reference and (ref_path := t.reference.path) in path_to_index
]
else:
indexes = [
unresolved_reference_model_names.index(b.reference.path)
path_to_index[b.reference.path]
for b in model.base_classes
if b.reference and b.reference.path in unresolved_reference_model_names
if b.reference and b.reference.path in path_to_index
]
if indexes:
ordered_models.append((
Expand All @@ -450,9 +451,9 @@ def sort_data_models( # noqa: PLR0912, PLR0915
unresolved_references = sorted_unresolved_models

# circular reference
unsorted_data_model_names = set(unresolved_reference_model_names)
unsorted_data_model_names = set(path_to_index.keys())
for model in unresolved_references:
unresolved_model = model.reference_classes - {model.path} - set(sorted_data_models)
unresolved_model = model.reference_classes - {model.path} - sorted_data_models.keys()
base_models = [getattr(s.reference, "path", None) for s in model.base_classes]
update_action_parent = set(require_update_action_models).intersection(base_models)
if not unresolved_model:
Expand Down Expand Up @@ -1082,8 +1083,9 @@ def _replace_model_in_list(
replacement: DataModel,
) -> None:
"""Replace model at its position in list."""
models.insert(models.index(original), replacement)
models.remove(original)
# Replace in place: one O(n) index() scan plus an O(1) slot assignment, instead of two separate O(n) list mutations (insert + remove)
idx = models.index(original)
models[idx] = replacement

def __delete_duplicate_models(self, models: list[DataModel]) -> None:
model_class_names: dict[str, DataModel] = {}
Expand Down
52 changes: 39 additions & 13 deletions src/datamodel_code_generator/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,20 @@ def __init__( # noqa: PLR0913, PLR0917
# Only use suffixes when explicitly provided via --duplicate-name-suffix
self.duplicate_name_suffix_map: dict[str, str] = duplicate_name_suffix_map or {}

# Cache for reference names to avoid O(n) set creation on every _get_unique_name call
self._reference_names_cache: set[str] | None = None

def _get_reference_names(self) -> set[str]:
    """Return the set of every reference's name, computing it lazily.

    The result is memoized in ``self._reference_names_cache``; the cache
    must be invalidated whenever references are added, renamed, or removed.
    """
    cached = self._reference_names_cache
    if cached is not None:
        return cached  # pragma: no cover
    names = {reference.name for reference in self.references.values()}
    self._reference_names_cache = names
    return names

def _invalidate_reference_names_cache(self) -> None:
    """Invalidate the reference names cache when references change."""
    # Dropping the memoized set forces _get_reference_names to rebuild it
    # from self.references on the next call.
    self._reference_names_cache = None

@property
def current_base_path(self) -> Path | None:
"""Return the current base path for file resolution."""
Expand Down Expand Up @@ -788,17 +802,27 @@ def add_ref(self, ref: str, resolved: bool = False) -> Reference: # noqa: FBT00
)

self.references[path] = reference
self._invalidate_reference_names_cache()
return reference

def _find_parent_reference(self, path: Sequence[str]) -> Reference | None:
    """Find the closest parent reference for a given path.

    Walks from the immediate parent toward the root, returning the first
    ancestor that has a registered reference. Returns None when no
    ancestor is registered.
    """
    ancestors = list(path[:-1])
    # Try the longest ancestor prefix first, then progressively shorter ones.
    for length in range(len(ancestors), 0, -1):
        if candidate := self.references.get(self.join_path(ancestors[:length])):
            return candidate
    return None

def _check_parent_scope_option(self, name: str, path: Sequence[str]) -> str:
# Check for parent-prefixed naming via either the legacy flag or the new naming strategy
use_parent_prefix = self.parent_scoped_naming or self.naming_strategy == NamingStrategy.ParentPrefixed
if use_parent_prefix:
parent_path = path[:-1]
while parent_path:
if parent_reference := self.references.get(self.join_path(parent_path)):
return f"{parent_reference.name}_{name}"
parent_path = parent_path[:-1]
if use_parent_prefix and (parent_ref := self._find_parent_reference(path)):
return f"{parent_ref.name}_{name}"
return name

def _apply_full_path_naming(self, name: str, path: Sequence[str]) -> str:
Expand All @@ -811,12 +835,9 @@ def _apply_full_path_naming(self, name: str, path: Sequence[str]) -> str:
return name

# Find the immediate parent reference to prefix the name
parent_path = path[:-1]
while parent_path:
if parent_reference := self.references.get(self.join_path(parent_path)):
# Use immediate parent's name (CamelCase join without underscore)
return f"{parent_reference.name}{snake_to_upper_camel(name)}"
parent_path = parent_path[:-1]
if parent_ref := self._find_parent_reference(path):
# Use immediate parent's name (CamelCase join without underscore)
return f"{parent_ref.name}{snake_to_upper_camel(name)}"

return name

Expand Down Expand Up @@ -856,6 +877,7 @@ def _rename_external_ref_with_same_name(self, name: str, current_path: str) -> N
new_name = self._get_unique_name(name, camel=True)
ref.duplicate_name = ref.name
ref.name = new_name
self._invalidate_reference_names_cache()
break

def add( # noqa: PLR0913
Expand Down Expand Up @@ -921,6 +943,7 @@ def add( # noqa: PLR0913
reference.name = name
reference.loaded = loaded
reference.duplicate_name = duplicate_name
self._invalidate_reference_names_cache()
else:
reference = Reference(
path=joined_path,
Expand All @@ -930,6 +953,7 @@ def add( # noqa: PLR0913
duplicate_name=duplicate_name,
)
self.references[joined_path] = reference
self._invalidate_reference_names_cache()
return reference

def get(self, path: Sequence[str] | str) -> Reference | None:
Expand All @@ -941,6 +965,7 @@ def delete(self, path: Sequence[str] | str) -> None:
resolved = self.resolve_ref(path)
if resolved in self.references:
del self.references[resolved]
self._invalidate_reference_names_cache()

def default_class_name_generator(self, name: str) -> str:
"""Generate a valid class name from a string."""
Expand Down Expand Up @@ -989,7 +1014,8 @@ def get_class_name(
def _get_unique_name(self, name: str, camel: bool = False, model_type: str = "model") -> str: # noqa: FBT001, FBT002
unique_name: str = name
count: int = 0 if self.remove_suffix_number else 1
reference_names = {r.name for r in self.references.values()} | self.exclude_names
# Use cached reference names for O(1) lookup instead of O(n) set creation
reference_names = self._get_reference_names() | self.exclude_names

# Determine the suffix to use
suffix = self._get_suffix_for_model_type(model_type)
Expand Down
20 changes: 15 additions & 5 deletions src/datamodel_code_generator/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,15 +515,25 @@ def imports(self) -> Iterator[Import]:

def __init__(self, **values: Any) -> None:
"""Initialize DataType with validation and reference setup."""
if not TYPE_CHECKING:
if not TYPE_CHECKING: # pragma: no cover
super().__init__(**values)

# Single-pass optimization: detect ANY+optional and non-ANY types together
# This is a rare edge case optimization - pragma: no cover
any_optional_found = False
has_non_any = False
for type_ in self.data_types:
if type_.type == ANY and type_.is_optional:
if any(t for t in self.data_types if t.type != ANY): # pragma: no cover
self.is_optional = True
self.data_types = [t for t in self.data_types if not (t.type == ANY and t.is_optional)]
break # pragma: no cover
any_optional_found = True # pragma: no cover
elif type_.type != ANY:
has_non_any = True
# Early exit if both conditions met
if any_optional_found and has_non_any: # pragma: no cover
break

if any_optional_found and has_non_any: # pragma: no cover
self.is_optional = True
self.data_types = [t for t in self.data_types if not (t.type == ANY and t.is_optional)]

for data_type in self.data_types:
if data_type.reference or data_type.data_types:
Expand Down
Loading