среда, 30 января 2019 г.

Эффективный Go: каналы

Как и карты, каналы аллоцируются с помощью make, а полученное значение действует как ссылка на подлежащую структуру данных. Если указан необязательный целочисленный параметр, он устанавливает размер буфера для канала. По умолчанию это ноль для небуферизованного или синхронного канала.

// небуферизованный канал целых чисел
ci := make(chan int)

// небуферизованный канал целых чисел            
cj := make(chan int, 0)   
     
// буферизованный канал указателей на Files
cs := make(chan *os.File, 100)  

Небуферизованные каналы объединяют связь (обмен значением) с синхронизацией, гарантируя, что два вычисления (goroutines) находятся в известном состоянии.

Есть много хороших идиом, использующих каналы. Вот одна из них. В предыдущем посте мы запустили сортировку в фоновом режиме. Канал может позволить запускающей программе ждать завершения сортировки.

c := make(chan int)  // Аллоцируем канал
// Начинаем сортировку в go-процедуре(goroutine); 
// когда она завершится, посылаем сигнал по каналу
go func() {
    list.Sort()
    c <- 1  // Отправляем сигнал; значение не важно
}()
doSomethingForAWhile()
<-c   // Ждем завершения сортировки; 
      // отменяем посланное значение

Приемники всегда блокируются, пока нет данных для приема. Если канал не буферизован, отправитель блокируется, пока получатель не получил значение. Если у канала есть буфер, отправитель блокируется только до того как значение было скопировано в буфер; если буфер заполнен, это означает ожидание, пока какой-либо получатель не получит значение.

Буферизованный канал может использоваться как семафор, например, для ограничения пропускной способности. В следующем примере входящие запросы передаются handle, который отправляет значение в канал, обрабатывает запрос, а затем получает значение из канала, чтобы подготовить "семафор" для следующего потребителя. Емкость буфера канала ограничивает количество одновременных вызовов process.

var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
    sem <- 1    // Ожидание истечения активной очереди.
    process(r)  // Может занять длительное время.
    <-sem       // Выполнено; 
                // разрешаем выполнение следующего запроса.
}

func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req)  // Не ждем окончания handle
    }
}

Пока обработчики(handlers) в MaxOutstanding выполняют process, будут блокироваться попытки отправки в заполненный буфер канала, пока один из существующих обработчиков не завершит работу и не получит из буфера.

Однако у этого дизайна есть проблема: Serve создает новую программу для каждого входящего запроса, хотя только MaxOutstanding из них может выполняться в любой момент. В результате программа может потреблять неограниченные ресурсы, если запросы поступают слишком быстро. Мы можем устранить этот недостаток, изменив Serve и сделав его в качестве шлюза создания go-процедур. Вот очевидное решение, но остерегайтесь, в нем есть ошибка, которую мы исправим далее:

func Serve(queue chan *Request) {
    for req := range queue {
        sem <- 1
        go func() {
            process(req) // Ошибочно; 
                         // смотрите объяснение ниже
            <-sem
        }()
    }
}

Ошибка в том, что в Go цикле for переменная цикла используется повторно для каждой итерации, поэтому req переменная является общей для всех программ. Это не то, что мы хотим. Нам нужно убедиться, что req уникален для каждой программы. Вот один из способов сделать это, передав значение req в качестве аргумента к замыканию в go-процедуре:

func Serve(queue chan *Request) {
    for req := range queue {
        sem <- 1
        go func(req *Request) {
            process(req)
            <-sem
        }(req)
    }
}

Сравните эту версию с предыдущей, чтобы увидеть разницу в том, как замыкание объявлено и запущено. Другое решение - просто создать новую переменную с тем же именем, как в следующем примере:

func Serve(queue chan *Request) {
    for req := range queue {
        req := req // Создаем новый экземпляров req 
                   // для go-процедуры
        sem <- 1
        go func() {
            process(req)
            <-sem
        }()
    }
}

Это может показаться странным писать

req := req

но в Go это законно и идиоматично. Вы получаете свежую версию переменной с тем же именем, сознательно затеняя переменную цикла локально, но уникально для каждой go-процедуры.

Возвращаясь к общей проблеме написания сервера, другой подход, который хорошо управляет ресурсами, состоит в том, чтобы начать фиксированное количество go-процедур handle, которые все читают из канала запросов. Количество go-процедур ограничивает количество одновременных вызовов process. Следующая функция Serve также принимает канал, на котором будет сказано выйти; после запуска go-процедур он блокирует получение из этого канала.

func handle(queue chan *Request) {
    for r := range queue {
        process(r)
    }
}

func Serve(clientRequests chan *Request, quit chan bool) {
    // Стартуем handlers
    for i := 0; i < MaxOutstanding; i++ {
        go handle(clientRequests)
    }
    <-quit  // Ждем того, что будет сказано выйти
}


Читайте также:


Комментариев нет:

Отправить комментарий