Source code for vivarium.library.dict_utils

from __future__ import absolute_import, division, print_function

import collections
import copy
from functools import reduce
import operator


tuple_separator = '___'

[docs]def merge_dicts(dicts): merge = {} for d in dicts: merge.update(d) return merge
[docs]def deep_merge_check(dct, merge_dct): ''' Recursive dict merge, which throws exceptions for conflicting values This mutates dct - the contents of merge_dct are added to dct (which is also returned). If you want to keep dct you could call it like deep_merge(dict(dct), merge_dct)''' for k, v in merge_dct.items(): if (k in dct and isinstance(dct[k], dict) and isinstance(merge_dct[k], collections.Mapping)): try: deep_merge_check(dct[k], merge_dct[k]) except: raise Exception('dict merge mismatch: key "{}" has values {} AND {}'.format(k, dct[k], merge_dct[k])) elif k in dct and (dct[k] is not merge_dct[k]): raise Exception('dict merge mismatch: key "{}" has values {} AND {}'.format(k, dct[k], merge_dct[k])) else: dct[k] = merge_dct[k] return dct
[docs]def deep_merge_combine_lists(dct, merge_dct): ''' Recursive dict merge, combines values that are lists This mutates dct - the contents of merge_dct are added to dct (which is also returned). If you want to keep dct you could call it like deep_merge(dict(dct), merge_dct)''' for k, v in merge_dct.items(): if (k in dct and isinstance(dct[k], dict) and isinstance(merge_dct[k], collections.Mapping)): deep_merge(dct[k], merge_dct[k]) elif k in dct and isinstance(dct[k], list) and isinstance(v, list): dct[k].extend(v) else: dct[k] = merge_dct[k] return dct
[docs]def deep_merge(dct, merge_dct): ''' Recursive dict merge This mutates dct - the contents of merge_dct are added to dct (which is also returned). If you want to keep dct you could call it like deep_merge(dict(dct), merge_dct)''' if dct is None: dct = {} if merge_dct is None: merge_dct = {} for k, v in merge_dct.items(): if (k in dct and isinstance(dct[k], dict) and isinstance(merge_dct[k], collections.Mapping)): deep_merge(dct[k], merge_dct[k]) else: dct[k] = merge_dct[k] return dct
[docs]def flatten_port_dicts(dicts): ''' Input: dicts (dict): embedded state dictionaries with the {'port_id': {'state_id': state_value}} Return: merge (dict): flattened dictionary with {'state_id_port_id': value} ''' merge = {} for port, states_dict in dicts.items(): for state, value in states_dict.items(): merge.update({state + '_' + port: value}) return merge
[docs]def tuplify_port_dicts(dicts): ''' Input: dicts (dict): embedded state dictionaries with the {'port_id': {'state_id': state_value}} Return: merge (dict): tuplified dictionary with {(port_id','state_id'): value} ''' merge = {} for port, states_dict in dicts.items(): if states_dict: for state, value in states_dict.items(): merge.update({(port, state): value}) return merge
[docs]def flatten_timeseries(timeseries): '''Flatten a timeseries in the style of flatten_port_dicts''' flat = {} for port, store_dict in timeseries.items(): if port == 'time': flat[port] = timeseries[port] continue for variable_name, values in store_dict.items(): key = "{}_{}".format(port, variable_name) flat[key] = values return flat
[docs]def tuple_to_str_keys(dictionary): # take a dict with tuple keys, and convert them to strings with tuple_separator as a delimiter new_dict = copy.deepcopy(dictionary) make_str_dict(new_dict) return new_dict
[docs]def make_str_dict(dictionary): # get down to the leaves first for k, v in dictionary.items(): if isinstance(v, dict): make_str_dict(v) # convert tuples in lists if isinstance(v, list): for idx, var in enumerate(v): if isinstance(var, tuple): v[idx] = tuple_separator.join(var) if isinstance(var, dict): make_str_dict(var) # which keys are tuples? tuple_ks = [k for k in dictionary.keys() if isinstance(k, tuple)] for tuple_k in tuple_ks: str_k = tuple_separator.join(tuple_k) dictionary[str_k] = dictionary[tuple_k] del dictionary[tuple_k] return dictionary
[docs]def str_to_tuple_keys(dictionary): # take a dict with keys that have tuple_separator, and convert them to tuples # get down to the leaves first for k, v in dictionary.items(): if isinstance(v, dict): str_to_tuple_keys(v) # convert strings in lists if isinstance(v, list): for idx, var in enumerate(v): if isinstance(var, str) and tuple_separator in var: v[idx] = tuple(var.split(tuple_separator)) if isinstance(var, dict): str_to_tuple_keys(var) # which keys are tuples? str_ks = [k for k in dictionary.keys() if isinstance(k, str) and tuple_separator in k] for str_k in str_ks: tuple_k = tuple(str_k.split(tuple_separator)) dictionary[tuple_k] = dictionary[str_k] del dictionary[str_k] return dictionary
[docs]def keys_list(d): return list(d.keys())
[docs]def value_in_embedded_dict(data, timeseries={}, time_index=None): ''' converts data from a single time step into an embedded dictionary with lists of values ''' for key, value in data.items(): if isinstance(value, dict): if key not in timeseries: timeseries[key] = {} timeseries[key] = value_in_embedded_dict(value, timeseries[key], time_index) elif time_index is None: if key not in timeseries: timeseries[key] = [] timeseries[key].append(value) else: if key not in timeseries: timeseries[key] = { 'value': [], 'time_index': [] } timeseries[key]['value'].append(value) timeseries[key]['time_index'].append(time_index) return timeseries
[docs]def get_path_list_from_dict(dictionary): paths_list = [] for key, value in dictionary.items(): if isinstance(value, dict): subpaths = get_path_list_from_dict(value) for subpath in subpaths: path = (key,) + subpath paths_list.append(path) else: path = (key,) paths_list.append(path) return paths_list
[docs]def get_value_from_path(dictionary, path): try: return reduce(operator.getitem, path, dictionary) except: return None
[docs]def make_path_dict(embedded_dict): '''converts embedded dict to a flat dict with path names as keys''' path_dict = {} paths_list = get_path_list_from_dict(embedded_dict) for path in paths_list: path_dict[path] = get_value_from_path(embedded_dict, path) return path_dict