Processing

The processing.json file captures the data processing and analysis steps that have been carried out, mostly for derived data assets. It tracks what code was used for each step, when it was run, what the inputs and outputs were, and what parameters were set. This includes steps such as spike sorting, image alignment, and cell segmentation, as well as manual annotation, quality control, and data analysis.

Each subsequent stage of processing or analysis should append its steps to the processing file.
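A minimal sketch of the append pattern, using only the standard library and a plain dictionary rather than the aind_data_schema classes. The data_processes key comes from the Processing model documented on this page; the helper name append_process and the field values are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def append_process(processing_path: Path, new_process: dict) -> dict:
    """Load processing.json, append one step, and write the file back.

    Hypothetical helper: treats the file as plain JSON with a
    top-level "data_processes" list and performs no schema validation.
    """
    processing = json.loads(processing_path.read_text())
    # Earlier steps are kept; the new stage is added at the end.
    processing.setdefault("data_processes", []).append(new_process)
    processing_path.write_text(json.dumps(processing, indent=2))
    return processing


# Illustrative file with one existing step (values are made up).
workdir = Path(tempfile.mkdtemp())
path = workdir / "processing.json"
path.write_text(json.dumps({"data_processes": [{"name": "Image tile fusing"}]}))

updated = append_process(path, {"name": "Analysis 1", "stage": "Analysis"})
print([step["name"] for step in updated["data_processes"]])
```

This sketch skips validation and does not touch dependency_graph; a real file may need both handled when a step is added.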

Example

"""example processing"""

import argparse
from datetime import datetime, timezone

from aind_data_schema.components.identifiers import Code, DataAsset
from aind_data_schema.core.processing import (
    DataProcess,
    Processing,
    ProcessName,
    ProcessStage,
    ResourceTimestamped,
    ResourceUsage,
)
from aind_data_schema_models.units import MemoryUnit
from aind_data_schema_models.system_architecture import OperatingSystem, CPUArchitecture

# If a timezone isn't specified, the timezone of the computer running this
# script will be used as default
t = datetime(2022, 11, 22, 8, 43, 00, tzinfo=timezone.utc)


cpu_usage_list = [
    ResourceTimestamped(timestamp=datetime(2024, 9, 13, tzinfo=timezone.utc), usage=75.5),
    ResourceTimestamped(timestamp=datetime(2024, 9, 13, tzinfo=timezone.utc), usage=80.0),
]

gpu_usage_list = [
    ResourceTimestamped(timestamp=datetime(2024, 9, 13, tzinfo=timezone.utc), usage=60.0),
    ResourceTimestamped(timestamp=datetime(2024, 9, 13, tzinfo=timezone.utc), usage=65.5),
]

ram_usage_list = [
    ResourceTimestamped(timestamp=datetime(2024, 9, 13, tzinfo=timezone.utc), usage=70.0),
    ResourceTimestamped(timestamp=datetime(2024, 9, 13, tzinfo=timezone.utc), usage=72.5),
]

file_io_usage_list = [
    ResourceTimestamped(timestamp=datetime(2024, 9, 13, tzinfo=timezone.utc), usage=5.5),
    ResourceTimestamped(timestamp=datetime(2024, 9, 13, tzinfo=timezone.utc), usage=6.0),
]

example_code = Code(
    url="https://github.com/abcd",
    version="0.1",
    parameters={"size": 7},
)

p = Processing.create_with_sequential_process_graph(
    pipelines=[
        Code(
            name="Imaging processing pipeline",
            url="https://url/for/pipeline",
            version="0.1.1",
            input_data=[DataAsset(
                name="123456_2026-05-20_14-14-14",
            )]
        ),
    ],
    data_processes=[
        DataProcess(
            process_type=ProcessName.IMAGE_TILE_FUSING,
            experimenters=["Dr. Dan"],
            stage=ProcessStage.PROCESSING,
            start_date_time=t,
            end_date_time=t,
            output_path="path/to/outputs",
            pipeline_name="Imaging processing pipeline",
            code=example_code.model_copy(
                update=dict(
                    parameters={"size": 7},
                )
            ),
            resources=ResourceUsage(
                os=OperatingSystem.UBUNTU_20_04,
                architecture=CPUArchitecture.X86_64,
                cpu="Intel Core i7",
                cpu_cores=8,
                gpu="NVIDIA GeForce RTX 3080",
                system_memory=32.0,
                system_memory_unit=MemoryUnit.GB,
                ram=16.0,
                ram_unit=MemoryUnit.GB,
                cpu_usage=cpu_usage_list,
                gpu_usage=gpu_usage_list,
                ram_usage=ram_usage_list,
            ),
        ),
        DataProcess(
            process_type=ProcessName.FILE_FORMAT_CONVERSION,
            pipeline_name="Imaging processing pipeline",
            experimenters=["Dr. Dan"],
            stage=ProcessStage.PROCESSING,
            start_date_time=t,
            end_date_time=t,
            output_path="path/to/outputs",
            code=example_code.model_copy(
                update=dict(
                    parameters={"u": 7, "z": True},
                )
            ),
        ),
        DataProcess(
            process_type=ProcessName.IMAGE_DESTRIPING,
            pipeline_name="Imaging processing pipeline",
            experimenters=["Dr. Dan"],
            stage=ProcessStage.PROCESSING,
            start_date_time=t,
            end_date_time=t,
            output_path="path/to/output",
            code=example_code.model_copy(
                update=dict(
                    parameters={"a": 2, "b": -2},
                )
            ),
        ),
        DataProcess(
            name="Analysis 1",
            stage=ProcessStage.ANALYSIS,
            experimenters=["Some Analyzer"],
            process_type=ProcessName.ANALYSIS,
            start_date_time=t,
            end_date_time=t,
            output_path="path/to/outputs",
            code=example_code.model_copy(
                update=dict(
                    parameters={"size": 7},
                )
            ),
        ),
        DataProcess(
            name="Analysis 2",
            stage=ProcessStage.ANALYSIS,
            experimenters=["Some Analyzer"],
            process_type=ProcessName.ANALYSIS,
            start_date_time=t,
            end_date_time=t,
            output_path="path/to/outputs",
            code=example_code.model_copy(
                update=dict(
                    parameters={"u": 7, "z": True},
                )
            ),
        ),
    ],
)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--output-dir", default=None, help="Output directory for generated JSON file")
    args = parser.parse_args()

    serialized = p.model_dump_json()
    deserialized = Processing.model_validate_json(serialized)
    p.write_standard_file(output_directory=args.output_dir)
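The name create_with_sequential_process_graph suggests that each step depends on the one before it. The sketch below illustrates that idea with plain dictionaries, following the dependency_graph convention in which each key is a process name and each value lists its inputs. It is an assumption about the helper's behavior, not its implementation; the function sequential_graph and the step names are hypothetical.

```python
def sequential_graph(names: list[str]) -> dict[str, list[str]]:
    """Chain steps so each one takes the previous step as its only input."""
    graph: dict[str, list[str]] = {}
    previous = None
    for name in names:
        # The first step has no inputs; every later step has exactly one.
        graph[name] = [] if previous is None else [previous]
        previous = name
    return graph


# Step names assumed for illustration; unnamed steps are shown by process type.
steps = [
    "Image tile fusing",
    "File format conversion",
    "Image destriping",
    "Analysis 1",
    "Analysis 2",
]
graph = sequential_graph(steps)
print(graph["Image destriping"])  # ['File format conversion']
```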

Core file

Processing

Description of all processes run on data

| Field | Type | Title (Description) |
| --- | --- | --- |
| data_processes | List[DataProcess] | Data processing |
| pipelines | Optional[List[Code]] | Pipelines (For processing done with pipelines, list the repositories here. Pipelines must use the name field and be referenced in the pipeline_name field of a DataProcess.) |
| notes | Optional[str] | Notes |
| dependency_graph | Optional[Dict[str, List[str]]] | Dependency graph (Directed graph of processing step dependencies. Each key is a process name, and the value is a list of process names that are inputs to that process.) |
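Because each key maps to the names of its inputs, a dependency graph in this shape can be passed directly to the standard library's graphlib.TopologicalSorter, which expects a mapping from node to predecessors. The sketch below derives a valid run order from a made-up branching graph; the graph contents and the helper run_order are hypothetical and not part of the schema.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical graph: keys are process names, values are their inputs.
dependency_graph = {
    "Image tile fusing": [],
    "Image destriping": ["Image tile fusing"],
    "Analysis 1": ["Image destriping"],
    "Analysis 2": ["Image destriping"],
}


def run_order(graph: dict[str, list[str]]) -> list[str]:
    """Return an order in which every step comes after all of its inputs."""
    try:
        return list(TopologicalSorter(graph).static_order())
    except CycleError as err:
        # args[1] holds the list of nodes that form the cycle.
        raise ValueError(f"dependency graph has a cycle: {err.args[1]}") from err


order = run_order(dependency_graph)
print(order)
```

The two analysis steps share an input, so either may come first in the result; only the relative order of a step and its inputs is guaranteed.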

Model definitions

DataProcess

Description of a single processing step

| Field | Type | Title (Description) |
| --- | --- | --- |
| process_type | ProcessName | Process type |
| name | str | Name (Unique name of the processing step. If not provided, the type will be used as the name.) |
| stage | ProcessStage | Processing stage |
| code | Code | Code (Code used for processing) |
| experimenters | List[str] | Experimenters (People responsible for processing) |
| pipeline_name | Optional[str] | Pipeline name (Pipeline names must exist in Processing.pipelines) |
| start_date_time | datetime (timezone-aware) | Start date time |
| end_date_time | Optional[datetime (timezone-aware)] | End date time |
| output_path | Optional[AssetPath] | Output path (Path to processing outputs, if stored.) |
| output_parameters | Optional[dict] | Outputs (Output parameters) |
| notes | Optional[str] | Notes |
| resources | Optional[ResourceUsage] | Process resource usage |
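Three rules from the field descriptions interact: step names are unique, a missing name falls back to the process type, and every pipeline_name must match a pipeline listed in Processing.pipelines. The sketch below checks those rules on plain dictionaries. The library performs its own validation; this is not that code, and the function check_processes and the sample data are hypothetical.

```python
def check_processes(processing: dict) -> list[str]:
    """Return a list of problems found in a processing-like dictionary."""
    problems = []
    pipeline_names = {p.get("name") for p in processing.get("pipelines") or []}
    seen = set()
    for step in processing["data_processes"]:
        # Fall back to the process type when no name is given.
        name = step.get("name") or step["process_type"]
        if name in seen:
            problems.append(f"duplicate step name: {name}")
        seen.add(name)
        pipeline = step.get("pipeline_name")
        if pipeline is not None and pipeline not in pipeline_names:
            problems.append(f"{name}: unknown pipeline {pipeline!r}")
    return problems


# Made-up data: the last step points at a pipeline that is not listed.
example = {
    "pipelines": [{"name": "Imaging processing pipeline"}],
    "data_processes": [
        {"process_type": "Image tile fusing", "pipeline_name": "Imaging processing pipeline"},
        {"process_type": "Analysis", "name": "Analysis 1"},
        {"process_type": "Analysis", "name": "Analysis 2", "pipeline_name": "Missing pipeline"},
    ],
}
print(check_processes(example))
```

Giving the two analysis steps explicit names matters here: without them both would default to the same type name and collide.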

ProcessStage

Stages of processing

| Name | Value |
| --- | --- |
| PROCESSING | Processing |
| ANALYSIS | Analysis |

ResourceTimestamped

Description of resource usage at a moment in time

| Field | Type | Title (Description) |
| --- | --- | --- |
| timestamp | datetime (timezone-aware) | Timestamp |
| usage | float | Usage |
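The timestamp type is timezone-aware, so a naive datetime would not qualify. As a small illustration using only the standard library, the sketch below shows the usual test for awareness and what an aware timestamp looks like once serialized. The helper is_timezone_aware is hypothetical, and the plain dictionary stands in for the model rather than reproducing it.

```python
from datetime import datetime, timezone


def is_timezone_aware(moment: datetime) -> bool:
    """True when the datetime carries a usable UTC offset."""
    return moment.tzinfo is not None and moment.utcoffset() is not None


naive = datetime(2024, 9, 13)
aware = datetime(2024, 9, 13, tzinfo=timezone.utc)

print(is_timezone_aware(naive))  # False
print(is_timezone_aware(aware))  # True

# A sample as plain data; the field names follow the table above.
sample = {"timestamp": aware.isoformat(), "usage": 75.5}
print(sample["timestamp"])  # 2024-09-13T00:00:00+00:00
```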

ResourceUsage

Description of resources used by a process

| Field | Type | Title (Description) |
| --- | --- | --- |
| os | str | Operating system |
| architecture | str | Architecture |
| cpu | Optional[str] | CPU name |
| cpu_cores | Optional[int] | CPU cores |
| gpu | Optional[str] | GPU name |
| system_memory | Optional[float] | System memory |
| system_memory_unit | Optional[MemoryUnit] | System memory unit |
| ram | Optional[float] | System RAM |
| ram_unit | Optional[MemoryUnit] | Ram unit |
| cpu_usage | Optional[List[ResourceTimestamped]] | CPU usage |
| gpu_usage | Optional[List[ResourceTimestamped]] | GPU usage |
| ram_usage | Optional[List[ResourceTimestamped]] | RAM usage |
| usage_unit | str | Usage unit |
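The usage fields hold lists of timestamped samples, which lend themselves to simple summaries such as mean and peak load. The sketch below works on plain (timestamp, usage) tuples with made-up values. It assumes all samples share one unit, as the single usage_unit field implies; the function summarize is hypothetical.

```python
from datetime import datetime, timedelta, timezone
from statistics import mean

start = datetime(2024, 9, 13, tzinfo=timezone.utc)

# Hypothetical CPU samples as (timestamp, usage) pairs, one minute apart.
cpu_usage = [
    (start, 75.5),
    (start + timedelta(minutes=1), 80.0),
    (start + timedelta(minutes=2), 62.5),
]


def summarize(samples):
    """Return the mean and peak usage, plus the time the peak occurred."""
    peak_time, peak = max(samples, key=lambda s: s[1])
    return {
        "mean": mean(value for _, value in samples),
        "peak": peak,
        "peak_time": peak_time,
    }


summary = summarize(cpu_usage)
print(summary["mean"], summary["peak"])
```

Giving each sample a distinct timestamp, as here, is what makes the peak time meaningful.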