{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Understanding the **data pipeline** in ``armlet``\n", "\n", "This tutorial details the different steps of the **ARMLET** [data pipeline](https://sara-bouchenak.github.io/ARMLET/api/armlet.data.html#armlet.data.data_pipeline).\n", "\n", "In this pipeline, you can:\n", "- load the dataset from raw file(s);\n", "- load static data (splitted, cleaned, tensors) that were save during previous experiments;\n", "- save data at one or multiple specific steps of the data pipeline;\n", "- split data across clients;\n", "- split data into train, validation, and test sets;\n", "- clean data;\n", "- perform other preprocessing steps (normalization, encoding, etc.).\n", "\n", "## Prerequisites (environment configuration and installation)\n", "\n", "If you have not configured the environment and installed ``armlet`` yet, you can first:\n", "\n", "1. Install [conda](https://www.anaconda.com/docs/getting-started/main) for managing the environments and run the following commands:\n", "\n", "```bash\n", "conda create -n armlet python=3.13.5\n", "conda activate armlet\n", "```\n", "\n", "2. Install **``armlet``** using `pip`:\n", "\n", "```bash\n", "cd ARMLET_DIR\n", "pip install .\n", "```\n", "\n", "Then, fill the two following variables to detail the main paths and run the command." 
] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "ARMLET_DIR = \"../../../../\"\n", "OUTPUT_DIR = os.path.join(ARMLET_DIR, \"outputs\", \"tutorial\", \"data_pipeline\")\n", "\n", "if not os.path.exists(OUTPUT_DIR):\n", " os.makedirs(OUTPUT_DIR)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prepare the configuration\n", "\n", "The data pipeline of **ARMLET** requires several configuration values.\n", "We define the dictionaries `cfg_paths`, `cfg_data`, `cfg_method`, and `cfg_protocol`, which are explained in details in the tutorial [Understanding the **FL mode** in `armlet`](https://sara-bouchenak.github.io/ARMLET/getting_started/tutorials/FL_mode.html)." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from omegaconf import OmegaConf\n", "from armlet.utils.configs import ArmletConfiguration\n", "\n", "cfg_paths = OmegaConf.create({\n", " \"data_dir\": os.path.join(ARMLET_DIR, \"datasets\"),\n", " \"log_dir\": os.path.join(ARMLET_DIR, \"logs\"),\n", " \"output_dir\": OUTPUT_DIR,\n", " \"root_dir\": ARMLET_DIR,\n", "})\n", "\n", "cfg_data = OmegaConf.create({\n", " \"cleaning\": {\n", " \"name\": \"default\",\n", " \"missing_values\": {\"_target_\": \"armlet.data.cleaning.missing_values.MissingValuesDataCleaningMethod\"},\n", " },\n", " \"dataset\": {\n", " \"_target_\": \"armlet.data.datasets.load_Adult_dataset\",\n", " \"dataset_name\": \"Adult\",\n", " \"path\": os.path.join(cfg_paths[\"data_dir\"], \"Adult\", \"raw_data\"),\n", " \"sensitive_attributes\": ['age', 'gender', 'race'],\n", " },\n", " \"distribution\": {\"_target_\": \"armlet.data.splitter.ArmletDataSplitter.iid\"},\n", " \"others\": {\n", " \"client_split\": 0.2,\n", " \"client_val_split\": 0.0,\n", " \"keep_test\": False,\n", " \"sampling_perc\": 1.0,\n", " \"server_split\": 0.0,\n", " \"server_test\": False,\n", " \"server_test_union\": True,\n", " \"server_val_split\": 
0.0,\n", " \"uniform_test\": False,\n", " },\n", " \"processing\": {\n", " \"one_hot_encoding\": {\n", " \"_apply_directly_to_subdata_\": False,\n", " \"_target_\": \"armlet.data.processing.feature_encoding.one_hot_encoding_pipeline\",\n", " },\n", " \"conversion_to_num\": {\n", " \"_apply_directly_to_subdata_\": True,\n", " \"_target_\": \"armlet.data.processing.format_conversion.convert_bool_and_cat_to_num\",\n", " },\n", " \"normalization\": {\n", " \"_apply_directly_to_subdata_\": False,\n", " \"_target_\": \"armlet.data.processing.normalization.normalization_pipeline\",\n", " \"cols_to_exclude\": ['age', 'gender', 'race'],\n", " },\n", " \"conversion_to_tensors\": {\n", " \"_apply_directly_to_subdata_\": True,\n", " \"_target_\": \"armlet.data.processing.format_conversion.convert_dataframes_to_tensors\",\n", " \"sensitive_attributes\": ['age', 'gender', 'race'],\n", " },\n", " },\n", " \"seed\": 42,\n", "})\n", "\n", "cfg_method = OmegaConf.create({\n", " \"_target_\": \"armlet.FL_pipeline.FL_algorithms.ArmletCentralizedFL\",\n", " \"hyperparameters\": {\n", " \"client\": {\n", " \"batch_size\": 128,\n", " \"local_epochs\": 10,\n", " \"loss\": {\"_target_\": \"torch.nn.BCELoss\"},\n", " \"optimizer\": {\"lr\": 0.001, \"name\": \"SGD\", \"weight_decay\": 0.01},\n", " \"scheduler\": {\"gamma\": 1, \"name\": \"StepLR\", \"step_size\": 1},\n", " },\n", " \"model\": {\n", " \"_target_\": \"armlet.utils.net.LogRegression\",\n", " \"input_size\": None, # Automatically adjusted after data loading\n", " \"num_classes\": None, # Automatically adjusted after data loading\n", " },\n", " \"server\": {\n", " \"loss\": {\"_target_\": \"torch.nn.BCELoss\"},\n", " \"time_to_accuracy_target\": None,\n", " \"weighted\": True,\n", " },\n", " },\n", "})\n", "\n", "cfg_protocol = OmegaConf.create({\n", " \"eligible_perc\": 1.0,\n", " \"n_clients\": 4,\n", " \"n_rounds\": 2,\n", "})\n", "\n", "cfg = OmegaConf.create({\n", " \"data\": cfg_data,\n", " #\"eval\": cfg_eval,\n", " 
#\"exp\": cfg_exp,\n", " #\"logger\": cfg_logger,\n", " \"method\": cfg_method,\n", " \"paths\": cfg_paths,\n", " \"protocol\": cfg_protocol,\n", " #\"save\": {},\n", "})\n", "\n", "cfg = ArmletConfiguration(cfg)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data pipeline preparation\n", "\n", "In the beginning of the data pipeline, we fix the data seed and compute 3 boolean values that will be used at specific steps:\n", "- `is_static_loading` is used to load static data that has been saved during previous experiments;\n", "- `is_tensor_data` is used to dynamically load tensors instead of performing the first data processing steps;\n", "- `is_saving_mode` is used to save data at different steps of the pipeline." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import random\n", "\n", "np.random.seed(cfg.data.seed)\n", "random.seed(cfg.data.seed)" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "is_static_loading = (\"loading\" in cfg.data.keys()) and (\"static\" in cfg.data.loading.keys()) and cfg.data.loading.static\n", "is_tensor_data = is_static_loading and \"tensors\" in cfg.data.loading.load_dir.split(\"/\")[-1]\n", "is_saving_mode = \"saving\" in cfg.data.keys()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data loading and partitionning\n", "\n", "Then, if `is_static_loading` is set to `False`, we perform the default data loading step.\n", "It instantiates and runs the dataset loading function specified in `cfg_data[\"dataset\"]`, and splits the data across clients by following the distribution specified in `cfg_data[\"distribution\"]`.\n", "When assigning data to clients, the server and the clients also split their data into training, validation, and test sets, according to the configuration values in `cfg_data[\"others\"]`.\n", "\n", "If `is_static_loading` is `True`, we load static data that has been 
saved during previous experiments from the folder specified in `cfg_data[\"loading\"]` and determine whether the data has already been cleaned.\n", "\n", "Note that, in both cases, the output `splitted_data` is a dictionary that contains the server data (with the keys `server_val` and `server_test`) and the clients data (with the keys `clients_train`, `clients_val`, and `clients_test`).\n", "At this step, all data are [DataFrames](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html), i.e., data structures provided by the [Pandas](https://pandas.pydata.org/docs/index.html) library that contain labeled axes (rows and columns).\n", "\n", "In this tutorial, we do not activate static loading and perform the default data loading.\n", "Thus, we load the Adult dataset from raw data and split it across 4 clients.\n", "Each client has a training set with 80% of its data and a test set with 20%.\n", "The server has a test set, which is the union of the clients' test sets.\n", "\n", "To illustrate this step, we print the training DataFrame of the first client (only the features, since the labels are stored in a separate DataFrame).\n", "You can see the different features of the Adult dataset." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
ageworkclassfnlwgteducationeducation-nummarital-statusoccupationrelationshipracegendercapital-gaincapital-losshours-per-weeknative-country
0FalseNaN9055711th7Married-civ-spouseNaNHusbandTrueTrue008United-States
1TruePrivate36228HS-grad9DivorcedTransport-movingNot-in-familyTrueTrue0044United-States
2TruePrivate170651Bachelors13Married-civ-spouseExec-managerialWifeTrueFalse0197738United-States
3TruePrivate123430HS-grad9Married-civ-spouseFarming-fishingHusbandTrueTrue0065United-States
4FalsePrivate181655Assoc-voc11Married-civ-spouseAdm-clericalHusbandTrueTrue0237745United-States
\n", "
" ], "text/plain": [ " age workclass fnlwgt education education-num marital-status \\\n", "0 False NaN 90557 11th 7 Married-civ-spouse \n", "1 True Private 36228 HS-grad 9 Divorced \n", "2 True Private 170651 Bachelors 13 Married-civ-spouse \n", "3 True Private 123430 HS-grad 9 Married-civ-spouse \n", "4 False Private 181655 Assoc-voc 11 Married-civ-spouse \n", "\n", " occupation relationship race gender capital-gain capital-loss \\\n", "0 NaN Husband True True 0 0 \n", "1 Transport-moving Not-in-family True True 0 0 \n", "2 Exec-managerial Wife True False 0 1977 \n", "3 Farming-fishing Husband True True 0 0 \n", "4 Adm-clerical Husband True True 0 2377 \n", "\n", " hours-per-week native-country \n", "0 8 United-States \n", "1 44 United-States \n", "2 38 United-States \n", "3 65 United-States \n", "4 45 United-States " ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import hydra\n", "\n", "from armlet.data.loading import load_data_from_folder\n", "from armlet.data.splitter import ArmletDataSplitter\n", "\n", "if is_static_loading:\n", "\n", " splitted_data, is_data_cleaned = load_data_from_folder(cfg.to_dict()[\"data\"], cfg.protocol.n_clients)\n", "\n", "else:\n", "\n", " data = hydra.utils.instantiate(cfg.data.dataset.exclude(\"dataset_name\"))\n", "\n", " data_splitter = ArmletDataSplitter(\n", " data_dict=data,\n", " dist_cfg=cfg.data.distribution,\n", " **cfg.data.others,\n", " )\n", "\n", " splitted_data = data_splitter.assign(cfg.protocol.n_clients)\n", " is_data_cleaned = False\n", "\n", "splitted_data[\"clients_train\"][\"client_0\"][0].head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data cleaning\n", "\n", "After loading the data, we perform data cleaning.\n", "For this purpose, we instantiate and run each cleaning methods (e.g., for missing values, outliers, label errors) that we provide in `cfg_data[\"cleaning\"]`.\n", "A JSON file (`data_cleaning_metrics.json`) is then saved with the 
data cleaning metrics.\n", "Moreover, depending on the chosen settings in `cfg_data[\"saving\"]`, data can be saved before or after cleaning.\n", "\n", "Just as for `splitted_data`, `cleaned_data` is a dictionary that contains the server data and the clients data, but cleaned.\n", "\n", "In this tutorial, we only delete rows containing missing values, without activating the saving mode.\n", "After printing the cleaned training data of the first client, we can see that the first row of the previous DataFrame was deleted, as it contained NaN values." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
ageworkclassfnlwgteducationeducation-nummarital-statusoccupationrelationshipracegendercapital-gaincapital-losshours-per-weeknative-country
0TruePrivate36228HS-grad9DivorcedTransport-movingNot-in-familyTrueTrue0044United-States
1TruePrivate170651Bachelors13Married-civ-spouseExec-managerialWifeTrueFalse0197738United-States
2TruePrivate123430HS-grad9Married-civ-spouseFarming-fishingHusbandTrueTrue0065United-States
3FalsePrivate181655Assoc-voc11Married-civ-spouseAdm-clericalHusbandTrueTrue0237745United-States
4TrueLocal-gov66278Masters14Never-marriedProf-specialtyNot-in-familyTrueFalse0045United-States
\n", "
" ], "text/plain": [ " age workclass fnlwgt education education-num marital-status \\\n", "0 True Private 36228 HS-grad 9 Divorced \n", "1 True Private 170651 Bachelors 13 Married-civ-spouse \n", "2 True Private 123430 HS-grad 9 Married-civ-spouse \n", "3 False Private 181655 Assoc-voc 11 Married-civ-spouse \n", "4 True Local-gov 66278 Masters 14 Never-married \n", "\n", " occupation relationship race gender capital-gain capital-loss \\\n", "0 Transport-moving Not-in-family True True 0 0 \n", "1 Exec-managerial Wife True False 0 1977 \n", "2 Farming-fishing Husband True True 0 0 \n", "3 Adm-clerical Husband True True 0 2377 \n", "4 Prof-specialty Not-in-family True False 0 0 \n", "\n", " hours-per-week native-country \n", "0 44 United-States \n", "1 38 United-States \n", "2 65 United-States \n", "3 45 United-States \n", "4 45 United-States " ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import json\n", "\n", "from armlet.data.saving import save_data\n", "from armlet.data.cleaning import data_cleaning_pipeline\n", "\n", "if (\"cleaning\" in cfg.data.keys()) and not is_data_cleaned:\n", "\n", " if is_saving_mode and (\"save_data_before_cleaning\" in cfg.data.saving.keys()) and (cfg.data.saving.save_data_before_cleaning):\n", " save_data(splitted_data, cfg.to_dict()[\"data\"], mode=\"before_cleaning\")\n", "\n", " cleaned_data, data_cleaning_metrics = data_cleaning_pipeline(\n", " data=splitted_data,\n", " cfg_cleaning=cfg.data.cleaning,\n", " sensitive_attributes=cfg.data.dataset.sensitive_attributes,\n", " )\n", "\n", " data_cleaning_metrics_path = os.path.join(cfg.paths.output_dir, \"data_cleaning_metrics.json\")\n", " json.dump(data_cleaning_metrics, open(data_cleaning_metrics_path, \"w\"), indent=4)\n", "\n", " if is_saving_mode and (\"save_data_after_cleaning\" in cfg.data.saving.keys()) and (cfg.data.saving.save_data_after_cleaning):\n", " save_data(cleaned_data, cfg.to_dict()[\"data\"], mode=\"after_cleaning\", 
metrics=data_cleaning_metrics)\n", "\n", "else:\n", " cleaned_data = splitted_data\n", "\n", "cleaned_data[\"clients_train\"][\"client_0\"][0].head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data preprocessing\n", "\n", "The next step is to preprocess all data.\n", "In **ARMLET**, we specify all preprocessing steps in the configuration values, as they are specific to the data types and the use cases.\n", "\n", "In this tutorial, we:\n", "- perform one-hot encoding of the categorical features of each DataFrame;\n", "- convert the boolean features and the features of type `category` to numeric (`category` is a special data type we use when loading the dataset so that the corresponding feature is not converted);\n", "- normalize all features using a standard scaler (except for features tagged as sensitive attributes);\n", "- and convert the DataFrames to tensors.\n", "\n", "We can see in the notebook output that the DataFrames were converted to tensors.\n", "Note that the sensitive attributes (specified in the `cfg_data` configuration) have been moved to the last columns before tensor conversion so that they can be easily identified during training and evaluation."
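To make these steps concrete, here is a toy sketch of the first preprocessing operations on a small two-column example, using plain pandas. This is only an illustration of the idea, not armlet's actual implementation (in the real pipeline, the corresponding functions are instantiated from `cfg_data["processing"]`):

```python
import pandas as pd

# Toy sketch of the preprocessing steps (NOT armlet's implementation):
# one-hot encode a categorical column, standardize a numeric one, and
# produce a numeric array ready for conversion with torch.tensor(...).
df = pd.DataFrame({
    "workclass": ["Private", "Local-gov", "Private"],
    "hours-per-week": [40, 45, 38],
})
encoded = pd.get_dummies(df, columns=["workclass"])  # one-hot encoding
hours = encoded["hours-per-week"]
encoded["hours-per-week"] = (hours - hours.mean()) / hours.std()  # standard scaling
values = encoded.to_numpy(dtype=float)  # numeric array, ready for tensor conversion
print(values.shape)  # (3, 3): 1 standardized column + 2 one-hot columns
```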
] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "tensor([[-1.4487, -0.4446, -0.1475, ..., 1.0000, 1.0000, 1.0000],\n", " [-0.1786, 1.1289, -0.1475, ..., 1.0000, 0.0000, 1.0000],\n", " [-0.6247, -0.4446, -0.1475, ..., 1.0000, 1.0000, 1.0000],\n", " ...,\n", " [ 0.7367, 1.1289, -0.1475, ..., 1.0000, 0.0000, 1.0000],\n", " [ 0.1345, 0.3422, -0.1475, ..., 1.0000, 1.0000, 1.0000],\n", " [-0.4184, -1.2314, -0.1475, ..., 1.0000, 0.0000, 1.0000]])" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from armlet.data.processing import data_processing_pipeline\n", "\n", "tensor_data = data_processing_pipeline(\n", " data=cleaned_data,\n", " cfg_data=cfg.data,\n", ")\n", "\n", "tensor_data[\"clients_train\"][\"client_0\"][0]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Depending on the chosen settings in `cfg_data[\"saving\"]`, data can also be saved after conversion to tensors, which can be an interesting choice when dealing with images." 
] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "if is_saving_mode and (\"save_data_after_conversion_to_tensors\" in cfg.data.saving.keys()) and (cfg.data.saving.save_data_after_conversion_to_tensors):\n", " save_data(tensor_data, cfg.to_dict()[\"data\"], mode=\"after_tensors\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Note**: In the beginning of this tutorial, we compute the `is_tensor_data` boolean based on the configuration values.\n", "If this value is set to `True`, all previous steps will be skipped and the tensors will be directly loaded from folder.\n", "\n", "```python\n", " tensor_data, _ = load_data_from_folder(cfg.to_dict()[\"data\"], cfg.protocol.n_clients)\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Conversion to Fluke data format\n", "\n", "After data preprocessing, the final step is to transform the actual data (that is a dictionary of tensors) into the data format required by the FL process of Fluke (i.e., [DataSplitter](https://makgyver.github.io/fluke/fluke.data.html#fluke.data.DataSplitter)).\n", "To do that, through the `convert_tensors_to_fluke_data_format()` function, we first convert the tensors to [FastDataLoader](https://makgyver.github.io/fluke/fluke.data.html#fluke.data.FastDataLoader), then transform the dictionary into a [DummyDataContainer](https://makgyver.github.io/fluke/fluke.data.html#fluke.data.DummyDataContainer), and finally pack it in a [DummyDataSplitter](https://sara-bouchenak.github.io/ARMLET/api/armlet.data.splitter.html#armlet.data.splitter.DummyDataSplitter), which is a simplified version of Fluke's DataSplitter." 
] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "from fluke.data import DummyDataContainer\n", "\n", "from armlet.data.processing.format_conversion import convert_tensors_to_fast_data_loaders\n", "from armlet.data.splitter import DummyDataSplitter\n", "\n", "# The following lines are included in the convert_tensors_to_fluke_data_format() function\n", "\n", "num_classes = max([len(val[1].squeeze(1).unique()) for val in tensor_data[\"clients_train\"].values()])\n", "fast_data_loaders = convert_tensors_to_fast_data_loaders(tensor_data, cfg, num_classes)\n", "\n", "dummy_data_container = DummyDataContainer(\n", " fast_data_loaders[\"clients_train\"],\n", " fast_data_loaders[\"clients_test\"],\n", " fast_data_loaders[\"server_test\"],\n", " num_classes,\n", ")\n", "\n", "data_splitter = DummyDataSplitter(\n", " dataset=dummy_data_container,\n", " distribution=\"\",\n", " **cfg.data.others.exclude(\"client_val_split\", \"server_test_union\", \"server_val_split\"),\n", ")\n", "\n", "val_data = {k: v for k, v in fast_data_loaders.items() if k in [\"clients_val\", \"server_val\"]}" ] } ], "metadata": { "kernelspec": { "display_name": "armlet", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.5" } }, "nbformat": 4, "nbformat_minor": 2 }