"""Utility methods for ome_zarr access."""
import csv
import json
import logging
import os
import urllib
import webbrowser
import xml.etree.ElementTree as ET
from collections.abc import Iterator
from datetime import datetime
from http.server import ( # type: ignore[attr-defined]
HTTPServer,
SimpleHTTPRequestHandler,
test,
)
from pathlib import Path
import dask
import dask.array as da
import zarr
from dask.diagnostics import ProgressBar
# Not needed with python 3.15+? https://github.com/python/cpython/issues/86809
from RangeHTTPServer import RangeRequestHandler
from . import USE_DASK_ARRAY_KWARGS
from .format import format_from_version
from .io import parse_url
from .reader import Multiscales, Node, Reader
from .types import Any, JSONDict
LOGGER = logging.getLogger("ome_zarr.utils")
[docs]
def info(path: str, stats: bool = False) -> Iterator[Node]:
    """Print information about an OME-Zarr fileset.

    All :class:`Nodes <ome_zarr.reader.Node>` that are found from the given path will
    be visited recursively.

    Parameters
    ----------
    path :
        Path to OME-Zarr fileset.
    stats :
        If True, print stats (currently just minimum/maximum of all arrays)

    Yields
    ------
    Node
        Each node that has at least one spec, after its summary has been
        printed. Nodes without specs are reported and skipped.

    Raises
    ------
    AssertionError
        If ``path`` cannot be parsed as a zarr location. Because this is a
        generator, the error surfaces when iteration starts.

    Warnings
    --------
    Passing ``stats=True`` will trigger a full read of every array in the fileset.
    """
    # Named `location` so the imported `zarr` module is not shadowed.
    location = parse_url(path)
    if not location:
        # Raised explicitly (rather than via `assert`) so the check survives
        # `python -O`; the message reports the offending path, not the
        # falsy parse result.
        raise AssertionError(f"not a zarr: {path}")

    reader = Reader(location)
    for node in reader():
        if not node.specs:
            print(f"not an ome-zarr node: {node}")
            continue

        print(node)
        loc = node.zarr
        version = loc.zgroup.get("version")
        if version is None:
            # Fall back to the version stored on the first multiscales entry;
            # tolerate a missing or empty "multiscales" list.
            multiscales = loc.zgroup.get("multiscales") or [{}]
            version = multiscales[0].get("version", "")
        print(" - version:", version)
        print(" - metadata")
        for spec in node.specs:
            print(f" - {spec.__class__.__name__}")
        print(" - data")
        for array in node.data:
            minmax = ""
            if stats:
                # Compute min and max together in a single dask.compute call.
                minmax = f" minmax={dask.compute(array.min(), array.max())}"
            print(f" - {array.shape}{minmax}")
        LOGGER.debug(node.data)
        yield node
def view(
    input_path: str, port: int = 8000, dry_run: bool = False, force: bool = False
) -> None:
    """Serve an OME-Zarr image locally and open it in ome-ngff-validator.

    The parent directory of ``input_path`` is served by a local HTTP server
    that adds a permissive CORS header, and the validator page is opened in
    a web browser pointing at the image on that server.

    Parameters
    ----------
    input_path :
        Path to the OME-Zarr image directory.
    port :
        Port used in the image URL and passed to the local server.
    dry_run :
        For testing: return before opening the browser or starting the server.
    force :
        If True, skip the check that ``input_path`` contains an OME-Zarr image.
    """
    if not force:
        image_dir = Path(input_path)
        # Only inspect the directory when it carries zarr group metadata.
        has_metadata = any(
            (image_dir / marker).exists() for marker in (".zattrs", "zarr.json")
        )
        found = find_multiscales(image_dir) if has_metadata else []
        if not found:
            print(
                f"No OME-Zarr images found in {input_path}. "
                f"Try $ ome_zarr finder {input_path} or use -f to force open in browser."
            )
            return

    # Split into the directory to serve and the image name used in the URL;
    # a trailing separator leaves an empty tail, so split once more.
    serve_root, image_name = os.path.split(input_path)
    if not image_name:
        serve_root, image_name = os.path.split(serve_root)
    serve_root = str(serve_root)

    # The validator fetches the image from our local server.
    validator_url = (
        "https://ome.github.io/ome-ngff-validator/"
        + f"?source=http://localhost:{port}/{image_name}"
    )

    class CORSRequestHandler(RangeRequestHandler):
        def end_headers(self) -> None:
            # Allow the validator web page (another origin) to read responses.
            self.send_header("Access-Control-Allow-Origin", "*")
            SimpleHTTPRequestHandler.end_headers(self)

        def translate_path(self, path: str) -> str:
            # The handler is instantiated by the server, not by us, so the
            # directory to serve is set here rather than via the constructor.
            self.directory = serve_root
            return super().translate_path(path)

    if dry_run:
        # Testing mode: everything above ran, but nothing is opened or served.
        return

    # Open the browser first, then block serving content.
    webbrowser.open(validator_url)
    test(CORSRequestHandler, HTTPServer, port=port)
def find_multiscales(path_to_zattrs):
    """Return the multiscales image(s) described by a zarr group directory.

    The group metadata is read from ``.zattrs`` or, if that is absent, from
    ``zarr.json``. When the metadata has an ``attributes`` mapping containing
    ``ome``, that nested mapping is used instead.

    Parameters
    ----------
    path_to_zattrs :
        Directory holding the ``.zattrs`` / ``zarr.json`` file, as a ``str``
        or :class:`pathlib.Path`.

    Returns
    -------
    list
        One ``[path_to_zarr, name, dirname]`` entry per image:

        * ``path_to_zarr`` - full path to the multiscales image,
          e.g. ``full/path/to/image.zarr/0``
        * ``name`` - image name, e.g. ``"image.zarr Series:0"``
        * ``dirname`` - directory containing the group, e.g. ``full/path/to``

        The list is empty when no metadata file exists, a plate has no wells,
        the bioformats2raw XML cannot be read, or no image is described.
    """
    # Normalise once so both str and Path arguments work with the `/` operator.
    group_dir = Path(path_to_zattrs)
    group_name = os.path.basename(group_dir)
    parent_dir = os.path.dirname(group_dir)

    text = None
    for name in (".zattrs", "zarr.json"):
        metadata_file = group_dir / name
        if metadata_file.exists():
            # JSON metadata is UTF-8; don't depend on the locale encoding.
            text = metadata_file.read_text(encoding="utf-8")
            break
    if text is None:
        print(f"No .zattrs or zarr.json found in {group_dir}")
        return []

    zattrs = json.loads(text)
    if "attributes" in zattrs and "ome" in zattrs["attributes"]:
        zattrs = zattrs["attributes"]["ome"]

    if "plate" in zattrs:
        plate = zattrs.get("plate")
        # A plate is represented by field "0" of its first well.
        wells = plate.get("wells") or []
        field = "0"
        if wells:
            path_to_zarr = group_dir / wells[0].get("path") / field
            return [[path_to_zarr, group_name, parent_dir]]
        LOGGER.info("No wells found in plate %s", group_dir)
        return []
    elif zattrs.get("bioformats2raw.layout") == 3:
        # Image names come from OME/METADATA.ome.xml
        try:
            tree = ET.parse(group_dir / "OME" / "METADATA.ome.xml")
            root = tree.getroot()
            # spec says "If the "series" attribute does not exist and no "plate" is
            # present, separate "multiscales" images MUST be stored in consecutively
            # numbered groups starting from 0 (i.e. "0/", "1/", "2/", "3/", ...)."
            images = []
            for child in root:
                # tag is eg. {http://www.openmicroscopy.org/Schemas/OME/2016-06}Image
                if not child.tag.endswith("Image"):
                    continue
                # Images are numbered in document order.
                series = len(images)
                # Get Name from XML metadata, otherwise use path and Series
                img_name = child.attrib.get("Name", f"{group_name} Series:{series}")
                images.append([group_dir / str(series), img_name, parent_dir])
            return images
        except Exception as ex:  # noqa: BLE001
            # Best effort: report the problem and fall through to "no images".
            print(ex)
    elif zattrs.get("multiscales"):
        return [[group_dir, group_name, parent_dir]]
    return []
def splitall(path):
    """Split ``path`` into a list of its components.

    ``os.path.split()`` is applied repeatedly, peeling one component off the
    end each time, until the remainder can no longer be split.
    """
    # Components are collected tail-first and reversed at the end.
    collected = []
    remaining = path
    while True:
        head, tail = os.path.split(remaining)
        if head == remaining:
            # Nothing was removed: reached the root of an absolute path.
            collected.append(head)
            break
        if tail == remaining:
            # Only a single name is left: start of a relative path.
            collected.append(tail)
            break
        collected.append(tail)
        remaining = head
    collected.reverse()
    return collected
def finder(input_path: str, port: int = 8000, dry_run: bool = False) -> None:
    """List the OME-Zarr images under a directory in BioFile Finder.

    All images found below ``input_path`` are written to a
    ``biofile_finder.csv`` file inside ``input_path``. The parent directory
    of ``input_path`` is then served by a local HTTP server that adds a
    permissive CORS header, and BioFile Finder is opened in a web browser
    with that CSV as its source.

    Parameters
    ----------
    input_path :
        Directory to search for OME-Zarr images, e.g. ``"ZARR/data"``.
    port :
        Port used in the generated URLs and passed to the local server.
    dry_run :
        For testing: write the CSV but return before opening the browser or
        starting the server.
    """
    # 'input_path' is the data dir passed in, e.g. "ZARR/data"
    # 'parent_path', e.g. "ZARR", is the directory the http server serves
    # 'server_dir', e.g. "data", is the base of all image URLs
    parent_path, server_dir = os.path.split(input_path)
    # in case input_path had trailing slash, we go one level up...
    if not server_dir:
        parent_path, server_dir = os.path.split(parent_path)

    def walk(path: Path):
        # A directory with zarr group metadata is a zarr: report its images
        # and do not descend further. Otherwise recurse into sub-directories.
        if (path / ".zattrs").exists() or (path / "zarr.json").exists():
            yield from find_multiscales(path)
            return
        for child in path.iterdir():
            if child.is_dir():
                yield from walk(child)

    zarrs = list(walk(Path(input_path)))
    if not zarrs:
        print("No OME-Zarr files found in", input_path)
        return

    # Write the images to a CSV file, in the dir we're serving from...
    col_names = ["File Path", "File Name", "Folders", "Uploaded"]
    bff_csv = os.path.join(input_path, "biofile_finder.csv")
    # Explicit UTF-8: image names may be non-ASCII and the file is fetched
    # over HTTP, so it must not depend on the local locale encoding.
    with open(bff_csv, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.writer(csvfile, delimiter=",")
        writer.writerow(col_names)
        for zarr_img in zarrs:
            # zarr paths start with full path to img
            # e.g. ZARR/data/to/img (from walk("ZARR/data"))
            # but we want them to be from the server_dir to img, e.g "data/to/img".
            # So we want relative /to/img path, from input_path -> to img
            relpath = os.path.relpath(zarr_img[0], input_path)
            # On Windows, we need to replace \\ with / in relpath for URL
            rel_url = "/".join(splitall(relpath))
            file_path = f"http://localhost:{port}/{server_dir}/{rel_url}"
            name = zarr_img[1] or os.path.basename(zarr_img[0])
            # folders is "f1,f2,f3" etc.
            folders_path = os.path.relpath(zarr_img[2], input_path)
            folders = ",".join(splitall(folders_path))
            timestamp = ""
            try:
                mtime = os.path.getmtime(zarr_img[0])
                # format mtime as "YYYY-MM-DD HH:MM:SS.Z"
                timestamp = datetime.fromtimestamp(mtime).strftime(
                    "%Y-%m-%d %H:%M:%S.%Z"
                )
            except OSError:
                # Best effort: leave the timestamp column empty.
                pass
            writer.writerow([file_path, name, folders, timestamp])

    # ...and open it in BioFile Finder
    source = {
        "uri": f"http://localhost:{port}/{server_dir}/biofile_finder.csv",
        "type": "csv",
        "name": "biofile_finder.csv",
    }
    s = urllib.parse.quote(json.dumps(source))
    url = f"https://bff.allencell.org/app?source={s}"
    # show small thumbnails view by default. (v=3 for big thumbnails)
    url += "&v=2"

    class CORSRequestHandler(RangeRequestHandler):
        def end_headers(self) -> None:
            # Allow the BioFile Finder page (another origin) to read responses.
            self.send_header("Access-Control-Allow-Origin", "*")
            SimpleHTTPRequestHandler.end_headers(self)

        def translate_path(self, path: str) -> str:
            # Since we don't call the class constructor ourselves,
            # we set the directory here instead
            self.directory = parent_path
            return super().translate_path(path)

    # for testing
    if dry_run:
        return

    # Open in browser...
    webbrowser.open(url)
    # ...then start serving content
    test(CORSRequestHandler, HTTPServer, port=port)
[docs]
def download(input_path: str, output_dir: str = ".") -> None:
    """Download an OME-Zarr from the given path.

    All :class:`Nodes <ome_zarr.reader.Node>` that are found from the given path will
    be included in the download.

    Parameters
    ----------
    input_path :
        Path or URL of the OME-Zarr fileset to download.
    output_dir :
        Directory under which the fileset is written.

    Raises
    ------
    AssertionError
        If ``input_path`` cannot be parsed as a zarr location, or if the
        target root directory already exists.
    """
    location = parse_url(input_path)
    if not location:
        # Raised explicitly (rather than via `assert`) so the check survives
        # `python -O`; the message reports the offending path, not the
        # falsy parse result.
        raise AssertionError(f"not a zarr: {input_path}")

    reader = Reader(location)
    nodes: list[Node] = []
    paths: list[list[str]] = []
    for node in reader():
        nodes.append(node)
        paths.append(node.zarr.parts())

    # Trims `paths` in place so each starts at the fileset's own directory.
    common = strip_common_prefix(paths)
    output_path = Path(output_dir)
    root_path = output_path / common

    if root_path.exists():
        raise AssertionError(f"{root_path} already exists!")
    print("downloading...")
    for path in paths:
        print(" ", Path(*path))
    print(f"to {output_dir}")

    # Sort on the path only, so parents are created before their children
    # and Node objects are never compared with each other.
    for path, node in sorted(zip(paths, nodes), key=lambda pair: pair[0]):
        target_path = output_path / Path(*path)
        target_path.mkdir(parents=True)

        # The source version decides the zarr format that is written.
        fmt = format_from_version(node.zarr.version)

        metadata: JSONDict = {}
        node.write_metadata(metadata)
        if fmt.zarr_format == 3:
            # For zarr v3, we need to put metadata under "ome" namespace
            metadata = {"ome": metadata}
        root = zarr.open_group(
            target_path, mode="w", zarr_format=fmt.zarr_format, attributes=metadata
        )

        resolutions: list[da.core.Array]
        datasets: list[str]
        for spec in node.specs:
            if isinstance(spec, Multiscales):
                datasets = spec.datasets
                resolutions = node.data

                zarr_array_kwargs: dict[str, Any] = {"zarr_format": fmt.zarr_format}
                if USE_DASK_ARRAY_KWARGS and fmt.zarr_format == 2:
                    zarr_array_kwargs["chunk_key_encoding"] = {
                        "name": "v2",
                        "separator": "/",
                    }
                elif fmt.zarr_format == 3:
                    zarr_array_kwargs["chunk_key_encoding"] = fmt.chunk_key_encoding
                else:
                    zarr_array_kwargs["dimension_separator"] = "/"
                if fmt.zarr_format != 2:
                    zarr_array_kwargs["dimension_names"] = [
                        axis["name"] for axis in node.metadata["axes"]
                    ]

                if datasets and resolutions:
                    pbar = ProgressBar()
                    # Datasets are written in reverse of their listed order.
                    for dataset, data in reversed(list(zip(datasets, resolutions))):
                        LOGGER.info("resolution %s...", dataset)
                        with pbar:
                            da.to_zarr(
                                arr=data,
                                url=root.store,
                                component=dataset,
                                **zarr_array_kwargs,
                            )
            else:
                # Assume a group that needs metadata, like labels
                zarr.group(str(target_path))
[docs]
def strip_common_prefix(parts: list[list[str]]) -> str:
    """Trim the leading components shared by every path, in place.

    Each entry of ``parts`` is shortened so that it starts at the last
    component of the common prefix; that component is returned.

    An exception is thrown if no common prefix exists.

    >>> paths = [["a", "b"], ["a", "b", "c"]]
    >>> strip_common_prefix(paths)
    'b'
    >>> paths
    [['b'], ['b', 'c']]
    """
    shortest = min(map(len, parts))

    # Count the leading positions at which every path has the same component.
    shared = 0
    while shared < shortest and len({entry[shared] for entry in parts}) == 1:
        shared += 1

    if shared == 0:
        listing = "".join(f"{entry}\n" for entry in parts)
        raise Exception("No common prefix:\n" + listing)

    # Keep the last shared component as the new first element of each path.
    keep_from = shared - 1
    common = parts[0][keep_from]
    parts[:] = [entry[keep_from:] for entry in parts]
    return common