Source code for vivarium.parameters.parameters

from __future__ import absolute_import, division, print_function

import os
import copy
import itertools
import math

import numpy as np
import matplotlib.pyplot as plt

from vivarium.library.units import units
from vivarium.core.composition import simulate_compartment_in_experiment
from vivarium.core.process import Generator

# processes for testing
from vivarium.processes.convenience_kinetics import ConvenienceKinetics, get_glc_lct_config


[docs]def get_nested(dict, keys): d = dict for key in keys[:-1]: if key in d: d = d[key] try: value = d[keys[-1]] except: value = None print('value not found for: {}'.format(keys)) return value
[docs]def set_nested(dict, keys, value, create_missing=True): d = dict for key in keys[:-1]: if key in d: d = d[key] elif create_missing: d = d.setdefault(key, {}) else: return dict if keys[-1] in d or create_missing: d[keys[-1]] = value return dict
[docs]def get_parameters_logspace(min, max, number): ''' get list of n parameters logarithmically spaced between min and max ''' range = np.logspace(np.log10(min), np.log10(max), number, endpoint=True) return list(range)
[docs]def run_compartment_get_output(compartment, parameters, condition, metrics, settings): settings['initial_state'] = condition settings['return_raw_data'] = True settings['compartment'] = parameters # run the simulation and get the last state sim_out = simulate_compartment_in_experiment(compartment, settings) time_vec = list(sim_out.keys()) last_state = sim_out[time_vec[-1]] # pull out metric values from last_state output = [] for output_value in metrics: output.append(get_nested(last_state, output_value)) return output
[docs]def check_path_exists(path, dict): if len(path) > 0: key = path[0] if key in dict: sub_path = path[1:] sub_dict = dict[key] check_path_exists(sub_path, sub_dict) else: raise Exception('parameter path does not exist: {}'.format(path))
[docs]def parameter_scan(config): ''' Pass in a config (dict) with: - compartment (object) -- a compartment class, for configuration by the parameters - scan_parameters (dict) -- each parameter location (tuple) mapped to a list of values - metrics (list) -- a list of output values (tuple) with the (port, key) - conditions (list) -- a list of state values (dict) with {port: {variable: value}} for the default state the condition is and empty dict, [{}] - settings (dict) -- simulation settings for the experiments Returns a list of all parameter combinations, and a dictionary with output values for those parameters ''' compartment = config['compartment'] scan_params = config['scan_parameters'] metrics = config['metrics'] settings = config.get('settings', {}) conditions = config.get('conditions') if not conditions: conditions = [{}] n_conditions = len(conditions) ## Set up the parameters # get number of parameter sets for scan n_values = [len(v) for v in scan_params.values()] n_combinations = np.prod(np.array(n_values)) print('parameter scan size: {}'.format(n_combinations)) # get default parameters from baseline compartment object default_params = compartment.get_parameters() # check that scan parameters are in default parameters for path in scan_params.keys(): check_path_exists(path, default_params) # get parameter sets for scan param_keys = list(scan_params.keys()) param_values = list(scan_params.values()) param_combinations = list(itertools.product(*param_values)) # a list of all parameter combinations param_sets = [dict(zip(param_keys, combo)) for combo in param_combinations] # list of dicts with {param: value} # run all parameter sets and save results results = [] for params_index, param_set in enumerate(param_sets): # set up the parameters # parameters = copy.deepcopy(default_params) parameters = {} for param_key, param_value in param_set.items(): parameters = set_nested(parameters, param_key, param_value) ## run the parameter set for each condition's state for condition_index, condition_state in enumerate(conditions): print('running parameter set {}/{}, condition {}/{}'.format( params_index + 1, n_combinations, condition_index+1, n_conditions)) output = run_compartment_get_output( compartment, parameters, condition_state, metrics, settings) result = { 'parameter_index': params_index, 'condition_index': condition_index, 'output': output} results.append(result) # organize data by metric output_data = { 'results': results, 'metrics': metrics, 'parameter_sets': {idx: param_set for idx, param_set in enumerate(param_sets)}, 'conditions': {idx: condition for idx, condition in enumerate(conditions)}} return organize_param_scan_results(output_data)
[docs]def organize_param_scan_results(data): results = data['results'] metrics = data['metrics'] parameter_sets = data['parameter_sets'] conditions = data['conditions'] param_indices = list(range(0, len(parameter_sets))) condition_indices = list(range(0, len(conditions))) # organize the results by metric metric_data = { metric: { condition_index: [None for param in param_indices] for condition_index in condition_indices} for metric in metrics} for result in results: param_index = result['parameter_index'] condition_index = result['condition_index'] output = result['output'] for metric_index, datum in enumerate(output): metric = metrics[metric_index] metric_data[metric][condition_index][param_index] = datum return { 'metric_data': metric_data, 'parameter_sets': parameter_sets, 'conditions': conditions}
[docs]def plot_scan_results(results, out_dir='out', filename='parameter_scan'): metric_data = results['metric_data'] parameter_sets = results['parameter_sets'] conditions = results['conditions'] parameter_indices = [idx for idx, param in enumerate(parameter_sets)] # make the figure n_cols = 1 lines_per_row = 8 base_rows = len(metric_data) param_rows = math.ceil(len(parameter_indices)/lines_per_row) condition_rows = math.ceil(len(conditions)/lines_per_row) n_rows = base_rows + param_rows + condition_rows fig = plt.figure(figsize=(n_cols * 6, n_rows * 2)) grid = plt.GridSpec(n_rows, n_cols) font = {'size': 6} plt.rc('font', **font) row_idx = 0 col_idx = 0 for metric, data, in metric_data.items(): ax = fig.add_subplot(grid[row_idx, col_idx]) for condition, param_data in data.items(): ax.scatter(parameter_indices, param_data, label=condition) ax.legend(title='condition', bbox_to_anchor=(1.2, 1.0)) ax.set_yscale('log') ax.set_ylabel(metric) ax.set_xticks(parameter_indices) ax.set_xlabel('parameter set #') row_idx += 1 # prepare text param_text_row = 1 / lines_per_row / param_rows cond_text_row = 1 / lines_per_row / condition_rows parameter_text = [ '{}: {}'.format(param_idx, param_set) for param_idx, param_set in parameter_sets.items()] condition_text = [ '{}: {}'.format(condition_idx, condition) for condition_idx, condition in conditions.items()] # parameter text ax = fig.add_subplot(grid[base_rows:base_rows+param_rows, :]) ax.text(0, 1.0, 'parameters') for text_idx, param in enumerate(parameter_text): ax.text(0, 0.9-text_idx*param_text_row, param) ax.axis('off') # condition text ax = fig.add_subplot(grid[base_rows+param_rows:, :]) ax.text(0, 1.0, 'conditions') for text_idx, condition in enumerate(condition_text): ax.text(0, 0.9-text_idx*cond_text_row, condition) ax.axis('off') # save the figure fig_path = os.path.join(out_dir, filename) plt.subplots_adjust(wspace=0.3, hspace=0.5) plt.savefig(fig_path, bbox_inches='tight')
# testing
[docs]class TestConvienceKinetics(Generator): defaults = { 'boundary_path': ('boundary',), 'config': { 'process': { 'reactions': { # reaction1 is the reaction ID 'reaction1': { 'stoichiometry': { # 1 mol A is consumed per mol reaction ('internal', 'A'): -1, ('internal', 'B'): -1, # 2 mol C are produced per mol reaction ('internal', 'C'): 2, }, 'is reversible': False, 'catalyzed by': [ ('internal', 'E'), ], } }, 'kinetic_parameters': { 'reaction1': { ('internal', 'E'): { 'kcat_f': 1e1, # kcat for forward reaction ('internal', 'A'): 1e-1, # km for A ('internal', 'B'): 1e-1, # km for B }, }, }, 'initial_state': { 'internal': { 'A': 1e2, 'B': 1e2, 'C': 1e2, 'E': 1e2, }, 'fluxes': { 'reaction1': 0.0, } }, 'ports': { 'internal': ['A', 'B', 'C', 'E'], 'external': [], }, } } } def __init__(self, config=None): self.config = self.defaults['config'] self.boundary_path = self.or_default(config, 'boundary_path')
[docs] def generate_processes(self, config): process = ConvenienceKinetics(config['process']) return {'process': process}
[docs] def generate_topology(self, config): external_path = self.boundary_path + ('external',) return { 'process': { 'internal': ('internal',), 'external': external_path, 'fields': ('fields',), 'fluxes': ('fluxes',), 'global': self.boundary_path, 'dimensions': ('dimensions',), } }
[docs]def scan_test(): # initialize the compartment compartment = TestConvienceKinetics({}) # parameters to be scanned, and their values scan_params = { ('process', 'kinetic_parameters', 'reaction1', ('internal', 'E'), 'kcat_f'): get_parameters_logspace(1e0, 1e5, 6), } # metrics are the outputs of a scan metrics = [('fluxes', 'reaction1')] # define conditions conditions = [ {'internal': {'E': 1e0}}, {'internal': {'E': 1e1}}, {'internal': {'E': 1e2}}, ] # set up scan options timeline = [(5, {})] sim_settings = { 'timeline': { 'timeline': timeline, 'ports': { 'external': ('boundary', 'external')}}} # run scan scan_config = { 'compartment': compartment, 'scan_parameters': scan_params, 'conditions': conditions, 'metrics': metrics, 'settings': sim_settings} results = parameter_scan(scan_config) return results
if __name__ == '__main__': out_dir = os.path.join('out', 'parameters', 'scan') if not os.path.exists(out_dir): os.makedirs(out_dir) results = scan_test() plot_scan_results(results, out_dir, 'test_scan')