"""Product sidecar, checksum, and pointer-attribute helpers."""
from __future__ import annotations
import shutil
from collections.abc import Mapping
from dataclasses import dataclass
from importlib import resources
from pathlib import Path
from string import Template
from typing import Any
from ._json import read_json, write_json
from ._paths import sha256_file
from .product_metadata import (
PRODUCT_INDEX_FILE,
RO_CRATE_FILE,
ProductMetadata,
ProductMetadataDependencyError,
input_license_records,
is_cc_family_license,
match_product_metadata,
product_license_summary,
require_product_metadata_tools,
software_license_records,
)
from .provenance import public_provenance
# Key under which write_json_with_provenance embeds the public provenance
# payload; the stamping helpers in this module remove it from data attrs.
PROVENANCE_ATTR = "processing_provenance"
# Attribute that stores the file name of the provenance sidecar.
PROVENANCE_FILE_ATTR = "provenance_file"
# Attribute that stores the SHA-256 digest of the provenance sidecar.
PROVENANCE_SHA256_ATTR = "provenance_sha256"
# Attribute that stores the schema version of the provenance record.
PROVENANCE_SCHEMA_ATTR = "provenance_schema_version"
[docs]
@dataclass(frozen=True)
class ProductSidecars:
    """Paths that travel with one durable data product.

    Instances are immutable (``frozen=True``). ``product_sidecars`` builds
    one from a data path and places every sidecar in the data file's
    directory.
    """

    # Path of the data product itself.
    data: Path
    # Directory that contains the data product and its sidecars.
    package: Path
    # Data file name without its final suffix; used to name provenance files.
    stem: str
    # README that describes the package.
    readme: Path
    # License notice for the package.
    license: Path
    # RO-Crate metadata file for the package.
    ro_crate: Path
    # Provenance record, named "STEM.prov.json".
    provenance: Path
    # Checksum file for the provenance record, named "STEM.prov.json.sha256".
    provenance_sha256: Path
[docs]
def product_sidecars(data_path: str | Path) -> ProductSidecars:
    """Derive the sidecar file locations for a durable data product.

    Every sidecar lives in the same directory as the data file. The two
    provenance files are named after the data file's stem; the README,
    license and RO-Crate files use fixed names.
    """
    product = Path(data_path)
    folder = product.parent
    base = product.stem
    provenance_name = f"{base}.prov.json"
    return ProductSidecars(
        data=product,
        package=folder,
        stem=base,
        readme=folder / "README.md",
        license=folder / "LICENSE.md",
        ro_crate=folder / RO_CRATE_FILE,
        provenance=folder / provenance_name,
        provenance_sha256=folder / f"{provenance_name}.sha256",
    )
def write_sha256_file(path: str | Path, *, digest: str, filename: str) -> None:
    """Write a one-line checksum file containing ``digest`` and ``filename``.

    Missing parent directories of ``path`` are created first. The file is
    written as UTF-8 and ends with a newline.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    # NOTE(review): the two fields are separated by a single space, whereas
    # GNU sha256sum itself emits two - confirm this is the intended format.
    line = f"{digest} {filename}\n"
    target.write_text(line, encoding="utf-8")
[docs]
def product_record(
    data_path: str | Path,
    *,
    provenance_path: str | Path | None = None,
    metadata: Mapping[str, Any] | None = None,
) -> dict[str, Any]:
    """Assemble the generic ``product`` section of a provenance record.

    Only file names (not full paths) are recorded, and the package is always
    recorded as ``"."``. When ``provenance_path`` is given its file name
    replaces the default provenance sidecar name. Entries from ``metadata``
    are applied last, with keys converted to ``str``, and may override the
    generated ones. Entries whose value is ``None`` or ``""`` are dropped
    from the result.
    """
    data_file = Path(data_path)
    sidecars = product_sidecars(data_file)

    if provenance_path:
        provenance_name = Path(provenance_path).name
    else:
        provenance_name = sidecars.provenance.name

    # A provenance name that already ends in ".sha256" is reused unchanged.
    if provenance_name.endswith(".sha256"):
        checksum_name = provenance_name
    else:
        checksum_name = f"{provenance_name}.sha256"

    if data_file.suffix == ".zarr":
        product_format = "zarr"
    else:
        product_format = data_file.suffix.removeprefix(".")

    record: dict[str, Any] = {
        "data": data_file.name,
        "format": product_format,
        "package": ".",
        "provenance_file": provenance_name,
        "provenance_sha256_file": checksum_name,
        "readme_file": sidecars.readme.name,
        "license_file": sidecars.license.name,
        "ro_crate_file": sidecars.ro_crate.name,
    }
    if metadata:
        for key, value in metadata.items():
            record[str(key)] = value
    return {key: value for key, value in record.items() if value not in (None, "")}
[docs]
def public_license(license_payload: str | Mapping[str, Any] | None) -> dict[str, str]:
    """Return the normalized license summary, insisting that one exists.

    Raises:
        ValueError: if ``license_payload`` is empty or missing, or if
            ``product_license_summary`` yields ``None`` for it.
    """
    if license_payload:
        normalized = product_license_summary(license_payload)
        if normalized is not None:
            return normalized
        raise ValueError("Product license metadata is required.")
    raise ValueError(
        "Product license metadata is required. Configure a matching entry in "
        "reprotrail.products.toml or pass explicit product license metadata before finalizing product sidecars."
    )
[docs]
def default_readme_template_text() -> str:
    """Read the README template bundled in ``reprotrail.templates`` as UTF-8 text."""
    template_package = resources.files("reprotrail.templates")
    bundled_template = template_package.joinpath("product_README.md.template")
    return bundled_template.read_text(encoding="utf-8")
[docs]
def copy_readme_template(output: str | Path, *, force: bool = False) -> Path:
    """Place a copy of the bundled README template at ``output``.

    Parent directories are created as needed and the destination path is
    returned.

    Raises:
        FileExistsError: if ``output`` already exists and ``force`` is false.
    """
    target = Path(output)
    already_there = target.exists()
    if already_there and not force:
        raise FileExistsError(f"README template already exists: {target}")
    target.parent.mkdir(parents=True, exist_ok=True)
    bundled = resources.files("reprotrail.templates").joinpath("product_README.md.template")
    # as_file yields a real filesystem path for the packaged resource.
    with resources.as_file(bundled) as template_path:
        shutil.copyfile(template_path, target)
    return target
def _template_text(metadata: ProductMetadata | None, project_root: Path) -> str:
if metadata is None or not metadata.readme_template:
return default_readme_template_text()
path = Path(metadata.readme_template)
if not path.is_absolute():
path = project_root / path
return path.read_text(encoding="utf-8")
def _files_section(product: Mapping[str, Any], *, license_payload: Mapping[str, str] | None) -> str:
data = product.get("data") or "product"
provenance_file = product.get("provenance_file") or "product.prov.json"
checksum_file = product.get("provenance_sha256_file") or f"{provenance_file}.sha256"
ro_crate_file = product.get("ro_crate_file") or RO_CRATE_FILE
lines = [
f"- `{data}`: data product",
f"- `{provenance_file}`: product provenance",
f"- `{checksum_file}`: SHA-256 checksum for `{provenance_file}`",
f"- `{ro_crate_file}`: RO-Crate metadata for this product package",
]
if license_payload:
lines.append("- `LICENSE.md`: license notice")
return "\n".join(lines)
def _license_section(license_payload: Mapping[str, str] | None) -> str:
if license_payload:
return f"This product is distributed under {license_payload['name']} (`{license_payload['spdx']}`)."
return "No product license was selected. This package does not include a `LICENSE.md` notice."
def _attribution_section(input_records: list[dict[str, Any]]) -> str:
entries = []
for item in input_records:
if item.get("marginal"):
continue
parts = [str(item.get("name") or item.get("path") or item["id"])]
if item.get("producer"):
parts.append(f"producer: {item['producer']}")
if item.get("license"):
parts.append(f"license: {item['license']}")
else:
parts.append("license: unknown")
if item.get("url"):
parts.append(f"url: {item['url']}")
entries.append("- " + "; ".join(parts))
return "\n".join(entries) if entries else "No non-marginal input attribution entries were provided."
def _warnings_section(warnings: list[str]) -> str:
return "\n".join(f"- {warning}" for warning in warnings) if warnings else "No packaging warnings."
def _readme_text(
    record: Mapping[str, Any],
    digest: str,
    license_payload: Mapping[str, str] | None,
    *,
    input_records: list[dict[str, Any]],
    warnings: list[str],
    metadata: ProductMetadata | None,
    project_root: Path,
) -> str:
    """Fill the README template for one product package.

    File names come from the record's ``product`` section with generic
    fallbacks; the schema version falls back to ``"1"``. Substitution uses
    ``Template.safe_substitute``, so placeholders the template uses but this
    function does not supply are left in the text unchanged.
    """
    product = record.get("product") or {}
    provenance_name = product.get("provenance_file") or "product.prov.json"
    checksum_name = product.get("provenance_sha256_file") or f"{provenance_name}.sha256"
    data_name = product.get("data") or "product"
    schema_version = record.get("schema_version") or record.get("provenance_schema_version") or "1"

    readme_template = Template(_template_text(metadata, project_root))
    placeholders = {
        "data": data_name,
        "provenance_file": provenance_name,
        "checksum_file": checksum_name,
        "schema": schema_version,
        "digest": digest,
        "files_section": _files_section(product, license_payload=license_payload),
        "license_section": _license_section(license_payload),
        "attribution_section": _attribution_section(input_records),
        "warnings_section": _warnings_section(warnings),
    }
    return readme_template.safe_substitute(placeholders)
def _license_text(license_payload: Mapping[str, str]) -> str:
return (
f"SPDX-License-Identifier: {license_payload['spdx']}\n\n"
f"{license_payload['name']}\n\n"
f"{license_payload['url']}\n\n"
"When using or redistributing this product, preserve the provenance "
"files included in this package.\n"
)
def _append_record_warnings(record: dict[str, Any], warnings: list[str]) -> None:
if not warnings:
return
existing = list(record.get("warnings") or [])
for warning in warnings:
if warning not in existing:
existing.append(warning)
record["warnings"] = existing
def _entity_id(value: str) -> str:
return value.replace("\\", "/").replace(" ", "-").replace(":", "-").replace("#", "-").strip("/") or "unknown"
def _add_package_file(crate: Any, path: Path, *, properties: dict[str, Any] | None = None) -> None:
if path.exists() and path.is_dir():
crate.add_dataset(str(path), dest_path=path.name, properties=properties)
else:
crate.add_file(str(path), dest_path=path.name, properties=properties)
def _write_ro_crate(
    path: Path,
    *,
    record: dict[str, Any],
    data_path: Path,
    digest: str,
    license_payload: Mapping[str, str] | None,
    input_records: list[dict[str, Any]],
    software_records: list[dict[str, Any]],
    warnings: list[str],
) -> None:
    """Describe the product package as RO-Crate metadata and write it to ``path``.

    The crate's root dataset carries the product name, the provenance
    digest, the license SPDX id (when a license is selected) and any
    packaging warnings. The data product and its sidecar files are
    registered as package files, followed by one JSON-LD entity per input
    record and per software record. Only the generated metadata document is
    written, via ``write_json``.
    """
    from rocrate.rocrate import ROCrate

    def _without_blanks(entity: dict[str, Any]) -> dict[str, Any]:
        # Entities only keep properties whose value is neither None nor "".
        return {key: value for key, value in entity.items() if value not in (None, "")}

    product = record.get("product") or {}
    display_name = product.get("data") or data_path.name

    crate = ROCrate()
    crate.root_dataset["name"] = display_name
    crate.root_dataset["description"] = "Reprotrail product package metadata."
    crate.root_dataset["provenance_sha256"] = digest
    if license_payload:
        crate.root_dataset["license"] = license_payload["spdx"]
    if warnings:
        crate.root_dataset["reprotrail_packaging_warnings"] = warnings

    data_properties: dict[str, Any] = {"name": display_name}
    if license_payload:
        data_properties["license"] = license_payload["spdx"]
    _add_package_file(crate, data_path, properties=data_properties)

    # Sidecars are looked up next to the RO-Crate file; the license notice is
    # only listed when a license was selected.
    sidecar_names = [
        product.get("provenance_file"),
        product.get("provenance_sha256_file"),
        product.get("readme_file"),
    ]
    if license_payload:
        sidecar_names.append(product.get("license_file"))
    for sidecar_name in filter(None, sidecar_names):
        _add_package_file(crate, path.parent / str(sidecar_name))

    for source in input_records:
        identifier = str(source.get("path") or source.get("url") or source["id"])
        crate.add_jsonld(
            _without_blanks(
                {
                    "@id": identifier,
                    "@type": "Dataset",
                    "name": source.get("name") or identifier,
                    "producer": source.get("producer"),
                    "license": source.get("license"),
                    "url": source.get("url"),
                    "marginal": source.get("marginal", False),
                    "license_status": source.get("status"),
                    "license_source": source.get("license_source"),
                    "spdx_valid": source.get("spdx_valid"),
                }
            )
        )

    for tool in software_records:
        slug = _entity_id(str(tool.get("name") or "unknown"))
        crate.add_jsonld(
            _without_blanks(
                {
                    "@id": f"software/{slug}",
                    "@type": "SoftwareApplication",
                    "name": tool.get("name"),
                    "softwareVersion": tool.get("version"),
                    "applicationCategory": tool.get("kind"),
                    "license": tool.get("license"),
                    "url": tool.get("url"),
                    "license_family": tool.get("license_family"),
                    "license_status": tool.get("status"),
                    "license_source": tool.get("license_source"),
                    "spdx_valid": tool.get("spdx_valid"),
                    "overrides_discovered_license": tool.get("overrides_discovered_license"),
                }
            )
        )

    write_json(path, crate.metadata.generate())
[docs]
def write_json_with_provenance(
    path: str | Path,
    payload: dict[str, Any],
    *,
    provenance: dict[str, Any] | None = None,
) -> None:
    """Write ``payload`` as JSON, adding the public provenance when one is given.

    The caller's ``payload`` is never modified: when ``provenance`` is
    supplied, a copy is written with the public form of the provenance
    stored under ``PROVENANCE_ATTR``.
    """
    if provenance is None:
        document = payload
    else:
        document = dict(payload)
        document[PROVENANCE_ATTR] = public_provenance(provenance)
    write_json(path, document)
[docs]
def stamp_dataset_provenance(obj: Any, provenance: dict[str, Any] | None) -> Any:
    """Return a copy of ``obj`` whose attrs point at its provenance record.

    ``obj`` must offer ``copy()`` and a mutable ``attrs`` mapping. With no
    provenance the object itself is returned unchanged. Otherwise the copy
    gets the history entry and provenance file name (each only when set),
    always gets the schema version (default ``"1"``), and loses any embedded
    provenance stored under ``PROVENANCE_ATTR``.
    """
    if provenance is None:
        return obj
    stamped = obj.copy()
    history = provenance.get("history_entry")
    if history:
        stamped.attrs["history"] = history
    provenance_file = (provenance.get("product") or {}).get("provenance_file")
    if provenance_file:
        stamped.attrs[PROVENANCE_FILE_ATTR] = provenance_file
    stamped.attrs[PROVENANCE_SCHEMA_ATTR] = provenance.get("schema_version", "1")
    stamped.attrs.pop(PROVENANCE_ATTR, None)
    return stamped
def _stamp_zarr_pointer_attrs(data_path: Path, record: dict[str, Any], digest: str) -> None:
    """Rewrite the attrs of the Zarr group at ``data_path`` with provenance pointers.

    The group is opened in append mode. Its existing attrs are kept, except
    that any embedded provenance under ``PROVENANCE_ATTR`` is dropped, and
    the history, provenance file name, provenance digest and schema version
    are set.

    Raises:
        RuntimeError: if ``zarr`` cannot be imported.
    """
    try:
        import zarr
    except ImportError as err:  # pragma: no cover - optional dependency
        raise RuntimeError("Install reprotrail[products] to stamp Zarr outputs.") from err

    product_info = record.get("product") or {}
    root_group = zarr.open_group(str(data_path), mode="a")

    stamped = dict(root_group.attrs)
    stamped.pop(PROVENANCE_ATTR, None)
    default_provenance_name = product_sidecars(data_path).provenance.name
    stamped["history"] = record.get("history_entry", stamped.get("history", ""))
    stamped[PROVENANCE_FILE_ATTR] = product_info.get("provenance_file", default_provenance_name)
    stamped[PROVENANCE_SHA256_ATTR] = digest
    stamped[PROVENANCE_SCHEMA_ATTR] = record.get("schema_version", "1")

    # Replace the attrs wholesale so the key removed above does not linger.
    root_group.attrs.clear()
    root_group.attrs.update(stamped)
def _stamp_netcdf_pointer_attrs(data_path: Path, record: dict[str, Any], digest: str) -> None:
    """Rewrite the NetCDF file at ``data_path`` with provenance pointer attrs.

    The dataset is loaded into memory, any embedded provenance under
    ``PROVENANCE_ATTR`` is dropped, and the history, provenance file name,
    provenance digest and schema version are set. The result is written to a
    hidden temporary sibling file that then replaces the original; the
    temporary file is removed if it is still present afterwards.

    Raises:
        RuntimeError: if ``xarray`` cannot be imported.
    """
    try:
        import xarray as xr
    except ImportError as err:  # pragma: no cover - optional dependency
        raise RuntimeError("Install reprotrail[products] to stamp NetCDF outputs.") from err

    product_info = record.get("product") or {}
    with xr.open_dataset(data_path) as opened:
        dataset = opened.load()

    dataset.attrs.pop(PROVENANCE_ATTR, None)
    default_provenance_name = product_sidecars(data_path).provenance.name
    pointer_attrs = {
        "history": record.get("history_entry", dataset.attrs.get("history", "")),
        PROVENANCE_FILE_ATTR: product_info.get("provenance_file", default_provenance_name),
        PROVENANCE_SHA256_ATTR: digest,
        PROVENANCE_SCHEMA_ATTR: record.get("schema_version", "1"),
    }
    dataset.attrs.update(pointer_attrs)

    scratch = data_path.with_name(f".{data_path.name}.tmp")
    try:
        dataset.to_netcdf(scratch)
        scratch.replace(data_path)
    finally:
        # Only left behind when writing or replacing failed part-way.
        if scratch.exists():
            scratch.unlink()
[docs]
def finalize_product_provenance(
    provenance_path: str | Path,
    *,
    project_root: str | Path | None = None,
    pixi_environment: str | None = None,
    product_metadata_file: str = PRODUCT_INDEX_FILE,
    license: str | Mapping[str, Any] | None = None,
    allow_partial_metadata: bool = False,
    stamp: bool = True,
) -> str | None:
    """Finalize a product sidecar checksum and lightweight product attrs.

    Reads the provenance record, updates its license, product and warning
    entries, rewrites it, and writes the checksum file for it. When the
    record names a data product, the README, the license notice (if a
    license was validated) and the RO-Crate metadata (if the metadata tools
    are available) are also written into the provenance file's directory.

    Args:
        provenance_path: Provenance JSON file to finalize.
        project_root: Project root used for metadata matching and license
            discovery; defaults to the current working directory.
        pixi_environment: Passed through to ``software_license_records``.
        product_metadata_file: Passed to ``match_product_metadata`` as the
            metadata file.
        license: Explicit product license. When not ``None`` it takes
            precedence over the license of the matched metadata entry.
        allow_partial_metadata: When true, missing product metadata tools
            produce a warning instead of an error; license validation,
            input/software license records and the RO-Crate file are then
            skipped.
        stamp: When true and the data product exists, ``.zarr`` and ``.nc``
            products get provenance pointer attrs stamped onto them.

    Returns:
        The SHA-256 digest of the rewritten provenance file, or ``None``
        when ``provenance_path`` does not exist.

    Raises:
        ProductMetadataDependencyError: if the product metadata tools are
            missing and ``allow_partial_metadata`` is false.
    """
    path = Path(provenance_path)
    if not path.exists():
        return None
    record = read_json(path)
    product = record.get("product") or {}
    if not product.get("data"):
        # No data product recorded: only rewrite the record and checksum it.
        write_json(path, record)
        digest = sha256_file(path)
        write_sha256_file(path.with_suffix(f"{path.suffix}.sha256"), digest=digest, filename=path.name)
        return digest
    # The data product is resolved relative to the provenance file's directory.
    data_path = path.parent / str(product["data"])
    sidecars = product_sidecars(data_path)
    root = Path(project_root).resolve() if project_root is not None else Path.cwd().resolve()
    warnings: list[str] = []
    metadata = match_product_metadata(data_path, root, metadata_file=product_metadata_file)
    # An explicit license argument wins over the matched metadata entry.
    selected_license = license if license is not None else (metadata.license if metadata is not None else None)
    tools_available = True
    try:
        require_product_metadata_tools()
    except ProductMetadataDependencyError as err:
        if not allow_partial_metadata:
            raise
        tools_available = False
        warnings.append(f"{err} Wrote partial product metadata because --allow-partial-metadata was set.")
    license_payload = None
    input_records: list[dict[str, Any]] = []
    software_records: list[dict[str, Any]] = []
    if tools_available:
        license_payload = product_license_summary(selected_license)
        if metadata is None and license is None:
            warnings.append(f"No product metadata entry matched {data_path.name}; no product license was selected.")
        elif selected_license is None:
            warnings.append("No product license was selected.")
        input_records = input_license_records(
            metadata.inputs if metadata is not None else (),
            project_root=root,
            package_dir=path.parent,
        )
        software_records, software_warnings = software_license_records(
            project_root=root,
            pixi_environment=pixi_environment,
            overrides=metadata.software if metadata is not None else (),
        )
        warnings.extend(software_warnings)
        if is_cc_family_license(license_payload):
            # Warn about non-marginal inputs that have no license recorded.
            unknown_inputs = [
                str(item.get("name") or item.get("path") or item["id"])
                for item in input_records
                if not item.get("marginal") and not item.get("license")
            ]
            if unknown_inputs:
                warnings.append(
                    "CC-family product license selected, but input licenses are unknown for: "
                    + ", ".join(unknown_inputs)
                )
    elif selected_license is not None:
        # Without the tools license_payload stays None, so no license is recorded.
        warnings.append("Product license was selected but not validated because product metadata tools are missing.")
    else:
        warnings.append("No product license was selected.")
    if license_payload:
        record["license"] = license_payload
        product["license_file"] = product.get("license_file") or sidecars.license.name
    else:
        # Drop license entries so the record does not point at a notice that is not written.
        record.pop("license", None)
        product.pop("license_file", None)
    product["ro_crate_file"] = product.get("ro_crate_file") or sidecars.ro_crate.name
    record["product"] = product
    _append_record_warnings(record, warnings)
    # The digest is taken after this final rewrite of the provenance record.
    write_json(path, record)
    digest = sha256_file(path)
    checksum_file = product.get("provenance_sha256_file") or sidecars.provenance_sha256.name
    checksum_path = path.parent / str(checksum_file)
    write_sha256_file(checksum_path, digest=digest, filename=path.name)
    if data_path.exists() and stamp:
        # Only Zarr and NetCDF products are stamped; other suffixes are left as-is.
        if data_path.suffix == ".zarr":
            _stamp_zarr_pointer_attrs(data_path, record, digest)
        elif data_path.suffix == ".nc":
            _stamp_netcdf_pointer_attrs(data_path, record, digest)
    readme_file = product.get("readme_file") or sidecars.readme.name
    (path.parent / str(readme_file)).write_text(
        _readme_text(
            record,
            digest,
            license_payload,
            input_records=input_records,
            warnings=warnings,
            metadata=metadata,
            project_root=root,
        ),
        encoding="utf-8",
    )
    if license_payload:
        license_file = product.get("license_file") or sidecars.license.name
        (path.parent / str(license_file)).write_text(
            _license_text(license_payload),
            encoding="utf-8",
        )
    if tools_available:
        # Written last, after the README and license notice it lists.
        _write_ro_crate(
            path.parent / str(product.get("ro_crate_file") or sidecars.ro_crate.name),
            record=record,
            data_path=data_path,
            digest=digest,
            license_payload=license_payload,
            input_records=input_records,
            software_records=software_records,
            warnings=warnings,
        )
    return digest