from __future__ import absolute_import, division, print_function
import os
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from vivarium.core.process import Process
from vivarium.core.composition import simulate_process
[docs]class CellularPotts(Process):
"""
Cellular Potts model
To run with animation on set animate: True, and use the TKAgg matplotlib backend:
> MPLBACKEND=TKAgg python vivarium/processes/cellular_potts.py
"""
name = 'cellular_potts'
defaults = {
'grid_size': (10, 10),
'n_agents': 1,
'target_area': 10
}
def __init__(self, initial_parameters=None):
if initial_parameters is None:
initial_parameters = {}
grid_size = initial_parameters.get('grid_size', self.defaults['grid_size'])
n_initial = initial_parameters.get('n_agents', self.defaults['n_agents'])
self.init_target_area = initial_parameters.get('target_area', self.defaults['target_area'])
# configure CPM
cpm_config = {
'n_initial': n_initial,
'grid_size': grid_size,
'target_area': self.init_target_area}
self.cpm = CPM(cpm_config)
# animate (for debugging)
self.animate = initial_parameters.get('animate', False)
if self.animate:
plt.ion()
self.animate_frame()
# parameters
parameters = {}
parameters.update(initial_parameters)
super(CellularPotts, self).__init__(parameters)
[docs] def ports_schema(self):
default_state = {
agent_id: {
'area': {'_default': area},
'area_target': {'_default': self.init_target_area}}
for agent_id, area in self.cpm.get_agents_areas(self.cpm.grid).items()}
glob_schema = {
'*': {
'area': {
# '_default': self.init_target_area,
'_updater': 'set'
},
'area_target': {
# '_default': self.init_target_area,
'_updater': 'set'}}}
schema = {'agents': glob_schema}
schema['agents'].update(default_state)
return schema
[docs] def animate_frame(self):
if self.animate:
plt.imshow(self.cpm.grid)
plt.axis('off')
plt.pause(0.0001)
[docs] def next_update(self, timestep, states):
agents = states['agents']
area_target = {
agent_id: agents[agent_id]['area_target']
for agent_id in self.cpm.agent_ids} # TODO -- get target areas from state
self.cpm.update_target_areas(area_target)
self.cpm.update()
self.animate_frame()
return {}
[docs]class CPM(object):
def __init__(self, config):
# CPM parameters
self.temperature = config.get('temperature', 1e2)
self.area_constant = config.get('area_constant', 40)
self.adhesion_baseline = 60
self.adhesion_matrix = np.array([])
# make the grid
self.grid_size = config.get('grid_size')
self.grid = np.zeros(self.grid_size, dtype=np.int)
self.n_sites = self.grid.size
# make agents, place in grid
n_initial = config.get('n_initial')
target_area = config.get('target_area')
self.agent_ids = [
agent_id for agent_id in range(1, n_initial+1)]
for agent_id in self.agent_ids:
# pick a site, fill if it is empty
filled = False
while not filled:
x, y = self.random_site()
if self.grid[x][y] == 0:
self.grid[x][y] = agent_id
neighbors = self.neighbor_sites((x, y))
# set all neighbors to same agent_id
for neighbor in neighbors:
self.grid[neighbor[0],neighbor[1]] = agent_id
filled = True
# target areas
self.target_areas = {
agent_id: target_area for agent_id in self.agent_ids}
# make adhesion matrix for agent_ids and medium
self.update_adhesion_matrix()
[docs] def update_adhesion_matrix(self):
self.adhesion_matrix = np.full((len(self.agent_ids)+1, len(self.agent_ids)+1), self.adhesion_baseline)
for agent_id in self.agent_ids:
self.adhesion_matrix[agent_id, agent_id] = 1 # TODO -- set self-adhesion
[docs] def random_site(self):
x = random.randint(0, self.grid_size[0] - 1)
y = random.randint(0, self.grid_size[1] - 1)
return (x, y)
[docs] def neighbor_sites(self, site):
"""
return the (x, y) of all neighboring sites
"""
x, y = site
neighbors = [
(n_x, n_y)
for n_x in range(x-1,x+2)
for n_y in range(y-1,y+2)
if n_x>=0 and n_x<self.grid_size[0] and n_y>=0 and n_y<self.grid_size[1] and (n_x, n_y) != site]
return neighbors
[docs] def random_neighbor(self, site):
"""
return a random neighbor, without wrapping
"""
neighbors = self.neighbor_sites(site)
return random.choice(neighbors)
[docs] def get_agent_area(self, agent_id, grid):
return np.count_nonzero(grid == agent_id)
[docs] def get_agents_areas(self, grid):
areas = {}
for agent_id in self.agent_ids:
areas[agent_id] = self.get_agent_area(agent_id, grid)
return areas
[docs] def inverse_kronecker_delta(self, value1, value2):
"""
Returns 0 if the values are the same, and 1 if they are different.
Keeps neighboring sites with the same value from contributing to the effective energy
"""
if value1 == value2:
return 0
else:
return 1
[docs] def neighbor_values(self, site, grid):
neighbors = self.neighbor_sites(site)
values = [grid[site] for site in neighbors]
return values
[docs] def get_interactions(self, site, grid):
interactions = 0
site_value = grid[site]
neighbor_values = self.neighbor_values(site, grid)
for value in neighbor_values:
interactions += self.adhesion_matrix[site_value, value] * self.inverse_kronecker_delta(site_value, value)
return interactions
[docs] def effective_energy(self, grid):
hamiltonian = 0.0
for x in range(self.grid_size[0]):
for y in range(self.grid_size[1]):
site = (x, y)
# interaction constraints
hamiltonian += self.get_interactions(site, grid)
# area constraints
areas = self.get_agents_areas(grid)
area_constraints = [
self.area_constant*(areas[agent_id] - self.target_areas[agent_id])**2
for agent_id in self.agent_ids]
hamiltonian += sum(area_constraints)
return hamiltonian
[docs] def boltzmann_acceptance_function(self, energy):
accept = False
if energy < 0:
accept = True
elif random.random() < np.exp(-energy / self.temperature):
accept = True
return accept
[docs] def mutate(self, grid):
"""
choose a random site and a random neighboring site.
If they have the same values, change the site to its neighbor's value.
If a successful mutation is made, return True
"""
x, y = self.random_site()
n_x, n_y = self.random_neighbor((x, y))
value = grid[x][y]
n_value = grid[n_x][n_y]
if value != n_value:
grid[x][y] = n_value
return True
else:
return False
[docs] def update_target_areas(self, areas):
self.target_areas.update(areas)
[docs] def update(self):
"""
Metropolis Monte Carlo.
Attempt as many updates as there are sites in the grid
"""
H_before = self.effective_energy(self.grid)
for update in range(self.n_sites):
grid_copy = self.grid.copy()
if self.mutate(grid_copy):
H_after = self.effective_energy(grid_copy)
# change in effective energy
dH = H_after - H_before
if self.boltzmann_acceptance_function(dH):
self.grid = grid_copy
H_before = H_after
# test functions
[docs]def get_cpm_config():
return {
'n_agents': 4,
'grid_size': (30, 30),
'target_area': 25,
'animate': True}
[docs]def get_cpm_minimum_config():
return {
'n_agents': 2,
'grid_size': (20, 20),
'target_area': 10,
'animate': True}
[docs]def run_CPM(cpm_config = get_cpm_minimum_config(), time=5):
# load process
CPM = CellularPotts(cpm_config)
settings = {
'return_raw_data': True,
'total_time': 10,
'timestep': 1}
return simulate_process(CPM, settings)
if __name__ == '__main__':
cpm_config = get_cpm_config()
saved_data = run_CPM(cpm_config, 30)