Skip to content

Commit

Permalink
completed and tested major container-level API
Browse files Browse the repository at this point in the history
  • Loading branch information
dcvan24 committed Mar 16, 2018
1 parent 512d5d4 commit 7357367
Show file tree
Hide file tree
Showing 22 changed files with 1,211 additions and 732 deletions.
146 changes: 8 additions & 138 deletions appliance.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,144 +4,14 @@
from tornado.ioloop import PeriodicCallback
from collections import deque

from container import Container, Endpoint, ContainerState, ContainerManager
from container.base import Container, Endpoint, ContainerState
from container.manager import ContainerManager
from util import message, error
from util import Singleton, MotorClient, Loggable
from util import HTTP_METHOD_GET
from util import SecureAsyncHttpClient
from config import config, cluster_to_ip, ip_to_cluster


class Appliance:
    """An appliance: an identifier, its containers and a FIFO queue of
    pending container ids.

    Entries of ``containers`` may be either plain container-id strings or
    container objects exposing ``id`` and ``to_render()``.
    """

    def __init__(self, id, containers=(), pending=(), **kwargs):
        """Build an appliance.

        :param id: appliance identifier
        :param containers: iterable of container ids or container objects;
            ``None`` is treated as empty
        :param pending: iterable of container ids waiting to be submitted;
            ``None`` is treated as empty
        :param kwargs: extra keys are accepted and ignored, so a stored
            document can be splatted straight into the constructor
        """
        self.__id = id
        # Immutable defaults plus defensive copies: no state is shared
        # between instances or with the caller's collections.
        self.__containers = list(containers or ())
        self.__pending = deque(pending or ())

    @property
    def id(self):
        """Appliance identifier."""
        return self.__id

    @property
    def containers(self):
        """The list of containers (ids or container objects)."""
        return self.__containers

    @containers.setter
    def containers(self, contrs):
        self.__containers = list(contrs)

    @property
    def pending(self):
        """Snapshot (a new list) of the pending container ids, oldest first."""
        return list(self.__pending)

    def enqueue_pending_containers(self, contr_id):
        """Append one container id to the end of the pending queue."""
        self.__pending.append(contr_id)

    def dequeue_pending_container(self):
        """Remove and return the oldest pending container id.

        :raises IndexError: if the pending queue is empty
        """
        return self.__pending.popleft()

    def to_render(self):
        """Return a dict for API responses: string entries are kept as they
        are, container objects are expanded through ``to_render()``."""
        return dict(id=self.id,
                    containers=[c if isinstance(c, str) else c.to_render()
                                for c in self.containers])

    def to_save(self):
        """Return a dict for persistence: containers are reduced to their ids
        and the pending queue is included."""
        return dict(id=self.id,
                    containers=[c if isinstance(c, str) else c.id
                                for c in self.containers],
                    pending=self.pending)


class ApplianceManager(Loggable, metaclass=Singleton):
    """Service for creating, loading, deleting and scheduling appliances.

    Appliance documents live in the ``appliance`` collection; container
    persistence and submission are delegated to ``ContainerManager``. The
    public coroutines return tuples whose first element is an HTTP-style
    status code.
    """

    def __init__(self):
        self.__app_col = MotorClient().requester.appliance
        self.__http_cli = AsyncHTTPClient()
        self.__contr_mgr = ContainerManager()

    async def get_appliance(self, app_id, verbose=True):
        """Load an appliance by id.

        :param app_id: appliance identifier
        :param verbose: when true, replace the stored container entries with
            the containers returned by the container manager (only if that
            lookup reports status 200)
        :return: ``(200, Appliance)`` or ``(404, error payload)``
        """
        app = await self.__app_col.find_one(dict(id=app_id))
        if not app:
            return 404, error("Appliance '%s' is not found"%app_id)
        app = Appliance(**app)
        if verbose:
            status, contrs = await self.__contr_mgr.get_containers(app_id)
            if status == 200:
                app.containers = contrs
        return 200, app

    async def create_appliance(self, app):
        """Create and persist an appliance together with its containers.

        Every container is bound to the appliance, marked PENDING, queued on
        the appliance and given ``SCIDAS_*`` environment variables derived
        from its data and resource requirements.

        :param app: dict describing the appliance (splatted into ``Appliance``)
        :return: ``(201, Appliance)``, ``(409, error)`` when the id already
            exists, or ``(400, error)`` when no containers are given
        """
        app = Appliance(**app)
        app_count = await self.__app_col.count(dict(id=app.id))
        if app_count > 0:
            return 409, error("Appliance '%s' already exists" % app.id)
        if not app.containers:
            return 400, error('No container in the Appliance "%s"'%app.id)

        app.containers = [Container(**c) for c in app.containers]
        for c in app.containers:
            c.appliance, c.state = app.id, ContainerState.PENDING
            app.enqueue_pending_containers(c.id)
            c.add_env(SCIDAS_DATA=','.join(c.data),
                      SCIDAS_RESC_CPUS=str(c.resources.cpus),
                      SCIDAS_RESC_MEM=str(c.resources.mem),
                      SCIDAS_RESC_DISK=str(c.resources.disk))

        for c in app.containers:
            await self.__contr_mgr.save_container(c)
        await self.save_appliance(app)
        return 201, app

    async def delete_appliance(self, app_id):
        """Delete an appliance and its containers.

        :return: ``(status, None, error)`` when the appliance cannot be
            loaded, otherwise ``(200, Appliance, message)``
        """
        status, app = await self.get_appliance(app_id)
        if status != 200:
            return status, None, app
        await self.__contr_mgr.delete_containers(app_id)
        # Await the delete so it has completed (and any failure surfaces)
        # before success is reported to the caller.
        await self.__app_col.delete_one(dict(id=app_id))
        return 200, app, message("Appliance '%s' has been deleted"%app_id)

    async def accept_offer(self, app_id, contr_id, offers):
        """Place a container on the first resource offer that can hold it.

        Offers are dicts read through the keys ``cpus``, ``mem``, ``disk``,
        ``master`` and ``agent``. The ``master`` value of each suitable offer
        is rewritten in place to drop everything after the first ``:``.

        When no offer fits, the container id is re-queued on the in-memory
        appliance (the appliance is not saved here) and the container is
        returned unchanged.

        :return: ``(status, error, error)`` when the appliance or container
            lookup fails, otherwise ``(200, Appliance, Container)``
        """
        status, resp = await self.get_appliance(app_id, verbose=False)
        if status != 200:
            return status, resp, resp
        app = resp
        status, resp = await self.__contr_mgr.get_container(app_id, contr_id)
        if status != 200:
            return status, resp, resp
        contr = resp
        # Keep only offers that report all three resources and cover the
        # container's requirements.
        offers = [o for o in offers
                  if o['cpus'] and o['mem'] and o['disk']
                  and o['cpus'] >= contr.resources.cpus
                  and o['mem'] >= contr.resources.mem
                  and o['disk'] >= contr.resources.disk]
        for o in offers:
            o['master'] = o['master'].split(':')[0]
        if contr.cluster:
            # The container is pinned to a cluster: restrict to its master.
            offers = [o for o in offers
                      if o['master'] == cluster_to_ip[contr.cluster]]
        if not offers:
            app.enqueue_pending_containers(contr.id)
            return 200, app, contr
        offer = offers[0]
        contr.cluster = ip_to_cluster.get(offer['master'], None)
        contr.host = offer['agent']
        await self.__contr_mgr.save_container(contr)
        return 200, app, contr

    async def process_next_pending_container(self, app):
        """Submit the oldest pending container of ``app``, if there is one.

        The container id is dequeued and the appliance saved before the
        container is marked SUBMITTED and handed to the container manager.
        If submission does not report status 200 the id is put back on the
        in-memory queue of ``app``.
        """
        if not app.pending:
            return
        contr_id = app.dequeue_pending_container()
        await self.save_appliance(app, upsert=False)
        status, contr = await self.__contr_mgr.get_container(app.id, contr_id)
        if status != 200:
            # The lookup failed, so `contr` is an error payload rather than a
            # Container; there is nothing to submit.
            self.logger.debug(contr)
            return
        contr.state = ContainerState.SUBMITTED
        await self.__contr_mgr.save_container(contr)
        status, _ = await self.__contr_mgr.submit_container(contr)
        if status != 200:
            app.enqueue_pending_containers(contr.id)

    async def save_appliance(self, app, upsert=True):
        """Replace the stored document of ``app`` (matched by id).

        :param upsert: insert the document when no match exists
        """
        await self.__app_col.replace_one(dict(id=app.id), app.to_save(), upsert=upsert)


class ApplianceMonitor(Loggable):

def __init__(self, interval=5000):
Expand Down Expand Up @@ -215,16 +85,16 @@ async def handle_marathon_container(contr):
await self.__app_mgr.process_next_pending_container(app)
contr.reset_waiting_bit()
else:
await self.__contr_mgr.save_container(contr, upsert=False)
await self.__contr_mgr._save_container_to_db(contr, upsert=False)
else:
contr.state = state
contr.reset_waiting_bit()
await self.__contr_mgr.save_container(contr, upsert=False)
await self.__contr_mgr._save_container_to_db(contr, upsert=False)
except HTTPError as e:
if e.response.code == 404 and state and state == ContainerState.SUBMITTED:
_, app = await self.__app_mgr.get_appliance(contr.appliance)
if isinstance(app, str):
await self.__contr_mgr.delete_containers(contr.appliance)
await self.__contr_mgr._delete_containers_of_appliance_from_db(contr.appliance)
elif isinstance(app, Appliance):
app.enqueue_pending_containers(contr.id)
await self.__app_mgr.save_appliance(app, upsert=False)
Expand Down Expand Up @@ -259,11 +129,11 @@ async def handle_chronos_container(contr):
await self.__app_mgr.process_next_pending_container(app)
contr.reset_waiting_bit()
else:
await self.__contr_mgr.save_container(contr, upsert=False)
await self.__contr_mgr._save_container_to_db(contr, upsert=False)
else:
contr.state = state
contr.reset_waiting_bit()
await self.__contr_mgr.save_container(contr, upsert=False)
await self.__contr_mgr._save_container_to_db(contr, upsert=False)

async for c in self.__contr_col.find():
contr = Container(**c)
Expand Down
Empty file added appliance/__init__.py
Empty file.
41 changes: 41 additions & 0 deletions appliance/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from collections import deque


class Appliance:
    """Model of an appliance: its id, its containers and a FIFO queue of
    container ids that are still pending.

    ``containers`` holds either container-id strings or container objects
    that provide ``id`` and ``to_render()``.
    """

    def __init__(self, id, containers=(), pending=(), **kwargs):
        """Create an appliance.

        :param id: appliance identifier
        :param containers: iterable of container ids or container objects
            (``None`` means none)
        :param pending: iterable of pending container ids (``None`` means
            none)
        :param kwargs: any further keys are accepted and ignored
        """
        self.__id = id
        # Tuple defaults and copies keep instances independent of each other
        # and of the collections passed in by the caller.
        self.__containers = list(containers or ())
        self.__pending = deque(pending or ())

    @property
    def id(self):
        """Appliance identifier."""
        return self.__id

    @property
    def containers(self):
        """List of containers (ids or container objects)."""
        return self.__containers

    @containers.setter
    def containers(self, contrs):
        self.__containers = list(contrs)

    @property
    def pending(self):
        """Copy of the pending container ids as a list, oldest first."""
        return list(self.__pending)

    def enqueue_pending_containers(self, contr_id):
        """Add a container id at the back of the pending queue."""
        self.__pending.append(contr_id)

    def dequeue_pending_container(self):
        """Pop the container id at the front of the pending queue.

        :raises IndexError: when nothing is pending
        """
        return self.__pending.popleft()

    def to_render(self):
        """Dict representation for responses; container objects are rendered
        with ``to_render()``, strings are passed through."""
        return dict(id=self.id,
                    containers=[c if isinstance(c, str) else c.to_render()
                                for c in self.containers])

    def to_save(self):
        """Dict representation for storage; containers are stored by id and
        the pending queue is included."""
        return dict(id=self.id,
                    containers=[c if isinstance(c, str) else c.id
                                for c in self.containers],
                    pending=self.pending)
46 changes: 46 additions & 0 deletions appliance/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import json

from tornado.web import RequestHandler

from appliance.manager import ApplianceManager
from container.manager import ContainerManager
from util import Loggable


class AppliancesHandler(RequestHandler, Loggable):
    """Collection endpoint: creates an appliance from a JSON request body."""

    def initialize(self, config):
        """Obtain the appliance and container managers for this handler."""
        self.__app_mgr = ApplianceManager(config)
        self.__contr_mgr = ContainerManager(config)

    async def post(self):
        """Create the appliance described by the request body.

        On status 201 the rendered appliance is written as JSON; otherwise
        the manager's response is written unchanged. The response is finished
        first, and only then is the first pending container processed.
        """
        payload = json.loads(self.request.body)
        status, app = await self.__app_mgr.create_appliance(payload)
        created = status == 201
        self.set_status(status)
        if created:
            self.write(json.dumps(app.to_render()))
        else:
            self.write(app)
        self.finish()
        if created:
            await self.__app_mgr.process_next_pending_container(app)


class ApplianceHandler(RequestHandler, Loggable):
    """Item endpoint: fetch or delete a single appliance by id."""

    def initialize(self, config):
        """Obtain the appliance and container managers for this handler."""
        self.__app_mgr = ApplianceManager(config)
        self.__contr_mgr = ContainerManager(config)

    async def get(self, app_id):
        """Write the appliance as JSON, or the manager's error response.

        On status 200 the manager's result is an appliance object, which
        ``RequestHandler.write`` cannot accept directly, so it is rendered
        and JSON-encoded first (the same way ``AppliancesHandler.post`` does).
        """
        status, resp = await self.__app_mgr.get_appliance(app_id)
        self.set_status(status)
        self.write(json.dumps(resp.to_render()) if status == 200 else resp)

    async def delete(self, app_id):
        """Delete the appliance, write the manager's response, then
        deprovision the containers of the deleted appliance (if any)."""
        status, app, resp = await self.__app_mgr.delete_appliance(app_id)
        self.set_status(status)
        self.write(resp)
        if app:
            await self._deprovision_containers(app.containers)

    async def _deprovision_containers(self, contrs):
        """Deprovision the given containers one after another."""
        for c in contrs:
            await self.__contr_mgr.deprovision_container(c)
22 changes: 22 additions & 0 deletions appliance/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from util import Singleton, Loggable, MotorClient, SecureAsyncHttpClient
from container.manager import ContainerManager

class ApplianceManager(Loggable, metaclass=Singleton):
    """Appliance service backed by the ``appliance`` collection.

    Only ``save_appliance`` is implemented; the other coroutines are
    placeholders.
    """

    # NOTE(review): the handlers in appliance/handler.py call
    # ``ApplianceManager(config)`` while this initializer takes no arguments;
    # confirm how ``Singleton`` forwards constructor arguments.
    def __init__(self):
        self.__app_col = MotorClient().requester.appliance
        self.__contr_mgr = ContainerManager()
        self.__http_cli = SecureAsyncHttpClient()

    async def get_appliance(self, app_id, verbose=True):
        """Placeholder: not implemented yet, resolves to ``None``."""

    async def create_appliance(self, app):
        """Placeholder: not implemented yet, resolves to ``None``."""

    async def delete_appliance(self, app_id):
        """Placeholder: not implemented yet, resolves to ``None``."""

    async def save_appliance(self, app, upsert=True):
        """Replace the stored document whose ``id`` equals ``app.id`` with
        ``app.to_save()``, inserting it when ``upsert`` is true and no
        document matches."""
        selector = dict(id=app.id)
        document = app.to_save()
        await self.__app_col.replace_one(selector, document, upsert=upsert)
35 changes: 0 additions & 35 deletions chronos.py

This file was deleted.

Empty file added cluster/__init__.py
Empty file.
53 changes: 53 additions & 0 deletions cluster/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from tornado.ioloop import PeriodicCallback

from util import Singleton, Loggable, SecureAsyncHttpClient


class Host:
    """A cluster host described by a hostname plus arbitrary attributes."""

    def __init__(self, hostname, attributes=None):
        """Create a host record.

        :param hostname: name of the host; stored under the ``hostname`` key
            and taking precedence over any ``hostname`` entry in
            ``attributes``
        :param attributes: optional mapping of extra attributes; it is copied,
            never modified, and ``None`` means no extra attributes
        """
        # Copy first, then set the key explicitly: this tolerates attribute
        # mappings that already contain 'hostname' or use non-string keys,
        # both of which would break a ``dict(**attributes, hostname=...)``.
        merged = dict(attributes or {})
        merged['hostname'] = hostname
        self.__attributes = merged

    @property
    def hostname(self):
        """The host's name."""
        return self.__attributes.get('hostname', None)

    @property
    def attributes(self):
        """A copy of all attributes, including ``hostname``."""
        return dict(self.__attributes)

    def to_render(self):
        """Return the attribute dict used when rendering the host."""
        return self.attributes


class Cluster(Loggable, metaclass=Singleton):
    """Keeps a list of ``Host`` objects built from the Mesos master's
    ``slaves`` endpoint."""

    MONITOR_INTERVAL = 30000  # refresh period handed to PeriodicCallback: 30000 ms (30 seconds)

    def __init__(self, config):
        """Read the master URL and Mesos route from ``config['dcos']`` and
        create the HTTP client; the host list starts empty."""
        dcos = config['dcos']
        self.__master_url_base = '%s/%s'%(dcos['master_url'],
                                          dcos['mesos_master_route'])
        self.__config = config
        self.__http_cli = SecureAsyncHttpClient(config)
        self.__hosts = []

    @property
    def hosts(self):
        """A copy of the current host list."""
        return list(self.__hosts)

    def find_host_by_attribute(self, key, val):
        """Return the hosts whose attribute ``key`` equals ``val``."""
        return [host for host in self.__hosts
                if host.attributes.get(key, None) == val]

    async def monitor(self):
        """Refresh the host list once, then schedule periodic refreshes
        every ``MONITOR_INTERVAL``."""

        async def query_mesos():
            # On a non-200 status, log the error and keep the previous hosts.
            url = '%s/slaves'%self.__master_url_base
            status, body, err = await self.__http_cli.get(url)
            if status != 200:
                self.logger.debug(err)
                return
            self.logger.debug('Collect host info')
            refreshed = []
            for slave in body['slaves']:
                refreshed.append(Host(hostname=slave['hostname'],
                                      attributes=slave['attributes']))
            self.__hosts = refreshed

        await query_mesos()
        PeriodicCallback(query_mesos, self.MONITOR_INTERVAL).start()


15 changes: 15 additions & 0 deletions cluster/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import json

from tornado.web import RequestHandler

from cluster.base import Cluster
from util import Loggable


class ClusterInfoHandler(RequestHandler, Loggable):
    """Endpoint that lists the hosts currently known to the cluster."""

    def initialize(self, config):
        """Obtain the ``Cluster`` instance for this handler."""
        self.__cluster = Cluster(config)

    def get(self):
        """Write a JSON array with the rendered form of every host."""
        rendered = []
        for host in self.__cluster.hosts:
            rendered.append(host.to_render())
        self.write(json.dumps(rendered))
Loading

0 comments on commit 7357367

Please sign in to comment.