Search     or:     and:
 LINUX 
 Language 
 Kernel 
 Package 
 Book 
 Test 
 OS 
 Forum 
iakovlev.org

Глава 7 : Мьютексы и условные переменные

Исходники для этой страницы лежат тут

Взаимные исключения - mutual exclusion или mutex - и условные переменные - conditional variables - являются основными инструментами синхронизации . Они применяются как правило для потоков в рамках одного родительского процесса.

Схема работы мьютекса следующая :

       
 	блокировать_mutex()...
 	критическая область 
 	разблокировать_mutex()...
 
pthread_mutex_t - базовый тип мьютекса.

Мютекс можно создать либо статически , либо динамически :

       
 Статически :
 	static pthread_mutex_t	lock = PTHREAD_MUTEX_INITIALIZER;
 Динамически:
 	pthread_mutex_init
 
Для установки-снятия блокировка мьютекса есть 3 функции :
       
 	int pthread_mutex_lock(pthread_mutex_t * mptr);
 	int pthread_mutex_trylock(pthread_mutex_t * mptr);
 	int pthread_mutex_unlock(pthread_mutex_t * mptr);
 
 В случае успеха все 3 возвращают 0 .
 
Первая отличается от второй тем , что будет ждать в случае , если мьютекс уже заблокирован , а вторая сразу вернет ошибку .

Классической задачей синхронизации является схема производитель-потребитель :
Имеется несколько потоков-производителей , которые заполняют интовый массив значениями по порядку , а один поток-потребитель проверяет правильность заполнения .

       
 //mutex/prodcons2.c
 
 int		nitems;			/* read-only by producer and consumer */
 struct {
   pthread_mutex_t	mutex;
   int	buff[MAXNITEMS];
   int	nput;
   int	nval;
 } shared = { PTHREAD_MUTEX_INITIALIZER };
 
 void	*produce(void *), *consume(void *);
 
 int
 main(int argc, char **argv)
 {
 	int			i, nthreads, count[MAXNTHREADS];
 	pthread_t	tid_produce[MAXNTHREADS], tid_consume;
 
 	if (argc != 3)
 		err_quit("usage: prodcons2 <#items> <#threads>");
 	nitems = min(atoi(argv[1]), MAXNITEMS);
 	nthreads = min(atoi(argv[2]), MAXNTHREADS);
 
 	Set_concurrency(nthreads);
 		/* 4start all the producer threads */
 	for (i = 0; i < nthreads; i++) {
 		count[i] = 0;
 		Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
 	}
 
 		/* 4wait for all the producer threads */
 	for (i = 0; i < nthreads; i++) {
 		Pthread_join(tid_produce[i], NULL);
 		printf("count[%d] = %d\n", i, count[i]);	
 	}
 
 		/* 4start, then wait for the consumer thread */
 	Pthread_create(&tid_consume, NULL, consume, NULL);
 	Pthread_join(tid_consume, NULL);
 
 	exit(0);
 }
 /* end main */
 
 /* include producer */
 void *
 produce(void *arg)
 {
 	for ( ; ; ) {
 		Pthread_mutex_lock(&shared.mutex);
 		if (shared.nput >= nitems) {
 			Pthread_mutex_unlock(&shared.mutex);
 			return(NULL);		/* array is full, we're done */
 		}
 		shared.buff[shared.nput] = shared.nval;
 		shared.nput++;
 		shared.nval++;
 		Pthread_mutex_unlock(&shared.mutex);
 		*((int *) arg) += 1;
 	}
 }
 
 void *
 consume(void *arg)
 {
 	int		i;
 
 	for (i = 0; i < nitems; i++) {
 		if (shared.buff[i] != i)
 			printf("buff[%d] = %d\n", i, shared.buff[i]);
 	}
 	return(NULL);
 }
 /* end producer */
 
 
Глобальные переменные мы обьединяем в структуру shared вместе с мьютексом . Первый аргумент командной строки - это размерность массива , второй - число создаваемых потоков . Каждый поток вызывает функцию produce . id-шники потоков хранятся в массиве tid_produce. Критическая область кода - это функция produce , которую мы блокируем с помощью
       
 	Pthread_mutex_lock(&shared.mutex);
 	...
 	Pthread_mutex_unlock(&shared.mutex);
 
Запустив эту команду , мы должны получить результат что-то типа :
       
 >> prodcons2 1000000 5
 count[0]=123456
 count[1]=4456
 count[2]=56345
 count[3]=456
 count[4]=1256
 
Т.е. запущено 5 потоков , которые в сумме породили массив на 1000000 элементов , причем каждый успел заполнить различное количество элементов массива. Если мы закомментируем блокировку в функции produce , то все элементы будут созданы первым потоком .

Теперь изменим предыдущий пример , запустив потребителя сразу же после запуска всех производителей. Это даст возможность потребителю сразу обрабатывать данные по мере их поступления. Для этого нужно синхронизировать данные .

       
 //mutex/prodcons3.c
 
 int main(int argc, char **argv)
 {
 	int			i, nthreads, count[MAXNTHREADS];
 	pthread_t	tid_produce[MAXNTHREADS], tid_consume;
 
 	if (argc != 3)
 		err_quit("usage: prodcons3 <#items> <#threads>");
 	nitems = min(atoi(argv[1]), MAXNITEMS);
 	nthreads = min(atoi(argv[2]), MAXNTHREADS);
 
 		/* 4create all producers and one consumer */
 	Set_concurrency(nthreads + 1);
 	for (i = 0; i < nthreads; i++) {
 		count[i] = 0;
 		Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
 	}
 	Pthread_create(&tid_consume, NULL, consume, NULL);
 
 		/* 4wait for all producers and the consumer */
 	for (i = 0; i < nthreads; i++) {
 		Pthread_join(tid_produce[i], NULL);
 		printf("count[%d] = %d\n", i, count[i]);	
 	}
 	Pthread_join(tid_consume, NULL);
 
 	exit(0);
 }
 
 
Меняется функция consume , которая вызывает новую функцию consume_wait :

 void
 consume_wait(int i)
 {
 	for ( ; ; ) {
 		Pthread_mutex_lock(&shared.mutex);
 		if (i < shared.nput) {
 			Pthread_mutex_unlock(&shared.mutex);
 			return;			/* an item is ready */
 		}
 		Pthread_mutex_unlock(&shared.mutex);
 	}
 }
 
 void *
 consume(void *arg)
 {
 	int		i;
 
 	for (i = 0; i < nitems; i++) {
 		consume_wait(i);
 		if (shared.buff[i] != i)
 			printf("buff[%d] = %d\n", i, shared.buff[i]);
 	}
 	return(NULL);
 }
 /* end consume */
      
 
Функция consume_wait ждет , пока производители не создадут 1-й элемент . Для этого блокируется мьютекс и проверяется индекс производителя nput . Это цикл проверки называется опросом (spinning или polling) и является по сути лишней тратой времени процессора . Было бы лучше использовать какое-то другое средство для проверки , которое происходило бы при наступлении определенного события .

Мьютекс используется для блокировки , а условная переменная - для ожидания . Условная переменная представляет из себя тип pthread_cond_t , и для работы с ней есть 2 функции :

       
 	int pthread_cond_wait(pthread_cond_t *cptr , pthread_mutex_t *mptr)
 	int pthread_cond_signal(pthread_cond_t *cptr)
 
  В первой функции оба параметра являются обязательными .
 
Мы в очередной раз переписываем предыдущий пример. Переменные nput и nval ассоциируются с мьютексом , и мы их обьединим в структуру put . В другой структуре , nready , содержутся счетчик , мьютекс и условная переменная . Условная переменная инициируется с помощью PTHREAD_COND_INITIALIZER .
       
 //mutex/prodcons6.c
 
 #define	MAXNITEMS 		1000000
 #define	MAXNTHREADS			100
 
 		/* globals shared by threads */
 int		nitems;				/* read-only by producer and consumer */
 int		buff[MAXNITEMS];
 struct {
   pthread_mutex_t	mutex;
   int				nput;	/* next index to store */
   int				nval;	/* next value to store */
 } put = { PTHREAD_MUTEX_INITIALIZER };
 
 struct {
   pthread_mutex_t	mutex;
   pthread_cond_t	cond;
   int				nready;	/* number ready for consumer */
 } nready = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };
 
 
