воскресенье, 20 октября 2019 г.

Паттерны конкурентности в Golang: пайплайны

Примитивы Go для параллелизма упрощают создание потоковых пайплайнов данных, эффективно использующих ввод/вывод и несколько CPU. В этом посте приводятся примеры таких пайплайнов, освещаются тонкости, возникающие при сбое операций, и вводятся техники, позволяющие бороться с отказами.

Что такое пайплайн?

В Go нет формального определения пайплайна; это всего лишь одна из множества конкурентных программ. Неформально пайплайн представляет собой последовательность этапов, соединенных каналами, где каждый этап представляет собой группу goroutine, выполняющих одну и ту же функцию. На каждом этапе goroutine

  • получают значения из вышестоящего потока через входящие каналы
  • выполняют некоторую функцию над этими данными, обычно создавая новые значения
  • отправляют значения дальше вниз по исходящим каналам

Каждый этап имеет любое количество входящих и исходящих каналов, кроме первого и последнего этапов, которые имеют только исходящие или входящие каналы соответственно. Первый этап иногда называют источником (source) или продюсером (producer); последний этап, раковиной (sink) или потребитель (consumer).

Начнем с простого примера пайплайна, чтобы объяснить идеи и методы. В следующем посте будет представлен более реалистичный пример.

Квадратные числа

Рассмотрим пайплайн с тремя этапами.

Первый этап, gen, - это функция, которая преобразует список целых чисел в канал, который выдает целые числа из списка. Функция gen запускает go-процедуру, которая отправляет целые числа по каналу, и закрывает канал после отправки всех значений:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

Второй этап, sq, получает целые числа из канала и возвращает канал, который выдает квадрат каждого полученного целого числа. После того, как входящий канал закрыт и этот этап отправил все значения в нисходящем направлении, он закрывает исходящий канал:

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

Функция main устанавливает пайплайн и запускает последний этап: она получает значения со второго этапа и печатает каждый из них, пока канал не будет закрыт:

func main() {
    // Устанавливаем пайплайн.
    c := gen(2, 3)
    out := sq(c)

    // Потребляем вывод.
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

Запустить пример в песочнице play.golang.org

Поскольку sq имеет одинаковый тип для входящих и исходящих каналов, мы можем составить его любое количество раз. Мы также можем переписать main как range цикл, как и другие этапы:

func main() {
    // Устанавливаем пайплайн и потребляем вывод.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 затем 81
    }
}

Запустить пример в песочнице play.golang.org

Fan-out, fan-in

Несколько функций могут читать с одного канала, пока этот канал не будет закрыт; это называется fan-out (раздуванием). Это дает возможность распределить работу среди группы работников, чтобы распараллелить использование CPU и I/O (ввод/вывод).

Функция может считывать данные с нескольких входов и работать до тех пор, пока все они не будут закрыты, путем мультиплексирования входных каналов в один канал, который закрыт, когда все входы закрыты. Это называется fan-in (вдуванием).

Мы можем изменить наш пайплайн для запуска двух экземпляров sq, каждый из которых читает из одного и того же входного канала. Мы вводим новую функцию merge, чтобы выполнять fan in для результатов:

func main() {
    in := gen(2, 3)

    // Распределяем работу sq по двум goroutine, 
    // которые обе читают из in.
    c1 := sq(in)
    c2 := sq(in)

    // Потребляем объединенный вывод из c1 и c2.
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 затем 9, или 9 затем 4
    }
}

Функция merge преобразует список каналов в один канал, запуская goroutine для каждого входящего канала, которая копирует значения в единственный исходящий канал. Как только все выходные goroutine были запущены, merge запускает еще одну goroutine, чтобы закрыть исходящий канал после того, как все отправки по этому каналу сделаны.

Отправка по закрытому каналу вызывает panic, поэтому важно убедиться, что все goroutine выполнены до вызова close. Тип sync.WaitGroup предоставляет простой способ организовать эту синхронизацию:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Запускаем output goroutine 
    // для каждого входного канала в cs.
    // output копирует значения из c в out 
    // до тех пор пока c не закрыт, затем вызывает wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Запускаем goroutine чтобы закрыть out 
    // когда все output goroutine заверешены.
    // Это должно начнаться после вызова wg.Add.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

Запустить пример в песочнице play.golang.org

Ранняя остановка

В функциях нашего пайплайна есть паттерн:

  • Этапы закрывают свои исходящие каналы, когда все операции отправки завершены.
  • Этапы продолжают получать значения из входящих каналов, пока эти каналы не будут закрыты.

Этот паттерн позволяет каждому этапу приема быть записаным как range цикл и гарантирует, что все goroutine завершают работу после того, как все значения были успешно отправлены в нисходящем направлении.

Но в реальных пайплайнах этапы не всегда получают все входящие значения. Иногда это сделано специально: получателю может потребоваться только подмножество значений для достижения прогресса. Чаще всего этап завершается рано, поскольку входящее значение представляет собой ошибку на более раннем этапе. В любом случае получателю не нужно ждать поступления оставшихся значений, и мы хотим, чтобы более ранние этапы прекратили создавать значения, которые не нужны на более поздних этапах.

В нашем примере пайплайна, если этап не может использовать все входящие значения, go-процедуры, пытающиеся отправить эти значения, будут блокироваться бесконечно:

    // Используем первое значение из вывода.
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
    // Поскольку мы не получили второе значение из out,
    // одна из output goroutine зависла 
    // при попытке отправить его.
}

Это утечка ресурсов: go-процедуры потребляют ресурсы памяти и среды выполнения, а ссылки на кучу (heap) в стеках go-процедур не позволяют данным быть утилизированными сборщиком мусора. go-процедуры не утилизируются сборщиком мусора; они должны выйти самостоятельно.

