Precompute clustering #18

Merged
merged 22 commits on Jan 10, 2025

Commits (22)
026e765
add clustering data frame to the solution
rcannood Dec 10, 2024
9392ab6
update script
rcannood Dec 10, 2024
013b54c
add comments
rcannood Dec 10, 2024
b075b3c
add clustering key prefix for cluster-based metrics
mumichae Dec 13, 2024
0f99a8d
add resolutions parameters to metrics to make use of precomputed clus…
mumichae Dec 13, 2024
a4404ca
fix clustering key for nmi and ari
mumichae Dec 13, 2024
54c0fd9
set correct version of scib to make using precomputed clusters possible
mumichae Dec 13, 2024
f80d939
add resolutions argument to cluster-based metrics
mumichae Dec 20, 2024
5234d3c
use igraph for clustering on CPU
mumichae Dec 20, 2024
17d436c
use partial reading for clustering
mumichae Dec 20, 2024
e77ad55
rename cluster keys to be consistent with scib metrics
mumichae Dec 20, 2024
810507f
fix import and reading missing slot
mumichae Dec 20, 2024
391e4b2
get clustering from obsm
mumichae Dec 20, 2024
4b95c18
Add config to create test resources script
lazappi Jan 8, 2025
6c56070
Add clustering to benchmark workflow
lazappi Jan 8, 2025
f3dc116
Remove clustering from process dataset workflow
lazappi Jan 8, 2025
b285dbe
Move output processing to subworkflow
lazappi Jan 9, 2025
81f9649
Update API with processing subworkflow
lazappi Jan 9, 2025
3ff4797
Re-enable all methods/metrics
lazappi Jan 9, 2025
5ee87fd
Remove clustering from fil_solution.yaml API file
lazappi Jan 9, 2025
19b6e52
Add processing to test resources script
lazappi Jan 9, 2025
38552af
update readme
rcannood Jan 10, 2025
3 changes: 2 additions & 1 deletion README.md
@@ -210,7 +210,7 @@ Format:
 AnnData object
  obs: 'cell_type', 'batch'
  var: 'hvg', 'hvg_score', 'feature_name'
- obsm: 'X_pca'
+ obsm: 'X_pca', 'clustering'
  obsp: 'knn_distances', 'knn_connectivities'
  layers: 'counts', 'normalized'
  uns: 'dataset_id', 'dataset_name', 'dataset_url', 'dataset_reference', 'dataset_summary', 'dataset_description', 'dataset_organism', 'normalization_id', 'knn'
@@ -229,6 +229,7 @@ Data structure:
 | `var["hvg_score"]` | `double` | A ranking of the features by hvg. |
 | `var["feature_name"]` | `string` | A human-readable name for the feature, usually a gene symbol. |
 | `obsm["X_pca"]` | `double` | The resulting PCA embedding. |
+| `obsm["clustering"]` | `integer` | Leiden clustering results at different resolutions. |
 | `obsp["knn_distances"]` | `double` | K nearest neighbors distance matrix. |
 | `obsp["knn_connectivities"]` | `double` | K nearest neighbors connectivities matrix. |
 | `layers["counts"]` | `integer` | Raw counts. |
10 changes: 7 additions & 3 deletions scripts/create_resources/test_resources.sh
@@ -14,10 +14,14 @@ DATASET_DIR=resources_test/task_batch_integration
 mkdir -p $DATASET_DIR
 
 # process dataset
-viash run src/data_processors/process_dataset/config.vsh.yaml -- \
+nextflow run . \
+  -main-script target/nextflow/workflows/process_datasets/main.nf \
+  -profile docker \
   --input "$RAW_DATA/cxg_immune_cell_atlas/dataset.h5ad" \
-  --output_dataset "$DATASET_DIR/cxg_immune_cell_atlas/dataset.h5ad" \
-  --output_solution "$DATASET_DIR/cxg_immune_cell_atlas/solution.h5ad"
+  --publish_dir "$DATASET_DIR/cxg_immune_cell_atlas" \
+  --output_dataset output_dataset.h5ad \
+  --output_solution output_solution.h5ad \
+  --output_state state.yaml
 
 # run one method
 viash run src/methods/combat/config.vsh.yaml -- \
4 changes: 4 additions & 0 deletions src/api/file_solution.yaml
@@ -41,6 +41,10 @@ info:
       name: X_pca
       description: The resulting PCA embedding.
       required: true
+    - type: integer
+      name: clustering
+      description: Leiden clustering results at different resolutions.
+      required: true
   obsp:
     - type: double
       name: knn_distances
30 changes: 30 additions & 0 deletions src/data_processors/precompute_clustering_merge/config.vsh.yaml
@@ -0,0 +1,30 @@
name: precompute_clustering_merge
namespace: data_processors
label: Merge clustering precomputations
summary: Merge the precomputed clustering results into the input dataset
arguments:
  - name: --input
    type: file
    direction: input
    required: true
  - name: --output
    type: file
    direction: output
    required: true
  - type: file
    name: --clusterings
    description: Clustering results to merge
    direction: input
    required: true
    multiple: true
resources:
  - type: python_script
    path: script.py
engines:
  - type: docker
    image: openproblems/base_python:1.0.0
runners:
  - type: executable
  - type: nextflow
    directives:
      label: [midtime, midmem, lowcpu]
28 changes: 28 additions & 0 deletions src/data_processors/precompute_clustering_merge/script.py
@@ -0,0 +1,28 @@
import anndata as ad
import pandas as pd

## VIASH START
par = {
    "input": "resources_test/task_batch_integration/cxg_immune_cell_atlas/dataset.h5ad",
    "clusterings": ["output.h5ad", "output2.h5ad"],
    "output": "output3.h5ad",
}
## VIASH END

print("Read clusterings", flush=True)
clusterings = []
for clus_file in par["clusterings"]:
    adata = ad.read_h5ad(clus_file)
    obs_filt = adata.obs.filter(regex='leiden_r[0-9.]+')
    clusterings.append(obs_filt)

print("Merge clusterings", flush=True)
merged = pd.concat(clusterings, axis=1)

print("Read input", flush=True)
input = ad.read_h5ad(par["input"])

input.obsm["clustering"] = merged

print("Store outputs", flush=True)
input.write_h5ad(par["output"], compression="gzip")
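The merge step above relies on a column-name convention: each per-resolution run stores its labels under `leiden_r<resolution>`, and the regex filter keeps only those columns before concatenation. A minimal sketch of that filter-and-concat logic with toy data frames (the cell names and cluster labels here are made up; the real tables come from the `.obs` of the per-resolution h5ad files):

```python
import pandas as pd

# Toy stand-ins for the .obs tables of two per-resolution outputs
obs_r02 = pd.DataFrame(
    {"leiden_r0.2": [0, 0, 1], "n_counts": [310, 422, 188]},
    index=["cell1", "cell2", "cell3"],
)
obs_r08 = pd.DataFrame(
    {"leiden_r0.8": [0, 1, 2]},
    index=["cell1", "cell2", "cell3"],
)

# Keep only clustering columns, then align them side by side on the cell index
clusterings = [df.filter(regex="leiden_r[0-9.]+") for df in (obs_r02, obs_r08)]
merged = pd.concat(clusterings, axis=1)

print(list(merged.columns))  # ['leiden_r0.2', 'leiden_r0.8']
```

Note that `pd.concat(..., axis=1)` aligns on the index, so the per-resolution outputs must share the same cell index; unrelated columns such as `n_counts` are dropped by the regex filter.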
34 changes: 34 additions & 0 deletions src/data_processors/precompute_clustering_run/config.vsh.yaml
@@ -0,0 +1,34 @@
name: precompute_clustering_run
namespace: data_processors
label: Run clustering precomputations
summary: Run clustering on the input dataset
arguments:
  - name: --input
    type: file
    direction: input
    required: true
  - name: --output
    type: file
    direction: output
    required: true
  - type: double
    name: resolution
    default: 0.8
    description: Resolution parameter for clustering
resources:
  - type: python_script
    path: script.py
engines:
  - type: docker
    image: openproblems/base_python:1.0.0
    setup:
      - type: python
        pypi:
          - scanpy
          - igraph
          - leidenalg
runners:
  - type: executable
  - type: nextflow
    directives:
      label: [midtime, midmem, lowcpu]
39 changes: 39 additions & 0 deletions src/data_processors/precompute_clustering_run/script.py
@@ -0,0 +1,39 @@
import anndata as ad

# check if we can use GPU
USE_GPU = False
try:
    import subprocess
    assert subprocess.run('nvidia-smi', shell=True, stdout=subprocess.DEVNULL).returncode == 0
    from rapids_singlecell.tl import leiden
    USE_GPU = True
except Exception as e:
    from scanpy.tl import leiden

## VIASH START
par = {
    "input": "resources_test/task_batch_integration/cxg_immune_cell_atlas/dataset.h5ad",
    "output": "output.h5ad",
    "resolution": 0.8,
}
## VIASH END

n_cell_cpu = 300_000

print("Read input", flush=True)
input = ad.read_h5ad(par["input"])

key = f'leiden_r{par["resolution"]}'

leiden(
    input,
    resolution=par["resolution"],
    neighbors_key="knn",
    key_added=key,
)

print("Store outputs", flush=True)
output = ad.AnnData(
    obs=input.obs[[key]],
)
output.write_h5ad(par["output"], compression="gzip")
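The try/except import at the top of this script doubles as a backend selector: if `nvidia-smi` exits successfully, the GPU Leiden implementation is imported, otherwise the scanpy one. The probing pattern can be isolated as below (a sketch; the `backend` strings are just illustrative labels for the two import paths the script takes):

```python
import subprocess

def gpu_available() -> bool:
    """Heuristic GPU probe: nvidia-smi exits 0 only with a working driver."""
    try:
        result = subprocess.run(
            "nvidia-smi",
            shell=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        return result.returncode == 0
    except Exception:
        return False

# Pick the Leiden implementation accordingly; the real script imports
# rapids_singlecell.tl.leiden or scanpy.tl.leiden at this point.
backend = "rapids_singlecell" if gpu_available() else "scanpy"
print(backend)
```

Because both implementations accept the arguments used here (`resolution`, `neighbors_key`, `key_added`), the rest of the script stays backend-agnostic: only the import changes.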
10 changes: 8 additions & 2 deletions src/data_processors/process_dataset/script.py
@@ -12,8 +12,8 @@
     "output": "output.h5ad"
 }
 meta = {
-    "config": "target/nextflow/batch_integration/process_dataset/.config.vsh.yaml",
-    "resources_dir": "src/common/helper_functions"
+    "config": "target/nextflow/data_processors/process_dataset/.config.vsh.yaml",
+    "resources_dir": "target/nextflow/data_processors/process_dataset"
 }
 ## VIASH END
 
@@ -80,6 +80,12 @@ def compute_batched_hvg(adata, n_hvgs):
     "variance_ratio": variance_ratio
 }
 
+print(">> Recompute neighbors", flush=True)
+del adata.uns["knn"]
+del adata.obsp["knn_connectivities"]
+del adata.obsp["knn_distances"]
+sc.pp.neighbors(adata, use_rep="X_pca", n_neighbors=30, key_added="knn")
+
 print(">> Create output object", flush=True)
 output_dataset = subset_h5ad_by_format(
     adata,
2 changes: 1 addition & 1 deletion src/metrics/clustering_overlap/config.vsh.yaml
@@ -59,7 +59,7 @@ engines:
     setup:
       - type: python
         pypi:
-          - scib==1.1.5
+          - scib==1.1.6
 runners:
   - type: executable
   - type: nextflow
9 changes: 6 additions & 3 deletions src/metrics/clustering_overlap/script.py
@@ -8,6 +8,7 @@
 par = {
     'adata_integrated': 'resources_test/task_batch_integration/cxg_immune_cell_atlas/integrated_full.h5ad',
     'output': 'output.h5ad',
+    "resolutions": [0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8],  # TODO needs to be added to config
 }
 
 meta = {
@@ -25,18 +26,20 @@
 adata.uns |= read_anndata(par['input_solution'], uns='uns').uns
 
 print('Run optimal Leiden clustering', flush=True)
+cluster_key = "leiden"
 cluster_optimal_resolution(
     adata=adata,
     label_key="cell_type",
-    cluster_key='cluster',
+    cluster_key=cluster_key,
     cluster_function=sc.tl.leiden,
+    resolutions=par["resolutions"],
 )
 
 print('Compute ARI score', flush=True)
-ari_score = ari(adata, cluster_key='cluster', label_key="cell_type")
+ari_score = ari(adata, cluster_key=cluster_key, label_key="cell_type")
 
 print('Compute NMI score', flush=True)
-nmi_score = nmi(adata, cluster_key='cluster', label_key="cell_type")
+nmi_score = nmi(adata, cluster_key=cluster_key, label_key="cell_type")
 
 print("Create output AnnData object", flush=True)
 output = ad.AnnData(
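For intuition on what this metric reports: scib's `ari` and `nmi` compare the cluster assignment in `adata.obs[cluster_key]` against the ground-truth labels, and both reduce to the scikit-learn implementations. A toy example with hypothetical cell-type labels (assuming scikit-learn is available):

```python
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score

cell_types = ["B", "B", "T", "T", "NK", "NK"]
clusters_good = [0, 0, 1, 1, 2, 2]  # matches the labels up to renaming
clusters_bad = [0, 1, 0, 1, 0, 1]   # every cluster cuts across all cell types

# Perfect correspondence up to relabeling scores 1.0 on both metrics
print(adjusted_rand_score(cell_types, clusters_good))          # 1.0
print(normalized_mutual_info_score(cell_types, clusters_good)) # 1.0

# A partition that ignores the labels drops ARI below zero
print(adjusted_rand_score(cell_types, clusters_bad))
```

Both scores are invariant to cluster label permutation, which is why `cluster_optimal_resolution` can pick whichever Leiden resolution maximizes the overlap without worrying about label names.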
2 changes: 1 addition & 1 deletion src/metrics/isolated_label_f1/config.vsh.yaml
@@ -47,7 +47,7 @@ engines:
     setup:
       - type: python
         pypi:
-          - scib==1.1.5
+          - scib==1.1.6
 runners:
   - type: executable
   - type: nextflow
5 changes: 4 additions & 1 deletion src/metrics/isolated_label_f1/script.py
@@ -6,6 +6,7 @@
 par = {
     'input_integrated': 'resources_test/task_batch_integration/cxg_immune_cell_atlas/integrated_full.h5ad',
     'output': 'output.h5ad',
+    "resolutions": [0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8],  # TODO needs to be added to config
 }
 
 meta = {
@@ -26,7 +27,9 @@
 score = isolated_labels_f1(
     adata,
     label_key="cell_type",
-    batch_key='batch',
+    batch_key="batch",
+    cluster_key="leiden",
+    resolutions=par["resolutions"],
     embed=None,
     iso_threshold=None,
     verbose=True,
9 changes: 9 additions & 0 deletions src/workflows/process_datasets/config.vsh.yaml
@@ -18,6 +18,13 @@ argument_groups:
         __merge__: /src/api/file_solution.yaml
         required: true
         direction: output
+  - name: Clustering
+    arguments:
+      - name: "--resolutions"
+        type: double
+        multiple: true
+        default: [0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
+        description: Resolution parameter for clustering
 
 resources:
   - type: nextflow_script
@@ -29,6 +36,8 @@ dependencies:
   - name: validation/check_dataset_with_schema
     repository: openproblems
   - name: data_processors/process_dataset
+  - name: data_processors/precompute_clustering_run
+  - name: data_processors/precompute_clustering_merge
 
 runners:
   - type: nextflow
46 changes: 46 additions & 0 deletions src/workflows/process_datasets/main.nf
@@ -38,6 +38,52 @@ workflow run_wf {
       state.dataset != null
     }
 
+    // precompute clustering of the input data at various resolutions
+    | flatMap { id, state ->
+      state.resolutions.collect { resolution ->
+        def newId = "${id}_r${resolution}"
+        def newState = state + [
+          "resolution": resolution,
+          "prevId": id
+        ]
+        [newId, newState]
+      }
+    }
+
+    // precompute clustering at this resolution
+    | precompute_clustering_run.run(
+      fromState: ["input": "dataset", "resolution": "resolution"],
+      toState: ["output_clustering": "output"]
+    )
+
+    // group by original dataset id
+    | map{ id, state ->
+      [state.prevId, state]
+    }
+    | groupTuple()
+
+    // merge the clustering results into one state
+    | map{ id, states ->
+      if (states.size() == 0) {
+        throw new RuntimeException("Expected at least one state, but got ${states.size()}")
+      }
+      if (states.size() != states[0].resolutions.size()) {
+        throw new RuntimeException("Expected ${states[0].resolutions.size()} states, but got ${states.size()}")
+      }
+
+      def clusterings = states.collect { it.output_clustering }
+      def newState = states[0] + ["clusterings": clusterings]
+
+      [id, newState]
+    }
+
+    // merge clustering results into dataset h5ad
+    | precompute_clustering_merge.run(
+      fromState: ["input": "dataset", "clusterings": "clusterings"],
+      toState: ["dataset": "output"]
+    )
+
     // process the dataset
     | process_dataset.run(
       fromState: [ input: "dataset" ],
       toState: [
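The flatMap/groupTuple choreography in the workflow fans each dataset out into one channel item per resolution, runs the clustering on each, then collects the results back under the original id. The same fan-out/fan-in logic can be sketched in plain Python (hypothetical ids and file names; Nextflow channel semantics approximated with lists and a dict):

```python
from collections import defaultdict

resolutions = [0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
channel = [("dataset_a", {"resolutions": resolutions, "dataset": "a.h5ad"})]

# flatMap: one (id, state) item per resolution, remembering the parent id
fanned = [
    (f"{id_}_r{r}", {**state, "resolution": r, "prevId": id_})
    for id_, state in channel
    for r in state["resolutions"]
]

# ... precompute_clustering_run would execute once per item here ...

# map + groupTuple: regroup the per-resolution states by the original id
grouped = defaultdict(list)
for _, state in fanned:
    grouped[state["prevId"]].append(state)

# the workflow's sanity check: exactly one state per requested resolution
for id_, states in grouped.items():
    assert len(states) == len(states[0]["resolutions"])

print(sorted(s["resolution"] for s in grouped["dataset_a"]))
```

The `prevId` field carried through the fan-out is what makes the regrouping possible: after `groupTuple`, each original dataset sees exactly `len(resolutions)` clustering outputs, which are then merged back into a single h5ad.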