Commit e3bfff6f authored by Spiros Koulouzis's avatar Spiros Koulouzis

try to add streaming for logs

parent 49776fb6
......@@ -169,8 +169,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>1.8</source>
<target>1.8</target>
<compilerArguments>
<endorseddirs>${endorsed.dir}</endorseddirs>
</compilerArguments>
......
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package nl.uva.sne.drip.api.conf;
import java.util.concurrent.Executor;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler;
import org.springframework.boot.context.embedded.ServletRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
*
* @author S. Koulouzis
*/
@Configuration
@EnableScheduling
@EnableAsync
@ComponentScan("nl.uva.sne.drip")
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(5);
scheduler.initialize();
return scheduler;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler();
}
}
......@@ -28,12 +28,15 @@ public class WebAppInitializer implements WebApplicationInitializer {
ctx.register(MultipartConfig.class);
ctx.register(MongoConfig.class);
ctx.register(SecurityConfig.class);
// ctx.register(MethodSecurityConfig.class);
ctx.register(AsyncConfig.class);
// ctx.register(MethodSecurityConfig.class);
ctx.setServletContext(servletContext);
Dynamic dynamic = servletContext.addServlet("dispatcher", new DispatcherServlet(ctx));
DispatcherServlet dispatcher = new DispatcherServlet(ctx);
Dynamic dynamic = servletContext.addServlet("dispatcher", dispatcher);
dynamic.addMapping("/");
dynamic.setLoadOnStartup(1);
dynamic.setAsyncSupported(true);
}
}
......@@ -23,6 +23,8 @@ import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize;
......@@ -47,8 +49,9 @@ public class DRIPLogService {
private ObjectMapper mapper;
private ConnectionFactory factory;
public DRIPLogRecord get() throws IOException, TimeoutException {
public List<DRIPLogRecord> get() throws IOException, TimeoutException {
Channel channel = null;
if (factory == null) {
this.factory = new ConnectionFactory();
factory.setHost(messageBrokerHost);
......@@ -68,13 +71,15 @@ public class DRIPLogService {
channel.queueDeclare(qeueNameUser, true, false, false, null);
GetResponse response = channel.basicGet(qeueNameUser, true);
if (response != null) {
List<DRIPLogRecord> logs = new ArrayList<>();
while (response != null) {
String message = new String(response.getBody(), "UTF-8");
return mapper.readValue(message, DRIPLogRecord.class);
response = channel.basicGet(qeueNameUser, true);
logs.add(mapper.readValue(message, DRIPLogRecord.class));
}
return logs;
}
return null;
}
}
......@@ -54,13 +54,11 @@ import nl.uva.sne.drip.commons.utils.Converter;
import nl.uva.sne.drip.commons.utils.DRIPLogHandler;
import nl.uva.sne.drip.drip.commons.data.v1.external.ConfigurationRepresentation;
import nl.uva.sne.drip.drip.commons.data.v1.external.KeyPair;
import nl.uva.sne.drip.drip.commons.data.v1.external.KeyValueHolder;
import nl.uva.sne.drip.drip.commons.data.v1.external.PlanResponse;
import nl.uva.sne.drip.drip.commons.data.v1.external.ScaleRequest;
import nl.uva.sne.drip.drip.commons.data.v1.external.ToscaRepresentation;
import nl.uva.sne.drip.drip.commons.data.v1.external.ansible.AnsibleOutput;
import nl.uva.sne.drip.drip.commons.data.v1.external.ansible.BenchmarkResult;
import nl.uva.sne.drip.drip.commons.data.v1.external.ansible.SysbenchCPUBenchmark;
import org.json.JSONArray;
import org.json.JSONObject;
......
......@@ -79,8 +79,10 @@ public class CloudCredentialsController {
@RequestMapping(method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON)
@RolesAllowed({UserService.USER, UserService.ADMIN})
@StatusCodes({
@ResponseCode(code = 404, condition = "Key can't be empty"),
@ResponseCode(code = 404, condition = "Cloud provider's name can't be empty"),
@ResponseCode(code = 404, condition = "Key can't be empty")
,
@ResponseCode(code = 404, condition = "Cloud provider's name can't be empty")
,
@ResponseCode(code = 200, condition = "At least one key ID is posted")
})
public @ResponseBody
......@@ -107,8 +109,10 @@ public class CloudCredentialsController {
@RequestMapping(value = "/upload/{id}", method = RequestMethod.POST)
@RolesAllowed({UserService.USER, UserService.ADMIN})
@StatusCodes({
@ResponseCode(code = 404, condition = "Credential not found"),
@ResponseCode(code = 400, condition = "Did not upload file"),
@ResponseCode(code = 404, condition = "Credential not found")
,
@ResponseCode(code = 400, condition = "Did not upload file")
,
@ResponseCode(code = 200, condition = "Key added to credential")
})
public @ResponseBody
......@@ -139,6 +143,9 @@ public class CloudCredentialsController {
pair = keyService.save(pair);
// loginKeyIDs.add(pair.getId());
}
if (cloudCredentials.getCloudProviderName().toLowerCase().equals("egi")) {
cloudCredentials.setSecretKey(new String(bytes, "UTF-8"));
}
// cloudCredentials.setKeyIDs(loginKeyIDs);
cloudCredentials = cloudCredentialsService.save(cloudCredentials);
return cloudCredentials.getId();
......@@ -157,7 +164,8 @@ public class CloudCredentialsController {
@RequestMapping(value = "/{id}", method = RequestMethod.GET)
@RolesAllowed({UserService.USER, UserService.ADMIN})
@StatusCodes({
@ResponseCode(code = 404, condition = "Credential not found"),
@ResponseCode(code = 404, condition = "Credential not found")
,
@ResponseCode(code = 200, condition = "Credential exists")
})
public @ResponseBody
......
......@@ -15,9 +15,14 @@
*/
package nl.uva.sne.drip.api.v1.rest;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.webcohesion.enunciate.metadata.rs.ResponseCode;
import com.webcohesion.enunciate.metadata.rs.StatusCodes;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
......@@ -31,6 +36,9 @@ import org.springframework.web.bind.annotation.RestController;
import nl.uva.sne.drip.api.service.DRIPLogService;
import nl.uva.sne.drip.api.service.UserService;
import nl.uva.sne.drip.drip.commons.data.v1.external.DRIPLogRecord;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
/**
* This controller is responsible for storing TOSCA descriptions that can be
......@@ -52,7 +60,7 @@ public class LogController {
@RequestMapping(method = RequestMethod.GET)
@RolesAllowed({UserService.USER, UserService.ADMIN})
public @ResponseBody
DRIPLogRecord get() {
List<DRIPLogRecord> get() {
try {
return logService.get();
} catch (IOException | TimeoutException ex) {
......@@ -61,4 +69,58 @@ public class LogController {
return null;
}
// @RequestMapping(method = RequestMethod.GET, value = "/ll")
// @RolesAllowed({UserService.USER, UserService.ADMIN})
// public @ResponseBody
// ResponseBodyEmitter streamLogs() {
//
// final ResponseBodyEmitter emitter = new ResponseBodyEmitter();
// ExecutorService service = Executors.newSingleThreadExecutor();
// service.execute(new Runnable() {
// @Override
// public void run() {
// int i = 0;
// while (true) {
// try {
// i++;
// emitter.send(i + " - ", MediaType.TEXT_PLAIN);
// Thread.sleep(10);
// } catch (InterruptedException | IOException ex) {
// Logger.getLogger(LogController.class.getName()).log(Level.SEVERE, null, ex);
// emitter.completeWithError(ex);
// }
//
// }
//// emitter.complete();
// }
// });
//
// return emitter;
// }
@RequestMapping(method = RequestMethod.GET, value = "/ll")
@RolesAllowed({UserService.USER, UserService.ADMIN})
public StreamingResponseBody streamLogs() {
return new StreamingResponseBody() {
@Override
public void writeTo(OutputStream out) throws IOException {
ObjectMapper mapper = new ObjectMapper();
while (true) {
try {
List<DRIPLogRecord> logs = logService.get();
for (DRIPLogRecord log : logs) {
out.write(mapper.writeValueAsBytes(log));
out.write("\n".getBytes());
Thread.sleep(10);
}
Thread.sleep(2000);
} catch (InterruptedException | TimeoutException ex) {
Logger.getLogger(LogController.class.getName()).log(Level.SEVERE, null, ex);
} finally {
out.flush();
}
}
}
};
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment