Processing

The processing.json file captures the data processing and analysis steps that have been carried out, mostly for derived data assets. It tracks what code was used for each step, when it was run, what the inputs and outputs were, and what parameters were set. This includes steps such as spike sorting, image alignment, and cell segmentation, as well as manual annotation, quality control, and data analysis.

Each subsequent stage of processing or analysis should be appended to the processing file.
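
As a rough illustration of appending a stage, the sketch below works on a plain dictionary standing in for a parsed processing.json rather than on the aind-data-schema classes. The helper name append_process is hypothetical, the records are trimmed to a couple of keys, and the only structure assumed is the data_processes list and dependency_graph mapping described on this page.

```python
import json


def append_process(processing: dict, new_process: dict, inputs: list[str]) -> dict:
    """Return a copy of `processing` with `new_process` added as a later stage.

    Hypothetical helper: `inputs` names the existing processes whose outputs
    the new process consumes.
    """
    name = new_process["name"]
    known_names = {p["name"] for p in processing["data_processes"]}
    if name in known_names:
        raise ValueError(f"process name {name!r} is already used")
    missing = [i for i in inputs if i not in known_names]
    if missing:
        raise ValueError(f"unknown input processes: {missing}")
    return {
        **processing,
        "data_processes": [*processing["data_processes"], new_process],
        "dependency_graph": {**processing["dependency_graph"], name: list(inputs)},
    }


# Stand-in for the parsed contents of an existing processing.json
# (illustrative values only; real records carry many more fields).
existing_processing = json.loads(
    """
    {
      "data_processes": [{"name": "Image tile fusing", "stage": "Processing"}],
      "dependency_graph": {"Image tile fusing": []}
    }
    """
)

updated = append_process(
    existing_processing,
    {"name": "Image destriping", "stage": "Processing"},
    inputs=["Image tile fusing"],
)
print(json.dumps(updated["dependency_graph"], indent=2))
```

The helper returns a new dictionary instead of mutating its input, so the earlier record stays available for comparison. In real use the result would still need to be validated against the Processing model.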

Example

"""example processing"""

from datetime import datetime, timezone

from aind_data_schema.components.identifiers import Person, Code
from aind_data_schema.core.processing import (
    DataProcess,
    Processing,
    ProcessName,
    ProcessStage,
    ResourceTimestamped,
    ResourceUsage,
)
from aind_data_schema_models.units import MemoryUnit
from aind_data_schema_models.system_architecture import OperatingSystem, CPUArchitecture

# If a timezone isn't specified, the timezone of the computer running this
# script will be used as default
t = datetime(2022, 11, 22, 8, 43, 00, tzinfo=timezone.utc)


cpu_usage_list = [
    ResourceTimestamped(timestamp=datetime(2024, 9, 13, tzinfo=timezone.utc), usage=75.5),
    ResourceTimestamped(timestamp=datetime(2024, 9, 13, tzinfo=timezone.utc), usage=80.0),
]

gpu_usage_list = [
    ResourceTimestamped(timestamp=datetime(2024, 9, 13, tzinfo=timezone.utc), usage=60.0),
    ResourceTimestamped(timestamp=datetime(2024, 9, 13, tzinfo=timezone.utc), usage=65.5),
]

ram_usage_list = [
    ResourceTimestamped(timestamp=datetime(2024, 9, 13, tzinfo=timezone.utc), usage=70.0),
    ResourceTimestamped(timestamp=datetime(2024, 9, 13, tzinfo=timezone.utc), usage=72.5),
]

file_io_usage_list = [
    ResourceTimestamped(timestamp=datetime(2024, 9, 13, tzinfo=timezone.utc), usage=5.5),
    ResourceTimestamped(timestamp=datetime(2024, 9, 13, tzinfo=timezone.utc), usage=6.0),
]

example_code = Code(
    url="https://github.com/abcd",
    version="0.1",
    parameters={"size": 7},
)

p = Processing.create_with_sequential_process_graph(
    pipelines=[
        Code(
            name="Imaging processing pipeline",
            url="https://url/for/pipeline",
            version="0.1.1",
        ),
    ],
    data_processes=[
        DataProcess(
            process_type=ProcessName.IMAGE_TILE_FUSING,
            experimenters=[Person(name="Dr. Dan")],
            stage=ProcessStage.PROCESSING,
            start_date_time=t,
            end_date_time=t,
            output_path="/path/to/outputs",
            pipeline_name="Imaging processing pipeline",
            code=example_code.model_copy(
                update=dict(
                    parameters={"size": 7},
                )
            ),
            resources=ResourceUsage(
                os=OperatingSystem.UBUNTU_20_04,
                architecture=CPUArchitecture.X86_64,
                cpu="Intel Core i7",
                cpu_cores=8,
                gpu="NVIDIA GeForce RTX 3080",
                system_memory=32.0,
                system_memory_unit=MemoryUnit.GB,
                ram=16.0,
                ram_unit=MemoryUnit.GB,
                cpu_usage=cpu_usage_list,
                gpu_usage=gpu_usage_list,
                ram_usage=ram_usage_list,
            ),
        ),
        DataProcess(
            process_type=ProcessName.FILE_FORMAT_CONVERSION,
            pipeline_name="Imaging processing pipeline",
            experimenters=[Person(name="Dr. Dan")],
            stage=ProcessStage.PROCESSING,
            start_date_time=t,
            end_date_time=t,
            output_path="/path/to/outputs",
            code=example_code.model_copy(
                update=dict(
                    parameters={"u": 7, "z": True},
                )
            ),
        ),
        DataProcess(
            process_type=ProcessName.IMAGE_DESTRIPING,
            pipeline_name="Imaging processing pipeline",
            experimenters=[Person(name="Dr. Dan")],
            stage=ProcessStage.PROCESSING,
            start_date_time=t,
            end_date_time=t,
            output_path="/path/to/output",
            code=example_code.model_copy(
                update=dict(
                    parameters={"a": 2, "b": -2},
                )
            ),
        ),
        DataProcess(
            stage=ProcessStage.ANALYSIS,
            experimenters=[Person(name="Some Analyzer")],
            process_type=ProcessName.ANALYSIS,
            start_date_time=t,
            end_date_time=t,
            output_path="/path/to/outputs",
            code=example_code.model_copy(
                update=dict(
                    parameters={"size": 7},
                )
            ),
        ),
        DataProcess(
            name="Analysis 2",
            stage=ProcessStage.ANALYSIS,
            experimenters=[Person(name="Some Analyzer")],
            process_type=ProcessName.ANALYSIS,
            start_date_time=t,
            end_date_time=t,
            output_path="/path/to/outputs",
            code=example_code.model_copy(
                update=dict(
                    parameters={"u": 7, "z": True},
                )
            ),
        ),
    ],
)

if __name__ == "__main__":
    serialized = p.model_dump_json()
    deserialized = Processing.model_validate_json(serialized)
    p.write_standard_file()

Core file

Processing

Description of all processes run on data

| Field | Type | Description |
|---|---|---|
| data_processes | List[DataProcess] | |
| pipelines | Optional[List[Code]] | For processing done with pipelines, list the repositories here. Pipelines must use the name field and be referenced in the pipeline_name field of a DataProcess. |
| notes | Optional[str] | |
| dependency_graph | Dict[str, List[str]] | Directed graph of processing step dependencies. Each key is a process name, and the value is a list of process names that are inputs to that process. |
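
Because each key in dependency_graph maps a process to its inputs, the mapping has the same shape that the standard library's graphlib.TopologicalSorter expects (node to predecessors). The sketch below uses that to recover a valid run order and to reject cycles. The graph literal is an illustrative linear chain, and the function name execution_order is hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

# Keys are process names; values list the processes whose outputs they consume.
# Illustrative linear chain: each step depends only on the one before it.
dependency_graph = {
    "Image tile fusing": [],
    "File format conversion": ["Image tile fusing"],
    "Image destriping": ["File format conversion"],
    "Analysis": ["Image destriping"],
}


def execution_order(graph: dict[str, list[str]]) -> list[str]:
    """Return process names in an order that respects every dependency.

    Raises ValueError if an input is not itself a key of the graph, and
    graphlib.CycleError (a ValueError subclass) if the graph has a cycle.
    """
    referenced = {dep for deps in graph.values() for dep in deps}
    unknown = referenced - set(graph)
    if unknown:
        raise ValueError(f"inputs without their own entry: {sorted(unknown)}")
    return list(TopologicalSorter(graph).static_order())


for step in execution_order(dependency_graph):
    print(step)
```

For a strictly sequential graph like this one the order is unique; for branching graphs, any order returned is only one of several valid ones.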

Model definitions

DataProcess

Description of a single processing step

| Field | Type | Description |
|---|---|---|
| process_type | ProcessName | |
| name | str | Unique name of the processing step. If not provided, the type will be used as the name. |
| stage | ProcessStage | |
| code | Code | Code used for processing |
| experimenters | List[Person] | People responsible for processing |
| pipeline_name | Optional[str] | Pipeline names must exist in Processing.pipelines |
| start_date_time | datetime (timezone-aware) | |
| end_date_time | datetime (timezone-aware) | |
| output_path | Optional[AssetPath] | Path to processing outputs, if stored. |
| output_parameters | dict | Output parameters |
| notes | Optional[str] | |
| resources | Optional[ResourceUsage] | |
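
Two of the rules in this table, unique process names (falling back to the process type) and pipeline names that must exist in Processing.pipelines, can be checked on plain dictionaries before a record is built. This is a minimal sketch of such a pre-check, not the validation the schema itself performs; the function name check_process_names and the sample records are hypothetical.

```python
def check_process_names(pipelines: list[dict], data_processes: list[dict]) -> list[str]:
    """Return a list of human-readable problems; an empty list means no issues."""
    pipeline_names = {p["name"] for p in pipelines}
    problems = []
    seen = set()
    for proc in data_processes:
        # If no name is given, the process type stands in as the name.
        name = proc.get("name") or proc["process_type"]
        if name in seen:
            problems.append(f"duplicate process name: {name}")
        seen.add(name)
        pipeline = proc.get("pipeline_name")
        if pipeline is not None and pipeline not in pipeline_names:
            problems.append(f"{name}: unknown pipeline {pipeline!r}")
    return problems


pipelines = [{"name": "Imaging processing pipeline"}]
data_processes = [
    {"process_type": "Image tile fusing", "pipeline_name": "Imaging processing pipeline"},
    {"process_type": "Analysis"},
    # Second analysis step without an explicit name collides with the first.
    {"process_type": "Analysis"},
    {"process_type": "Analysis", "name": "Analysis 2", "pipeline_name": "Other pipeline"},
]

for problem in check_process_names(pipelines, data_processes):
    print(problem)
```

The collision in the sample data shows why the second analysis step in the example above is given an explicit name.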

ProcessStage

Stages of processing

| Name | Value |
|---|---|
| PROCESSING | Processing |
| ANALYSIS | Analysis |

ResourceTimestamped

Description of resource usage at a moment in time

| Field | Type | Description |
|---|---|---|
| timestamp | datetime (timezone-aware) | |
| usage | float | |

ResourceUsage

Description of resources used by a process

| Field | Type | Description |
|---|---|---|
| os | str | |
| architecture | str | |
| cpu | Optional[str] | |
| cpu_cores | Optional[int] | |
| gpu | Optional[str] | |
| system_memory | Optional[float] | |
| system_memory_unit | Optional[MemoryUnit] | |
| ram | Optional[float] | |
| ram_unit | Optional[MemoryUnit] | |
| cpu_usage | Optional[List[ResourceTimestamped]] | |
| gpu_usage | Optional[List[ResourceTimestamped]] | |
| ram_usage | Optional[List[ResourceTimestamped]] | |
| usage_unit | str | |
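
The usage lists are time series of (timestamp, usage) samples, so a common follow-up is to reduce them to a few summary numbers. The sketch below does this with a small stand-in dataclass rather than the real ResourceTimestamped model. UsageSample and summarize are hypothetical names, and the rejection of naive timestamps mirrors the "timezone-aware" type noted above.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from statistics import mean


@dataclass(frozen=True)
class UsageSample:
    """Stand-in for a timestamped usage reading (not the schema class)."""

    timestamp: datetime
    usage: float

    def __post_init__(self):
        # Naive datetimes are rejected so samples from different hosts compare safely.
        if self.timestamp.tzinfo is None or self.timestamp.utcoffset() is None:
            raise ValueError("timestamp must be timezone-aware")


def summarize(samples: list[UsageSample]) -> dict:
    """Return the covered time span plus mean and peak usage."""
    if not samples:
        raise ValueError("no samples to summarize")
    ordered = sorted(samples, key=lambda s: s.timestamp)
    return {
        "start": ordered[0].timestamp,
        "end": ordered[-1].timestamp,
        "mean": mean(s.usage for s in ordered),
        "peak": max(s.usage for s in ordered),
    }


start = datetime(2024, 9, 13, tzinfo=timezone.utc)
cpu_samples = [
    UsageSample(timestamp=start, usage=75.5),
    UsageSample(timestamp=start + timedelta(minutes=1), usage=80.0),
]

summary = summarize(cpu_samples)
print(summary["mean"], summary["peak"])
```

Giving each sample a distinct timestamp, as here, keeps the series meaningful; samples that share a timestamp cannot be ordered or plotted against time.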