Нам необходимо организовать выход вышестоящих этапов нашего пайплайна, даже если последующие этапы не смогут получить все входящие значения. Один из способов сделать это - изменить исходящие каналы, чтобы они имели буферы. Буфер может содержать фиксированное количество значений; операции отправки завершаются немедленно, если в буфере есть место:

c := make(chan int, 2) // размер буфера 2
c <- 1 // проходит успешно сразу
c <- 2 // проходит успешно сразу
c <- 3 // блокируется, пока другая goroutine 
       // не выполнит <-c и не получит 1

Когда количество отправляемых значений известно во время создания канала, буфер может упростить код. Например, мы можем переписать gen, чтобы скопировать список целых чисел в буферизованный канал и избежать создания новой goroutine:

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

Возвращаясь к заблокированным go-процедурам в нашем пайплайне, мы могли бы рассмотреть возможность добавления буфера к исходящему каналу, возвращаемому функцией merge:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int, 1) // достаточно места 
                             // для непрочитанных вводов
    // ... остальное без изменений ...

Запустить пример в песочнице play.golang.org

Хотя это исправляет заблокированную goroutine в этой программе, это плохой код. Выбор размера буфера равным 1 здесь зависит от знания количества значений, которые получит merge, и количества значений, которые будут использовать последующие этапы. Это хрупко: если мы передадим дополнительное значение в gen или если нижестоящая стадия считывает какие-либо меньшие значения, у нас снова будут заблокированы go-процедуры.

Вместо этого нам нужно предоставить способ для последующих этапов, чтобы указать отправителям, что они перестанут принимать входные данные.

Явная отмена

Когда main решает выйти без получения всех значений из out, она должна сообщить go-процедурам на вышестоящих этапах отказаться от значений, которые они пытаются отправить. Это делается путем отправки значений по каналу названному здесь done. Он отправляет два значения, поскольку потенциально могут быть два заблокированных отправителя:

func main() {
    in := gen(2, 3)

    // Распределяем работу sq по двум go-процедурам, 
    // которые обе читают из in.
    c1 := sq(in)
    c2 := sq(in)

    // Используем первое значение из вывода.
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 или 9

    // Сообщаем оставшимся отправителям, что мы уходим.
    done <- struct{}{}
    done <- struct{}{}
}

go-процедуры-отправители заменяют свою операцию отправки утверждением select, которое выполняется либо при отправке, либо при получении значения от done. Тип значения done - пустая структура, потому что значение не играет в данном случае никакой роли: это событие приема, которое указывает на то, что отсылка должна быть прекращена. output go-процедуры продолжают работать в цикле на своем входящем канале, с, поэтому вышестоящие этапы не блокируются. (Мы обсудим через некоторое время, как позволить этому циклу вернуться рано.)

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Запускаем output go-процедуру 
    // для каждого входного канала в cs.
    // output копирует значения из c в out до тех пор, 
    // пока c не закроется или не получит значение из done,
    // затем output вызывает wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... остальное без изменений ...

Запустить пример в песочнице play.golang.org

У этого подхода есть проблема: каждому получателю в нисходящем направлении необходимо знать количество потенциально заблокированных отправителей в восходящем направлении и организовать сигнализацию этим отправителям о досрочном возврате. Отслеживание этих показателей утомительно и подвержено ошибкам.

Нам нужен способ сообщить неизвестному и неограниченному числу goroutine, чтобы они прекратили отправку своих значений вниз. В Go мы можем сделать это, закрыв канал, потому что операция приема на закрытом канале всегда может выполняться немедленно, давая нулевое значение типа элемента.

Это означает, что main может разблокировать всех отправителей, просто закрыв done канал. Это закрытие фактически является широковещательным сигналом для отправителей. Мы расширяем каждую из наших функций пайплайна, чтобы принимать done в качестве параметра, и организуем закрытие через утверждение defer, так что все пути возврата из main будут сигнализировать о завершении этапов пайплайна.

func main() {
    // Установливаем done канал, общий для всего пайплайна,
    // и закрываем этот канал при выходе из этого пайплайна 
    // в качестве сигнала для всех go-процедур, 
    // что мы начали выходить.
    done := make(chan struct{})
    defer close(done)

    in := gen(done, 2, 3)

    // Распределяем работу sq по двум goroutine, 
    // которые обе читают из in.
    c1 := sq(done, in)
    c2 := sq(done, in)

    // Используем первое значение из output.
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // done будет закрыт отложенным вызовом.
}

Каждый из наших этапов пайплайна теперь может вернуться, как только done будет закрыт. output процедура в merge может вернуться, не опустошая свой входящий канал, так как она знает, что вышестоящий отправитель sq прекратит попытки отправки, когда done закрыт. output гарантирует, что wg.Done вызывается на всех путях возврата с помощью утверждения defer:

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Запускаем output goroutine 
    // для каждого входного канала в cs.
    // output копирует значения из c в out 
    // до закрытия c или done, 
    // затем вызывает wg.Done.
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... остальное без изменений ...

Аналогично, sq может вернуться, как только done будет закрыт. sq обеспечивает закрытие своего out канала на всех путях возврата с помощью утверждения defer:

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

Запустить пример в песочнице play.golang.org

Вот рекомендации по созданию пайплайнов:

  • Этапы закрывают свои исходящие каналы, когда все операции отправки завершены.
  • Этапы продолжают получать значения из входящих каналов до тех пор, пока эти каналы не будут закрыты или отправители не будут разблокированы.

Пайплайны разблокируют отправителей либо путем обеспечения достаточного буфера для всех отправляемых значений, либо путем явной сигнализации отправителям, когда получатель может покинуть канал.


Читайте также:


Комментариев нет:

Отправить комментарий