Один из наиболее распространенных вариантов использования любого клиента Elasticsearch - как можно быстрее и эффективнее индексировать документы в Elasticsearch. Самый простой вариант, использующий простой Elasticsearch Bulk API, имеет множество недостатков: вам нужно вручную подготовить пары метаданных и данных полезной нагрузки, разделить полезную нагрузку на пакеты, десериализовать ответ, проверить результаты на наличие ошибок, отобразить отчет и так далее. Пример по умолчанию в репозитории довольно красноречиво демонстрирует, насколько все это задействовано.
По этой причине клиент предоставляет вспомогательный компонент esutil.BulkIndexer, аналогичный массовым помощникам в других клиентах:
$ go doc -short github.com/elastic/go-elasticsearch/v7/esutil.BulkIndexer
type BulkIndexer interface {
// Add добавляет элемент в индексатор.
// ...
Add(context.Context, BulkIndexerItem) error
// Close ожидает,
// пока все добавленные элементы не будут записаны,
// и закрывает индексатор.
Close(context.Context) error
// Stats возвращает статистику индексатора.
Stats() BulkIndexerStats
}
Как видите, интерфейс довольно минимален и позволяет добавлять отдельные элементы в индексатор, закрывать операции индексирования, когда больше нет элементов для добавления, и получать статистику по операциям. Компонент позаботится о сериализации элементов и подготовке полезной нагрузки, отправке ее партиями и распараллеливании операций безопасным для одновременного доступа способом.
Индексатор настраивается с помощью структуры esutil.BulkIndexerConfig, переданной в качестве аргумента конструктору:
$ go doc -short github.com/elastic/go-elasticsearch/v7/esutil.BulkIndexerConfig
type BulkIndexerConfig struct {
NumWorkers int // Количество воркеров. По умолчанию используется runtime.NumCPU().
FlushBytes int // Порог записи в байтах. По умолчанию 5MB.
FlushInterval time.Duration // Порог записи по продолжительности. По умолчанию 30 секунд.
Client *elasticsearch.Client // Elasticsearch клиент.
Decoder BulkResponseJSONDecoder // Пользовательский JSON декодер.
DebugLogger BulkIndexerDebugLogger // Дополнительный регистратор для отладки.
OnError func(context.Context, error) // Вызывается при ошибках индексатора.
OnFlushStart func(context.Context) context.Context // Вызывается при запуске записи.
OnFlushEnd func(context.Context) // Вызывается по окончании записи.
// Параметры Bulk API.
Index string
// ...
}
Поле NumWorkers контролирует уровень распараллеливания, то есть устанавливает количество рабочих процессов, выполняющих операции записи. Поля FlushBytes и FlushInterval устанавливают пороговые значения для операции записи на основе содержимого полезной нагрузки или временного интервала. Важно поэкспериментировать с разными значениями этих параметров, адаптированными к вашим данным и среде.
Репозиторий содержит исполняемый скрипт, который позволяет легко экспериментировать с различными настройками этих параметров. Крайне важно запустить его в топологии, отражающей вашу производственную среду. Например, на обычном ноутбуке, работающем с локальным кластером, пропускная способность индексатора составляет около 10 000 небольших документов в секунду. Когда индексатор работает на выделенной машине против удаленного кластера на реалистичном оборудовании, пропускная способность приближается к 300 000 документов в секунду.
Поле Client позволяет передать экземпляр elasticsearch.Client с любой желаемой конфигурацией для ведения журнала, безопасности, повторных попыток, настраиваемого транспорта и т. д.
Следуя общей теме расширяемости, поле Decoder принимает тип, реализующий интерфейс esutil.BulkResponseJSONDecoder, что позволяет использовать более эффективный кодировщик JSON, чем encoding/json стандартной библиотеки.
Документы для индексации добавляются в индексатор как esutil.BulkIndexerItem:
go doc -short github.com/elastic/go-elasticsearch/v7/esutil.BulkIndexerItem
type BulkIndexerItem struct {
Index string
Action string
DocumentID string
Body io.Reader
RetryOnConflict *int
OnSuccess func(context.Context, BulkIndexerItem, BulkIndexerResponseItem) // Для каждого элемента
OnFailure func(context.Context, BulkIndexerItem, BulkIndexerResponseItem, error) // Для каждого элемента
}
Давайте объединим всю эту информацию, пройдя по коду примера репозитория. Клонируйте репозиторий и запустите cd _examples/bulk && go run indexer.go, чтобы запустить его локально.
В примере индексируется структура данных, определяемая типами Article и Author:
type Article struct {
ID int `json:"id"`
Title string `json:"title"`
Body string `json:"body"`
Published time.Time `json:"published"`
Author Author `json:"author"`
}
type Author struct {
FirstName string `json:"first_name"`
LastName string `json:"last_name"`
}
Сначала мы создадим клиент Elasticsearch, используя сторонний пакет cenkalti/backoff/ для реализации экспоненциальной отсрочки.
// Используйте сторонний пакет для реализации функции отсрочки
//
retryBackoff := backoff.NewExponentialBackOff()
es, err := elasticsearch.NewClient(elasticsearch.Config{
// Повторить попытку при 429 TooManyRequests статусе
//
RetryOnStatus: []int{502, 503, 504, 429},
// Настраиваем функцию отсрочки
//
RetryBackoff: func(i int) time.Duration {
if i == 1 {
retryBackoff.Reset()
}
return retryBackoff.NextBackOff()
},
// Повтор до 5 попыток
//
MaxRetries: 5,
})
Далее мы создадим массовый индексатор:
// Создаем BulkIndexer
//
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: indexName, // Имя индекса по умолчанию
Client: es, // Elasticsearch клиент
NumWorkers: numWorkers, // Количество рабочих goroutines
FlushBytes: int(flushBytes), // Порог записи в байтах
FlushInterval: 30 * time.Second, // Интервал периодической записи
})
Создадим данные для индексации:
var articles []*Article
names := []string{"Alice", "John", "Mary"}
for i := 1; i <= numItems; i++ {
articles = append(articles, &Article{
ID: i,
Title: strings.Join([]string{"Title", strconv.Itoa(i)}, " "),
Body: "Lorem ipsum dolor sit amet...",
Published: time.Now().Round(time.Second).UTC().AddDate(0, 0, i),
Author: Author{
FirstName: names[rand.Intn(len(names))],
LastName: "Smith",
},
})
}
Примечание. Переменные indexName, numWorkers, flushBytes и numItems устанавливаются с помощью флагов командной строки; смотрите go run indexer.go --help.
Теперь мы можем перебрать коллекцию статей, добавляя каждый элемент в индексатор:
var countSuccessful uint64
start := time.Now().UTC()
for _, a := range articles {
// Готовим полезные данные: кодируем статью в JSON
//
data, err := json.Marshal(a)
if err != nil {
log.Fatalf("Cannot encode article %d: %s", a.ID, err)
}
// Добавляем элемент в BulkIndexer
//
err = bi.Add(
context.Background(),
esutil.BulkIndexerItem{
// Поле Action настраивает операцию для выполнения (index, create, delete, update)
Action: "index",
// DocumentID это (необязательный) идентификатор документа
DocumentID: strconv.Itoa(a.ID),
// Body это `io.Reader` с полезной нагрузкой
Body: bytes.NewReader(data),
// OnSuccess вызывается для каждой успешной операции
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
atomic.AddUint64(&countSuccessful, 1)
},
// OnFailure вызывается для каждой неудачной операции
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
if err != nil {
log.Printf("ERROR: %s", err)
} else {
log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
}
},
},
)
if err != nil {
log.Fatalf("Unexpected error: %s", err)
}
}
Индексатор будет отправлять элементы в кластер партиями в соответствии с настроенными пороговыми значениями. В нашем случае коллекция исчерпана, поэтому мы закроем индексатор, чтобы очистить все оставшиеся буферы:
if err := bi.Close(context.Background()); err != nil {
log.Fatalf("Unexpected error: %s", err)
}
Индексатор собирает ряд показателей с помощью типа esutil.BulkIndexerStats:
$ go doc -short github.com/elastic/go-elasticsearch/v7/esutil.BulkIndexerStats
type BulkIndexerStats struct {
NumAdded uint64
NumFlushed uint64
NumFailed uint64
NumIndexed uint64
NumCreated uint64
NumUpdated uint64
NumDeleted uint64
NumRequests uint64
}
Давайте воспользуемся им, чтобы отобразить простой отчет обо всей операции, используя пакет dustin/go-humanize для лучшей читаемости:
biStats := bi.Stats()
dur := time.Since(start)
if biStats.NumFailed > 0 {
log.Fatalf(
"Indexed [%s] documents with [%s] errors in %s (%s docs/sec)",
humanize.Comma(int64(biStats.NumFlushed)),
humanize.Comma(int64(biStats.NumFailed)),
dur.Truncate(time.Millisecond),
humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))),
)
} else {
log.Printf(
"Sucessfuly indexed [%s] documents in %s (%s docs/sec)",
humanize.Comma(int64(biStats.NumFlushed)),
dur.Truncate(time.Millisecond),
humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))),
)
}
// => Successfully indexed [10,000] documents in 1.622s (6,165 docs/sec)
Этот пример иллюстрирует внутреннюю работу массового индексатора и доступные параметры конфигурации. Чтобы увидеть, как использовать его в реалистичном приложении, взгляните на пример _examples/bulk/kafka в репозитории. Он использует Docker для запуска полной среды с Zookeeper, Kafka, Confluent Control Center, Elasticsearch, APM Server и Kibana и демонстрирует получение данных, полученных из темы Kafka. Чтобы попробовать его локально, просто следуйте инструкциям в репозитории.
Читайте также:
- Клиент Go для Elasticsearch
- Клиент Go для Elasticsearch: пакет esapi
- Клиент Go для Elasticsearch: пакет estransport
- Клиент Go для Elasticsearch: конфигурация и кастомизация
- Клиент Go для Elasticsearch: кодирование и декодирование полезной нагрузки JSON
Комментариев нет:
Отправить комментарий