Source code for cellucid.anndata_adapter

"""
AnnData Adapter for Cellucid

Provides a server-side adapter that reads AnnData objects (in memory, backed h5ad,
or zarr stores) and serves data in the same format that the Cellucid web viewer
expects. This allows users to visualize AnnData directly without running
prepare first.

Key features:
- Works with in-memory AnnData, backed h5ad files, and zarr stores
- Handles sparse matrices (CSR/CSC) transparently with automatic format conversion
- Auto-detects UMAP dimensions from explicit obsm keys (X_umap_1d, X_umap_2d, X_umap_3d) or X_umap
- Computes centroids and outlier quantiles on-demand with caching
- Lazy loading: gene expression and obs data are loaded on-demand
- No quantization (full float32 precision, gzip compression for network transfer)

Lazy Loading Behavior:
----------------------
- h5ad (backed='r'): Uses HDF5 memory-mapping. Arrays are loaded only when accessed.
  Gene expression columns are fetched individually, minimizing memory usage.
- zarr: Arrays are stored as separate files. Reading is inherently lazy - only
  requested array chunks are loaded into memory. Note: zarr does not support
  backed mode like h5ad; the AnnData structure is loaded but data arrays are lazy.
- in-memory: All data is already in RAM. No lazy loading.

Memory Management:
-----------------
The adapter maintains several caches for performance:
- Embedding cache: Normalized UMAP coordinates (one per dimension)
- Centroid cache: Computed label centroids
- Gene expression LRU cache: Recently accessed gene columns (max 100 genes)
- CSC cache: For CSR matrices, a CSC copy is created for efficient column access

IMPORTANT: Always use the context manager or call close() when done:
    with AnnDataAdapter.from_file("data.h5ad") as adapter:
        # use adapter
    # resources are automatically released

Usage:
    from cellucid.anndata_adapter import AnnDataAdapter

    # From h5ad file (lazy loading via backed mode)
    with AnnDataAdapter.from_file("/path/to/data.h5ad") as adapter:
        manifest = adapter.get_obs_manifest()

    # From zarr store (lazy array access)
    adapter = AnnDataAdapter.from_file("/path/to/data.zarr")

    # From in-memory AnnData
    adapter = AnnDataAdapter(adata)

    # Use with server
    from cellucid import show_anndata
    show_anndata(adata)  # or show_anndata("/path/to/data.h5ad")
"""

from __future__ import annotations

import gzip
import json
import logging
import re
import weakref
from collections import OrderedDict
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, Optional, Union

import numpy as np
import pandas as pd
from scipy import sparse

if TYPE_CHECKING:
    import anndata

logger = logging.getLogger("cellucid.anndata_adapter")

# Import shared utilities from prepare_data to avoid duplication
# If running standalone, define locally
try:
    from .prepare_data import _safe_filename_component
except ImportError:
    def _safe_filename_component(name: str) -> str:
        """Return a filesystem-friendly version of a field key."""
        safe = re.sub(r"[^A-Za-z0-9._-]+", "_", str(name))
        safe = safe.strip("._")
        return safe or "field"


class LRUCache:
    """
    Simple LRU cache with O(1) operations using OrderedDict.

    WARNING: This class is NOT thread-safe. The AnnDataAdapter is designed
    for single-threaded use within an HTTP server (each request is handled
    sequentially). If you need thread-safe caching, wrap operations in
    external locks or use a thread-safe cache implementation.

    The HTTP server uses a single adapter instance per server, and Python's
    GIL provides some level of safety for simple operations, but concurrent
    access from multiple threads may cause race conditions.
    """

    def __init__(self, max_size: int = 100):
        if max_size < 1:
            raise ValueError("max_size must be at least 1")
        self._cache: OrderedDict[str, Any] = OrderedDict()
        self._max_size = max_size

    def get(self, key: str) -> Optional[Any]:
        """Get item and move to end (most recently used)."""
        if key in self._cache:
            self._cache.move_to_end(key)
            return self._cache[key]
        return None

    def put(self, key: str, value: Any) -> None:
        """Add item, evicting oldest if at capacity."""
        if key in self._cache:
            self._cache.move_to_end(key)
        else:
            if len(self._cache) >= self._max_size:
                self._cache.popitem(last=False)  # Remove oldest
        self._cache[key] = value

    def __contains__(self, key: str) -> bool:
        return key in self._cache

    def clear(self) -> None:
        """Clear all cached items and release memory."""
        self._cache.clear()

    def __len__(self) -> int:
        return len(self._cache)

    @property
    def max_size(self) -> int:
        """Maximum cache size."""
        return self._max_size


def _to_dense_1d(arr: Union[np.ndarray, sparse.spmatrix]) -> np.ndarray:
    """Convert sparse vector to dense numpy array."""
    if sparse.issparse(arr):
        return np.asarray(arr.toarray()).flatten()
    arr = np.asarray(arr)
    if arr.ndim > 1:
        return arr.flatten()
    return arr


def _to_dense_2d(arr: Union[np.ndarray, sparse.spmatrix]) -> np.ndarray:
    """Convert sparse matrix to dense numpy array."""
    if sparse.issparse(arr):
        return np.asarray(arr.toarray())
    return np.asarray(arr)


[docs] class AnnDataAdapter: """ Adapter that wraps AnnData and provides data in Cellucid format. This adapter generates all the data that would normally be created by prepare, but reads directly from AnnData without creating intermediate files. This is slower but more convenient for interactive use. """
[docs] def __init__( self, adata: "anndata.AnnData", latent_key: Optional[str] = None, gene_id_column: str = "index", normalize_embeddings: bool = True, centroid_outlier_quantile: float = 0.95, centroid_min_points: int = 10, dataset_name: Optional[str] = None, dataset_id: Optional[str] = None, ): """ Initialize the adapter. Parameters ---------- adata : AnnData AnnData object to adapt. Can be in-memory or backed (h5ad file). latent_key : str, optional Key in obsm for latent space used for outlier quantile calculation. If None, attempts to find: 'X_pca', 'X_scvi', 'scanvi', 'scvi', or first obsm. gene_id_column : str Column in var for gene identifiers. Use "index" for var.index. normalize_embeddings : bool If True, normalize embeddings to [-1, 1] range (recommended). centroid_outlier_quantile : float Quantile for outlier removal in centroid computation. centroid_min_points : int Minimum points per category for centroid computation. dataset_name : str, optional Human-readable dataset name. dataset_id : str, optional Dataset identifier. """ self.adata = adata self.latent_key = latent_key self.gene_id_column = gene_id_column self.normalize_embeddings = normalize_embeddings # Validate centroid parameters if not (0.5 < centroid_outlier_quantile < 1.0): logger.warning( f"centroid_outlier_quantile={centroid_outlier_quantile} is outside " f"recommended range (0.5, 1.0). Using default 0.95." ) centroid_outlier_quantile = 0.95 self.centroid_outlier_quantile = centroid_outlier_quantile if centroid_min_points < 1: logger.warning( f"centroid_min_points={centroid_min_points} must be positive. Using default 10." ) centroid_min_points = 10 self.centroid_min_points = centroid_min_points # Dataset metadata self.dataset_name = dataset_name or "AnnData Dataset" self.dataset_id = dataset_id or _safe_filename_component(self.dataset_name) # Source tracking (set by from_file, defaults for in-memory) self._source_path: Optional[str] = None self._source_type: str = "memory" self._is_lazy: bool = False # Set by from_file for backed h5ad/zarr # Caches for computed data self._embedding_cache: dict[int, np.ndarray] = {} # Per-dimension normalization info for embeddings (center + scale_factor). # Used to scale vector fields into the same normalized space as points. self._embedding_norm: dict[int, dict[str, Any]] = {} # Optional per-cell vector fields (e.g. velocity, CellRank drift). # Structure matches dataset_identity.json: {"default_field": ..., "fields": {...}}. self._vector_fields_metadata: Optional[dict[str, Any]] = None self._vector_field_cache: dict[tuple[str, int], np.ndarray] = {} self._centroid_cache: dict[str, dict] = {} self._outlier_quantile_cache: dict[str, np.ndarray] = {} self._latent_space: Optional[np.ndarray] = None self._gene_ids_cache: Optional[list[str]] = None self._gene_id_to_idx_cache: Optional[dict[str, int]] = None # Connectivity cache: (sources, destinations, n_edges, max_neighbors) self._connectivity_cache: Optional[tuple[np.ndarray, np.ndarray, int, int]] = None # CSC cache for efficient column access on CSR matrices # Note: This doubles memory for sparse matrices, but is necessary for O(1) gene access # Set to None initially; created lazily only when needed self._X_csc_cache: Optional[sparse.csc_matrix] = None # LRU cache for gene expression values using O(1) OrderedDict self._gene_expression_cache = LRUCache(max_size=100) # UMAP embedding key resolution (dimension -> obsm key) self._umap_embedding_key_by_dim: dict[int, str] = {} # Optional metadata for UI notifications about X_umap aliasing/clashes self._umap_resolution: Optional[dict[str, Any]] = None # Track if adapter has been closed self._closed = False # Auto-detect available dimensions self._available_dimensions = self._detect_dimensions() self._default_dimension = self._select_default_dimension() # Detect optional per-cell vector fields aligned to UMAP. self._vector_fields_metadata = self._detect_vector_fields() # Auto-detect latent space if self.latent_key is None: self.latent_key = self._detect_latent_key() logger.info(f"AnnDataAdapter initialized: {self.n_cells:,} cells, " f"{self.n_genes:,} genes, dimensions: {self._available_dimensions}")
[docs] @classmethod def from_file( cls, path: Union[str, Path], backed: Union[bool, Literal["r", "r+"]] = "r", **kwargs ) -> "AnnDataAdapter": """ Create adapter from h5ad file or zarr store with lazy loading. Supports both: - .h5ad files: HDF5-based, supports true backed mode with memory-mapping - .zarr directories: Directory-based, arrays are loaded on-demand Lazy Loading Behavior --------------------- h5ad (backed mode): When backed='r', the file is memory-mapped. Only accessed data is loaded into RAM. This is ideal for large datasets. The X matrix and layer arrays support lazy column/row access. zarr: Zarr stores individual arrays as separate files on disk. While anndata.read_zarr() loads the AnnData structure (obs, var metadata), the actual X matrix data is loaded lazily when accessed. This is because zarr's internal chunking mechanism defers loading until data is requested. Note: zarr does not support the same backed mode API as h5ad, but achieves similar lazy behavior through its design. Parameters ---------- path : str or Path Path to h5ad file or zarr directory. - For h5ad: path/to/file.h5ad - For zarr: path/to/store.zarr (must be a directory) backed : bool or 'r' or 'r+' For h5ad only: - 'r': Read-only backed mode (recommended for visualization) - 'r+': Read-write backed mode - True: Same as 'r' - False: Load entire file into memory For zarr: This parameter is ignored (zarr is always lazy). **kwargs Additional arguments passed to AnnDataAdapter.__init__: - latent_key: Key in obsm for latent space - gene_id_column: Column in var for gene IDs - normalize_embeddings: Normalize UMAP to [-1,1] - dataset_name: Human-readable name Returns ------- AnnDataAdapter Adapter instance wrapping the loaded data. Raises ------ FileNotFoundError If the path does not exist. ValueError If the path is not a valid h5ad or zarr store. Examples -------- >>> # Load h5ad with lazy loading (default) >>> adapter = AnnDataAdapter.from_file("data.h5ad") >>> # Load h5ad fully into memory >>> adapter = AnnDataAdapter.from_file("data.h5ad", backed=False) >>> # Load zarr store >>> adapter = AnnDataAdapter.from_file("data.zarr") """ import anndata as ad path = Path(path).resolve() if not path.exists(): raise FileNotFoundError(f"Path not found: {path}") # Detect format based on path structure # Zarr stores are directories containing .zgroup or .zattrs files is_zarr = ( path.suffix == ".zarr" or (path.is_dir() and (path / ".zgroup").exists()) or (path.is_dir() and (path / ".zattrs").exists()) ) # Additional check: if it's a directory without h5ad extension, likely zarr if path.is_dir() and not is_zarr: # Check for zarr indicators if any((path / f).exists() for f in [".zgroup", ".zattrs", ".zarray"]): is_zarr = True elif path.suffix != ".h5ad": # If directory doesn't look like zarr and isn't h5ad, warn logger.warning( f"Directory '{path}' does not appear to be a zarr store. " f"Expected .zgroup or .zattrs file." ) if is_zarr: # Zarr store - data arrays are loaded lazily by zarr's chunking if not path.is_dir(): raise ValueError( f"Zarr path must be a directory: {path}. " f"Zarr stores are directories, not single files." ) try: adata = ad.read_zarr(path) except ModuleNotFoundError as e: # `anndata.read_zarr` requires the optional `zarr` dependency. if getattr(e, "name", None) == "zarr": raise ModuleNotFoundError( "Loading `.zarr` stores requires the optional `zarr` dependency. " "Install it with: pip install zarr" ) from e raise except ImportError as e: # Some environments may raise a generic ImportError instead. if "zarr" in str(e).lower(): raise ImportError( "Loading `.zarr` stores requires the optional `zarr` dependency. " "Install it with: pip install zarr" ) from e raise source_type = "zarr" is_lazy = True # Zarr provides lazy array access logger.info(f"Loaded zarr store: {path} (lazy array access)") else: # h5ad file if not path.is_file(): raise ValueError(f"h5ad path must be a file: {path}") try: if backed: backed_mode = backed if isinstance(backed, str) else "r" adata = ad.read_h5ad(path, backed=backed_mode) is_lazy = True else: adata = ad.read_h5ad(path) is_lazy = False except ModuleNotFoundError as e: # `anndata.read_h5ad` requires `h5py` in practice. if getattr(e, "name", None) == "h5py": raise ModuleNotFoundError( "Loading `.h5ad` files requires the `h5py` dependency. " "Install it with: pip install h5py" ) from e raise source_type = "h5ad" logger.info(f"Loaded h5ad file: {path} (backed={backed}, lazy={is_lazy})") # Use filename/dirname as default name if "dataset_name" not in kwargs: kwargs["dataset_name"] = path.stem if path.is_file() else path.name # Create adapter and store source info adapter = cls(adata, **kwargs) adapter._source_path = str(path) adapter._source_type = source_type adapter._is_lazy = is_lazy return adapter
def _check_closed(self) -> None: """Raise error if adapter has been closed.""" if self._closed: raise RuntimeError( "AnnDataAdapter has been closed. Create a new adapter instance." ) @property def n_cells(self) -> int: """Number of cells.""" self._check_closed() return self.adata.n_obs @property def n_genes(self) -> int: """Number of genes.""" self._check_closed() return self.adata.n_vars @property def is_backed(self) -> bool: """ Whether the AnnData is backed (lazy loading from disk). Returns False if the adapter is closed or if adata is None. """ if self._closed or self.adata is None: return False try: return self.adata.isbacked except Exception: return False # ========================================================================= # DIMENSION DETECTION # ========================================================================= def _detect_dimensions(self) -> list[int]: """ Detect available UMAP dimensions from obsm keys. Looks for embeddings in this order: 1. Explicit dimension keys: X_umap_1d, X_umap_2d, X_umap_3d 2. Generic key: X_umap (interpreted as 1D/2D/3D if shape matches and no clash) Returns ------- list[int] Sorted list of available dimensions (e.g., [2, 3]) Raises ------ ValueError If no valid UMAP embeddings are found in obsm. """ # Edge case: empty AnnData if self.n_cells == 0: logger.warning("AnnData has 0 cells. Visualization may not work correctly.") # Reset resolution metadata on each detection pass self._umap_embedding_key_by_dim = {} self._umap_resolution = None available: list[int] = [] def _shape_of(arr: Any) -> tuple[int, ...]: if sparse.issparse(arr): return tuple(arr.shape) try: shape = arr.shape # type: ignore[attr-defined] return tuple(shape) except Exception: return tuple(np.asarray(arr).shape) # Check for explicit dimension keys first for dim in [1, 2, 3]: key = f"X_umap_{dim}d" if key in self.adata.obsm: arr = self.adata.obsm[key] # Handle sparse obsm (rare but possible) arr_shape = _shape_of(arr) if len(arr_shape) != 2: logger.warning(f"Key '{key}' is not 2D (shape: {arr_shape}), skipping") continue if arr_shape[0] != self.n_cells: logger.warning( f"Key '{key}' has {arr_shape[0]} rows but AnnData has {self.n_cells} cells" ) continue if arr_shape[1] == dim: available.append(dim) self._umap_embedding_key_by_dim[dim] = key else: logger.warning( f"Key '{key}' has {arr_shape[1]} columns, expected {dim}" ) # Optional fallback: interpret X_umap as a dimensioned embedding when unambiguous. if "X_umap" in self.adata.obsm: arr = self.adata.obsm["X_umap"] arr_shape = _shape_of(arr) if len(arr_shape) != 2: self._umap_resolution = { "source_key": "X_umap", "action": "ignored", "reason": "invalid_shape", "shape": list(arr_shape), } elif arr_shape[0] != self.n_cells: self._umap_resolution = { "source_key": "X_umap", "action": "ignored", "reason": "row_count_mismatch", "shape": list(arr_shape), "expected_rows": self.n_cells, } else: n_dims = int(arr_shape[1]) alias_key = f"X_umap_{n_dims}d" if n_dims not in (1, 2, 3): self._umap_resolution = { "source_key": "X_umap", "action": "ignored", "reason": "unsupported_dim", "n_dims": n_dims, } elif n_dims in self._umap_embedding_key_by_dim: # Clash: explicit key for this dimension exists; ignore X_umap. self._umap_resolution = { "source_key": "X_umap", "action": "ignored", "reason": "explicit_key_present", "dim": n_dims, "alias_key": alias_key, } else: available.append(n_dims) self._umap_embedding_key_by_dim[n_dims] = "X_umap" self._umap_resolution = { "source_key": "X_umap", "action": "used_as", "dim": n_dims, "alias_key": alias_key, } if not available: raise ValueError( "No valid UMAP embeddings found in adata.obsm. " "Expected keys: 'X_umap_1d', 'X_umap_2d', 'X_umap_3d', or 'X_umap' (1D/2D/3D). " f"Available obsm keys: {list(self.adata.obsm.keys())}. " "Please add UMAP coordinates to your AnnData object using scanpy.tl.umap() " "and then store explicit embeddings under X_umap_1d / X_umap_2d / X_umap_3d." ) # Ensure uniqueness and stable ordering. return sorted(set(available)) def _select_default_dimension(self) -> int: """Select default dimension (prefer 3D > 2D > 1D).""" for dim in [3, 2, 1]: if dim in self._available_dimensions: return dim return self._available_dimensions[0] def _detect_vector_fields(self) -> Optional[dict[str, Any]]: """ Detect per-cell vector fields in `adata.obsm` aligned to UMAP. Naming convention (UMAP basis): - Explicit: `<field>_umap_<dim>d` (preferred) - Examples: `velocity_umap_2d`, `T_fwd_umap_3d` - Implicit: `<field>_umap` with shape `(n_cells, 1|2|3)` - Used only if the explicit key for that dim is not present (clash-safe) Returns a structure suitable for dataset_identity.json under `vector_fields`, including per-field `files` entries pointing at server-served binaries: `vectors/<fieldId>_<dim>d.bin`. """ keys = list(self.adata.obsm.keys()) if not keys: return None def _shape_of(arr: Any) -> tuple[int, ...]: if sparse.issparse(arr): return tuple(arr.shape) try: shape = arr.shape # type: ignore[attr-defined] return tuple(shape) except Exception: return tuple(np.asarray(arr).shape) suffix_re = re.compile(r"^(.*)_([123])d$") # Internal working structure: fieldId -> {dims:set, explicit:set, obsm_keys:dict[str,str]} fields: dict[str, dict[str, Any]] = {} def _ensure(field_id: str) -> dict[str, Any]: if field_id not in fields: fields[field_id] = {"dims": set(), "explicit": set(), "obsm_keys": {}} return fields[field_id] # 1) Explicit per-dimension keys. for key in keys: match = suffix_re.match(key) if not match: continue field_id = match.group(1) dim = int(match.group(2)) if not field_id.endswith("_umap"): continue if field_id.startswith("X_"): continue # reserved for embeddings if dim not in (1, 2, 3): continue if dim not in self._available_dimensions: continue # can't render a dim that doesn't exist in points arr = self.adata.obsm.get(key) if arr is None: continue shape = _shape_of(arr) if len(shape) != 2 or shape[0] != self.n_cells or shape[1] != dim: continue entry = _ensure(field_id) entry["obsm_keys"][f"{dim}d"] = key entry["dims"].add(dim) entry["explicit"].add(dim) # 2) Implicit keys (<field>_umap), shape-derived, clash-safe. for key in keys: field_id = key if not field_id.endswith("_umap"): continue if field_id.startswith("X_"): continue if suffix_re.match(field_id): continue # skip explicit keys arr = self.adata.obsm.get(field_id) if arr is None: continue shape = _shape_of(arr) if len(shape) == 1: if shape[0] != self.n_cells: continue dim = 1 elif len(shape) == 2: if shape[0] != self.n_cells: continue dim = int(shape[1]) else: continue if dim not in (1, 2, 3): continue if dim not in self._available_dimensions: continue entry = _ensure(field_id) if dim in entry["explicit"]: continue # clash-safe: explicit wins slot = f"{dim}d" if slot not in entry["obsm_keys"]: entry["obsm_keys"][slot] = field_id entry["dims"].add(dim) if not fields: return None def _label_for(field_id: str) -> str: base = field_id[:-5] if field_id.endswith("_umap") else field_id base = base.replace("_", " ").strip() titled = (base[:1].upper() + base[1:]) if base else field_id return f"{titled} (UMAP)" if field_id.endswith("_umap") else titled fields_out: dict[str, Any] = {} for field_id, entry in fields.items(): dims = sorted(entry["dims"]) if not dims: continue files = {f"{d}d": f"vectors/{field_id}_{d}d.bin" for d in dims} fields_out[field_id] = { "label": _label_for(field_id), "basis": "umap", "available_dimensions": dims, "default_dimension": max(dims), "files": files, "obsm_keys": entry["obsm_keys"], } if not fields_out: return None default_field = "velocity_umap" if "velocity_umap" in fields_out else sorted(fields_out.keys())[0] return {"default_field": default_field, "fields": fields_out} def _detect_latent_key(self) -> Optional[str]: """Auto-detect latent space key from obsm.""" # Priority order for latent space detection candidates = ["scanvi", "scvi", "X_scvi", "X_pca", "harmony", "X_harmony"] for key in candidates: if key in self.adata.obsm: logger.info(f"Auto-detected latent space: '{key}'") return key # Fall back to first available obsm that's not a UMAP for key in self.adata.obsm.keys(): if not key.startswith("X_umap"): logger.info(f"Using '{key}' as latent space (first non-UMAP obsm)") return key logger.warning("No latent space found in obsm, centroid outlier computation will be skipped") return None # ========================================================================= # EMBEDDING DATA # =========================================================================
[docs] def get_embedding(self, dim: int) -> np.ndarray: """ Get embedding coordinates for a dimension. Returns normalized Float32 array of shape (n_cells, dim). """ self._check_closed() if dim not in self._available_dimensions: raise ValueError(f"Dimension {dim}D not available. Available: {self._available_dimensions}") if dim in self._embedding_cache: return self._embedding_cache[dim] # Get raw embedding key = self._umap_embedding_key_by_dim.get(dim) if not key or key not in self.adata.obsm: raise ValueError( f"Could not find embedding for dimension {dim}. " f"Expected an embedding resolved for {dim}D. Available obsm keys: {list(self.adata.obsm.keys())}" ) raw = _to_dense_2d(self.adata.obsm[key]) raw = raw.astype(np.float32) if self.normalize_embeddings: # Normalize to [-1, 1] range (same as prepare) axis_mins = raw.min(axis=0) axis_maxs = raw.max(axis=0) axis_ranges = axis_maxs - axis_mins max_range = float(axis_ranges.max()) if max_range < 1e-8: max_range = 1.0 center = (axis_mins + axis_maxs) / 2 scale_factor = 2.0 / max_range raw = ((raw - center) * scale_factor).astype(np.float32) self._embedding_norm[dim] = { "center": center.astype(np.float32), "scale_factor": float(scale_factor), } else: self._embedding_norm[dim] = { "center": np.zeros((dim,), dtype=np.float32), "scale_factor": 1.0, } self._embedding_cache[dim] = raw return raw
[docs] def get_embedding_3d(self, dim: int) -> np.ndarray: """ Get embedding padded to 3D for WebGL rendering. 1D -> (x, 0, 0) 2D -> (x, y, 0) 3D -> (x, y, z) """ embedding = self.get_embedding(dim) n_cells = embedding.shape[0] if dim == 3: return embedding elif dim == 2: result = np.zeros((n_cells, 3), dtype=np.float32) result[:, :2] = embedding return result elif dim == 1: result = np.zeros((n_cells, 3), dtype=np.float32) result[:, 0] = embedding[:, 0] return result else: raise ValueError(f"Unsupported dimension: {dim}")
[docs] def get_points_binary(self, dim: int, compress: bool = False) -> bytes: """Get embedding as binary data (for HTTP response).""" embedding = self.get_embedding(dim) data = embedding.astype(np.float32).tobytes() if compress: return gzip.compress(data, compresslevel=6) return data
[docs] def get_vector_field_binary(self, field_id: str, dim: int, compress: bool = False) -> bytes: """ Get a per-cell vector field (displacement vectors) as binary float32 data. Vector fields are scaled by the SAME per-dimension normalization scale as the embedding points, so they are in the same normalized space as the `points_{dim}d.bin` responses. """ self._check_closed() field = str(field_id or "") d = int(dim) if d not in (1, 2, 3): raise ValueError(f"Invalid dimension {dim}. Expected 1, 2, or 3.") meta = self._vector_fields_metadata or {} fields = meta.get("fields") or {} entry = fields.get(field) if not entry: raise ValueError(f"Vector field '{field}' not available") obsm_keys = entry.get("obsm_keys") or {} obsm_key = obsm_keys.get(f"{d}d") if not obsm_key or obsm_key not in self.adata.obsm: raise ValueError(f"Vector field '{field}' does not provide {d}D data") cache_key = (field, d) if cache_key not in self._vector_field_cache: # Ensure embedding normalization scale is computed. self.get_embedding(d) scale_factor = float(self._embedding_norm.get(d, {}).get("scale_factor", 1.0)) raw = _to_dense_2d(self.adata.obsm[obsm_key]).astype(np.float32) if raw.ndim == 1: raw = raw.reshape(-1, 1) if raw.ndim != 2 or raw.shape[0] != self.n_cells or raw.shape[1] != d: raise ValueError( f"Vector field '{obsm_key}' has shape {tuple(raw.shape)}, expected ({self.n_cells}, {d})" ) if self.normalize_embeddings and scale_factor != 1.0: raw *= scale_factor self._vector_field_cache[cache_key] = raw data = self._vector_field_cache[cache_key].tobytes() if compress: return gzip.compress(data, compresslevel=6) return data
# ========================================================================= # LATENT SPACE FOR OUTLIER COMPUTATION # ========================================================================= def _get_latent_space(self) -> Optional[np.ndarray]: """Get latent space for outlier quantile computation.""" if self._latent_space is not None: return self._latent_space if self.latent_key is None or self.latent_key not in self.adata.obsm: return None self._latent_space = _to_dense_2d(self.adata.obsm[self.latent_key]).astype(np.float32) return self._latent_space # ========================================================================= # OBS DATA (CELL METADATA) # =========================================================================
[docs] def get_obs_keys(self) -> list[str]: """Get list of obs column names.""" self._check_closed() return list(self.adata.obs.columns)
[docs] def get_obs_field_kind(self, key: str) -> Literal["continuous", "category"]: """ Determine if an obs field is continuous or categorical. Classification rules: - Categorical dtype → category - Boolean dtype → category - Numeric dtype → continuous - String/object → category (treated as labels) - Empty column → category (safe default) """ self._check_closed() if key not in self.adata.obs.columns: raise KeyError(f"Field '{key}' not found in obs. Available: {list(self.adata.obs.columns)}") s = self.adata.obs[key] # Handle empty series if len(s) == 0: logger.warning(f"obs field '{key}' is empty, treating as categorical") return "category" # Check categorical dtype (pandas 2.0+ compatible) try: is_cat = isinstance(s.dtype, pd.CategoricalDtype) except AttributeError: # Fallback for older pandas versions is_cat = hasattr(s, 'cat') if is_cat: return "category" elif pd.api.types.is_bool_dtype(s): return "category" elif pd.api.types.is_numeric_dtype(s): # Check if all values are NaN - still treat as continuous return "continuous" elif pd.api.types.is_string_dtype(s) or pd.api.types.is_object_dtype(s): return "category" else: # Unknown dtype - safe default is categorical logger.warning(f"obs field '{key}' has unknown dtype {s.dtype}, treating as categorical") return "category"
[docs] def get_obs_continuous_values(self, key: str, compress: bool = False) -> bytes: """ Get continuous obs field as binary float32 data. NaN/Inf values are preserved in the output (client handles visualization). Raises ------ KeyError If the field is not found in obs. """ self._check_closed() if key not in self.adata.obs.columns: raise KeyError( f"obs field '{key}' not found. Available fields: {list(self.adata.obs.columns)}" ) s = self.adata.obs[key] values = pd.to_numeric(s, errors="coerce").to_numpy(dtype=np.float32) # Log warning for problematic data n_nan = np.isnan(values).sum() n_inf = np.isinf(values).sum() if n_nan > 0 or n_inf > 0: logger.debug(f"obs field '{key}': {n_nan} NaN, {n_inf} Inf values") data = values.tobytes() if compress: return gzip.compress(data, compresslevel=6) return data
[docs] def get_obs_categorical_codes(self, key: str, compress: bool = False) -> tuple[bytes, list[str], int]: """ Get categorical obs field as binary codes. Returns: (binary_codes, category_list, missing_value) Categories are assigned codes 0 to n-1. Missing values (NaN) are encoded as the missing_value sentinel. Raises ------ KeyError If the field is not found in obs. """ self._check_closed() if key not in self.adata.obs.columns: raise KeyError( f"obs field '{key}' not found. Available fields: {list(self.adata.obs.columns)}" ) s = self.adata.obs[key] cat = s.astype("category") categories = [str(c) for c in cat.cat.categories] codes = cat.cat.codes.to_numpy(dtype=np.int32) # Handle edge case: empty categories n_categories = len(categories) if n_categories == 0: logger.warning(f"obs field '{key}' has no categories (all values are NaN/missing)") # Return all missing values dtype = np.uint8 missing_value = 255 codes_typed = np.full(self.n_cells, missing_value, dtype=dtype) data = codes_typed.tobytes() if compress: data = gzip.compress(data, compresslevel=6) return data, [], int(missing_value) # Select dtype based on number of categories if n_categories <= 254: dtype = np.uint8 missing_value = 255 else: dtype = np.uint16 missing_value = 65535 # Convert codes (-1 for NaN -> missing_value) codes_typed = np.full(self.n_cells, missing_value, dtype=dtype) valid_mask = codes >= 0 codes_typed[valid_mask] = codes[valid_mask].astype(dtype) # Log missing value count n_missing = (~valid_mask).sum() if n_missing > 0: logger.debug(f"obs field '{key}': {n_missing} missing values out of {self.n_cells}") data = codes_typed.tobytes() if compress: data = gzip.compress(data, compresslevel=6) return data, categories, int(missing_value)
# ========================================================================= # CENTROIDS AND OUTLIER QUANTILES # ========================================================================= def _compute_centroids_for_field( self, key: str, dim: int, ) -> list[dict]: """Compute centroids for a categorical field at a given dimension.""" cache_key = f"{key}_{dim}d" if cache_key in self._centroid_cache: return self._centroid_cache[cache_key] coords = self.get_embedding(dim) s = self.adata.obs[key] cat = s.astype("category") categories = [str(c) for c in cat.cat.categories] codes = cat.cat.codes.to_numpy(dtype=np.int32) centroids = [] for code, label in enumerate(categories): mask = codes == code idx = np.nonzero(mask)[0] n = idx.size if n < self.centroid_min_points: continue pts = coords[idx, :] center = pts.mean(axis=0) # Remove outliers for centroid computation if n > self.centroid_min_points: dists = np.linalg.norm(pts - center, axis=1) thr = float(np.quantile(dists, self.centroid_outlier_quantile)) inlier_mask = dists <= thr n_in = int(inlier_mask.sum()) if n_in >= self.centroid_min_points: pts_in = pts[inlier_mask, :] center = pts_in.mean(axis=0) used_count = n_in else: used_count = n else: used_count = n centroids.append({ "category": str(label), "position": center.astype(float).tolist(), "n_points": int(used_count), }) self._centroid_cache[cache_key] = centroids return centroids
[docs] def get_centroids_for_field(self, key: str) -> dict[str, list[dict]]: """Get centroids for all available dimensions.""" result = {} for dim in self._available_dimensions: centroids = self._compute_centroids_for_field(key, dim) result[str(dim)] = centroids return result
def _compute_outlier_quantiles(self, key: str) -> np.ndarray: """Compute per-cell outlier quantiles for a categorical field.""" if key in self._outlier_quantile_cache: return self._outlier_quantile_cache[key] latent = self._get_latent_space() if latent is None: # Return all zeros if no latent space result = np.zeros(self.n_cells, dtype=np.float32) self._outlier_quantile_cache[key] = result return result s = self.adata.obs[key] cat = s.astype("category") categories = [str(c) for c in cat.cat.categories] codes = cat.cat.codes.to_numpy(dtype=np.int32) quantiles = np.full(self.n_cells, np.nan, dtype=np.float32) for code, label in enumerate(categories): mask = codes == code idx = np.nonzero(mask)[0] n = idx.size if n < self.centroid_min_points: continue pts = latent[idx, :] centroid = pts.mean(axis=0) dists = np.linalg.norm(pts - centroid, axis=1) sorted_dists = np.sort(dists) ranks = np.searchsorted(sorted_dists, dists, side='right') cell_quantiles = ranks.astype(np.float32) / n quantiles[idx] = cell_quantiles self._outlier_quantile_cache[key] = quantiles return quantiles
[docs] def get_obs_outlier_quantiles(self, key: str, compress: bool = False) -> bytes: """Get outlier quantiles as binary float32 data.""" quantiles = self._compute_outlier_quantiles(key) data = quantiles.tobytes() if compress: return gzip.compress(data, compresslevel=6) return data
# ========================================================================= # VAR DATA (GENE EXPRESSION) # =========================================================================
[docs] def get_gene_ids(self) -> list[str]: """Get list of gene identifiers.""" self._check_closed() if self._gene_ids_cache is not None: return self._gene_ids_cache if self.gene_id_column == "index" or self.gene_id_column is None: self._gene_ids_cache = self.adata.var.index.astype(str).tolist() else: if self.gene_id_column not in self.adata.var.columns: raise KeyError( f"gene_id_column '{self.gene_id_column}' not found in var. " f"Available columns: {list(self.adata.var.columns)}" ) self._gene_ids_cache = self.adata.var[self.gene_id_column].astype(str).tolist() # Build index for O(1) lookup self._gene_id_to_idx_cache = {gid: idx for idx, gid in enumerate(self._gene_ids_cache)} return self._gene_ids_cache
def _get_gene_idx(self, gene_id: str) -> int: """Get gene index with O(1) lookup.""" # Ensure cache is populated if self._gene_id_to_idx_cache is None: self.get_gene_ids() if gene_id not in self._gene_id_to_idx_cache: raise KeyError(f"Gene '{gene_id}' not found in var") return self._gene_id_to_idx_cache[gene_id] def _get_gene_column(self, gene_idx: int) -> np.ndarray: """ Extract a single gene column from the expression matrix. Handles multiple data formats efficiently: - Dense numpy arrays: Direct column access O(1) - CSC sparse matrices: Efficient column access O(nnz/n_cols) - CSR sparse matrices: Convert to CSC and cache for repeated access - Other sparse formats (COO, DOK, etc.): Convert to CSC and cache - None: Returns zeros (edge case for AnnData without X matrix) - Backed/chunked arrays: Direct slicing (no caching to preserve lazy loading) Memory-Speed Tradeoff: For in-memory CSR matrices, the CSC cache doubles memory usage but enables O(nnz/n_cols) column access instead of O(nnz). This pays off after accessing more than a few genes. IMPORTANT: For backed h5ad files, we do NOT create a CSC cache because: 1. It would force loading the entire X matrix into memory 2. It defeats the purpose of lazy loading for large datasets Instead, we accept slower column access to preserve memory efficiency. Parameters ---------- gene_idx : int Column index of the gene to extract. Returns ------- np.ndarray 1D array of expression values for all cells. """ self._check_closed() X = self.adata.X # Edge case: no expression matrix if X is None: logger.warning("AnnData has no X matrix. Returning zeros for gene expression.") return np.zeros(self.n_cells, dtype=np.float32) # Edge case: invalid gene index if gene_idx < 0 or gene_idx >= self.n_genes: raise IndexError(f"Gene index {gene_idx} out of range [0, {self.n_genes})") # CRITICAL: For backed h5ad files, avoid creating CSC cache to preserve # lazy loading behavior. The memory cost of CSC cache would defeat the # purpose of backed mode for large datasets. if self._is_lazy and self._source_type == "h5ad": # For backed h5ad, use direct slicing (slower but memory-efficient) # Note: backed h5ad matrices support slicing but may be slower col = X[:, gene_idx] if sparse.issparse(col): return np.asarray(col.toarray()).flatten() return np.asarray(col).flatten() if sparse.issparse(X): # For sparse matrix, CSC is efficient for column access if sparse.isspmatrix_csc(X): return np.asarray(X.getcol(gene_idx).toarray()).flatten() elif sparse.isspmatrix_csr(X): # CSR is inefficient for column access - convert to CSC and cache # This is a one-time cost that pays off with repeated gene queries # Only do this for in-memory data (not backed files) if self._X_csc_cache is None: logger.info( f"Converting CSR matrix ({X.shape[0]:,}×{X.shape[1]:,}, " f"{X.nnz:,} non-zeros) to CSC for efficient column access" ) self._X_csc_cache = X.tocsc() return np.asarray(self._X_csc_cache.getcol(gene_idx).toarray()).flatten() else: # Other sparse format (COO, LIL, DOK, BSR) - convert to CSC if self._X_csc_cache is None: logger.info(f"Converting {type(X).__name__} to CSC for column access") self._X_csc_cache = sparse.csc_matrix(X) return np.asarray(self._X_csc_cache.getcol(gene_idx).toarray()).flatten() else: # Dense matrix - direct column access return np.asarray(X[:, gene_idx]).flatten()
[docs] def get_gene_expression(self, gene_id: str, compress: bool = False) -> bytes: """Get expression values for a single gene as binary float32.""" # Check LRU cache first (O(1) operations) values = self._gene_expression_cache.get(gene_id) if values is None: # Fetch from data gene_idx = self._get_gene_idx(gene_id) col = self._get_gene_column(gene_idx) values = col.astype(np.float32) # Add to LRU cache (handles eviction automatically) self._gene_expression_cache.put(gene_id, values) data = values.tobytes() if compress: return gzip.compress(data, compresslevel=6) return data
[docs] def get_gene_min_max(self, gene_id: str) -> tuple[float, float]: """Get min/max values for a gene (for colormap scaling).""" gene_idx = self._get_gene_idx(gene_id) col = self._get_gene_column(gene_idx) valid = col[np.isfinite(col)] if len(valid) == 0: return 0.0, 1.0 return float(valid.min()), float(valid.max())
# ========================================================================= # CONNECTIVITY DATA # =========================================================================
[docs] def has_connectivity(self) -> bool: """Check if connectivity data is available.""" if self._closed: return False return "connectivities" in self.adata.obsp
def _compute_connectivity_edges(self) -> tuple[np.ndarray, np.ndarray, int, int]: """ Compute connectivity edges (cached). Returns: (sources_array, destinations_array, n_edges, max_neighbors) """ self._check_closed() if self._connectivity_cache is not None: return self._connectivity_cache if not self.has_connectivity(): raise ValueError("No connectivity data in adata.obsp['connectivities']") connectivities = self.adata.obsp["connectivities"] if not sparse.isspmatrix_csr(connectivities): connectivities = sparse.csr_matrix(connectivities) # Symmetrize and binarize (make a copy to avoid modifying original) connectivities_sym = (connectivities + connectivities.T).tocsr() connectivities_sym.data = np.ones_like(connectivities_sym.data) indptr = connectivities_sym.indptr indices = connectivities_sym.indices # Pre-allocate arrays for edges (upper bound: nnz/2) max_edges = connectivities_sym.nnz // 2 + 1 edge_sources = np.empty(max_edges, dtype=np.int64) edge_destinations = np.empty(max_edges, dtype=np.int64) edge_count = 0 max_neighbors = 0 for cell_idx in range(self.n_cells): start = indptr[cell_idx] end = indptr[cell_idx + 1] neighbor_count = end - start if neighbor_count > max_neighbors: max_neighbors = neighbor_count for j in range(start, end): neighbor_idx = indices[j] if cell_idx < neighbor_idx: edge_sources[edge_count] = cell_idx edge_destinations[edge_count] = neighbor_idx edge_count += 1 # Trim to actual size edge_sources = edge_sources[:edge_count] edge_destinations = edge_destinations[:edge_count] # Select dtype based on cell count if self.n_cells <= 65535: dtype = np.uint16 elif self.n_cells <= 4294967295: dtype = np.uint32 else: dtype = np.uint64 sources = edge_sources.astype(dtype) destinations = edge_destinations.astype(dtype) # Sort for better compression sort_idx = np.lexsort((destinations, sources)) sources = sources[sort_idx] destinations = destinations[sort_idx] self._connectivity_cache = (sources, destinations, edge_count, int(max_neighbors)) return self._connectivity_cache
[docs] def get_connectivity_edges(self, compress: bool = False) -> tuple[bytes, bytes, int, int]: """ Get connectivity edges as binary data. Returns: (sources_binary, destinations_binary, n_edges, max_neighbors) """ sources, destinations, n_edges, max_neighbors = self._compute_connectivity_edges() sources_data = sources.tobytes() destinations_data = destinations.tobytes() if compress: sources_data = gzip.compress(sources_data, compresslevel=6) destinations_data = gzip.compress(destinations_data, compresslevel=6) return sources_data, destinations_data, n_edges, max_neighbors
# ========================================================================= # MANIFEST GENERATION # =========================================================================
[docs] def get_dataset_identity(self) -> dict: """Generate dataset_identity.json content.""" # Count obs fields obs_keys = self.get_obs_keys() n_categorical = sum(1 for k in obs_keys if self.get_obs_field_kind(k) == "category") n_continuous = sum(1 for k in obs_keys if self.get_obs_field_kind(k) == "continuous") # Check connectivity has_conn = self.has_connectivity() n_edges = None if has_conn: try: _, _, n_edges, _ = self.get_connectivity_edges(compress=False) except Exception: pass # Build embeddings metadata embeddings_meta = { "available_dimensions": self._available_dimensions, "default_dimension": self._default_dimension, "files": {f"{dim}d": f"points_{dim}d.bin" for dim in self._available_dimensions} } if self._umap_resolution is not None: embeddings_meta["umap_resolution"] = self._umap_resolution # Build obs field summaries obs_fields = [] for key in obs_keys: kind = self.get_obs_field_kind(key) entry = {"key": key, "kind": kind} if kind == "category": s = self.adata.obs[key] cat = s.astype("category") entry["n_categories"] = len(cat.cat.categories) obs_fields.append(entry) identity = { "version": 2, "id": self.dataset_id, "name": self.dataset_name, "description": "Loaded directly from AnnData", "created_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "cellucid_data_version": "anndata_adapter", "stats": { "n_cells": self.n_cells, "n_genes": self.n_genes, "n_obs_fields": len(obs_keys), "n_categorical_fields": n_categorical, "n_continuous_fields": n_continuous, "has_connectivity": has_conn, "n_edges": n_edges, }, "embeddings": embeddings_meta, "obs_fields": obs_fields, "export_settings": { # No file compression (data is served dynamically) # Network compression (gzip) is applied transparently by the server "compression": None, # No quantization - full float32 precision for all values # This ensures no data loss but increases network transfer size "var_quantization": None, "obs_continuous_quantization": None, "obs_categorical_dtype": "auto", }, "source": { "type": "anndata_adapter", "format": self._source_type, # "h5ad", "zarr", or "memory" "path": self._source_path, "is_backed": self.is_backed, }, # Special flag to indicate this is from AnnData adapter "_anndata_adapter": True, } if self._vector_fields_metadata: identity["vector_fields"] = self._vector_fields_metadata return identity
[docs] def get_obs_manifest(self) -> dict: """Generate obs_manifest.json content.""" obs_keys = self.get_obs_keys() continuous_fields = [] categorical_fields = [] for key in obs_keys: safe_key = _safe_filename_component(key) kind = self.get_obs_field_kind(key) if kind == "continuous": s = self.adata.obs[key] values = pd.to_numeric(s, errors="coerce").to_numpy(dtype=np.float32) valid = values[np.isfinite(values)] min_val = float(valid.min()) if len(valid) > 0 else 0.0 max_val = float(valid.max()) if len(valid) > 0 else 1.0 continuous_fields.append([key, min_val, max_val]) else: # Categorical s = self.adata.obs[key] cat = s.astype("category") categories = [str(c) for c in cat.cat.categories] n_categories = len(categories) dtype_str = "uint8" if n_categories <= 254 else "uint16" missing_value = 255 if n_categories <= 254 else 65535 # Get centroids for all dimensions centroids_by_dim = self.get_centroids_for_field(key) # Outlier quantile min/max (always 0-1 since it's a quantile) categorical_fields.append([ key, categories, dtype_str, missing_value, centroids_by_dim, 0.0, 1.0 ]) # Build compact manifest format # Path patterns must match prepare format for consistency # Note: We don't use compression (.gz) in adapter mode - gzip is applied # at the HTTP level when client supports Accept-Encoding: gzip obs_schemas = { "continuous": { "pathPattern": "obs/{key}.values.f32", "ext": "f32", "dtype": "float32", "quantized": False, }, "categorical": { "codesPathPattern": "obs/{key}.codes.{ext}", "outlierPathPattern": "obs/{key}.outliers.f32", "outlierExt": "f32", "outlierDtype": "float32", "outlierQuantized": False, } } return { "_format": "compact_v1", "n_points": self.n_cells, "centroid_outlier_quantile": self.centroid_outlier_quantile, "latent_key": self.latent_key, "compression": None, "_obsSchemas": obs_schemas, "_continuousFields": continuous_fields, "_categoricalFields": categorical_fields, }
[docs] def get_var_manifest(self) -> dict: """Generate var_manifest.json content.""" gene_ids = self.get_gene_ids() # Build field list with min/max values # Note: Computing min/max for all genes upfront would be slow # We'll compute on-demand and cache, or use a placeholder fields = [[gid] for gid in gene_ids] # No min/max by default # Path pattern must match prepare format for consistency # Note: We don't use compression (.gz) in adapter mode - gzip is applied # at the HTTP level when client supports Accept-Encoding: gzip var_schema = { "kind": "continuous", "pathPattern": "var/{key}.values.f32", "ext": "f32", "dtype": "float32", "quantized": False, } return { "_format": "compact_v1", "n_points": self.n_cells, "var_gene_id_column": self.gene_id_column, "compression": None, "quantization": None, "_varSchema": var_schema, "fields": fields, }
[docs] def get_connectivity_manifest(self) -> Optional[dict]: """Generate connectivity_manifest.json content.""" if not self.has_connectivity(): return None try: _, _, n_edges, max_neighbors = self.get_connectivity_edges(compress=False) except Exception as e: logger.error(f"Failed to compute connectivity: {e}") return None # Determine index dtype if self.n_cells <= 65535: index_dtype = "uint16" index_bytes = 2 elif self.n_cells <= 4294967295: index_dtype = "uint32" index_bytes = 4 else: index_dtype = "uint64" index_bytes = 8 return { "format": "edge_pairs", "n_cells": self.n_cells, "n_edges": n_edges, "max_neighbors": max_neighbors, "index_bytes": index_bytes, "index_dtype": index_dtype, "sourcesPath": "connectivity/edges.src.bin", "destinationsPath": "connectivity/edges.dst.bin", "compression": None, }
# ========================================================================= # CLEANUP AND CONTEXT MANAGER # =========================================================================
[docs] def close(self) -> None: """ Close the adapter and release all resources. This method: 1. Clears all caches to free memory (embedding, centroid, CSC, gene expression) 2. Closes the underlying file handle for backed h5ad files 3. Marks the adapter as closed to prevent further operations Safe to call multiple times. Always call this method when done with the adapter, or use the context manager:: with AnnDataAdapter.from_file("data.h5ad") as adapter: # use adapter # automatically cleaned up Memory Released --------------- - Embedding cache (normalized UMAP coordinates) - Centroid cache (computed label centroids) - Outlier quantile cache - Gene expression LRU cache (up to 100 gene columns) - CSC matrix cache (for CSR->CSC converted matrices) - Latent space array - Gene ID lookup indices """ if self._closed: return # Check if backed BEFORE setting _closed flag # (is_backed property returns False after _closed is True) was_backed = False file_to_close = None try: was_backed = self.adata.isbacked if was_backed and hasattr(self.adata, 'file'): file_to_close = self.adata.file except Exception: pass # adata may already be in invalid state self._closed = True # Clear all caches to free memory self._embedding_cache.clear() self._embedding_norm.clear() self._vector_field_cache.clear() self._vector_fields_metadata = None self._centroid_cache.clear() self._outlier_quantile_cache.clear() self._gene_expression_cache.clear() self._latent_space = None self._connectivity_cache = None self._gene_ids_cache = None self._gene_id_to_idx_cache = None # Clear CSC cache (can be large for sparse matrices) # This is critical for memory management - CSC cache can double memory usage if self._X_csc_cache is not None: try: # For very large sparse matrices, explicit deletion helps GC del self._X_csc_cache except Exception: pass self._X_csc_cache = None # Close backed file handle if applicable if file_to_close is not None: try: file_to_close.close() logger.debug(f"Closed backed file handle for {self.dataset_name}") except Exception as e: logger.warning(f"Error closing backed AnnData file: {e}") # Clear reference to adata to help garbage collection # (do this last since we needed it for is_backed check) self.adata = None logger.debug(f"AnnDataAdapter closed: {self.dataset_name}")
def __enter__(self) -> "AnnDataAdapter": """Context manager entry.""" return self def __exit__(self, exc_type, exc_val, exc_tb) -> None: """Context manager exit - ensures cleanup.""" self.close() def __del__(self): """ Cleanup on garbage collection. Note: Using __del__ is an anti-pattern but kept for safety. Users should prefer: - Using context manager: `with AnnDataAdapter(...) as adapter:` - Explicitly calling close(): `adapter.close()` """ try: self.close() except Exception: # Ignore errors during GC - object may be partially destroyed pass def __repr__(self) -> str: if self._closed: return f"AnnDataAdapter(closed)" backed_str = " (backed)" if self.is_backed else "" source_str = f" from {self._source_type}" if self._source_type != "memory" else "" return (f"AnnDataAdapter({self.n_cells:,} cells, {self.n_genes:,} genes, " f"dims={self._available_dimensions}{backed_str}{source_str})")