Source code for vivarium.processes.diffusion_field

'''
===============
Diffusion Field
===============
'''

from __future__ import absolute_import, division, print_function

import sys
import os
import argparse

import numpy as np
from scipy import constants
from scipy.ndimage import convolve
import matplotlib.pyplot as plt

from vivarium.core.process import Process
from vivarium.core.composition import (
    simulate_process,
    PROCESS_OUT_DIR
)
from vivarium.library.lattice_utils import (
    count_to_concentration,
    get_bin_site,
    get_bin_volume,
)
from vivarium.library.units import units

from vivarium.plots.multibody_physics import plot_snapshots

NAME = 'diffusion_field'

# laplacian kernel for diffusion
LAPLACIAN_2D = np.array([[0.0, 1.0, 0.0], [1.0, -4.0, 1.0], [0.0, 1.0, 0.0]])
AVOGADRO = constants.N_A


[docs]def gaussian(deviation, distance): return np.exp(-np.power(distance, 2.) / (2 * np.power(deviation, 2.)))
[docs]def make_gradient(gradient, n_bins, size): '''Create a gradient from a configuration **Uniform** A uniform gradient fills the field evenly with each molecule, at the concentrations specified. Example configuration: .. code-block:: python 'gradient': { 'type': 'uniform', 'molecules': { 'mol_id1': 1.0, 'mol_id2': 2.0 }}, **Gaussian** A gaussian gradient multiplies the base concentration of the given molecule by a gaussian function of distance from center and deviation. Distance is scaled by 1/1000 from microns to millimeters. Example configuration: .. code-block:: python 'gradient': { 'type': 'gaussian', 'molecules': { 'mol_id1':{ 'center': [0.25, 0.5], 'deviation': 30}, 'mol_id2': { 'center': [0.75, 0.5], 'deviation': 30} }}, **Linear** A linear gradient sets a site's concentration (c) of the given molecule as a function of distance (d) from center and slope (b), and base concentration (a). Distance is scaled by 1/1000 from microns to millimeters. .. math:: c = a + b * d Example configuration: .. code-block:: python 'gradient': { 'type': 'linear', 'molecules': { 'mol_id1':{ 'center': [0.0, 0.0], 'base': 0.1, 'slope': -10}, 'mol_id2': { 'center': [1.0, 1.0], 'base': 0.1, 'slope': -5} }}, **Exponential** An exponential gradient sets a site's concentration (c) of the given molecule as a function of distance (d) from center, with parameters base (b) and scale (a). Distance is scaled by 1/1000 from microns to millimeters. Note: base > 1 makes concentrations increase from the center. .. math:: c=a*b^d. Example configuration: .. code-block:: python 'gradient': { 'type': 'exponential', 'molecules': { 'mol_id1':{ 'center': [0.0, 0.0], 'base': 1+2e-4, 'scale': 1.0}, 'mol_id2': { 'center': [1.0, 1.0], 'base': 1+2e-4, 'scale' : 0.1} }}, Parameters: gradient: Configuration dictionary that includes the ``type`` key to specify the type of gradient to make. n_bins: A list of two elements that specify the number of bins to have along each axis. size: A list of two elements that specifies the size of the environment. ''' bins_x = n_bins[0] bins_y = n_bins[1] length_x = size[0] length_y = size[1] fields = {} if gradient.get('type') == 'gaussian': for molecule_id, specs in gradient['molecules'].items(): field = np.ones((bins_x, bins_y), dtype=np.float64) center = [specs['center'][0] * length_x, specs['center'][1] * length_y] deviation = specs['deviation'] for x_bin in range(bins_x): for y_bin in range(bins_y): # distance from middle of bin to center coordinates dx = (x_bin + 0.5) * length_x / bins_x - center[0] dy = (y_bin + 0.5) * length_y / bins_y - center[1] distance = np.sqrt(dx ** 2 + dy ** 2) scale = gaussian(deviation, (distance/1000)) # multiply gradient by scale field[x_bin][y_bin] *= scale fields[molecule_id] = field elif gradient.get('type') == 'linear': for molecule_id, specs in gradient['molecules'].items(): field = np.zeros((bins_x, bins_y), dtype=np.float64) center = [specs['center'][0] * length_x, specs['center'][1] * length_y] base = specs.get('base', 0.0) slope = specs['slope'] for x_bin in range(bins_x): for y_bin in range(bins_y): dx = (x_bin + 0.5) * length_x / bins_x - center[0] dy = (y_bin + 0.5) * length_y / bins_y - center[1] distance = np.sqrt(dx ** 2 + dy ** 2) field[x_bin][y_bin] += base + slope * (distance/1000) fields[molecule_id] = field elif gradient.get('type') == 'exponential': for molecule_id, specs in gradient['molecules'].items(): field = np.zeros((bins_x, bins_y), dtype=np.float64) center = [specs['center'][0] * length_x, specs['center'][1] * length_y] base = specs['base'] scale = specs.get('scale', 1) for x_bin in range(bins_x): for y_bin in range(bins_y): dx = (x_bin + 0.5) * length_x / bins_x - center[0] dy = (y_bin + 0.5) * length_y / bins_y - center[1] distance = np.sqrt(dx ** 2 + dy ** 2) field[x_bin][y_bin] = scale * base ** (distance/1000) fields[molecule_id] = field elif gradient.get('type') == 'uniform': for molecule_id, fill_value in gradient['molecules'].items(): fields[molecule_id] = np.full((bins_x, bins_y), fill_value, dtype=np.float64) return fields
[docs]class DiffusionField(Process): ''' Diffusion in 2-dimensional fields of molecules with agent exchange Agent uptake and secretion occurs at agent locations. Notes: * Diffusion constant of glucose in 0.5 and 1.5 percent agarose gel is around :math:`6 * 10^{-10} \\frac{m^2}{s}` (Weng et al. 2005. Transport of glucose and poly(ethylene glycol)s in agarose gels). * Conversion to micrometers: :math:`6 * 10^{-10} \\frac{m^2}{s}=600 \\frac{micrometers^2}{s}`. ''' name = NAME defaults = { 'time_step': 1, 'molecules': ['glc'], 'initial_state': {}, 'n_bins': [10, 10], 'bounds': [10, 10], 'depth': 3000.0, # um 'diffusion': 5e-1, 'gradient': {}, 'agents': {}, } def __init__(self, initial_parameters=None): if initial_parameters is None: initial_parameters = {} # initial state self.molecule_ids = initial_parameters.get('molecules', self.defaults['molecules']) self.initial_state = initial_parameters.get('initial_state', self.defaults['initial_state']) # parameters self.n_bins = initial_parameters.get('n_bins', self.defaults['n_bins']) self.bounds = initial_parameters.get('bounds', self.defaults['bounds']) depth = initial_parameters.get('depth', self.defaults['depth']) # diffusion diffusion = initial_parameters.get('diffusion', self.defaults['diffusion']) bins_x = self.n_bins[0] bins_y = self.n_bins[1] length_x = self.bounds[0] length_y = self.bounds[1] dx = length_x / bins_x dy = length_y / bins_y dx2 = dx * dy self.diffusion = diffusion / dx2 self.diffusion_dt = 0.01 # self.diffusion_dt = 0.5 * dx ** 2 * dy ** 2 / (2 * self.diffusion * (dx ** 2 + dy ** 2)) # volume, to convert between counts and concentration self.bin_volume = get_bin_volume(self.n_bins, self.bounds, depth) # initialize gradient fields gradient = initial_parameters.get('gradient', self.defaults['gradient']) if gradient: gradient_fields = make_gradient(gradient, self.n_bins, self.bounds) self.initial_state.update(gradient_fields) # agents self.initial_agents = initial_parameters.get('agents', self.defaults['agents']) parameters = {} parameters.update(initial_parameters) super(DiffusionField, self).__init__(parameters)
[docs] def ports_schema(self): local_concentration_schema = { molecule: { '_default': 0.0, '_updater': 'set'} for molecule in self.molecule_ids} schema = {'agents': {}} for agent_id, states in self.initial_agents.items(): location = states['boundary'].get('location', []) schema['agents'][agent_id] = { 'boundary': { 'location': { '_value': location}, } } glob_schema = { '*': { 'boundary': { 'location': { '_default': [0.5 * bound for bound in self.bounds], '_updater': 'set'}, 'external': local_concentration_schema}}} schema['agents'].update(glob_schema) # fields fields_schema = { 'fields': { field: { '_value': self.initial_state.get(field, self.ones_field()), '_updater': 'accumulate', '_emit': True, } for field in self.molecule_ids }, } schema.update(fields_schema) # dimensions dimensions_schema = { 'dimensions': { 'bounds': { '_value': self.parameters['bounds'], '_updater': 'set', '_emit': True, }, 'n_bins': { '_value': self.parameters['n_bins'], '_updater': 'set', '_emit': True, }, 'depth': { '_value': self.parameters['depth'], '_updater': 'set', '_emit': True, } }, } schema.update(dimensions_schema) return schema
[docs] def next_update(self, timestep, states): fields = states['fields'] agents = states['agents'] # diffuse field delta_fields = self.diffuse(fields, timestep) # get each agent's local environment local_environments = self.get_local_environments(agents, fields) update = {'fields': delta_fields} if local_environments: update.update({'agents': local_environments}) return update
[docs] def count_to_concentration(self, count): return count_to_concentration( count * units.count, self.bin_volume * units.L ).to(units.mmol / units.L).magnitude
[docs] def get_bin_site(self, location): return get_bin_site(location, self.n_bins, self.bounds)
[docs] def get_single_local_environments(self, specs, fields): bin_site = self.get_bin_site(specs['location']) local_environment = {} for mol_id, field in fields.items(): local_environment[mol_id] = field[bin_site] return local_environment
[docs] def get_local_environments(self, agents, fields): local_environments = {} if agents: for agent_id, specs in agents.items(): local_environments[agent_id] = {'boundary': {}} local_environments[agent_id]['boundary']['external'] = \ self.get_single_local_environments(specs['boundary'], fields) return local_environments
[docs] def ones_field(self): return np.ones((self.n_bins[0], self.n_bins[1]), dtype=np.float64)
# diffusion functions
[docs] def diffusion_delta(self, field, timestep): ''' calculate concentration changes cause by diffusion''' field_new = field.copy() t = 0.0 dt = min(timestep, self.diffusion_dt) while t < timestep: field_new += self.diffusion * dt * convolve(field_new, LAPLACIAN_2D, mode='reflect') t += dt return field_new - field
[docs] def diffuse(self, fields, timestep): delta_fields = {} for mol_id, field in fields.items(): # run diffusion if molecule field is not uniform if len(set(field.flatten())) != 1: delta = self.diffusion_delta(field, timestep) else: delta = np.zeros_like(field) delta_fields[mol_id] = delta return delta_fields
# testing
[docs]def get_random_field_config(config={}): bounds = config.get('bounds', (20, 20)) n_bins = config.get('n_bins', (10, 10)) return { 'molecules': ['glc'], 'initial_state': { 'glc': np.random.rand(n_bins[0], n_bins[1])}, 'n_bins': n_bins, 'bounds': bounds}
[docs]def get_gaussian_config(config={}): molecules = config.get('molecules', ['glc']) bounds = config.get('bounds', (50, 50)) n_bins = config.get('n_bins', (20, 20)) center = config.get('center', [0.5, 0.5]) deviation = config.get('deviation', 5) diffusion = config.get('diffusion', 5e-1) return { 'molecules': molecules, 'n_bins': n_bins, 'bounds': bounds, 'diffusion': diffusion, 'gradient': { 'type': 'gaussian', 'molecules': { 'glc': { 'center': center, 'deviation': deviation}}}}
[docs]def get_exponential_config(config={}): molecules = config.get('molecules', ['glc']) bounds = config.get('bounds', (40, 40)) n_bins = config.get('n_bins', (20, 20)) center = config.get('center', [1.0, 1.0]) base = config.get('base', 1 + 2e-4) scale = config.get('scale', 0.1) diffusion = config.get('diffusion', 1e1) return { 'molecules': molecules, 'n_bins': n_bins, 'bounds': bounds, 'diffusion': diffusion, 'gradient': { 'type': 'exponential', 'molecules': { 'glc': { 'center': center, 'base': base, 'scale': scale}}}}
[docs]def get_secretion_agent_config(config={}): molecules = config.get('molecules', ['glc']) bounds = config.get('bounds', (20, 20)) n_bins = config.get('n_bins', (10, 10)) depth = config.get('depth', 30) n_agents = config.get('n_agents', 3) agents = {} for agent in range(n_agents): agent_id = str(agent) agents[agent_id] = { 'boundary': { 'location': [ np.random.uniform(0, bounds[0]), np.random.uniform(0, bounds[1])], 'external': { mol_id: 0 for mol_id in molecules}}} return { 'molecules': molecules, 'n_bins': n_bins, 'bounds': bounds, 'depth': depth, 'agents': agents, 'initial_state': { mol_id: np.ones((n_bins[0], n_bins[1])) for mol_id in molecules}}
[docs]def test_diffusion_field(config=get_gaussian_config(), time=10): diffusion = DiffusionField(config) settings = { 'return_raw_data': True, 'total_time': time, 'timestep': 1} return simulate_process(diffusion, settings)
[docs]def plot_fields(data, config, out_dir='out', filename='fields'): fields = {time: time_data['fields'] for time, time_data in data.items()} snapshots_data = { 'fields': fields, 'config': config} plot_config = { 'out_dir': out_dir, 'filename': filename} plot_snapshots(snapshots_data, plot_config)
if __name__ == '__main__': out_dir = os.path.join(PROCESS_OUT_DIR, NAME) if not os.path.exists(out_dir): os.makedirs(out_dir) parser = argparse.ArgumentParser(description='diffusion_field') parser.add_argument('--random', '-r', action='store_true', default=False) parser.add_argument('--gaussian', '-g', action='store_true', default=False) parser.add_argument('--exponential', '-e', action='store_true', default=False) parser.add_argument('--secretion', '-s', action='store_true', default=False) args = parser.parse_args() no_args = (len(sys.argv) == 1) if args.random or no_args: config = get_random_field_config() data = test_diffusion_field(config, 10) plot_fields(data, config, out_dir, 'random_field') if args.gaussian or no_args: config = get_gaussian_config() data = test_diffusion_field(config, 10) plot_fields(data, config, out_dir, 'gaussian_field') if args.exponential or no_args: config = get_exponential_config() data = test_diffusion_field(config, 20) plot_fields(data, config, out_dir, 'exponential_field') if args.secretion or no_args: config = get_secretion_agent_config() data = test_diffusion_field(config, 10) plot_fields(data, config, out_dir, 'secretion')