Thread Pool : Théorie et Pratique

Introduction

Cet article a pour vocation de mettre en lumière les mécanismes internes utilisés par les outils récents permettant l'exploitation des architectures multi-coeurs tels que TBB et OpenMP par la présentation d'un exemple concret d'implémentation d'un thread pool basé sur la bibliothèque standardisée sur les systèmes POSIX, pthread.
De part les termes employés et les technologies présentées dans cet article, des connaissances de base en C et sur la programmation parallèle sont requise pour en saisir toute la portée.


Contexte

La course à la fréquence des CPU ayant atteint ses limites il y a quelques années, les constructeurs n'ont eu d'autre choix pour suivre la demande croissante en puissance de calcul que d'augmenter le nombre d'unités de calcul par microprocesseur.
Cependant les programmes ayant été écrit jusqu'à lors pour ne tirer profit que de processeurs mono-coeur, la puissance de calcul ainsi disponible n'a pas pu être exploitée directement. Le programmeurs devaient réapprendre à développer et repenser leurs programmes pour exploiter au mieux ces nouvelles architectures, mais les outils à leur disposition étaient de bas niveau et les contraintes engendrées par les accès concurrentiels aux ressources par les différents threads étaient très difficiles à gérer.
C'est dans ce cadre qu'est née le besoin d'avoir des abstractions de plus haut niveau afin de tirer un profit maximum de la puissance de calcul des machines.


En Théorie

Le patron de conception (design pattern)

Le concept de Thread Pool a été standardisé par l'introduction d'un patron de conception définissant son fonctionnement.
Le principe est simple, au lieux de créer des threads pendant le déroulement du programme (qui est une opération coûteuse) et de leur attribuer un comportement particulier, le programme va créer un certain nombre de threads à sont lancement qui seront en charge d'exécuter des tâches génériques, tâches qui seront mises en file d'attente le temps qu'un thread soit disponible pour les exécuter.


Le schémas ci-dessous représente le comportement attendu d'un thread pool.



Avantages

Les avantages à l'utilisation du Thread Pool sont multiples :

  • Comme dit précédement, on ne recrée pas de threads pendant l'execution du programme, ils sont démarrés au lancement et arrêtés lorsque le programme se termine, ce qui décharge le programmeur de cette responsabilité.
  • Le Thread Pool est conçu pour exécuter des tâches génériques, une fois implémenter il peut être réutilisé dans différents programmes et distribuer au moyen de bibliothèques partagées.
  • On peut décider du nombre de threads dont il sera composé et donc exploiter au maximum les architectures multi-coeurs des processeurs actuels.

Inconvénients

Cependant l'implémentation d'un tel outils a aussi quelques inconvénients :

  • Cela nécessite de pouvoir décomposer le travail sous forme de tâches indépendantes qui peuvent introduire une grande complexité dans l'implémentation et des difficultés à comprendre le code (on aura alors besoin d'outils de plus haut niveau encore pour faciliter le développement, cf. TBB)
  • On ne résout pas les problèmes de synchronisation quand à l'accès aux ressources partagées.

En Pratique

Implémentation d'un Thread Pool basé sur la bibliothèque pthread

Afin d'affiner la compréhension du modèle théorique nous allons voir un cas concret d'implémentation d'un Thread Pool basé sur la bibliothèque pthread. L'implémentation sera faite en C pour fournir un exemple facilement portable en ne nous encombrant pas des contraintes des langages de plus haut niveau.

Analyse de conception

Nous allons avoir besoin des éléments suivants :

  • Une abstraction des tâches qui seront exécutées par le Thread Pool (pour la généricité).
  • Une file d'attente dans laquelle les tâches seront mises avant d'être exécutées par les threads.
  • L'implémentation du Thread Pool en lui-même.

Comme dans tout système multi-threadé nous auront des contraintes de synchronisation lors des accès aux ressources partagées, dans notre cas la ressource partagées entre les différents threads est la file d'attente dans laquelle les tâches seront mises pour être exécutées.

La bibliothèque pthread nous fourni les outils suivants pour résoudre ce problème :

  • pthread_mutex_t Pour synchroniser l'accès aux sections critiques.
  • pthread_cond_t Pour mettre des threads en attente le temps qu'une tâche soit disponible pour être executées.


Le défis

On veut que le code suivant compile et exécute les tâches dans les différents threads du Thread Pool.

void routine()
{
  // do somehting here...
}

int main()
{
  int i;

  for (i = 0; i < 20; ++i)
    {
      // this is the function we will write to use the
      // thread pool
      pthread_async_exec(routine, NULL);
    }
  return (0);
}

