среда, 5 мая 2021 г.

Клиент Go для Elasticsearch: конфигурация и кастомизация

В предыдущих постах мы видели, что кажущаяся простая задача клиента Elasticsearch - перемещение данных между вызывающим кодом и кластером - на самом деле довольно сложна под капотом. Естественно, как бы создатели ни старались сделать поведение клиента по умолчанию оптимальным для большинства сценариев, бывают ситуации, когда клиенты захотят настроить, кастомизировать или включить/отключить определенные функции.

Конструктор клиента Go принимает тип elasticsearch.Config{}, который предоставляет ряд параметров для управления поведением:

$ go doc -short github.com/elastic/go-elasticsearch/v7.Config
type Config struct {
  Addresses []string // Список используемых узлов Elasticsearch.
  // ...
}

Рассмотрим доступные варианты и примеры их использования.

Конечные точки и безопасность

Первое, что вы можете сделать, если вы просто не экспериментируете с клиентом локально, - это указать ему на удаленный кластер. Самый простой способ сделать это - экспортировать переменную ELASTICSEARCH_URL со списком URL-адресов узлов, разделенных запятыми. Это соответствует рекомендации Двенадцати факторов, полностью исключает конфигурацию из кодовой базы и хорошо работает с облачными функциями/лямбда-выражениями и системами оркестровки контейнеров, такими как Kubernetes.

// В main.go
es, err := elasticsearch.NewDefaultClient()
// ...

// В командной строке
$ ELASTICSEARCH_URL=https://foo:bar@es1:9200 go run main.go

// Для Google Cloud Function
$ gcloud functions deploy myfunction --set-env-vars ELASTICSEARCH_URL=https://foo:bar@es1:9200 ...

Чтобы настроить конечные точки кластера напрямую (например, когда вы загружаете их из файла конфигурации или извлекаете их из системы управления секретами, такой как Vault), используйте поле Addresses со срезом строк с URL-адресами узлов, к которым вы хотите подключиться, и поля имени пользователя и пароля для аутентификации:

var (
  clusterURLs = []string{"https://es1:9200", "https://es2:9200", "https://es3:9200"}
  username    = "foo"
  password    = "bar"
)
cfg := elasticsearch.Config{
  Addresses: clusterURLs,
  Username:  username,
  Password:  password,
}
es, err := elasticsearch.NewClient(cfg)
// ...

Используйте поле APIKey для аутентификации с помощью ключей API, которыми проще управлять через Elasticsearch API или Kibana, чем через имена пользователей и пароли.

При использовании Elasticsearch Service в Elastic Cloud вы можете указать клиенту на кластер, используя Cloud ID:

cfg := elasticsearch.Config{
  CloudID: "my-cluster:dXMtZWFzdC0xLZC5pbyRjZWM2ZjI2MWE3NGJm...",
  APIKey:  "VnVhQ2ZHY0JDZGJrUW...",
}
es, err := elasticsearch.NewClient(cfg)
// ...

Примечание. Не забывайте, что вам все равно необходимо предоставить учетные данные для аутентификации при использовании Cloud ID.

Обычная потребность в настраиваемых развертываниях или демонстрациях, ориентированных на безопасность, - предоставить клиенту центр сертификации, чтобы он мог проверить сертификат сервера. Это может быть достигнуто несколькими способами, но наиболее простой вариант - использовать поле CACert, передав ему часть байтов с полезной нагрузкой сертификата:

cert, _ := ioutil.ReadFile("path/to/ca.crt")
cfg := elasticsearch.Config{
  // ...
  CACert: cert,
}
es, err := elasticsearch.NewClient(cfg)
// ...

Взгляните на папку _examples/security репозитория, чтобы увидеть полную демонстрацию, в которой вы можете генерировать пользовательские сертификаты с помощью утилиты командной строки elasticsearch-certutil и запускать кластер с соответствующей конфигурацией.

Глобальные заголовки

В определенных сценариях, например, при использовании аутентификации на основе токенов или при взаимодействии с прокси-серверами, вам может потребоваться добавить заголовки HTTP к клиентским запросам. Хотя вы можете добавлять их в вызовы API, используя метод WithHeader(), удобнее устанавливать их глобально, в конфигурации клиента:

cfg := elasticsearch.Config{
  // ...
  Header: http.Header(map[string][]string{
    "Authorization": {"Bearer dGhpcyBpcyBub3QgYSByZWFs..."},
  }),
}
es, err := elasticsearch.NewClient(cfg)
// ...

Ведение журнала и метрики

Во время разработки крайне важно внимательно следить за тем, что отправляется в кластер Elasticsearch и что он получает. Самый простой способ добиться этого - просто распечатать детали запроса и ответа на консоли или в файле. Клиентский пакет Go предоставляет несколько отдельных компонентов регистратора. Для отладки во время разработки, возможно, наиболее полезен estransport.ColorLogger - он выводит на консоль сжатую, отформатированную и раскрашенную информацию:

cfg := elasticsearch.Config{
  Logger: &estransport.ColorLogger{Output: os.Stdout},
}
es, _ := elasticsearch.NewClient(cfg)
es.Info()
// > GET http://localhost:9200/ 200 OK 11ms

По умолчанию тело запроса и ответа не печатается - для этого установите соответствующие параметры регистратора:

cfg := elasticsearch.Config{
  Logger: &estransport.ColorLogger{
    Output:             os.Stdout,
    EnableRequestBody:  true,
    EnableResponseBody: true,
  },
}
es, _ := elasticsearch.NewClient(cfg)
es.Info()
// > GET http://localhost:9200/ 200 OK 6ms
// >     « {
// >     «   "name" : "es1",
// >     «   "cluster_name" : "go-elasticsearch",
// >     ...

Примечание. Компонент estransport.TextLogger печатает информацию без использования каких-либо специальных символов и цветов.

Обращаясь за помощью на форумах, часто бывает полезно представить список операций в независимом формате, чтобы их можно было легко "воспроизвести" локально, без понимания или установки определенного языка программирования. Идеальный способ сделать это - отформатировать вывод как последовательность исполняемых команд curl с помощью estransport.CurlLogger:

cfg := elasticsearch.Config{
  Logger: &estransport.CurlLogger{
    Output:             os.Stdout,
    EnableRequestBody:  true,
    EnableResponseBody: true,
  },
}
es, _ := elasticsearch.NewClient(cfg)
es.Index(
  "test",
  strings.NewReader(`{"title" : "logging"}`),
  es.Index.WithRefresh("true"),
  es.Index.WithPretty(),
  es.Index.WithFilterPath("result", "_id"),
)
// > curl -X POST -H 'Content-Type: application/json' 'http://localhost:9200/test/_doc?pretty&filter_path=result%2C_id&refresh=true' -d \
// > '{
// > "title": "logging"
// > }'
// > # => 2020-07-23T13:12:05Z [201 Created] 65ms
// > # {
// > #  "_id": "_YPNe3MBdF-KdkKEZqF_",
// > #  "result": "created"
// > # }

При регистрации клиентских операций в производственной среде вывод в виде обычного текста не подходит, потому что вы хотите хранить журналы в виде структурированных данных, вполне возможно, в самом Elasticsearch. В этом случае вы можете использовать estransport.JSONLogger для вывода записей в виде документов JSON в формате, совместимом с Elastic Common Schema (ECS):

cfg := elasticsearch.Config{
  Logger: &estransport.JSONLogger{Output: os.Stdout},
}
es, _ := elasticsearch.NewClient(cfg)
es.Info()
// > {"@timestamp":"2020-07-23T13:12:05Z","event":{"duration":10970000},"url":{"scheme":"http","domain":"localhost","port":9200,"path":"/","query":""},"http":{"request":{"method":"GET"},"response":{"status_code":200}}}

В соответствии с духом расширяемости клиента все перечисленные регистраторы являются всего лишь реализациями интерфейса estransport.Logger. Чтобы использовать настраиваемый регистратор, просто реализуйте этот интерфейс:

$ go doc -short github.com/elastic/go-elasticsearch/v7/estransport.Logger
type Logger interface {
  LogRoundTrip(*http.Request, *http.Response, error, time.Time, time.Duration) error
  // ...
}

Пример _examples/logging/custom.go демонстрирует, как использовать пакет github.com/rs/zerolog в качестве "драйвера" журналирования путем реализации интерфейса для типа CustomLogger.

Еще одна функция клиента, полезная в производстве или для отладки, - это возможность экспортировать различные метрики о себе: количество запросов и сбоев, коды состояния ответа и подробные сведения о соединениях. Установите для параметра EnableMetrics значение true и используйте метод Metrics() для получения информации. Пример _examples/instrumentation/expvar.go показывает интеграцию с пакетом expvar.

Повторные попытки

Клиент управляет подключениями и повторяет запросы при определенных условиях. Теперь давайте посмотрим на соответствующие параметры конфигурации.

По умолчанию клиент повторяет запрос до трех раз; чтобы установить другой предел, используйте поле MaxRetries. Чтобы изменить список кодов состояния ответа, которые следует повторить, используйте поле RetryOnStatus. Вместе с опцией RetryBackoff вы можете использовать его для повторения запросов, когда сервер отправляет ответ 429 Too Many Requests:

cfg := elasticsearch.Config{
  RetryOnStatus: []int{429, 502, 503, 504},
  RetryBackoff:  func(i int) time.Duration {
    // Простая экспоненциальная задержка
    d := time.Duration(math.Exp2(float64(i))) * time.Second
    fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d)
    return d
  },
}
es, err := elasticsearch.NewClient(cfg)
// ...

Поскольку функция RetryBackoff возвращает только time.Duration, вы можете обеспечить более надежную реализацию отката, используя сторонний пакет, такой как github.com/cenkalti/backoff:

import "github.com/cenkalti/backoff/v4"

retryBackoff := backoff.NewExponentialBackOff()
retryBackoff.InitialInterval = time.Second

cfg := elasticsearch.Config{
  RetryOnStatus: []int{429, 502, 503, 504},
  RetryBackoff: func(i int) time.Duration {
    if i == 1 {
      retryBackoff.Reset()
    }
    d := retryBackoff.NextBackOff()
    fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d)
    return d
  },
}
es, err := elasticsearch.NewClient(cfg)
// ...

Обнаружение узлов

Параметры DiscoverNodesOnStart и DiscoverNodesInterval определяют, должен ли клиент выполнять обнаружение узлов ("прослушивание") во время инициализации или периодически; оба отключены по умолчанию.

Примечание. Включите обнаружение узлов только тогда, когда клиент подключен к кластеру напрямую, а не когда кластер находится за прокси-сервером, что также имеет место при использовании Elasticsearch Service.

Поле ConnectionPoolFunc позволяет предоставить настраиваемую реализацию пула соединений, которая удовлетворяет интерфейсу estransport.ConnectionPool. Однако это обычно полезно только в сложных топологиях сети.

Пользовательский селектор подключения, передаваемый через поле Selector, может быть более практичным в определенных ситуациях, например, с учетом зоны AWS, или для реализации взвешенного циклического селектора.

Наиболее инвазивным вариантом конфигурации является поле Transport, которое позволяет полностью заменить HTTP-клиент по умолчанию, используемый пакетом, а именно http.DefaultTransport. Возможно, вы захотите сделать это в ситуациях, когда вы хотите настроить таймауты, параметры TLS или прокси-сервера или любые другие низкоуровневые детали HTTP:

cfg := elasticsearch.Config{
  // ...
  Transport: &http.Transport{
    Proxy: ...
    MaxIdleConnsPerHost:   ...,
    ResponseHeaderTimeout: ...,
    DialContext:           (&net.Dialer{
      Timeout:   ...,
      KeepAlive: ...,
    }).DialContext,
    TLSClientConfig: &tls.Config{
      MinVersion: ...,
      // ...
    },
  },
}
es, err := elasticsearch.NewClient(cfg)
// ...

Кастомный транспорт

Поскольку поле Transport принимает любую реализацию http.RoundTripper, можно передать настраиваемую реализацию. Давайте посмотрим на пример, в котором мы подсчитываем количество запросов, следуя демонстрации _examples/customization.go:

type CountingTransport struct {
  count uint64
}

func (t *CountingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
  atomic.AddUint64(&t.count, 1)
  return http.DefaultTransport.RoundTrip(req)
}

tp := CountingTransport{}
cfg := elasticsearch.Config{Transport: &tp}
es, err := elasticsearch.NewClient(cfg)
// ...
fmt.Printf("%80s\n", fmt.Sprintf("Total Requests: %d", atomic.LoadUint64(&tp.count)))

Как правило, нет необходимости заменять HTTP-клиент по умолчанию на пользовательскую реализацию, за одним конкретным исключением: имитация клиента в модульных тестах. В следующем примере тип mockTransport определяет поле RoundTripFunc, которое позволяет возвращать конкретный ответ для определенных тестов.

// mockTransport определяет фиктивный транспорт для модульных тестов
type mockTransport struct {
  RoundTripFunc func(req *http.Request) (*http.Response, error)
}

// RoundTripFunc реализует http.RoundTripper
func (t *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
  return t.RoundTripFunc(req)
}

func TestClientSuccess(t *testing.T) {
  tp := &mockTransport{
    RoundTripFunc: func(req *http.Request) (*http.Response, error) {
      // Возвращаем успешный фиктивный ответ
      return &http.Response{
        Status:     "200 OK",
        StatusCode: 200,
        Body:       ioutil.NopCloser(strings.NewReader("HELLO")),
      }, nil
    },
  }

  cfg := elasticsearch.Config{Transport: &tp}
  es, _ := elasticsearch.NewClient(cfg)
  res, _ := es.Info()

  t.Log(res)
}

Фиктивный ответ распечатывается при выполнении теста:

go test -v tmp/client_mocking_test.go
// === RUN   TestClientSuccess
//     TestClientSuccess: client_mocking_test.go:42: [200 OK] HELLO
// --- PASS: TestClientSuccess (0.00s)
// ...

Примечание. В определенных ситуациях желательно заменить HTTP-клиент из стандартной библиотеки на более производительный, например github.com/valyala/fasthttp, который имеет заметную разницу в производительности. Запустите тесты в _examples/fasthttp, чтобы измерить разницу в вашей собственной среде.


Читайте также:


Комментариев нет:

Отправить комментарий