pre deploy

parent 0dcf5321
FROM python:3
LABEL maintainer="Alexander Lercher"
ENV http_proxy http://proxy.uni-klu.ac.at:3128/
ENV https_proxy http://proxy.uni-klu.ac.at:3128/
RUN apt-get update
EXPOSE 5000
WORKDIR /app
COPY src/data-hub/federated-training-microservice/app/requirements.txt /app/
RUN pip install -r requirements.txt
COPY src/modules/ /app/
COPY src/data-hub/federated-training-microservice/app/ /app/
RUN chmod a+x main.py
CMD ["python", "./main.py"]
\ No newline at end of file
@@ -84,4 +84,31 @@ else:
    BUSINESS_LOGIC_REST_PORT = 30420
    BUSINESS_LOGIC_DB_PORT = 30421
## Federated Learning
if server:
    BUSINESS_LOGIC_HOSTNAME = 'federated-learning'
    BUSINESS_LOGIC_DB_HOSTNAME = f'{BUSINESS_LOGIC_HOSTNAME}-db'
    BUSINESS_LOGIC_REST_PORT = 80
    BUSINESS_LOGIC_DB_PORT = 27017
else:
    BUSINESS_LOGIC_HOSTNAME = 'articonf1.itec.aau.at'
    BUSINESS_LOGIC_DB_HOSTNAME = 'articonf1.itec.aau.at'
    BUSINESS_LOGIC_REST_PORT = 30422
    BUSINESS_LOGIC_DB_PORT = 30423
#endregion Participation Hub
#region Federated Training
## Federated Training
if server:
    BUSINESS_LOGIC_HOSTNAME = 'federated-training'
    BUSINESS_LOGIC_DB_HOSTNAME = f'{BUSINESS_LOGIC_HOSTNAME}-db'
    BUSINESS_LOGIC_REST_PORT = 80
    BUSINESS_LOGIC_DB_PORT = 27017
else:
    BUSINESS_LOGIC_HOSTNAME = 'articonf1.itec.aau.at'
    BUSINESS_LOGIC_DB_HOSTNAME = 'articonf1.itec.aau.at'
    BUSINESS_LOGIC_REST_PORT = 30424
    BUSINESS_LOGIC_DB_PORT = 30425
#endregion Federated Training
\ No newline at end of file
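# Usage sketch for the config values above (the module name
# `network_constants` is an assumption, not confirmed by this diff):
# callers compose the service URL from hostname and port.
from network_constants import BUSINESS_LOGIC_HOSTNAME, BUSINESS_LOGIC_REST_PORT

base_url = f'http://{BUSINESS_LOGIC_HOSTNAME}:{BUSINESS_LOGIC_REST_PORT}/api'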
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
#from models import MnistModel
from models2 import model_fn
def evaluate(server_state):
    # FIXME: model_fn must be called; note it returns a tff.learning.Model
    # wrapper rather than a compiled Keras model, so this path is incomplete.
    keras_model = model_fn()
    keras_model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()]
    )
    keras_model.set_weights(server_state)
    # NOTE: central_emnist_test must be defined in the caller's scope.
    keras_model.evaluate(central_emnist_test)

def evaluate2(server_state, tff_learning_model):
    #TODO: Assign weights to the model
    #First idea = server_update function??
    evaluation = tff.learning.build_federated_evaluation(tff_learning_model)
    # keras_model = MnistModel()
    # keras_model.compile(
    #     loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    #     metrics=[tf.keras.metrics.SparseCategoricalAccuracy()]
    # )
    # keras_model.set_weights(server_state)
    # keras_model.evaluate(central_emnist_test)
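# A runnable alternative sketch: score the aggregated server weights with the
# plain Keras model from models2.create_keras_model on a centralized test
# dataset supplied by the caller (how that dataset is built is left open here).
from models2 import create_keras_model

def evaluate_centralized(server_state, central_emnist_test):
    keras_model = create_keras_model()
    keras_model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
    # server_state is the list of trainable variables (dense weights and bias).
    keras_model.set_weights(server_state)
    return keras_model.evaluate(central_emnist_test)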
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
from models import MnistModel
from models2 import model_fn
#tf_dataset_type = None
#model_weights_type = None
@tf.function
def client_update(model, dataset, server_weights, client_optimizer):
    """Performs training (using the server model weights) on the client's dataset."""
    # Initialize the client model with the current server weights.
    client_weights = model.trainable_variables
    # Assign the server weights to the client model.
    tf.nest.map_structure(lambda x, y: x.assign(y),
                          client_weights, server_weights)
    # Use the client_optimizer to update the local model.
    for batch in dataset:
        with tf.GradientTape() as tape:
            # Compute a forward pass on the batch of data.
            outputs = model.forward_pass(batch)
        # Compute the corresponding gradient.
        grads = tape.gradient(outputs.loss, client_weights)
        grads_and_vars = zip(grads, client_weights)
        # Apply the gradient using a client optimizer.
        client_optimizer.apply_gradients(grads_and_vars)
    return client_weights
@tf.function
def server_update(model, mean_client_weights):
    """Updates the server model weights as the average of the client model weights."""
    model_weights = model.trainable_variables
    # Assign the mean client weights to the server model.
    tf.nest.map_structure(lambda x, y: x.assign(y),
                          model_weights, mean_client_weights)
    return model_weights

# Creating the initialization computation.
@tff.tf_computation
def server_init():
    model = MnistModel()  # model_fn()
    return model.trainable_variables

@tff.federated_computation
def initialize_fn():
    return tff.federated_value(server_init(), tff.SERVER)
whimsy_model = MnistModel()
tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)
model_weights_type = server_init.type_signature.result
@tff.tf_computation(tf_dataset_type, model_weights_type)
def client_update_fn(tf_dataset, server_weights):
    model = MnistModel()  # model_fn()
    client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
    return client_update(model, tf_dataset, server_weights, client_optimizer)

@tff.tf_computation(model_weights_type)
def server_update_fn(mean_client_weights):
    model = MnistModel()  # model_fn()
    return server_update(model, mean_client_weights)
federated_server_type = tff.FederatedType(model_weights_type, tff.SERVER)
federated_dataset_type = tff.FederatedType(tf_dataset_type, tff.CLIENTS)
@tff.federated_computation(federated_server_type, federated_dataset_type)
def next_fn(server_weights, federated_dataset):
    # Broadcast the server weights to the clients.
    print("server_weights")
    print(str(server_weights.type_signature))
    server_weights_at_client = tff.federated_broadcast(server_weights)
    # Each client computes their updated weights.
    client_weights = tff.federated_map(
        client_update_fn, (federated_dataset, server_weights_at_client))
    # The server averages these updates.
    mean_client_weights = tff.federated_mean(client_weights)
    print("mean_client_weights")
    print(str(mean_client_weights.type_signature))
    # The server updates its model.
    server_weights = tff.federated_map(server_update_fn, mean_client_weights)
    return server_weights
def get_federated_algorithm():
    # Creating the next_fn.
    # Getting the data types, needed explicitly in the model functions.
    whimsy_model = MnistModel()  # model_fn()
    global tf_dataset_type
    tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)
    print("tf_dataset_type")
    print(str(tf_dataset_type))
    global model_weights_type
    model_weights_type = server_init.type_signature.result
    print("model_weights_type")
    print(str(model_weights_type))
    # Finished printing types.
    federated_server_type = tff.FederatedType(model_weights_type, tff.SERVER)
    federated_dataset_type = tff.FederatedType(tf_dataset_type, tff.CLIENTS)
    federated_algorithm = tff.templates.IterativeProcess(
        initialize_fn=initialize_fn,
        next_fn=next_fn
    )
    return federated_algorithm

def merge_2_states(server_state1, server_state2):
    # Fixed: the original body referenced undefined names
    # server_state / server2_state instead of the parameters.
    return np.mean(np.array([server_state1, server_state2]), axis=0)
\ No newline at end of file
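# Sketch (assumption, not part of the original file): a generalization of
# merge_2_states to n server states, averaging layer-wise so weight and bias
# arrays of different shapes are never stacked into one ragged numpy array.
def merge_n_states(server_states):
    return [np.mean([np.asarray(layer) for layer in layers], axis=0)
            for layers in zip(*server_states)]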
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
#from models import MnistModel
from models2 import model_fn
#tf_dataset_type = None
#model_weights_type = None
@tf.function
def client_update(model, dataset, server_weights, client_optimizer):
    """Performs training (using the server model weights) on the client's dataset."""
    # Initialize the client model with the current server weights.
    client_weights = model.trainable_variables
    # Assign the server weights to the client model.
    tf.nest.map_structure(lambda x, y: x.assign(y),
                          client_weights, server_weights)
    # Use the client_optimizer to update the local model.
    for batch in dataset:
        with tf.GradientTape() as tape:
            # Compute a forward pass on the batch of data.
            outputs = model.forward_pass(batch)
        # Compute the corresponding gradient.
        grads = tape.gradient(outputs.loss, client_weights)
        grads_and_vars = zip(grads, client_weights)
        # Apply the gradient using a client optimizer.
        client_optimizer.apply_gradients(grads_and_vars)
    return client_weights
@tf.function
def server_update(model, mean_client_weights):
    """Updates the server model weights as the average of the client model weights."""
    model_weights = model.trainable_variables
    # Assign the mean client weights to the server model.
    tf.nest.map_structure(lambda x, y: x.assign(y),
                          model_weights, mean_client_weights)
    return model_weights

# Creating the initialization computation.
@tff.tf_computation
def server_init():
    model = model_fn()
    return model.trainable_variables

@tff.federated_computation
def initialize_fn():
    return tff.federated_value(server_init(), tff.SERVER)
whimsy_model = model_fn()
tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)
model_weights_type = server_init.type_signature.result
@tff.tf_computation(tf_dataset_type, model_weights_type)
def client_update_fn(tf_dataset, server_weights):
    model = model_fn()
    client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
    return client_update(model, tf_dataset, server_weights, client_optimizer)

@tff.tf_computation(model_weights_type)
def server_update_fn(mean_client_weights):
    model = model_fn()
    return server_update(model, mean_client_weights)
federated_server_type = tff.FederatedType(model_weights_type, tff.SERVER)
federated_dataset_type = tff.FederatedType(tf_dataset_type, tff.CLIENTS)
@tff.federated_computation(federated_server_type, federated_dataset_type)
def next_fn(server_weights, federated_dataset):
    # Broadcast the server weights to the clients.
    print("server_weights")
    print(str(server_weights.type_signature))
    server_weights_at_client = tff.federated_broadcast(server_weights)
    # Each client computes their updated weights.
    client_weights = tff.federated_map(
        client_update_fn, (federated_dataset, server_weights_at_client))
    # The server averages these updates.
    mean_client_weights = tff.federated_mean(client_weights)
    print("mean_client_weights")
    print(str(mean_client_weights.type_signature))
    # The server updates its model.
    server_weights = tff.federated_map(server_update_fn, mean_client_weights)
    return server_weights
def get_federated_algorithm():
    # Creating the next_fn.
    # Getting the data types, needed explicitly in the model functions.
    whimsy_model = model_fn()
    global tf_dataset_type
    tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)
    print("tf_dataset_type")
    print(str(tf_dataset_type))
    global model_weights_type
    model_weights_type = server_init.type_signature.result
    print("model_weights_type")
    print(str(model_weights_type))
    # Finished printing types.
    federated_server_type = tff.FederatedType(model_weights_type, tff.SERVER)
    federated_dataset_type = tff.FederatedType(tf_dataset_type, tff.CLIENTS)
    federated_algorithm = tff.templates.IterativeProcess(
        initialize_fn=initialize_fn,
        next_fn=next_fn
    )
    return federated_algorithm

def merge_2_states(server1_state, server2_state):
    return np.mean(np.array([server1_state, server2_state]), axis=0)
\ No newline at end of file
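# Sketch (assumption, not part of the original file): a weighted variant of
# merge_2_states, useful when the two states were trained on different numbers
# of rounds or examples; n1 and n2 are caller-supplied weights.
def merge_2_states_weighted(server1_state, server2_state, n1, n2):
    total = float(n1 + n2)
    return [(np.asarray(w1) * n1 + np.asarray(w2) * n2) / total
            for w1, w2 in zip(server1_state, server2_state)]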
import collections
import attr
import functools
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
from preprocessing import get_client_ids
from preprocessing import get_federated_train_data
from preprocessing import preprocess
#from models import MnistModel
from federated_training_algorithm import get_federated_algorithm
from federated_training_algorithm import merge_2_states
from evaluation import evaluate
np.random.seed(0)
print("## Starting...")
#GET THE DATA (for now it's the default dataset)
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()
###### client_ids = get_client_ids(emnist_train)  # not used
print("## Preprocessing the federated_train_data")
federated_train_data = get_federated_train_data(emnist_train)
print("## Declaring the model")
#it is done in models.py
print("## Declaring the federated algorithm")
federated_algorithm = get_federated_algorithm()
server_state = federated_algorithm.initialize()
for round_num in range(20):  # renamed from `round`, which shadows the builtin
    server_state = federated_algorithm.next(server_state, federated_train_data)
print("server_state type")
print(str(type(server_state)))
print(str(type(server_state[0])))
print("FINISHED")

server2_state = federated_algorithm.initialize()
for round_num in range(2):
    server2_state = federated_algorithm.next(server2_state, federated_train_data)

merged_state = merge_2_states(server_state, server2_state)
print("server_state[1]")
print(server_state[1])
print("server2_state[1]")
print(server2_state[1])
print("merged_state[1]")
print(merged_state[1])
# print("federated_algorithm.initialize.type_signature")
# print(str(federated_algorithm.initialize.type_signature)
# print("federated_algorithm.next.type_signature")
# print(str(federated_algorithm.next.type_signature))
# print("## Training the model")
# iterative_process = tff.learning.build_federated_averaging_process(
# MnistModel,
# client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02))
# state = iterative_process.initialize()
# for round_num in range(1, 11):
# state, metrics = iterative_process.next(state, federated_train_data)
# print('round {:2d}, metrics={}'.format(round_num, metrics))
#evaluation = tff.learning.build_federated_evaluation(MnistModel)
#TODO integration
print("## Evaluation of the model")
\ No newline at end of file
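# Hedged sketch of the evaluation step left as TODO above: build a centralized
# test set from emnist_test and score the merged weights with the plain Keras
# model from models2.py (importing create_keras_model here is an assumption).
from models2 import create_keras_model

central_emnist_test = preprocess(emnist_test.create_tf_dataset_from_all_clients())
keras_model = create_keras_model()
keras_model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
keras_model.set_weights(merged_state)
keras_model.evaluate(central_emnist_test)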
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
import collections
input_spec_data_global = None
MnistVariables = collections.namedtuple(
'MnistVariables', 'weights bias num_examples loss_sum accuracy_sum')
def create_mnist_variables():
    return MnistVariables(
        weights=tf.Variable(
            lambda: tf.zeros(dtype=tf.float32, shape=(784, 10)),
            name='weights',
            trainable=True),
        bias=tf.Variable(
            lambda: tf.zeros(dtype=tf.float32, shape=(10)),
            name='bias',
            trainable=True),
        num_examples=tf.Variable(0.0, name='num_examples', trainable=False),
        loss_sum=tf.Variable(0.0, name='loss_sum', trainable=False),
        accuracy_sum=tf.Variable(0.0, name='accuracy_sum', trainable=False))
def mnist_forward_pass(variables, batch):
    y = tf.nn.softmax(tf.matmul(batch['x'], variables.weights) + variables.bias)
    predictions = tf.cast(tf.argmax(y, 1), tf.int32)

    flat_labels = tf.reshape(batch['y'], [-1])
    loss = -tf.reduce_mean(
        tf.reduce_sum(tf.one_hot(flat_labels, 10) * tf.math.log(y), axis=[1]))
    accuracy = tf.reduce_mean(
        tf.cast(tf.equal(predictions, flat_labels), tf.float32))

    num_examples = tf.cast(tf.size(batch['y']), tf.float32)

    variables.num_examples.assign_add(num_examples)
    variables.loss_sum.assign_add(loss * num_examples)
    variables.accuracy_sum.assign_add(accuracy * num_examples)

    return loss, predictions

def get_local_mnist_metrics(variables):
    return collections.OrderedDict(
        num_examples=variables.num_examples,
        loss=variables.loss_sum / variables.num_examples,
        accuracy=variables.accuracy_sum / variables.num_examples)
@tff.federated_computation
def aggregate_mnist_metrics_across_clients(metrics):
    return collections.OrderedDict(
        num_examples=tff.federated_sum(metrics.num_examples),
        loss=tff.federated_mean(metrics.loss, metrics.num_examples),
        accuracy=tff.federated_mean(metrics.accuracy, metrics.num_examples))
class MnistModel(tff.learning.Model):

    def __init__(self):
        self._variables = create_mnist_variables()

    @property
    def trainable_variables(self):
        return [self._variables.weights, self._variables.bias]

    @property
    def non_trainable_variables(self):
        return []

    @property
    def local_variables(self):
        return [
            self._variables.num_examples, self._variables.loss_sum,
            self._variables.accuracy_sum
        ]

    @property
    def input_spec(self):
        return collections.OrderedDict(
            x=tf.TensorSpec([None, 784], tf.float32),
            y=tf.TensorSpec([None, 1], tf.int32))

    @tf.function
    def forward_pass(self, batch, training=True):
        del training
        loss, predictions = mnist_forward_pass(self._variables, batch)
        num_examples = tf.shape(batch['x'])[0]
        return tff.learning.BatchOutput(
            loss=loss, predictions=predictions, num_examples=num_examples)

    @tf.function
    def report_local_outputs(self):
        return get_local_mnist_metrics(self._variables)

    @property
    def federated_output_computation(self):
        return aggregate_mnist_metrics_across_clients
\ No newline at end of file
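# Quick eager-mode smoke test (assumption: run as a script, outside any TFF
# computation) checking that MnistModel's forward pass yields a scalar loss.
if __name__ == '__main__':
    model = MnistModel()
    dummy_batch = collections.OrderedDict(
        x=tf.zeros([2, 784], tf.float32),
        y=tf.zeros([2, 1], tf.int32))
    output = model.forward_pass(dummy_batch)
    print(float(output.loss))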
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
import collections
def create_keras_model():  #### DEFAULT TEST MODEL
    return tf.keras.models.Sequential([
        tf.keras.layers.InputLayer(input_shape=(784,)),
        tf.keras.layers.Dense(10, kernel_initializer='zeros'),
        tf.keras.layers.Softmax(),
    ])

def model_fn():
    # We _must_ create a new model here, and _not_ capture it from an external
    # scope. TFF will call this within different graph contexts.
    # TODO: can be changed to include calls to other models (or it cannot,
    # because you cannot specify which model you want to call?).
    keras_model = create_keras_model()
    return tff.learning.from_keras_model(
        keras_model,
        input_spec=get_input_spec(),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

def get_input_spec():
    return collections.OrderedDict(
        x=tf.TensorSpec([None, 784], tf.float32),
        y=tf.TensorSpec([None, 1], tf.int32))
\ No newline at end of file
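# Sketch (assumption): one way to answer the TODO in model_fn, letting callers
# pass an alternative Keras builder. model_fn_from is a hypothetical helper,
# not part of the original file.
def model_fn_from(builder=create_keras_model):
    return tff.learning.from_keras_model(
        builder(),
        input_spec=get_input_spec(),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])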
import collections
import attr
import functools
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
np.random.seed(0)
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()
# NUM_CLIENTS = 10
# BATCH_SIZE = 20
def preprocess(dataset, BATCH_SIZE=20):

    def batch_format_fn(element):
        """Flatten a batch of EMNIST data and return a (features, label) tuple."""
        return (tf.reshape(element['pixels'], [-1, 784]),
                tf.reshape(element['label'], [-1, 1]))

    return dataset.batch(BATCH_SIZE).map(batch_format_fn)

def get_client_ids(emnist_train, NUM_CLIENTS=10):
    return np.random.choice(emnist_train.client_ids, size=NUM_CLIENTS, replace=False)
    #client_ids = np.random.choice(emnist_train.client_ids, size=NUM_CLIENTS, replace=False)

def get_federated_train_data(emnist_train, NUM_CLIENTS=10):
    client_ids = get_client_ids(emnist_train, NUM_CLIENTS)
    return [preprocess(emnist_train.create_tf_dataset_for_client(x))
            for x in client_ids]
    # federated_train_data = [preprocess(emnist_train.create_tf_dataset_for_client(x))
    #                         for x in client_ids]

##################################################################
## Second Dataset
############################################

def preprocess_and_shuffle(dataset):
    """Applies `preprocess_dataset` above and shuffles the result."""
    preprocessed = preprocess(dataset)
    return preprocessed.shuffle(buffer_size=5)
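# Sketch (assumption, not part of the original file): a centralized test set
# for Keras-based evaluation, mirroring the preprocessing applied to the
# training clients.
def get_central_test_data(emnist_test):
    return preprocess(emnist_test.create_tf_dataset_from_all_clients())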
import collections
import attr
import functools
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff