armlet.FL_pipeline.data_selection.FL_algorithms package#

Module contents#

class armlet.FL_pipeline.data_selection.FL_algorithms.DataSelectionCentralizedFL(n_clients: int, data_splitter: DataSplitter, hyperparameters: DDict, val_data: dict, **kwargs)#

Bases: ArmletCentralizedFL

get_client_class() type[Client]#

Get the client class. This method should be overridden by subclasses that define a different client class. This makes it possible to reuse all the logic of the algorithm and change only the client class.

Returns:

Client class.

Return type:

type[Client]

get_server_class() type[Server]#

Get the server class. This method should be overridden by subclasses that define a different server class. This makes it possible to reuse all the logic of the algorithm and change only the server class.

Returns:

Server class.

Return type:

type[Server]
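The override pattern described above can be sketched as follows. The class names here are hypothetical stand-ins, not the real armlet classes, and are only meant to show how overriding get_client_class swaps the client while the rest of the algorithm is inherited:

```python
# Hedged sketch of the override pattern; Client, MyClient, BaseFL, and MyFL
# are illustrative stand-ins, not the real armlet classes.
class Client:
    pass

class MyClient(Client):
    """A custom client with (hypothetically) different local behaviour."""
    pass

class BaseFL:
    # Default behaviour: use the standard Client class.
    def get_client_class(self):
        return Client

class MyFL(BaseFL):
    # Override only the client class; all other algorithm logic is inherited.
    def get_client_class(self):
        return MyClient
```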

class armlet.FL_pipeline.data_selection.FL_algorithms.DataSelectionClient(index: int, train_set: FastDataLoader, test_set: FastDataLoader, val_set: FastDataLoader, optimizer_cfg: OptimizerConfigurator, loss_fn: Module, data_selection: DDict, local_epochs: int, fine_tuning_epochs: int = 0, clipping: float = 0, **kwargs)#

Bases: ArmletClient

fit(override_local_epochs: int = 0) float#

Client’s local training procedure.

Parameters:

override_local_epochs (int, optional) – Overrides the number of local epochs. Defaults to 0, which means the default number of local epochs (hyper_params.local_epochs) is used.

Returns:

The average loss of the model during the training.

Return type:

float
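The override semantics of fit can be sketched as below. The names are hypothetical (DEFAULT_LOCAL_EPOCHS stands in for hyper_params.local_epochs), and the actual optimizer loop is omitted:

```python
# Hypothetical sketch of the override_local_epochs semantics; the real
# client also runs the training loop, which is elided here.
DEFAULT_LOCAL_EPOCHS = 5  # stand-in for hyper_params.local_epochs

def resolve_epochs(override_local_epochs: int = 0) -> int:
    # A value of 0 means "use the configured default number of local epochs".
    return override_local_epochs if override_local_epochs > 0 else DEFAULT_LOCAL_EPOCHS
```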

class armlet.FL_pipeline.data_selection.FL_algorithms.DataSelectionServer(model: Module, test_set: FastDataLoader | None, val_set: FastDataLoader | None, clients: Sequence[Client], data_selection: DDict, weighted: bool = False, lr: float = 1, **kwargs)#

Bases: ArmletServer

aggregate(eligible: Sequence[Client], client_models: Iterable[Module]) None#

Aggregate the models of the clients by averaging them. If the hyperparameter weighted is True, each client is weighted by its number of samples. The method updates the server's model in place. Formally, let \(\theta\) be the model of the server, \(\theta_i\) the model of client \(i\), and \(w_i\) the weight of client \(i\) such that \(\sum_{i=1}^{N} w_i = 1\). The aggregation is computed as follows [FedAVG]:

\[\theta = \sum_{i=1}^{N} w_i \theta_i\]

Note

In networks with batch normalization layers, the running statistics of those layers are also aggregated. For all statistics except num_batches_tracked, the mean across clients is computed; for num_batches_tracked, the maximum of the clients' values is taken.
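The batch-norm rule in the note above can be sketched with plain dicts standing in for torch state_dicts (the helper name and keys are illustrative, not the armlet API):

```python
# Hedged sketch: mean for batch-norm running statistics, max for
# num_batches_tracked. Dicts of floats stand in for torch state_dicts.
def aggregate_bn_buffers(client_buffers):
    aggregated = {}
    for key in client_buffers[0]:
        values = [buffers[key] for buffers in client_buffers]
        if key.endswith("num_batches_tracked"):
            aggregated[key] = max(values)          # maximum across clients
        else:
            aggregated[key] = sum(values) / len(values)  # mean across clients
    return aggregated

clients = [
    {"bn.running_mean": 0.2, "bn.num_batches_tracked": 10},
    {"bn.running_mean": 0.4, "bn.num_batches_tracked": 30},
]
agg = aggregate_bn_buffers(clients)
```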

See also

fluke.utils.model.aggregate_models()

Parameters:
  • eligible (Sequence[Client]) – The clients that will participate in the aggregation.

  • client_models (Iterable[torch.nn.Module]) – The models of the clients.

References

[FedAVG]

H. B. McMahan, E. Moore, D. Ramage, S. Hampson, and B. A. y Arcas, “Communication-Efficient Learning of Deep Networks from Decentralized Data”. In AISTATS (2017).
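The weighted average above can be sketched in a few lines, with plain dicts of floats standing in for torch state_dicts (the function name is hypothetical, not the armlet API):

```python
# Hedged sketch of FedAvg aggregation: theta = sum_i w_i * theta_i, with
# dicts of floats in place of torch state_dicts.
def fedavg(client_models, weights):
    # The weights must sum to 1 (uniform when `weighted` is False).
    assert abs(sum(weights) - 1.0) < 1e-9
    return {
        key: sum(w * model[key] for w, model in zip(weights, client_models))
        for key in client_models[0]
    }

theta = fedavg([{"w": 0.0}, {"w": 4.0}], [0.75, 0.25])
```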

broadcast_model(eligible: Sequence[Client]) None#

Broadcast the global model to the clients.

Parameters:

eligible (Sequence[Client]) – The clients that will receive the global model.

fit(n_rounds: int = 10, eligible_perc: float = 0.1, finalize: bool = True, **kwargs) None#

Run the federated learning algorithm. By default, this method runs the Federated Averaging algorithm: in each round, the server selects a percentage of the clients and sends them the global model; the selected clients compute local updates and send them back; the server then aggregates the received models. The process is repeated for the given number of rounds.

Parameters:
  • n_rounds (int, optional) – The number of rounds to run. Defaults to 10.

  • eligible_perc (float, optional) – The percentage of clients that will be selected for each round. Defaults to 0.1.

  • finalize (bool, optional) – If True, the server will finalize the federated learning process. Defaults to True.

  • **kwargs – Additional keyword arguments.
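The round structure described above can be sketched as follows. All names are hypothetical placeholders, and broadcasting, local training, and aggregation are elided to comments; the rounding of the selected-client count is an assumption, not documented behaviour:

```python
import random

# Illustrative round loop; run_rounds is a hypothetical stand-in for
# Server.fit. Broadcast, local training, and aggregation are elided.
def run_rounds(n_clients, n_rounds=10, eligible_perc=0.1, seed=0):
    rng = random.Random(seed)
    clients = list(range(n_clients))
    selections = []
    for _ in range(n_rounds):
        # Assumed rounding: at least one client per round.
        k = max(1, int(n_clients * eligible_perc))
        eligible = rng.sample(clients, k)
        # broadcast_model(eligible); clients run fit(); aggregate(...)
        selections.append(eligible)
    return selections

sel = run_rounds(100, n_rounds=3, eligible_perc=0.1)
```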

get_eligible_clients(eligible_perc: float) Sequence[Client]#

Get the clients that will participate in the current round. Clients are selected randomly based on the percentage of eligible clients.

Parameters:

eligible_perc (float) – The percentage of clients that will be selected.

Returns:

The clients that will participate in the current round.

Return type:

Sequence[Client]

receive_client_models(eligible: Sequence[Client], state_dict: bool = True) Generator[Module, None, None]#

Retrieve the models of the clients. This method assumes that the clients have already sent their models to the server. The models are received through the channel in the same order as the clients in eligible.

Caution

The method returns a generator over the clients' models to avoid cluttering memory with all of them at once. This means the method is expected to be called only once per round. If the models are needed multiple times, convert the generator to a list, tuple, or another materialized iterable.

Parameters:
  • eligible (Sequence[Client]) – The clients that will participate in the aggregation.

  • state_dict (bool, optional) – If True, the method returns the state_dict of the models. Otherwise, it returns the models. Defaults to True.

Returns:

The models of the clients.

Return type:

Generator[torch.nn.Module]
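The single-use caveat above can be illustrated with a minimal stand-in generator (the function is a sketch, not the real method, and dicts stand in for torch modules):

```python
# Stand-in generator illustrating the single-pass caveat of
# receive_client_models; dicts play the role of client models.
def receive_client_models(models):
    yield from models

gen = receive_client_models([{"w": 1.0}, {"w": 2.0}])
cached = list(gen)  # materialize once if the models are needed again
```

After `list(gen)`, the generator is exhausted: iterating it a second time yields nothing, which is why the models should be cached when used more than once per round.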