Commit 77fd0858 authored by Spiros Koulouzis's avatar Spiros Koulouzis

use new channel for each request

parent 535d96b9
...@@ -82,32 +82,32 @@ services: ...@@ -82,32 +82,32 @@ services:
#- "3001:3000" #- "3001:3000"
manager: #manager:
depends_on: #depends_on:
- rabbit #- rabbit
- mongo #- mongo
- sure-tosca #- sure-tosca
image: qcdis/manager #image: qcdis/manager
environment: #environment:
RABBITMQ_HOST: rabbit #RABBITMQ_HOST: rabbit
MONGO_HOST: mongo #MONGO_HOST: mongo
SURE_TOSCA_BASE_PATH: http://sure-tosca:8081/tosca-sure/1.0.0 #SURE_TOSCA_BASE_PATH: http://sure-tosca:8081/tosca-sure/1.0.0
CREDENTIAL_SECRET: top_secret #CREDENTIAL_SECRET: top_secret
ports: #ports:
- "8080:8080" #- "8080:8080"
sure-tosca: sure-tosca:
image: qcdis/sure-tosca image: qcdis/sure-tosca
ports: ports:
- "8081:8081" - "8081:8081"
#planner: planner:
#depends_on: depends_on:
#- rabbit - rabbit
#- sure-tosca - sure-tosca
#image: qcdis/planner image: qcdis/planner
#environment: environment:
#RABBITMQ_HOST: rabbit RABBITMQ_HOST: rabbit
provisioner: provisioner:
depends_on: depends_on:
......
...@@ -43,9 +43,9 @@ import org.springframework.stereotype.Service; ...@@ -43,9 +43,9 @@ import org.springframework.stereotype.Service;
@Service @Service
public class DRIPCaller implements AutoCloseable { public class DRIPCaller implements AutoCloseable {
private Connection connection; // private Connection connection;
private Channel channel; // private Channel channel;
private String replyQueueName; // private String replyQueueName;
private String requestQeueName; private String requestQeueName;
private final ObjectMapper mapper; private final ObjectMapper mapper;
private final ConnectionFactory factory; private final ConnectionFactory factory;
...@@ -63,51 +63,41 @@ public class DRIPCaller implements AutoCloseable { ...@@ -63,51 +63,41 @@ public class DRIPCaller implements AutoCloseable {
} }
public void init() throws IOException, TimeoutException { public void init() throws IOException, TimeoutException {
if (connection == null || !connection.isOpen()) { // if (connection == null || !connection.isOpen()) {
connection = factory.newConnection(); // connection = factory.newConnection();
channel = connection.createChannel(); // }
// create a single callback queue per client not per requests. }
Map<String, Object> args = new HashMap<>();
// args.put("x-message-ttl", 60000); // /**
// replyQueueName = channel.queueDeclare("myqueue", false, false, false, args).getQueue(); // * @return the connection
replyQueueName = channel.queueDeclare().getQueue(); // */
// public Connection getConnection() {
} // return connection;
} // }
// /**
/** // * @return the channel
* @return the connection // */
*/ // public Channel getChannel() {
public Connection getConnection() { // return channel;
return connection; // }
} // /**
// * @return the replyQueueName
/** // */
* @return the channel // public String getReplyQueueName() {
*/ // return replyQueueName;
public Channel getChannel() { // }
return channel;
}
/**
* @return the replyQueueName
*/
public String getReplyQueueName() {
return replyQueueName;
}
@Override @Override
public void close() throws IOException, TimeoutException { public void close() throws IOException, TimeoutException {
if (channel != null && channel.isOpen()) {
channel.close(); // if (connection != null && connection.isOpen()) {
} // connection.close();
if (connection != null && connection.isOpen()) { // }
connection.close();
}
} }
public Message call(Message r) throws IOException, TimeoutException, InterruptedException { public Message call(Message r) throws IOException, TimeoutException, InterruptedException {
Channel channel = null;
Connection connection = null;
try {
String jsonInString = mapper.writeValueAsString(r); String jsonInString = mapper.writeValueAsString(r);
int timeOut = 25; int timeOut = 25;
...@@ -117,21 +107,26 @@ public class DRIPCaller implements AutoCloseable { ...@@ -117,21 +107,26 @@ public class DRIPCaller implements AutoCloseable {
if (getRequestQeueName().equals("provisioner")) { if (getRequestQeueName().equals("provisioner")) {
timeOut = 10; timeOut = 10;
} }
connection = factory.newConnection();
channel = connection.createChannel();
Map<String, Object> args = new HashMap<>();
String replyQueueName = channel.queueDeclare().getQueue();
//Build a correlation ID to distinguish responds //Build a correlation ID to distinguish responds
final String corrId = UUID.randomUUID().toString(); final String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.correlationId(corrId) .correlationId(corrId)
.expiration(String.valueOf(timeOut*60000)) .expiration(String.valueOf(timeOut * 60000))
.replyTo(getReplyQueueName()) .replyTo(replyQueueName)
.build(); .build();
Logger.getLogger(DRIPCaller.class.getName()).log(Level.INFO, "Sending: {0} to queue: {1}", new Object[]{jsonInString, getRequestQeueName()}); Logger.getLogger(DRIPCaller.class.getName()).log(Level.INFO, "Sending: {0} to queue: {1}", new Object[]{jsonInString, getRequestQeueName()});
getChannel().basicPublish("", getRequestQeueName(), props, jsonInString.getBytes("UTF-8")); channel.basicPublish("", getRequestQeueName(), props, jsonInString.getBytes("UTF-8"));
final BlockingQueue<String> response = new ArrayBlockingQueue(1); final BlockingQueue<String> response = new ArrayBlockingQueue(1);
getChannel().basicConsume(getReplyQueueName(), true, new DefaultConsumer(getChannel()) { channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override @Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) { if (properties.getCorrelationId().equals(corrId)) {
...@@ -144,11 +139,17 @@ public class DRIPCaller implements AutoCloseable { ...@@ -144,11 +139,17 @@ public class DRIPCaller implements AutoCloseable {
String resp = response.poll(timeOut, TimeUnit.MINUTES); String resp = response.poll(timeOut, TimeUnit.MINUTES);
Logger.getLogger(DRIPCaller.class.getName()).log(Level.INFO, "Got: {0}", resp); Logger.getLogger(DRIPCaller.class.getName()).log(Level.INFO, "Got: {0}", resp);
if (resp == null) { if (resp == null) {
getChannel().queueDeleteNoWait(getReplyQueueName(), false, true);
close();
throw new TimeoutException("Timeout on qeue: " + getRequestQeueName()); throw new TimeoutException("Timeout on qeue: " + getRequestQeueName());
} }
return mapper.readValue(resp, Message.class); return mapper.readValue(resp, Message.class);
} finally {
if (channel != null && channel.isOpen()) {
channel.close();
}
if (connection != null && connection.isOpen()) {
connection.close();
}
}
} }
/** /**
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
<parent> <parent>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId> <artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version> <version>2.2.2.RELEASE</version>
</parent> </parent>
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment