Source code for cellucid.anndata_server

"""
AnnData Server for Cellucid

HTTP server that serves AnnData data dynamically in the Cellucid format.
This allows direct visualization of AnnData files without pre-export.

Supports:
- h5ad files: Single HDF5 files (backed mode for lazy loading)
- zarr stores: Directory-based stores (inherently lazy-loaded)
- In-memory AnnData objects

Usage:
    from cellucid.anndata_server import serve_anndata

    # Serve an h5ad file
    serve_anndata("/path/to/data.h5ad")

    # Serve a zarr store (must be a directory)
    serve_anndata("/path/to/data.zarr")

    # Or an in-memory AnnData
    serve_anndata(adata)
"""

from __future__ import annotations

import logging
import re
import socket
import threading
import webbrowser
from functools import partial
from http import HTTPStatus
from http.server import HTTPServer, SimpleHTTPRequestHandler
from pathlib import Path
from typing import TYPE_CHECKING, Optional, Union
from urllib.parse import parse_qs, unquote, urlparse

from .anndata_adapter import AnnDataAdapter, _safe_filename_component
from ._server_base import (
    CORSMixin,
    DEFAULT_PORT,
    DEFAULT_HOST,
    ensure_port_available,
    print_step,
    print_detail,
    print_success,
    print_server_banner,
)

if TYPE_CHECKING:
    import anndata

logger = logging.getLogger("cellucid.anndata_server")


