Search     or:     and:
 LINUX 
 Language 
 Kernel 
 Package 
 Book 
 Test 
 OS 
 Forum 
iakovlev.org

Concurrency

Goroutine - функция, которая может согласованно выполняться с другими функциями. Для создания рутины используется ключевое слово go.
Channel - канал, по которому можно передавать данные, в частности между рутинами, и синхронизировать их работу. Канал может иметь произвольный тип. Каналы можно использовать где угодно: в качестве полей структур, параметров функций, Каналы используются в том числе для синхронизации рутин.
В следующем примере создается один двунаправленный канал, который передается в качестве параметра. Он организует (или синхронизирует) работу между тремя рутинами. Две рутины отсылают данные в канал, а третья принимает. Оператор <- служит для отсылки данных в канал и приемки данных из канала. Если этот оператор стоит слева от переменной, обозначающей канал, это значит, что канал принимает данные извне. Если оператор стоит справа от переменной, это означает отсылку данных:

 func sender(c chan int) {
   for i := 0; i< 11; i++ {
     c <- i
   }
 }
 
 func sender2(c chan int) {
   for i := 11; i< 20; i++ {
     c <- i
   }
 }
 
 func receiver(c chan int) {	
   for {
     msg := <- c
     fmt.Println(msg)
   }
 }
 
 func main() {
   var c chan int = make(chan int)
 
   go sender(c)
   go sender2(c)
   go receiver(c)
 
   var input string
   fmt.Scanln(&input)
 }
Канал можно сделать однонаправленным - для этого нужно изменить заголовок функции:
func receiver(c <- chan int) {
В первом примере в качестве рутины выступает именованная функция. Анонимная функция также может выступать в качестве рутины. В следующем примере мы используем второй вариант. Будут созданы два канала, два сендера и один получатель. Отличие второго примера от первого в том, что получатель будет принимать данные не из одного, а из двух каналов. Для этого используется выражение, аналогичное switch, но только для каналов:
select

 func main() {
   var c1 chan int = make(chan int)
   var c2 chan int = make(chan int)
 
   go func () {
     for i := 0; i< 11; i++ {
       c1 <- i
     }
   }()
   
   go func () {
     for i := 11; i< 20; i++ {
       c2 <- i
     }
   }()
   
   go func () {
     for {
     select {
       case num1 := <- c1:
 	fmt.Println(num1)
       case num2 := <- c2:
 	fmt.Println(num2)
      }
     }
   }()
   
 
   var input string
   fmt.Scanln(&input)
 }
В приведенных примерах использовались небуфферизованные каналы, их особенность в том, что это синхронные каналы, в том смысле, что если мы попытаемся записать данные в уже непустой канал, операция записи заблокирует программу до тех пор, пока данные оттуда не будут прочитаны. Эту особенность можно проиллюстрировать следующим примером - в нем создается небуферизованный канал, делается попытка записать в него сразу три сообщения, что блокируется после первой же записи в канал до тех пор, пока мы не начинаем читать из него:

      message := make(chan string) // no buffer
      count := 3
      go func() {
           for i := 1; i <= count; i++ {
                fmt.Println("send message")
                message <- fmt.Sprintf("message %d", i)
           }
      }()
      time.Sleep(time.Second * 3)
      for i := 1; i <= count; i++ {
           fmt.Println(<-message)
      }
Буфферизованные асинхронные каналы создаются с дополнительным параметром - capacity.
Если мы сделаем попытку записать в канал или прочитать из канала сообщений больше, чем его буфер, произойдет дедлок:

     c := make(chan int, 2)
     c <- 1
     c <- 2
     c <- 3
     fmt.Println(<-c)
     fmt.Println(<-c)
 
Чтобы этого не происходило, канал нужно закрывать командой close. В следующем примере мы создаем канал с буфером=10, создаем рутину, в которой заполняем этот канал и закрываем канал, а потом читаем из него. Буферизованный канал позволяет делать итерацию, но перед этим канал нужно закрывать. После чего чтение из закрытого канала - неблокирующая операция:

 func fibonacci(n int, c chan int) {
     x, y := 0, 1
     for i := 0; i < n; i++ {
         c <- x
         x, y = y, x+y
     }
     close(c)
 }
 
 func main() {
     c := make(chan int, 10)
     go fibonacci(cap(c), c)
     for i := range c {
         fmt.Println(i)
     }
 }
 
Иногда возникают ситуация, когда канал по каким-то причинам задерживает отдачу. В этом случае можно использовать timeout для ограничения времени отклика. В следующем примере делается http-запрос. Создаются два канала - один для ответа и второй для ошибки. С помощью оператора select обрабатывается возможные сценарии, когда ответ на запрос из канала может вернуть ошибку либо задержаться:

 	response := make(chan *http.Response, 1)
 	errors   := make(chan *error)
 
 	go func() {
 		resp, err := http.Get("http://iakovlev.org/")
 		if err != nil {
 			errors <- &err
 		}
 		response <- resp
 	}()
 	for {
 		select {
 		case r := <-response:
 			fmt.Printf("%s", r.Body)
 			return
 		case err := <-errors:
 			log.Fatal(err)
 		case <-time.After(100 * time.Millisecond):
 			fmt.Printf("Timed out!")
 			return
 		}
 	}    
 
В гоу concurrency - это не параллелизм. Для обьяснения этого парадокса рассмотрим код, в котором не будет ни одной рутины, в нем будут две анонимных функции, которые по всем законам жанра будут выполняться строго последовательно - именно в том порядке, в котором они прописаны:

     fmt.Println("Start")
     func() {
 	time.Sleep(1000000 * time.Microsecond)
         for char := 'a'; char < 'a'+26; char++ {
             fmt.Printf("%c ", char)
         }
     }()
     println()
     func() {
 
         for number := 1; number < 27; number++ {
             fmt.Printf("%d ", number)
         }
     }()
     println()
     fmt.Println("End")
 
 Вывод:
 Start
 a b c d e f g h i j k l m n o p q r s t u v w x y z 
 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 
 End
 
Вывод вполне предсказуем, при этом обратите внимание на тот факт, что во время паузы в первой анонимной функции программа будет заблокирована на период ожидания.
Перепишем этот пример - анонимные функции сделаем рутинами с помощью go и добавим синхронизацию:

     var wg sync.WaitGroup
     wg.Add(2)
     fmt.Println("Start")
     go func() {
 	defer wg.Done()
 	time.Sleep(1000000 * time.Microsecond)
         for char := 'a'; char < 'a'+26; char++ {
             fmt.Printf("%c ", char)
         }
     }()
     go func() {
 	defer wg.Done()
         for number := 1; number < 27; number++ {
             fmt.Printf("%d ", number)
         }
 	println()
     }()
     wg.Wait()
     println()
     fmt.Println("End")
 Вывод:
 Start
 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 
 a b c d e f g h i j k l m n o p q r s t u v w x y z 
 End
 
Порядок работы и вывод существенно меняются - сначала выполнится вторая рутина, а потом первая, т.е. разница в том, что теперь первая рутина не блокирует ход программы, у нас есть возможность автоматически переключиться между рутинами в том случае, если одна занята ожиданием. Т.е. concurrency дает возможность использовать ресурсы на сто процентов.
Когда мы говорим, что concurrency - это не параллелизм, мы имеем ввиду тот факт, что в гоу по умолчанию программа выполняется одним процессором. Если мы ходим использовать больше одного ядра, т.е. сделать программу параллельной, мы должны сделать обьявление - runtime.GOMAXPROCS(2):

     runtime.GOMAXPROCS(2)
     var wg sync.WaitGroup
     wg.Add(2)
     fmt.Println("Start")
     go func() {
 	defer wg.Done()
         for char := 'a'; char < 'a'+26; char++ {
             fmt.Printf("%c ", char)
         }
     }()
     go func() {
 	defer wg.Done()
         for number := 1; number < 27; number++ {
             fmt.Printf("%d ", number)
         }
 	println()
     }()
     wg.Wait()
     println()
     fmt.Println("End")
 Вывод:
 Start
 1 2 3 4 5 a 6 b c d e f g h i j k l m n o p 7 q r 8 s t u v w x y z 9 10 ... 
 End
 
В данном случае это будет уже программа с двумя рутинами, выполняемыми параллельно на разных ядрах, и каждый раз будет генерироваться непредсказуемый вывод.

Race condition - ситуация, когда многопоточная программа дает непредсказуемый результат.
Такой результат зачастую невозможно воспроизвести.
Одна из разновидностей - data race - случается тогда, когда две рутины получают одновременный доступ на запись к одной и той же переменной. Есть три варианта для обхода data race:
1. Сделать доступ этой переменной только на чтение - правда, в большинстве случаев это не подходит
2. Сделать доступ на запись в переменную только для одной рутины
3. Можно использовать мьютекс, у которого есть методы блокирования и разблокирования:

     mu sync.Mutex 
     mu.Lock()
     ...
     mu.Unlock()
 
Область кода между Lock() и Unlock() называется критической секцией. Функции с такими секциями называются мониторами.
Иногда возникает ситуация, когда вызов Unlock() полезно ложить в отложенную функцию defer, чтобы быть уверенным в гарантированной разблокировке - в следующем примере разблокировка произойдет уже после того, как функция вернет значение - поэтому это concurrency-safe функция, даже если в функции случится паника и там будет стоять вызов рекавери, разблокировка все равно сработает :

 func Balance() int {
   mu.Lock()
   defer mu.Unlock()
   return balance
 }
 
 
Простая ситуация, когда возможен дедлок: есть две функции, одна вызывается внутри другой, обе функции используют один и тот же мьютекс для блокировки, при вызове вложенной функции и произойдет дедлок, потому что у мьютекса нельзя вызывать два раза подряд один и тот же метод
mu.Lock()
Есть специальный тип мьютекса с эксклюзивным доступом на запись, но он работает медленнее обычного:
var mu sync.RWMutex
В пакете sync есть еще один обьект
sync.Once
Он включает в себя одновременно мьютекс и булевский флаг. У него есть метод
once.Do(oblect)
При первом вызове этого метода флаг установится в true, при дальнейших вызовах чтение расшаренного в памяти обьекта будет оптимизировано.

В майнфреймовых языках считается правилом хорошего тона использование блокировок с помощью мьютексов. Гоу также предоставляет аналогичную возможность. Кроме этого, в гоу есть другая возможность для синхронизации - для этого можно использовать каналы.
В следующем примере показано, как работать с "глобальным" словарем, при этом мьютексы не используются. В словаре создается два счетчика, затем они наращиваются, после чего выводятся на экран. На самом деле словарь здесь не глобален, а локален внутри одной функции, но виден он отовсюду. Канал, по которому происходит обмен, предоставляет доступ к этому словарю из любой точки программы. Этот небуферизованный - синхронный - канал является одновременно синронизатором доступа к словарю:

 package main
 
 import "sync"
 
 type request struct {
 	key int
 	value int
 	op string
 	ret chan int
 } 
 
 func set(c chan request, key int)  {
 	c <- request{key, 0, "set", nil}
 }
 
 func get(c chan request, key int) int {
 	result := make(chan int)
 	c <- request{key, 0, "get", result}
 	return <-result
 	
 }
 
 func add(c chan request, key int)  {
 	c <- request{key, 0, "add", nil}
 }
 
 func runMap(c chan request) {
 	m := make(map[int] int)
 	for {
 		req := <- c
 		switch req.op {
 		  case "set":
 		    m[req.key] = 0
 		  case "get":
 		    req.ret <- m[req.key]
 		  case "add":
 		    m[req.key] += 1
 		}
 	}
 }
 
 func main() {
 	m := make(chan request)
 	
 	go runMap(m)
 	set(m, 1 )
 	set(m, 2)
 	
 	var wg sync.WaitGroup
 	wg.Add(2)
 	go func() {
 	  defer wg.Done()	  
 	  for i := 0; i< 100000; i++ {
 	    add(m, 1)
 	  }
 	}()
 	
 	go func() {
 	  defer wg.Done()	  
 	  for i := 0; i< 200000; i++ {
 	    add(m, 2)
 	  }
 	}()
 	wg.Wait()
 
  
 	println(get(m, 1))
 	println(get(m, 2))
 	
 	  
 }
 



Оставьте свой комментарий !

Ваше имя:
Комментарий:
Оба поля являются обязательными

 Автор  Комментарий к данной статье