Tutorial: Scanpy + Dask-AnnData

This notebook demonstrates converting a TileDB-SOMA ExperimentAxisQuery to an AnnData whose X matrix is a Dask Array, and then running Scanpy’s Dask-enabled HVG and PCA implementations on it.

In addition to dask, distributed, and tiledbsoma, this notebook requires dask-ml to be installed; here are some known-good pins:

pip install dask==2025.2.0 distributed==2025.2.0 dask-ml==2024.4.4 scanpy==1.11.0 scikit-learn==1.5.2
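To confirm what ended up installed, a quick version printout can be run in a notebook cell (a minimal sketch; the import names are the standard module names for these packages, e.g. dask_ml for dask-ml and sklearn for scikit-learn):

import dask, dask_ml, distributed, scanpy, sklearn

# Print installed versions, for comparison against the pins above
for mod in (dask, distributed, dask_ml, scanpy, sklearn):
    print(f"{mod.__name__}=={mod.__version__}")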

The first cell below contains Papermill parameters, which can be overridden when running this notebook programmatically, e.g.:

name=tutorial_scanpy_pca_dask
papermill -p tissue lung -p dask_chunk_size 200_000 $name.ipynb $name-lung_200k.ipynb
[1]:
from os import cpu_count

from psutil import virtual_memory

available_mem = virtual_memory().available

# Census
measurement_name = "RNA"             # CELLxGENE Census measurement name
layer = "raw"                        # Read "raw" transcript counts
census = "2025-01-30"                # Recent CELLxGENE Census version
tissue = "liver"                     # Query cells matching this "tissue_general" value
var_filter = None                    # Select all genes, by default

# Dask
dask_chunk_size = 100_000            # Number of "X" matrix rows per Dask task
worker_mem_allowance = 10 * 2**30    # 100k cell-rows per chunk uses about 10GiB in each worker
n_dask_procs = max(2, min(           # Number of Dask worker processes to spawn
    cpu_count(),
    available_mem // worker_mem_allowance,
))
dask_disable_worker_mem_mgmt = True  # Disable spilling and worker pausing/terminating; they tend to harm more than help
dask_graph_warn_size = "20MB"        # Relax default (10MB) "Sending large graph" warning threshold ("distributed.admin.large-graph-warning-threshold" config key)
dashboard_port = 8787                # Port for the Dask web UI (for following jobs in real-time)

# Scanpy
hvg_inplace = True                   # Scanpy HVG: "inplace=True" by default
hvg_subset = False                   # Scanpy HVG: "subset=False" by default

Open experiment, initialize query

Open the CELLxGENE Census Experiment:

[2]:
import scanpy as sc
from somacore import AxisQuery

from tiledbsoma import Experiment, SOMATileDBContext

tiledb_config = {
    "vfs.s3.no_sign_request": "true",
    "vfs.s3.region": "us-west-2",
}
exp_uri = f"s3://cellxgene-census-public-us-west-2/cell-census/{census}/soma/census_data/homo_sapiens"
exp = Experiment.open(
    exp_uri,
    context=SOMATileDBContext(tiledb_config=tiledb_config),
)
exp
[2]:
<Experiment 's3://cellxgene-census-public-us-west-2/cell-census/2025-01-30/soma/census_data/homo_sapiens' (open for 'r') (2 items)
    'ms': 's3://cellxgene-census-public-us-west-2/cell-census/2025-01-30/soma/census_data/homo_sapiens/ms' (unopened)
    'obs': 's3://cellxgene-census-public-us-west-2/cell-census/2025-01-30/soma/census_data/homo_sapiens/obs' (unopened)>

Set up query for cells from the given tissue:

[3]:
%%time
obs_filter = "is_primary_data == True"  # Deduplicate / only fetch primary representations of cells
if tissue:
    obs_filter = f'tissue_general == "{tissue}" and {obs_filter}'
query = exp.axis_query(
    measurement_name=measurement_name,
    obs_query=AxisQuery(value_filter=obs_filter) if obs_filter else None,
    var_query=AxisQuery(value_filter=var_filter) if var_filter else None,
)
CPU times: user 11.8 ms, sys: 3.04 ms, total: 14.8 ms
Wall time: 158 ms

Check the number of cells responsive to the query:

[4]:
%%time
query.n_obs
CPU times: user 4.05 s, sys: 1.6 s, total: 5.65 s
Wall time: 968 ms
[4]:
1429379

Initialize Dask

Create a cluster of local processes:

[5]:
import dask
import dask_ml  # Used by `sc.pp.pca` later, fail early here if not installed  # noqa: F401
from dask.distributed import Client, LocalCluster

if dask_disable_worker_mem_mgmt:
    # Dask spilling and worker pause/terminate logic mostly interferes with our usage.
    # It doesn't take into account available swap space and, once workers begin to spill
    # or pause, the HVG/PCA jobs below never recover.
    dask.config.set(
        {
            # "distributed.worker.memory.target": 0.7,
            "distributed.worker.memory.spill": False,
            "distributed.worker.memory.pause": False,
            "distributed.worker.memory.terminate": False,
        }
    )

if dask_graph_warn_size:
    # Relax default (10MB) "Sending large graph" warning threshold
    dask.config.set({ "distributed.admin.large-graph-warning-threshold": dask_graph_warn_size })

if dask_chunk_size:
    cluster = LocalCluster(
        n_workers=n_dask_procs,
        threads_per_worker=1,  # Performance is better with just one thread per worker process
        dashboard_address=f":{dashboard_port}",
    )
    print(f"{n_dask_procs=}")
    client = Client(cluster)
else:
    dask_chunk_size = None
    client = None
client
n_dask_procs=24
[5]:

Client: Client-998e4379-1ffa-11f0-9ce2-0a61715a5721
Connection method: Cluster object    Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

It’s recommended to use one worker thread per Dask process (threads_per_worker=1 above). Performance (wall-clock and memory use) suffers when more than one Dask worker thread runs in the same Dask worker process.

We also encourage opening the dashboard at 127.0.0.1:8787/status, to follow tasks, progress, and memory use in real time. Here’s an example screenshot of that UI, from a completed run of this notebook:

Screenshot of Dask web UI after running this notebook
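If you change dashboard_port (or let Dask pick a free port), the client can report the URL directly; a minimal check using the client created above:

# The Client knows its own dashboard URL (reflects dashboard_address above)
print(client.dashboard_link)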

Convert query to Dask-backed AnnData

[6]:
%%time
adata = query.to_anndata(
    layer,
    dask=dict(
        chunk_size=dask_chunk_size,
        tiledb_config=tiledb_config,
    ),
)
adata
CPU times: user 10.5 s, sys: 9.76 s, total: 20.2 s
Wall time: 10.1 s
[6]:
AnnData object with n_obs × n_vars = 1429379 × 61888
    obs: 'soma_joinid', 'dataset_id', 'assay', 'assay_ontology_term_id', 'cell_type', 'cell_type_ontology_term_id', 'development_stage', 'development_stage_ontology_term_id', 'disease', 'disease_ontology_term_id', 'donor_id', 'is_primary_data', 'observation_joinid', 'self_reported_ethnicity', 'self_reported_ethnicity_ontology_term_id', 'sex', 'sex_ontology_term_id', 'suspension_type', 'tissue', 'tissue_ontology_term_id', 'tissue_type', 'tissue_general', 'tissue_general_ontology_term_id', 'raw_sum', 'nnz', 'raw_mean_nnz', 'raw_variance_nnz', 'n_measured_vars'
    var: 'soma_joinid', 'feature_id', 'feature_name', 'feature_type', 'feature_length', 'nnz', 'n_measured_obs'

The X matrix is a Dask Array:

[7]:
print(type(adata.X))
adata.X
<class 'dask.array.core.Array'>
[7]:
              Array                        Chunk
Shape         (1429379, 61888)             (100000, 61888)
Dask graph    15 chunks in 2 graph layers
Data type     float32 scipy.sparse._csr.csr_matrix
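The chunk count follows directly from the parameters above: the 1,429,379 query rows are split into ⌈1,429,379 / 100,000⌉ = 15 row-chunks, each held as a scipy CSR matrix. A quick sanity check against values already computed:

import math

print(math.ceil(query.n_obs / dask_chunk_size))  # 15 row-chunks of up to 100,000 cells
print(adata.X.numblocks)                         # (15, 1): chunked along obs only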

Scanpy preprocessing

PCA should run on normalized, log1p-transformed counts:

[8]:
%%time
sc.pp.normalize_total(adata)
CPU times: user 23.4 ms, sys: 8.21 ms, total: 31.6 ms
Wall time: 25 ms
[9]:
%%time
sc.pp.log1p(adata)
CPU times: user 4.45 ms, sys: 3.22 ms, total: 7.67 ms
Wall time: 9.49 ms
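The millisecond wall times above are expected: on a Dask-backed X, normalize_total and log1p only extend the task graph, and the actual work is deferred until the HVG/PCA steps below force computation. X is still a lazy Dask array at this point:

# X remains a lazy Dask array after the preprocessing calls above; the
# normalization and log1p tasks run only when a downstream step computes.
print(type(adata.X))  # <class 'dask.array.core.Array'>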

Scanpy HVG

[10]:
%%time
hvg = sc.pp.highly_variable_genes(adata, inplace=hvg_inplace, subset=hvg_subset)
hvg
CPU times: user 7.97 s, sys: 2.03 s, total: 10 s
Wall time: 32.1 s

Scanpy PCA

NOTE: it’s important to run HVG before PCA, so that PCA only operates on the highly-variable genes. Otherwise it runs on all ≈60k genes and uses intractable amounts of memory and compute.
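By default, Scanpy’s PCA uses the highly_variable column computed above whenever it is present in adata.var. To make that dependency explicit, the mask can be passed by name; a sketch equivalent to the call below, assuming scanpy ≥ 1.10 (where the mask_var parameter is available):

# Explicit form of the default behavior: restrict PCA to the HVG mask
# computed above (mask_var parameter; scanpy >= 1.10).
sc.pp.pca(adata, mask_var="highly_variable")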

[11]:
%%time
sc.pp.pca(adata)
CPU times: user 1min 8s, sys: 3.99 s, total: 1min 12s
Wall time: 1min 4s
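With a Dask-backed X, the resulting embedding is itself a lazy Dask array, which is why the next cell materializes it with .compute() before plotting. A quick type check (not part of the original run):

# adata.obsm["X_pca"] stays lazy until .compute() materializes it below
print(type(adata.obsm["X_pca"]))  # dask.array.core.Array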
[12]:
%%time
adata.obsm["X_pca"] = adata.obsm["X_pca"].compute()
CPU times: user 11.7 s, sys: 2.6 s, total: 14.3 s
Wall time: 23.5 s

Subset cell types, plot

[13]:
t10 = adata.obs.cell_type.value_counts().iloc[:10]
a10 = adata[adata.obs.cell_type.isin(t10.index)]
a10
[13]:
View of AnnData object with n_obs × n_vars = 893703 × 61888
    obs: 'soma_joinid', 'dataset_id', 'assay', 'assay_ontology_term_id', 'cell_type', 'cell_type_ontology_term_id', 'development_stage', 'development_stage_ontology_term_id', 'disease', 'disease_ontology_term_id', 'donor_id', 'is_primary_data', 'observation_joinid', 'self_reported_ethnicity', 'self_reported_ethnicity_ontology_term_id', 'sex', 'sex_ontology_term_id', 'suspension_type', 'tissue', 'tissue_ontology_term_id', 'tissue_type', 'tissue_general', 'tissue_general_ontology_term_id', 'raw_sum', 'nnz', 'raw_mean_nnz', 'raw_variance_nnz', 'n_measured_vars'
    var: 'soma_joinid', 'feature_id', 'feature_name', 'feature_type', 'feature_length', 'nnz', 'n_measured_obs', 'highly_variable', 'means', 'dispersions', 'dispersions_norm'
    uns: 'log1p', 'hvg', 'pca'
    obsm: 'X_pca'
    varm: 'PCs'
[14]:
%%time
sc.pl.pca(a10, color="cell_type", palette=["red", "blue", "green", "yellow", "grey", "purple", "orange", "teal", "pink", "black"])
/home/ubuntu/.pyenv/versions/3.11.8/lib/python3.11/site-packages/anndata/_core/anndata.py:1146: ImplicitModificationWarning: Trying to modify attribute `.var` of view, initializing view as actual.
  df[key] = c
PCA scatter plot colored by cell_type (image: notebooks_tutorial_scanpy_pca_dask_25_1.png)
CPU times: user 5.74 s, sys: 300 ms, total: 6.04 s
Wall time: 5.77 s