diff --git a/appliance/base.py b/appliance/base.py
index 0660c1c..d8bd88c 100644
--- a/appliance/base.py
+++ b/appliance/base.py
@@ -1,12 +1,13 @@
-from collections import deque
+from collections import defaultdict
+from container.base import Container
 
 
 class Appliance:
 
-    def __init__(self, id, containers=[], pending=[], **kwargs):
+    def __init__(self, id, containers=(), **kwargs):
         self.__id = id
         self.__containers = list(containers)
-        self.__pending = deque(pending)
+        self.__dag = ContainerDAG()
 
     @property
     def id(self):
@@ -17,23 +18,106 @@ def containers(self):
         return self.__containers
 
     @property
-    def pending(self):
-        return list(self.__pending)
+    def dag(self):
+        return self.__dag
 
     @containers.setter
     def containers(self, contrs):
         self.__containers = list(contrs)
 
-    def enqueue_pending_containers(self, contr_id):
-        self.__pending.append(contr_id)
-
-    def dequeue_pending_container(self):
-        return self.__pending.popleft()
-
     def to_render(self):
         return dict(id=self.id,
-                    containers=[c if isinstance(c, str) else c.to_render()
-                                for c in self.containers])
+                    containers=[c.to_render() for c in self.containers])
 
     def to_save(self):
-        return dict(id=self.id, pending=self.pending)
+        return dict(id=self.id, dag=self.dag.to_save())
+
+    def __str__(self):
+        return self.id
+
+
+class ContainerDAG:
+    """Dependency graph over the containers of a single appliance.
+
+    `parent_map` maps a container id to the ids it depends on, `child_map`
+    maps a container id to the ids that depend on it.
+    """
+
+    def __init__(self):
+        self.__containers = {}
+        # Start with empty maps (not None) so the properties below are usable
+        # before `construct_graph`/`restore` has been called.
+        self.__parent_map, self.__child_map = {}, {}
+
+    @property
+    def parent_map(self):
+        return {k: list(v) for k, v in self.__parent_map.items()}
+
+    @property
+    def child_map(self):
+        return {k: list(v) for k, v in self.__child_map.items()}
+
+    @property
+    def is_empty(self):
+        return not self.__parent_map
+
+    def get_free_containers(self):
+        """Return the containers that have no unresolved dependency left."""
+        return [self.__containers[k] for k, v in self.__parent_map.items() if not v]
+
+    def update_container(self, contr):
+        self.__containers[contr.id] = contr
+
+    def remove_container(self, contr_id):
+        """Drop `contr_id` from the graph and release its children."""
+        for child in self.__child_map.pop(contr_id, ()):
+            # discard(): tolerate edges that are already gone
+            self.__parent_map.get(child, set()).discard(contr_id)
+        self.__parent_map.pop(contr_id, None)
+
+    def construct_graph(self, contrs):
+        """Build the graph from `contrs`.
+
+        Returns a `(status, message, error)` tuple: 200 on success, 422 when
+        a dependency is unknown or the dependencies contain a cycle.
+        """
+        assert all(isinstance(c, Container) for c in contrs)
+        parent_map, child_map = defaultdict(set), defaultdict(set)
+        containers = {c.id: c for c in contrs}
+        # check dependency validity
+        for c in contrs:
+            nonexist_deps = [d for d in c.dependencies if d not in containers]
+            if nonexist_deps:
+                return 422, None, "Dependencies '%s' do not exist in this appliance"%nonexist_deps
+            parent_map[c.id].update(c.dependencies)
+            for d in c.dependencies:
+                child_map[d].add(c.id)
+        # check cycles of any length (Kahn's algorithm): repeatedly release
+        # containers without pending parents; whatever is left is blocked by a cycle
+        pending = {c: set(parents) for c, parents in parent_map.items()}
+        free = [c for c, parents in pending.items() if not parents]
+        while free:
+            done = free.pop()
+            del pending[done]
+            for child in child_map.get(done, ()):
+                pending[child].discard(done)
+                if not pending[child]:
+                    free.append(child)
+        if pending:
+            return 422, None, "Cycle(s) found: %s"%list(pending)
+        self.__containers = containers
+        self.__parent_map, self.__child_map = parent_map, child_map
+        return 200, "DAG constructed successfully", None
+
+    def restore(self, dag, contrs):
+        """Rebuild the graph from the dict produced by `to_save`."""
+        assert isinstance(dag, dict)
+        assert all(isinstance(c, Container) for c in contrs)
+        # to_save() stores lists; turn them back into sets
+        self.__parent_map = {k: set(v) for k, v in dag.get('parent_map', {}).items()}
+        self.__child_map = {k: set(v) for k, v in dag.get('child_map', {}).items()}
+        # keep only the containers that are still part of the graph
+        self.__containers = {c.id: c for c in contrs if c.id in self.__parent_map}
+
+    def to_save(self):
+        return dict(parent_map=self.parent_map, child_map=self.child_map)
diff --git a/appliance/handler.py b/appliance/handler.py
index d5c5727..e3eebee 100644
--- a/appliance/handler.py
+++ b/appliance/handler.py
@@ -1,8 +1,9 @@
 import json
+import tornado
 
 from tornado.web import RequestHandler
 
-from appliance.manager import ApplianceManager
+from appliance.manager import ApplianceManager, ApplianceMonitor
 from container.manager import ContainerManager
 from util import message, error
 from util import Loggable
@@ -13,10 +14,15 @@ class AppliancesHandler(RequestHandler, Loggable):
     def initialize(self, config):
         self.__app_mgr = ApplianceManager(config)
         self.__contr_mgr = ContainerManager(config)
+        self.__app_monitor = ApplianceMonitor(self.__app_mgr, self.__contr_mgr)
 
     async def post(self):
-        raise NotImplemented
-
+        data = tornado.escape.json_decode(self.request.body)
+        status, app, err = await self.__app_mgr.create_appliance(data)
+        self.set_status(status)
+        self.write(json.dumps(app.to_render() if status == 201 else error(err)))
+        if status == 201:
+            self.__app_monitor.spawn(app)
 
 class ApplianceHandler(RequestHandler, Loggable):
 
diff --git a/appliance/manager.py b/appliance/manager.py
index 7ca1de0..02d9358 100644
--- a/appliance/manager.py
+++ b/appliance/manager.py
@@ -1,5 +1,11 @@
+import tornado
+import functools
+
+from tornado.ioloop import PeriodicCallback
+
 from util import Singleton, Loggable, MotorClient, SecureAsyncHttpClient
 from appliance.base import Appliance
+from container.base import ContainerType, ContainerState
 from container.manager import ContainerManager
 
 
@@ -19,23 +25,41 @@ async def get_appliance(self, app_id):
         _, app.containers, _ = await self.__contr_mgr.get_containers(app_id)
         return 200, app, None
 
-    async def create_appliance(self, app):
-        pass
+    async def create_appliance(self, data):
+        status, _, _ = await self._get_appliance_from_db(data['id'])
+        if status == 200:
+            return 409, None, "Appliance '%s' already exists"%data['id']
+        status, app, err = self._instantiate_appliance(data)
+        if err:
+            self.logger.error(err)
+            return status, None, err
+        for c in app.containers:
+            status, msg, err = await self.__contr_mgr.create_container(c.to_render())
+            if err:
+                self.logger.error(err)
+                return status, None, err
+        status, msg, err = await self.save_appliance(app, True)
+        if err:
+            self.logger.error(err)
+            return status, None, err
+        self.logger.info(msg)
+        return 201, app, None
 
     async def delete_appliance(self, app_id):
         status, app, err = await self._get_appliance_from_db(app_id)
         if err:
             self.logger.error(err)
             return status, None, err
-        status, msg, err = await self.__contr_mgr.delete_containers(appliance=app_id)
+        status, msg, err = await self.__contr_mgr.delete_containers(appliance=app_id,
+                                                                    type=ContainerType.JOB.value)
         if err:
             self.logger.error(err)
-            return 400, None, "Failed to deprovision containers of appliance '%s'"%app_id
+            return 400, None, "Failed to deprovision jobs of appliance '%s'"%app_id
         self.logger.info(msg)
         status, msg, err = await self._deprovision_group(app_id)
-        if err:
+        if err and status != 404:
             self.logger.error(err)
-            return 400, None, "Failed to deprovision group of appliance '%s'"%app_id
+            return 400, None, "Failed to deprovision appliance '%s'"%app_id
         self.logger.info(msg)
         status, msg, err = await self._delete_appliance_from_db(app_id)
         if err:
@@ -46,6 +70,10 @@ async def delete_appliance(self, app_id):
 
     async def save_appliance(self, app, upsert=True):
         await self.__app_col.replace_one(dict(id=app.id), app.to_save(), upsert=upsert)
+        return 200, "Appliance '%s' has been saved"%app, None
+
+    async def restore_appliances(self):
+        raise NotImplementedError
 
     async def _get_appliance_from_db(self, app_id):
         app = await self.__app_col.find_one(dict(id=app_id))
@@ -64,3 +92,73 @@ async def _deprovision_group(self, app_id):
             return status, None, err
         return 200, "Services of appliance '%s' have been deprovisioned"%app_id, None
 
+    def _instantiate_appliance(self, data):
+        app_id, containers = data['id'], data['containers']
+        try:
+            # dict(c, ...) lets the enclosing appliance id win over any
+            # 'appliance' key supplied inside the container spec
+            app = Appliance(app_id,
+                            [self.__contr_mgr.instantiate_container(dict(c, appliance=app_id))
+                             for c in containers])
+            status, msg, err = app.dag.construct_graph(app.containers)
+            if err:
+                return status, None, err
+            return 200, app, None
+        except ValueError as e:
+            return 422, None, str(e)
+
+
+class ApplianceMonitor(Loggable):
+    """Periodically walks an appliance's DAG and launches containers whose
+    dependencies are satisfied."""
+
+    APP_MONITOR_INTERVAL = 5000  # milliseconds
+
+    def __init__(self, app_mgr, contr_mgr):
+        self.__app_mgr = app_mgr
+        self.__contr_mgr = contr_mgr
+        self.__callbacks = {}
+
+    def spawn(self, app):
+        """Start monitoring `app`: run once right away, then periodically."""
+        monitor_func = functools.partial(self._monitor_appliance, app)
+        tornado.ioloop.IOLoop.instance().add_callback(monitor_func)
+        cb = PeriodicCallback(monitor_func, self.APP_MONITOR_INTERVAL)
+        self.__callbacks[app.id] = cb
+        cb.start()
+
+    async def _monitor_appliance(self, app):
+        self.logger.info('Containers left: %s'%list(app.dag.parent_map.keys()))
+        cb = self.__callbacks.get(app.id, None)
+        # is_running() is a method; the bare attribute is always truthy
+        if cb and cb.is_running() and app.dag.is_empty:
+            self.logger.info('DAG is empty, stop monitoring')
+            cb.stop()
+            self.__callbacks.pop(app.id, None)
+            return
+        self.logger.info('Update DAG')
+        for c in app.dag.get_free_containers():
+            _, c, err = await self.__contr_mgr.get_container(app.id, c.id)
+            if err:
+                self.logger.error(err)
+                continue
+            # A container nobody depends on can be dropped as soon as it has
+            # been launched; one that is still SUBMITTED must stay in the DAG
+            # so that the launch loop below gets to provision it.
+            launched = c.state != ContainerState.SUBMITTED
+            if (launched and not app.dag.child_map.get(c.id, [])) \
+                    or (c.type == ContainerType.SERVICE and c.state == ContainerState.RUNNING) \
+                    or (c.type == ContainerType.JOB and c.state == ContainerState.SUCCESS):
+                app.dag.remove_container(c.id)
+            else:
+                app.dag.update_container(c)
+        self.logger.info('Launch free containers')
+        for c in app.dag.get_free_containers():
+            if c.state == ContainerState.SUBMITTED:
+                _, _, err = await self.__contr_mgr.provision_container(c)
+                if err:
+                    self.logger.error("Failed to launch container '%s'"%c)
+                    self.logger.error(err)
+        status, msg, err = await self.__app_mgr.save_appliance(app, False)
+        if err:
+            self.logger.error(err)
diff --git a/container/handler.py b/container/handler.py
index 963c7ef..318d23f 100644
--- a/container/handler.py
+++ b/container/handler.py
@@ -41,13 +41,13 @@ async def post(self, app_id):
         data = tornado.escape.json_decode(self.request.body)
         data['appliance'] = app_id
         status, contr, err = await self.__contr_mgr.create_container(data)
-        if status != 200:
+        if err:
             self.set_status(status)
             self.write(json.dumps(error(err)))
             return
         status, contr, err = await self.__contr_mgr.provision_container(contr)
         self.set_status(status)
-        self.write(json.dumps(contr.to_render() if status == 200 else error(err)))
+        self.write(json.dumps(error(err) if err else contr.to_render()))
 
 
 class ContainerHandler(RequestHandler, Loggable):
diff --git a/container/manager.py b/container/manager.py
index d9e70e6..1bfae81 100644
--- a/container/manager.py
+++ b/container/manager.py
@@ -2,7 +2,7 @@
 
 from datetime import timedelta
 
-from container.base import Container, ContainerType
+from container.base import Container, ContainerType, ContainerState
 from container.service import Service
 from container.job import Job
 from cluster.base import Cluster
@@ -27,12 +27,12 @@ async def get_container(self, app_id, contr_id):
         if not contr.last_update or \
                 datetime.datetime.now(tz=None) - contr.last_update > self.CONTAINER_REC_TTL:
             status, contr, err = await self._get_updated_container(contr)
-            if status == 404:
+            if status == 404 and contr.state != ContainerState.SUBMITTED:
                 await self._delete_container_from_db(contr)
-                return 404, None, "Container '%s' is not found"%contr_id
+                return 404, None, err
             if status == 200:
                 await self._save_container_to_db(contr, False)
-            else:
+            elif status != 404:
                 self.logger.error("Failed to update container '%s'"%contr)
                 self.logger.error(err)
         return 200, contr, None
@@ -65,31 +65,31 @@ async def create_container(self, data):
         status, _, _ = await self._get_container_from_db(data['appliance'], data['id'])
         if status == 200:
             return 409, None, "Container '%s' already exists"%data['id']
-        contr = self._instantiate_container(data)
-        if not contr:
-            errmsg = 'Unknown container type: %s'%contr.type
-            self.logger.error(errmsg)
-            return 400, None, errmsg
-        await self._save_container_to_db(contr, True)
-        return 200, contr, None
+        try:
+            contr = self.instantiate_container(data)
+            await self._save_container_to_db(contr, True)
+            return 201, contr, None
+        except ValueError as e:
+            return 422, None, str(e)
 
     async def delete_container(self, app_id, contr_id):
         status, contr, err = await self._get_container_from_db(app_id, contr_id)
         if status == 404:
             return status, None, err
-        if contr.type == ContainerType.SERVICE:
-            status, msg, err = await self._delete_service(contr)
-            if err:
-                self.logger.error(err)
-            else:
-                self.logger.info(msg)
-        elif contr.type == ContainerType.JOB:
-            _, _, kill_job_err = await self._kill_job_tasks(contr)
-            if kill_job_err:
-                self.logger.error(kill_job_err)
-            _, _, del_job_err = await self._delete_job(contr)
-            if del_job_err:
-                self.logger.error(del_job_err)
+        if contr.state != ContainerState.SUBMITTED:
+            if contr.type == ContainerType.SERVICE:
+                status, msg, err = await self._delete_service(contr)
+                if err:
+                    self.logger.error(err)
+                else:
+                    self.logger.info(msg)
+            elif contr.type == ContainerType.JOB:
+                _, _, kill_job_err = await self._kill_job_tasks(contr)
+                if kill_job_err:
+                    self.logger.error(kill_job_err)
+                _, _, del_job_err = await self._delete_job(contr)
+                if del_job_err:
+                    self.logger.error(del_job_err)
         await self._delete_container_from_db(contr)
         return 200, "Container '%s' has been deleted"%contr, None
 
@@ -98,8 +98,11 @@ async def delete_containers(self, **filters):
         for c in await self._get_containers_from_db(**filters):
             status, msg, err = await self.delete_container(c.appliance, c.id)
             if err:
-                self.logger.error(err)
-                failed += [c.id]
+                if status != 404:
+                    self.logger.error(err)
+                    failed += [c.id]
+                else:
+                    self.logger.info("Container '%s' no more exists"%c)
             else:
                 self.logger.info(msg)
         if failed:
@@ -119,14 +122,23 @@ async def provision_container(self, contr):
             return status, None, err
         return status, contr, None
 
+    def instantiate_container(self, contr):
+        """Build a Service or Job from a dict; raise ValueError on an unknown type."""
+        contr_type = contr.get('type')
+        if contr_type == ContainerType.SERVICE.value:
+            return Service(**contr)
+        if contr_type == ContainerType.JOB.value:
+            return Job(**contr)
+        raise ValueError("Unknown container type: %s"%contr_type)
+
     async def _get_container_from_db(self, app_id, contr_id):
         contr = await self.__contr_col.find_one(dict(id=contr_id, appliance=app_id))
         if not contr:
             return 404, None, "Container '%s' is not found"%contr_id
-        return 200, self._instantiate_container(contr), None
+        return 200, self.instantiate_container(contr), None
 
     async def _get_containers_from_db(self, **filters):
-        return [self._instantiate_container(c) async for c in self.__contr_col.find(filters)]
+        return [self.instantiate_container(c) async for c in self.__contr_col.find(filters)]
 
     async def _save_container_to_db(self, contr, upsert=True):
         await self.__contr_col.replace_one(dict(id=contr.id, appliance=contr.appliance),
@@ -187,9 +199,9 @@ async def _get_update_job(self, job):
         return status, job, None
 
     async def _provision_service(self, service):
-        url = '%s/groups'%self.__config.url.service_scheduler
-        body = dict(id='/' + service.appliance, apps=[service.to_request()])
-        return await self.__http_cli.put(url, body)
+        url = '%s/apps'%self.__config.url.service_scheduler
+        body = dict(service.to_request())
+        return await self.__http_cli.post(url, body)
 
     async def _delete_service(self, contr):
         return await self.__http_cli.delete('%s/apps%s'%(self.__config.url.service_scheduler, contr))
@@ -203,11 +215,4 @@ async def _kill_job_tasks(self, contr):
     async def _delete_job(self, contr):
         return await self.__http_cli.delete('%s/job/%s'%(self.__config.url.job_scheduler, contr))
 
-    def _instantiate_container(self, contr):
-        if contr['type'] == ContainerType.SERVICE.value:
-            return Service(**contr)
-        if contr['type'] == ContainerType.JOB.value:
-            return Job(**contr)
-        return None
-
 