вторник, 22 октября 2019 г.

Паттерны конкурентности в Golang: пайплайны, пример - получение дайджеста файлов

Рассмотрим более реалистичный пример пайплайна, чем в предыдущем посте.

MD5 - это алгоритм дайджеста сообщений, полезный в качестве контрольной суммы файла. Утилита командной строки md5sum выводит дайджест-значения для списка файлов.

% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

Наш пример программы похож на md5sum, но вместо этого принимает в качестве аргумента один каталог и печатает значения дайджеста для каждого обычного файла в этом каталоге, отсортированные по имени пути.

% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

main функция нашей программы вызывает вспомогательную функцию MD5All, которая возвращает карту (map) имени пути к значению дайджеста, затем сортирует и печатает результаты:

func main() {
    // Рассчитать MD5 сумму всех файлов 
    // в указанном каталоге,
    // затем печатаем результаты, 
    // отсортированные по имени пути.
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}

Функция MD5All находится в центре нашего обсуждения. В serial.go реализация не использует конкурентность, а просто читает и суммирует каждый файл по мере обхода дерева.

// MD5All читает все файлы в дереве файлов с корнем в root 
// и возвращает карту пути к файлу к MD5 сумме 
// содержимого файла. Если происходит сбой прохода
// по каталогу или сбой любой операции чтения, 
// MD5All возвращает ошибку.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}

Параллельное получение digest'а

В parallel.go, мы разделили MD5All на двухступенчатый пайплайн. Первый этап, sumFiles, обходит дерево, получает digest каждого файла в новой go-процедуре и отправляет результаты по каналу с типом значения result:

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}

sumFiles возвращает два канала: один для результатов и другой для ошибки, возвращаемой filepath.Walk. Функция walk запускает новую go-процедуру для обработки каждого обычного файла, а затем проверяет done. Если done закрыт, walk немедленно останавливается:

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // Для каждого обычного файла запускаем goroutine, 
    // которая суммирует файл и отправляет
    // результат в c. Ошибки walk отправляются в errc.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // Завершаем walk если done закрыт.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk вернулся, 
        // поэтому все вызовы wg.Add завершены. 
        // Начинаем goroutine для закрытия c, 
        // как только все посылки сделаны.
        go func() {
            wg.Wait()
            close(c)
        }()
        // select не нужен здесь, поскольку errc буферизован.
        errc <- err
    }()
    return c, errc
}

MD5All получает значения дайджеста от c. MD5All возвращается рано при ошибке, закрытие осуществляется с помощью defer:

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All закрывает done канал при возврате; 
    // это может быть сделано
    // до получения всех значений от c и errc.
    done := make(chan struct{})
    defer close(done)

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

Ограниченный параллелизм

Реализация MD5All в parallel.go запускает новую goroutine для каждого файла. В каталоге со многими большими файлами это может аллоцировать больше памяти, чем доступно на машине.

Мы можем ограничить эти аллокации, ограничив число файлов, читаемых параллельно. В bounded.go мы делаем это, создавая фиксированное количество go-процедур для чтения файлов. Наш пайплайн теперь имеет три этапа: пройтись по дереву, прочитать файлы и получить дайджесты файлов и собрать дайджесты.

Первый этап, walkFiles, генерирует пути обычных файлов в дереве:

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Закрываем paths канал после возврата Walk.
        defer close(paths)
        // select не требуется для этой отправки, 
        // поскольку errc буферизован.
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}

Средняя стадия запускает фиксированное число digester go-процедур, которые получают имена файлов из путей и отправляют результаты по каналу c:

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}

В отличие от наших предыдущих примеров, digester не закрывает свой выходной канал, так как несколько go-процедур отправляют по общему каналу. Вместо этого код в MD5All организует закрытие канала после завершения работы всех digester:

// Запускаем фиксированное количество go-процедур 
// для чтения и получения дайджеста файлов.
c := make(chan result)
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
    go func() {
        digester(done, paths, c)
        wg.Done()
    }()
}
go func() {
    wg.Wait()
    close(c)
}()

Вместо этого мы могли бы сделать так чтобы каждый digester создавал и возвращал свой собственный выходной канал, но тогда нам потребовались бы дополнительные go-процедуры для сдувания (fan-in) результатов.

Последний этап получает все результаты от c, затем проверяет ошибку от errc. Эта проверка не может произойти раньше, так как до этого момента walkFiles может блокировать отправку значений вниз:

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // Проверяем на пройзошел ли сбой Walk.
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

Заключение

В этом и предыдущем посте представлены методы построения потоковых пайплайнов данных в Go. Устранение сбоев в таких пайплайнах является сложной задачей, поскольку каждая ступень в пайплайне может блокировать попытки отправки значений в нисходящем направлении, а нисходящие ступени могут больше не заботиться о поступающих данных. Мы показали, как закрытие канала может транслировать сигнал "выполнено" на все go-процедуры, запущенные пайплайном, и определили правила правильного построения пайплайнов.


Читайте также:


Комментариев нет:

Отправить комментарий