Skip to content

Commit

Permalink
completed and tested API for creating an appliance
Browse files Browse the repository at this point in the history
  • Loading branch information
dcvan24 committed Mar 16, 2018
1 parent 9ee7155 commit 4a8fbcb
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 63 deletions.
93 changes: 79 additions & 14 deletions appliance/base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from collections import deque
from collections import defaultdict
from container.base import Container


class Appliance:

def __init__(self, id, containers=[], pending=[], **kwargs):
def __init__(self, id, containers=[], **kwargs):
self.__id = id
self.__containers = list(containers)
self.__pending = deque(pending)
self.__dag = ContainerDAG()

@property
def id(self):
Expand All @@ -17,23 +18,87 @@ def containers(self):
return self.__containers

@property
def pending(self):
return list(self.__pending)
def dag(self):
return self.__dag

@containers.setter
def containers(self, contrs):
self.__containers = list(contrs)

def enqueue_pending_containers(self, contr_id):
self.__pending.append(contr_id)

def dequeue_pending_container(self):
return self.__pending.popleft()

def to_render(self):
return dict(id=self.id,
containers=[c if isinstance(c, str) else c.to_render()
for c in self.containers])
containers=[c.to_render() for c in self.containers])

def to_save(self):
return dict(id=self.id, pending=self.pending)
return dict(id=self.id, dag=self.dag.to_save())

def __str__(self):
return self.id


class ContainerDAG:

def __init__(self):
self.__containers = {}
self.__parent_map, self.__child_map = None, None

@property
def parent_map(self):
return {k: list(v) for k, v in self.__parent_map.items()}

@property
def child_map(self):
return {k: list(v) for k, v in self.__child_map.items()}

@property
def is_empty(self):
return len(self.__parent_map) == 0

def get_free_containers(self):
return [self.__containers[k] for k, v in self.__parent_map.items() if not v]

def update_container(self, contr):
self.__containers[contr.id] = contr

def remove_container(self, contr_id):
for child in self.__child_map.pop(contr_id, set()):
self.__parent_map.get(child, set()).remove(contr_id)
self.__parent_map.pop(contr_id, None)

def construct_graph(self, contrs):
assert all(isinstance(c, Container) for c in contrs)
parent_map, child_map = defaultdict(set), defaultdict(set)
containers = {c.id: c for c in contrs}
# check dependency validity
for c in contrs:
nonexist_deps = list(filter(lambda c: c not in containers, c.dependencies))
if nonexist_deps:
return 422, None, "Dependencies '%s' do not exist in this appliance"%nonexist_deps
parent_map[c.id].update(c.dependencies)
for d in c.dependencies:
child_map[d].add(c.id)
# check cycles
for c, parents in parent_map.items():
cyclic_deps = ['%s<->%s'%(c, p)
for p in filter(lambda p: c in parent_map[p], parents)]
if cyclic_deps:
return 422, None, "Cycle(s) found: %s"%cyclic_deps
self.__containers = containers
self.__parent_map, self.__child_map = parent_map, child_map
return 200, "DAG constructed successfully", None

def restore(self, dag, contrs):
assert isinstance(dag, dict)
assert all(isinstance(c, Container) for c in contrs)
self.__parent_map = dag.get('parent_map', {})
self.__child_map = dag.get('child_map', {})
self.__containers = {c.id: c for c in contrs if c.id not in self.__parent_map}

def to_save(self):
return dict(parent_map=self.parent_map, child_map=self.child_map)






12 changes: 9 additions & 3 deletions appliance/handler.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import json
import tornado

from tornado.web import RequestHandler

from appliance.manager import ApplianceManager
from appliance.manager import ApplianceManager, ApplianceMonitor
from container.manager import ContainerManager
from util import message, error
from util import Loggable
Expand All @@ -13,10 +14,15 @@ class AppliancesHandler(RequestHandler, Loggable):
def initialize(self, config):
self.__app_mgr = ApplianceManager(config)
self.__contr_mgr = ContainerManager(config)
self.__app_monitor = ApplianceMonitor(self.__app_mgr, self.__contr_mgr)

async def post(self):
raise NotImplemented

data = tornado.escape.json_decode(self.request.body)
status, app, err = await self.__app_mgr.create_appliance(data)
self.set_status(status)
self.write(json.dumps(app.to_render() if status == 201 else error(err)))
if status == 201:
self.__app_monitor.spawn(app)

class ApplianceHandler(RequestHandler, Loggable):

Expand Down
100 changes: 94 additions & 6 deletions appliance/manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import tornado
import functools

from tornado.ioloop import PeriodicCallback

from util import Singleton, Loggable, MotorClient, SecureAsyncHttpClient
from appliance.base import Appliance
from container.base import ContainerType, ContainerState
from container.manager import ContainerManager


Expand All @@ -19,23 +25,41 @@ async def get_appliance(self, app_id):
_, app.containers, _ = await self.__contr_mgr.get_containers(app_id)
return 200, app, None

async def create_appliance(self, app):
pass
async def create_appliance(self, data):
status, _, _ = await self._get_appliance_from_db(data['id'])
if status == 200:
return 409, None, "Appliance '%s' already exists"%data['id']
status, app, err = self._instantiate_appliance(data)
if err:
self.logger.error(err)
return status, None, err
for c in app.containers:
status, msg, err = await self.__contr_mgr.create_container(c.to_render())
if err:
self.logger.error(err)
return status, None, err
status, msg, err = await self.save_appliance(app, True)
if err:
self.logger.error(err)
return status, None, err
self.logger.info(msg)
return 201, app, None

async def delete_appliance(self, app_id):
status, app, err = await self._get_appliance_from_db(app_id)
if err:
self.logger.error(err)
return status, None, err
status, msg, err = await self.__contr_mgr.delete_containers(appliance=app_id)
status, msg, err = await self.__contr_mgr.delete_containers(appliance=app_id,
type=ContainerType.JOB.value)
if err:
self.logger.error(err)
return 400, None, "Failed to deprovision containers of appliance '%s'"%app_id
return 400, None, "Failed to deprovision jobs of appliance '%s'"%app_id
self.logger.info(msg)
status, msg, err = await self._deprovision_group(app_id)
if err:
if err and status != 404:
self.logger.error(err)
return 400, None, "Failed to deprovision group of appliance '%s'"%app_id
return 400, None, "Failed to deprovision appliance '%s'"%app_id
self.logger.info(msg)
status, msg, err = await self._delete_appliance_from_db(app_id)
if err:
Expand All @@ -46,6 +70,10 @@ async def delete_appliance(self, app_id):

async def save_appliance(self, app, upsert=True):
await self.__app_col.replace_one(dict(id=app.id), app.to_save(), upsert=upsert)
return 200, "Appliance '%s' has been saved"%app, None

async def restore_appliances(self):
raise NotImplemented

async def _get_appliance_from_db(self, app_id):
app = await self.__app_col.find_one(dict(id=app_id))
Expand All @@ -64,3 +92,63 @@ async def _deprovision_group(self, app_id):
return status, None, err
return 200, "Services of appliance '%s' have been deprovisioned"%app_id, None

def _instantiate_appliance(self, data):
app_id, containers = data['id'], data['containers']
try:
app = Appliance(app_id,
[self.__contr_mgr.instantiate_container(dict(**c, appliance=app_id))
for c in data['containers']])
status, msg, err = app.dag.construct_graph(app.containers)
if err:
return status, None, err
return 200, app, None
except ValueError as e:
return 422, None, str(e)


class ApplianceMonitor(Loggable):

APP_MONITOR_INTERVAL = 5000

def __init__(self, app_mgr, contr_mgr):
self.__app_mgr = app_mgr
self.__contr_mgr = contr_mgr
self.__callbacks = {}

def spawn(self, app):
monitor_func = functools.partial(self._monitor_appliance, app)
tornado.ioloop.IOLoop.instance().add_callback(monitor_func)
cb = PeriodicCallback(monitor_func, self.APP_MONITOR_INTERVAL)
self.__callbacks[app.id] = cb
cb.start()

async def _monitor_appliance(self, app):
self.logger.info('Containers left: %s'%list(app.dag.parent_map.keys()))
cb = self.__callbacks.get(app.id, None)
if cb and cb.is_running and app.dag.is_empty:
self.logger.info('DAG is empty, stop monitoring')
cb.stop()
self.__callbacks.pop(app.id, None)
return
self.logger.info('Update DAG')
for c in app.dag.get_free_containers():
_, c, err = await self.__contr_mgr.get_container(app.id, c.id)
if err:
self.logger.error(err)
continue
if (not app.dag.child_map.get(c.id, [])) \
or (c.type == ContainerType.SERVICE and c.state == ContainerState.RUNNING) \
or (c.type == ContainerType.JOB and c.state == ContainerState.SUCCESS):
app.dag.remove_container(c.id)
else:
app.dag.update_container(c)
self.logger.info('Launch free containers')
for c in app.dag.get_free_containers():
if c.state == ContainerState.SUBMITTED:
_, _, err = await self.__contr_mgr.provision_container(c)
if err:
self.logger.error("Failed to launch container '%s'"%c)
self.logger.error(err)
status, msg, err = await self.__app_mgr.save_appliance(app, False)
if err:
self.logger.error(err)
4 changes: 2 additions & 2 deletions container/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ async def post(self, app_id):
data = tornado.escape.json_decode(self.request.body)
data['appliance'] = app_id
status, contr, err = await self.__contr_mgr.create_container(data)
if status != 200:
if err:
self.set_status(status)
self.write(json.dumps(error(err)))
return
status, contr, err = await self.__contr_mgr.provision_container(contr)
self.set_status(status)
self.write(json.dumps(contr.to_render() if status == 200 else error(err)))
self.write(json.dumps(contr.to_render() if status == 201 else error(err)))


class ContainerHandler(RequestHandler, Loggable):
Expand Down
Loading

0 comments on commit 4a8fbcb

Please sign in to comment.