Understanding the data pipeline in armlet#

This tutorial details the different steps of the ARMLET data pipeline.

In this pipeline, you can:

  • load the dataset from raw file(s);

  • load static data (split, cleaned, or converted to tensors) that was saved during previous experiments;

  • save data at one or multiple specific steps of the data pipeline;

  • split data across clients;

  • split data into train, validation, and test sets;

  • clean data;

  • perform other preprocessing steps (normalization, encoding, etc.).

Prerequisites (environment configuration and installation)#

If you have not configured the environment and installed armlet yet, you can first:

  1. Install conda for managing the environments and run the following commands:

conda create -n armlet python=3.13.5
conda activate armlet

  2. Install armlet using pip:

cd ARMLET_DIR
pip install .

Then, set the following two variables to define the main paths and run the cell below.

import os

ARMLET_DIR = "../../../../"
OUTPUT_DIR = os.path.join(ARMLET_DIR, "outputs", "tutorial", "data_pipeline")

os.makedirs(OUTPUT_DIR, exist_ok=True)

Prepare the configuration#

The data pipeline of ARMLET requires several configuration values. We define the dictionaries cfg_paths, cfg_data, cfg_method, and cfg_protocol, which are explained in detail in the tutorial Understanding the FL mode in armlet.

from omegaconf import OmegaConf
from armlet.utils.configs import ArmletConfiguration

cfg_paths = OmegaConf.create({
  "data_dir": os.path.join(ARMLET_DIR, "datasets"),
  "log_dir": os.path.join(ARMLET_DIR, "logs"),
  "output_dir": OUTPUT_DIR,
  "root_dir": ARMLET_DIR,
})

cfg_data = OmegaConf.create({
    "cleaning": {
        "name": "default",
        "missing_values": {"_target_": "armlet.data.cleaning.missing_values.MissingValuesDataCleaningMethod"},
    },
    "dataset": {
        "_target_": "armlet.data.datasets.load_Adult_dataset",
        "dataset_name": "Adult",
        "path": os.path.join(cfg_paths["data_dir"], "Adult", "raw_data"),
        "sensitive_attributes": ['age', 'gender', 'race'],
    },
    "distribution": {"_target_": "armlet.data.splitter.ArmletDataSplitter.iid"},
    "others": {
        "client_split": 0.2,
        "client_val_split": 0.0,
        "keep_test": False,
        "sampling_perc": 1.0,
        "server_split": 0.0,
        "server_test": False,
        "server_test_union": True,
        "server_val_split": 0.0,
        "uniform_test": False,
    },
    "processing": {
        "one_hot_encoding": {
            "_apply_directly_to_subdata_": False,
            "_target_": "armlet.data.processing.feature_encoding.one_hot_encoding_pipeline",
        },
        "conversion_to_num": {
            "_apply_directly_to_subdata_": True,
            "_target_": "armlet.data.processing.format_conversion.convert_bool_and_cat_to_num",
        },
        "normalization": {
            "_apply_directly_to_subdata_": False,
            "_target_": "armlet.data.processing.normalization.normalization_pipeline",
            "cols_to_exclude": ['age', 'gender', 'race'],
        },
        "conversion_to_tensors": {
            "_apply_directly_to_subdata_": True,
            "_target_": "armlet.data.processing.format_conversion.convert_dataframes_to_tensors",
            "sensitive_attributes": ['age', 'gender', 'race'],
        },
    },
    "seed": 42,
})

cfg_method = OmegaConf.create({
    "_target_": "armlet.FL_pipeline.FL_algorithms.ArmletCentralizedFL",
    "hyperparameters": {
        "client": {
          "batch_size": 128,
          "local_epochs": 10,
          "loss": {"_target_": "torch.nn.BCELoss"},
          "optimizer": {"lr": 0.001, "name": "SGD", "weight_decay": 0.01},
          "scheduler": {"gamma": 1, "name": "StepLR", "step_size": 1},
        },
        "model": {
          "_target_": "armlet.utils.net.LogRegression",
          "input_size": None, # Automatically adjusted after data loading
          "num_classes": None, # Automatically adjusted after data loading
        },
        "server": {
          "loss": {"_target_": "torch.nn.BCELoss"},
          "time_to_accuracy_target": None,
          "weighted": True,
        },
    },
})

cfg_protocol = OmegaConf.create({
    "eligible_perc": 1.0,
    "n_clients": 4,
    "n_rounds": 2,
})

cfg = OmegaConf.create({
    "data": cfg_data,
    #"eval": cfg_eval,
    #"exp": cfg_exp,
    #"logger": cfg_logger,
    "method": cfg_method,
    "paths": cfg_paths,
    "protocol": cfg_protocol,
    #"save": {},
})

cfg = ArmletConfiguration(cfg)

Data pipeline preparation#

At the beginning of the data pipeline, we fix the data seed and compute three boolean values that will be used at specific steps:

  • is_static_loading is used to load static data that has been saved during previous experiments;

  • is_tensor_data is used to dynamically load tensors instead of performing the first data processing steps;

  • is_saving_mode is used to save data at different steps of the pipeline.

import numpy as np
import random

np.random.seed(cfg.data.seed)
random.seed(cfg.data.seed)
is_static_loading = ("loading" in cfg.data.keys()) and ("static" in cfg.data.loading.keys()) and cfg.data.loading.static
is_tensor_data = is_static_loading and "tensors" in cfg.data.loading.load_dir.split("/")[-1]
is_saving_mode = "saving" in cfg.data.keys()
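The loading configuration itself is not defined in this tutorial. For reference, here is a hypothetical cfg_data["loading"] block and how the booleans above would evaluate against it (the "static" and "load_dir" keys mirror those referenced in the code; the load_dir value is made up):

```python
# Illustration only: a hypothetical static-loading configuration.
# The "static" and "load_dir" keys mirror those referenced above;
# the load_dir value is made up.
cfg_data_example = {
    "seed": 42,
    "loading": {
        "static": True,
        "load_dir": "outputs/previous_run/after_tensors",
    },
}

# Same logic as in the cell above, applied to the example configuration
is_static_loading = ("loading" in cfg_data_example) \
    and ("static" in cfg_data_example["loading"]) \
    and cfg_data_example["loading"]["static"]
is_tensor_data = is_static_loading \
    and "tensors" in cfg_data_example["loading"]["load_dir"].split("/")[-1]

print(is_static_loading, is_tensor_data)  # True True
```

Because the last component of load_dir contains "tensors", such a configuration would skip every step up to the tensor conversion.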

Data loading and partitioning#

Then, if is_static_loading is set to False, we perform the default data loading step. It instantiates and runs the dataset loading function specified in cfg_data["dataset"], and splits the data across clients by following the distribution specified in cfg_data["distribution"]. When assigning data to clients, the server and the clients also split their data into training, validation, and test sets, according to the configuration values in cfg_data["others"].

If is_static_loading is True, we load static data that has been saved during previous experiments from the folder specified in cfg_data["loading"] and determine if the data is cleaned or not.

Note that, in both cases, the output splitted_data is a dictionary that contains the server data (under the keys server_val and server_test) and the clients' data (under the keys clients_train, clients_val, and clients_test). At this step, all data are DataFrames, i.e., data structures provided by the pandas library with labeled axes (rows and columns).

In this tutorial, we do not activate the static loading and perform the default data loading. Thus, we load the Adult dataset from raw data and split it across 4 clients. Each client has a training set with 80% of its data and a test set with 20%. The server has a test set, which is the union of the clients' test sets.

To illustrate this tutorial, we print the training DataFrame of the first client (only the features, since the labels are stored in a separate DataFrame). You can see the different features of the Adult dataset.

import hydra

from armlet.data.loading import load_data_from_folder
from armlet.data.splitter import ArmletDataSplitter

if is_static_loading:

    splitted_data, is_data_cleaned = load_data_from_folder(cfg.to_dict()["data"], cfg.protocol.n_clients)

else:

    data = hydra.utils.instantiate(cfg.data.dataset.exclude("dataset_name"))

    data_splitter = ArmletDataSplitter(
        data_dict=data,
        dist_cfg=cfg.data.distribution,
        **cfg.data.others,
    )

    splitted_data = data_splitter.assign(cfg.protocol.n_clients)
    is_data_cleaned = False

splitted_data["clients_train"]["client_0"][0].head()
age workclass fnlwgt education education-num marital-status occupation relationship race gender capital-gain capital-loss hours-per-week native-country
0 False NaN 90557 11th 7 Married-civ-spouse NaN Husband True True 0 0 8 United-States
1 True Private 36228 HS-grad 9 Divorced Transport-moving Not-in-family True True 0 0 44 United-States
2 True Private 170651 Bachelors 13 Married-civ-spouse Exec-managerial Wife True False 0 1977 38 United-States
3 True Private 123430 HS-grad 9 Married-civ-spouse Farming-fishing Husband True True 0 0 65 United-States
4 False Private 181655 Assoc-voc 11 Married-civ-spouse Adm-clerical Husband True True 0 2377 45 United-States

Data cleaning#

After loading the data, we perform data cleaning. For this purpose, we instantiate and run each cleaning method (e.g., for missing values, outliers, label errors) that we provide in cfg_data["cleaning"]. A JSON file (data_cleaning_metrics.json) is then saved with the data cleaning metrics. Moreover, depending on the chosen settings in cfg_data["saving"], data can be saved before or after cleaning.
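The saving mode is not activated in this tutorial. For reference, here is a hypothetical cfg_data["saving"] block, using the flag names checked in this section (any additional keys, such as a target directory, would depend on armlet's save_data implementation and are not shown):

```python
# Illustration only: a hypothetical saving configuration. The flag names
# match those checked in this section; everything else is an assumption.
cfg_data_saving_example = {
    "save_data_before_cleaning": False,
    "save_data_after_cleaning": True,
    "save_data_after_conversion_to_tensors": True,
}

# With such a block present under cfg_data["saving"], is_saving_mode
# (computed earlier as `"saving" in cfg.data.keys()`) would be True.
```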

Just as for splitted_data, cleaned_data is a dictionary that contains the server data and the clients data, but cleaned.

In this tutorial, we only delete missing values, without activating the saving mode. After printing the cleaned training data of the first client, we can see that the first row of the previous DataFrame was deleted because it contained NaN values.

import json

from armlet.data.saving import save_data
from armlet.data.cleaning import data_cleaning_pipeline

if ("cleaning" in cfg.data.keys()) and not is_data_cleaned:

    if is_saving_mode and ("save_data_before_cleaning" in cfg.data.saving.keys()) and (cfg.data.saving.save_data_before_cleaning):
        save_data(splitted_data, cfg.to_dict()["data"], mode="before_cleaning")

    cleaned_data, data_cleaning_metrics = data_cleaning_pipeline(
        data=splitted_data,
        cfg_cleaning=cfg.data.cleaning,
        sensitive_attributes=cfg.data.dataset.sensitive_attributes,
    )

    data_cleaning_metrics_path = os.path.join(cfg.paths.output_dir, "data_cleaning_metrics.json")
    json.dump(data_cleaning_metrics, open(data_cleaning_metrics_path, "w"), indent=4)

    if is_saving_mode and ("save_data_after_cleaning" in cfg.data.saving.keys()) and (cfg.data.saving.save_data_after_cleaning):
        save_data(cleaned_data, cfg.to_dict()["data"], mode="after_cleaning", metrics=data_cleaning_metrics)

else:
    cleaned_data = splitted_data

cleaned_data["clients_train"]["client_0"][0].head()
age workclass fnlwgt education education-num marital-status occupation relationship race gender capital-gain capital-loss hours-per-week native-country
0 True Private 36228 HS-grad 9 Divorced Transport-moving Not-in-family True True 0 0 44 United-States
1 True Private 170651 Bachelors 13 Married-civ-spouse Exec-managerial Wife True False 0 1977 38 United-States
2 True Private 123430 HS-grad 9 Married-civ-spouse Farming-fishing Husband True True 0 0 65 United-States
3 False Private 181655 Assoc-voc 11 Married-civ-spouse Adm-clerical Husband True True 0 2377 45 United-States
4 True Local-gov 66278 Masters 14 Never-married Prof-specialty Not-in-family True False 0 0 45 United-States

Data preprocessing#

The next step is to preprocess all data. In ARMLET, we specify all preprocessing steps in the configuration values, as they are specific to the data types and the use cases.

In this tutorial, we:

  • perform one-hot encoding for the categorical features of each DataFrame;

  • convert the boolean features and the features of type category to numeric (category is a special data type we use when loading the dataset to avoid converting the corresponding feature);

  • normalize all features using a standard scaler (except for features tagged as sensitive attributes);

  • and convert the dataframes to tensors.

We can see in the notebook output that the DataFrames were converted to tensors. Note that the sensitive attributes (specified in cfg_data) have been moved to the last columns before tensor conversion so that they can be easily identified during training/evaluation.
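To make these four steps concrete, here is a toy pandas/torch sketch of the same transformations (illustration only: the toy DataFrame and the inline logic are ours, while the real implementations live in armlet.data.processing):

```python
# Illustration only: a minimal sketch of the preprocessing steps described
# above, on a made-up two-row DataFrame. The real pipeline is driven by the
# cfg_data["processing"] configuration.
import pandas as pd
import torch

df = pd.DataFrame({
    "age": [0.2, 0.5],                       # sensitive attribute (excluded from scaling)
    "workclass": ["Private", "Local-gov"],   # categorical -> one-hot
    "hours-per-week": [40, 65],              # numeric -> normalized
})
sensitive = ["age"]

# 1. One-hot encode the categorical columns
df = pd.get_dummies(df, columns=["workclass"])

# 2. Convert boolean (one-hot) columns to numeric
df = df.astype(float)

# 3. Normalize with a standard scaler, excluding the sensitive attributes
num_cols = [c for c in df.columns if c not in sensitive]
df[num_cols] = (df[num_cols] - df[num_cols].mean()) / df[num_cols].std()

# 4. Move the sensitive attributes to the last columns, then convert to a tensor
df = df[[c for c in df.columns if c not in sensitive] + sensitive]
tensor = torch.tensor(df.values, dtype=torch.float32)
# tensor of shape (2, 4), with the sensitive "age" column last
```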

from armlet.data.processing import data_processing_pipeline

tensor_data = data_processing_pipeline(
    data=cleaned_data,
    cfg_data=cfg.data,
)

tensor_data["clients_train"]["client_0"][0]
tensor([[-1.4487, -0.4446, -0.1475,  ...,  1.0000,  1.0000,  1.0000],
        [-0.1786,  1.1289, -0.1475,  ...,  1.0000,  0.0000,  1.0000],
        [-0.6247, -0.4446, -0.1475,  ...,  1.0000,  1.0000,  1.0000],
        ...,
        [ 0.7367,  1.1289, -0.1475,  ...,  1.0000,  0.0000,  1.0000],
        [ 0.1345,  0.3422, -0.1475,  ...,  1.0000,  1.0000,  1.0000],
        [-0.4184, -1.2314, -0.1475,  ...,  1.0000,  0.0000,  1.0000]])

Depending on the chosen settings in cfg_data["saving"], data can also be saved after conversion to tensors, which can be particularly useful when dealing with images.

if is_saving_mode and ("save_data_after_conversion_to_tensors" in cfg.data.saving.keys()) and (cfg.data.saving.save_data_after_conversion_to_tensors):
    save_data(tensor_data, cfg.to_dict()["data"], mode="after_tensors")

Note: At the beginning of this tutorial, we computed the is_tensor_data boolean based on the configuration values. If this value is set to True, all previous steps are skipped and the tensors are loaded directly from the folder:

    tensor_data, _ = load_data_from_folder(cfg.to_dict()["data"], cfg.protocol.n_clients)

Conversion to Fluke data format#

After data preprocessing, the final step is to transform the actual data (which is a dictionary of tensors) into the data format required by the FL process of Fluke (i.e., DataSplitter). To do so, through the convert_tensors_to_fluke_data_format() function, we first convert the tensors to FastDataLoader objects, then wrap the dictionary in a DummyDataContainer, and finally pack it into a DummyDataSplitter, which is a simplified version of Fluke’s DataSplitter.

from fluke.data import DummyDataContainer

from armlet.data.processing.format_conversion import convert_tensors_to_fast_data_loaders
from armlet.data.splitter import DummyDataSplitter

# The following lines are included in the convert_tensors_to_fluke_data_format() function

num_classes = max([len(val[1].squeeze(1).unique()) for val in tensor_data["clients_train"].values()])
fast_data_loaders = convert_tensors_to_fast_data_loaders(tensor_data, cfg, num_classes)

dummy_data_container = DummyDataContainer(
    fast_data_loaders["clients_train"],
    fast_data_loaders["clients_test"],
    fast_data_loaders["server_test"],
    num_classes,
)

data_splitter = DummyDataSplitter(
    dataset=dummy_data_container,
    distribution="",
    **cfg.data.others.exclude("client_val_split", "server_test_union", "server_val_split"),
)

val_data = {k: v for k, v in fast_data_loaders.items() if k in ["clients_val", "server_val"]}