class AnnDataRequestHandler(CORSMixin, SimpleHTTPRequestHandler):
    """
    HTTP handler for serving AnnData data in Cellucid format.

    Routes:
        /dataset_identity.json - Dataset metadata
        /obs_manifest.json - Cell metadata manifest
        /var_manifest.json - Gene expression manifest
        /connectivity_manifest.json - Connectivity manifest
        /points_{dim}d.bin - Embedding coordinates
        /vectors/{fieldId}_{dim}d.bin - Vector field displacement vectors
        /obs/{field}.values.f32.bin - Continuous obs field
        /obs/{field}.codes.{ext}.bin - Categorical obs field codes
        /obs/{field}.outliers.f32.bin - Categorical outlier quantiles
        /var/{gene}.values.f32.bin - Gene expression values
        /connectivity/edges.src.bin - Edge sources
        /connectivity/edges.dst.bin - Edge destinations
        /_cellucid/health - Health check
        /_cellucid/info - Server info
        /_cellucid/events - POST endpoint for frontend events (hooks)
    """

    allow_caching = False  # Data is dynamic, don't cache

    def __init__(
        self,
        *args,
        adapter: AnnDataAdapter,
        server_info: dict,
        dataset_id: str = "",
        web_proxy: bool = False,
        **kwargs,
    ):
        self.adapter = adapter
        self.server_info = server_info
        self.dataset_id = dataset_id  # Cached for path prefix stripping
        self.web_proxy = bool(web_proxy)
        # We serve "virtual" files via explicit route handlers, but still want
        # the normal BaseHTTPRequestHandler lifecycle (parse_request, etc.).
        # `directory` is unused because we never call the default file-serving
        # paths for AnnData mode.
        super().__init__(*args, **kwargs)

    def end_headers(self):
        """Add CORS headers to every response."""
        self.add_cors_headers()
        super().end_headers()

    def do_POST(self):
        """Handle POST requests (events from frontend)."""
        if self.handle_event_post():
            return
        if self.handle_session_bundle_post():
            return
        # No other POST endpoints - return 404
        self.send_error_response(404, f"POST not supported for path: {self.path}")

    def do_HEAD(self):
        """Handle HEAD requests (for urlExists checks)."""
        self.do_GET(head_only=True)

    def do_GET(self, head_only: bool = False):
        """Handle GET requests."""
        parsed = urlparse(self.path)
        raw_path = unquote(parsed.path)

        # Proxy the hosted viewer UI assets so the viewer runs on the same
        # origin as the AnnData server (avoids mixed-content).
        if self.web_proxy and self.handle_web_proxy_get(raw_path, head_only=head_only):
            return

        path = raw_path.lstrip("/")

        # Strip dataset ID prefix if present (frontend may include it)
        # e.g., "my_dataset/points_3d.bin" -> "points_3d.bin"
        if self.dataset_id and path.startswith(f"{self.dataset_id}/"):
            path = path[len(self.dataset_id) + 1:]

        # Check for gzip support
        accept_encoding = self.headers.get("Accept-Encoding", "")
        supports_gzip = "gzip" in accept_encoding

        try:
            # Root path - redirect to viewer
            if path == "" or path == "index.html":
                # Always serve the viewer UI from this server via the hosted-asset proxy.
                if self.handle_web_proxy_get("/index.html", head_only=head_only):
                    return
                self.send_error_response(503, "Cellucid viewer UI unavailable")
                return

            if path == "_cellucid/health":
                self.send_json({
                    "status": "ok",
                    "type": "anndata",
                    "version": self.server_info.get("version", "unknown"),
                    "format": self.server_info.get("format", "unknown"),
                    "is_backed": self.server_info.get("is_backed", False),
                    "n_cells": self.adapter.n_cells,
                    "n_genes": self.adapter.n_genes,
                }, head_only)
            elif path == "_cellucid/info":
                self.send_json(self.server_info, head_only)
            elif path == "_cellucid/datasets":
                # Return a single dataset entry for the loaded AnnData
                identity = self.adapter.get_dataset_identity()
                datasets = [{
                    "id": identity.get("id", "anndata"),
                    "path": "/",
                    "name": identity.get("name", "AnnData"),
                }]
                self.send_json({"datasets": datasets}, head_only)
            elif path == "dataset_identity.json":
                self.send_json(self.adapter.get_dataset_identity(), head_only)
            elif path == "obs_manifest.json":
                self.send_json(self.adapter.get_obs_manifest(), head_only)
            elif path == "var_manifest.json":
                self.send_json(self.adapter.get_var_manifest(), head_only)
            elif path == "connectivity_manifest.json":
                manifest = self.adapter.get_connectivity_manifest()
                if manifest:
                    self.send_json(manifest, head_only)
                else:
                    self.send_error_response(404, "No connectivity data")
            elif path.startswith("points_") and path.endswith(".bin"):
                self._handle_points(path, head_only, supports_gzip)
            elif path.startswith("obs/"):
                self._handle_obs(path, head_only, supports_gzip)
            elif path.startswith("var/"):
                self._handle_var(path, head_only, supports_gzip)
            elif path.startswith("connectivity/"):
                self._handle_connectivity(path, head_only, supports_gzip)
            elif path.startswith("vectors/"):
                self._handle_vector_fields(path, head_only, supports_gzip)
            else:
                self.send_error_response(404, f"Not found: {path}")

        except Exception:
            logger.exception("Error handling %s", path)
            self.send_error_response(500, "Internal server error")

    def _handle_points(self, path: str, head_only: bool, supports_gzip: bool):
        """Handle points_Xd.bin requests."""
        # Parse dimension from filename: points_1d.bin, points_2d.bin, points_3d.bin
        # Also handle .gz suffix for explicit compressed requests
        clean_path = path[:-3] if path.endswith(".gz") else path
        match = re.match(r"points_(\d)d\.bin", clean_path)
        if not match:
            self.send_error_response(404, f"Invalid points path: {path}")
            return

        dim = int(match.group(1))
        try:
            # Compress if client supports gzip (transparent compression for better network perf)
            compress = supports_gzip
            data = self.adapter.get_points_binary(dim, compress=compress)
            self.send_binary(data, head_only=head_only, compressed=compress)
        except ValueError:
            self.send_error_response(404, "Points data not available")

    def _handle_vector_fields(self, path: str, head_only: bool, supports_gzip: bool):
        """
        Handle vector field requests under /vectors/.

        Supported paths:
        - vectors/{fieldId}_{dim}d.bin
        - vectors/{fieldId}_{dim}d.bin.gz  (raw gzip bytes, no Content-Encoding)
        """
        raw_gz = path.endswith(".gz")
        clean_path = path[:-3] if raw_gz else path
        match = re.match(r"vectors/(.+)_([123])d\.bin$", clean_path)
        if not match:
            self.send_error_response(404, f"Invalid vectors path: {path}")
            return

        field_id = match.group(1)
        dim = int(match.group(2))

        try:
            if raw_gz:
                data = self.adapter.get_vector_field_binary(field_id, dim, compress=True)
                self.send_binary(data, head_only=head_only, compressed=False)
            else:
                compress = supports_gzip
                data = self.adapter.get_vector_field_binary(field_id, dim, compress=compress)
                self.send_binary(data, head_only=head_only, compressed=compress)
        except ValueError as e:
            self.send_error_response(404, str(e))

    def _handle_obs(self, path: str, head_only: bool, supports_gzip: bool):
        """
        Handle obs field requests.

        Supported paths:
        - obs/{field}.values.f32
        - obs/{field}.codes.u8 or obs/{field}.codes.u16
        - obs/{field}.outliers.f32
        """
        # Remove 'obs/' prefix
        filename = path[4:]

        # Strip suffixes for parsing
        if filename.endswith(".gz"):
            filename = filename[:-3]

        # Compress if client supports gzip (transparent compression)
        compress = supports_gzip

        # Parse filename: {field}.{type}.{dtype}
        # Examples: cell_type.codes.u8, n_counts.values.f32, cluster.outliers.f32
        parts = filename.rsplit(".", 2)  # Split from right: [field, type, dtype]
        if len(parts) < 3:
            self.send_error_response(404, f"Invalid obs path format: {path}")
            return

        field_name = parts[0]
        data_type = parts[1]  # 'values', 'codes', or 'outliers'

        # Find the actual field key (may differ from safe filename)
        obs_keys = self.adapter.get_obs_keys()
        actual_key = None
        for key in obs_keys:
            if _safe_filename_component(key) == field_name:
                actual_key = key
                break

        if actual_key is None:
            self.send_error_response(404, f"Obs field not found: {field_name}")
            return

        try:
            if data_type == "values":
                # Continuous field values
                data = self.adapter.get_obs_continuous_values(actual_key, compress=compress)
                self.send_binary(data, head_only=head_only, compressed=compress)
            elif data_type == "codes":
                # Categorical codes
                data, categories, missing = self.adapter.get_obs_categorical_codes(actual_key, compress=compress)
                self.send_binary(data, head_only=head_only, compressed=compress)
            elif data_type == "outliers":
                # Outlier quantiles for categorical field
                data = self.adapter.get_obs_outlier_quantiles(actual_key, compress=compress)
                self.send_binary(data, head_only=head_only, compressed=compress)
            else:
                self.send_error_response(404, f"Unknown obs data type: {data_type} (expected values, codes, or outliers)")
        except Exception:
            logger.exception("Error handling obs request: %s", path)
            self.send_error_response(500, "Internal server error")

    def _handle_var(self, path: str, head_only: bool, supports_gzip: bool):
        """
        Handle var (gene expression) requests.

        Supported paths:
        - var/{gene}.values.f32
        """
        # Remove 'var/' prefix
        filename = path[4:]

        # Strip suffixes for parsing
        if filename.endswith(".gz"):
            filename = filename[:-3]

        # Compress if client supports gzip (transparent compression)
        compress = supports_gzip

        # Check format: {gene}.values.f32
        if not filename.endswith(".values.f32"):
            self.send_error_response(404, f"Invalid var path format: {path} (expected {{gene}}.values.f32)")
            return

        gene_safe = filename[:-11]  # Remove '.values.f32'

        # Find actual gene ID (may differ from safe filename due to special characters)
        gene_ids = self.adapter.get_gene_ids()
        actual_gene = None

        # First try safe filename match
        for gid in gene_ids:
            if _safe_filename_component(gid) == gene_safe:
                actual_gene = gid
                break

        # Fallback: try exact match (in case gene ID is already safe)
        if actual_gene is None and gene_safe in gene_ids:
            actual_gene = gene_safe

        if actual_gene is None:
            self.send_error_response(404, f"Gene not found: {gene_safe}")
            return

        try:
            data = self.adapter.get_gene_expression(actual_gene, compress=compress)
            self.send_binary(data, head_only=head_only, compressed=compress)
        except KeyError:
            self.send_error_response(404, "Gene not found")
        except Exception:
            logger.exception("Error handling var request: %s", path)
            self.send_error_response(500, "Internal server error")

    def _handle_connectivity(self, path: str, head_only: bool, supports_gzip: bool):
        """
        Handle connectivity edge requests.

        Supported paths:
        - connectivity/edges.src.bin - Edge source indices
        - connectivity/edges.dst.bin - Edge destination indices
        """
        filename = path.split("/")[-1]

        # Strip suffixes for parsing
        if filename.endswith(".gz"):
            filename = filename[:-3]

        # Compress if client supports gzip (transparent compression)
        compress = supports_gzip

        try:
            sources_data, dests_data, n_edges, max_neighbors = self.adapter.get_connectivity_edges(compress=compress)

            if filename == "edges.src.bin":
                self.send_binary(sources_data, head_only=head_only, compressed=compress)
            elif filename == "edges.dst.bin":
                self.send_binary(dests_data, head_only=head_only, compressed=compress)
            else:
                self.send_error_response(404, f"Unknown connectivity file: {filename} (expected edges.src.bin or edges.dst.bin)")
        except ValueError:
            self.send_error_response(404, "Connectivity data not available")
        except Exception:
            logger.exception("Error handling connectivity request: %s", path)
            self.send_error_response(500, "Internal server error")

    def log_message(self, format: str, *args):
        """Override to use Python logging."""
        logger.debug("%s - %s", self.address_string(), format % args)


[docs] class AnnDataServer: """ Server for serving AnnData data in Cellucid format. Example: server = AnnDataServer(adata) server.start() # Blocking # Or non-blocking: server.start_background() # ... do other things ... server.stop() """
[docs] def __init__( self, data: Union[str, Path, "anndata.AnnData"], port: int = DEFAULT_PORT, host: str = DEFAULT_HOST, open_browser: bool = False, quiet: bool = False, **adapter_kwargs, ): """ Initialize the server. Parameters ---------- data : str, Path, or AnnData Path to h5ad file or AnnData object. port : int Port to serve on. host : str Host to bind to. open_browser : bool Whether to open browser on start. quiet : bool Suppress info messages. **adapter_kwargs Additional arguments passed to AnnDataAdapter. """ self.port = port self.host = host self.open_browser = open_browser self.quiet = quiet # Local web assets and the legacy hosted-viewer mode are intentionally disabled in dev. # We always serve the UI from this server via the hosted-asset proxy to avoid mixed-content. self.web_proxy = True # Step 1: Detect format if isinstance(data, (str, Path)): data_path = Path(data) is_zarr = str(data).endswith('.zarr') or data_path.is_dir() format_name = "zarr" if is_zarr else "h5ad" self.data_source = str(data) self.data_format = format_name if not quiet: print_step(1, 4, "Detecting format") print_detail("Path", str(data)) print_detail("Format", format_name) print_success("Format detected") else: # In-memory AnnData self.data_source = "in-memory AnnData" self.data_format = "in-memory" if not quiet: print_step(1, 4, "Detecting format") print_detail("Source", "in-memory AnnData") print_success("Format detected") # Step 2: Load file if not quiet: print_step(2, 4, "Loading AnnData") backed = adapter_kwargs.get("backed", True) mode = "backed (lazy loading)" if backed else "in-memory" print_detail("Mode", mode) if isinstance(data, (str, Path)): self.adapter = AnnDataAdapter.from_file(data, **adapter_kwargs) else: self.adapter = AnnDataAdapter(data, **adapter_kwargs) if not quiet: print_success("File opened") # Step 3: Analyze dataset if not quiet: print_step(3, 4, "Analyzing dataset") self._print_dataset_info() print_success("Analysis complete") self._server: Optional[HTTPServer] = None self._thread: Optional[threading.Thread] = None self._running = False # Version try: from . import __version__ except ImportError: __version__ = "0.0.0" self.server_info = { "version": __version__, "type": "anndata", "format": self.data_format, "host": self.host, "port": self.port, "n_cells": self.adapter.n_cells, "n_genes": self.adapter.n_genes, "is_backed": self.adapter.is_backed, }
def _print_dataset_info(self): """Print information about the loaded dataset.""" print_detail("Cells", f"{self.adapter.n_cells:,}") print_detail("Genes", f"{self.adapter.n_genes:,}") # Get embedding info dims = self.adapter._available_dimensions dim_str = ", ".join(f"{d}D" for d in sorted(dims)) if dims else "none" print_detail("Embeddings", dim_str) # Count obs field types obs_keys = self.adapter.get_obs_keys() n_categorical = sum(1 for k in obs_keys if self.adapter.get_obs_field_kind(k) == "category") n_continuous = sum(1 for k in obs_keys if self.adapter.get_obs_field_kind(k) == "continuous") print_detail("Obs fields", f"{n_categorical} categorical, {n_continuous} continuous") # Check connectivity has_conn = self.adapter.has_connectivity() if has_conn: try: _, _, n_edges, _ = self.adapter.get_connectivity_edges(compress=False) conn_str = f"yes ({n_edges:,} edges)" except Exception: conn_str = "yes" else: conn_str = "no" print_detail("Connectivity", conn_str) @property def url(self) -> str: """Get the server URL.""" return f"http://{self.host}:{self.port}" @property def viewer_url(self) -> str: """Get the full URL to open the viewer.""" return f"{self.url}/?anndata=true"
[docs] def start(self, blocking: bool = True): """Start the server.""" if self._running: logger.warning("Server is already running") return # Step 4: Start server if not self.quiet: print_step(4, 4, "Starting server") # Pre-warm the hosted-asset proxy cache so the first browser load has a # visible progress indicator and offline/airgapped sessions are less # likely to hit missing lazily-loaded assets. if self.web_proxy and not self.quiet: try: from .web_cache import ensure_web_ui_cached print_detail("Viewer UI cache", "prefetching (one-time per web build)") ensure_web_ui_cached(force=False, show_progress=True) print_success("Viewer UI cached") except Exception as e: print_detail("Viewer UI cache", f"prefetch failed: {e}") # Ensure port is available (finds new one if needed) self.port = ensure_port_available(self.host, self.port, self.quiet) self.server_info["port"] = self.port # Create handler with adapter # Cache dataset_id for efficient path prefix stripping dataset_id = self.adapter.get_dataset_identity().get("id", "") handler = partial( AnnDataRequestHandler, adapter=self.adapter, server_info=self.server_info, dataset_id=dataset_id, web_proxy=self.web_proxy, ) self._server = HTTPServer((self.host, self.port), handler) self._running = True if not self.quiet: print_success("Server ready") print_server_banner(self.url, self.viewer_url) if self.open_browser: webbrowser.open(self.viewer_url) if blocking: try: self._server.serve_forever() except KeyboardInterrupt: if not self.quiet: print("\nShutting down server...") # Don't call shutdown() here - it would deadlock since we're in the same thread # Just close the server and cleanup self._running = False if self._server: self._server.server_close() self._server = None if hasattr(self, 'adapter') and self.adapter: self.adapter.close() if not self.quiet: print("AnnData server stopped") else: self._thread = threading.Thread(target=self._server.serve_forever, daemon=True) self._thread.start()
[docs] def start_background(self): """Start the server in a background thread.""" self.start(blocking=False)
[docs] def stop(self): """Stop the server and cleanup resources.""" self._running = False if self._server: # shutdown() tells serve_forever() to stop, but we also need # server_close() to release the socket immediately self._server.shutdown() self._server.server_close() self._server = None # IMPORTANT: Close the adapter to release memory and file handles if hasattr(self, 'adapter') and self.adapter: self.adapter.close() if not self.quiet: print("AnnData server stopped")
[docs] def is_running(self) -> bool: """Check if the server is running.""" return self._running
[docs] def wait(self): """Wait for the server to stop.""" if self._thread: try: while self._running: self._thread.join(timeout=1) except KeyboardInterrupt: self.stop()
[docs] def serve_anndata( data: Union[str, Path, "anndata.AnnData"], port: int = DEFAULT_PORT, host: str = DEFAULT_HOST, open_browser: bool = True, quiet: bool = False, **kwargs, ) -> AnnDataServer: """ Serve an AnnData object or h5ad file directly. This is a convenience function for quickly visualizing AnnData. For production use, consider using prepare instead. Parameters ---------- data : str, Path, or AnnData Path to h5ad file or AnnData object. port : int Port to serve on (default: 8765). host : str Host to bind to (default: 127.0.0.1). open_browser : bool Whether to open browser (default: True). quiet : bool Suppress info messages. **kwargs Additional arguments passed to AnnDataAdapter. Returns ------- AnnDataServer The running server instance. Example ------- >>> from cellucid import serve_anndata >>> serve_anndata("/path/to/data.h5ad") >>> # Or with in-memory AnnData >>> import anndata as ad >>> adata = ad.read_h5ad("data.h5ad") >>> serve_anndata(adata) """ server = AnnDataServer( data=data, port=port, host=host, open_browser=open_browser, quiet=quiet, **kwargs, ) server.start() return server