Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions docs/rfc/0004-dataframe-and-blob.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,21 @@

| Feld | Wert |
|-------------|--------------------------------------------------------------|
| Status | Proposed |
| Status | Accepted — Option B (Integritäts-Mixin) implementiert; Option C offen |
| Datum | 2026-06-29 |
| Autor | lepy <lepy@tuta.io> |
| Komponente | `sdata/sclass/dataframe.py`, `sdata/sclass/blob.py` |
| Komponente | `sdata/sclass/content.py`, `dataframe.py`, `blob.py` |
| Betrifft | ob/wie `DataFrame` auf `Blob` aufbaut (Folge von RFC 0003) |
| Validierung | Analyse/Designvorschlag — noch nicht implementiert |
| Validierung | Option B umgesetzt; content/blob/dataframe je 100 % |

> **Umsetzungsstand.** **Option B** umgesetzt: `ContentIntegrityMixin`
> (`sdata/sclass/content.py`) liefert `sha1`/`md5`/`sha256`/`size` sowie
> `update_checksum`/`verify` über einen `content_bytes`-Hook + `self.metadata`.
> `Blob` und `DataFrame` erben den Mixin (keine Vererbung untereinander — RFC-Befund).
> `DataFrame.content_bytes` = **reines Daten-Parquet** (`self.df.to_parquet()`, *ohne*
> eingebettete Metadaten): der Hash erfasst die Daten, sodass das Speichern der
> Prüfsumme in den Metadaten den Hash nicht verändert (Selbstreferenz vermieden).
> **Offen:** **Option C** (`DataFrame.as_blob(fmt)`) als additive Komposition.

## 1. Zusammenfassung

Expand Down
97 changes: 2 additions & 95 deletions sdata/sclass/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import typing
from typing import Any, List, Dict, Optional, Literal, Optional, Type
from sdata.base import Base
from sdata.sclass.content import ContentIntegrityMixin
logger = logging.getLogger(__name__)

# Import fsspec for handling various URI schemes (local file, S3, Zip, etc.)
Expand All @@ -17,7 +18,7 @@
logger.warning('fsspec not installed')
fsspec = None

class Blob(Base):
class Blob(ContentIntegrityMixin, Base):
"""
A derived class from Base that represents a generic binary large object (Blob).
Stores the content in self.data['content'] as a dictionary with:
Expand Down Expand Up @@ -289,100 +290,6 @@ def exists(self) -> bool:
else:
return False

@property
def sha1(self) -> Optional[str]:
    """
    SHA-1 hex digest of the blob content.

    The digest is computed on access by handing a fresh ``hashlib.sha1``
    object to ``self._update_hash``.

    :return: the hex digest, or ``None`` if any step raised (the error is logged).
    """
    try:
        hasher = hashlib.sha1()
        self._update_hash(hasher)
        hex_value = hasher.hexdigest()
    except Exception as e:
        logger.error(f"Failed to compute SHA1: {str(e)}")
        hex_value = None
    return hex_value

@property
def md5(self) -> Optional[str]:
    """
    MD5 hex digest of the blob content.

    The digest is computed on access by handing a fresh ``hashlib.md5``
    object to ``self._update_hash``.

    :return: the hex digest, or ``None`` if any step raised (the error is logged).
    """
    try:
        hasher = hashlib.md5()
        self._update_hash(hasher)
        hex_value = hasher.hexdigest()
    except Exception as e:
        logger.error(f"Failed to compute MD5: {str(e)}")
        hex_value = None
    return hex_value

@property
def sha256(self) -> Optional[str]:
    """
    SHA-256 hex digest of the blob content.

    The digest is computed on access by handing a fresh ``hashlib.sha256``
    object to ``self._update_hash``.

    :return: the hex digest, or ``None`` if any step raised (the error is logged).
    """
    try:
        hasher = hashlib.sha256()
        self._update_hash(hasher)
        hex_value = hasher.hexdigest()
    except Exception as e:
        logger.error(f"Failed to compute SHA256: {str(e)}")
        hex_value = None
    return hex_value

def _update_hash(self, hash_obj: Any, buffer_size: int = 65536) -> None:
    """
    Feed ``self.content_bytes`` into ``hash_obj`` chunk by chunk.

    :param hash_obj: object exposing ``update(bytes)`` (e.g. a ``hashlib`` hash).
    :param buffer_size: number of bytes handed to each ``update`` call.
    """
    # Reading the property is what pulls the content in (the original marks it "lazy load").
    stream = io.BytesIO(self.content_bytes)
    # A two-argument iter() stops as soon as read() returns the empty-bytes sentinel.
    for chunk in iter(lambda: stream.read(buffer_size), b""):
        hash_obj.update(chunk)

@property
def size(self) -> Optional[int]:
    """
    Number of bytes in ``self.content_bytes``.

    :return: the byte count, or ``None`` if reading or measuring the content
        raised (the error is logged).
    """
    try:
        payload = self.content_bytes
        byte_count = len(payload)
    except Exception as e:
        logger.error(f"Failed to determine size: {str(e)}")
        byte_count = None
    return byte_count

def update_checksum(self) -> Optional[str]:
    """
    Store the current SHA-256 of the content under the ``checksum`` metadata
    attribute (ontology ``schema:sha256``).

    :return: the value that was stored, i.e. ``self.sha256`` (which may be ``None``).
    """
    sha256_hex = self.sha256
    # The value is written unconditionally, so a ``None`` digest is stored as well.
    self.metadata.set_attr("checksum", sha256_hex)
    return sha256_hex

def verify(self) -> bool:
    """
    Compare the stored ``checksum`` metadata value with the current SHA-256.

    :return: ``True`` only when a non-empty checksum is stored and equals
        ``self.sha256``; ``False`` on mismatch or when nothing is stored
        (the latter case also logs a warning).
    """
    entry = self.metadata.get("checksum")
    expected = None if entry is None else entry.value
    if expected:
        return expected == self.sha256
    logger.warning("Blob.verify: no checksum stored (call update_checksum first)")
    return False

def write(self, uri: str, **kwargs: Any) -> str:
"""Write the content to a destination ``uri`` via fsspec (local/S3/zip/…).

Expand Down
87 changes: 87 additions & 0 deletions sdata/sclass/content.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# -*- coding: utf-8 -*-
"""Wiederverwendbarer Integritäts-/Hash-Layer (RFC 0004, Option B).

:class:`ContentIntegrityMixin` stellt Prüfsummen (``sha1``/``md5``/``sha256``),
``size`` sowie ``update_checksum``/``verify`` über einen ``content_bytes``-Hook und
das ``metadata``-Objekt (aus :class:`~sdata.base.Base`) bereit. Genutzt von
:class:`~sdata.sclass.blob.Blob` und :class:`~sdata.sclass.dataframe.DataFrame`.
"""
import io
import hashlib
import logging
from typing import Any, Optional

logger = logging.getLogger(__name__)


class ContentIntegrityMixin:
    """Checksum, ``size`` and ``verify`` helpers built on ``self.content_bytes`` and ``self.metadata``.

    The host class must provide:

    * ``content_bytes`` -- the payload to hash (``bytes``);
    * ``metadata`` -- an object with ``set_attr(name, value)`` and ``get(name)``,
      where ``get`` returns ``None`` or an object exposing ``.value``.
    """

    def _update_hash(self, hash_obj: Any, buffer_size: int = 65536) -> None:
        """Feed ``content_bytes`` into ``hash_obj`` in chunks of ``buffer_size`` bytes.

        :param hash_obj: object with an ``update(bytes)`` method (e.g. a ``hashlib`` hash).
        :param buffer_size: number of bytes passed to each ``update`` call.
        """
        stream = io.BytesIO(self.content_bytes)
        while True:
            chunk = stream.read(buffer_size)
            if not chunk:
                break
            hash_obj.update(chunk)

    def _content_hexdigest(self, algo: Any) -> Optional[str]:
        """Return the hex digest of the content for the hash constructor ``algo``.

        :param algo: zero-argument callable returning a hash object
            (e.g. ``hashlib.sha256``).
        :return: the hex digest, or ``None`` if constructing the hash, reading
            the content or hashing raised (the error is logged).
        """
        hash_obj = None
        try:
            hash_obj = algo()
            self._update_hash(hash_obj)
            return hash_obj.hexdigest()
        except Exception as exp:
            # Build the label without calling ``algo()`` again: when the
            # constructor is what failed, a second call inside this handler
            # would raise and break the "None on error" contract.
            label = getattr(hash_obj, "name", None) or getattr(algo, "__name__", repr(algo))
            logger.error(f"Failed to compute {label}: {exp}")
            return None

    @property
    def sha1(self) -> Optional[str]:
        """SHA-1 hex digest of the content (computed on access; ``None`` on error)."""
        return self._content_hexdigest(hashlib.sha1)

    @property
    def md5(self) -> Optional[str]:
        """MD5 hex digest of the content (computed on access; ``None`` on error)."""
        return self._content_hexdigest(hashlib.md5)

    @property
    def sha256(self) -> Optional[str]:
        """SHA-256 hex digest of the content (computed on access; ``None`` on error)."""
        return self._content_hexdigest(hashlib.sha256)

    @property
    def size(self) -> Optional[int]:
        """Size of the content in bytes (computed on access; ``None`` on error)."""
        try:
            return len(self.content_bytes)
        except Exception as exp:
            logger.error(f"Failed to determine size: {exp}")
            return None

    def update_checksum(self) -> Optional[str]:
        """Store the SHA-256 of the content in the ``checksum`` metadata (``schema:sha256``).

        :return: the stored SHA-256 hex digest (``None`` if the content is unavailable).
        """
        digest = self.sha256
        # NOTE(review): a ``None`` digest is stored too, replacing any earlier
        # checksum -- confirm that this is intended.
        self.metadata.set_attr("checksum", digest)
        return digest

    def verify(self) -> bool:
        """Verify the content against the stored ``checksum`` (SHA-256) metadata.

        :return: ``True`` iff a checksum is stored and matches the current content;
            ``False`` on mismatch or when no checksum has been stored yet.
        """
        attr = self.metadata.get("checksum")
        stored = attr.value if attr is not None else None
        if not stored:
            logger.warning("verify: no checksum stored (call update_checksum first)")
            return False
        return stored == self.sha256
16 changes: 15 additions & 1 deletion sdata/sclass/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from sdata.metadata import Metadata, Attribute
from sdata.base import Base
from sdata.interactive import ColumnAccessor
from sdata.sclass.content import ContentIntegrityMixin

logger = logging.getLogger(__name__)

Expand All @@ -35,7 +36,7 @@ def _require_parquet(engine: str = "pyarrow") -> None:
) from exp


class DataFrame(Base):
class DataFrame(ContentIntegrityMixin, Base):
SDATA_CLS = "sdata.sclass.dataframe.DataFrame"

#: optionales :class:`~sdata.schema.TableSchema`; beim Init angewandt (Default None)
Expand Down Expand Up @@ -137,6 +138,19 @@ def _sync_column_metadata(self, prune=False):

df = property(fget=_get_df, fset=_set_df, doc="df object(pandas.DataFrame)")

@property
def content_bytes(self) -> bytes:
    """Return the **data** serialized as plain Parquet (``self.df.to_parquet()``).

    This is the payload the inherited
    :class:`~sdata.sclass.content.ContentIntegrityMixin` hashes, which makes
    ``sha256``/``md5``/``sha1``, ``size`` and ``verify()``/``update_checksum()``
    available on a :class:`DataFrame`. Only the df is serialized here, not the
    sdata metadata, so writing the checksum into the metadata does not change
    the hash.

    NOTE(review): whether the Parquet bytes are identical across pandas/pyarrow
    versions is not established here -- confirm before comparing checksums
    between environments.
    """
    parquet_payload = self.df.to_parquet()
    return parquet_payload

@property
def column_metadata(self) -> Metadata:
"""
Expand Down
37 changes: 37 additions & 0 deletions tests/test_sclass_dataframe_integrity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
"""DataFrame nutzt den gemeinsamen Integritäts-Mixin (RFC 0004, Option B):
sha1/md5/sha256/size + verify/update_checksum über content_bytes (Parquet)."""
import pandas as pd
import pytest

pytest.importorskip("pyarrow")

from sdata.sclass.content import ContentIntegrityMixin
from sdata.sclass.dataframe import DataFrame


def _df():
    """Build the small two-column frame (``weight``, ``height``) used by the tests."""
    columns = {
        "weight": [10, 20, 30],
        "height": [1.5, 1.6, 1.7],
    }
    return pd.DataFrame(columns)


def test_dataframe_is_integrity_mixin():
    """An sdata ``DataFrame`` instance is also a ``ContentIntegrityMixin``."""
    sdf = DataFrame(df=_df(), name="x")
    assert isinstance(sdf, ContentIntegrityMixin)


def test_dataframe_hashes_and_size():
    """Digests have the expected hex length, size is positive, payload is plain Parquet."""
    sdf = DataFrame(df=_df(), name="x")
    # Same order as before: sha256, then sha1, then md5.
    for attr_name, hex_length in (("sha256", 64), ("sha1", 40), ("md5", 32)):
        assert len(getattr(sdf, attr_name)) == hex_length
    assert sdf.size and sdf.size > 0
    # Plain data Parquet, i.e. exactly what df.to_parquet() yields (no _sdata part).
    assert sdf.content_bytes == sdf.df.to_parquet()


def test_dataframe_verify_and_update_checksum():
    """``verify`` is False before storing, True after ``update_checksum``, False after a data change."""
    sdf = DataFrame(df=_df(), name="x")
    # Nothing has been stored yet.
    assert sdf.verify() is False

    stored = sdf.update_checksum()
    assert stored == sdf.sha256
    assert len(stored) == 64
    # Matches: the test relies on repeated serialization of the same object being identical.
    assert sdf.verify() is True

    # Replacing the data changes the hash, so the stored checksum no longer matches.
    sdf.df = pd.DataFrame({"weight": [9]})
    assert sdf.verify() is False