Функции produce и consume :
       
 void *
 produce(void *arg)
 {
 	for ( ; ; ) {
 		Pthread_mutex_lock(&put.mutex);
 		if (put.nput >= nitems) {
 			Pthread_mutex_unlock(&put.mutex);
 			return(NULL);		/* array is full, we're done */
 		}
 		buff[put.nput] = put.nval;
 		put.nput++;
 		put.nval++;
 		Pthread_mutex_unlock(&put.mutex);
 
 		Pthread_mutex_lock(&nready.mutex);
 		if (nready.nready == 0)
 			Pthread_cond_signal(&nready.cond);
 		nready.nready++;
 		Pthread_mutex_unlock(&nready.mutex);
 
 		*((int *) arg) += 1;
 	}
 }
 
 void *
 consume(void *arg)
 {
 	int		i;
 
 	for (i = 0; i < nitems; i++) {
 		Pthread_mutex_lock(&nready.mutex);
 		while (nready.nready == 0)
 			Pthread_cond_wait(&nready.cond, &nready.mutex);
 		nready.nready--;
 		Pthread_mutex_unlock(&nready.mutex);
 
 		if (buff[i] != i)
 			printf("buff[%d] = %d\n", i, buff[i]);
 	}
 	return(NULL);
 }
 
Для блокировки теперь используется put.mutex . nready.nready - это счетчик , в котором хранися число элементов , готовых для обработки . Перед его увеличением мы проверяем , не было ли оно нулевым , и если да , то вызывается функция pthread_cond_signal , позволяющая возобновит работу потребителя , который ждет , когда эта переменная станет болше нуля. Этот счетчик используется совместно потребителем и производителями , поэтому доступ к нему осуществляется с помощью мьютекса .

Потребитель просто ждет , когда значение счетчика nready.nready станет больше нуля . Если его значение равно нулю , мы вызываем pthread_cond_wait , при этом выполняются два атомарных процесса :

       
 	1 разблокировка nready.mutex
 	2 поток приостанавливается , пока другой поток не вызовет pthread_cond_signal 
 
При выходе из pthread_cond_wait блокируется nready.mutex. Если тут мы обнаруживаем , что счетчик больше нуля , мы обнуляем его , зная при этом , что мьютекс заблокирован , и разблокируем мьютекс .

Условный код , передающий сигнал условной переменой , выглядит так :

       
 	...	
 	Pthread_mutex_lock
 	установка истинного значения условия
 	Pthread_cond_signal
 	Pthread_mutex_unlock
 
Условный код , проверяющий условие и приостанавливающий процесс , если условие не выполняется , выглядит так :
       
 	Pthread_mutex_lock
 	while(условие ложно)	
 		Pthread_cond_wait
 	изменение условия
 	Pthread_mutex_unlock
 
Можно воспользоваться функцией pthread_cond_broadcast для пробуждения всех процессов , заблокированных в ожидании сигнала данной условной переменной .
       
  int pthread_cond_broadcast(pthread_cond_t * ptr)
  int pthread_cond_timedwait(pthread_cond_t * ptr, pthread_mutex_t *mptr , ...)
 
  В случае успеха обе возвращают 0
 
pthread_cond_timedwait позволяет установить ограничение на время блокировки процесса в абсолютном формате, т.е. число секунд с 1970 года .

Инициализировать мьютексы и условные переменные можно с помощью других функций :

       
 	int pthread_mutex_init
 	int pthread_mutex_destroy
 	int pthread_cond_init
 	int pthread_cond_destroy
 	
 Все возвращают 0 в случае успеха
 
Атрибуты имеют тип pthread_mutexattr_t и pthread_condattr_t соответственно , для их инициализации и удаления есть свой набор функций . После инициализации их можно изменить с помощью другого набора специальных функций .

Следующий код показывает , как надо проинициализировать мьютекс , чтобы его можно было использовать нескольким процессам :

       
 	pthread_mutex_t *mptr;
 	pthread_mutexattr_t mattr;
 	mprt = pthread_mutexattr_init(&mattr);
 	pthread_mutexattr_setpshared(&mattr,PTHREAD_PROCESS_SHARED)
 	pthread_mutex_init(mptr,&mattr); 
 
Когда мьютекс используется совместно несколькими процессами , всегда есть возможность , что процесс будет завершен во время работы с заблокированным ресурсом . И нет способа заставить систему автоматически снять эту блокировку . Единственный тип блокировки , снимаемый автоматически - блокировка fcntl . При использовании семафоров System V можно указать ядру , следует ли автоматом снимать блокировку .

Т.о. мьютексы и взаимные исключения могут инициализироваться как статически , так и динамически . Динамическая инициализация позволяет указывать дополнительные атрибуты .

Оставьте свой комментарий !

Ваше имя:
Комментарий:
Оба поля являются обязательными

 Автор  Комментарий к данной статье