Seuls les points importants du code seront mis en avant dans cet article, le code source complet est disponible en téléchargement en bas de page.


Tâche (pthread_task_t)

L'implémentation d'une tâche est très simple, nous n'avons besoin que d'un pointeur sur fonction qui sera exécutée par un thread et d'un paramètre à lui envoyer qui contiendra des données associées au travail à exécuter.
La structure fera aussi office de node de liste chainée afin de stocker les tâches dans la liste d'attente.

 

/**
 * @brief Here is the definition of a structure representing a task to be
 * executed by a thread pool.
 *
 * @attribute routine The funtion to be called when the task is executed.
 * @attribute data    The data to send to the callback function.
 * @attribute next    The next task in the queue that stores the tasks.
 */
typedef struct _pthread_task
{
  void (*routine)();
  void *data;
  struct _pthread_task *next;
} pthread_task_t;

 

Sont associé à la structure les constructeurs et destructeurs et la fonction pthread_task_exec() permettant d'exécuter une tâche (elle sera appelée par les threads du Thread Pool et appellera elle-même le callback associé à la tâche en lui passant sont argument en paramètre).

int pthread_task_exec(pthread_task_t *task)
{
  if (task == NULL)
    {
      return (-1);
    }
  if (task->data == NULL)
    {
      task->routine();
    }
  else
    {
      task->routine(task->data);
    }
  return (0);
}

 

File d'attente (pthread_queue_t)

La file d'attente doit garder un pointeur vers la première tâche (qui sera donnée au premier thread disponnible) et vers la dernière tâche de la liste (pour pouvoir en rajouter en les chainant les unes les autres).
Une simple mutex est nécessaire pour synchroniser les accès à la liste et acquérir le verrou nécessaire à l'attente lorsqu'une tâche est disponnible.

 

/**
 * @brief Here is the definition of a structure representing a synchronized
 * queue where pending tasks are stored before being executed by the thread
 * pool.
 *
 * @attribute mutex The mutex used to synchronize the access to the task queue.
 * @attribute cond  The condition to be used by the threads to wait for tasks.
 * @attribute first The first task in the queue.
 * @attribute last  The last task in the queue.
 * @attribute terminated A flag to say whether the execution has been terminated.
 */
typedef struct _pthread_queue
{
  pthread_mutex_t mutex;
  pthread_cond_t cond;
  pthread_task_t *first;
  pthread_task_t *last;
  int terminated;
} pthread_queue_t;


Sont associé à la structure les constructeurs et destructeurs et les fonctions pthread_queue_push() et pthread_queue_pop() permettant de mettre les tâches en file d'attente et de les en extraire (ces fonctions seront synchronisées en interne au moyen de la mutex encapsulée dans le file d'attente).

 

int pthread_queue_push(pthread_queue_t *queue, const pthread_task_t *task)
{
  pthread_mutex_t *mutex;
  pthread_cond_t *cond;
  pthread_task_t *qtask;

  if ((queue == NULL) || (task == NULL))
    {
      return (-1);
    }
  if ((qtask = malloc(sizeof(*qtask))) == NULL)
    {
      return (-1);
    }
  pthread_task_init(qtask, task->routine, task->data);
  mutex = &(queue->mutex);
  cond = &(queue->cond);
  pthread_mutex_lock(mutex);
  if (queue->last == NULL)
    {
      queue->first = qtask;
      queue->last = qtask;
    }
  else
    {
      queue->last->next = qtask;
      queue->last = qtask;
    }
  pthread_cond_signal(cond);
  pthread_mutex_unlock(mutex);
  return (0);
}

int pthread_queue_pop(pthread_queue_t *queue, pthread_task_t *task)
{
  pthread_mutex_t *mutex;
  pthread_cond_t *cond;
  pthread_task_t *qtask;

  if ((queue == NULL) || (task == NULL))
    {
      return (-1);
    }
  mutex = &(queue->mutex);
  cond = &(queue->cond);
  pthread_mutex_lock(mutex);
  while (queue->first == NULL)
    {
      pthread_cond_wait(cond, mutex);
      if (queue->terminated)
	{
	  pthread_mutex_unlock(mutex);
	  return (-1);
	}
    }
  qtask = queue->first;
  queue->first = queue->first->next;
  pthread_mutex_unlock(mutex);
  pthread_task_init(task, qtask->routine, qtask->data);
  pthread_task_destroy(qtask);
  free(qtask);
  return (0);
}


Thread Pool (pthread_pool_t)

Le Thread Pool quant à lui contient la liste d'attente et un vecteur où sont mémorisés les identifiants des threads actuellement lancés par le programme.

 

/**
 * @brief Here is the definition of a structure representing a thread pool.
 *
 * @attribute mutex   A mutex used to synchronize the access to the thread pool.
 * @attribute queue   The instance of pthread_queue_t used to store the pending
 * tasks.
 * @attribute threads The vector storing the list of threads.
 * @attribute thread_init The number of initialized threads.
 * @attribute thread_count The number of threads in the thread pool. 
 */
typedef struct _pthread_pool
{
  pthread_mutex_t mutex;
  pthread_queue_t queue;
  pthread_t *threads;
  size_t thread_init;
  size_t thread_count;
} pthread_pool_t;


Sont associées les construceurs et destructeurs et les fonctions pthread_pool_exec() qui permet de donner une tâche à executer au Thread Pool et pthread_pool_routine() qui sera le point d'entrée des threads du Thread Pool.

 

int pthread_pool_exec(pthread_pool_t *thread_pool, void (*function)(), void *data)
{
  pthread_task_t task;
  int code;

  if ((thread_pool == NULL) || (function == NULL))
    {
      return (-1);
    }
  pthread_task_init(&task, function, data);
  code = pthread_queue_push(&(thread_pool->queue), &task);
  pthread_task_destroy(&task);
  return (code);
}

 

void *pthread_pool_routine(void *data)
{
  pthread_pool_t *thread_pool = (pthread_pool_t *)data;
  pthread_task_t task;

  pthread_mutex_lock(&(thread_pool->mutex));
  thread_pool->thread_init += 1;
  pthread_mutex_unlock(&(thread_pool->mutex));
  while (pthread_queue_pop(&(thread_pool->queue), &task) == 0)
    {
      pthread_task_exec(&task);
    }
  return (NULL);
}

 

Un petit tour de magie

Maintenant que nous avons terminé l'implémentation de notre Thread Pool, nous voulons pouvoir l'utiliser de façon simple, autrement dit nous développeur sommes fainéants et initialiser/détruire un objet dont la durée de vie est celle du programme n'est pas une solution envisageable.
Pas de panique, nous allons utiliser le mot-clé __attribute__((constructor)) pour automatiser l'initialisation du Thread Pool et nous n'auront plus qu'à l'utiliser directement dans notre programme.

 

void pthread_pool_constructor() __attribute__((constructor));
void pthread_pool_destructor()  __attribute__((destructor));

 

static pthread_pool_t thread_pool;

void pthread_pool_constructor()
{
  if (pthread_pool_init(&thread_pool) < 0)
    {
      fprintf(stderr, "ERROR: failed to create the thread pool, %sn", strerror(errno));
    }
}

void pthread_pool_destructor()
{
  pthread_pool_destroy(&thread_pool);
}

int pthread_async_exec(void (*function)(), void *data)
{
  return (pthread_pool_exec(&thread_pool, function, data));
}

Défi réussi !!!
Le bout de code que nous voulions pouvoir faire compiler s'exécute à merveille, nous pouvons maintenant utiliser toute la puissance disponnible sur nos machines multi-coeurs grâce à notre nouvel outils.

Conclusion


Cet article nous aura permis d'étudier les principes de fonctionnement interne des bibliothèque de haut niveau permettant l'exploitation des processeurs multi-coeurs au maximum de leurs possibilités, tout en essayant de garder un code aussi clair et facile à maintenir que possible (en déportant la gestion des threads dans une implémentation générique basée sur le design pattern Thread Pool).

Certe beaucoup d'améliorations peuvent être apportées à cette implémentation (utilisation de gestionnaires de mémoire pour éviter les allocations et libérations des tâches en mémoire, améloriation de la liste d'attente pour éviter l'effet "goulot d'étranglement" du à la synchronisation des threads... ) mais ce n'était pas le but de cet article que d'aborder des points là.

Dans un prochain article nous verrons comment exploiter au mieux les nouvelles fonctionnalités de la norme 99 du C pour rendre la parallélisation des tâches toujours plus simple.

Téléchargements

Reportez-vous à notre Notice d'optimisation pour plus d'informations sur les choix et l'optimisation des performances dans les produits logiciels Intel.

Commentaires

Portrait de

Bonjour,

Excellent article.

La fonction "pthread_pool_destroy" termine l'exécution de façon un peu abrupte. J'aimerais savoir comment faire pour attendre que toutes les tâches soient exécutées avant de terminer le programme. Des suggestions ?

Merci,

Christine

Portrait de

Article vraiment très intéressant.

Portrait de

Pour attendre que les threads on fini on peut faire un pthread_join au niveau de la Thread_Pool

Portrait de Thomas D.

Le lien de téléchargement des sources de l'implémentation n'est plus valide. Serait-il possible d'avoir les sources par un autre lien ? Merci