Source code for ocdsmerge.flatten

from __future__ import annotations

import uuid
import warnings
from enum import Enum, auto, unique
from typing import TYPE_CHECKING, Any, Union

from ocdsmerge.exceptions import DuplicateIdValueWarning, InconsistentTypeError

if TYPE_CHECKING:
    from collections.abc import Generator

    from ocdsmerge.rules import MergeRules

VERSIONED_VALUE_KEYS = frozenset(['releaseID', 'releaseDate', 'releaseTag', 'value'])


[docs] @unique class MergeStrategy(Enum): APPEND = auto() MERGE_BY_POSITION = auto()
globals().update(MergeStrategy.__members__) Identifier = Union[int, str] Flattened = dict[tuple[Identifier, ...], Any] RuleOverrides = dict[tuple[str, ...], MergeStrategy]
[docs] class IdValue(str): __slots__ = ('_original_value', 'identifier') """ A string with ``identifier`` and ``original_value`` properties. """ def __init__(self, identifier: Identifier): self.identifier = identifier str.__init__(identifier) @property def original_value(self) -> Identifier | None: return self._original_value @original_value.setter def original_value(self, original_value: Identifier | None) -> None: self._original_value = original_value
[docs] def is_versioned_value(value: dict[str, Any]) -> bool: """Return whether the value is a versioned value.""" return len(value) == 4 and VERSIONED_VALUE_KEYS.issuperset(value)
[docs] def flatten( obj: list[dict[str, Any]] | dict[str, Any], merge_rules: MergeRules, rule_overrides: RuleOverrides, flattened: Flattened, path: tuple[Identifier, ...] = (), rule_path: tuple[str, ...] = (), versioned: bool | None = False, # noqa: FBT002 ) -> Flattened: """ Flatten a JSON object into key-value pairs, in which the key is the JSON path as a tuple. It replaces numbers in JSON paths (representing positions in arrays) with special objects. This ensures objects in arrays with different ``id`` values have different JSON paths - and makes it easy to identify such arrays. .. code:: json { "a": "I am a", "b": ["A", "list"], "c": [ {"id": 1, "cb": "I am ca"}, {"id": 2, "ca": "I am cb"} ] } flattens to: .. code:: python { ('a',): 'I am a', ('b',): ['A', 'list'], ('a', '1', 'cb'): 'I am ca', ('a', '1', 'id'): 1, ('a', '2', 'ca'): 'I am cb', ('a', '2', 'id'): 2, } """ # For an exploration of alternatives, see: https://github.com/open-contracting/ocds-merge/issues/26 if type(obj) is list: is_dict = False iterable = _enumerate(obj, path, rule_path, rule_overrides.get(rule_path)) new_rule_path = rule_path else: is_dict = True iterable = obj.items() for key, value in iterable: if is_dict: new_rule_path = (*rule_path, key) new_path_merge_rules = merge_rules.get(new_rule_path, None) if new_path_merge_rules == 'omitWhenMerged': continue # If it's `wholeListMerge`, if it's neither an object nor an array, if it's an array containing non-objects # (even if `wholeListMerge` is `false`), or if it's versioned values, use the whole list merge strategy. # Note: Behavior is undefined and inconsistent if the array is not in the schema and contains objects in some # cases but not in others. # See https://standard.open-contracting.org/1.1/en/schema/merging/#whole-list-merge # See https://standard.open-contracting.org/1.1/en/schema/merging/#objects if new_path_merge_rules == 'wholeListMerge' or not isinstance(value, (dict, list)) or \ (type(value) is list and any(not isinstance(item, dict) for item in value)) or \ (versioned and value and all(is_versioned_value(item) for item in value)): flattened[(*path, key)] = value # Recurse into non-empty objects, and arrays of objects that aren't `wholeListMerge`. elif value: flatten(value, merge_rules, rule_overrides, flattened, (*path, key), new_rule_path, versioned) return flattened
def _enumerate( obj: list[dict[str, Any]], path: tuple[Identifier, ...], rule_path: tuple[str, ...], rule: MergeStrategy | None ) -> Generator[tuple[IdValue, Any], None, None]: # This tracks the identifiers of objects in an array, to warn about collisions. identifiers = {} for key, value in enumerate(obj): new_key, default_key = _id_value(key, value, rule) # Check whether the identifier is used by other objects in the array. default_path = (*path, default_key) if default_path not in identifiers: identifiers[default_path] = key elif identifiers[default_path] != key: warnings.warn( f'Multiple objects have the `id` value {default_key!r} in the `{".".join(map(str, rule_path))}` array', category=DuplicateIdValueWarning, stacklevel=2, ) yield new_key, value def _id_value(key: int, value: dict[str, Any], rule: MergeStrategy | None) -> tuple[IdValue, IdValue]: # If it is an array of objects, get the `id` value to apply the identifier merge strategy. # https://standard.open-contracting.org/latest/en/schema/merging/#identifier-merge if 'id' in value: id_value = value['id'] identifier = id_value # If the object contained no top-level `id` value, set a unique value. else: id_value = None identifier = str(uuid.uuid1(1)) # use 1 instead of MAC address # Calculate the key for the warning, which checks for collisions using the default merge strategy. default_key = IdValue(identifier) if rule == MergeStrategy.APPEND: # Avoid creating an extra UUID. new_key = IdValue(str(uuid.uuid1(1))) if 'id' in value else default_key elif rule == MergeStrategy.MERGE_BY_POSITION: new_key = IdValue(key) else: new_key = default_key # Save the original value. (If the value is an integer, this avoids coercing it to a string.) new_key.original_value = id_value return new_key, default_key
[docs] def unflatten(flattened: Flattened) -> dict[str, Any]: """Unflattens a flattened object into a JSON object.""" unflattened: dict[str, Any] = {} identifiers: dict[tuple[Identifier, ...], dict] = {} for key in flattened: current_node = unflattened for end, part in enumerate(key, 1): if TYPE_CHECKING: assert type(part) is str # If this is a path to an item of an array. # See https://standard.open-contracting.org/1.1/en/schema/merging/#identifier-merge if type(part) is IdValue: # If no `id` of an object in the array matches, append a new object. id_path = key[:end - 1] + (part.identifier,) if id_path not in identifiers: new_node = {} # If the original object had an `id` value, set it. if part.original_value is not None: new_node['id'] = part.original_value # Cache which identifiers appear in which arrays. identifiers[id_path] = new_node current_node.append(new_node) # Change into it. current_node = identifiers[id_path] elif not isinstance(current_node, dict): message = 'An earlier release had the literal {!r} for /{}, but the current release has an object with a {!r} key' # noqa: E501 raise InconsistentTypeError(message.format(current_node, '/'.join(key[:end - 1]), part)) # Otherwise, this is a path to a property of an object. If this is a path to a node we visited before, # change into it. If it's an `id` field, it's already been set to its original value. elif part in current_node: current_node = current_node[part] elif end < len(key): # If the path is to a new array, start a new array, and change into it. if type(key[end]) is IdValue: current_node[part] = [] # If the path is to a new object, start a new object, and change into it. else: current_node[part] = {} current_node = current_node[part] # If this is a full path, copy the data, omitting null'ed fields. elif flattened[key] is not None: current_node[part] = flattened[key] return unflattened