Source code for vivarium.processes.diffusion_network

from __future__ import absolute_import, division, print_function

import os

import matplotlib.pyplot as plt
import numpy as np

from vivarium.library.dict_utils import deep_merge
from vivarium.core.process import Process
from vivarium.core.composition import (
    simulate_process_in_experiment,
    plot_simulation_output,
    PROCESS_OUT_DIR
)


NAME = 'diffusion_network'

[docs]def field_from_locations_series(locations_series, molecule_ids, n_bins, times): n_times = len(times) field_series = { mol_id: np.zeros((n_times, n_bins[0], n_bins[1]), dtype=np.float64) for mol_id in molecule_ids} for molecule_id in molecule_ids: for time_index in range(n_times): field = np.empty((n_bins), dtype=np.float64) for x in range(n_bins[0]): for y in range(n_bins[1]): field[x, y] = locations_series[(x, y)][molecule_id][time_index] field_series[molecule_id][time_index] = field return field_series
[docs]def check_in_set(set_list, set): in_set = False for edge in set_list: if set[0] in edge and set[1] in edge: in_set = True return in_set
[docs]def make_location_network(n_bins): bins_x = n_bins[0] bins_y = n_bins[1] locations = [] adjacent_edges = set() # make location dict for x in range(bins_x): for y in range(bins_y): locations.append((x,y)) # make adjacency list for x in range(bins_x): for y in range(bins_y): if y > 0: south = ((x, y), (x, y-1)) if not check_in_set(adjacent_edges, south): adjacent_edges.add(south) if y < bins_y-1: north = ((x, y), (x, y+1)) if not check_in_set(adjacent_edges, north): adjacent_edges.add(north) if x > 0: west = ((x, y), (x-1, y)) if not check_in_set(adjacent_edges, west): adjacent_edges.add(west) if x < bins_x-1: east = ((x, y), (x+1, y)) if not check_in_set(adjacent_edges, east): adjacent_edges.add(east) return locations, adjacent_edges
[docs]class DiffusionNetwork(Process): '''''' name = NAME defaults = { 'molecules': ['glc'], 'diffusion': 1e-2, 'initial_state': {}, 'membrane_composition': {}, 'concentrations': {}, 'channels': {}, 'network': {}, } def __init__(self, initial_parameters=None): if initial_parameters is None: initial_parameters = {} # parameters molecule_ids = initial_parameters.get('molecules', self.defaults['molecules']) diffusion = initial_parameters.get('diffusion', self.defaults['diffusion']) # initial state initial_state = initial_parameters.get('initial_state', self.defaults['initial_state']) self.initial_membrane = initial_state.get('membrane_composition', self.defaults['membrane_composition']) concentrations = initial_state.get('concentrations', self.defaults['concentrations']) # membrane channels channels = initial_parameters.get('channels', self.defaults['channels']) # make diffusion network network = initial_parameters.get('network', self.defaults['network']) if network.get('type') == 'grid': n_bins = network.get('n_bins', (2,1)) size = network.get('size', (2, 1)) membrane_edges = network.get('membrane_edges', []) bins_x = n_bins[0] bins_y = n_bins[1] length_x = size[0] length_y = size[1] locations, edges = make_location_network(n_bins) # diffusion settings dx = length_x / bins_x dy = length_y / bins_y dx2 = dx * dy diffusion / dx2 # diffusion_dt = 0.5 * dx ** 2 * dy ** 2 / (2 * diffusion * (dx ** 2 + dy ** 2)) else: locations = network.get('locations') edges = network.get('edges') membrane_edges = network.get('membrane_edges', []) self.nodes = locations self.edges = edges self.molecule_ids = molecule_ids self.diffusion = diffusion self.membrane_edges = membrane_edges self.channel_diffusion = channels # initialize concentrations at each location self.initial_concentrations = {loc: {mol_id: 0.0 for mol_id in self.molecule_ids} for loc in locations} for site, concs in concentrations.items(): self.initial_concentrations[site] = deep_merge(self.initial_concentrations[site], concs) parameters = {} parameters.update(initial_parameters) super(DiffusionNetwork, self).__init__(parameters)
[docs] def ports_schema(self): initial_state = { 'membrane_composition': self.initial_membrane} initial_state.update(self.initial_concentrations) schema = { port: { state: { '_default': value, '_emit': True} for state, value in states.items()} for port, states in initial_state.items()} return schema
[docs] def next_update(self, timestep, states): concentrations = { state_id: concs for state_id, concs in states.items() if state_id is not 'membrane_composition'} membrane = states['membrane_composition'] diffusion_delta = self.diffusion_delta(concentrations, membrane, timestep) return diffusion_delta
[docs] def diffusion_delta(self, locations, membrane, timestep): diffusion_delta = { site: {mol_id: 0 for mol_id in self.molecule_ids} for site in self.nodes} for edge in self.edges: node1 = edge[0] node2 = edge[1] concs1 = locations[node1] concs2 = locations[node2] if edge in self.membrane_edges or (edge[1], edge[0]) in self.membrane_edges: diffusion = 0 for channel_id, channel_conc in membrane.items(): diffusion += channel_conc * self.channel_diffusion[channel_id] diffusion = min(diffusion, self.diffusion) else: diffusion = self.diffusion for mol_id in self.molecule_ids: delta = diffusion * timestep * (concs2[mol_id] - concs1[mol_id]) diffusion_delta[node1][mol_id] += delta diffusion_delta[node2][mol_id] -= delta return diffusion_delta
# testing functions
[docs]def get_two_compartment_config(): initial_state = { 'membrane_composition': { 'porin': 5}, 'concentrations': { 'external': { 'glc': 20.0}, 'internal': { 'glc': 0.0} }} return { 'initial_state': initial_state, 'molecules': ['glc'], 'network': { 'type': 'standard', 'locations': ['external', 'internal'], 'edges': [('external', 'internal')], 'membrane_edges': [('external', 'internal')], }, 'channels':{ 'porin': 5e-2 # diffusion rate through porin }, 'diffusion': 1e-1}
[docs]def get_grid_config(): initial_state = { 'membrane_composition': { 'porin': 1e2}, 'concentrations': { (1, 1): { 'glc': 20.0}, (1, 2): { 'glc': 20.0}, (2, 1): { 'glc': 20.0}, (2, 2): { 'glc': 20.0}, (3, 1): { 'glc': 20.0}, (3, 2): { 'glc': 20.0}, }} return { 'initial_state': initial_state, 'molecules': ['glc'], 'network': { 'type': 'grid', 'n_bins': (6, 4), 'size': (6, 4), 'membrane_edges': [ # left ((0, 1), (1, 1)), ((0, 2), (1, 2)), # top ((1, 3), (1, 2)), ((2, 3), (2, 2)), ((3, 3), (3, 2)), # ((4, 3), (4, 2)), # right # ((4, 2), (5, 2)), ((4, 1), (5, 1)), # bottom # ((1, 0), (1, 1)), ((2, 0), (2, 1)), ((3, 0), (3, 1)), ((4, 0), (4, 1)), ], }, 'channels': { 'porin': 1e-4 # diffusion rate through porin }, 'diffusion': 1e0}
[docs]def test_diffusion_network(config = get_two_compartment_config(), end_time=10): diffusion = DiffusionNetwork(config) settings = { 'timestep': 0.2, 'total_time': end_time} return simulate_process_in_experiment(diffusion, settings)
[docs]def plot_diffusion_field_output(data, config, out_dir='out', filename='field'): n_snapshots = 6 # parameters molecules = config.get('molecules', {}) molecule_ids = list(molecules) n_fields = len(molecule_ids) # get network configuration network = config.get('network') assert network['type'] == 'grid' membrane_edges = network.get('membrane_edges') n_bins = network.get('n_bins') size = network.get('size') bins_x = n_bins[0] bins_y = n_bins[1] length_x = size[0] length_y = size[1] bin_size_x = length_x / bins_x bin_size_y = length_y / bins_y # data times = data.get('time') locations_series = {(x,y): data[(x,y)] for x in range(bins_x) for y in range(bins_y)} field_series = field_from_locations_series(locations_series, molecule_ids, n_bins, times) # plot fields time_vec = times plot_steps = np.round(np.linspace(0, len(time_vec) - 1, n_snapshots)).astype(int).tolist() # make figure fig = plt.figure(figsize=(20 * n_snapshots, 10*n_fields)) grid = plt.GridSpec(n_fields, n_snapshots, wspace=0.2, hspace=0.2) plt.rcParams.update({'font.size': 36}) for mol_idx, mol_id in enumerate(molecule_ids): field_data = field_series[mol_id] vmin = np.amin(field_data) vmax = np.amax(field_data) for time_index, time_step in enumerate(plot_steps, 0): this_field = field_data[time_index] ax = fig.add_subplot(grid[mol_idx, time_index]) # grid is (row, column) ax.set(xlim=[0, length_x], ylim=[0, length_y], aspect=1) ax.set_yticklabels([]) ax.set_xticklabels([]) # plot field plt.imshow(np.transpose(this_field), vmin=vmin, vmax=vmax, origin='lower', extent=[0, length_x, 0, length_y], interpolation='nearest', cmap='YlGn') # plot membrane for membrane in membrane_edges: site1 = membrane[0] site2 = membrane[1] middle = ( (site1[0]+site2[0])/2 + 0.5, (site1[1]+site2[1])/2 + 0.5) delta = ( (site1[1] - site2[1]), (site1[0] - site2[0])) # swap x and y point_0 = ( middle[0] + delta[0]/2, middle[1] + delta[1]/2) point_1 = ( (middle[0] - delta[0]/2), (middle[1] - delta[1]/2)) x_0 = point_0[0] * bin_size_x y_0 = point_0[1] * bin_size_y x_1 = point_1[0] * bin_size_x y_1 = point_1[1] * bin_size_y plt.plot([x_0, x_1], [y_0, y_1], linewidth=5, color='r') fig_path = os.path.join(out_dir, filename) plt.subplots_adjust(wspace=0.7, hspace=0.1) plt.savefig(fig_path, bbox_inches='tight') plt.close(fig)
if __name__ == '__main__': out_dir = os.path.join(PROCESS_OUT_DIR, NAME) if not os.path.exists(out_dir): os.makedirs(out_dir) # 2-site network config = get_two_compartment_config() timeseries = test_diffusion_network(config, 20) plot_simulation_output(timeseries, {}, out_dir, '2_sites') # grid network config = get_grid_config() timeseries = test_diffusion_network(config, 30) plot_diffusion_field_output(timeseries, config, out_dir, 'grid')