from __future__ import absolute_import, division, print_function
from pymongo import MongoClient
from confluent_kafka import Producer
import json
import copy
try:
from urllib.parse import quote_plus # Python 3
except ImportError:
from urllib import quote_plus # Python 2
from vivarium.library.dict_utils import (
merge_dicts, value_in_embedded_dict, get_path_list_from_dict, \
get_value_from_path, make_path_dict)
HISTORY_INDEXES = [
'time',
'type',
'simulation_id',
'experiment_id']
CONFIGURATION_INDEXES = [
'type',
'simulation_id',
'experiment_id']
SECRETS_PATH = 'secrets.json'
[docs]def delivery_report(err, msg):
"""
This is a utility method passed to the Kafka Producer to handle the delivery
of messages sent using `send(topic, message)`.
"""
if err is not None:
print('message delivery failed: {}'.format(msg))
print('failed message: {}'.format(err))
[docs]def create_indexes(table, columns):
'''Create all of the necessary indexes for the given table name.'''
for column in columns:
table.create_index(column)
[docs]def get_emitter(config):
'''Get an emitter based on the provided config.
The available emitter type names and their classes are:
* ``kafka``: :py:class:`KafkaEmitter`
* ``database``: :py:class:`DatabaseEmitter`
* ``null``: :py:class:`NullEmitter`
* ``timeseries``: :py:class:`TimeSeriesEmitter`
Arguments:
config (dict): Requires three keys:
* type: Type of emitter ('kafka' for a kafka emitter).
* emitter: Any configuration the emitter type requires to initialize.
* keys: A list of state keys to emit for each state label.
Returns:
Emitter: An instantiated emitter.
'''
if config is None:
config = {'type': 'print'}
emitter_type = config.get('type', 'print')
if emitter_type == 'kafka':
emitter = KafkaEmitter(config)
elif emitter_type == 'database':
emitter = DatabaseEmitter(config)
elif emitter_type == 'null':
emitter = NullEmitter(config)
elif emitter_type == 'timeseries':
emitter = TimeSeriesEmitter(config)
else:
emitter = Emitter(config)
return emitter
[docs]def path_timeseries_from_data(data):
embedded_timeseries = timeseries_from_data(data)
return path_timeseries_from_embedded_timeseries(embedded_timeseries)
[docs]def path_timeseries_from_embedded_timeseries(embedded_timeseries):
times_vector = embedded_timeseries['time']
path_timeseries = make_path_dict({key: value for key, value in embedded_timeseries.items() if key != 'time'})
path_timeseries['time'] = times_vector
return path_timeseries
[docs]def time_indexed_timeseries_from_data(data):
times_vector = list(data.keys())
embedded_timeseries = {}
for time_index, (time, value) in enumerate(data.items()):
if isinstance(value, dict):
embedded_timeseries = value_in_embedded_dict(value, embedded_timeseries, time_index)
else:
pass
embedded_timeseries['time'] = times_vector
return embedded_timeseries
[docs]def timeseries_from_data(data):
times_vector = list(data.keys())
embedded_timeseries = {}
for time, value in data.items():
if isinstance(value, dict):
embedded_timeseries = value_in_embedded_dict(value, embedded_timeseries)
else:
pass
embedded_timeseries['time'] = times_vector
return embedded_timeseries
[docs]class Emitter(object):
'''
Emit data to terminal
'''
def __init__(self, config):
self.config = config
[docs] def emit(self, data):
print(data)
[docs] def get_data(self):
return []
[docs] def get_path_timeseries(self):
return path_timeseries_from_data(self.get_data())
[docs] def get_timeseries(self):
return timeseries_from_data(self.get_data())
[docs]class NullEmitter(Emitter):
'''
Don't emit anything
'''
[docs] def emit(self, data):
pass
[docs]class TimeSeriesEmitter(Emitter):
def __init__(self, config):
keys = config.get('keys', {})
self.saved_data = {}
[docs] def emit(self, data):
# save history data
if data['table'] == 'history':
emit_data = data['data']
time = emit_data.pop('time')
self.saved_data[time] = emit_data
[docs] def get_data(self):
return self.saved_data
[docs]class KafkaEmitter(Emitter):
'''
Emit data to kafka
Example:
>>> config = {
... 'host': 'localhost:9092',
... 'topic': 'EMIT',
... }
>>> emitter = KafkaEmitter(config)
'''
def __init__(self, config):
self.config = config
self.producer = Producer({
'bootstrap.servers': self.config['host']})
[docs] def emit(self, data):
encoded = json.dumps(data, ensure_ascii=False).encode('utf-8')
self.producer.produce(
self.config['topic'],
encoded,
callback=delivery_report)
self.producer.flush(timeout=0.1)
[docs]class DatabaseEmitter(Emitter):
'''
Emit data to a mongoDB database
Example:
>>> config = {
... 'host': 'localhost:27017',
... 'database': 'DB_NAME',
... }
>>> # The line below works only if you have to have 27017 open locally
>>> # emitter = DatabaseEmitter(config)
'''
client = None
default_host = 'localhost:27017'
def __init__(self, config):
self.config = config
self.experiment_id = config.get('experiment_id')
# create singleton instance of mongo client
if DatabaseEmitter.client is None:
DatabaseEmitter.client = MongoClient(config.get('host', self.default_host))
self.db = getattr(self.client, config.get('database', 'simulations'))
self.history = getattr(self.db, 'history')
self.configuration = getattr(self.db, 'configuration')
self.phylogeny = getattr(self.db, 'phylogeny')
create_indexes(self.history, HISTORY_INDEXES)
create_indexes(self.configuration, CONFIGURATION_INDEXES)
create_indexes(self.phylogeny, CONFIGURATION_INDEXES)
[docs] def emit(self, data_config):
data = data_config['data']
data.update({
'experiment_id': self.experiment_id})
table = getattr(self.db, data_config['table'])
table.insert_one(data)
[docs] def get_data(self):
return get_history_data_db(self.history, self.experiment_id)
[docs]def get_history_data_db(history_collection, experiment_id):
query = {'experiment_id': experiment_id}
raw_data = history_collection.find(query)
raw_data = list(raw_data)
data = {}
for datum in raw_data:
time = datum['time']
data[time] = {
key: value for key, value in datum.items()
if key not in ['_id', 'experiment_id', 'time']}
return data
[docs]def get_atlas_client(secrets_path):
with open(secrets_path, 'r') as f:
secrets = json.load(f)
emitter_config = get_atlas_database_emitter_config(
**secrets['database'])
uri = emitter_config['host']
client = MongoClient(uri)
return client[emitter_config['database']]
[docs]def get_local_client(host, port, database_name):
client = MongoClient('{}:{}'.format(host, port))
return client[database_name]
[docs]def data_from_database(experiment_id, client):
# Retrieve environment config
config_collection = client.configuration
environment_config = config_collection.find_one({
'experiment_id': experiment_id,
'type': 'environment_config',
})
# Retrieve timepoint data
history_collection = client.history
unique_time_objs = history_collection.aggregate([
{
'$match': {
'experiment_id': experiment_id
}
}, {
'$group': {
'_id': {
'time': '$time'
},
'id': {
'$first': '$_id'
}
}
}, {
'$sort': {
'_id.time': 1
}
},
])
unique_time_ids = [
obj['id'] for obj in unique_time_objs
]
data_cursor = history_collection.find({
'_id': {
'$in': unique_time_ids
}
}).sort('time')
raw_data = list(data_cursor)
# Reshape data
data = {
timepoint_dict['time']: {
key: val
for key, val in timepoint_dict.items()
if key != 'time'
}
for timepoint_dict in raw_data
}
return data, environment_config
[docs]def get_atlas_database_emitter_config(
username, password, cluster_subdomain, database
):
username = quote_plus(username)
password = quote_plus(password)
database = quote_plus(database)
uri = (
"mongodb+srv://{}:{}@{}.mongodb.net/"
+ "?retryWrites=true&w=majority"
).format(username, password, cluster_subdomain)
return {
'type': 'database',
'host': uri,
'database': database,
}
[docs]def emit_environment_config(environment_config, emitter):
config = {
'bounds': environment_config['multibody']['bounds'],
'type': 'environment_config',
}
emitter.emit({
'data': config,
'table': 'configuration',
})