Source code for ocdsmerge.rules
from __future__ import annotations
from functools import lru_cache
from typing import TYPE_CHECKING, Any
import jsonref
from ocdsmerge.util import get_release_schema_url, get_tags
if TYPE_CHECKING:
from collections.abc import Generator
MergeRules = dict[tuple[str, ...], str]
Schema = str | dict[str, Any] | None
[docs]
def get_merge_rules(schema: Schema = None) -> MergeRules:
"""
Return merge rules as key-value pairs.
The key is a JSON path as a tuple, and the value is the merge rule as a string
("omitWhenMerged" or "wholeListMerge").
"""
schema = schema or get_release_schema_url(get_tags()[-1])
if isinstance(schema, dict):
# jsonref.JsonRef is deprecated, but used for backwards-compatibility with jsonref 0.x.
return _get_merge_rules_from_dereferenced_schema(jsonref.JsonRef.replace_refs(schema))
return _get_merge_rules_from_url_or_path(schema)
@lru_cache
def _get_merge_rules_from_url_or_path(schema: str) -> MergeRules:
if schema.startswith("http"):
deref_schema = jsonref.load_uri(schema)
else:
with open(schema) as f:
deref_schema = jsonref.load(f)
return _get_merge_rules_from_dereferenced_schema(deref_schema)
def _get_merge_rules_from_dereferenced_schema(deref_schema: dict[str, Any]) -> MergeRules:
return dict(_get_merge_rules(deref_schema["properties"]))
def _get_merge_rules(
properties: dict[str, Any], path: tuple[str, ...] | None = None
) -> Generator[tuple[tuple[str, ...], str], None, None]:
"""
Yield merge rules as key-value pairs.
The first element is a JSON path as a tuple, and the second element is the merge rule as a string
("omitWhenMerged" or "wholeListMerge").
"""
if path is None:
path = ()
for key, value in properties.items():
new_path = (*path, key)
types = _get_types(value)
# `omitWhenMerged` supersedes all other rules.
# See https://standard.open-contracting.org/1.1/en/schema/merging/#discarded-fields
if value.get("omitWhenMerged") or value.get("mergeStrategy") == "ocdsOmit":
yield new_path, "omitWhenMerged"
# `wholeListMerge` supersedes any nested rules.
# See https://standard.open-contracting.org/1.1/en/schema/merging/#whole-list-merge
elif "array" in types and (value.get("wholeListMerge") or value.get("mergeStrategy") == "ocdsVersion"):
yield new_path, "wholeListMerge"
# See https://standard.open-contracting.org/1.1/en/schema/merging/#object-values
elif "object" in types and "properties" in value:
yield from _get_merge_rules(value["properties"], path=new_path)
# See https://standard.open-contracting.org/1.1/en/schema/merging/#whole-list-merge
elif "array" in types and "items" in value:
item_types = _get_types(value["items"])
if any(item_type != "object" for item_type in item_types):
yield new_path, "wholeListMerge"
elif "object" in item_types and "properties" in value["items"]:
if "id" not in value["items"]["properties"]:
yield new_path, "wholeListMerge"
else:
yield from _get_merge_rules(value["items"]["properties"], path=new_path)
def _get_types(prop: dict[str, Any]) -> list[str]:
"""Return a property's `type` as a list."""
if "type" not in prop:
return []
if isinstance(prop["type"], str):
return [prop["type"]]
return prop["type"]