Working And Tested Fed Training Microserv

parent fb06bd96
@@ -28,20 +28,20 @@ def last(use_case: str):
def upload_and_train(use_case: str, developer_id: int):
-   #FORWARD TO GPU SERVER WITH IP AND PORT
+   #TODO FORWARD FILES, for some reason the files are received in this microservice (Federated-LEARNING), but after forwarding they are empty in Federated-TRAINING
    #data = {'use_case' : use_case,
    #        'developer_id' : developer_id}
    #url= 'gpu3.itec.aau.at/home/itec/bogdan/Articonf/smart/tools/federated-training/app/routes/developers'
    try:
        use_case_path = 'processing/'+use_case+'/'
        app_path = dirname(dirname(abspath(__file__)))
        file_dict = request.files
        db_File_True = file_dict["dataset_file1"]
        db_File_Fake = file_dict["dataset_file2"]
-       true_csv_path = os.path.join(app_path+"/"+use_case_path+"db/", "True.csv")
-       fake_csv_path = os.path.join(app_path+"/"+use_case_path+"db/", "Fake.csv")
+       true_csv_path = "True.csv"
+       fake_csv_path = "Fake.csv"
        db_File_True.save(true_csv_path)
        db_File_Fake.save(fake_csv_path)
        time.sleep(2) #wait for hte files to be copied
@@ -61,9 +61,6 @@ def upload_and_train(use_case: str, developer_id: int):
        return json.loads(response.text)
    except Exception as e:
        return json.loads(str(e))
#upload_and_train("text_processing",1)
#last("text_processing")
......
...@@ -21,6 +21,7 @@ def last(use_case: str): ...@@ -21,6 +21,7 @@ def last(use_case: str):
def upload_and_train(use_case: str): def upload_and_train(use_case: str):
#FORWARD TO GPU SERVER WITH IP AND PORT #FORWARD TO GPU SERVER WITH IP AND PORT
#TODO FORWARD FILES, for some reason the files are received in this microservice (Federated-LEARNING), but after forwarding they are empty in Federated-TRAINING
url = f'https://{network_constants.FEDERATED_TRAINING_HOSTNAME}:{network_constants.FEDERATED_TRAINING_REST_PORT}/api/Owners/use_cases/{use_case}/upload_and_train:' url = f'https://{network_constants.FEDERATED_TRAINING_HOSTNAME}:{network_constants.FEDERATED_TRAINING_REST_PORT}/api/Owners/use_cases/{use_case}/upload_and_train:'
response = requests.post( response = requests.post(
url, url,
......
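The TODO above (files arrive in Federated-LEARNING but are empty once forwarded to Federated-TRAINING) is consistent with the uploaded streams having already been read or saved before the outgoing request is built. A minimal sketch of one way to forward them, assuming the same multipart field names used in these routes and the Federated-TRAINING URL built above; the helper name and the verify=False flag are illustrative, not part of the existing code:

import requests
from flask import request

def forward_uploaded_files(url: str, use_case: str, developer_id: int):
    # Rebuild the multipart body from the streams Flask already parsed.
    # Each FileStorage stream may have been consumed, so seek back to the start
    # before reading it; otherwise the forwarded parts contain zero bytes.
    files = {}
    for field_name, file_storage in request.files.items():
        file_storage.stream.seek(0)
        files[field_name] = (file_storage.filename,
                             file_storage.stream.read(),
                             file_storage.mimetype)
    data = {'use_case': use_case, 'developer_id': developer_id}
    # verify=False is an assumption for a self-signed HTTPS setup; drop it if not needed.
    return requests.post(url, data=data, files=files, verify=False)

The seek(0) before reading is the key step; once the streams are rewound, requests rebuilds the multipart parts with their original content.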
@@ -13,3 +13,4 @@ Trainer_id,Model_id,Dataset_id,Accuracy,Loss
1,1624284673,1624284673,0.5,nan
0,1624550528,1624550528,0.75,nan
2,1624872086,1624872086,0.5,nan
+2,1624884163,1624884163,0.75,nan
import global_hyperparams as globals
from model import get_simple_LSTM_model

import pickle
import tensorflow as tf
import tensorflow_federated as tff


def model_fn():
    keras_model = get_simple_LSTM_model()
    return tff.learning.from_keras_model(
        keras_model,
        input_spec=globals.INPUT_SPEC,
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
def federated_computation_new(train_dataset, test_dataset):
    if globals.INPUT_SPEC is None:
        # Should never reach this place because INPUT_SPEC is instantiated inside get_preprocessed_train_test_data.
        # However, if in the future processed data is provided without the preprocessing function it will be None -> therefore assign it here.
        globals.INPUT_SPEC = train_dataset[0].element_spec

    # Training and evaluating the model
    iterative_process = tff.learning.build_federated_averaging_process(
        model_fn, client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.5))
    state = iterative_process.initialize()
    print(type(state))

    for n in range(globals.EPOCHS):
        state, metrics = iterative_process.next(state, train_dataset)
        print('round {}, training metrics={}'.format(n + 1, metrics))

    evaluation = tff.learning.build_federated_evaluation(model_fn)
    eval_metrics = evaluation(state.model, train_dataset)
    print('Training evaluation metrics={}'.format(eval_metrics))

    test_metrics = evaluation(state.model, test_dataset)
    print('Test evaluation metrics={}'.format(test_metrics))

    ####################################################################################################################################
    # Save the last trained model so make_prediction can load it later.
    with open("processing/" + globals.USE_CASE + "/last_model", 'wb') as f:
        pickle.dump(state, f)
    return state, metrics
###############################################################################################
def federated_computation_continue(train_dataset, test_dataset, restored_state):
    if globals.INPUT_SPEC is None:
        # Should never reach this place because INPUT_SPEC is instantiated inside get_preprocessed_train_test_data.
        # However, if in the future processed data is provided without the preprocessing function it will be None -> therefore assign it here.
        globals.INPUT_SPEC = train_dataset[0].element_spec

    # Training and evaluating the model, continuing from the restored state instead of a fresh initialization.
    iterative_process = tff.learning.build_federated_averaging_process(
        model_fn, client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.5))
    state = iterative_process.initialize()
    state = restored_state
    print(type(state))

    for n in range(globals.EPOCHS):
        state, metrics = iterative_process.next(state, train_dataset)
        print('round {}, training metrics={}'.format(n + 1, metrics))

    evaluation = tff.learning.build_federated_evaluation(model_fn)
    eval_metrics = evaluation(state.model, train_dataset)
    print('Training evaluation metrics={}'.format(eval_metrics))

    test_metrics = evaluation(state.model, test_dataset)
    print('Test evaluation metrics={}'.format(test_metrics))
    return state, metrics
###############################################################################################
def make_prediction(input_prediction_data):
    try:
        with open("processing/" + globals.USE_CASE + "/last_model", 'rb') as f:
            state = pickle.load(f)
    except Exception as e:
        print(e)
        return None
    # Push the trained federated weights into a fresh Keras model and run inference locally.
    model_for_inference = get_simple_LSTM_model()
    state.model.assign_weights_to(model_for_inference)
    predictions = model_for_inference.predict_on_batch(input_prediction_data)
    return predictions
def initialize(use_case, trainer_id=0, dataset_id=0):
    global MAX_LENGTH  # Length of the sentences to be fed into the NN. Similar to image size, i.e. 100pixels x 100pixels, but it's 1D.
    MAX_LENGTH = 40
    global VOCAB_SIZE  # How many words the tokenizer memorises; each word is stored as an int.
    VOCAB_SIZE = 6000
    global NUM_CLIENTS  # Number of clients in the federated dataset.
    NUM_CLIENTS = 4
    global SHUFFLE_BUFFER
    SHUFFLE_BUFFER = 5000  # Used in preprocessing.
    global BATCH_SIZE
    BATCH_SIZE = 512  # Size of the dataset batches that are later fed into the NN.
    global INPUT_SPEC  # Tells the model what the data will look like.
    INPUT_SPEC = None  # Must & will be initialized after data preprocessing. Currently it is initialized with: train_dataset[0].element_spec
    global EMBED_DIM  # Number of dimensions of the embedding layer in the model.
    EMBED_DIM = 10
    global LSTM_OUT  # Output size of the LSTM layer.
    LSTM_OUT = 100
    global EPOCHS  # Number of epochs the model will be trained for.
    EPOCHS = 15
    global TRAINER_ID  # ID of the trainer entity.
    TRAINER_ID = trainer_id  # 0 = owner of the use_case
    global DATASET_ID  # ID of the dataset used.
    DATASET_ID = dataset_id  # 0 = "Main"/original dataset
    global USE_CASE  # Use case name.
    USE_CASE = use_case
\ No newline at end of file
Trainer_id,Model_id,Dataset_id,Accuracy,Loss
0,1624884692,1624884692,0.25,nan
import csv
import os
import json
import time

import global_hyperparams as globals
from preprocessing import get_preprocessed_train_test_data, preprocess_single_train_data
from federated_algorithm import federated_computation_new, make_prediction


def start_processing(use_case, developer_id: int = 0):
    globals.initialize(use_case, developer_id)
    globals.TRAINER_ID = developer_id
    train_dataset, test_dataset = get_preprocessed_train_test_data()
    state, metrics = federated_computation_new(train_dataset, test_dataset)
    trained_metrics = metrics['train']
    timestamp = int(time.time())
    globals.DATASET_ID = timestamp
    written_row = save_to_file_CSV(use_case, globals.TRAINER_ID, timestamp, globals.DATASET_ID,
                                   trained_metrics['sparse_categorical_accuracy'], trained_metrics['loss'])
    return written_row


def start_prediction(use_case, raw_input_prediction_data, developer_id: int = -1):
    globals.initialize(use_case, developer_id)
#raw_input_prediction_data = "Test sentence. And another sentence which is going to be used as a mean for checking if this article is true or not. Also Santa is real"
#raw_input_prediction_data = "Donald Trump Sends Out Embarrassing New Year’s Eve Message. This is Disturbing,'Donald Trump just couldn t wish all Americans a Happy New Year and leave it at that. Instead, he had to give a shout out to his enemies, haters and the very dishonest fake news media. The former reality show star had just one job to do and he couldn t do it. As our Country rapidly grows stronger and smarter, I want to wish all of my friends, supporters, enemies, haters, and even the very dishonest Fake News Media, a Happy and Healthy New Year, President Angry Pants tweeted. 2018 will be a great year for America! As our Country rapidly grows stronger and smarter, I want to wish all of my friends, supporters, enemies, haters, and even the very dishonest Fake News Media, a Happy and Healthy New Year. 2018 will be a great year for America! Donald J. Trump (@realDonaldTrump) December 31, 2017Trump s tweet went down about as welll as you d expect.What kind of president sends a New Year s greeting like this despicable, petty, infantile gibberish? Only Trump! His lack of decency won t even allow him to rise above the gutter long enough to wish the American citizens a happy new year! Bishop Talbert Swan (@TalbertSwan) December 31, 2017no one likes you Calvin (@calvinstowell) December 31, 2017Your impeachment would make 2018 a great year for America, but I ll also accept regaining control of Congress. Miranda Yaver (@mirandayaver) December 31, 2017Do you hear yourself talk? When you have to include that many people that hate you you have to wonder? Why do the they all hate me? Alan Sandoval (@AlanSandoval13) December 31, 2017Who uses the word Haters in a New Years wish?? Marlene (@marlene399) December 31, 2017You can t just say happy new year? Koren pollitt (@Korencarpenter) December 31, 2017Here s Trump s New Year s Eve tweet from 2016.Happy New Year to all, including to my many enemies and those who have fought me and lost so badly they just don t know what to do. Love! Donald J. Trump (@realDonaldTrump) December 31, 2016This is nothing new for Trump. He s been doing this for years.Trump has directed messages to his enemies and haters for New Year s, Easter, Thanksgiving, and the anniversary of 9/11. pic.twitter.com/4FPAe2KypA Daniel Dale (@ddale8) December 31, 2017Trump s holiday tweets are clearly not presidential.How long did he work at Hallmark before becoming President? Steven Goodine (@SGoodine) December 31, 2017He s always been like this . . . the only difference is that in the last few years, his filter has been breaking down. Roy Schulze (@thbthttt) December 31, 2017Who, apart from a teenager uses the term haters? Wendy (@WendyWhistles) December 31, 2017he s a fucking 5 year old Who Knows (@rainyday80) December 31, 2017So, to all the people who voted for this a hole thinking he would change once he got into power, you were wrong! 70-year-old men don t change and now he s a year older.Photo by Andrew Burton/Getty Images."
    input_prediction_data = preprocess_single_train_data(raw_input_prediction_data)
    prediction_result = make_prediction(input_prediction_data)
    if prediction_result is None:
        # No trained model is available yet; the route layer turns this into a 404.
        return None
    res = prediction_result[0]
    if res[0] >= 0.50:
        return json.dumps({"result": "True"})
    else:
        return json.dumps({"result": "False"})
def save_to_file_CSV(use_case, trainer_id, model_id, dataset_id, accuracy, loss):
    filename = "processing/" + use_case + "/ledger.csv"
    row = [str(trainer_id), str(model_id), str(dataset_id), str(accuracy), str(loss)]
    if not os.path.exists(filename):
        fields = ['Trainer_id', 'Model_id', 'Dataset_id', 'Accuracy', 'Loss']
        with open(filename, 'w', newline='') as csvfile:
            csvwriter = csv.writer(csvfile)
            csvwriter.writerow(fields)
            csvwriter.writerow(row)
    else:
        with open(filename, 'a', newline='') as csvfile:
            csvwriter = csv.writer(csvfile)
            csvwriter.writerow(row)
    return row
#start_processing("text_processing")
#start_prediction("text_processing")
\ No newline at end of file
import global_hyperparams as globals

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Embedding


def get_simple_LSTM_model():
    model = Sequential()
    model.add(Embedding(globals.VOCAB_SIZE, globals.EMBED_DIM, input_length=globals.MAX_LENGTH))
    model.add(Dropout(0.3))
    model.add(LSTM(globals.LSTM_OUT))
    model.add(Dropout(0.3))
    model.add(Dense(64, activation='relu'))
    model.add(Dropout(0.3))
    model.add(Dense(1, activation='sigmoid'))
    return model
\ No newline at end of file
import global_hyperparams as globals

import pandas as pd
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import collections
import tensorflow as tf


def get_raw_data() -> tuple:
    real = pd.read_csv("processing/" + globals.USE_CASE + "/db/True.csv")
    fake = pd.read_csv("processing/" + globals.USE_CASE + "/db/Fake.csv")
    return real, fake
def preprocess_raw_data(real: pd.DataFrame, fake: pd.DataFrame) -> tuple:
    # Drop rows that have urls as text and date (real's dates look fine), and rows that have no text.
    fake_drop = fake.drop(index=[9358, 15507, 15508, 18933])
    fake_drop = fake_drop.drop(fake_drop.loc[fake_drop.text == ' '].index)
    real_drop = real.drop(real.loc[real.text == ' '].index)

    # Give labels to the data before combining; use the cleaned frames so the dropped rows stay dropped.
    fake_drop['label'] = 1
    real_drop['label'] = 0
    combined = pd.concat([fake_drop, real_drop])
    no_reuters = combined.copy()
    no_reuters.text = no_reuters.text.str.replace('Reuters', '')
    combined = no_reuters.copy()

    ## train/test split the text data and labels
    df_text = combined['text']
    labels = combined['label']  # currently unused; target (below) is what gets split
    target = combined['label'].values

    tokenizer = Tokenizer(oov_token="<OOV>", num_words=globals.VOCAB_SIZE)  # VOCAB_SIZE = 6000
    tokenizer.fit_on_texts(df_text)
    sequences_train = tokenizer.texts_to_sequences(df_text)
    padded_train = pad_sequences(sequences_train, padding='post', maxlen=globals.MAX_LENGTH)  # MAX_LENGTH = 40

    # Data_train, data_test, label_train, label_test
    X_train, X_test, y_train, y_test = train_test_split(padded_train, target, test_size=0.2)
    X_train = tf.convert_to_tensor(X_train)
    X_test = tf.convert_to_tensor(X_test)
    y_train = tf.convert_to_tensor(y_train)
    y_test = tf.convert_to_tensor(y_test)
    return X_train, X_test, y_train, y_test
#FED PREPROCESSING
# NUM_CLIENTS = 4
# SHUFFLE_BUFFER = 5000
# BATCH_SIZE = 512
def preprocess(dataset):
    def element_fn(x, y):
        return collections.OrderedDict([
            ('x', x),
            ('y', y)  # tf.cast(tf.reshape(y, [1]), tf.float32))
        ])
    return dataset.map(element_fn).shuffle(
        globals.SHUFFLE_BUFFER).batch(globals.BATCH_SIZE)


def generate_clients_datasets(n, source_x, source_y):
    clients_dataset = []
    for i in range(n):
        dataset = tf.data.Dataset.from_tensor_slices(([source_x[i]], [source_y[i]]))
        dataset = preprocess(dataset)
        clients_dataset.append(dataset)
    return clients_dataset
def get_preprocessed_train_test_data() -> tuple:
    """
    Preprocesses and returns the train and test datasets.
    Returns the tuple: (train_dataset, test_dataset)
    """
    real, fake = get_raw_data()
    X_train, X_test, y_train, y_test = preprocess_raw_data(real, fake)
    train_dataset = generate_clients_datasets(globals.NUM_CLIENTS, X_train, y_train)
    test_dataset = generate_clients_datasets(globals.NUM_CLIENTS, X_test, y_test)
    globals.INPUT_SPEC = train_dataset[0].element_spec
    print("DONE PREPROCESSING")
    return train_dataset, test_dataset
def preprocess_single_train_data(input_data):
    # Debug override left from development; kept commented out so the caller's text is actually used.
    # input_data = "Test sentence. And another sentence which is going to be used as a mean for checking if this article is true or not. Also Santa is real"
    input_data_list = [input_data]
    df_text = input_data_list
    # NOTE: this fits a fresh Tokenizer on the single prediction sentence, so the integer ids
    # do not match the vocabulary the model was trained on; see the tokenizer sketch below.
    tokenizer = Tokenizer(oov_token="<OOV>", num_words=globals.VOCAB_SIZE)
    tokenizer.fit_on_texts(df_text)
    sequences_train = tokenizer.texts_to_sequences(df_text)
    padded_train = pad_sequences(sequences_train, padding='post', maxlen=globals.MAX_LENGTH)
    padded_train = tf.convert_to_tensor(padded_train)
    return padded_train


# globals.initialize("text_processing")
# preprocess_single_train_data("test")
\ No newline at end of file
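As the note in preprocess_single_train_data says, prediction-time preprocessing fits a brand-new Tokenizer on a single sentence, so its word ids do not line up with the vocabulary used during training. A minimal sketch of one way to keep them consistent, assuming the training tokenizer is pickled alongside the model; the helper names and the path argument are hypothetical, not part of the existing code:

import pickle

from tensorflow.keras.preprocessing.sequence import pad_sequences


def save_tokenizer(tokenizer, path):
    # Persist the tokenizer fitted in preprocess_raw_data so prediction can reuse it.
    with open(path, 'wb') as f:
        pickle.dump(tokenizer, f)


def preprocess_with_saved_tokenizer(text, path, max_length):
    # Load the training-time tokenizer and apply the same word-to-id mapping to the new text.
    with open(path, 'rb') as f:
        tokenizer = pickle.load(f)
    sequences = tokenizer.texts_to_sequences([text])
    return pad_sequences(sequences, padding='post', maxlen=max_length)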
@@ -41,7 +41,7 @@ def upload_and_train(use_case: str, developer_id: int):
        fake_csv_path = os.path.join(app_path+"/"+use_case_path+"db/", "Fake.csv")
        db_File_True.save(true_csv_path)
        db_File_Fake.save(fake_csv_path)
-       time.sleep(2) #wait for hte files to be copied
+       time.sleep(5) #wait for the files to be copied before proceeding with the processing (they are copied in a separate thread, i think?)
        #THEN start processing
        last_train_metrics = main_proc.start_processing(use_case,developer_id)
        print("## Last train metrics")
......
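The comment above guesses that the uploaded files are copied in a separate thread, hence the fixed time.sleep(5). Werkzeug's FileStorage.save() normally writes the upload to disk before returning, so the delay is likely unnecessary; if a safeguard is still wanted, polling for the expected files is more robust than a fixed sleep. A small sketch with a hypothetical helper name:

import os
import time


def wait_for_files(paths, timeout=10.0, poll_interval=0.2):
    # Poll until every expected file exists and is non-empty, instead of sleeping a fixed amount.
    deadline = time.time() + timeout
    while time.time() < deadline:
        if all(os.path.exists(p) and os.path.getsize(p) > 0 for p in paths):
            return True
        time.sleep(poll_interval)
    return False


# e.g. wait_for_files([true_csv_path, fake_csv_path]) before calling main_proc.start_processing(...)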
@@ -4,6 +4,8 @@ import sys
import shutil
from flask import Response, request
import pandas as pd
+from os.path import dirname, abspath
+import time
def last(use_case: str):
    print("ENTERED ROUTES LAST")
@@ -39,11 +41,12 @@ def upload_and_train(use_case: str):
    #Start a new implementation of the model.
    try:
        os.mkdir(use_case_path)
+       os.mkdir(use_case_path+"/db/")
        #COPY DEFAULT FILES
        shutil.copyfile(default_path+'main_proc.py',use_case_path+'main_proc.py')
        shutil.copyfile(default_path+'__init__.py',use_case_path+'__init__.py')
-       shutil.copyfile(default_path+'checkpoint_manager.py',use_case_path+'checkpoint_manager.py')
+       #shutil.copyfile(default_path+'checkpoint_manager.py',use_case_path+'checkpoint_manager.py')
        shutil.copyfile(default_path+'federated_algorithm.py',use_case_path+'federated_algorithm.py')
        #TODO: get the python files and create them locally in the {use_case} folder
@@ -52,7 +55,28 @@ def upload_and_train(use_case: str):
        # copy preprocessing into use_case_path
        # copy global_hyperparams into use_case_path
        # copy model into use_case_path
+       use_case_path2 = 'processing/'+use_case+'/'
+       app_path = dirname(dirname(abspath(__file__)))
+       file_dict = request.files
+       ##
+       file_global = file_dict["global_hyperparameters"]
+       file_preprocessing = file_dict["preprocessing"]
+       file_model = file_dict["model"]
+       file_True = file_dict["dataset_file1"]
+       file_Fake = file_dict["dataset_file2"]
+       ##
+       global_hyp_path = os.path.join(app_path+"/"+use_case_path2, "global_hyperparams.py")
+       preprocessing_path = os.path.join(app_path+"/"+use_case_path2, "preprocessing.py")
+       model_path = os.path.join(app_path+"/"+use_case_path2, "model.py")
+       true_csv_path = os.path.join(app_path+"/"+use_case_path2+"db/", "True.csv")
+       fake_csv_path = os.path.join(app_path+"/"+use_case_path2+"db/", "Fake.csv")
+       ##
+       file_global.save(global_hyp_path)
+       file_preprocessing.save(preprocessing_path)
+       file_model.save(model_path)
+       file_True.save(true_csv_path)
+       file_Fake.save(fake_csv_path)
+       time.sleep(5) #wait for the files to be copied before proceeding with the processing (they are copied in a separate thread, i think?)
        #COPY flask files
        #use flask? request.files.getlist('filename')[0] ???
    except OSError as error:
......
@@ -3,14 +3,14 @@
from flask import Response, request
import sys
-def check_article(use_case: str):
+def check_article(use_case: str, data_entry):
    #body = request.STRING
    use_case_path = 'processing/'+use_case+'/'
    sys.path.append(use_case_path)
    import main_proc
-   result = main_proc.start_prediction(use_case)
+   result = main_proc.start_prediction(use_case,data_entry)
    if result == None:
        return Response(status = 404, response="Server doesn't have a trained model. Training of the model should be finished before attempting a prediction.")
    return Response(status=200, response=result)
......