"""
`.cellucid-session` bundle reader (Python).
This implements the same framing format as the web app:
1) MAGIC bytes: b"CELLUCID_SESSION\\n"
2) manifestByteLength (u32 LE)
3) manifest JSON bytes (UTF-8)
4) repeated chunks: [chunkByteLength (u32 LE), chunkBytes...]
The manifest contains chunk metadata in order; chunk payloads are either JSON
or binary and may be gzip-compressed.
"""
from __future__ import annotations

import gzip
import io
import json
import shutil
import struct
from dataclasses import dataclass
from pathlib import Path
from typing import Any, BinaryIO
# Leading bytes that every `.cellucid-session` file must start with.
SESSION_BUNDLE_MAGIC = b"CELLUCID_SESSION\n"

# Width of the little-endian unsigned 32-bit length prefixes (format "<I").
U32_BYTES = struct.calcsize("<I")

# Largest manifest length accepted when indexing a bundle (16 MiB).
MAX_MANIFEST_BYTES = 16 * 1024**2

# Decompressed-size ceiling used for gzip chunks whose metadata carries no
# usable `uncompressedBytes` value (512 MiB).
DEFAULT_MAX_UNCOMPRESSED_CHUNK_BYTES = 512 * 1024**2
@dataclass(frozen=True)
class SessionChunkRef:
    """
    Immutable index entry locating one chunk inside a session bundle file.
    """

    # Chunk identifier taken from the manifest entry.
    id: str
    # The manifest entry (JSON object) describing this chunk.
    meta: dict[str, Any]
    # Absolute file position of the first payload byte (just after the
    # chunk's u32 length prefix).
    offset: int
    # Payload length in bytes as stored in the file, before any codec is
    # applied.
    stored_bytes: int
def _read_exact(fp: BinaryIO, n: int) -> bytes:
    """
    Read exactly `n` bytes from `fp`.

    Raises:
        ValueError: If the read returns `None` or a number of bytes other
            than `n` (typically because the file ended early).
    """
    payload = fp.read(n)
    if payload is not None and len(payload) == n:
        return payload
    raise ValueError("Unexpected EOF while reading session bundle")
def _read_u32_le(fp: BinaryIO) -> int:
    """
    Read one little-endian unsigned 32-bit integer from `fp`.

    Raises:
        ValueError: If fewer than `U32_BYTES` bytes are available.
    """
    raw = _read_exact(fp, U32_BYTES)
    (value,) = struct.unpack("<I", raw)
    return value
[docs]
class CellucidSessionBundle:
"""
Handle for a `.cellucid-session` file on disk.
The reader is streaming-friendly: it indexes chunk offsets once, and only
reads/decompresses chunk payloads on demand.
"""
[docs]
def __init__(self, path: str | Path):
self.path = Path(path).expanduser().resolve()
if not self.path.is_file():
raise FileNotFoundError(str(self.path))
self._manifest: dict[str, Any] | None = None
self._chunks_by_id: dict[str, SessionChunkRef] | None = None
[docs]
def save(self, dest: str | Path) -> Path:
"""Copy the session bundle to `dest` and return the destination path."""
dest_path = Path(dest).expanduser().resolve()
dest_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(self.path, dest_path)
return dest_path
[docs]
def apply_to_anndata(self, adata: Any, *, inplace: bool = False, **kwargs):
"""
Apply this bundle to an AnnData.
See `cellucid.anndata_session.apply_cellucid_session_to_anndata`.
"""
from .anndata_session import apply_cellucid_session_to_anndata
return apply_cellucid_session_to_anndata(self, adata, inplace=inplace, **kwargs)
@property
def manifest(self) -> dict[str, Any]:
"""Parsed manifest JSON (loaded lazily)."""
self._ensure_indexed()
assert self._manifest is not None
return self._manifest
@property
def dataset_fingerprint(self) -> dict[str, Any] | None:
fp = self.manifest.get("datasetFingerprint")
return fp if isinstance(fp, dict) else None
[docs]
def list_chunk_ids(self) -> list[str]:
self._ensure_indexed()
assert self._chunks_by_id is not None
return list(self._chunks_by_id.keys())
[docs]
def read_chunk_bytes(self, chunk_id: str) -> bytes:
"""Read the stored bytes for a chunk (codec not applied)."""
self._ensure_indexed()
assert self._chunks_by_id is not None
ref = self._chunks_by_id.get(chunk_id)
if ref is None:
raise KeyError(chunk_id)
with self.path.open("rb") as f:
f.seek(ref.offset)
return _read_exact(f, ref.stored_bytes)
[docs]
def decode_chunk(self, chunk_id: str) -> Any:
"""Decode a chunk payload (apply codec + parse JSON for json-kind chunks)."""
meta = self.get_chunk_meta(chunk_id)
stored = self.read_chunk_bytes(chunk_id)
return self._decode_payload(meta, stored)
[docs]
def iter_chunks(self, *, decode: bool = False):
"""Iterate chunks in manifest order."""
self._ensure_indexed()
assert self._chunks_by_id is not None
chunks = self.manifest.get("chunks") or []
for entry in chunks:
chunk_id = entry.get("id")
if not isinstance(chunk_id, str) or not chunk_id:
continue
ref = self._chunks_by_id.get(chunk_id)
if ref is None:
continue
if not decode:
yield ref
else:
yield (ref, self._decode_payload(ref.meta, self.read_chunk_bytes(chunk_id)))
# ---------------------------------------------------------------------
# Internals
# ---------------------------------------------------------------------
def _ensure_indexed(self) -> None:
if self._manifest is not None and self._chunks_by_id is not None:
return
with self.path.open("rb") as f:
magic = _read_exact(f, len(SESSION_BUNDLE_MAGIC))
if magic != SESSION_BUNDLE_MAGIC:
raise ValueError("Not a .cellucid-session file (invalid MAGIC header)")
manifest_len = _read_u32_le(f)
if manifest_len <= 0 or manifest_len > MAX_MANIFEST_BYTES:
raise ValueError(f"Invalid session manifest length: {manifest_len}")
manifest_bytes = _read_exact(f, manifest_len)
try:
manifest = json.loads(manifest_bytes.decode("utf-8"))
except Exception as e:
raise ValueError(f"Invalid session manifest JSON: {e}") from e
chunks_meta = manifest.get("chunks")
if not isinstance(chunks_meta, list):
raise ValueError("Invalid session manifest (missing chunks list)")
chunks_by_id: dict[str, SessionChunkRef] = {}
for meta in chunks_meta:
if not isinstance(meta, dict):
raise ValueError("Invalid session manifest (chunk meta must be object)")
chunk_id = meta.get("id")
if not isinstance(chunk_id, str) or not chunk_id:
raise ValueError("Invalid session manifest (chunk id missing)")
stored_len = _read_u32_le(f)
offset = f.tell()
# Bounds check without trusting meta.
if stored_len < 0:
raise ValueError(f"Invalid chunk length for {chunk_id}: {stored_len}")
f.seek(stored_len, 1)
chunks_by_id[chunk_id] = SessionChunkRef(
id=chunk_id,
meta=meta,
offset=offset,
stored_bytes=stored_len,
)
self._manifest = manifest
self._chunks_by_id = chunks_by_id
def _decode_payload(self, meta: dict[str, Any], stored: bytes) -> Any:
codec = meta.get("codec")
kind = meta.get("kind")
if codec not in ("none", "gzip"):
raise ValueError(f"Unsupported session chunk codec: {codec!r}")
if kind not in ("json", "binary"):
raise ValueError(f"Unsupported session chunk kind: {kind!r}")
if codec == "gzip":
uncompressed = gzip.decompress(stored)
max_bytes = meta.get("uncompressedBytes")
if isinstance(max_bytes, int) and max_bytes > 0:
if len(uncompressed) > max_bytes:
raise ValueError(
f"Chunk decompressed larger than expected ({len(uncompressed)} > {max_bytes})"
)
elif len(uncompressed) > DEFAULT_MAX_UNCOMPRESSED_CHUNK_BYTES:
raise ValueError("Chunk decompressed too large (missing uncompressedBytes guard)")
else:
uncompressed = stored
if kind == "binary":
return uncompressed
# kind == "json"
try:
return json.loads(uncompressed.decode("utf-8"))
except Exception as e:
raise ValueError(f"Invalid JSON chunk: {e}") from e