Интересные способы использования Go каналов (перевод)

в 10:15, , рубрики: channels, concurrency, Go

Предлагаю вам перевод статьи Gary Willoughby «Interesting ways of using Go channels».

Статья предназначена для тех, кто уже немного разбирается в Go.

gopher

Интересные способы использования Go каналов

Я написал этот пост, чтобы задокументировать доклад про Go каналы Джона Грэм-Камминга на конференции GopherCon 2014. Доклад назывался «Краткое руководство по каналам» и он доступен для просмотра на youtube.com.

На протяжении доклада нам представляют интересные способы использования Go каналов и раскрывают возможности и преимущества конкурентного программирования. Лично мне этот доклад открыл глаза на несколько новых способов структурирования программ и новых техник для синхронизации по нескольким ядрам процессора.

Следующие примеры демонстрируют различные техники, как использовать каналы в Go. Код был специально упрощен для их понимания. Не стоит его использовать для продакшн версий. Например, пропущены все обработки ошибок.

Сигналы

Ожидание события

В этом примере горутина запускается, делает какую-то работу (в данном случае ждет 5 секунд), затем закрывает канал. Небуферизированные каналы всегда останавливают выполнение текущей горутины, пока не придет сообщение. Закрытие канала сигнализирует горутине, что она может продолжить свое выполнение, потому что больше никаких данных не может быть получено. Закрытые каналы никогда не останавливают выполнение горутины.

package main

import (
	"fmt"
	"time"
)

func main() {

	c := make(chan bool)

	go func() {
		// ... выполняем что-нибудь
		time.Sleep(time.Second * 5)
		close(c)
	}()

	// Останавливает выполнение до получения сообщения из канала или его закрытия.
	<-c

	fmt.Println("Done")
}

Координирование нескольких горутин

В этом примере сотня горутин запускается, ждет передачи данных через канал start (или его закрытия). В случае, когда он будет закрыт, все горутины запустятся.

package main

func worker(start chan bool) {
	<-start
	// ... выполняем что-нибудь
}

func main() {
	start := make(chan bool)

	for i := 0; i < 100; i++ {
		go worker(start)
	}

	close(start)
	// ... все worker's запустятся сейчас
}

Скоординированное прекращение worker’ов

В этом примере, сотня горутин запускается, ждет передачи данных через канал die (или его закрытия). В случае, когда он будет закрыт, все горутины прекратят выполнение.

package main

func worker(die chan bool) {
	for {
		select {
		// ... выполняем что-нибудь в других case
		case <-die:
			return
		}
	}
}

func main() {
	die := make(chan bool)

	for i := 0; i < 100; i++ {
		go worker(die)
	}

	// Остановить всех worker'ов.
	close(die)
}

Проверка прекращения работы worker’ов

В этом примере горутина запускается, ждет передачи данных от канала die (или его закрытия). В случае, когда придет сигнал, горутина выполнит завершающие действия и пошлет сигнал в функцию main (через этот же канал die) о том, что она завершена.

package main

func worker(die chan bool) {
	for {
		select {
		// ... выполняем что-нибудь в других case
		case <-die:
			// ... выполняем необходимые действия перед завершением.
			die <- true
			return
		}
	}
}

func main() {
	die := make(chan bool)
	go worker(die)
	die <- true

	// Ждем, пока все горутины закончат выполняться
	<-die
}

Инкапсулируем состояние

Уникальный ID сервиса

В этом примере горутина запускается для генерации уникальных шестнадцатиричных id’шников. Каждый id посылается через канал id и горутина приостанавливается, пока сообщение из канала не будет прочитано. Каждый раз, когда канал будет прочитан, горутина инкрементирует счетчик и посылает его значение.

package main

import "fmt"

func main() {
	id := make(chan string)

	go func() {
		var counter int64 = 1
		for {
			id <- fmt.Sprintf("%x", counter)
			counter += 1
		}
	}()

	fmt.Printf("%sn", <-id) // will be 1
	fmt.Printf("%sn", <-id) // will be 2
}

Повторное использование памяти

