Skip to main content
Niveau : Expert

Chapitre 43 : Le Parallélisme avec multiprocessing

Objectif

Ce module a pour but de vous apprendre à utiliser le module multiprocessing pour exécuter du code en parallèle et ainsi contourner les limitations du GIL (Global Interpreter Lock) pour les tâches gourmandes en CPU. Vous découvrirez comment créer et gérer des processus, et comment utiliser des Pool de workers pour paralléliser des tâches simplement.

1. threading vs multiprocessing : Le Rappel

  • threading : Concurrence. Utile pour les tâches I/O-bound. Plusieurs threads s'exécutent dans le même processus, partagent la même mémoire, mais sont limités par le GIL (un seul thread exécute du code Python à la fois).
  • multiprocessing : Parallélisme. La solution pour les tâches CPU-bound. Plusieurs processus sont créés. Chaque processus a son propre interpréteur Python, sa propre mémoire et son propre GIL. Ils peuvent donc s'exécuter en véritable parallèle sur plusieurs cœurs de CPU.

Inconvénient : La communication entre processus est plus complexe et plus lente que la communication entre threads, car les données doivent être sérialisées (avec pickle) pour passer d'un processus à l'autre.

2. La Classe Process

La classe multiprocessing.Process est l'équivalent de threading.Thread. Elle permet de lancer une fonction dans un nouveau processus.

import multiprocessing
import time
import os

def info_processus(titre):
print(titre)
print('Nom du module:', __name__)
# os.getppid() : ID du processus parent
print('ID du processus parent:', os.getppid())
# os.getpid() : ID du processus courant
print('ID du processus:', os.getpid())
print("-" * 20)

def worker_function(nom):
info_processus(f'Fonction worker {nom}')
print(f'Worker {nom}: Démarrage')
time.sleep(2)
print(f'Worker {nom}: Fin')

if __name__ == '__main__':
info_processus('Fonction principale')

# Crée un objet Process
p = multiprocessing.Process(target=worker_function, args=('P1',))

# Démarre le processus enfant
p.start()

print("Processus principal attend la fin du worker...")
# Attend que le processus enfant se termine
p.join()

print("Processus worker terminé.")

L'importance de if __name__ == '__main__':

Lorsque vous créez un nouveau processus, celui-ci importe votre script. Si le code de création du processus n'était pas protégé par ce bloc, chaque processus enfant essaierait de créer ses propres processus enfants, menant à une boucle infinie de création de processus.

Ce bloc garantit que le code de création des processus n'est exécuté que par le script principal, et non par les processus enfants importés. C'est obligatoire sur Windows et macOS (avec le mode "spawn"), et une très bonne pratique sur Linux.

3. Les Pool de Workers

Gérer manuellement des dizaines de processus peut être fastidieux. Le multiprocessing.Pool est une abstraction de haut niveau qui simplifie grandement la parallélisation de tâches.

Un Pool gère un ensemble de processus "workers". Vous lui soumettez des tâches, et il les distribue automatiquement aux workers disponibles.

import multiprocessing
import time

def carre(x):
"""Calcule le carré d'un nombre après une petite pause."""
time.sleep(0.1)
return x * x

if __name__ == '__main__':
nombres = range(10)

start_time = time.time()

# Crée un pool avec un nombre de workers.
# Si aucun argument n'est donné, il utilise os.cpu_count().
with multiprocessing.Pool(processes=4) as pool:
# pool.map applique la fonction 'carre' à chaque élément de 'nombres'.
# C'est une opération bloquante : elle attend que tous les résultats soient prêts.
resultats = pool.map(carre, nombres)

end_time = time.time()

print(f"Résultats : {resultats}")
print(f"Temps écoulé : {end_time - start_time:.2f} secondes")

Analyse de l'exécution :

  • Le Pool a 4 workers.
  • Il distribue les 10 tâches (carre(0), carre(1), ..., carre(9)) aux 4 workers.
  • Chaque tâche prend 0.1s.
  • Les 4 premiers nombres sont traités en parallèle (temps ~0.1s).
  • Les 4 suivants sont traités en parallèle (temps total ~0.2s).
  • Les 2 derniers sont traités en parallèle (temps total ~0.3s).
  • Le temps total sera donc d'environ 0.3s, bien plus rapide que les 1.0s d'une exécution séquentielle.

Méthodes utiles du Pool

  • pool.map(func, iterable) : Bloquant. Applique func à chaque élément de iterable. Retourne une liste de résultats dans le même ordre.
  • pool.starmap(func, iterable_de_tuples) : Similaire à map, mais pour les fonctions qui prennent plusieurs arguments. Chaque élément de l'itérable est un tuple d'arguments.
  • pool.apply(func, args) : Bloquant. Appelle func avec les arguments args. Utile pour une seule tâche.
  • pool.apply_async(func, args) : Non-bloquant. Soumet la tâche et retourne immédiatement un objet AsyncResult. On peut récupérer le résultat plus tard avec result.get().
  • pool.map_async(func, iterable) : Version non-bloquante de map.

Exemple avec apply_async

apply_async est utile quand on veut lancer des tâches en arrière-plan et faire autre chose en attendant.

import multiprocessing
import time

def worker(duree):
print(f"Début d'une tâche de {duree}s")
time.sleep(duree)
return f"Tâche de {duree}s terminée"

if __name__ == '__main__':
with multiprocessing.Pool(processes=2) as pool:
# Soumet deux tâches. L'appel est non-bloquant.
result1 = pool.apply_async(worker, (2,))
result2 = pool.apply_async(worker, (3,))

print("Le programme principal continue pendant que les workers travaillent...")

# .get() est bloquant. Il attend que le résultat soit disponible.
print(result1.get())
print(result2.get())

print("Toutes les tâches sont finies.")

4. Communication entre Processus

Puisque les processus ne partagent pas la mémoire, ils ont besoin de mécanismes spécifiques pour communiquer.

  • multiprocessing.Queue : Une file d'attente thread-safe et process-safe. Les objets qui y sont placés sont sérialisés (pickled).
  • multiprocessing.Pipe : Crée une paire de connexions bidirectionnelles. Plus rapide qu'une Queue mais ne peut être utilisé qu'entre deux processus.
  • Mémoire partagée (Value, Array) : Pour partager des données simples (comme un nombre ou un tableau de nombres) sans avoir à les sérialiser. L'accès doit être protégé par des verrous (Lock).

Exemple avec Queue

import multiprocessing

def producteur(q):
print("Le producteur envoie des données...")
for i in range(5):
q.put(f"item {i}")
q.put(None) # Signal de fin

def consommateur(q):
print("Le consommateur attend des données...")
while True:
item = q.get()
if item is None: # Fin du travail
break
print(f"Consommé : {item}")

if __name__ == '__main__':
# La Queue doit être créée avant les processus
queue = multiprocessing.Queue()

p_prod = multiprocessing.Process(target=producteur, args=(queue,))
p_cons = multiprocessing.Process(target=consommateur, args=(queue,))

p_prod.start()
p_cons.start()

p_prod.join()
p_cons.join()

Conclusion

Le module multiprocessing est l'outil indispensable en Python pour atteindre un vrai parallélisme et exploiter pleinement les processeurs multi-cœurs. Il est la solution de choix pour accélérer les applications CPU-bound. Bien que la communication entre processus soit plus complexe que pour les threads, des abstractions de haut niveau comme les Pool rendent la parallélisation de nombreuses tâches étonnamment simple. Pour des applications de calcul scientifique, d'analyse de données ou de traitement d'images, la maîtrise de multiprocessing est une compétence essentielle.


Exercice 01 : Parallélisation d'un Calcul de Factorielles avec Pool.map

Objectif

Cet exercice a pour but de vous faire utiliser un Pool de workers du module multiprocessing pour paralléliser une tâche CPU-bound. Vous comparerez le temps d'exécution de la version parallèle avec une version séquentielle pour visualiser le gain de performance.

Contexte

Le calcul de la factorielle d'un grand nombre est une opération CPU-bound. Si vous devez calculer la factorielle pour une longue liste de nombres, le faire séquentiellement peut prendre beaucoup de temps. C'est un cas d'usage parfait pour la parallélisation avec multiprocessing.

Vous allez écrire une fonction qui calcule la factorielle, puis l'appliquer à une liste de nombres en utilisant pool.map() pour distribuer le travail sur plusieurs cœurs de CPU.

Énoncé

  1. Créez un nouveau fichier Python nommé parallel_factorial.py.

  2. Importez les modules nécessaires : multiprocessing, time, et math (pour math.factorial).

  3. Définissez la fonction de travail.

    • Créez une fonction calculer_factorielle(n) qui prend un entier n.
    • Cette fonction doit calculer la factorielle de n. Pour rendre la tâche plus longue et l'effet de la parallélisation plus visible, vous pouvez ajouter une boucle qui consomme du CPU avant le calcul, ou simplement utiliser math.factorial sur de grands nombres.
    import math

    def calculer_factorielle(n):
    # math.factorial est déjà très rapide car implémenté en C.
    # Pour simuler une charge de travail plus lourde en Python pur,
    # on pourrait faire une boucle, mais pour cet exercice,
    # l'utilisation de grands nombres suffira à montrer le principe.
    return math.factorial(n)
  4. Préparez les données.

    • Créez une liste de nombres pour lesquels vous voulez calculer la factorielle. Utilisez des nombres assez grands pour que le calcul prenne un temps mesurable, par exemple list(range(20000, 20020)).
  5. Implémentez et mesurez la version séquentielle.

    • Protégez votre code principal avec if __name__ == '__main__':.
    • Enregistrez le temps de début.
    • Utilisez une compréhension de liste ou une boucle for pour appeler calculer_factorielle pour chaque nombre de votre liste.
    • Enregistrez le temps de fin et affichez la durée.
  6. Implémentez et mesurez la version parallèle.

    • Juste après la version séquentielle, réinitialisez le temps de début.
    • Créez un Pool de workers en utilisant with multiprocessing.Pool() as pool:. Laisser le nombre de processus vide pour qu'il soit déterminé automatiquement (os.cpu_count()).
    • Utilisez pool.map(calculer_factorielle, nombres) pour distribuer le calcul sur les workers.
    • Enregistrez le temps de fin et affichez la durée.
  7. Comparez les résultats.

    • Affichez les durées des deux versions pour comparer. La version parallèle devrait être significativement plus rapide sur une machine multi-cœur.

Résultat Attendu

Les temps exacts dépendront du nombre de cœurs de votre CPU et de sa vitesse, mais la version parallèle devrait montrer un gain de performance clair.

Calcul des factorielles pour 20 nombres...

--- Version Séquentielle ---
Durée : 1.52 secondes

--- Version Parallèle avec Pool ---
Durée : 0.45 secondes

Gain de performance : La version parallèle est environ 3.4 fois plus rapide.
Cliquez ici pour voir un exemple de code de solution
# parallel_factorial.py

import multiprocessing
import time
import math

def calculer_factorielle(n):
"""
Calcule la factorielle d'un nombre.
C'est une tâche gourmande en CPU pour de grands nombres.
"""
return math.factorial(n)

if __name__ == '__main__':
# Utiliser des nombres assez grands pour que le calcul soit non trivial
NUMBERS = list(range(30000, 30025))
print(f"Calcul des factorielles pour {len(NUMBERS)} nombres (de {NUMBERS[0]} à {NUMBERS[-1]})...")

# --- 1. Version Séquentielle ---
print("\n--- Version Séquentielle ---")
start_time_seq = time.time()

# Utilise map() de base, qui est séquentiel
resultats_seq = list(map(calculer_factorielle, NUMBERS))

end_time_seq = time.time()
duration_seq = end_time_seq - start_time_seq
print(f"Durée : {duration_seq:.2f} secondes")

# --- 2. Version Parallèle ---
print("\n--- Version Parallèle avec Pool ---")
start_time_parallel = time.time()

# Crée un pool de processus. Le nombre de workers est géré automatiquement.
with multiprocessing.Pool() as pool:
# pool.map distribue les éléments de NUMBERS aux processus workers
# et applique la fonction calculer_factorielle sur chacun.
resultats_parallel = pool.map(calculer_factorielle, NUMBERS)

end_time_parallel = time.time()
duration_parallel = end_time_parallel - start_time_parallel
print(f"Durée : {duration_parallel:.2f} secondes")

# --- 3. Conclusion ---
print("\n--- Conclusion ---")
if duration_parallel < duration_seq:
gain = duration_seq / duration_parallel
print(f"La version parallèle est environ {gain:.1f} fois plus rapide.")
else:
print("La version parallèle n'a pas montré de gain de performance.")

# Vérification que les résultats sont identiques
assert resultats_seq == resultats_parallel
print("Les résultats des deux versions sont identiques.")