Commit 7f272ffb authored by Spiros Koulouzis's avatar Spiros Koulouzis

convert tosca to kubenetes

parent 8095bdf7
...@@ -23,7 +23,7 @@ topology_template: ...@@ -23,7 +23,7 @@ topology_template:
#type: tosca.relationships.ConnectsTo #type: tosca.relationships.ConnectsTo
type: tosca.relationships.DependsOn type: tosca.relationships.DependsOn
artifacts: artifacts:
my_image: image:
file: wordpress:latest file: wordpress:latest
type: tosca.artifacts.Deployment.Image.Container.Docker type: tosca.artifacts.Deployment.Image.Container.Docker
repository: docker_hub repository: docker_hub
......
...@@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature; ...@@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -173,20 +174,20 @@ public class DeployService { ...@@ -173,20 +174,20 @@ public class DeployService {
public Map<String, Object> getSwarmInfo(DeployResponse deployResp) throws JSONException, IOException, TimeoutException, InterruptedException { public Map<String, Object> getSwarmInfo(DeployResponse deployResp) throws JSONException, IOException, TimeoutException, InterruptedException {
// deployResp.setManagerType("swarm_info"); // deployResp.setManagerType("swarm_info");
Message deployerInvokationMessage = buildDeployerMessages( // Message deployerInvokationMessage = buildDeployerMessages(
deployResp, // deployResp,
null, // null,
null).get(0); // null).get(0);
Map<String, Object> info; // Map<String, Object> info;
deployerInvokationMessage.setOwner(((User) SecurityContextHolder.getContext().getAuthentication().getPrincipal()).getUsername()); // deployerInvokationMessage.setOwner(((User) SecurityContextHolder.getContext().getAuthentication().getPrincipal()).getUsername());
try (DRIPCaller deployer = new DeployerCaller(messageBrokerHost);) { // try (DRIPCaller deployer = new DeployerCaller(messageBrokerHost);) {
logger.info("Calling deployer"); // logger.info("Calling deployer");
Message response = (deployer.call(deployerInvokationMessage)); // Message response = (deployer.call(deployerInvokationMessage));
logger.info("Got response from deployer"); // logger.info("Got response from deployer");
List<MessageParameter> params = response.getParameters(); // List<MessageParameter> params = response.getParameters();
info = buildSwarmInfo(params); // info = buildSwarmInfo(params);
} // }
return info; return null;
} }
private List<Message> buildDeployerMessages( private List<Message> buildDeployerMessages(
...@@ -194,13 +195,11 @@ public class DeployService { ...@@ -194,13 +195,11 @@ public class DeployService {
String serviceName, String serviceName,
Integer numOfContainers) throws JSONException { Integer numOfContainers) throws JSONException {
String provisionID = deployInfo.getProvisionID(); String provisionID = deployInfo.getProvisionID();
ProvisionResponse pro = provisionService.findOne(provisionID); ProvisionResponse pro = provisionService.findOne(provisionID);
if (pro == null) { if (pro == null) {
throw new NotFoundException(); throw new NotFoundException();
} }
// List<String> loginKeysIDs = pro.getDeployerKeyPairIDs();
List<Message> messages = new ArrayList<>(); List<Message> messages = new ArrayList<>();
List<MessageParameter> parameters = new ArrayList<>(); List<MessageParameter> parameters = new ArrayList<>();
...@@ -215,14 +214,11 @@ public class DeployService { ...@@ -215,14 +214,11 @@ public class DeployService {
MessageParameter managerTypeParameter = createManagerTypeParameter("kubernetes"); MessageParameter managerTypeParameter = createManagerTypeParameter("kubernetes");
parameters.add(managerTypeParameter); parameters.add(managerTypeParameter);
// if (action.toLowerCase().equals("scale")) { List<Map<String, Object>> deployments = TOSCAUtils.tosca2KubernetesDeployment(toscaProvisonMap);
// MessageParameter scaleParameter = createScaleParameter(null, serviceName, numOfContainers); // String deploymentEncoded = new String(Base64.getDecoder().decode(Converter.map2YmlString(deployments)));
// parameters.add(scaleParameter); // MessageParameter confParam = createConfigurationParameter(deploymentEncoded, "kubernetes");
// } // parameters.add(confParam);
// if (action.toLowerCase().equals("swarm_info") ) {
// MessageParameter swarmInfo = createSwarmInforparameter(null, serviceName);
// parameters.add(swarmInfo);
// }
Message deployInvokationMessage = new Message(); Message deployInvokationMessage = new Message();
deployInvokationMessage.setParameters(parameters); deployInvokationMessage.setParameters(parameters);
deployInvokationMessage.setCreationDate(System.currentTimeMillis()); deployInvokationMessage.setCreationDate(System.currentTimeMillis());
...@@ -279,6 +275,8 @@ public class DeployService { ...@@ -279,6 +275,8 @@ public class DeployService {
MessageParameter configurationParameter = new MessageParameter(); MessageParameter configurationParameter = new MessageParameter();
if (confType.equals("ansible")) { if (confType.equals("ansible")) {
configurationParameter.setName("playbook"); configurationParameter.setName("playbook");
} else if (confType.equals("kubernetes")) {
configurationParameter.setName("deployment");
} else { } else {
configurationParameter.setName(confType); configurationParameter.setName(confType);
} }
...@@ -344,7 +342,8 @@ public class DeployService { ...@@ -344,7 +342,8 @@ public class DeployService {
private DeployResponse handleResponse(List<MessageParameter> params, DeployRequest deployInfo) throws KeyException, IOException, Exception { private DeployResponse handleResponse(List<MessageParameter> params, DeployRequest deployInfo) throws KeyException, IOException, Exception {
DeployResponse deployResponse = new DeployResponse(); DeployResponse deployResponse = new DeployResponse();
deployResponse.setProvisionID(deployInfo.getProvisionID());
deployResponse.setKvMap(provisionService.findOne(deployInfo.getProvisionID()).getKeyValue());
for (MessageParameter p : params) { for (MessageParameter p : params) {
String name = p.getName(); String name = p.getName();
...@@ -355,7 +354,7 @@ public class DeployService { ...@@ -355,7 +354,7 @@ public class DeployService {
k.setType(Key.KeyType.PRIVATE); k.setType(Key.KeyType.PRIVATE);
KeyPair pair = new KeyPair(); KeyPair pair = new KeyPair();
pair.setPrivateKey(k); pair.setPrivateKey(k);
deployResponse.setKey(pair); // deployResponse.setKey(pair);
save(deployResponse); save(deployResponse);
return deployResponse; return deployResponse;
} }
...@@ -365,7 +364,7 @@ public class DeployService { ...@@ -365,7 +364,7 @@ public class DeployService {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
value = parseValue(value); value = parseValue(value);
List<AnsibleOutput> outputList = mapper.readValue(value, new TypeReference<List<AnsibleOutput>>() { List<AnsibleOutput> outputList = mapper.readValue(value, new TypeReference<List<AnsibleOutput>>() {
}); });
...@@ -436,7 +435,7 @@ public class DeployService { ...@@ -436,7 +435,7 @@ public class DeployService {
outputListIds.add(ansOut.getId()); outputListIds.add(ansOut.getId());
} }
deployResponse.setAnsibleOutputList(outputListIds); // deployResponse.setAnsibleOutputList(outputListIds);
} }
} }
return deployResponse; return deployResponse;
...@@ -570,4 +569,23 @@ public class DeployService { ...@@ -570,4 +569,23 @@ public class DeployService {
return null; return null;
} }
public String get(String id, String format) throws JSONException, IOException, TimeoutException, InterruptedException {
DeployResponse deploy = findOne(id);
Map<String, Object> map = deploy.getKeyValue();
if (format != null && format.equals("yml")) {
String ymlStr = Converter.map2YmlString(map);
ymlStr = ymlStr.replaceAll("\\uff0E", ".");
return ymlStr;
}
if (format != null && format.equals("json")) {
String jsonStr = Converter.map2JsonString(map);
jsonStr = jsonStr.replaceAll("\\uff0E", ".");
return jsonStr;
}
String ymlStr = Converter.map2YmlString(map);
ymlStr = ymlStr.replaceAll("\\uff0E", ".");
return ymlStr;
}
} }
...@@ -45,7 +45,7 @@ import org.springframework.web.bind.annotation.RequestBody; ...@@ -45,7 +45,7 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RequestParam;
/** /**
* This controller is responsible for deploying a cluster on provisioned * This controller is responsible for deploying a cluster on provisioned
* resources. * resources.
* *
* @author S. Koulouzis * @author S. Koulouzis
...@@ -70,8 +70,10 @@ public class DeployController { ...@@ -70,8 +70,10 @@ public class DeployController {
@RequestMapping(value = "/deploy", method = RequestMethod.POST) @RequestMapping(value = "/deploy", method = RequestMethod.POST)
@RolesAllowed({UserService.USER, UserService.ADMIN}) @RolesAllowed({UserService.USER, UserService.ADMIN})
@StatusCodes({ @StatusCodes({
@ResponseCode(code = 400, condition = "Empty provision ID"), @ResponseCode(code = 400, condition = "Empty provision ID")
@ResponseCode(code = 500, condition = "Deploymet failed"), ,
@ResponseCode(code = 500, condition = "Deploymet failed")
,
@ResponseCode(code = 200, condition = "Successful deploymet") @ResponseCode(code = 200, condition = "Successful deploymet")
}) })
public @ResponseBody public @ResponseBody
...@@ -88,9 +90,10 @@ public class DeployController { ...@@ -88,9 +90,10 @@ public class DeployController {
} }
/** /**
* Scales deployment * Scales deployment
*
* @param scaleRequest * @param scaleRequest
* @return * @return
*/ */
@RequestMapping(value = "/scale", method = RequestMethod.POST) @RequestMapping(value = "/scale", method = RequestMethod.POST)
@RolesAllowed({UserService.USER, UserService.ADMIN}) @RolesAllowed({UserService.USER, UserService.ADMIN})
...@@ -111,37 +114,29 @@ public class DeployController { ...@@ -111,37 +114,29 @@ public class DeployController {
return null; return null;
} }
/** /**
* Returns a deployment description * Returns a deployment description
* *
* @param id * @param id
* @param format
* @return * @return
*/ */
@RequestMapping(value = "/{id}", method = RequestMethod.GET) @RequestMapping(value = "/{id}", method = RequestMethod.GET, params = {"format"})
@RolesAllowed({UserService.USER, UserService.ADMIN}) @RolesAllowed({UserService.USER, UserService.ADMIN})
@StatusCodes({ @StatusCodes({
@ResponseCode(code = 404, condition = "Object not found"), @ResponseCode(code = 404, condition = "Object not found")
,
@ResponseCode(code = 200, condition = "Object found") @ResponseCode(code = 200, condition = "Object found")
}) })
public @ResponseBody public @ResponseBody
DeployResponse get(@PathVariable("id") String id) { String get(@PathVariable("id") String id, @RequestParam(value = "format") String format) {
DeployResponse resp = null;
try { try {
resp = deployService.findOne(id); DeployResponse resp = null;
return deployService.get(id, format);
// if (resp.getManagerType().equals("swarm")) {
// Map<String, Object> swarmInfo = deployService.getSwarmInfo(resp);
// resp.setManagerInfo(swarmInfo);
// }
} catch (JSONException | IOException | TimeoutException | InterruptedException ex) { } catch (JSONException | IOException | TimeoutException | InterruptedException ex) {
Logger.getLogger(DeployController.class.getName()).log(Level.SEVERE, null, ex); Logger.getLogger(DeployController.class.getName()).log(Level.SEVERE, null, ex);
} }
if (resp == null) { return null;
throw new NotFoundException();
}
return resp;
} }
/** /**
...@@ -154,7 +149,8 @@ public class DeployController { ...@@ -154,7 +149,8 @@ public class DeployController {
@RequestMapping(value = "/{id}/container_status", method = RequestMethod.GET, params = {"service_name"}) @RequestMapping(value = "/{id}/container_status", method = RequestMethod.GET, params = {"service_name"})
@RolesAllowed({UserService.USER, UserService.ADMIN}) @RolesAllowed({UserService.USER, UserService.ADMIN})
@StatusCodes({ @StatusCodes({
@ResponseCode(code = 404, condition = "Object not found"), @ResponseCode(code = 404, condition = "Object not found")
,
@ResponseCode(code = 200, condition = "Object found") @ResponseCode(code = 200, condition = "Object found")
}) })
public @ResponseBody public @ResponseBody
...@@ -174,16 +170,17 @@ public class DeployController { ...@@ -174,16 +170,17 @@ public class DeployController {
return resp; return resp;
} }
/** /**
* Returns the service names running on the cluster * Returns the service names running on the cluster
*
* @param id * @param id
* @return * @return
*/ */
@RequestMapping(value = "/{id}/service_names", method = RequestMethod.GET) @RequestMapping(value = "/{id}/service_names", method = RequestMethod.GET)
@RolesAllowed({UserService.USER, UserService.ADMIN}) @RolesAllowed({UserService.USER, UserService.ADMIN})
@StatusCodes({ @StatusCodes({
@ResponseCode(code = 404, condition = "Object not found"), @ResponseCode(code = 404, condition = "Object not found")
,
@ResponseCode(code = 200, condition = "Object found") @ResponseCode(code = 200, condition = "Object found")
}) })
public @ResponseBody public @ResponseBody
...@@ -229,7 +226,8 @@ public class DeployController { ...@@ -229,7 +226,8 @@ public class DeployController {
@RequestMapping(value = "/{id}", method = RequestMethod.DELETE) @RequestMapping(value = "/{id}", method = RequestMethod.DELETE)
@RolesAllowed({UserService.USER, UserService.ADMIN}) @RolesAllowed({UserService.USER, UserService.ADMIN})
@StatusCodes({ @StatusCodes({
@ResponseCode(code = 200, condition = "Successful delete"), @ResponseCode(code = 200, condition = "Successful delete")
,
@ResponseCode(code = 404, condition = "Object not found") @ResponseCode(code = 404, condition = "Object not found")
}) })
public @ResponseBody public @ResponseBody
......
...@@ -21,6 +21,9 @@ import java.util.HashMap; ...@@ -21,6 +21,9 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.json.JSONException;
/** /**
* *
...@@ -99,4 +102,107 @@ public class TOSCAUtils { ...@@ -99,4 +102,107 @@ public class TOSCAUtils {
List<String> outputPair = (List<String>) ((Map<String, Object>) ((Map<String, Object>) outputs.get(key)).get("value")).get("get_attribute"); List<String> outputPair = (List<String>) ((Map<String, Object>) ((Map<String, Object>) outputs.get(key)).get("value")).get("get_attribute");
return outputPair; return outputPair;
} }
public static List<Map.Entry> getDockerContainers(Map<String, Object> toscaPlan) {
Map<String, Object> topologyTemplate = (Map<String, Object>) toscaPlan.get("topology_template");
Map<String, Object> nodeTemplates = (Map<String, Object>) (topologyTemplate).get("node_templates");
Iterator it = nodeTemplates.entrySet().iterator();
List<Map.Entry> dockerContainers = new ArrayList<>();
while (it.hasNext()) {
Map.Entry node = (Map.Entry) it.next();
Map<String, Object> nodeValue = (Map<String, Object>) node.getValue();
String type = (String) nodeValue.get("type");
if (type.equals("tosca.nodes.ARTICONF.Container.Application.Docker")) {
dockerContainers.add(node);
}
}
return dockerContainers;
}
public static List<Map<String, Object>> tosca2KubernetesDeployment(Map<String, Object> toscaPlan) {
List<Map.Entry> dockerContainers = getDockerContainers(toscaPlan);
// String containerName = "mysql";
// String appName = containerName;
// String image1 = "mysql:5.7";
// String imageName1 = "mysql";
// Map<String, Object> envMap1 = new HashMap();
// envMap1.put("MYSQL_DATABASE", "wordpress");
// envMap1.put("MYSQL_PASSWORD", "wordpress");
// envMap1.put("MYSQL_ROOT_PASSWORD", "somewordpress");
// envMap1.put("MYSQL_USER", "wordpress");
List<Map<String, Object>> deployments = new ArrayList<>();
Iterator<Map.Entry> dicIt = dockerContainers.iterator();
while (dicIt.hasNext()) {
Map.Entry docker = dicIt.next();
String name = (String) docker.getKey();
Map<String, Object> labels = new HashMap();
labels.put("app", name);
Map<String, Object> metadata = new HashMap();
metadata.put("labels", labels);
metadata.put("name", name);
Map<String, Object> template = new HashMap();
template.put("metadata", metadata);
Map<String, Object> topSpec = new HashMap();
topSpec.put("replicas", 1);
topSpec.put("template", template);
Map<String, Object> dockerValues = (Map<String, Object>) docker.getValue();
Map<String, Object> properties = (Map<String, Object>) dockerValues.get("properties");
Map<String, Object> envMap = (Map<String, Object>) properties.get("environment");
Map<String, Object> imageEnv = new HashMap();
imageEnv.put("env", envMap);
Map<String, Object> image = (Map<String, Object>) ((Map<String, Object>) dockerValues.get("artifacts")).get("image");
String imageFile = (String) image.get("file");
Map<String, Object> container = new HashMap();
container.put("image", imageFile);
container.put("name", name);
container.put("env", imageEnv);
List<String> toscaPortsList = (List<String>) properties.get("ports");
if (toscaPortsList != null) {
Map<String, Object> ports = new HashMap();
System.err.println(toscaPortsList);
for (String portEntry : toscaPortsList) {
String[] portsArray = portEntry.split(":");
Map<String, Object> portMap = new HashMap();
portMap.put("containerPort", portsArray[0]);
List<Map<String, Object>> kubernetesPortsList = new ArrayList<>();
kubernetesPortsList.add(portMap);
}
}
// ports.put("ports", portsList);
List<Map<String, Object>> containersList = new ArrayList<>();
containersList.add(container);
Map<String, Object> containers = new HashMap();
containers.put("containers", containersList);
Map<String, Object> spec1 = new HashMap();
spec1.put("containers", containers);
// spec1.put("ports", ports);
topSpec.put("spec", spec1);
Map<String, Object> deployment = new HashMap();
deployment.put("spec", topSpec);
deployment.put("metadata", metadata);
deployment.put("kind", "Deployment");
deployment.put("apiVersion", "extensions/v1beta1");
try {
System.err.println(Converter.map2YmlString(deployment));
} catch (JSONException ex) {
Logger.getLogger(TOSCAUtils.class.getName()).log(Level.SEVERE, null, ex);
}
deployments.add(deployment);
}
return deployments;
}
} }
...@@ -30,69 +30,23 @@ import org.springframework.data.mongodb.core.mapping.Document; ...@@ -30,69 +30,23 @@ import org.springframework.data.mongodb.core.mapping.Document;
*/ */
@Document @Document
@JsonInclude(JsonInclude.Include.NON_NULL) @JsonInclude(JsonInclude.Include.NON_NULL)
public class DeployResponse extends DeployRequest { public class DeployResponse extends KeyValueHolder {
private KeyPair key;
private List<String> ansibleOutputListIDs;
private ScaleRequest scale;
private Map<String, Object> managerInfo;
public void setAnsibleOutputList(List<String> outputListIDs) {
this.ansibleOutputListIDs = outputListIDs;
}
/**
* @return the ansibleOutputList
*/
public List<String> getAnsibleOutputList() {
return ansibleOutputListIDs;
}
public void setKey(KeyPair key) {
this.key = key;
}
/**
* The key pair to log in and manage a docker cluster
*
* @return
*/
public KeyPair getKeyPair() {
return key;
}
/** /**
* The scale information if any for this deployment * @return the provisionID
*
* @return the scale
*/ */
public ScaleRequest getScale() { public String getProvisionID() {
return scale; return provisionID;
}
/**
* @param scale the scale to set
*/
public void setScale(ScaleRequest scale) {
this.scale = scale;
}
public void setManagerInfo(Map<String, Object> managerInfo) {
this.managerInfo = managerInfo;
} }
/** /**
* Returns manager info e.g. service status etc. * @param provisionID the provisionID to set
*
* @return
*/ */
@DocumentationExample("{\"services_info\": {\"status\": \"Ready\", \"hostname\": " public void setProvisionID(String provisionID) {
+ "\"stoor74\", \"ID\": \"v5y8cs7zd5atej53buq86g8j7\", \"availability\": \"Active\"}, ." this.provisionID = provisionID;
+ "\"cluster_node_info\": {\"status\": \"Ready\", \"hostname\": \"stoor74\", \"ID\": \"v5y8cs7zd5atej53buq86g8j7\", \"availability\": \"Active\"}}")
public Map<String, Object> getManagerInfo() {
return this.managerInfo;
} }
private String provisionID;
} }
apiVersion: v1
clusters:
- cluster:
certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUN5RENDQWJDZ0F3SUJBZ0lCQURBTkJna3Foa2lHOXcwQkFRc0ZBREFWTVJNd0VRWURWUVFERXdwcmRXSmwKY201bGRHVnpNQjRYRFRFNU1Ea3hPVEUxTXpVME9Gb1hEVEk1TURreE5qRTFNelUwT0Zvd0ZURVRNQkVHQTFVRQpBeE1LYTNWaVpYSnVaWFJsY3pDQ0FTSXdEUVlKS29aSWh2Y05BUUVCQlFBRGdnRVBBRENDQVFvQ2dnRUJBS1ZiClNqTE1WYXpOVTQ4QnFxOHk0WkV6WlBramJmU213NU9EZmNSaUhWQ0FCY2o4QUlrbzBDYkNxd2JiWkdMeXBpYTcKYXRWL2ZITFQzalRBNEkzSW11bWFCekE4VnR4Q1c0dHprMFl1akdHeW5LOFlJb1hlc0JJbldSc1h1Yi9sUlMrRwo1R25rZC94QU12TE10S1VycFppQlYvb1NJRzBNSVkrUW1qSE5GdmxmTjYwK3lMeSt2enUwcjNqMzc4WTFNY0RJCkRveHBxYzlBYUdPSkQrajU2VGVXSTh5ckdZRzVDNjk5NXljZDFJQ1ZiTFBaaDl5ZFBiVnUvNXBtY2JaYWRsazcKcC9JRHZUN0F0YXFPQ3h1N1VFeW5XRVVaUUtSc1pqRzY4MWo0bVlXMmxmZUhhNS9FQ01DYVZPQlNXTER4WXBQRwpOUjB4SWFtbHRDQ0tNT2E1eURjQ0F3RUFBYU1qTUNFd0RnWURWUjBQQVFIL0JBUURBZ0trTUE4R0ExVWRFd0VCCi93UUZNQU1CQWY4d0RRWUpLb1pJaHZjTkFRRUxCUUFEZ2dFQkFIWWtpNEFuVDM1d2RCUjNtbTlaLzRONFJKVDQKVlNXdU56dmt0ajdEK01MMjM5QUhHNGVhOXIwSzcyZWU4U3V4MFZ0ZUxkODhBdExJdmROa3FxYW84WFVDZmpzeQplSVNDUXZvaUxXdUhxSzVuck5PNlFFQXhkZGVxNlhGLzZUcElFL2Q0eDE5Zi8zUUwveDRpV0pONCt5OWZnUVlpClR2WXRnVGs3QlhwbEVBcm01Z0FwM25ST1RsWXBuQnk5YVI1Zit0dCtYdStzRjUwMFpnanU5YzB2MHk0aFU3aVoKb1BEVHM5ZXZPQlorcEQ1RUh0MU1ja1gyRTJybUNYN1ZKczNMdXBEY0hhckhhVWFiMjBvVndrdmdCNTBUd29abQpSYldJRXY1SHNjbloxMmRSVUR3dGZYMGpIWmdhc0ZVMFExbmc4SDhKYVZxU3YwNENzaWwva0hSOXlBWT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=
server: https://145.100.133.155:6443
name: kubernetes
contexts:
- context:
cluster: kubernetes
user: kubernetes-admin
name: kubernetes-admin@kubernetes
current-context: kubernetes-admin@kubernetes
kind: Config
preferences: {}
users:
- name: kubernetes-admin
user:
client-certificate-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUM4akNDQWRxZ0F3SUJBZ0lJYnFTZk4vRHNFYjh3RFFZSktvWklodmNOQVFFTEJRQXdGVEVUTUJFR0ExVUUKQXhNS2EzVmlaWEp1WlhSbGN6QWVGdzB4T1RBNU1Ua3hOVE0xTkRoYUZ3MHlNREE1TVRneE5UTTFOVEphTURReApGekFWQmdOVkJBb1REbk41YzNSbGJUcHRZWE4wWlhKek1Sa3dGd1lEVlFRREV4QnJkV0psY201bGRHVnpMV0ZrCmJXbHVNSUlCSWpBTkJna3Foa2lHOXcwQkFRRUZBQU9DQVE4QU1JSUJDZ0tDQVFFQXdxQWlhVGZlOWk2RHRNUzkKN0JqVCszeUR5UDhBdmVJNXdFaWQvVU9paUJDSHlEVndZTmJrSWo0a0NBRGZUekJ3cFZjUG5ZendKUGg1SjQxNgowM3djakdGSkR1RHg0U0IvbzNwMDlDZmNVcmxobUx0SjFPMFFrTWJKOHFOTUlDZFptdlVwNHIzSXVqbGNxcWtiCnRUMVRSLys4akZ6UE50QVlhaVBVbDY0L2lSUGY3V01GU0xBeGhHNFJKeHllZHhKVjhHR0EyVm4vZWN0LysrL3EKWklFbFFDTVg5bVRaMXlzVy90YXJYTkRCelMrWkpvdW1mQXhGdVprZzhJSXZ0UnBvZ2l5a0RBSWovck1SazJZTwpEN1FySURjcTNqOTRPRkZvOVgzZTBsNVhrNUcvTDVxb2ZwQTllV0gxamhDT2ROd3BYOHkyNE4ybksreHJTVTROCndmandBUUlEQVFBQm95Y3dKVEFPQmdOVkhROEJBZjhFQkFNQ0JhQXdFd1lEVlIwbEJBd3dDZ1lJS3dZQkJRVUgKQXdJd0RRWUpLb1pJaHZjTkFRRUxCUUFEZ2dFQkFIQ2FaNjFMQzJVdUhWUXRoditONWdJZ2pnQUN2dlVPWGcxNgoxbFArMXBkckxLZmpSVENHYTQwNFg5UHZBeEFwcEltMzZQSGpxeUVUOUJjYmpsOGRMK2FwTTArVjFjdlg0VkYrCnNtbGFGT3NGRGMwY0IxRjd5OHBnN0pVRW5YZTV4OS9saXd0SG90UkJiWlhqeXVuK1pBcEMzQ2JtTEx2dDZ6UW8KNkk2ejdCbHphN2xvWGJHbjJoeHRMK2gwZTVwNjJHdi84UXg4VFQxNWRFVXcrbmVBWEtBbm5YdHM5UVlpUmQ4TAphUkZyRFFueHl0S1pseHlrVzNrM2VJdHIzUk9QcVpOajN5eHpDUTdYV2NlMlRvaUdUYjNsMTZBWmV1b3I3TXZyCktwSDdndDNQWkNzUFROSTF3KzVrVFh3djREaTUzSkZPOFVJcDhZVC95SDJ3Ui9ObWlvQT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=
client-key-data: LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBd3FBaWFUZmU5aTZEdE1TOTdCalQrM3lEeVA4QXZlSTV3RWlkL1VPaWlCQ0h5RFZ3CllOYmtJajRrQ0FEZlR6QndwVmNQbll6d0pQaDVKNDE2MDN3Y2pHRkpEdUR4NFNCL28zcDA5Q2ZjVXJsaG1MdEoKMU8wUWtNYko4cU5NSUNkWm12VXA0cjNJdWpsY3Fxa2J0VDFUUi8rOGpGelBOdEFZYWlQVWw2NC9pUlBmN1dNRgpTTEF4aEc0Ukp4eWVkeEpWOEdHQTJWbi9lY3QvKysvcVpJRWxRQ01YOW1UWjF5c1cvdGFyWE5EQnpTK1pKb3VtCmZBeEZ1WmtnOElJdnRScG9naXlrREFJai9yTVJrMllPRDdRcklEY3Ezajk0T0ZGbzlYM2UwbDVYazVHL0w1cW8KZnBBOWVXSDFqaENPZE53cFg4eTI0TjJuSyt4clNVNE53Zmp3QVFJREFRQUJBb0lCQUhhREVsRWx6MlB6RVFvcgpYMGI4RzJFaEFoS0xqUVVUVTMxTityNTB4K3k0dkNYaXZJcUxjQ0dWMjFUd1N3cXFnWDE4MVNwbjN2ZDBRam9lCmdiTnorT0pXZnlsTE9wNk9yb3A2aityNHRzVzgwcnd4RjVWTHdmR1d5dlF3RWJQbW9qNmttUnZUMnYzaTNoV2gKb2hpcnZpR3lqVHFmYlNLQWRzMXpBZ3BXOWFOZTRWZTYrTTJkeXhMWWUvTkFvU0VjV0pEN1NuTE1VbGtML1h5NQpPTlVyTDVmMUlvTWJDSXFJbWNwNlZPb2ljUVptMW1jbElDL3BvTVZzVE4yNDFXVEp1YWNJV3RNZ3hhYU1hb0VpClhPZ3cvK0luMmc3NmV1QjJ5dllhTVZ6c2FtWHViUzREMEViY3BlWi93aVJ4OWhhODdSSmY0VEJvNExKKzA5VnkKbXBpM05YMENnWUVBMEZTcGN1R2l4R0ZKYW1qSitaTlJ2SHpsY2VxWHpSS2k0VVF4YUJyVU1ocjFJU2loVkNPYgo5NGw3UmlmbU95Z044VzlaQSszSjkyVXo5cThHcjVIYnZaTGdmM2haa0lWeFNMQXBhTGZ6dTNQSmZDQkFOQkJBCmthOUhwczk3Mk9zUitsOVhiQXdmd2lxKzF0U1RqVFNxREdkRXovbEpsWkdBY3llQ0IrS1drZ01DZ1lFQTd5aXEKeWRzUDlsS1NTOThYeU9rNzZsZFJVQ0I1dytERzRRRkJLaExrWUV6dG1BUDBGZDk3S0Zlek9DdkhiQ3BQK05vQQpjVXVhVWhVQ1NrWEJ0Z1ovc1dCN2JabmdqRERRR3cyQmNHd3ZEZlhvRlk0Yk9TNVNpOUQ0eVYyM0JuV3dDL2NGCmgzaTJzY1o0VU1xcXNlVFBESUlhL2dxVml5TkdNSkdVbVFqOGVLc0NnWUJxNjNpSDJGdTBGQVljMGFrUEtOZWoKT0NwOTI1dUxDVXlFOW4ydzQwd1NaeGwxcWllYVNpaEswOGxyeVNCMUxlcHlCZWFXaUp6R0ZxK2pwRkozR3hHKwo2cm5EVWg0TmVSOFo0aWR1Y2pKcCsxUG1HNXMzM1R0MlNaSXBmNFVkWUErN0F1R1lOMlM1UHp6d2Z1czNabGI3CnhLaGhZSnl1WVoyZC9DSFZNQ3A4eHdLQmdRQ0JqZmdtN1NLU2YxeDgrVmQ3SU0yeVBxYnZ6d2ZuVW1hcUVQSHEKQnQzc1JRQVliMXZVVllIejhyNUZXWUhvV0d1R0ZTSlVQVzVWdE1mTzBFY0Znak8rTk5Qb0pZbDhxQnl6NnZjSgpuYkZIME1SdW1OS3FnU3Q2VGpQWGNZcnFWdXFOTUwyd0MzWjhpMVUxL3ZQRTlud3EvSGYrMG1EOFJKbUxTZkhECmpSaW5qUUtCZ0J1NjFPZGRzVnhJV3VCeEJBMHZPSTZrbXI0b2p0bFV1Tmg0eldPYzllT1Zmc2hoeUUrL050Y0EKYlN1c21nMUQ4cVc1QStqVEVMdmhYcW80SUtaS0pZcEpnb1M3ZUtSb0ljNXU2ZitrVGg3VlRnazhEMHZkUSs0aQozMVR5WGFFazZaMlZuSGpTSEhqV3ZVa0tWV3dja3NUcUtURUZqV2hZUEZlN3plNkdQbDk3Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg==
...@@ -20,6 +20,7 @@ __author__ = 'Yang Hu' ...@@ -20,6 +20,7 @@ __author__ = 'Yang Hu'
import paramiko, os import paramiko, os
from vm_info import VmInfo from vm_info import VmInfo
import logging import logging
# from drip_logging.drip_logging_handler import * # from drip_logging.drip_logging_handler import *
...@@ -32,12 +33,12 @@ if not getattr(logger, 'handler_set', None): ...@@ -32,12 +33,12 @@ if not getattr(logger, 'handler_set', None):
logger.addHandler(h) logger.addHandler(h)
logger.handler_set = True logger.handler_set = True
retry = 0
retry=0
def deploy_compose(vm, compose_file, compose_name,docker_login): def deploy_compose(vm, compose_file, compose_name, docker_login):
try: try:
logger.info("Starting docker compose deployment on: "+vm.ip) logger.info("Starting docker compose deployment on: " + vm.ip)
paramiko.util.log_to_file("deployment.log") paramiko.util.log_to_file("deployment.log")
ssh = paramiko.SSHClient() ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
...@@ -45,46 +46,48 @@ def deploy_compose(vm, compose_file, compose_name,docker_login): ...@@ -45,46 +46,48 @@ def deploy_compose(vm, compose_file, compose_name,docker_login):
sftp = ssh.open_sftp() sftp = ssh.open_sftp()
sftp.chdir('/tmp/') sftp.chdir('/tmp/')
sftp.put(compose_file, "docker-compose.yml") sftp.put(compose_file, "docker-compose.yml")
if(docker_login): if (docker_login):
#stdin, stdout, stderr = ssh.exec_command("sudo docker stack rm %s" % (compose_name)) # stdin, stdout, stderr = ssh.exec_command("sudo docker stack rm %s" % (compose_name))
#stdout.read() # stdout.read()
#err = stderr.read() # err = stderr.read()
#sleep 5 # sleep 5
stdin, stdout, stderr = ssh.exec_command("sudo docker login -u "+docker_login['username']+" -p "+docker_login['password']+" "+docker_login['registry']+" && sudo docker stack deploy --with-registry-auth --compose-file /tmp/docker-compose.yml %s" % (compose_name)) stdin, stdout, stderr = ssh.exec_command(
"sudo docker login -u " + docker_login['username'] + " -p " + docker_login['password'] + " " +
docker_login[
'registry'] + " && sudo docker stack deploy --with-registry-auth --compose-file /tmp/docker-compose.yml %s" % (
compose_name))
else: else:
#stdin, stdout, stderr = ssh.exec_command("sudo docker stack rm %s" % (compose_name)) # stdin, stdout, stderr = ssh.exec_command("sudo docker stack rm %s" % (compose_name))
#stdout.read() # stdout.read()
#err = stderr.read() # err = stderr.read()
cmd = "sudo docker stack deploy --with-registry-auth --compose-file /tmp/docker-compose.yml "+compose_name cmd = "sudo docker stack deploy --with-registry-auth --compose-file /tmp/docker-compose.yml " + compose_name
logger.info("Sendding : "+cmd) logger.info("Sendding : " + cmd)
stdin, stdout, stderr = ssh.exec_command(cmd) stdin, stdout, stderr = ssh.exec_command(cmd)
out = stdout.read() out = stdout.read()
err = stderr.read() err = stderr.read()
logger.info("stderr from: "+vm.ip + " "+ err) logger.info("stderr from: " + vm.ip + " " + err)
logger.info("stdout from: "+vm.ip + " "+ out) logger.info("stdout from: " + vm.ip + " " + out)
logger.info("Finished docker compose deployment on: "+vm.ip) logger.info("Finished docker compose deployment on: " + vm.ip)
except Exception as e: except Exception as e:
global retry global retry
if retry < 10: if retry < 10:
logger.warning(vm.ip + " " + str(e)+". Retrying") logger.warning(vm.ip + " " + str(e) + ". Retrying")
retry+=1 retry += 1
return deploy_compose(vm, compose_file, compose_name,docker_login) return deploy_compose(vm, compose_file, compose_name, docker_login)
logger.error(vm.ip + " " + str(e)) logger.error(vm.ip + " " + str(e))
return "ERROR:" + vm.ip + " " + str(e) return "ERROR:" + vm.ip + " " + str(e)
ssh.close() ssh.close()
retry=0 retry = 0
return "SUCCESS" return "SUCCESS"
def run(vm_list, compose_file, compose_name, rabbitmq_host, owner, docker_login):
rabbit = DRIPLoggingHandler(host=rabbitmq_host, port=5672, user=owner)
def run(vm_list, compose_file, compose_name,rabbitmq_host,owner,docker_login):
rabbit = DRIPLoggingHandler(host=rabbitmq_host, port=5672,user=owner)
logger.addHandler(rabbit) logger.addHandler(rabbit)
for i in vm_list: for i in vm_list:
if i.role == "master": if i.role == "master":
ret = deploy_compose(i, compose_file, compose_name,docker_login) ret = deploy_compose(i, compose_file, compose_name, docker_login)
if "ERROR" in ret: if "ERROR" in ret:
return ret return ret
else: else:
...@@ -93,5 +96,4 @@ def run(vm_list, compose_file, compose_name,rabbitmq_host,owner,docker_login): ...@@ -93,5 +96,4 @@ def run(vm_list, compose_file, compose_name,rabbitmq_host,owner,docker_login):
swarm_file.close() swarm_file.close()
break break
return ret
return ret
\ No newline at end of file
#! /usr/bin/env python # ! /usr/bin/env python
# Copyright 2017 --Yang Hu--
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2017 --Yang Hu--
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__author__ = 'Yang Hu' __author__ = 'Yang Hu'
...@@ -31,67 +31,68 @@ if not getattr(logger, 'handler_set', None): ...@@ -31,67 +31,68 @@ if not getattr(logger, 'handler_set', None):
h.setFormatter(formatter) h.setFormatter(formatter)
logger.addHandler(h) logger.addHandler(h)
logger.handler_set = True logger.handler_set = True
retry=0 retry = 0
def install_engine(vm, return_dict):
try:
logger.info("Starting docker engine installation on: " + (vm.ip))
paramiko.util.log_to_file("deployment.log")
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(vm.ip, username=vm.user, key_filename=vm.key, timeout=30)
stdin, stdout, stderr = ssh.exec_command("sudo dpkg --get-selections | grep docker")
temp_list = stdout.readlines()
temp_str = ""
for i in temp_list: temp_str += i
if temp_str.find("docker") != -1:
logger.info("Docker engine arleady installated on: " + (vm.ip) + " Skiping")
return_dict[vm.ip] = "SUCCESS"
return "SUCCESS"
sftp = ssh.open_sftp()
sftp.chdir('/tmp/')
file_path = os.path.dirname(os.path.abspath(__file__))
install_script = file_path + "/" + "docker_engine.sh"
sftp.put(install_script, "engine_setup.sh")
stdin, stdout, stderr = ssh.exec_command("sudo sh /tmp/engine_setup.sh")
output = stdout.read()
logger.info("stdout: " + (output))
output = stderr.read()
logger.info("stderr: " + (output))
logger.info("Finised docker engine installation on: " + (vm.ip))
except Exception as e:
global retry
if retry < 10:
logger.warning(vm.ip + " " + str(e) + ". Retrying")
retry += 1
return install_engine(vm, return_dict)
logger.error(vm.ip + " " + str(e))
return_dict[vm.ip] = "ERROR:" + vm.ip + " " + str(e)
return "ERROR:" + vm.ip + " " + str(e)
ssh.close()
retry = 0
return_dict[vm.ip] = "SUCCESS"
return "SUCCESS"
def install_engine(vm,return_dict):
try:
logger.info("Starting docker engine installation on: "+(vm.ip))
paramiko.util.log_to_file("deployment.log")
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(vm.ip, username=vm.user, key_filename=vm.key, timeout=30 )
stdin, stdout, stderr = ssh.exec_command("sudo dpkg --get-selections | grep docker")
temp_list = stdout.readlines()
temp_str = ""
for i in temp_list: temp_str += i
if temp_str.find("docker") != -1:
logger.info("Docker engine arleady installated on: "+(vm.ip)+" Skiping")
return_dict[vm.ip] = "SUCCESS"
return "SUCCESS"
sftp = ssh.open_sftp()
sftp.chdir('/tmp/')
file_path = os.path.dirname(os.path.abspath(__file__))
install_script = file_path + "/" + "docker_engine.sh"
sftp.put(install_script, "engine_setup.sh")
stdin, stdout, stderr = ssh.exec_command("sudo sh /tmp/engine_setup.sh")
output = stdout.read()
logger.info("stdout: "+(output))
output = stderr.read()
logger.info("stderr: "+(output))
logger.info("Finised docker engine installation on: "+(vm.ip))
except Exception as e:
global retry
if retry < 10:
logger.warning(vm.ip + " " + str(e)+". Retrying")
retry+=1
return install_engine(vm,return_dict)
logger.error(vm.ip + " " + str(e))
return_dict[vm.ip] = "ERROR:"+vm.ip+" "+str(e)
return "ERROR:"+vm.ip+" "+str(e)
ssh.close()
retry=0
return_dict[vm.ip] = "SUCCESS"
return "SUCCESS"
def run(vm_list,rabbitmq_host,owner): def run(vm_list, rabbitmq_host, owner):
rabbit = DRIPLoggingHandler(host=rabbitmq_host, port=5672,user=owner) rabbit = DRIPLoggingHandler(host=rabbitmq_host, port=5672, user=owner)
logger.addHandler(rabbit) logger.addHandler(rabbit)
manager = multiprocessing.Manager() manager = multiprocessing.Manager()
return_dict = manager.dict() return_dict = manager.dict()
jobs = [] jobs = []
for i in vm_list: for i in vm_list:
#ret = install_engine(i) # ret = install_engine(i)
p = multiprocessing.Process(target=install_engine, args=(i,return_dict,)) p = multiprocessing.Process(target=install_engine, args=(i, return_dict,))
jobs.append(p) jobs.append(p)
p.start() p.start()
for proc in jobs: for proc in jobs:
proc.join() proc.join()
if "ERROR" in return_dict.values(): return "ERROR" if "ERROR" in return_dict.values(): return "ERROR"
#if "ERROR" in ret: return ret # if "ERROR" in ret: return ret
return "SUCCESS" return "SUCCESS"
\ No newline at end of file
...@@ -37,7 +37,7 @@ if not getattr(logger, 'handler_set', None): ...@@ -37,7 +37,7 @@ if not getattr(logger, 'handler_set', None):
retry = 0 retry = 0
def PrintException(): def print_exception():
exc_type, exc_obj, tb = sys.exc_info() exc_type, exc_obj, tb = sys.exc_info()
f = tb.tb_frame f = tb.tb_frame
lineno = tb.tb_lineno lineno = tb.tb_lineno
...@@ -62,15 +62,16 @@ def install_manager(vm): ...@@ -62,15 +62,16 @@ def install_manager(vm):
sftp.chdir('/tmp/') sftp.chdir('/tmp/')
install_script = file_path + "/" + "docker_kubernetes.sh" install_script = file_path + "/" + "docker_kubernetes.sh"
sftp.put(install_script, "kubernetes_setup.sh") sftp.put(install_script, "kubernetes_setup.sh")
# stdin, stdout, stderr = ssh.exec_command("sudo hostname ip-%s" % (vm.ip.replace('.','-'))) # stdin, stdout, stderr = ssh.exec_command("sudo hostname ip-%s" % (vm.ip.replace('.','-')))
# stdout.read() # stdout.read()
stdin, stdout, stderr = ssh.exec_command("sudo sh /tmp/kubernetes_setup.sh") stdin, stdout, stderr = ssh.exec_command("sudo sh /tmp/kubernetes_setup.sh")
stdout.read() out = stdout.read()
stdin, stdout, stderr = ssh.exec_command("sudo kubeadm reset") out = stderr.read()
stdin, stdout, stderr = ssh.exec_command("sudo kubeadm kubernetes-xenialreset --force")
stdout.read() stdout.read()
stdin, stdout, stderr = ssh.exec_command("sudo kubeadm init --apiserver-advertise-address=%s" % (vm.ip)) # stdin, stdout, stderr = ssh.exec_command("sudo kubeadm init --apiserver-advertise-address=%s" % (vm.ip))
stdin, stdout, stderr = ssh.exec_command("sudo kubeadm init")
retstr = stdout.readlines() retstr = stdout.readlines()
stdin, stdout, stderr = ssh.exec_command("sudo cp /etc/kubernetes/admin.conf /tmp/") stdin, stdout, stderr = ssh.exec_command("sudo cp /etc/kubernetes/admin.conf /tmp/")
...@@ -85,7 +86,7 @@ def install_manager(vm): ...@@ -85,7 +86,7 @@ def install_manager(vm):
global retry global retry
# print '%s: %s' % (vm.ip, e) # print '%s: %s' % (vm.ip, e)
logger.error(vm.ip + " " + str(e)) logger.error(vm.ip + " " + str(e))
PrintException() print_exception()
return "ERROR:" + vm.ip + " " + str(e) return "ERROR:" + vm.ip + " " + str(e)
ssh.close() ssh.close()
return retstr[-1] return retstr[-1]
...@@ -94,6 +95,7 @@ def install_manager(vm): ...@@ -94,6 +95,7 @@ def install_manager(vm):
def install_worker(join_cmd, vm): def install_worker(join_cmd, vm):
try: try:
logger.info("Starting kubernetes slave installation on: " + (vm.ip)) logger.info("Starting kubernetes slave installation on: " + (vm.ip))
logger.info("User: " + (vm.user) + " key file: " + vm.key)
ssh = paramiko.SSHClient() ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(vm.ip, username=vm.user, key_filename=vm.key, timeout=30) ssh.connect(vm.ip, username=vm.user, key_filename=vm.key, timeout=30)
...@@ -107,8 +109,8 @@ def install_worker(join_cmd, vm): ...@@ -107,8 +109,8 @@ def install_worker(join_cmd, vm):
file_path = os.path.dirname(os.path.abspath(__file__)) file_path = os.path.dirname(os.path.abspath(__file__))
install_script = file_path + "/" + "docker_kubernetes.sh" install_script = file_path + "/" + "docker_kubernetes.sh"
sftp.put(install_script, "kubernetes_setup.sh") sftp.put(install_script, "kubernetes_setup.sh")
stdin, stdout, stderr = ssh.exec_command("sudo hostname ip-%s" % (vm.ip.replace('.', '-'))) # stdin, stdout, stderr = ssh.exec_command("sudo hostname ip-%s" % (vm.ip.replace('.', '-')))
stdout.read() # stdout.read()
stdin, stdout, stderr = ssh.exec_command("sudo sh /tmp/kubernetes_setup.sh") stdin, stdout, stderr = ssh.exec_command("sudo sh /tmp/kubernetes_setup.sh")
stdout.read() stdout.read()
stdin, stdout, stderr = ssh.exec_command("sudo kubeadm reset") stdin, stdout, stderr = ssh.exec_command("sudo kubeadm reset")
...@@ -124,6 +126,12 @@ def install_worker(join_cmd, vm): ...@@ -124,6 +126,12 @@ def install_worker(join_cmd, vm):
return "SUCCESS" return "SUCCESS"
def deploy(vm_list, deployment_file):
for i in vm_list:
if i.role == "master":
return None
def run(vm_list, rabbitmq_host, owner): def run(vm_list, rabbitmq_host, owner):
# rabbit = DRIPLoggingHandler(host=rabbitmq_host, port=5672,user=owner) # rabbit = DRIPLoggingHandler(host=rabbitmq_host, port=5672,user=owner)
# logger.addHandler(rabbit) # logger.addHandler(rabbit)
...@@ -142,7 +150,6 @@ def run(vm_list, rabbitmq_host, owner): ...@@ -142,7 +150,6 @@ def run(vm_list, rabbitmq_host, owner):
worker_cmd = install_worker(join_cmd, i) worker_cmd = install_worker(join_cmd, i)
if "ERROR" in worker_cmd: if "ERROR" in worker_cmd:
return worker_cmd return worker_cmd
file_path = os.path.dirname(os.path.abspath(__file__)) file_path = os.path.dirname(os.path.abspath(__file__))
kuber_file = open(file_path + "/admin.conf", "r") kuber_file = open(file_path + "/admin.conf", "r")
kuber_string = kuber_file.read() kuber_string = kuber_file.read()
......
#! /bin/bash #! /bin/bash
sudo apt-get update && apt-get install -y apt-transport-https curl sudo sed -i -re 's/([a-z]{2}\.)?archive.ubuntu.com|security.ubuntu.com/old-releases.ubuntu.com/g' /etc/apt/sources.list
sudo apt-get update && sudo apt-get install -y apt-transport-https ca-certificates curl software-properties-common
sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
sudo apt-get update && sudo apt-get install -y --allow-unauthenticated docker-ce=18.06.2~ce~3-0~ubuntu
sudo echo -e "{\n \"exec-opts\": [\"native.cgroupdriver=systemd\"], \n \"log-driver\": \"json-file\", \n \"log-opts\": {\"max-size\": \"100m\"}, \n \"storage-driver\": \"overlay2\" \n}" > sudo /etc/docker/daemon.json
sudo mkdir -p /etc/systemd/system/docker.service.d
sudo systemctl daemon-reload
sudo systemctl restart docker
sudo apt-get update && sudo apt-get install -y apt-transport-https curl
sudo curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add - sudo curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | sudo tee /etc/apt/sources.list.d/kubernetes.list echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | sudo tee /etc/apt/sources.list.d/kubernetes.list
sudo apt-get update && sudo apt-get install -y kubelet kubeadm kubectl sudo apt-get update && sudo apt-get install -y kubelet kubeadm kubectl
sudo apt-mark hold kubelet kubeadm kubectl sudo apt-mark hold kubelet kubeadm kubectl
...@@ -12,6 +12,7 @@ from time import sleep ...@@ -12,6 +12,7 @@ from time import sleep
import pika import pika
import ansible_playbook
import docker_check import docker_check
import docker_compose import docker_compose
import docker_engine import docker_engine
...@@ -25,7 +26,7 @@ if len(sys.argv) > 1: ...@@ -25,7 +26,7 @@ if len(sys.argv) > 1:
rabbitmq_host = sys.argv[1] rabbitmq_host = sys.argv[1]
else: else:
rabbitmq_host = '127.0.0.1' rabbitmq_host = '127.0.0.1'
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
if not getattr(logger, 'handler_set', None): if not getattr(logger, 'handler_set', None):
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
...@@ -35,36 +36,35 @@ if not getattr(logger, 'handler_set', None): ...@@ -35,36 +36,35 @@ if not getattr(logger, 'handler_set', None):
logger.addHandler(h) logger.addHandler(h)
logger.handler_set = True logger.handler_set = True
connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_host)) connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_host))
channel = connection.channel() channel = connection.channel()
channel.queue_declare(queue='deployer_queue') channel.queue_declare(queue='deployer_queue')
done = False done = False
def threaded_function(args): def threaded_function(args):
while not done: while not done:
connection.process_data_events() connection.process_data_events()
sleep(5) sleep(5)
def handleDelivery(message): def handleDelivery(message):
parsed_json = json.loads(message) parsed_json = json.loads(message)
owner = parsed_json['owner'] owner = parsed_json['owner']
params = parsed_json["parameters"] params = parsed_json["parameters"]
node_num = 0 node_num = 0
vm_list = set() vm_list = set()
current_milli_time = lambda: int(round(time.time() * 1000)) current_milli_time = lambda: int(round(time.time() * 1000))
try: try:
path = os.path.dirname(os.path.abspath(__file__)) + "/deployer_files/"+str(current_milli_time()) + "/" path = os.path.dirname(os.path.abspath(__file__)) + "/deployer_files/" + str(current_milli_time()) + "/"
except NameError: except NameError:
import sys import sys
path = os.path.dirname(os.path.abspath(sys.argv[0])) + "/deployer_files/"+str(current_milli_time()) + "/" path = os.path.dirname(os.path.abspath(sys.argv[0])) + "/deployer_files/" + str(current_milli_time()) + "/"
if not os.path.exists(path): if not os.path.exists(path):
os.makedirs(path) os.makedirs(path)
for param in params: for param in params:
name = param["name"] name = param["name"]
if name == "cluster": if name == "cluster":
...@@ -77,16 +77,23 @@ def handleDelivery(message): ...@@ -77,16 +77,23 @@ def handleDelivery(message):
role = param["attributes"]["role"] role = param["attributes"]["role"]
node_num += 1 node_num += 1
key = path + "%d.txt" % (node_num) key = path + "%d.txt" % (node_num)
fo = open(key, "w") fo = open(key, "w")
fo.write(value) fo.write(value)
fo.close() fo.close()
parentDir = os.path.dirname(os.path.abspath(key)) parentDir = os.path.dirname(os.path.abspath(key))
os.chmod(parentDir, 0o700) os.chmod(parentDir, 0o700)
os.chmod(key, 0o600) os.chmod(key, 0o600)
vm = VmInfo(ip, user, key, role) vm = VmInfo(ip, user, key, role)
vm_list.add(vm) vm_list.add(vm)
elif name == "deployment":
value = param["value"]
value = base64.b64decode(value)
deployment_file = path + "deployment.yml"
fo = open(deployment_file, "w")
fo.write(value)
fo.close()
elif name == "playbook": elif name == "playbook":
value = param["value"] value = param["value"]
playbook = path + "playbook.yml" playbook = path + "playbook.yml"
...@@ -96,65 +103,65 @@ def handleDelivery(message): ...@@ -96,65 +103,65 @@ def handleDelivery(message):
elif name == "composer": elif name == "composer":
value = param["value"] value = param["value"]
compose_file = path + "docker-compose.yml" compose_file = path + "docker-compose.yml"
if not param["attributes"] == None and not param["attributes"]["name"] == None : if not param["attributes"] == None and not param["attributes"]["name"] == None:
compose_name = param["attributes"]["name"] compose_name = param["attributes"]["name"]
docker_login = {} docker_login = {}
if 'docker_login' in param["attributes"]: if 'docker_login' in param["attributes"]:
docker_login['username'] = param["attributes"]["docker_login_username"] docker_login['username'] = param["attributes"]["docker_login_username"]
docker_login['password'] = param["attributes"]["docker_login_password"] docker_login['password'] = param["attributes"]["docker_login_password"]
docker_login['registry'] = param["attributes"]["docker_login_registry"] docker_login['registry'] = param["attributes"]["docker_login_registry"]
docker_login = param["attributes"]["docker_login"] docker_login = param["attributes"]["docker_login"]
else: else:
current_milli_time = lambda: int(round(time.time() * 1000)) current_milli_time = lambda: int(round(time.time() * 1000))
compose_name = "service_"+str(current_milli_time()) compose_name = "service_" + str(current_milli_time())
fo = open(compose_file, "w") fo = open(compose_file, "w")
fo.write(value) fo.write(value)
fo.close() fo.close()
elif name == "scale": elif name == "scale":
name_of_deployment = param["value"] name_of_deployment = param["value"]
name_of_service = param["attributes"]["service"] name_of_service = param["attributes"]["service"]
number_of_containers = param["attributes"]["number_of_containers"] number_of_containers = param["attributes"]["number_of_containers"]
elif name == "swarm_info": elif name == "swarm_info":
compose_name = param["attributes"]["name"] compose_name = param["attributes"]["name"]
if manager_type == "kubernetes": if manager_type == "kubernetes":
ret = docker_kubernetes.run(vm_list,rabbitmq_host,owner) ret = docker_kubernetes.run(vm_list, rabbitmq_host, owner)
docker_kubernetes.deploy(vm_list, deployment_file)
return ret return ret
elif manager_type == "swarm": elif manager_type == "swarm":
ret = docker_engine.run(vm_list,rabbitmq_host,owner) ret = docker_engine.run(vm_list, rabbitmq_host, owner)
if "ERROR" in ret: return ret if "ERROR" in ret: return ret
ret = docker_swarm.run(vm_list,rabbitmq_host,owner) ret = docker_swarm.run(vm_list, rabbitmq_host, owner)
if "ERROR" in ret: return ret if "ERROR" in ret: return ret
ret = docker_compose.run(vm_list, compose_file, compose_name,rabbitmq_host,owner,docker_login) ret = docker_compose.run(vm_list, compose_file, compose_name, rabbitmq_host, owner, docker_login)
return ret return ret
elif manager_type == "ansible": elif manager_type == "ansible":
ret = ansible_playbook.run(vm_list,playbook,rabbitmq_host,owner) ret = ansible_playbook.run(vm_list, playbook, rabbitmq_host, owner)
return ret return ret
elif manager_type == "scale": elif manager_type == "scale":
ret = docker_service.run(vm_list, name_of_deployment, name_of_service, number_of_containers,rabbitmq_host,owner) ret = docker_service.run(vm_list, name_of_deployment, name_of_service, number_of_containers, rabbitmq_host,
owner)
return ret return ret
elif manager_type == "swarm_info": elif manager_type == "swarm_info":
ret = docker_check.run(vm_list, compose_name,rabbitmq_host,owner) ret = docker_check.run(vm_list, compose_name, rabbitmq_host, owner)
ret = '"'+json.dumps(ret)+'"' ret = '"' + json.dumps(ret) + '"'
return ret return ret
else: else:
return "ERROR: invalid cluster" return "ERROR: invalid cluster"
def on_request(ch, method, props, body): def on_request(ch, method, props, body):
ret = handleDelivery(body) ret = handleDelivery(body)
parsed_json = json.loads(body) parsed_json = json.loads(body)
params = parsed_json["parameters"] params = parsed_json["parameters"]
for param in params: for param in params:
name = param["name"] name = param["name"]
if name == "cluster": if name == "cluster":
manager_type = param["value"] manager_type = param["value"]
break break
if "ERROR" in ret: if "ERROR" in ret:
res_name = "error" res_name = "error"
elif manager_type == "ansible": elif manager_type == "ansible":
...@@ -165,7 +172,7 @@ def on_request(ch, method, props, body): ...@@ -165,7 +172,7 @@ def on_request(ch, method, props, body):
res_name = "swarm_info" res_name = "swarm_info"
else: else:
res_name = "credential" res_name = "credential"
response = {} response = {}
outcontent = {} outcontent = {}
current_milli_time = lambda: int(round(time.time() * 1000)) current_milli_time = lambda: int(round(time.time() * 1000))
...@@ -178,33 +185,30 @@ def on_request(ch, method, props, body): ...@@ -178,33 +185,30 @@ def on_request(ch, method, props, body):
par["value"] = ret par["value"] = ret
par["attributes"] = "null" par["attributes"] = "null"
response["parameters"].append(par) response["parameters"].append(par)
response = json.dumps(response) response = json.dumps(response)
logger.info("Response: " + response) logger.info("Response: " + response)
ch.basic_publish(exchange='', ch.basic_publish(exchange='',
routing_key=props.reply_to, routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \ properties=pika.BasicProperties(correlation_id= \
props.correlation_id), props.correlation_id),
body=str(response)) body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1) channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='deployer_queue') channel.basic_consume(on_request, queue='deployer_queue')
thread = Thread(target=threaded_function, args=(1,))
thread = Thread(target = threaded_function, args = (1, ))
thread.start() thread.start()
logger.info("Awaiting RPC requests") logger.info("Awaiting RPC requests")
try: try:
channel.start_consuming() channel.start_consuming()
except KeyboardInterrupt: except KeyboardInterrupt:
#thread.stop() # thread.stop()
done = True done = True
thread.join() thread.join()
logger.info("Threads successfully closed") logger.info("Threads successfully closed")
'''
Created on Nov 20, 2015
@author: junchao
'''
import os
import sys
import re
import random
import networkx as nx
import numpy as np
import json
from subprocess import call
import time
from NewInstance import NewInstance
import collections
class Workflow():
def add_entry_exit(self, g):
# add entry node to the graph
g.add_node(0, est = -1, eft = -1, lft =-1)
for vertex in g:
if len(g.predecessors(vertex)) == 0 and not vertex == 0:
self.G.add_weighted_edges_from([(0, vertex, 0)])
# add exit node to the graph
g.add_node(self.vertex_num-1, est = -1, eft = -1, lft =-1)
startnodes = []
for vertex in g:
if len(g.successors(vertex)) == 0 and not vertex == self.vertex_num-1:
startnodes.append(vertex)
for node in startnodes:
self.G.add_weighted_edges_from([(node, self.vertex_num-1, 0)])
def init(self, content):
self.G = nx.DiGraph()
self.vertex_num =0
for (key, value) in content['workflow'].items():
if isinstance(value, list) :
if key == 'nodes' :
self.vertex_num = len(value)
for node in value:
self.G.add_node(node['name'], est = -1, eft = -1, lft =-1)
if key == 'links' :
for link in value:
self.G.add_weighted_edges_from([(link['source'], link['target'], link['weight'])])
#print self.G.nodes
#parse the performance matrix
p = []
od = collections.OrderedDict(sorted(content['performance'].items()))
for (key, value) in od.items():
row = []
row.append(0)
for i in value.split(','):
row.append(int(i))
row.append(0)
#print row
p.append(row)
self.p_table = np.matrix(p)
#parse the price vector
self.vm_price = []
for i in content['price'].split(','):
self.vm_price.append(int(i))
#parse the deadline
self.d_list = []
for (key, value) in content['deadline'].items():
self.d_list.append([int(key), int(value)])
self.d_table = np.matrix(self.d_list)
self.successful = 0
self.vertex_num += 2
self.add_entry_exit(self.G)
#test whether the DAG contains cycles
if len(list(nx.simple_cycles(self.G))) > 0:
print "the DAG contains cycles"
sys.exit()
self.assigned_list = [-1]*(self.vertex_num)
self.instances = []
self.G.node[0]['est'] = 0
self.G.node[0]['eft'] = 0
self.cal_est(0)
self.G.node[self.vertex_num-1]['lft'] = self.d_table[self.d_table.shape[0]-1,1]#self.p_table[0, child], self.p_table.shape[0]
self.cal_lft(self.vertex_num-1)
#def init1(self, workflow_file_name, performance_file_name, price_file_name, deadline_file_name):
##Initialization
#self.G=nx.DiGraph()
#self.vertex_num = 0
#self.successful = 0
##Read the workflow information
#graph = pydot.graph_from_dot_file(workflow_file_name)
#nx_graph = nx.from_pydot(graph)
#self.G=nx.DiGraph()
#for node in nx_graph:
##print node
#self.G.add_node(int(node)+1, est = -1, eft = -1, lft =-1)
#self.vertex_num += 1
##print nx_graph.edge
##print workflow_file_name
#for link in nx_graph.edges_iter():
##print link[0], link[1]
#self.G.add_weighted_edges_from([(int(link[0])+1, int(link[1])+1, int(float(nx_graph[link[0]][link[1]][0]['weight'])))])
#self.vertex_num += 2
#self.add_entry_exit(self.G)
##test whether the DAG contains cycles
#if len(list(nx.simple_cycles(self.G))) > 0:
#print "the DAG contains cycles"
#sys.exit()
##read performance table
#l = [ map(int,line.split(',')) for line in open(performance_file_name, 'r')]
##append the entry and exit node information
#for row in l:
#row.insert(0, 0)
#row.insert(len(row),0)
#self.p_table = np.matrix(l)
#self.assigned_list = [-1]*(self.vertex_num)
#self.vm_price = map(int,open(price_file_name,'r').readline().split(','))
#self.instances = []
#self.d_list = [ map(int,line.split('\t')) for line in open(deadline_file_name, 'r')]
#tmpList = self.d_list[len(self.d_list)-1]
#self.d_table = np.matrix(tmpList)
##deadline = open(deadline_file_name, 'r').readline()
#self.G.node[0]['est'] = 0
#self.G.node[0]['eft'] = 0
#self.cal_est(0)
#self.G.node[self.vertex_num-1]['lft'] = self.d_table[self.d_table.shape[0]-1,1]#self.p_table[0, child], self.p_table.shape[0]
#self.cal_lft(self.vertex_num-1)
#The following two functions are to initialize the EST, EFT and LFT
#calculate the earliest start time and earliest finish time
def cal_est(self, i):
for child in self.G.successors(i):
est = self.G.node[i]['eft']+self.G[i][child]['weight']
if est>self.G.node[child]['est']:
self.G.node[child]['est'] = est
table_child = self.p_table[0, child]
self.G.node[child]['eft'] = est + table_child
self.cal_est(child)
def cal_lft(self, d):
for parent in self.G.predecessors(d):
lft = self.G.node[d]['lft'] - self.p_table[0, d] - self.G[parent][d]['weight']
d_table_list = []
for deadline in self.d_table:
d_table_list.append(deadline[0, 0])
if parent in d_table_list:
deadline = self.d_table[d_table_list.index([parent]),1]
if deadline < lft:
lft = deadline
if self.G.node[parent]['lft'] == -1 or lft<self.G.node[parent]['lft']:
# parent may not finish later
self.G.node[parent]['lft'] = lft
#print "call graphAssignLFT(",graph.node[parent]['name'],")"
self.cal_lft(parent)
#Finding critical path
def ic_pcp(self):
self.assigned_list[0] = 0
self.assigned_list[self.vertex_num-1] = 0
self.assign_parents(self.vertex_num-1)
def has_unassigned_parent(self, i):
for parent in self.G.predecessors(i):
if (self.assigned_list[parent] == -1):
return True
return False
def assign_parents(self, i):
while (self.has_unassigned_parent(i)):
if self.successful == 1: #resources cannot be met
break
pcp = []
self.find_critical_path(i, pcp)
assigned_vms = self.assign_path(pcp)
if -1 in assigned_vms:
print 'resource cannot be met'
break
self.G.node[pcp[len(pcp)-1]]['eft'] = self.G.node[pcp[len(pcp)-1]]['est'] + self.p_table[assigned_vms[len(pcp)-1], pcp[len(pcp)-1]]
self.update_est(pcp[len(pcp)-1], pcp, assigned_vms)
self.update_lft(pcp[0], pcp, assigned_vms)
#split according to the types of assigned VMs and add it to the new instance
ni = NewInstance(assigned_vms, self.G.node[pcp[len(pcp)-1]]['est'], self.G.node[pcp[0]]['eft'], pcp)
for j in xrange(len(pcp)):
ni.cost = ni.cost + self.vm_price[assigned_vms[j]]
self.instances.append(ni)
for j in reversed(pcp): #also in the paper they didn't mention the order
self.assign_parents(j)
#TODO: A very tricky thing on updating the EST and EFT.
def update_est(self, i, pcp, assigned_vms):
for child in self.G.successors(i):
if child not in pcp:
est = self.G.node[i]['eft']+self.G[i][child]['weight']
if self.assigned_list[i] == -1:
eft = est + self.p_table[0, child]
else:
eft = est + self.p_table[self.assigned_list[child], child]
else:
if pcp.index(child) == len(pcp) - 1:
est = self.G.node[i]['eft']
eft = est + self.p_table[self.assigned_list[child], child]
else:
pos = pcp.index(child)
est = self.G.node[i]['eft'] + self.G[pcp[pos+1]][pcp[pos]]['weight']
eft = est + self.p_table[self.assigned_list[child], child]
#decide whether the assignment will violate other parent data dependency
all_smaller = True
for parent in self.G.predecessors(child):
if not parent == i:
if self.G.node[parent]['eft'] + self.G[parent][child]['weight'] > est:
all_smaller = False
if all_smaller:
self.G.node[child]['est'] = est
self.G.node[child]['eft'] = eft
self.update_est(child, pcp, assigned_vms)
def update_lft(self, d, pcp, assigned_vms):
for parent in self.G.predecessors(d):
if parent not in pcp:
if self.assigned_list[d] == -1:
lft = self.G.node[d]['lft'] - self.p_table[0, d] - self.G[parent][d]['weight']
else:
lft = self.G.node[d]['lft'] - self.p_table[self.assigned_list[d], d] - self.G[parent][d]['weight']
else:
pos = pcp.index(parent)
#if pos < len(pcp) -1:
if assigned_vms[pos] == assigned_vms[pos-1]:#9, 6, 2
lft = self.G.node[d]['lft'] - self.p_table[self.assigned_list[d], d]
else:
lft = self.G.node[d]['lft'] - self.p_table[self.assigned_list[d], d] - self.G[pcp[pos]][pcp[pos-1]]['weight']
if lft < self.G.node[parent]['lft']:
self.G.node[parent]['lft'] = lft
self.update_lft(parent, pcp, assigned_vms)
def find_critical_path(self, i, pcp):
cal_cost = 0
critical_parent = -1
for n in self.G.predecessors(i):
if self.assigned_list[n] == -1: #parent of node i is not assigned
if self.G.node[n]['eft'] + self.G.edge[n][i]['weight'] > cal_cost:
cal_cost = self.G.node[n]['eft'] + self.G.edge[n][i]['weight']
critical_parent = n
if not critical_parent == -1:
pcp.append(critical_parent)
self.find_critical_path(critical_parent, pcp)
def exec_time_sum(self, pcp, vm_type):
sum = 0
for i in pcp:
sum += self.p_table[vm_type, i]
return sum
#look forward one step when assigning a vm to a pcp how the est varies
def est_vary(self, pcp, d):
head_pcp = pcp[len(pcp)-1]
original_est = self.G.node[head_pcp]['est']
biggest_est = -1
biggest_parent = -1
for parent in self.G.predecessors(head_pcp):
if parent == d:
est = self.G.node[parent]['eft']
else:
est = self.G.node[parent]['eft'] + self.G[parent][head_pcp]['weight']
if biggest_est < est:
biggest_est = est
biggest_parent = parent
return original_est-biggest_est
#choose the best existing available instance for the pcp
def choose_exist_instance(self, pcp):
best_vm = None
best_exec_time = -1
best_vary_time = -1
for vm in self.instances:
head_pcp = pcp[len(pcp)-1]
for parent in self.G.predecessors(head_pcp):
head_exist_pcp = vm.task_list[0] #The last node of the previous critical path
if parent == head_exist_pcp:
if best_vm == None:
best_vm = vm
exec_time = self.exec_time_sum(pcp, vm.vm_type)
best_exec_time = exec_time
best_vary_time = self.est_vary(pcp, head_exist_pcp)
else:
best_vm_head = vm.task_list[0]
# if assigned to the vm, what will the est be
exec_time = self.exec_time_sum(pcp, vm.vm_type)
# calculate the lft
varied_time = self.G.node[head_pcp]['est']-self.est_vary(pcp, head_exist_pcp)
lft = varied_time+exec_time
if (exec_time - self.est_vary(pcp, head_exist_pcp))*self.vm_price[vm.vm_type]> \
(best_exec_time - self.est_vary(pcp, best_vm.task_list[0]))*self.vm_price[best_vm.vm_type] \
and lft < self.G.node[head_pcp]['lft']: #also should not violate the lft
best_vm = vm
best_exec_time = exec_time
best_vary_time = varied_time
if not best_vm == None:
best_vm.vm_end = self.G.node[pcp[len(pcp)-1]]['est']-best_vary_time+best_exec_time
return best_vm
def assign_path(self, pcp):
cheapest_vm = -1
cheapest_sum = 9999999 #the initialized value should be a very large number
for i in xrange(self.p_table.shape[0]): # use the the shape of the performance table to identify how many VM types are there
violate_LFT = 0
start = self.G.node[pcp[len(pcp)-1]]['est']
cost_sum = 0
for j in xrange(len(pcp)-1, -1, -1):
cost_sum += self.vm_price[i]
if j == len(pcp)-1:
start = start + self.p_table[i, pcp[j]]
else:
start = start + self.p_table[i, pcp[j]]+ self.G[pcp[j+1]][pcp[j]]['weight']
if self.G.node[pcp[j]]['lft'] < start:
violate_LFT = 1
#launch a new instance of the cheapest service which can finish each task of P before its LFT
if violate_LFT == 0 and cost_sum < cheapest_sum:
cheapest_vm = i
cheapest_sum = cost_sum
for i in xrange(len(pcp)):
self.assigned_list[pcp[i]] = cheapest_vm
return [cheapest_vm]*len(pcp)
def generate_string(self, node):
s = "name "+str(node)+"\n"
for i in xrange(len(self.vm_price)):
s = s+str(self.p_table[i, node])+"\n"
s = s+"assigned vm: " + str(self.assigned_list[node]+1)
return s
def generateJSON(self):
content = {}
for i in xrange(1, self.vertex_num-1, 1):
if self.assigned_list[i] == 0:
content[str(i)] = 'large'
elif self.assigned_list[i] == 1:
content[str(i)] = 'Medium'
elif self.assigned_list[i] == 2:
content[str(i)] = 'Small'
else:
content[str(i)] = 'Medium'
return content
#calculate the total execution cost
def cal_cost(self):
cost = 0
for vm in self.instances:
cost = cost + vm.cost
return cost
def cal_cost1(self):
cost = 0
for i in range(1, len(self.assigned_list)-1):
cost += self.vm_price[self.assigned_list[i]]
return cost
def has_edge_vm(self, vm1, vm2):
for node1 in vm1.task_list:
for node2 in vm2.task_list:
if self.G.has_edge(node1, node2) or self.G.has_edge(node2, node1):
return True
return False
class NewInstance(object):
def __init__(self, vm_type, vm_start, vm_end, pcp):
self.vm_type = vm_type
self.vm_start = vm_start
self.vm_end = vm_end
self.task_list = pcp
self.cost = 0
\ No newline at end of file
#!/usr/bin/env python
import pika
import networkx as nx
import sys
import numpy as np
import sys, argparse
import operator
import os
from toscaparser import *
from toscaparser.tosca_template import ToscaTemplate
import re
import getopt
from ICPCP import Workflow
import random
import time
import json
print sys.argv
if len(sys.argv) > 1:
rabbitmq_host = sys.argv[1]
else:
rabbitmq_host = '127.0.0.1'
connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_host))
channel = connection.channel()
channel.queue_declare(queue='planner_queue')
def handleDelivery(message):
parsed_json_message = json.loads(message)
params = parsed_json_message["parameters"]
param = params[0]
value = param["value"]
parsed_json_value = json.loads(value)
topology_template = parsed_json_value['topology_template']
node_templates = topology_template["node_templates"]
json1 = node_templates
deadline = 0
for j in json1:
if "Switch.nodes.Application.Container.Docker." in json1[j]['type'] and "QoS" in str(json1[j]['properties']) and "response_time" in str(json1[j]['properties']):
response_time = json1[j]['properties']['QoS']['response_time']
reg_ex = re.search(r'\d+', str(response_time))
gr = reg_ex.group()
deadline = int(gr)
#get the nodes from the json
nodeDic = {}
nodeDic1 = {}
i = 1
for j in json1:
if "Switch.nodes.Application.Container.Docker." in json1[j]['type']:
nodeDic[j] = i
nodeDic1[i] = j
i = i + 1
#get the links from the json
links = []
for j in json1:
if json1[j]['type'] == "Switch.nodes.Application.Connection":
link= {}
link['source'] = nodeDic[json1[j]['properties']['source']['component_name']]
link['target'] = nodeDic[json1[j]['properties']['target']['component_name']]
link['weight'] = random.randint(1, 10)
links.append(link)
# compose the json as input of the workflow
wfJson = {}
wfJson['workflow'] = {}
nodesList = []
sorted_nodeDic = sorted(nodeDic.items(), key=operator.itemgetter(1))
for key, value in sorted_nodeDic:
v = {}
v['name'] = value
nodesList.append(v)
wfJson['workflow']['nodes'] = nodesList
wfJson['workflow']['links'] = links
#print deadline
#Spiros: Manually set price and preformace arrays. This is a hack to make the planner work
price = ""
prefix = ""
for i in reversed(xrange(len(nodesList))):
#price += str(i)","
price = price+prefix+str(i+1)
prefix = ","
prefix = ""
performance_str = ""
for j in range(1, len(nodesList)+1):
performance_str = performance_str+prefix+str(j)
prefix = ","
wfJson['price'] = price #"9,8,7,6,5,2,1"
wfJson['deadline'] = {'2': deadline}
#generate performance
performance = {}
for key, value in sorted_nodeDic:
performance[str(value)] = performance_str #"1,2,3,4,5,6,7"
wfJson['performance'] = performance
#print wfJson
#send request to the server
start = time.time()
wf = Workflow()
wf.init(wfJson)
wf.ic_pcp()
#print content['workflow']
#return
res = wf.generateJSON()
end = time.time()
#print (end - start)
# convert the json to the file required
outcontent = {}
current_milli_time = lambda: int(round(time.time() * 1000))
outcontent["creationDate"] = current_milli_time()
outcontent["parameters"] = []
for key, value in sorted_nodeDic:
if json1[nodeDic1[value]].get('artifacts') is None:
#print key
#print json1[nodeDic1[value]]
continue
if "docker_image." not in json1[nodeDic1[value]].get('artifacts'):
keys = json1[nodeDic1[value]].get('artifacts').keys()
for k in keys:
docker = json1[nodeDic1[value]].get('artifacts').get(k).get('file')
par = {}
res1 = {}
par["url"] = "null"
par["encoding"] = "UTF-8"
name = str(nodeDic1[value])
res1["name"] = str(nodeDic1[value])
res1["size"] = res[str(value)]
res1["docker"] = str(docker)
par["value"] = res1
par["attributes"] = "null"
##print ("Parameter: %s" % par)
outcontent["parameters"].append(par)
else:
for key, value in sorted_nodeDic:
par = {}
res1 = {}
par["url"] = "null"
par["encoding"] = "UTF-8"
name = str(nodeDic1[value])
docker = json1[nodeDic1[value]].get('artifacts').get('docker_image').get('file')
res1["name"] = str(nodeDic1[value])
res1["size"] = res[str(value)]
res1["docker"] = str(docker)
par["value"] = res1
par["attributes"] = "null"
##print ("Parameter: %s" % par)
outcontent["parameters"].append(par)
print ("Output message: %s" % outcontent)
return outcontent
def on_request(ch, method, props, body):
response = handleDelivery(body)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='planner_queue')
print(" [x] Awaiting RPC requests")
channel.start_consuming()
#f = open("../doc/json_samples/plannerInput2.json","r")
#body=f.read()
#response = handleDelivery(body)
#https://github.com/skoulouzis/DRIP/blob/package/doc/json_samples/kbExampleMessage.json
import networkx as nx
import sys
import numpy as np
import sys, argparse
import operator
import os
from toscaparser import *
from toscaparser.tosca_template import ToscaTemplate
import re
import getopt
from ICPCP import Workflow
import random
import time
import json
def main(argv):
workflow_file = ""
try:
opts, args = getopt.getopt(argv,"hw:s:",["workflow=", "SDI="])
except getopt.GetoptError:
print 'server.py -w <workflowfile> -s <SDI>'
sys.exit(2)
for opt, arg in opts:
if opt == '-h':
print 'server.py -w <workflowfile> -s <SDI>'
sys.exit()
elif opt in ("-w", "--workflow"):
workflow_file = arg
elif opt in ("-s", "--SDI"):
SDI_file = arg
data = {}
#print workflow_file
with open(workflow_file) as data_file:
data = json.load(data_file)
#print data
#path = "input.yaml"
'''
a_file = os.path.isfile(path)
tosca = ToscaTemplate(path)
#print tosca.tpl
json = tosca.tpl.get('topology_template').get('node_templates')
#print json
'''
json1 = data.get('parameters')[0].get('value').get('topology_template').get('node_templates')
deadline = 0
for j in json1:
#print json[j]
if not json1[j]['type'] == "Switch.nodes.Application.Connection":
deadline = int(re.search(r'\d+', json1[j]['properties']['QoS']['response_time']).group())
#get the nodes from the json
nodeDic = {}
nodeDic1 = {}
i = 1
for j in json1:
if not json1[j]['type'] == "Switch.nodes.Application.Connection":
print j, json1[j]
nodeDic[j] = i
nodeDic1[i] = j
i = i + 1
#get the links from the json
links = []
for j in json1:
if json1[j]['type'] == "Switch.nodes.Application.Connection":
print json1[j]['properties']['source']['component_name']
print json1[j]['properties']['target']['component_name']
link= {}
link['source'] = nodeDic[json1[j]['properties']['source']['component_name']]
link['target'] = nodeDic[json1[j]['properties']['target']['component_name']]
link['weight'] = random.randint(1, 10)
links.append(link)
# compose the json as input of the workflow
wfJson = {}
wfJson['workflow'] = {}
nodesList = []
sorted_nodeDic = sorted(nodeDic.items(), key=operator.itemgetter(1))
for key, value in sorted_nodeDic:
v = {}
v['name'] = value
nodesList.append(v)
wfJson['workflow']['nodes'] = nodesList
wfJson['workflow']['links'] = links
#print deadline
wfJson['price'] = "5,2,1"
wfJson['deadline'] = {'2': deadline}
#generate performance
performance = {}
for key, value in sorted_nodeDic:
performance[str(value)] = "1,2,3"
wfJson['performance'] = performance
print wfJson
#send request to the server
start = time.time()
wf = Workflow()
wf.init(wfJson)
wf.ic_pcp()
#print content['workflow']
#return
res = wf.generateJSON()
end = time.time()
print (end - start)
# convert the json to the file required
res1 = {}
for key, value in sorted_nodeDic:
print value, res[str(value)]
res1_value = {}
res1_value["size"] = res[str(value)]
res1_value["docker"] = json1[nodeDic1[value]].get('artifacts').get('docker_image').get('file')
res1[str(nodeDic1[value])] = res1_value
print res1
# generate the json files in the corresponding format as the
outcontent = {}
outcontent["creationDate"] = 1487002029722
outcontent["parameters"] = []
par1 = {}
par1["url"] = "null"
par1["encoding"] = "UTF-8"
par1["value"] = res1
par1["attributes"] = "null"
outcontent["parameters"].append(par1)
#print outcontent
return outcontent
if __name__ == '__main__':
main(sys.argv[1:])
\ No newline at end of file
...@@ -207,10 +207,10 @@ public class P2PConverter { ...@@ -207,10 +207,10 @@ public class P2PConverter {
if (size <= 1) { if (size <= 1) {
return "XOSmall"; return "XOSmall";
} }
if (size > 1 && size <= 5) { if (size > 1 && size < 5) {
return "XOMedium"; return "XOMedium";
} }
if (size > 5 && size <= 10) { if (size >= 5 && size <= 10) {
return "XOLarge"; return "XOLarge";
} }
default: default:
......
...@@ -2,6 +2,14 @@ ...@@ -2,6 +2,14 @@
<project version="4"> <project version="4">
<component name="ChangeListManager"> <component name="ChangeListManager">
<list default="true" id="462ede19-adfe-472b-975e-fefefa973fe0" name="Default Changelist" comment="slolved cap error"> <list default="true" id="462ede19-adfe-472b-975e-fefefa973fe0" name="Default Changelist" comment="slolved cap error">
<change beforePath="$PROJECT_DIR$/../drip-deployer/docker_engine.py" beforeDir="false" afterPath="$PROJECT_DIR$/../drip-deployer/docker_engine.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../drip-deployer/docker_kubernetes.py" beforeDir="false" afterPath="$PROJECT_DIR$/../drip-deployer/docker_kubernetes.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../drip-deployer/docker_kubernetes.sh" beforeDir="false" afterPath="$PROJECT_DIR$/../drip-deployer/docker_kubernetes.sh" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../drip-planner/ICPCP.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/../drip-planner/NewInstance.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/../drip-planner/rpc_server.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/../drip-planner/server.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/../drip-planner2provisioner/src/main/java/nl/uva/sne/drip/drip/converter/P2PConverter.java" beforeDir="false" afterPath="$PROJECT_DIR$/../drip-planner2provisioner/src/main/java/nl/uva/sne/drip/drip/converter/P2PConverter.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/planner/basic_planner.py" beforeDir="false" afterPath="$PROJECT_DIR$/src/planner/basic_planner.py" afterDir="false" /> <change beforePath="$PROJECT_DIR$/src/planner/basic_planner.py" beforeDir="false" afterPath="$PROJECT_DIR$/src/planner/basic_planner.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/venv/lib/python3.6/site-packages/easy-install.pth" beforeDir="false" /> <change beforePath="$PROJECT_DIR$/venv/lib/python3.6/site-packages/easy-install.pth" beforeDir="false" />
...@@ -83,7 +91,7 @@ ...@@ -83,7 +91,7 @@
<option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/src/rpc_server.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/src/rpc_server.py" />
<option name="PARAMETERS" value="test_local" /> <option name="PARAMETERS" value="localhost planner_queue" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" /> <option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" /> <option name="MODULE_MODE" value="false" />
...@@ -123,7 +131,14 @@ ...@@ -123,7 +131,14 @@
<option name="project" value="LOCAL" /> <option name="project" value="LOCAL" />
<updated>1568123822036</updated> <updated>1568123822036</updated>
</task> </task>
<option name="localTasksCounter" value="3" /> <task id="LOCAL-00003" summary="changed to ubunut 17.10">
<created>1568902303586</created>
<option name="number" value="00003" />
<option name="presentableId" value="LOCAL-00003" />
<option name="project" value="LOCAL" />
<updated>1568902303586</updated>
</task>
<option name="localTasksCounter" value="4" />
<servers /> <servers />
</component> </component>
<component name="Vcs.Log.Tabs.Properties"> <component name="Vcs.Log.Tabs.Properties">
...@@ -143,7 +158,8 @@ ...@@ -143,7 +158,8 @@
<MESSAGE value="slolved cap error" /> <MESSAGE value="slolved cap error" />
<MESSAGE value="produces valid TOSCA" /> <MESSAGE value="produces valid TOSCA" />
<MESSAGE value="format TOSCA output" /> <MESSAGE value="format TOSCA output" />
<option name="LAST_COMMIT_MESSAGE" value="format TOSCA output" /> <MESSAGE value="changed to ubunut 17.10" />
<option name="LAST_COMMIT_MESSAGE" value="changed to ubunut 17.10" />
</component> </component>
<component name="XDebuggerManager"> <component name="XDebuggerManager">
<breakpoint-manager> <breakpoint-manager>
......
...@@ -269,7 +269,7 @@ class BasicPlanner: ...@@ -269,7 +269,7 @@ class BasicPlanner:
return max_occurrences return max_occurrences
if max_occurrences and max_occurrences <= -1 and min_max_occurrences[ if max_occurrences and max_occurrences <= -1 and min_max_occurrences[
1] == 'UNBOUNDED' and node_type == 'tosca.nodes.ARTICONF.VM.Compute': 1] == 'UNBOUNDED' and node_type == 'tosca.nodes.ARTICONF.VM.Compute':
return 2 return 1
else: else:
return 1 return 1
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment