Skip to content

Commit e546379

Browse files
committed
started implementing dictlike xml wrapper
1 parent fa902cf commit e546379

5 files changed

Lines changed: 179 additions & 39 deletions

File tree

src/somesy/pom_xml/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,11 @@
11
"""Somesy implementation for Java Maven pom.xml."""
2+
3+
# POM-related constants shared across the package (the writer imports
# POM_ROOT_ATRS and POM_URL from here).

# XML namespace URI of the Maven POM 4.0.0 schema.
POM_URL = "http://maven.apache.org/POM/4.0.0"
# The namespace URI wrapped in curly braces, i.e. the prefix ElementTree puts
# in front of namespace-qualified tag names.
POM_PREF = f"{{{POM_URL}}}"
# Prefix -> namespace URI mapping.
POM_NS_MAP = {"pom": POM_URL}
# Attributes placed on the root <project> element of a freshly created pom.xml.
POM_ROOT_ATRS = {
    "xmlns": POM_URL,
    "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
    "xsi:schemaLocation": f"{POM_URL} http://maven.apache.org/xsd/maven-4.0.0.xsd",
}

src/somesy/pom_xml/writer.py

Lines changed: 7 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,46 +2,16 @@
22
import logging
33
import xml.etree.ElementTree as ET
44
from pathlib import Path
5-
from typing import Dict, Optional
6-
7-
import defusedxml.ElementTree as DET
5+
from typing import Optional
86

97
from somesy.core.models import Person
108
from somesy.core.writer import FieldKeyMapping, ProjectMetadataWriter
119

10+
from . import POM_ROOT_ATRS, POM_URL
1211
from .xmlproxy import XMLProxy
1312

14-
logger = logging.getLogger("somesy")
15-
16-
# some POM-related constants and reusable objects
17-
POM_URL = "http://maven.apache.org/POM/4.0.0"
18-
POM_PREF = "{" + POM_URL + "}"
19-
POM_NS_MAP = dict(pom=POM_URL)
20-
POM_ROOT_ATRS: Dict[str, str] = {
21-
"xmlns": "http://maven.apache.org/POM/4.0.0",
22-
"xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
23-
"xsi:schemaLocation": "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd",
24-
}
25-
POM_PARSER = DET.XMLParser(target=ET.TreeBuilder(insert_comments=True))
26-
2713
ET.register_namespace("pom", POM_URL) # globally register xml namespace for POM
28-
29-
# helper methods
30-
31-
32-
def new_pom() -> ET.ElementTree:
33-
"""Create a minimal pom.xml file."""
34-
return ET.ElementTree(ET.Element("project", POM_ROOT_ATRS))
35-
36-
37-
def parse_pom(path: Path) -> ET.ElementTree:
38-
"""Parse a pom.xml file into an ElementTree, preserving comments."""
39-
return DET.parse(path, parser=POM_PARSER)
40-
41-
42-
def write_pom(tree: ET.ElementTree, path: Path):
43-
"""Write the POM DOM to a file."""
44-
tree.write(path, encoding="UTF-8", xml_declaration=True, default_namespace=POM_URL)
14+
logger = logging.getLogger("somesy")
4515

4616

4717
class POM(ProjectMetadataWriter):
@@ -74,11 +44,12 @@ def __init__(
7444

7545
def _init_new_file(self):
    """Initialize new pom.xml file.

    Writes a document consisting only of an empty ``<project>`` root element
    carrying the attributes from ``POM_ROOT_ATRS`` to ``self.path``.
    """
    # The root tag and the attributes in POM_ROOT_ATRS are plain (unqualified)
    # names, and POM_ROOT_ATRS already contains the literal ``xmlns``
    # declarations. ElementTree raises ValueError when asked to serialize
    # non-qualified names together with ``default_namespace``, so the
    # namespace must NOT be passed to ``write`` here.
    pom = XMLProxy(ET.Element("project", POM_ROOT_ATRS))
    pom.write(self.path)
7849

7950
def _load(self):
    """Load the POM file.

    Parses the file at ``self.path`` and stores the resulting ``XMLProxy``
    in ``self._data``. The POM namespace is passed as default namespace, so
    unqualified keys used on the proxy are resolved against it.
    """
    proxy = XMLProxy.parse(self.path, default_namespace=POM_URL)
    self._data = proxy
8253

8354
def _validate(self):
8455
"""Validate the POM file."""
@@ -87,7 +58,7 @@ def _validate(self):
8758
def save(self, path: Optional[Path] = None) -> None:
    """Save the POM DOM to a file.

    Args:
        path: Target file; when omitted (or falsy), ``self.path`` is used.
    """
    self._data.write(path or self.path)
9162

9263
@staticmethod
9364
def _from_person(person: Person):

src/somesy/pom_xml/xmlproxy.py

Lines changed: 125 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,129 @@
11
"""Wrapper to provide dict-like access to XML via ElementTree."""
2+
from __future__ import annotations
23

3-
from typing import Mapping
4+
import xml.etree.ElementTree as ET
5+
from pathlib import Path
6+
from typing import Optional, Union
47

8+
import defusedxml.ElementTree as DET
59

6-
class XMLProxy(Mapping):
7-
"""Class providing dict-like access to edit XML via ElementTree."""
10+
11+
class XMLProxy:
    """Class providing dict-like access to edit XML via ElementTree.

    Note that this wrapper facade is limited:
    * XML attributes are not supported
    * DTDs are ignored (arbitrary keys can be queried and added)
    * each tag is assumed to either contain a value or more nested tags
    * lists are treated atomically (no way to add/remove element from a collection)

    The semantics is implemented as follows:
    * If there are multiple tags with the same name, a list of XMLProxy nodes is returned
    * If a unique tag has no nested tags, its stripped `text` string value is returned
      (an empty string if the tag has no text at all)
    * Otherwise the node is returned
    """

    # NOTE: one could create a separate XMLList wrapper to cover the list case better
    # but need to think through the semantics properly.

    def __init__(self, el: ET.Element, *, default_namespace: Optional[str] = None):
        """Wrap an existing XML ElementTree Element.

        Args:
            el: The element to wrap (it is referenced, not copied).
            default_namespace: Namespace URI that is prepended to unqualified
                keys on lookup/creation and used as default namespace on write.
        """
        self._node: ET.Element = el
        self._def_ns = default_namespace

    def _qualified_key(self, key: str) -> str:
        """If passed key is not qualified, prepends the default namespace (if set)."""
        # startswith() instead of key[0], so an empty key cannot raise IndexError
        if not self._def_ns or key.startswith("{"):
            return key
        return "{" + self._def_ns + "}" + key

    def _wrap(self, el: ET.Element) -> "XMLProxy":
        """Wrap a child element, handing down the default namespace of this proxy."""
        return XMLProxy(el, default_namespace=self._def_ns)

    @classmethod
    def parse(cls, path: Union[str, Path], **kwargs) -> "XMLProxy":
        """Parse an XML file into an ElementTree, preserving comments.

        Args:
            path: Location of the XML file.
            **kwargs: Passed on to the constructor (e.g. `default_namespace`).

        Returns:
            A proxy wrapping the root element of the parsed document.
        """
        # a fresh parser per call; comments are kept as nodes in the tree
        parser = DET.XMLParser(target=ET.TreeBuilder(insert_comments=True))
        root = DET.parse(Path(path), parser=parser).getroot()
        return cls(root, **kwargs)

    def write(self, path: Union[str, Path], *, header: bool = True, **kwargs):
        """Write the XML DOM to an UTF-8 encoded file.

        Args:
            path: Target file.
            header: Whether to emit the XML declaration.
            **kwargs: Passed on to `ElementTree.write`. If this proxy has a
                default namespace and `default_namespace` is not given
                explicitly, the one of the proxy is used.
        """
        if self._def_ns:
            kwargs.setdefault("default_namespace", self._def_ns)
        tree = ET.ElementTree(self._node)
        tree.write(Path(path), encoding="UTF-8", xml_declaration=header, **kwargs)

    def __repr__(self):
        """See `object.__repr__`."""
        return str(self._node)

    def __len__(self):
        """See `Mapping.__len__`. Returns the number of direct child nodes."""
        return len(self._node)

    def __iter__(self):
        """See `Mapping.__iter__`. Iterates over the wrapped direct child nodes."""
        # children inherit the default namespace, so that nested lookups with
        # unqualified keys keep working on the yielded proxies
        return (self._wrap(child) for child in self._node)

    # ---- dict-like access ----

    def get(self, key: str, *, as_node_list: bool = False):
        """See `dict.get`.

        Args:
            key: Tag name; if unqualified, the default namespace (if any) is used.
            as_node_list: If set, always return the list of matching nodes.

        Returns:
            `None` if nothing matches, a list of nodes if multiple tags match
            (or `as_node_list` is set), the node if the unique match has
            children, otherwise the stripped text of the unique match.

        Raises:
            ValueError: If the key is an empty string.
        """
        if not key:
            raise ValueError("Key must not be an empty string!")
        # if not fully qualified + default NS is given, use it for query
        matches = self._node.findall(self._qualified_key(key))
        if not matches:
            return None

        nodes = [self._wrap(el) for el in matches]
        if as_node_list or len(nodes) > 1:
            return nodes  # as a list in any case if desired / multiple matches

        node = nodes[0]
        if len(node) > 0:  # single node with inner tags
            return node
        # leaf element: `text` is None for tags like <a/>, treat that as ""
        return (node._node.text or "").strip()

    def __getitem__(self, key: str):
        """Acts like `dict.__getitem__`, implemented with `get`.

        Raises:
            KeyError: If no tag matches the key.
        """
        val = self.get(key)
        if val is None:
            raise KeyError(key)
        return val

    def __contains__(self, key: str) -> bool:
        """Acts like `dict.__contains__`, implemented with `get`."""
        return self.get(key) is not None

    def __delitem__(self, key: str):
        """Acts like `dict.__delitem__`.

        If there are multiple matching tags, **all** of them are removed!

        Raises:
            KeyError: If no tag matches the key.
        """
        nodes = self.get(key, as_node_list=True)
        if not nodes:
            raise KeyError(key)
        for child in nodes:
            self._node.remove(child._node)

    def __setitem__(self, key: str, val):
        """See `dict.__setitem__`.

        Leaf values are converted to `str` and stored as the text of the tag.
        A missing tag is created, a unique tag is emptied and reused, multiple
        matching tags are all removed and replaced by a single new one.

        Raises:
            ValueError: If the key is an empty string.
            NotImplementedError: If the value is a nested structure
                (`XMLProxy`, `list` or `dict`).
        """
        nodes = self.get(key, as_node_list=True)

        # reject unsupported values BEFORE touching the tree, so that a failed
        # assignment does not destroy the existing content
        if isinstance(val, (XMLProxy, list, dict)):  # nested dict-like structure
            raise NotImplementedError
        text = val if isinstance(val, str) else str(val)

        if nodes and len(nodes) == 1:
            # take unique node and empty it out (inner tags; text is set below)
            node = nodes[0]._node
            for child in list(node):  # copy: removing while iterating skips nodes
                node.remove(child)
        else:
            if nodes:
                # delete all (we can't handle lists well) + create new
                del self[key]
            # also covers the case that the key did not exist at all yet
            node = ET.SubElement(self._node, self._qualified_key(key))

        # attach value to the tag
        node.text = text

tests/data/blank_pom.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
</project>
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from somesy.pom_xml import POM_URL
2+
from somesy.pom_xml.xmlproxy import XMLProxy
3+
4+
EMPTY_POM = """<?xml version='1.0' encoding='UTF-8'?>
5+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
6+
<!-- contents of POM file -->
7+
</project>"""
8+
9+
10+
def test_parse_write(tmp_path):
    """Make sure that header + namespace of XML are set correctly and comments are preserved."""
    src_file = tmp_path / "blank_pom.xml"
    out_file = tmp_path / "written.xml"

    # create the blank test pom.xml file from the in-memory template
    src_file.write_text(EMPTY_POM)

    proxy = XMLProxy.parse(src_file, default_namespace=POM_URL)

    # the root must be the namespace-qualified <project> element
    expected_root = "<Element '{http://maven.apache.org/POM/4.0.0}project' at"
    assert repr(proxy).startswith(expected_root)

    # the only child of the root is the comment element
    assert len(proxy) == 1
    children = list(proxy)
    assert len(children) == 1
    assert repr(children[0]).startswith("<Element <function Comment")

    # check that namespacing and comments are preserved on write
    proxy.write(out_file)
    assert out_file.read_text() == EMPTY_POM

0 commit comments

Comments
 (0)