В этом примере, горутина запускается для повторного использования буферов памяти. Канал give получает старые буферы памяти и сохраняет их в список. В это время канал get распределяет эти буферы для использования. Если нет доступных буферов в списке, создается один новый.

От переводичка

Проще говоря, мы активно повторно используем память, чтобы лишний раз ее не выделять (как мы знаем, ОС может выделять память очень долго). Используется список, в котором всегда есть как минимум 1 буфер. А уже использованные буфера отправляем обратно в этот же список.

package main

import "container/list"

func main() {

	give := make(chan []byte)
	get := make(chan []byte)

	go func() {
		q := new(list.List)

		for {
			if q.Len() == 0 {
				q.PushFront(make([]byte, 100))
			}

			e := q.Front()

			select {
			case s := <-give:
				q.PushFront(s)

			case get <- e.Value.([]byte):
				q.Remove(e)
			}
		}
	}()

	// Получаем новый буфер
	buffer := <-get

	// Возвращаем буфер
	give <- buffer

	// Получаем буфер снова
	buffer = <-get
}

Ограниченное повторное использование памяти

В этом примере буферизированный канал используется как хранилище буферов. Канал настроен на хранение пяти буферов в любой момент времени. Это значит, что канал не блокирует текущую горутину, если у него есть место еще для одной записи.

Select предоставляет неблокирующий доступ к этому каналу в случае, если он будет заполнен. Первый select создает новый буфер, если невозможно получить его из хранилища. Второй select по умолчанию ничиго не делает, если невозможно положить буфер в хранилище, что вызывает GC для очистки этого буфера.

От переводичка

Тут опять хочется добавить от себя. Мы просто создаем буферизированный канал, который ограничивает количество буферов, которые мы хотим «переиспользовать». И если нам приходит «лишний» буфер, мы его игнорим, позволяя отрабатывать сборщику мусора. В данном случае, обе функции неблокирующие, то есть не ждут получения данных ни от кого.

package main

func get(store chan []byte) []byte {
	select {
	case b := <-store:
		return b
	default:
		return make([]byte, 100)
	}
}

func give(store chan []byte, b []byte) {
	select {
	case store <- b:
	default:
		return
	}
}

func main() {

	// Создаем хранилище буферов.
	store := make(chan []byte, 5)

	// Получаем новый буфер из хранилища.
	buffer := get(store)

	// Возвращаем его обратно в хранилище.
	give(store, buffer)

	// Получаем буфер еще раз из хранилища.
	buffer = get(store)
}

Nil каналы

Отключение получения сообщений в операторе case

В этом примере горутина запускается и используется select для получения сообщений из двух каналов. Если канал закрывается, он устанавливается в nil. Поскольку nil каналы всегда блокируют выполнение, данный case больше не выполняется. Если оба канала будут установлены в nil, мы выйдем из горутины, поскольку она больше не может ничего получать.

От переводичка

В примере второе выводимое значение – false, так как после закрытия c1 мы получили x = false и ok = false. Если бы мы не присвоили каналу c1 значение nil, то в нашем бесконечном цикле мы бы продолжали бесконечно получать x = false, ok = false.

Важная мысль. Если вы закрыли канал, то в его case всегда будет приходить значение по умолчанию. Поэтому следует присваивать nil закрытому каналу. А после этого не стоит забывать проверить все каналы на nil, иначе можно навсегда заблокировать вашу горутину.

package main

import "fmt"

func main() {
	c1 := make(chan bool)
	c2 := make(chan bool)

	go func() {
		for {
			select {
			case x, ok := <-c1:
				if !ok {
					c1 = nil
				}
				fmt.Println(x)

			case x, ok := <-c2:
				if !ok {
					c2 = nil
				}
				fmt.Println(x)
			}

			if c1 == nil && c2 == nil {
				return
			}
		}
	}()

	c1 <- true

	// Отключение первого case из select'a выше.
	close(c1)

	c2 <- true
}

Отключение отправки сообщений в операторе case

В этом примере горутина запускается и используется для генерации рандомных чисел и отправки их в канал c. Когда приходит сообщение в канал d, канал c устанавливается в nil, отключая соответствующий оператор case. Отключенная горутина уже больше никогда не генерирует рандомных чисел.

От переводичка

Вот тут бы не помешала проверка на nil, иначе в наша горутина не заканчивает свое выполнение. Она заблокировалась на select, так как первый case уже никогда не выполнится, а второй потерял смысл.

Тут есть ошибка в комментарии – c в предпоследней строке не равен nil, nil установили только для локальной переменной src. А deadlock происходит потому, что наш канал c больше никто не записывает.

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	c := make(chan int)
	d := make(chan bool)

	go func(src chan int) {
		for {
			select {
			case src <- rand.Intn(100):

			case <-d:
				src = nil
			}
		}
	}(c)

	// Печатаем несколько случайных чисел.
	fmt.Printf("%dn", <-c)
	fmt.Printf("%dn", <-c)

	// Отключаем генерацию рандомных чисел.
	d <- true

	// Здесь прекращается выполнение, потому что канал c теперь nil.
	fmt.Printf("%dn", <-c)
}

Таймеры

Таймаут

В этом примере горутина запускается, чтобы сделать некоторую работу. Канал timeout создан, чтобы обеспечить исполнение case, если select выполняется слишком долго. В данном случае, горутина завершается после 30 секунд ожидания. Таймаут пересоздается каждую итерацию select’a, чтобы быть уверенным, что он успешно выполнен. В каждой следующей итерации таймаут сбрасывается.

От переводичка

Тут небольшая опечатка — в коде 5 секунд, а в статье 30.

package main

import "time"

func worker() {
	for {
		timeout := time.After(5 * time.Second)

		select {
		// ... выполняем что-нибудь

		case <-timeout:
			// Закрываем эту горутину после указанного таймаута.
			return
		}
	}
}

func main() {
	go worker()
}

Heartbeat

В этом примере горутина запускается, чтобы делать некоторую работу. Канал heartbeat создан для исполнения оператора case через равные промежутки времени. Канал heartbeat не сбрасывается на каждой итерации, чтобы оператор case всегда исполнялся в срок.

package main

import "time"

func worker() {
	heartbeat := time.Tick(30 * time.Second)
	for {

		select {
		// ... выполняем что-нибудь

		case <-heartbeat:
			// ... выполняем что-нибудь по таймеру
		}
	}
}

func main() {
	go worker()
}

Примеры

Сетевой мультиплексор

Этот пример демонстрирует простой сетевой мультиплексор. В нашей главной горутине создается канал для обработки передаваемых сообщений и устанавливается сетевое соединение. Затем запускается сотня горутин для генерации строк (которые действуют как наши сообщения) и передачи их через этот канал. Каждое сообщение читается из канала на протяжении бесконечного цикла и посылается в сетевое соединение.

Этот пример не запускается (потому что мы пытаемся соединиться с тестовому домену), но он показывает, как легко запустить множество ассинхронных процессов, посылающих сообщения в единственное сетевое подключение.

package main

import "net"

func worker(messages chan string) {
	for {
		var msg string // ... генерируем сообщение
		messages <- msg
	}
}

func main() {

	messages := make(chan string)
	conn, _ := net.Dial("tcp", "example.com")

	for i := 0; i < 100; i++ {
		go worker(messages)
	}

	for {
		msg := <-messages
		conn.Write([]byte(msg))
	}
}

Первый ответ

В этом примере каждый url из массива передается в отдельную горутину. Каждая горутина выполняется ассинхронно и запрашивает переданный ей url. Каждый ответ на запрос передается в канал first, который, конечно, гарантирует, что первый полученный ответ первым попадет в канал. Затем мы можем прочитать этот ответ из канала и соответственно обработать.

От переводичка

Пример снова не рабочий (ругается на неиспользуемую переменную r). Вот тут похожий пример — youtube.com. Рекомендую к просмотру это видео целиком.

package main

import "net/http"

type response struct {
	resp *http.Response
	url  string
}

func get(url string, r chan response) {
	if resp, err := http.Get(url); err == nil {
		r <- response{resp, url}
	}
}

func main() {
	first := make(chan response)

	for _, url := range []string{"http://code.jquery.com/jquery-1.9.1.min.js",
		"http://cdnjs.cloudflare.com/ajax/libs/jquery/1.9.1/jquery.min.js",
		"http://ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js",
		"http://ajax.aspnetcdn.com/ajax/jQuery/jquery-1.9.1.min.js"} {
		go get(url, first)
	}

	r := <-first
	// ... выполняем что-нибудь
}

Передача канала ответа

В этом примере канал w создан для передачи задач в горутину. Горутина получает задачу и делает запрос на url, содержащийся в ней. Канал resp тоже приходит в горутину как часть задачи. Как только запрос будет выполнен, ответ отправляется обратно через канал resp. Это позволяет этой горутине обрабатывать задачи и посылать результат обратно через разные каналы, настроенные для каждой отдельной задачи.

От переводичка

Если проще, мы для каждой нашей задачи создаем свой канал для ответа, через который горутина нам шлет результаты.

package main

import "net/http"

type work struct {
	url  string
	resp chan *http.Response
}

func getter(w chan work) {
	for {
		do := <-w
		resp, _ := http.Get(do.url)
		do.resp <- resp
	}
}

func main() {

	w := make(chan work)

	go getter(w)

	resp := make(chan *http.Response)
	w <- work{"http://cdnjs.cloudflare.com/jquery/1.9.1/jquery.min.js", resp}
	r := <-resp
	// ... выполняем что-нибудь
}

HTTP балансировщик нагрузки

В этом примере создается балансировшщик нагрузки на основе предыдущих примеров. Он обрабатывает прочитанные с stdin url’ы и для каждого запускает горутину для обработки. Каждый запрос проходит через балансировщик нагрузки, чтобы отфильтровать эти задачи на ограниченое число worker’ов. Эти worker’ы обрабатывают запросы и возвращают результаты в единственный канал answer.

Использование балансировщика нагрузки наподобии этого позволяет отправить огромное количество запросов, распределить их по всем доступным ресурсам и обработать их в упорядоченной виде.

package main

import (
	"fmt"
	"net/http"
)

type job struct {
	url  string
	resp chan *http.Response
}

type worker struct {
	jobs  chan *job
	count int
}

func (w *worker) getter(done chan *worker) {
	for {
		j := <-w.jobs
		resp, _ := http.Get(j.url)
		j.resp <- resp
		done <- w
	}
}

func get(jobs chan *job, url string, answer chan string) {
	resp := make(chan *http.Response)
	jobs <- &job{url, resp}
	r := <-resp
	answer <- r.Request.URL.String()
}

func balancer(count int, depth int) chan *job {
	jobs := make(chan *job)
	done := make(chan *worker)
	workers := make([]*worker, count)

	for i := 0; i < count; i++ {
		workers[i] = &worker{make(chan *job, depth), 0}
		go workers[i].getter(done)
	}

	go func() {
		for {
			var free *worker
			min := depth
			for _, w := range workers {
				if w.count < min {
					free = w
					min = w.count
				}
			}
			var jobsource chan *job
			if free != nil {
				jobsource = jobs
			}
			select {
			case j := <-jobsource:
				free.jobs <- j
				free.count++

			case w := <-done:
				w.count--
			}
		}

	}()

	return jobs
}

func main() {
	jobs := balancer(10, 10)
	answer := make(chan string)
	for {
		var url string
		if _, err := fmt.Scanln(&url); err != nil {
			break
		}
		go get(jobs, url, answer)
	}
	for u := range answer {
		fmt.Printf("%sn", u)
	}
}

Заключение

Go — язык, который с моей точки зрения имеет проблемы, но это язык, который я готов учить и использовать. Идеи из этой презентации открыли мне новые концепции и после нее захотелось начать новый проект, который будет использовать преимущества фантастичекой поддержки конкурентности в Go. Она также подчеркнула необходимость чтения и понимания стандартной библиотеки, предоставляемой языками, такими как Go, чтобы лучше осознать характер и проектные решения языка.

Автор: ilyashikhaleev

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js