Commit a538b5a0 authored by Spiros Koulouzis's avatar Spiros Koulouzis

refactor planner

parent 4ed61d65
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="Unittests" />
</component>
</module>
\ No newline at end of file
# To change this license header, choose License Headers in Project Properties.
# To change this template file, choose Tools | Templates
# and open the template in the editor.
import json
import os
import os.path
import tempfile
import time
import logging
import pika
import yaml
import sys
from planner.planner import Planner
from service.spec_service import SpecService
logger = logging.getLogger(__name__)
# if not getattr(logger, 'handler_set', None):
# logger.setLevel(logging.INFO)
# h = logging.StreamHandler()
# formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# h.setFormatter(formatter)
# logger.addHandler(h)
# logger.handler_set = True
def init_chanel(args):
global rabbitmq_host
if len(args) > 1:
rabbitmq_host = args[1]
queue_name = args[2] # planner
else:
rabbitmq_host = '127.0.0.1'
connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_host))
channel = connection.channel()
channel.queue_declare(queue=queue_name)
return channel
def start(this_channel):
this_channel.basic_qos(prefetch_count=1)
this_channel.basic_consume(queue=queue_name, on_message_callback=on_request)
logger.info(" [x] Awaiting RPC requests")
this_channel.start_consuming()
def on_request(ch, method, props, body):
response = handle_delivery(body)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
def handle_delivery(message):
logger.info("Got: " + str(message))
try:
message = message.decode()
except (UnicodeDecodeError, AttributeError):
pass
parsed_json_message = json.loads(message)
owner = parsed_json_message['owner']
tosca_file_name = 'tosca_template'
tosca_template_json = parsed_json_message['toscaTemplate']
input_current_milli_time = lambda: int(round(time.time() * 1000))
# rabbit = DRIPLoggingHandler(host=rabbitmq_host, port=5672, user=owner)
# logger.addHandler(rabbit)
try:
tosca_folder_path = os.path.join(tempfile.gettempdir(), "planner_files", str(input_current_milli_time()))
except NameError:
import sys
millis = int(round(time.time() * 1000))
tosca_folder_path = os.path.dirname(os.path.abspath(sys.argv[0])) + os.path.join(tempfile.gettempdir(),
"planner_files",
str(millis))
if not os.path.exists(tosca_folder_path):
os.makedirs(tosca_folder_path)
input_tosca_file_path = os.path.join(tosca_folder_path, tosca_file_name + ".yml")
with open(input_tosca_file_path, 'w') as outfile:
outfile.write(yaml.dump(tosca_template_json))
conf = {'url': "http://host"}
spec_service = SpecService(conf)
test_planner = Planner(input_tosca_file_path, spec_service)
tosca_template = test_planner.resolve_requirements()
tosca_template = test_planner.set_infrastructure_specifications()
template_dict = tosca_util.get_tosca_template_2_topology_template_dictionary(tosca_template)
logger.info("template ----: \n" + yaml.dump(template_dict))
response = {'toscaTemplate': template_dict}
output_current_milli_time = int(round(time.time() * 1000))
response["creationDate"] = output_current_milli_time
response["parameters"] = []
if queue_name == "planner_queue":
logger.info("Planning")
logger.info("Returning plan")
logger.info("Output message:" + json.dumps(response))
return json.dumps(response)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
if sys.argv[1] == "test_local":
tosca_path = "../TOSCA/"
input_tosca_file_path = tosca_path + '/application_example_2_topologies.yaml'
conf = {'url': "http://host"}
spec_service = SpecService(conf)
test_planner = Planner(input_tosca_file_path, spec_service)
test_tosca_template = test_planner.resolve_requirements()
test_tosca_template = test_planner.set_infrastructure_specifications()
template_dict = tosca_util.get_tosca_template_2_topology_template_dictionary(test_tosca_template)
logger.info("template ----: \n" + yaml.dump(template_dict))
try:
tosca_folder_path = os.path.join(tempfile.gettempdir(), tosca_path)
except NameError:
import sys
tosca_folder_path = os.path.dirname(os.path.abspath(sys.argv[0])) + os.path.join(tempfile.gettempdir(),
tosca_path)
tosca_file_name = 'tosca_template'
input_tosca_file_path = tosca_path + '/application_example_2_topologies.yaml'
with open(input_tosca_file_path, 'w') as outfile:
outfile.write(yaml.dump(template_dict))
ToscaTemplate(input_tosca_file_path)
test_response = {'toscaTemplate': template_dict}
logger.info("Output message:" + json.dumps(test_response))
else:
logger.info("Input args: " + sys.argv[0] + ' ' + sys.argv[1] + ' ' + sys.argv[2])
channel = init_chanel(sys.argv)
global queue_name
queue_name = sys.argv[2]
start(channel)
This diff is collapsed.
import copy
from toscaparser.nodetemplate import NodeTemplate
from toscaparser.properties import Property
import networkx as nx
import logging
from service.specification_analyzer import SpecificationAnalyzer
from util import tosca_helper
class SimpleAnalyzer(SpecificationAnalyzer):
def __init__(self, tosca_template):
super(SimpleAnalyzer, self).__init__(tosca_template)
def set_relationship_occurrences(self):
return_nodes = []
# nodes_with_occurrences_in_requirements = tosca_util.get_nodes_with_occurrences_in_requirements(
# self.tosca_template.nodetemplates)
orchestrator_nodes = tosca_helper.get_nodes_by_type('tosca.nodes.ARTICONF.Orchestrator',
self.tosca_template.nodetemplates, self.all_node_types,
self.all_custom_def)
if 'properties' in orchestrator_nodes[0].entity_tpl:
if 'masters_num' in orchestrator_nodes[0].entity_tpl['properties']:
masters_num = orchestrator_nodes[0].entity_tpl['properties']['masters_num']
if 'workers_num' in orchestrator_nodes[0].entity_tpl['properties']:
workers_num = orchestrator_nodes[0].entity_tpl['properties']['workers_num']
else:
masters_num = orchestrator_nodes[0].get_property_value('masters_num')
workers_num = orchestrator_nodes[0].get_property_value('workers_num')
topology_nodes = tosca_helper.get_nodes_by_type('tosca.nodes.ARTICONF.VM.topology',
self.tosca_template.nodetemplates, self.all_node_types,
self.all_custom_def)
# for requirement in topology_nodes[0].requirements:
# requirement_dict = requirement[next(iter(requirement))]
# if requirement_dict['capability'] == 'tosca.capabilities.ARTICONF.VM':
# requirement_dict['occurrences'] = min_num_of_vm
vm_nodes = tosca_helper.get_nodes_by_type('tosca.nodes.ARTICONF.VM.Compute',
self.tosca_template.nodetemplates, self.all_node_types,
self.all_custom_def)
if vm_nodes:
for i in range(len(vm_nodes), masters_num):
old_vm_name = vm_nodes[0].name
new_vm = copy.deepcopy(vm_nodes[0])
new_vm_name = new_vm.name + '_' + str(i)
new_vm.name = new_vm_name
templates = new_vm.templates.pop(old_vm_name)
new_vm.templates[new_vm_name] = templates
return_nodes.append(new_vm)
for requirement in topology_nodes[0].requirements:
requirement_key = next(iter(requirement))
requirement_value = requirement[requirement_key]
if requirement_value['capability'] == 'tosca.capabilities.ARTICONF.VM':
new_requirement = copy.deepcopy(requirement)
new_requirement[requirement_key]['node'] = new_vm.name
topology_nodes[0].requirements.append(new_requirement)
return_nodes.append(topology_nodes[0])
break
for i in range(len(vm_nodes), workers_num + 1):
old_vm_name = vm_nodes[0].name
new_vm = copy.deepcopy(vm_nodes[0])
new_vm_name = new_vm.name + '_' + str(i)
new_vm.name = new_vm_name
templates = new_vm.templates.pop(old_vm_name)
new_vm.templates[new_vm_name] = templates
new_vm.templates[next(iter(new_vm.templates))]['properties']['role'] = "worker"
return_nodes.append(new_vm)
for requirement in topology_nodes[0].requirements:
requirement_key = next(iter(requirement))
requirement_value = requirement[requirement_key]
if requirement_value['capability'] == 'tosca.capabilities.ARTICONF.VM':
new_requirement = copy.deepcopy(requirement)
new_requirement[requirement_key]['node'] = new_vm.name
topology_nodes[0].requirements.append(new_requirement)
return_nodes.append(topology_nodes[0])
break
return return_nodes
def set_node_specifications(self):
nodes_to_implement_policies = self.get_nodes_to_implement_policy()
affected_nodes = []
for node_name in nodes_to_implement_policies:
policies = nodes_to_implement_policies[node_name]
affected_node = self.set_specs(node_name, policies, self.tosca_template.nodetemplates)
if affected_node:
affected_nodes.append(affected_node)
return affected_nodes
def get_nodes_to_implement_policy(self):
nodes_to_implement_policies = {}
for policy in self.tosca_template.policies:
for target in policy.targets:
for leaf in self.leaf_nodes:
for affected_node_name in (nx.shortest_path(self.g, source=target, target=leaf)):
if affected_node_name not in nodes_to_implement_policies:
policy_list = []
nodes_to_implement_policies[affected_node_name] = policy_list
policy_list = nodes_to_implement_policies[affected_node_name]
policy_list.append(policy.type)
nodes_to_implement_policies[affected_node_name] = policy_list
return nodes_to_implement_policies
def set_node_properties_for_policy(self, affected_node, policies):
logging.info('Setting properties for: ' + str(affected_node.type))
ancestors_types = tosca_helper.get_all_ancestors_types(affected_node, self.all_node_types, self.all_custom_def)
# if 'tosca.nodes.ARTICONF.Orchestrator' in ancestors_types:
# logging.info('Do Something')
properties = tosca_helper.get_all_ancestors_properties(affected_node, self.all_node_types,
self.all_custom_def)
default_properties = {}
for node_property in properties:
default_property = self.get_defult_value(node_property)
if default_property:
default_properties[next(iter(default_property))] = default_property[next(iter(default_property))]
if default_properties:
for default_property in default_properties:
affected_node.get_properties_objects().append(default_property)
if 'properties' in affected_node.templates[next(iter(affected_node.templates))]:
for prop_name in affected_node.templates[next(iter(affected_node.templates))]['properties']:
if 'required' not in affected_node.templates[next(iter(affected_node.templates))]['properties'][
prop_name] and 'type' not in \
affected_node.templates[next(iter(affected_node.templates))]['properties'][prop_name]:
default_properties[prop_name] = \
affected_node.templates[next(iter(affected_node.templates))]['properties'][prop_name]
affected_node.templates[next(iter(affected_node.templates))]['properties'] = default_properties
return affected_node
else:
return None
def set_specs(self, node_name, policies, nodes_in_template):
logging.info('node_name: ' + str(node_name) + ' will implement policies: ' + str(len(policies)))
for node in nodes_in_template:
if node.name == node_name:
affected_node = node
break
logging.info('node: ' + str(affected_node.type) + ' will implement policies: ' + str(len(policies)))
affected_node = self.set_node_properties_for_policy(affected_node, policies)
return affected_node
def get_defult_value(self, node_property):
if isinstance(node_property.value,
dict) and 'required' in node_property.value and 'type' in node_property.value:
if node_property.value['required']:
default_prop = {}
if 'default' in node_property.value:
if node_property.value['type'] == 'integer':
default_prop = int(node_property.value['default'])
else:
default_prop = str(node_property.value['default'])
elif 'constraints' in node_property.value:
constraints = node_property.value['constraints']
for constraint in constraints:
for constraint_key in constraint:
if 'equal' in constraint_key:
if node_property.value['type'] == 'integer':
default_prop = int(constraint[constraint_key])
else:
default_prop = str(constraint[constraint_key])
name = node_property.name
node_property = {name: default_prop}
return node_property
return None
class SpecService:
def __init__(self, conf):
self.configuration = conf
def get_property(self, prop_key):
return None
from abc import abstractmethod, ABCMeta
from toscaparser.tosca_template import ToscaTemplate
import networkx as nx
import matplotlib.pyplot as plt
class SpecificationAnalyzer(metaclass=ABCMeta):
def __init__(self, tosca_template):
self.tosca_template = tosca_template
self.tosca_node_types = self.tosca_template.nodetemplates[0].type_definition.TOSCA_DEF
self.all_custom_def = self.tosca_template.nodetemplates[0].custom_def
self.all_node_types = {}
self.all_node_types.update(self.tosca_node_types.items())
self.all_node_types.update(self.all_custom_def.items())
self.required_nodes = []
self.g = self.build_graph(self.tosca_template.nodetemplates)
self.root_nodes = []
self.leaf_nodes = []
for node_name, degree in self.g.in_degree():
if degree == 0:
self.root_nodes.append(node_name)
for node_name, degree in self.g.out_degree():
if degree == 0:
self.leaf_nodes.append(node_name)
def build_graph(self, node_templates):
graph = nx.DiGraph()
for node in node_templates:
graph.add_node(node.name, attr_dict=node.entity_tpl)
for req in node.requirements:
req_name = next(iter(req))
req_node_name = req[req_name]['node']
if 'relationship' in req[req_name] and 'type' in req[req_name]['relationship']:
relationship_type = req[req_name]['relationship']['type']
else:
if 'relationship' not in req[req_name]:
relationship_type = 'tosca.relationships.DependsOn'
else:
relationship_type = req[req_name]['relationship']
graph.add_edge(node.name, req_node_name, relationship=relationship_type)
nx.draw(graph, with_labels=True)
plt.savefig("/tmp/graph.png")
# plt.show()
return graph
@abstractmethod
def set_node_specifications(self):
raise NotImplementedError('Must implement upload in subclasses')
@abstractmethod
def set_relationship_occurrences(self):
raise NotImplementedError('Must implement upload in subclasses')
from setuptools import setup, find_packages
setup(
name='drip_planner2',
version='0.1',
packages=find_packages(),
# Declare your packages' dependencies here, for eg:
install_requires=['matplotlib==3.1.1', 'pika==1.1.0', 'tosca-parser==1.6.0', 'names==0.3.0', 'networkx==2.4',
'matplotlib==3.1.1'],
# Fill in these to make your Egg ready for upload to
# PyPI
author='S. Koulouzis',
author_email='',
# summary = 'Just another Python package for the cheese shop',
url='',
license='',
long_description='Long description of the package',
# could also include long_description, download_url, classifiers, etc.
)
import json
import logging
import os
import os.path
import tempfile
import time
import unittest
import yaml
from toscaparser.tosca_template import ToscaTemplate
from planner.planner import Planner
from service.spec_service import SpecService
class MyTestCase(unittest.TestCase):
def test_something(self):
logger = logging.getLogger(__name__)
tosca_path = "../../TOSCA/"
input_tosca_file_path = tosca_path + '/application_example_2_topologies.yaml'
dir_path = os.path.dirname(os.path.realpath(__file__))
print(dir_path)
self.assertEqual(True, os.path.exists(input_tosca_file_path),
"Input TOSCA file: " + input_tosca_file_path + " not found")
conf = {'url': "http://host"}
spec_service = SpecService(conf)
test_planner = Planner(input_tosca_file_path, spec_service)
test_tosca_template = test_planner.resolve_requirements()
test_tosca_template = test_planner.set_infrastructure_specifications()
template_dict = tosca_util.get_tosca_template_2_topology_template_dictionary(test_tosca_template)
logger.info("template ----: \n" + yaml.dump(template_dict))
try:
tosca_folder_path = os.path.join(tempfile.gettempdir(), tosca_path)
except NameError:
import sys
tosca_folder_path = os.path.dirname(os.path.abspath(sys.argv[0])) + os.path.join(tempfile.gettempdir(),
tosca_path)
tosca_file_name = 'tosca_template'
input_tosca_file_path = tosca_path + '/application_example_output.yaml'
with open(input_tosca_file_path, 'w') as outfile:
outfile.write(yaml.dump(template_dict))
ToscaTemplate(input_tosca_file_path)
test_response = {'toscaTemplate': template_dict}
response = {'toscaTemplate': template_dict}
output_current_milli_time = int(round(time.time() * 1000))
response["creationDate"] = output_current_milli_time
response["parameters"] = []
print("Output message:" + json.dumps(response))
self.assertEqual(True, True)
if __name__ == '__main__':
unittest.main()
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment