
Federated Analytics with Flower and Pandas

Charles Beauville
Data Scientist at Adap

With Flower 1.2, you might have noticed that we added a new example to the repo. If you were curious enough to check out the code, you might have been left a bit puzzled: unlike the others, this example does Federated Analytics instead of Federated Learning. In this post, we'll briefly go through what Federated Analytics consists of, what this example is about, and how it is built.

Special thanks to Ragy Haddad who made the original contribution to the repo! 🙌

A quick intro to Federated Analytics

The setting in which Federated Analytics is performed is analogous to the usual FL setting. Here we will take the example of 100 clients, each holding a subset of the data that is not shared with anyone. Instead of training a model, our goal is to compute statistics of the distributed dataset without allowing data sharing. For instance, we might be interested in the mean of the distributed dataset. It can be calculated by asking each client for the mean and the length of its local dataset, and then aggregating the results, weighting each local mean by its length, to obtain the global mean. This is the gist of Federated Analytics. To learn more, you can check out this article by Google.
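To make the mean example concrete, here is a minimal sketch in plain Python. It is not part of the Flower example: the function names and the client data are made up for illustration, and only the (mean, length) pairs ever leave the clients.

```python
from typing import List, Tuple


def client_summary(values: List[float]) -> Tuple[float, int]:
    """Runs on a client: return only the local mean and the sample count."""
    return sum(values) / len(values), len(values)


def aggregate_means(summaries: List[Tuple[float, int]]) -> float:
    """Runs on the server: combine local means, weighted by sample count."""
    total = sum(n for _, n in summaries)
    return sum(mean * n for mean, n in summaries) / total


# Three hypothetical clients with made-up local data of different sizes.
clients = [[1.0, 2.0, 3.0], [10.0], [4.0, 6.0]]
summaries = [client_summary(c) for c in clients]
global_mean = aggregate_means(summaries)

# The result matches the mean of the pooled data, which the server never sees.
pooled = [v for c in clients for v in c]
assert abs(global_mean - sum(pooled) / len(pooled)) < 1e-12
```

Weighting by the local length matters: a plain average of the three local means would give the single-sample client the same influence as the client with three samples.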

Fun fact: Federated Learning is actually a subset of Federated Analytics, with the Machine Learning model as the statistic being computed. It's just a more complex statistic than a simple mean…

The new Flower example

The new federated analytics example in Flower makes use of the popular data analysis library, Pandas, to analyze data from multiple sources. It is designed to be easy to use and understand, making it a great starting point for anyone looking to get started with federated analytics.

The example is built on top of the popular Iris dataset and shows how to use Pandas to analyze the data across multiple devices. The Iris dataset contains 150 samples of iris flowers, each with four features: sepal length, sepal width, petal length, and petal width. Here we will only focus on sepal length and sepal width. Our objective will be to compute the following statistics using the distributed data:

  • the frequency of each length of sepal
  • the frequency of each width of sepal

Of course, other statistics could be computed or added on, but we tried to keep this example as basic as possible.

Building the Pandas Example

We will now go through the main steps needed to build such an example.

The client

Before defining our custom flwr.client.NumPyClient, we will write the functions that will compute our statistics. These steps roughly correspond to the creation of the model in the usual federated learning setting.

def compute_hist(df, col_name):
    # np.histogram returns (counts, bin_edges); we only keep the counts
    freqs, _ = np.histogram(df[col_name])
    return freqs
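It helps to recall what np.histogram hands back: a pair made of the per-bin counts and the bin edges, in that order. The sketch below uses a small made-up column (not the real Iris data) to show the shape of both with the default settings.

```python
import numpy as np
import pandas as pd

# Made-up values standing in for one client's "sepal length (cm)" column.
df = pd.DataFrame({"sepal length (cm)": [4.9, 5.0, 5.1, 5.8, 6.3, 6.4, 7.0]})

counts, bin_edges = np.histogram(df["sepal length (cm)"])

print(counts.sum())    # number of rows: every value falls in some bin
print(len(counts))     # 10 bins by default
print(len(bin_edges))  # 11 edges, one more than the number of bins
```

The counts are what we want to send to the server. The edges describe where each bin starts and ends, and by default they are derived from the minimum and maximum of the data passed in.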

Now, for the client itself, we only need to define one function, fit, which computes the histogram for each wanted feature and adds them to a list:

class FlowerClient(fl.client.NumPyClient):

    def fit(
        self, parameters: List[np.ndarray], config: Dict[str, str]
    ) -> Tuple[List[np.ndarray], int, Dict]:
        hist_list = []
        # Execute the query locally; df is this client's DataFrame, loaded beforehand
        for col_name in ["sepal length (cm)", "sepal width (cm)"]:
            hist = compute_hist(df, col_name)
            hist_list.append(hist)
        return (
            hist_list,
            len(df),
            {},
        )

We can then instantiate our client with:

fl.client.start_numpy_client(
    server_address="127.0.0.1:8080",
    client=FlowerClient(),
)

The server

For our server we will need to create a custom strategy. The only important steps here happen in aggregate_fit and evaluate.

In aggregate_fit, we first convert the parameters received from each client to a list of ndarrays. Each list contains 2 arrays, one for the histogram of the length of the sepals and one for the histogram of the width of the sepals.

Then the real aggregation happens: we sum the length histograms of all the clients together and do the same for the width histograms. We then concatenate the 2 resulting arrays, with strings in between to separate length and width (which is why the counts show up as strings in the final output). Finally, we convert this array back to parameters and return it.
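One caveat worth keeping in mind: summing histograms bin by bin is only meaningful when every client uses the same bin edges. The sketch below is not the example's code. It assumes a hypothetical BIN_EDGES array agreed on ahead of time and made-up client data, and shows that the sum of local histograms then equals the histogram of the pooled data.

```python
import numpy as np

# Hypothetical bin edges agreed on by all clients ahead of time.
BIN_EDGES = np.linspace(4.0, 8.0, num=9)  # 8 bins of width 0.5


def local_hist(values: np.ndarray) -> np.ndarray:
    """Client side: counts per shared bin."""
    counts, _ = np.histogram(values, bins=BIN_EDGES)
    return counts


# Made-up local data for two clients.
client_a = np.array([4.6, 5.1, 5.1, 6.3])
client_b = np.array([5.0, 6.4, 7.2])

# Server side: element-wise sum of the per-client counts.
aggregated = local_hist(client_a) + local_hist(client_b)

# Same result as a histogram over the pooled data.
pooled = local_hist(np.concatenate([client_a, client_b]))
assert np.array_equal(aggregated, pooled)
```

With the default call to np.histogram, each client would derive its edges from its own minimum and maximum, so the bins would only line up when the local value ranges happen to coincide.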

The evaluate function is only responsible for writing the previously obtained parameters to the metrics_centralized dict, in order to display the results at the end of the run.

Here is the complete strategy:

class FedAnalytics(Strategy):
    def __init__(
        self, compute_fns: List[Callable] = None, col_names: List[str] = None
    ) -> None:
        super().__init__()

    def initialize_parameters(
        self, client_manager: Optional[ClientManager] = None
    ) -> Optional[Parameters]:
        return None

    def configure_fit(
        self, server_round: int, parameters: Parameters, client_manager: ClientManager
    ) -> List[Tuple[ClientProxy, FitIns]]:
        config = {}
        fit_ins = FitIns(parameters, config)
        clients = client_manager.sample(num_clients=2, min_num_clients=2)
        return [(client, fit_ins) for client in clients]

    def aggregate_fit(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, FitRes]],
        failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
    ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
        # Get results from fit
        # Convert results
        values_aggregated = [
            (parameters_to_ndarrays(fit_res.parameters)) for _, fit_res in results
        ]
        length_agg_hist = 0
        width_agg_hist = 0
        for val in values_aggregated:
            length_agg_hist += val[0]
            width_agg_hist += val[1]

        ndarr = np.concatenate((["Length:"], length_agg_hist, ["Width:"], width_agg_hist))
        return ndarrays_to_parameters(ndarr), {}

    def evaluate(
        self, server_round: int, parameters: Parameters
    ) -> Optional[Tuple[float, Dict[str, Scalar]]]:
        agg_hist = [arr.item() for arr in parameters_to_ndarrays(parameters)]
        return 0, {"Aggregated histograms": agg_hist}

    def configure_evaluate(
        self, server_round: int, parameters: Parameters, client_manager: ClientManager
    ) -> List[Tuple[ClientProxy, EvaluateIns]]:
        pass

    def aggregate_evaluate(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, EvaluateRes]],
        failures: List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]],
    ) -> Tuple[Optional[float], Dict[str, Scalar]]:
        pass

We can then instantiate the Flower server with:

fl.server.start_server(
    server_address="0.0.0.0:8080",
    config=fl.server.ServerConfig(num_rounds=1),
    strategy=FedAnalytics(),
)

Note that only one round is necessary as there is no training involved.

Actually running the example

You first need to download the data and put it in the right folder. This can be done using the following two commands:

$ mkdir -p ./data
$ python -c "from sklearn.datasets import load_iris; load_iris(as_frame=True)['data'].to_csv('./data/client.csv')"

Then you can start the server using:

$ python server.py

Next, you can open a new terminal and start the first client:

$ python client.py

And finally, you can open another new terminal and start the second client:

$ python client.py

And you should see something similar to this:

Starting server
INFO flwr 2023-01-20 19:32:34,503 | app.py:135 | Starting Flower server, config: ServerConfig(num_rounds=1, round_timeout=None)
INFO flwr 2023-01-20 19:32:34,511 | app.py:148 | Flower ECE: gRPC server running (1 rounds), SSL is disabled
INFO flwr 2023-01-20 19:32:34,511 | server.py:86 | Initializing global parameters
INFO flwr 2023-01-20 19:32:34,511 | server.py:270 | Requesting initial parameters from one random client
Starting client 0
Starting client 1
INFO flwr 2023-01-20 19:32:38,825 | grpc.py:50 | Opened insecure gRPC connection (no certificates were passed)
DEBUG flwr 2023-01-20 19:32:38,826 | connection.py:38 | ChannelConnectivity.IDLE
DEBUG flwr 2023-01-20 19:32:38,827 | connection.py:38 | ChannelConnectivity.CONNECTING
DEBUG flwr 2023-01-20 19:32:38,827 | connection.py:38 | ChannelConnectivity.READY
INFO flwr 2023-01-20 19:32:38,830 | server.py:274 | Received initial parameters from one random client
INFO flwr 2023-01-20 19:32:38,830 | server.py:88 | Evaluating initial parameters
INFO flwr 2023-01-20 19:32:38,830 | server.py:91 | initial parameters (loss, other metrics): 0, {'Aggregated histograms': []}
INFO flwr 2023-01-20 19:32:38,830 | server.py:101 | FL starting
INFO flwr 2023-01-20 19:32:38,843 | grpc.py:50 | Opened insecure gRPC connection (no certificates were passed)
DEBUG flwr 2023-01-20 19:32:38,844 | connection.py:38 | ChannelConnectivity.IDLE
DEBUG flwr 2023-01-20 19:32:38,846 | connection.py:38 | ChannelConnectivity.CONNECTING
DEBUG flwr 2023-01-20 19:32:38,846 | connection.py:38 | ChannelConnectivity.READY
DEBUG flwr 2023-01-20 19:32:38,847 | server.py:215 | fit_round 1: strategy sampled 2 clients (out of 2)
DEBUG flwr 2023-01-20 19:32:38,852 | server.py:229 | fit_round 1 received 2 results and 0 failures
INFO flwr 2023-01-20 19:32:38,860 | server.py:116 | fit progress: (1, 0, {'Aggregated histograms': ['Length:', '18', '46', '28', '54', '32', '52', '36', '12', '10', '12', 'Width:', '8', '14', '44', '48', '74', '62', '20', '22', '4', '4']}, 0.03036291105672717)
INFO flwr 2023-01-20 19:32:38,860 | server.py:163 | evaluate_round 1: no clients selected, cancel
INFO flwr 2023-01-20 19:32:38,861 | server.py:144 | FL finished in 0.03067204204853624
INFO flwr 2023-01-20 19:32:38,861 | app.py:198 | app_fit: losses_distributed []
INFO flwr 2023-01-20 19:32:38,861 | app.py:199 | app_fit: metrics_distributed {}
INFO flwr 2023-01-20 19:32:38,861 | app.py:200 | app_fit: losses_centralized [(0, 0), (1, 0)]
INFO flwr 2023-01-20 19:32:38,861 | app.py:201 | app_fit: metrics_centralized {'Aggregated histograms': [(0, []), (1, ['Length:', '18', '46', '28', '54', '32', '52', '36', '12', '10', '12', 'Width:', '8', '14', '44', '48', '74', '62', '20', '22', '4', '4'])]}
DEBUG flwr 2023-01-20 19:32:38,867 | connection.py:109 | gRPC channel closed
DEBUG flwr 2023-01-20 19:32:38,867 | connection.py:109 | gRPC channel closed
INFO flwr 2023-01-20 19:32:38,867 | app.py:152 | Disconnect and shut down
INFO flwr 2023-01-20 19:32:38,867 | app.py:152 | Disconnect and shut down

Here you can see that our desired results are the sepal length histogram: [18, 46, 28, 54, 32, 52, 36, 12, 10, 12] and the sepal width histogram: [8, 14, 44, 48, 74, 62, 20, 22, 4, 4].

Please note that this is an oversimplified example; it serves more as a proof of concept than as a practical example. If you went through the code, you might even have noticed that the setting used is not at all a realistic distributed setting, as each client holds the same data!
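A quick way to see this in the numbers is to parse the aggregated output and sum the counts. The snippet below is a small sketch that takes the list printed in the log above and splits it at the "Width:" marker; the variable names are our own.

```python
# Aggregated output copied from the server log above.
agg = ["Length:", "18", "46", "28", "54", "32", "52", "36", "12", "10", "12",
       "Width:", "8", "14", "44", "48", "74", "62", "20", "22", "4", "4"]

# Split the flat list at the "Width:" marker and convert the counts to ints.
split = agg.index("Width:")
length_hist = [int(x) for x in agg[1:split]]
width_hist = [int(x) for x in agg[split + 1:]]

print(sum(length_hist), sum(width_hist))  # 300 300
```

Both histograms sum to 300, which is 2 clients times the 150 samples of the Iris dataset: every flower has been counted twice.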

If you feel like adding onto this example or writing your own example, be sure to check out our guide for contributing to Flower!