суббота, 8 мая 2021 г.

Клиент Go для Elasticsearch: массовая индексация

Один из наиболее распространенных вариантов использования любого клиента Elasticsearch - как можно быстрее и эффективнее индексировать документы в Elasticsearch. Самый простой вариант, использующий простой Elasticsearch Bulk API, имеет множество недостатков: вам нужно вручную подготовить пары метаданных и данных полезной нагрузки, разделить полезную нагрузку на пакеты, десериализовать ответ, проверить результаты на наличие ошибок, отобразить отчет и так далее. Пример по умолчанию в репозитории довольно красноречиво демонстрирует, насколько все это задействовано.

По этой причине клиент предоставляет вспомогательный компонент esutil.BulkIndexer, аналогичный массовым помощникам в других клиентах:

$ go doc -short github.com/elastic/go-elasticsearch/v7/esutil.BulkIndexer
type BulkIndexer interface {
  // Add добавляет элемент в индексатор.
  // ...
  Add(context.Context, BulkIndexerItem) error

  // Close ожидает, 
  // пока все добавленные элементы не будут записаны, 
  // и закрывает индексатор.
  Close(context.Context) error

  // Stats возвращает статистику индексатора.
  Stats() BulkIndexerStats
}

Как видите, интерфейс довольно минимален и позволяет добавлять отдельные элементы в индексатор, закрывать операции индексирования, когда больше нет элементов для добавления, и получать статистику по операциям. Компонент позаботится о сериализации элементов и подготовке полезной нагрузки, отправке ее партиями и распараллеливании операций безопасным для одновременного доступа способом.

Индексатор настраивается с помощью структуры esutil.BulkIndexerConfig, переданной в качестве аргумента конструктору:

$ go doc -short github.com/elastic/go-elasticsearch/v7/esutil.BulkIndexerConfig
type BulkIndexerConfig struct {
  NumWorkers    int           // Количество воркеров. По умолчанию используется runtime.NumCPU().
  FlushBytes    int           // Порог записи в байтах. По умолчанию 5MB.
  FlushInterval time.Duration // Порог записи по продолжительности. По умолчанию 30 секунд.

  Client      *elasticsearch.Client   // Elasticsearch клиент.
  Decoder     BulkResponseJSONDecoder // Пользовательский JSON декодер.
  DebugLogger BulkIndexerDebugLogger  // Дополнительный регистратор для отладки.

  OnError      func(context.Context, error)          // Вызывается при ошибках индексатора.
  OnFlushStart func(context.Context) context.Context // Вызывается при запуске записи.
  OnFlushEnd   func(context.Context)                 // Вызывается по окончании записи.

  // Параметры Bulk API.
  Index               string
  // ...
}

Поле NumWorkers контролирует уровень распараллеливания, то есть устанавливает количество рабочих процессов, выполняющих операции записи. Поля FlushBytes и FlushInterval устанавливают пороговые значения для операции записи на основе содержимого полезной нагрузки или временного интервала. Важно поэкспериментировать с разными значениями этих параметров, адаптированными к вашим данным и среде.

Репозиторий содержит исполняемый скрипт, который позволяет легко экспериментировать с различными настройками этих параметров. Крайне важно запустить его в топологии, отражающей вашу производственную среду. Например, на обычном ноутбуке, работающем с локальным кластером, пропускная способность индексатора составляет около 10 000 небольших документов в секунду. Когда индексатор работает на выделенной машине против удаленного кластера на реалистичном оборудовании, пропускная способность приближается к 300 000 документов в секунду.

Поле Client позволяет передать экземпляр elasticsearch.Client с любой желаемой конфигурацией для ведения журнала, безопасности, повторных попыток, настраиваемого транспорта и т. д.

Следуя общей теме расширяемости, поле Decoder принимает тип, реализующий интерфейс esutil.BulkResponseJSONDecoder, что позволяет использовать более эффективный кодировщик JSON, чем encoding/json стандартной библиотеки.

Документы для индексации добавляются в индексатор как esutil.BulkIndexerItem:

go doc -short github.com/elastic/go-elasticsearch/v7/esutil.BulkIndexerItem
type BulkIndexerItem struct {
  Index           string
  Action          string
  DocumentID      string
  Body            io.Reader
  RetryOnConflict *int

  OnSuccess func(context.Context, BulkIndexerItem, BulkIndexerResponseItem)        // Для каждого элемента
  OnFailure func(context.Context, BulkIndexerItem, BulkIndexerResponseItem, error) // Для каждого элемента
}

Давайте объединим всю эту информацию, пройдя по коду примера репозитория. Клонируйте репозиторий и запустите cd _examples/bulk && go run indexer.go, чтобы запустить его локально.

В примере индексируется структура данных, определяемая типами Article и Author:

type Article struct {
  ID        int       `json:"id"`
  Title     string    `json:"title"`
  Body      string    `json:"body"`
  Published time.Time `json:"published"`
  Author    Author    `json:"author"`
}

type Author struct {
  FirstName string `json:"first_name"`
  LastName  string `json:"last_name"`
}

Сначала мы создадим клиент Elasticsearch, используя сторонний пакет cenkalti/backoff/ для реализации экспоненциальной отсрочки.

// Используйте сторонний пакет для реализации функции отсрочки
//
retryBackoff := backoff.NewExponentialBackOff()

es, err := elasticsearch.NewClient(elasticsearch.Config{
// Повторить попытку при 429 TooManyRequests статусе
//
RetryOnStatus: []int{502, 503, 504, 429},

// Настраиваем функцию отсрочки
//
RetryBackoff: func(i int) time.Duration {
  if i == 1 {
    retryBackoff.Reset()
  }
  return retryBackoff.NextBackOff()
},

// Повтор до 5 попыток
//
MaxRetries: 5,
})

Далее мы создадим массовый индексатор:

// Создаем BulkIndexer
//
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
  Index:         indexName,        // Имя индекса по умолчанию
  Client:        es,               // Elasticsearch клиент
  NumWorkers:    numWorkers,       // Количество рабочих goroutines
  FlushBytes:    int(flushBytes),  // Порог записи в байтах
  FlushInterval: 30 * time.Second, // Интервал периодической записи
})

Создадим данные для индексации:

var articles []*Article
names := []string{"Alice", "John", "Mary"}
for i := 1; i <= numItems; i++ {
  articles = append(articles, &Article{
    ID:        i,
    Title:     strings.Join([]string{"Title", strconv.Itoa(i)}, " "),
    Body:      "Lorem ipsum dolor sit amet...",
    Published: time.Now().Round(time.Second).UTC().AddDate(0, 0, i),
    Author: Author{
      FirstName: names[rand.Intn(len(names))],
      LastName:  "Smith",
    },
  })
}

Примечание. Переменные indexName, numWorkers, flushBytes и numItems устанавливаются с помощью флагов командной строки; смотрите go run indexer.go --help.

Теперь мы можем перебрать коллекцию статей, добавляя каждый элемент в индексатор:

var countSuccessful uint64
start := time.Now().UTC()

for _, a := range articles {
  // Готовим полезные данные: кодируем статью в JSON
  //
  data, err := json.Marshal(a)
  if err != nil {
    log.Fatalf("Cannot encode article %d: %s", a.ID, err)
  }

  // Добавляем элемент в BulkIndexer
  //
  err = bi.Add(
    context.Background(),
    esutil.BulkIndexerItem{
      // Поле Action настраивает операцию для выполнения (index, create, delete, update)
      Action: "index",

      // DocumentID это (необязательный) идентификатор документа
      DocumentID: strconv.Itoa(a.ID),

      // Body это `io.Reader` с полезной нагрузкой
      Body: bytes.NewReader(data),

      // OnSuccess вызывается для каждой успешной операции
      OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
        atomic.AddUint64(&countSuccessful, 1)
      },

      // OnFailure вызывается для каждой неудачной операции
      OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
        if err != nil {
          log.Printf("ERROR: %s", err)
        } else {
          log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
        }
      },
    },
  )
  if err != nil {
    log.Fatalf("Unexpected error: %s", err)
  }
}

Индексатор будет отправлять элементы в кластер партиями в соответствии с настроенными пороговыми значениями. В нашем случае коллекция исчерпана, поэтому мы закроем индексатор, чтобы очистить все оставшиеся буферы:

if err := bi.Close(context.Background()); err != nil {
  log.Fatalf("Unexpected error: %s", err)
}

Индексатор собирает ряд показателей с помощью типа esutil.BulkIndexerStats:

$ go doc -short github.com/elastic/go-elasticsearch/v7/esutil.BulkIndexerStats
type BulkIndexerStats struct {
  NumAdded    uint64
  NumFlushed  uint64
  NumFailed   uint64
  NumIndexed  uint64
  NumCreated  uint64
  NumUpdated  uint64
  NumDeleted  uint64
  NumRequests uint64
}

Давайте воспользуемся им, чтобы отобразить простой отчет обо всей операции, используя пакет dustin/go-humanize для лучшей читаемости:

biStats := bi.Stats()
dur := time.Since(start)

if biStats.NumFailed > 0 {
  log.Fatalf(
    "Indexed [%s] documents with [%s] errors in %s (%s docs/sec)",
    humanize.Comma(int64(biStats.NumFlushed)),
    humanize.Comma(int64(biStats.NumFailed)),
    dur.Truncate(time.Millisecond),
    humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))),
  )
} else {
  log.Printf(
    "Sucessfuly indexed [%s] documents in %s (%s docs/sec)",
    humanize.Comma(int64(biStats.NumFlushed)),
    dur.Truncate(time.Millisecond),
    humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))),
  )
}

// => Successfully indexed [10,000] documents in 1.622s (6,165 docs/sec)

Этот пример иллюстрирует внутреннюю работу массового индексатора и доступные параметры конфигурации. Чтобы увидеть, как использовать его в реалистичном приложении, взгляните на пример _examples/bulk/kafka в репозитории. Он использует Docker для запуска полной среды с Zookeeper, Kafka, Confluent Control Center, Elasticsearch, APM Server и Kibana и демонстрирует получение данных, полученных из темы Kafka. Чтобы попробовать его локально, просто следуйте инструкциям в репозитории.


Читайте также:


Комментариев нет:

Отправить комментарий