
Single-Machine Simulation of Federated Learning Systems

Taner Topal
Founder & COO of Adap

The approach outlined in this blog post is no longer recommended. The post is kept for reference only. Please refer to the new simulation examples for TensorFlow or PyTorch.

Federated Learning brings together people from a diverse set of backgrounds. Building FL systems requires expertise in machine learning, distributed systems, software engineering, and operations, to name just a few. – Daniel J. Beutel

Single-Machine Simulation

What do we mean when we say simulate the Federated Learning process? Let us start with a quote from Wikipedia.

A simulation is an approximate imitation of the operation of a process or system that represents its operation over time. Simulation is used in many contexts, such as simulation of technology for performance tuning or optimizing. – Wikipedia

When we talk about a simulation with Flower we mean running an entire Federated Learning system (one server and multiple clients) on a single machine. Real-world Federated Learning systems (e.g., on mobile devices) have characteristics that are not present in simulation (e.g., connectivity issues), but well-crafted simulations are often a good starting point when developing a new system.

Overview

Our simulation will run one Federated Learning server and ten clients. We will write the code in such a way that we can increase the number of clients at a later point. All the code is available in the Flower repository in the examples directory.

Our simulation will start the server as well as all clients using sub-processes of our main process. The main process will block until each of these processes exits. For the purpose of the simulation we are also going to create our own partitioned dataset.

Dataset

Federated Learning solves the problem of learning a model over multiple datasets. For research purposes, we often use an existing dataset and artificially split it into multiple partitions. This simulation uses CIFAR-10 and partitions it into as many partitions as there are clients in our simulation. All the code related to this is in its own Python module named dataset.py.

To get started, we import tensorflow and numpy and define some types that are going to be used in the signatures of our functions.

from typing import List, Tuple, cast

import tensorflow as tf
import numpy as np

XY = Tuple[np.ndarray, np.ndarray]
XYList = List[XY]
PartitionedDataset = List[Tuple[XY, XY]]

A PartitionedDataset is a list with one entry per client. Each entry holds that client's training and test data in the usual Keras layout, so we can unpack it as follows.

local_dataset = partitioned_dataset[0]
(x_train, y_train), (x_test, y_test) = local_dataset
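
To make the nesting of this type concrete, here is a minimal sketch that builds a tiny stand-in dataset from random numbers and unpacks one partition. The helper make_toy_partitioned_dataset and the sizes (8 training and 2 test images per partition) are assumptions made for illustration only; they are not part of the example code.

```python
from typing import List, Tuple

import numpy as np

XY = Tuple[np.ndarray, np.ndarray]
PartitionedDataset = List[Tuple[XY, XY]]


def make_toy_partitioned_dataset(num_partitions: int) -> PartitionedDataset:
    """Build a tiny random stand-in dataset (hypothetical helper)."""
    rng = np.random.default_rng(seed=0)
    partitions: PartitionedDataset = []
    for _ in range(num_partitions):
        # Images and labels shaped like CIFAR-10 samples, just far fewer of them
        xy_train = (rng.random((8, 32, 32, 3)), rng.integers(0, 10, size=(8, 1)))
        xy_test = (rng.random((2, 32, 32, 3)), rng.integers(0, 10, size=(2, 1)))
        partitions.append((xy_train, xy_test))
    return partitions


toy_dataset = make_toy_partitioned_dataset(num_partitions=3)

# One list entry per client, each entry is ((x_train, y_train), (x_test, y_test))
(x_train, y_train), (x_test, y_test) = toy_dataset[0]
print(len(toy_dataset), x_train.shape, y_train.shape, x_test.shape, y_test.shape)
```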

Let's define a few helper functions now. Their docstrings explain what they do.

def shuffle(x: np.ndarray, y: np.ndarray) -> XY:
    """Shuffle x and y."""
    idx = np.random.permutation(len(x))
    return x[idx], y[idx]


def partition(x: np.ndarray, y: np.ndarray, num_partitions: int) -> XYList:
    """Split x and y into a number of partitions."""
    # np.split raises a ValueError unless len(x) is divisible by num_partitions
    return list(zip(np.split(x, num_partitions), np.split(y, num_partitions)))


def create_partitions(
    source_dataset: XY,
    num_partitions: int,
) -> XYList:
    """Create partitioned version of a source dataset."""
    x, y = source_dataset
    x, y = shuffle(x, y)
    xy_partitions = partition(x, y, num_partitions)

    return xy_partitions
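
The following sketch exercises the shuffle and partition helpers on toy data, as a quick way to see two properties: samples and labels stay aligned after shuffling, and np.split only accepts a partition count that divides the data evenly. The toy arrays are an assumption made for illustration. The two functions are repeated so that the snippet runs on its own.

```python
from typing import List, Tuple

import numpy as np

XY = Tuple[np.ndarray, np.ndarray]
XYList = List[XY]


def shuffle(x: np.ndarray, y: np.ndarray) -> XY:
    """Shuffle x and y with the same permutation."""
    idx = np.random.permutation(len(x))
    return x[idx], y[idx]


def partition(x: np.ndarray, y: np.ndarray, num_partitions: int) -> XYList:
    """Split x and y into a number of equally sized partitions."""
    return list(zip(np.split(x, num_partitions), np.split(y, num_partitions)))


# Toy data in which every label equals its sample, so alignment is easy to check
x = np.arange(12)
y = np.arange(12)

x_shuffled, y_shuffled = shuffle(x, y)
parts = partition(x_shuffled, y_shuffled, num_partitions=3)

partition_sizes = [len(px) for px, _ in parts]
pairs_aligned = all(np.array_equal(px, py) for px, py in parts)
print(partition_sizes, pairs_aligned)

# 12 samples cannot be divided evenly into 5 partitions
try:
    partition(x, y, num_partitions=5)
    uneven_split_failed = False
except ValueError:
    uneven_split_failed = True
print(uneven_split_failed)
```

If uneven partitions are acceptable, np.array_split is one possible alternative to np.split, but that would be a change from the example code shown here.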

Now we just need to bring everything together in a final function which will return the list of smaller datasets derived from CIFAR-10.

def load(
    num_partitions: int,
) -> PartitionedDataset:
    """Create partitioned version of CIFAR-10."""
    xy_train, xy_test = tf.keras.datasets.cifar10.load_data()

    xy_train_partitions = create_partitions(xy_train, num_partitions)
    xy_test_partitions = create_partitions(xy_test, num_partitions)

    return list(zip(xy_train_partitions, xy_test_partitions))

You can find the full version here

We now have a simple function which will return a partitioned version of CIFAR-10. Each partition will only be passed to a single client which will perform its training exclusively on that partition.
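
As a rough sanity check on partition sizes: CIFAR-10 contains 50,000 training and 10,000 test images, so ten clients receive 5,000 training and 1,000 test images each. The sketch below does this arithmetic for any client count and rejects counts that would not split evenly. The helper examples_per_client is hypothetical and not part of the example code.

```python
from typing import Tuple

CIFAR10_NUM_TRAIN = 50_000
CIFAR10_NUM_TEST = 10_000


def examples_per_client(num_partitions: int) -> Tuple[int, int]:
    """Return (train, test) examples per client (hypothetical helper)."""
    if CIFAR10_NUM_TRAIN % num_partitions or CIFAR10_NUM_TEST % num_partitions:
        # An even split, as used by np.split, is impossible for this count
        raise ValueError(f"CIFAR-10 does not split evenly into {num_partitions} partitions")
    return CIFAR10_NUM_TRAIN // num_partitions, CIFAR10_NUM_TEST // num_partitions


print(examples_per_client(10))   # (5000, 1000)
print(examples_per_client(100))  # (500, 100)
```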

Simulation

We are now going to create a fairly simple simulation based on our partitioned dataset. We again start by importing the necessary libraries:

from multiprocessing import Process
import os
from typing import Tuple

# Make TensorFlow log less verbose
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

import time
import tensorflow as tf
import numpy as np
import flwr as fl
from flwr.server.strategy import FedAvg

import dataset

# Used for type signatures
DATASET = Tuple[Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]]

In the next step we are going to define a function which allows us to start a Flower server with a customized FedAvg strategy. We customize two strategy parameters: the number of clients we expect to be connected before the training starts and the fraction of connected clients that are sampled during each round.

An example: suppose we want to sample 50 out of 100 clients in each round. We first have to wait until 100 clients are connected, which we do by setting min_available_clients to 100. Setting fraction_fit=0.5 then results in 50% of the connected clients being sampled for training.
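
The arithmetic behind this example can be sketched in a few lines. The function expected_fit_clients is a hypothetical illustration and not part of Flower; the actual strategy may apply additional rules, such as a minimum number of sampled clients.

```python
def expected_fit_clients(num_available: int, fraction_fit: float) -> int:
    """Approximate number of clients sampled for training in one round."""
    return int(num_available * fraction_fit)


print(expected_fit_clients(100, 0.5))  # 50
print(expected_fit_clients(10, 0.5))   # 5
```

With the settings used later in this post (ten clients and fraction_fit=0.5), roughly five clients would train in each round.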

Let's look at the actual code to start the server:

def start_server(num_rounds: int, num_clients: int, fraction_fit: float):
    """Start the server with a slightly adjusted FedAvg strategy."""
    strategy = FedAvg(min_available_clients=num_clients, fraction_fit=fraction_fit)
    # Exposes the server by default on port 8080
    fl.server.start_server(strategy=strategy, config={"num_rounds": num_rounds})

For more details on the start_server function, check out the API Reference.

Now it's time to define our client code. We are going to use a Keras model which will make the client code fairly simple.

def start_client(dataset: DATASET) -> None:
    """Start a single client with the provided dataset."""

    # Load and compile a Keras model for CIFAR-10
    model = tf.keras.applications.MobileNetV2((32, 32, 3), classes=10, weights=None)
    model.compile("adam", "sparse_categorical_crossentropy", metrics=["accuracy"])

    # Unpack the CIFAR-10 dataset partition
    (x_train, y_train), (x_test, y_test) = dataset

    # Define a Flower client
    class CifarClient(fl.client.NumPyClient):
        def get_parameters(self):
            """Return current weights."""
            return model.get_weights()

        def fit(self, parameters, config):
            """Fit model and return new weights as well as number of training examples."""
            model.set_weights(parameters)
            # Remove steps_per_epoch if you want to train over the full dataset
            # https://keras.io/api/models/model_training_apis/#fit-method
            model.fit(x_train, y_train, epochs=1, batch_size=32, steps_per_epoch=3)
            return model.get_weights(), len(x_train)

        def evaluate(self, parameters, config):
            """Evaluate using provided parameters."""
            model.set_weights(parameters)
            loss, accuracy = model.evaluate(x_test, y_test)
            return len(x_test), loss, accuracy

    # Start Flower client
    fl.client.start_numpy_client("0.0.0.0:8080", client=CifarClient())
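
The fit method above returns the number of training examples alongside the new weights. In federated averaging, the server typically uses that count to weight each client's contribution when it combines the updates. The sketch below shows this weighted average with plain NumPy. It is an illustration of the idea under that assumption, not Flower's implementation, and the name weighted_average is hypothetical.

```python
from typing import List, Tuple

import numpy as np

Weights = List[np.ndarray]


def weighted_average(results: List[Tuple[Weights, int]]) -> Weights:
    """Average per-layer weights, weighted by each client's example count."""
    total_examples = sum(num_examples for _, num_examples in results)
    num_layers = len(results[0][0])
    return [
        sum(weights[layer] * num_examples for weights, num_examples in results)
        / total_examples
        for layer in range(num_layers)
    ]


# Two toy clients with a single-layer "model" each
client_a = ([np.array([1.0, 1.0])], 100)  # trained on 100 examples
client_b = ([np.array([4.0, 4.0])], 300)  # trained on 300 examples

averaged = weighted_average([client_a, client_b])
print(averaged[0])  # [3.25 3.25]
```

The client with three times as many examples pulls the average three times as strongly, which is why the result is 3.25 rather than the unweighted mean of 2.5.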

Finally, we can bring everything together. We are going to load the dataset and provide each partition to a single client exclusively. The server and each client run in their own processes, all of which are started from our main process.

def run_simulation(num_rounds: int, num_clients: int, fraction_fit: float):
    """Start a FL simulation."""

    # This will hold all the processes which we are going to create
    processes = []

    # Start the server
    server_process = Process(target=start_server, args=(num_rounds, num_clients, fraction_fit))
    server_process.start()
    processes.append(server_process)

    # Optionally block the script here for a second or two so the server has time to start
    time.sleep(2)

    # Load the dataset partitions
    partitions = dataset.load(num_partitions=num_clients)

    # Start all the clients
    for partition in partitions:
        client_process = Process(target=start_client, args=(partition,))
        client_process.start()
        processes.append(client_process)

    # Block until all processes are finished
    for p in processes:
        p.join()


if __name__ == "__main__":
    run_simulation(num_rounds=100, num_clients=10, fraction_fit=0.5)
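
The fixed time.sleep(2) in run_simulation is a simple way to give the server time to start. As a possible alternative, the main process could poll the server port until it accepts TCP connections. The sketch below uses only the standard library. The helper wait_for_port is hypothetical and not part of the example code, and an open port is only a proxy for the server being ready.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 10.0) -> bool:
    """Return True once host:port accepts a TCP connection, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # The probe connection is closed again immediately
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            # Server not up yet, retry shortly
            time.sleep(0.1)
    return False
```

Under these assumptions, the sleep could be replaced by a call such as wait_for_port("127.0.0.1", 8080), raising an error if it returns False.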

Setup with and without Docker

In the example code in our repository you will also find a Dockerfile as well as a run.sh shell script. If you have Docker installed, you can simply run the shell script. It will build the Docker image and start the simulation. Alternatively, check out the README.md, which describes how to install the example project.

Summary

Although Federated Learning is hard, Flower makes it easy to create a simulation that runs an entire Federated Learning system on a single machine. Feel free to experiment with different numbers of clients and rounds, the fraction of clients that train in each round, and many other hyperparameters. If questions arise, our growing community is always happy to help and can be found on our Slack Community.

If you would like to improve this blog post or the example, feel free to open a pull request or reach out on Slack.