Commit a2e4f508 authored by Spiros Koulouzis's avatar Spiros Koulouzis

fixed thread for deployer

parent 0bb072e4
...@@ -10,12 +10,17 @@ import logging ...@@ -10,12 +10,17 @@ import logging
import pika import pika
import yaml import yaml
import sys import sys
from time import sleep
from concurrent.futures import thread
from threading import Thread
from service import tosca, k8s_service from service import tosca, k8s_service
from service import ansible_service from service import ansible_service
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
done = False
# if not getattr(logger, 'handler_set', None): # if not getattr(logger, 'handler_set', None):
# logger.setLevel(logging.INFO) # logger.setLevel(logging.INFO)
...@@ -37,7 +42,7 @@ def init_chanel(args): ...@@ -37,7 +42,7 @@ def init_chanel(args):
connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_host)) connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_host))
channel = connection.channel() channel = connection.channel()
channel.queue_declare(queue=queue_name) channel.queue_declare(queue=queue_name)
return channel return channel, connection
def start(this_channel): def start(this_channel):
...@@ -106,6 +111,12 @@ def handle_delivery(message): ...@@ -106,6 +111,12 @@ def handle_delivery(message):
return json.dumps(response) return json.dumps(response)
def threaded_function(args):
while not done:
connection.process_data_events()
sleep(5)
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
if sys.argv[1] == "test_local": if sys.argv[1] == "test_local":
...@@ -124,7 +135,16 @@ if __name__ == "__main__": ...@@ -124,7 +135,16 @@ if __name__ == "__main__":
else: else:
logger.info("Input args: " + sys.argv[0] + ' ' + sys.argv[1] + ' ' + sys.argv[2]) logger.info("Input args: " + sys.argv[0] + ' ' + sys.argv[1] + ' ' + sys.argv[2])
channel = init_chanel(sys.argv) global channel, queue_name, connection
global queue_name channel, connection = init_chanel(sys.argv)
queue_name = sys.argv[2] queue_name = sys.argv[2]
start(channel) logger.info("Awaiting RPC requests")
try:
thread = Thread(target=threaded_function, args=(1,))
thread.start()
start(channel)
except:
done = True
e = sys.exc_info()[0]
logger.info("Error: " + str(e))
exit(-1)
...@@ -20,7 +20,7 @@ services: ...@@ -20,7 +20,7 @@ services:
- sure-tosca - sure-tosca
image: alogo53/planner:3.0.0 image: alogo53/planner:3.0.0
environment: environment:
RABBITMQ_HOST: rabbit RABBITMQ_HOST: rabbit
provisioner: provisioner:
depends_on: depends_on:
......
...@@ -68,12 +68,13 @@ def on_request(ch, method, props, body): ...@@ -68,12 +68,13 @@ def on_request(ch, method, props, body):
ch.basic_ack(delivery_tag=method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag)
def handle_delivery(message): def handle_delivery(message, sys=None):
logger.info("Got: " + str(message)) logger.info("Got: " + str(message))
try: try:
message = message.decode() message = message.decode()
except (UnicodeDecodeError, AttributeError): except (UnicodeDecodeError, AttributeError):
pass e = sys.exc_info()[0]
logger.info("Parsing Error: " + str(e))
parsed_json_message = json.loads(message) parsed_json_message = json.loads(message)
owner = parsed_json_message['owner'] owner = parsed_json_message['owner']
tosca_file_name = 'tosca_template' tosca_file_name = 'tosca_template'
...@@ -143,22 +144,20 @@ if __name__ == "__main__": ...@@ -143,22 +144,20 @@ if __name__ == "__main__":
test_response = {'toscaTemplate': template_dict} test_response = {'toscaTemplate': template_dict}
logger.info("Output message:" + json.dumps(test_response)) logger.info("Output message:" + json.dumps(test_response))
else: else:
print("Input args: " + sys.argv[0] + ' ' + sys.argv[1] + ' ' + sys.argv[2])
logger.info("Input args: " + sys.argv[0] + ' ' + sys.argv[1] + ' ' + sys.argv[2]) logger.info("Input args: " + sys.argv[0] + ' ' + sys.argv[1] + ' ' + sys.argv[2])
global channel global channel
global connection global connection
channel, connection = init_chanel(sys.argv) channel, connection = init_chanel(sys.argv)
global queue_name global queue_name
queue_name = sys.argv[2] queue_name = sys.argv[2]
# start(channel)
thread = Thread(target=threaded_function, args=(1,)) # thread = Thread(target=threaded_function, args=(1,))
thread.start() # thread.start()
logger.info("Awaiting RPC requests") logger.info("Awaiting RPC requests")
try: try:
channel.start_consuming() start(channel)
except KeyboardInterrupt: except:
# thread.stop() e = sys.exc_info()[0]
done = True logger.info("Error: " + str(e))
thread.join() exit(-1)
logger.info("Threads successfully closed")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment