Commit 0032c1bc authored by Spiros Koulouzis's avatar Spiros Koulouzis

experimental planner with winery

parent 055ac648
/*
* Copyright 2019 S. Koulouzis, Huan Zhou, Yang Hu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nl.uva.sne.drip.controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.webcohesion.enunciate.metadata.rs.ResponseCode;
import com.webcohesion.enunciate.metadata.rs.StatusCodes;
import nl.uva.sne.drip.service.CloudStormService;
import nl.uva.sne.drip.service.PlannerService;
import org.springframework.beans.factory.annotation.Autowired;
/**
* This controller is responsible for obtaining resources from cloud providers
*
*
* @author S. Koulouzis
*/
@RestController
@RequestMapping("/user/v3.0/planner")
@StatusCodes({
    @ResponseCode(code = 401, condition = "Bad credentials")
})
public class PlannerController {
    // Planner service injected by Spring; no request-mapped handler methods
    // exist yet in this revision — presumably endpoints that delegate to the
    // service are added later (TODO confirm against subsequent commits).
    @Autowired
    private PlannerService plannerService;
}
......@@ -29,7 +29,7 @@ import org.springframework.beans.factory.annotation.Autowired;
* @author S. Koulouzis
*/
@RestController
@RequestMapping("/user/v1.0/provisioner")
@RequestMapping("/user/v3.0/provisioner")
@StatusCodes({
@ResponseCode(code = 401, condition = "Bad credentials")
})
......
/*
* Copyright 2019 S. Koulouzis, Huan Zhou, Yang Hu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nl.uva.sne.drip.service;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
*
* @author S. Koulouzis
*/
@Service
public class PlannerService {
    // Queue-name prefix injected from the 'planner.queue.prefix' application
    // property. Not yet referenced by any visible method in this revision;
    // presumably used to address the planner's message queue — TODO confirm.
    @Value("${planner.queue.prefix}")
    private String plannerQueuePrefix;
}
swagger: "2.0"
info:
description: "The Dynamic Real-time infrastructure planner (DRIP) allows application developers to seamlessly plan a customized virtual infrastructure based on application level constraints on QoS and resource budgets, provisioning the virtual infrastructure using standardized interfaces (e.g., TOSCA and OCCI), deploy application components onto the virtual infrastructure, and start execution on demand."
version: "3.0.0"
title: "DRIP manager"
license:
name: "Apache 2.0"
url: "http://www.apache.org/licenses/LICENSE-2.0.html"
host: "localhost:8080"
basePath: "/v3"
schemes:
- "https"
- "http"
paths:
/planner:
post:
description: "Uploads a TOSCA definition"
consumes:
- "multipart/form-data"
produces:
- "application/xml"
- "application/json"
parameters:
- in: formData
name: file
required: true
type: file
description: The TOSCA file
x-mimetype:
- application/x-yaml
- text/yaml
responses:
405:
description: "Invalid TOSCA file"
200:
description: "Upload Successful"
java.lib.path=
main.file=rpc_server.py
# Duplicate key: standard .properties loaders keep the LAST occurrence, so
# Python 3 was already the effective value. The stale Python 2 entry is
# commented out to make that explicit.
#platform.active=Python_2.7.12
platform.active=Python_3.6.6
python.lib.path=
src.dir=src
import json
import logging
import pika
from python_logging_rabbitmq import RabbitMQHandler
class DRIPLoggingHandler(RabbitMQHandler):
    """Logging handler that publishes DRIP log records to a per-user RabbitMQ queue.

    Each record is serialized to JSON (see format()) and published as a
    persistent message (delivery_mode=2) to the queue 'log_qeue_<user>'.
    NOTE(review): 'log_qeue_' looks like a typo for 'log_queue_', but the
    queue name is an on-the-wire contract shared with consumers, so it is
    deliberately left unchanged.
    """

    def __init__(self, host='localhost', port=5672, username=None, password=None, user=None):
        super(DRIPLoggingHandler, self).__init__(host=host, port=port, username=username, password=password)
        self.user = user
        # Monotonic counter over the handler's lifetime. Fix: initialized here
        # instead of in open_connection(), so a reconnect no longer resets the
        # sequence numbers seen by consumers.
        self.sequenceNumber = 0

    def open_connection(self):
        """Connect to RabbitMQ and declare the durable per-user queue."""
        # Route pika's own log output to stderr so connection problems are
        # visible without recursing into this handler.
        handler = logging.StreamHandler()
        handler.setFormatter(self.formatter)
        rabbitmq_logger = logging.getLogger('pika')
        rabbitmq_logger.addHandler(handler)
        rabbitmq_logger.propagate = False
        rabbitmq_logger.setLevel(logging.WARNING)
        if not self.connection or self.connection.is_closed:
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(**self.connection_params))
        if not self.channel or self.channel.is_closed:
            self.channel = self.connection.channel()
        self.channel.queue_declare(queue='log_qeue_' + self.user, durable=True)
        # Manually remove logger to avoid shutdown message.
        rabbitmq_logger.removeHandler(handler)

    def emit(self, record):
        """Publish one log record, (re)opening the connection if needed."""
        self.acquire()
        try:
            if not self.connection or self.connection.is_closed or not self.channel or self.channel.is_closed:
                self.open_connection()
            queue = 'log_qeue_' + self.user
            self.channel.basic_publish(
                exchange='',
                routing_key=queue,
                body=self.format(record),
                properties=pika.BasicProperties(
                    delivery_mode=2)  # persistent message
            )
        except Exception:
            # Drop the broken connection/channel so the next emit reconnects
            # cleanly, then defer to the standard logging error handling.
            self.channel, self.connection = None, None
            self.handleError(record)
        finally:
            if self.close_after_emit:
                self.close_connection()
            self.release()

    def format(self, record):
        """Serialize a LogRecord into the JSON structure DRIP consumers expect."""
        drip_record = {}
        drip_record['timestamp'] = record.created
        # NOTE(review): 'owner' is the literal string 'user', not self.user.
        # That looks unintentional, but consumers may depend on it — confirm
        # before changing.
        drip_record['owner'] = 'user'
        drip_record['level'] = record.levelname
        drip_record['loggerName'] = record.module
        # Fix: record.message only exists after a Formatter has processed the
        # record; getMessage() is always available and applies the args.
        drip_record['message'] = record.getMessage()
        drip_record['millis'] = record.created
        self.sequenceNumber += 1
        drip_record['sequenceNumber'] = self.sequenceNumber
        drip_record['sourceClassName'] = record.module
        drip_record['sourceMethodName'] = record.funcName
        return json.dumps(drip_record)
\ No newline at end of file
from toscaparser import *
from toscaparser.tosca_template import ToscaTemplate
import toscaparser.utils.yamlparser
import re
import operator
import json
# NOTE(review): this class body appears to be a concatenation of two revisions
# (a diff/scrape artifact): it contains two __init__ definitions, two
# definitions of get_node_templates with different arities, an orphaned second
# `except` clause in trnsform_to_tosca (invalid syntax), and the tail of
# plan() displaced below unrelated method definitions. The code is left
# byte-identical below; each problem is flagged in place so a maintainer can
# decide which revision is canonical before fixing.
class DumpPlanner:
# Key names probed (in order) when locating each section of a Winery-style
# (camelCase JSON) service-template document; see find().
service_template_names = ['serviceTemplateOrNodeTypeOrNodeTypeImplementation']
topology_template_names = ['topologyTemplate']
node_template_names = ['nodeTemplates']
requirement_names = ['requirements']
# Constructor for the Winery/JSON path: loads the document, collects all
# requirements, and prints them. NOTE(review): shadowed by the second
# __init__ defined further down — Python keeps only the LAST definition.
def __init__(self, service_templaete_file_path):
dict_tpl = self.load_file(service_templaete_file_path)
requirements = self.get_all_requirements(dict_tpl)
unmet_requirements = self.get_unmet_requirements(requirements)
print(requirements)
# yaml_dict_tpl = self.trnsform_to_tosca(yaml_dict_tpl)
# Load a template file as JSON if possible, otherwise fall back to YAML.
def load_file(self,path):
is_json = True
with open(path) as f:
try:
dict_tpl = json.load(f)
except Exception as e:
# Only a JSON syntax error triggers the YAML fallback; anything
# else (e.g. I/O problems) is re-raised.
if (not isinstance(e, json.decoder.JSONDecodeError)):
raise e
else:
is_json = False
if is_json:
return dict_tpl
else:
return toscaparser.utils.yamlparser.load_yaml(path)
# Constructor for the TOSCA/YAML path. NOTE(review): duplicate __init__ —
# this later definition is the one Python actually keeps.
def __init__(self, tosca_file_path):
self.yaml_dict_tpl = toscaparser.utils.yamlparser.load_yaml(tosca_file_path)
self.errors = []
self.warnings = []
self.tt = None
# Validate/convert the loaded dict into a ToscaTemplate.
# NOTE(review): this method is syntactically INVALID — it has a bare
# `except:` followed by a second `except Exception as e:` with no matching
# `try`, clearly two revisions merged together. One of the two handlers
# (and one of the two ToscaTemplate calls) must be removed.
def trnsform_to_tosca(self,yaml_dict_tpl):
try:
self.tt = ToscaTemplate(path=None, yaml_dict_tpl=self.yaml_dict_tpl)
except:
self.warnings.append("Not a valid tosca file")
self.tt = ToscaTemplate(path=None, yaml_dict_tpl=yaml_dict_tpl)
except Exception as e:
self.handle_tosca_exeption(e,yaml_dict_tpl)
# NOTE(review): these type-constant assignments appear inside
# trnsform_to_tosca in this revision, although plan()/get_hosted_nodes()
# depend on them — they presumably belong in __init__; TODO confirm.
self.DOCKER_TYPE = 'Switch.nodes.Application.Container.Docker'
self.COMPUTE_TYPE = 'Switch.nodes.Compute'
self.HW_HOST_TYPE = 'Switch.datatypes.hw.host'
self.HOSTED_NODE_TYPE = [self.DOCKER_TYPE, self.COMPUTE_TYPE]
# Minimal error handler: just prints the parse exception.
def handle_tosca_exeption(self,exception,yaml_dict_tpl):
print(exception)
# Return the set of node-type names derived from the Docker base type.
def get_docker_types(self):
docker_types = set([])
node_types = self.get_node_types()
for node_type_key in node_types:
if node_types[node_type_key] and 'derived_from' in node_types[node_type_key].keys():
if node_types[node_type_key]['derived_from'] == self.DOCKER_TYPE:
docker_types.add(node_type_key)
return docker_types
# Walk service template -> topology template -> node templates and collect
# every entry under each node's requirements['requirement'] list.
def get_all_requirements(self,dict_tpl):
all_requirements = []
service_templates = self.get_service_template(dict_tpl)
for service in service_templates:
topology_template = self.get_topology_template(service)
node_templates = self.get_node_templates(topology_template)
for node_template in node_templates:
requirements = self.get_requirements(node_template)
if requirements:
for requirement in requirements['requirement']:
all_requirements.append(requirement)
return all_requirements
def get_node_types(self):
return self.yaml_dict_tpl['node_types']
def get_service_template(self,dict_tpl):
return self.find(dict_tpl,self.service_template_names)
# NOTE(review): duplicate method — a second get_node_templates(self,dict_tpl)
# is defined further down and, being later, is the one Python keeps; the
# call in plan() passes no argument and therefore expects THIS version.
def get_node_templates(self):
return self.yaml_dict_tpl['topology_template']['node_templates']
def get_topology_template(self,dict_tpl):
return self.find(dict_tpl,self.topology_template_names)
# Return the optional network_templates section, or None when absent.
def get_network_templates(self):
if 'network_templates' in self.yaml_dict_tpl['topology_template']:
return self.yaml_dict_tpl['topology_template']['network_templates']
else:
return None
# Return a node's artifacts, or implicitly None when absent.
def get_artifacts(self, node):
if 'artifacts' in node:
return node['artifacts']
# Collect node templates whose type is Docker/Compute (or derived from
# Docker), tagging each with its template key as 'id'.
def get_hosted_nodes(self, node_templates):
docker_types = self.get_docker_types()
# NOTE(review): mutates self.HOSTED_NODE_TYPE, so repeated calls keep
# appending the same docker types (harmless duplicates, but unclean).
self.HOSTED_NODE_TYPE = self.HOSTED_NODE_TYPE + list(docker_types)
hosted_nodes = []
for node_key in node_templates:
for hosted_type in self.HOSTED_NODE_TYPE:
if node_templates[node_key]['type'] == hosted_type:
hosted_node = node_templates[node_key]
hosted_node['id'] = node_key
hosted_nodes.append(hosted_node)
break
return hosted_nodes
def get_properties(self, node):
if 'properties' in node:
return node['properties']
# Coerce '2.6GHz'-style strings to their leading integer; ints pass through.
# NOTE(review): "\d+" should be a raw string (r"\d+") to avoid a
# DeprecationWarning on Python 3; behavior is currently identical.
def cast_to_int(self,value):
if isinstance(value, int):
return value
if isinstance(value, str):
return int(re.findall("\d+", value)[0])
# Keep only the max_vms highest-scoring VMs, where a VM's score is the sum
# of its numeric host attributes (CPU freq + mem + cores + disk).
def sort_vms(self,vms,max_vms):
sorted_vms = []
vms_dict = {}
for vm in vms:
score = 0
score += self.cast_to_int(vm['host']['cpu_frequency'])
score += self.cast_to_int(vm['host']['mem_size'])
score += self.cast_to_int(vm['host']['num_cpus'])
score += self.cast_to_int(vm['host']['disk_size'])
vms_dict[vm['name']] = score
sorted_vms_dict = sorted(vms_dict.items(), key=operator.itemgetter(1),reverse=True)
counter = 0
for sorted_vm in sorted_vms_dict:
if counter >= max_vms:
break
for vm in vms:
if sorted_vm[0] == vm['name']:
counter+=1
sorted_vms.append(vm)
break
return sorted_vms
# Produce a list of VM descriptions for the topology. Multicast networks
# get a single large fixed VM; otherwise one VM per hosted node, using the
# node's node_filter capabilities or hard-coded defaults.
def plan(self,max_vms):
network_templates = self.get_network_templates()
vms = []
if network_templates and 'network' in network_templates and network_templates['network']['multicast'] == True:
vm = {}
vm['name'] = 'id'
vm['type'] = self.COMPUTE_TYPE
host = {}
host['cpu_frequency'] = '2.6GHz'
host['mem_size'] = '32GB'
host['num_cpus'] = '16'
host['disk_size'] = '10GB'
vm['host'] = host
os = {}
os['os_version'] = 16.04
os['distribution'] = 'ubuntu'
os['type'] = 'linux'
os['architecture'] = 'x86_64'
vm['os'] = os
vm['scaling_mode'] = 'multiple'
vms.append(vm)
# Early return: multicast topologies are planned as this one VM only.
return vms
node_templates = self.get_node_templates()
hosted_nodes = self.get_hosted_nodes(node_templates)
for node in hosted_nodes:
vm = {}
vm['name'] = node['id']
vm['type'] = self.COMPUTE_TYPE
if 'requirements' in node and node['requirements']:
for req in node['requirements']:
if 'host' in req and 'node_filter' in req['host']:
vm['host'] = req['host']['node_filter']['capabilities']['host']
vm['os'] = req['host']['node_filter']['capabilities']['os']
if 'host' not in vm:
# Default hardware when the node declares no host filter.
host = {}
host['cpu_frequency'] = '2GHz'
host['mem_size'] = '4GB'
host['num_cpus'] = '1'
host['disk_size'] = '10GB'
vm['host'] = host
if 'os' not in vm:
os = {}
os['os_version'] = 16.04
os['distribution'] = 'ubuntu'
os['type'] = 'linux'
os['architecture'] = 'x86_64'
vm['os'] = os
properties = self.get_properties(node)
if properties and 'scaling_mode' in properties:
vm['scaling_mode'] = properties['scaling_mode']
else:
vm['scaling_mode'] = 'single'
vms.append(vm)
# NOTE(review): duplicate — shadows the zero-argument get_node_templates
# defined above; the diff merge left both in the file.
def get_node_templates(self,dict_tpl):
return self.find(dict_tpl,self.node_template_names)
def get_requirements(self,dict_tpl):
return self.find(dict_tpl,self.requirement_names)
# Return the first matching key's value from dict_tpl, or implicitly None.
def find(self,dict_tpl,names):
if dict_tpl:
for name in names:
if(name in dict_tpl):
return dict_tpl[name]
# NOTE(review): the four lines below are the displaced tail of plan() —
# the max_vms capping logic and its `return vms` — stranded here by the
# merge; as positioned, plan() returns None for non-multicast topologies.
if max_vms <= -1:
max_vms = len(vms)//3
if max_vms > -1 and len(vms) > max_vms:
vms = self.sort_vms(vms,max_vms)
return vms
# NOTE(review): BUG — iterates `requirement` (undefined) instead of the
# `requirements` parameter; raises NameError for any non-empty input.
def get_unmet_requirements(self,requirements):
for requirement in requirement:
print(requirement)
\ No newline at end of file
......@@ -12,7 +12,6 @@ from planner.dum_planner import *
import sys
import tempfile
import time
from drip_logging.drip_logging_handler import *
logger = logging.getLogger(__name__)
if not getattr(logger, 'handler_set', None):
......@@ -112,9 +111,7 @@ def handle_delivery(message):
if __name__ == "__main__":
if(sys.argv[1] == "test_local"):
home = expanduser("~")
planner = DumpPlanner(home+"/Downloads/tosca.yml")
max_vms = -1
print planner.plan(max_vms)
planner = DumpPlanner(home+"/Downloads/topology.json")
else:
logger.info("Input args: " + sys.argv[0] + ' ' + sys.argv[1] + ' ' + sys.argv[2])
channel = init_chanel(sys.argv)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment