Commit 79544fba authored by Spiros Koulouzis's avatar Spiros Koulouzis

publish log messages to queue

parent bbeff598
...@@ -13,71 +13,101 @@ ...@@ -13,71 +13,101 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package nl.uva.sne.drip.api.event; package nl.uva.sne.drip.api.service;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.logging.Handler;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.LogRecord; import java.util.logging.LogRecord;
import java.util.logging.Logger; import java.util.logging.Logger;
import java.util.logging.StreamHandler; import java.util.logging.StreamHandler;
import nl.uva.sne.drip.api.dao.CloudCredentialsDao;
import nl.uva.sne.drip.commons.utils.DRIPLogRecordFactory;
import nl.uva.sne.drip.drip.commons.data.v1.external.DRIPLogRecord;
import nl.uva.sne.drip.drip.commons.data.v1.external.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
/** /**
* *
* @author S. Koulouzis * @author S. Koulouzis
*/ */
public class DRIPLogger extends StreamHandler implements AutoCloseable { public class DRIPLogHandler extends StreamHandler implements AutoCloseable {
private final Connection connection; private final Connection connection;
private final Channel channel; private final Channel channel;
private static final String EXCHANGE_NAME = "direct_logs";
public DRIPLogger(String messageBrokerHost) throws IOException, TimeoutException { private final String qeueName;
super(); private final ObjectMapper mapper;
public DRIPLogHandler(String messageBrokerHost) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); ConnectionFactory factory = new ConnectionFactory();
factory.setHost(messageBrokerHost); factory.setHost(messageBrokerHost);
factory.setPort(AMQP.PROTOCOL.PORT); factory.setPort(AMQP.PROTOCOL.PORT);
//factory.setUsername("guest");
//factory.setPassword("pass");
connection = factory.newConnection(); connection = factory.newConnection();
channel = connection.createChannel(); channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
this.qeueName = "log_qeue";
this.mapper = new ObjectMapper();
mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
} }
@Override @Override
public void publish(LogRecord record) { public void publish(LogRecord record) {
try { try {
String level = record.getLevel().getName(); User user = (User) SecurityContextHolder.getContext().getAuthentication().getPrincipal();
String message = record.getMessage(); String owner = user.getUsername();
channel.basicPublish(EXCHANGE_NAME, level, null, message.getBytes("UTF-8"));
super.publish(record); DRIPLogRecord dripLog = DRIPLogRecordFactory.create(record);
dripLog.setOwner(owner);
String jsonInString = mapper.writeValueAsString(dripLog);
// channel.basicPublish(qeueName, owner, null, jsonInString.getBytes());
// channel.basicPublish(qeueName, owner, MessageProperties.PERSISTENT_TEXT_PLAIN, jsonInString.getBytes("UTF-8"));
String qeueNameUser = qeueName + "_" + owner;
channel.queueDeclare(qeueNameUser, true, false, false, null);
channel.basicPublish("", qeueNameUser, MessageProperties.PERSISTENT_TEXT_PLAIN, jsonInString.getBytes("UTF-8"));
close();
} catch (JsonProcessingException ex) {
Logger.getLogger(DRIPLogHandler.class.getName()).log(Level.SEVERE, null, ex);
} catch (IOException ex) { } catch (IOException ex) {
Logger.getLogger(DRIPLogger.class.getName()).log(Level.SEVERE, null, ex); Logger.getLogger(DRIPLogHandler.class.getName()).log(Level.SEVERE, null, ex);
} }
} }
@Override @Override
public void close() { public void close() {
super.close();
if (channel != null && channel.isOpen()) { if (channel != null && channel.isOpen()) {
try { try {
channel.close(); channel.close();
if (connection != null && connection.isOpen()) {
connection.close();
}
} catch (IOException | TimeoutException ex) { } catch (IOException | TimeoutException ex) {
Logger.getLogger(DRIPLogger.class.getName()).log(Level.SEVERE, null, ex); Logger.getLogger(DRIPLogHandler.class.getName()).log(Level.SEVERE, null, ex);
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException ex) {
Logger.getLogger(DRIPLogger.class.getName()).log(Level.SEVERE, null, ex);
} }
} }
} }
} }
...@@ -25,10 +25,8 @@ import java.util.List; ...@@ -25,10 +25,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.logging.LogManager;
import java.util.logging.Logger; import java.util.logging.Logger;
import nl.uva.sne.drip.api.dao.PlanDao; import nl.uva.sne.drip.api.dao.PlanDao;
import nl.uva.sne.drip.api.event.DRIPLogger;
import nl.uva.sne.drip.api.exception.BadRequestException; import nl.uva.sne.drip.api.exception.BadRequestException;
import nl.uva.sne.drip.api.exception.NotFoundException; import nl.uva.sne.drip.api.exception.NotFoundException;
import nl.uva.sne.drip.api.rpc.PlannerCaller; import nl.uva.sne.drip.api.rpc.PlannerCaller;
...@@ -69,18 +67,18 @@ public class PlannerService { ...@@ -69,18 +67,18 @@ public class PlannerService {
@Value("${message.broker.host}") @Value("${message.broker.host}")
private String messageBrokerHost; private String messageBrokerHost;
private final Logger rootLogger; private final Logger logger;
@Autowired @Autowired
public PlannerService() throws IOException, TimeoutException { public PlannerService() throws IOException, TimeoutException {
rootLogger = LogManager.getLogManager().getLogger(""); logger = Logger.getLogger(PlannerService.class.getName());
rootLogger.addHandler(new DRIPLogger(messageBrokerHost)); logger.addHandler(new DRIPLogHandler(messageBrokerHost));
} }
public PlanResponse getPlan(String toscaId) throws JSONException, UnsupportedEncodingException, IOException, TimeoutException, InterruptedException { public PlanResponse getPlan(String toscaId) throws JSONException, UnsupportedEncodingException, IOException, TimeoutException, InterruptedException {
try (PlannerCaller planner = new PlannerCaller(messageBrokerHost)) { try (PlannerCaller planner = new PlannerCaller(messageBrokerHost)) {
Message plannerInvokationMessage = buildPlannerMessage(toscaId); Message plannerInvokationMessage = buildPlannerMessage(toscaId);
rootLogger.info("some message"); logger.info("some message");
Message plannerReturnedMessage = (planner.call(plannerInvokationMessage)); Message plannerReturnedMessage = (planner.call(plannerInvokationMessage));
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
......
/*
* Copyright 2017 S. Koulouzis, Wang Junchao, Huan Zhou, Yang Hu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nl.uva.sne.drip.commons.utils;
import java.util.logging.LogRecord;
import nl.uva.sne.drip.drip.commons.data.v1.external.DRIPLogRecord;
/**
*
* @author S. Koulouzis
*/
public class DRIPLogRecordFactory {
public static DRIPLogRecord create(LogRecord rec) {
DRIPLogRecord dRec = new DRIPLogRecord();
dRec.setLevel(rec.getLevel().getName());
dRec.setLoggerName(rec.getLoggerName());
dRec.setMessage(rec.getMessage());
dRec.setMillis(dRec.getTimestamp());
dRec.setSequenceNumber(rec.getSequenceNumber());
dRec.setSourceClassName(rec.getSourceClassName());
dRec.setSourceMethodName(rec.getSourceMethodName());
return dRec;
}
}
/*
* Copyright 2017 S. Koulouzis, Wang Junchao, Huan Zhou, Yang Hu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nl.uva.sne.drip.drip.commons.data.v1.external;
import com.fasterxml.jackson.annotation.JsonInclude;
import org.springframework.data.mongodb.core.mapping.Document;
/**
* @author S. Koulouzis
*/
@Document
@JsonInclude(JsonInclude.Include.NON_NULL)
public class DRIPLogRecord extends OwnedObject {
private String level;
private String loggerName;
private String message;
private long millis;
private long sequenceNumber;
private String sourceClassName;
private String sourceMethodName;
/**
* @return the level
*/
public String getLevel() {
return level;
}
/**
* @param level the level to set
*/
public void setLevel(String level) {
this.level = level;
}
/**
* @return the loggerName
*/
public String getLoggerName() {
return loggerName;
}
/**
* @param loggerName the loggerName to set
*/
public void setLoggerName(String loggerName) {
this.loggerName = loggerName;
}
/**
* @return the message
*/
public String getMessage() {
return message;
}
/**
* @param message the message to set
*/
public void setMessage(String message) {
this.message = message;
}
/**
* @return the millis
*/
public long getMillis() {
return millis;
}
/**
* @param millis the millis to set
*/
public void setMillis(long millis) {
this.millis = millis;
}
/**
* @return the sequenceNumber
*/
public long getSequenceNumber() {
return sequenceNumber;
}
/**
* @param sequenceNumber the sequenceNumber to set
*/
public void setSequenceNumber(long sequenceNumber) {
this.sequenceNumber = sequenceNumber;
}
/**
* @return the sourceClassName
*/
public String getSourceClassName() {
return sourceClassName;
}
/**
* @param sourceClassName the sourceClassName to set
*/
public void setSourceClassName(String sourceClassName) {
this.sourceClassName = sourceClassName;
}
/**
* @return the sourceMethodName
*/
public String getSourceMethodName() {
return sourceMethodName;
}
/**
* @param sourceMethodName the sourceMethodName to set
*/
public void setSourceMethodName(String sourceMethodName) {
this.sourceMethodName = sourceMethodName;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment