1
/
5

悩んだ末のElasticsearchへのデータ送信アーキテクチャ

こんにちは、那須です。

過去に何度かElasticsearchの運用関連の記事を書きましたが、今回はElasticsearchにデータを入れたいのにいろんな事情があって入れられず困っていたことと、それを解決するまでの流れを共有します。

▼目次

  • 何が問題だったのか?
  • 原因は何だったのか?
  • 解決する上で困っていたこと
  • 最初はリトライ処理を実装した
  • 最終的にどう解決したのか?
  • 実装内容
  • さいごに

何が問題だったのか?

私の所属するdotDではonedogという愛犬の健康管理ができるモバイルアプリを提供しています。このonedogから位置情報がAWS環境に送られてくるのですが、この位置情報を時々Elasticsearchに保存できないといった事象が発生していました。

具体的には、status code: 429でes_rejected_execution_exceptionというエラー(以降、429エラー)が発生していて、Writeキューがいっぱいになってこれ以上Elasticsearchで処理できない(書き込み拒否)という状況になっていました。最初はごく稀に発生していたのですが、2021年6月になると週に1回見るようになってきました。

このまま放っておくと大規模なデータ欠落が発生してしまうと思いました。そうなるとユーザの皆様にお届けできるサービスの価値が下がってしまいます。そうなる前にこれを解決しようと考えました。

原因は何だったのか?

429エラーの原因はWriteキューからリクエストが溢れることなので、単純にElasticsearchへのリクエストを減らすか、リクエスト数をうまく分散させて特定の時間に集中させないことが重要です。それが簡単にできれば良かったのですが、現実はそう甘くはありませんでした。

以下のCloudWatchの画面は1週間分のElasticsearchのIndexingRateのメトリクスです。ご覧の通り、1日に2回のピークがあります。必ずではないのですが、このピーク時間帯に429エラーが発生していました。

そしてElasticsearchへのデータ送信は、Lambda関数から_bulkリクエストを送る形で実装していました。ユーザからの位置情報保存のアクセスが発生するたびに_bulkリクエストがElasticsearchに送られている形です。ピーク時間帯にはこの_bulkリクエストが集中して発生してしまい、その結果429エラーが時々発生していたというわけです。

解決する上で困っていたこと

リクエストさえ集中しなければいいのですが、以下の要件があって単純に解決できない状況でした。

アプリのある機能を提供するために、位置情報はすぐに検索できる必要がありました。なので位置情報保存のAPIアクセスがあれば、すぐにLambda経由で都度_bulkリクエストを送信していたという背景があります。

またElasticsearchを運用されている方であればわかると思うのですが、データを保存してもrefreshと呼ばれる処理が実行されるまでは新たに保存されたデータは検索対象になりません。ただ、リクエストごとにrefreshするのはElasticsearchに負荷がかかります。負荷が上がると429エラーが発生する可能性が高くります。ですので、refresh動作についてはrefresh_intervalで設定した時間毎にしています。この時点で既にリアルタイムではなくなってますが、現状は仕方ありません。

一方、Elasticsearchへのデータ送信のベストプラクティスはいくつかあって、AWSのSAの方に相談するとKinesis Data Firehoseを使って送信するのがいいと伺いました。ただ、Kinesis Data Firehoseを使ってみようと思ってドキュメントを読み進めていくうちに、以下の文言にひっかかりました。

Amazon Kinesis Data Firehose Data Delivery
Describes data delivery for Amazon Kinesis Data Firehose.
https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html

最短でバッファに1MB溜まるか60秒待ってからElasticsearchに送信されるので、残念ながら要件を満たすことができません。設定も簡単なので利用したいけど、ちょっと利用できないなぁという状態です。

最初はリトライ処理を実装した

AWSから以下のページが公開されているのは知っていました。

Amazon OpenSearch サービスで検索拒否または書き込み拒否を解決する
Amazon OpenSearch Service クラスターに検索または書き込みリクエストを送信すると、そのリクエストは拒否されます。
https://repost.aws/ja/knowledge-center/opensearch-resolve-429-error

リトライ処理自体が最初はなかったので、まずは数秒待ってからリトライするよう実装しました。そうすると、リトライすることで位置情報のデータは問題なくElasticsearchに送信されたのですが、Lambda関数の実行時間がその分長くなってしまいました。またリトライの数に比例してElasticsearchへの負荷も上がったように見えました。これは悪手だなと思いました。

AWSからはエクスポネンシャルバックオフアルゴリズムでリトライすることが推奨されていますが、この方法でもLambda関数の実行時間が長くなってしまいます。実際に1回目のリトライでも429エラーが返ってきたことがありますので、その場合はさらに実行時間が伸びます。

ピーク時間帯以外はすぐにデータ送信して、ピーク時間帯は429エラーが発生しても何度もリトライするけどLambda関数の実行時間は可能な限り短くしたい。そういうやり方がないか考えた結果、以下のようになりました。


最終的にどう解決したのか?

なぜすぐに思いつかなかったのか不思議なんですが、このようにエラー発生時のみ非同期処理にすればいいことに気がつきました。正常時はLambda関数から直接_bulkリクエストを送り、429エラー発生時のみKinesis Data Firehose経由でデータを送ることにしました。

こうすることで、ピーク時間帯に429エラーが発生するとリアルタイムで保存と検索ができない(60秒以内だが待たないといけない)データが出てくるのですが、ほとんどのデータはほぼリアルタイムで利用することができます。また、何度もしつこく_bulkリクエストを送ることがなくなるのでElasticsearchへの負荷も下がります。そして負荷が下がるので429エラーの発生頻度も減少する、といった好循環が生まれました。

そしてKinesis Data Firehoseは最大2時間の間、リトライを試してくれます。Lambdaだと最長の15分までしかリトライすることができませんので、これでほぼ確実にデータを欠落することなく保存することができるようになりました。

実装内容

Python3で実際に実装したLambda関数の内容を一部抜粋してご紹介します。Kinesis Data Firehoseにデータ送信する関数を作成して、それにデータのリストを渡しているだけの単純なものです。Kinesis Data Firehoseを作成するCloudFormationやCDKのテンプレートも作って紹介したかったのですが、思いのほかテンプレート作成に時間がかかりそうだったので、残念ですが今回は手作業で対応しました。

from elasticsearch import helpers
import boto3
import json

try:
  helpers.bulk(connection, data)
except helpers.BulkIndexError as bulk_index_error:
  retry_list = []
  error_list = []
  for error in bulk_index_error.errors:
  if error.get('index').get('status') == 429:
  retry_list.append(error.get('index').get('data'))
  else:
  error_list.append(error)
  if retry_list:
  data_retry = (
  r for r in retry_list
  )
  logger.info('Retry send request using Kinesis Data Firehose.')
  try:
  response = send_to_firehose(data_retry)
  except:
  logger.error('Failed to send data to Kinesis Data Firehose.')
  if error_list:
  logger.error(f'Error occurred other than status 429: {error_list}')

def send_to_firehose(data_list):
records = []
client = boto3.client('firehose')
for data in data_list:
data_dict = {
'Data': json.dumps(data) + '\n'
}
records.append(data_dict)
logger.debug(f'Data sent to Kinesis Data Firehose : {records}')
response = client.put_record_batch(
DeliveryStreamName=stream_name,
Records=records
)
logger.debug(f'Data received from Kinesis Data Firehose : {response}')
return responsefrom elasticsearch import helpers
import boto3
import json

try:
  helpers.bulk(connection, data)
except helpers.BulkIndexError as bulk_index_error:
  retry_list = []
  error_list = []
  for error in bulk_index_error.errors:
  if error.get('index').get('status') == 429:
  retry_list.append(error.get('index').get('data'))
  else:
  error_list.append(error)
  if retry_list:
  data_retry = (
  r for r in retry_list
  )
  logger.info('Retry send request using Kinesis Data Firehose.')
  try:
  response = send_to_firehose(data_retry)
  except:
  logger.error('Failed to send data to Kinesis Data Firehose.')
  if error_list:
  logger.error(f'Error occurred other than status 429: {error_list}')

def send_to_firehose(data_list):
records = []
client = boto3.client('firehose')
for data in data_list:
data_dict = {
'Data': json.dumps(data) + '\n'
}
records.append(data_dict)
logger.debug(f'Data sent to Kinesis Data Firehose : {records}')
response = client.put_record_batch(
DeliveryStreamName=stream_name,
Records=records
)
logger.debug(f'Data received from Kinesis Data Firehose : {response}')
return response

さいごに

Elasticsearchへのデータ送信でエラー発生時に非同期でリトライする処理を思いつくまでの流れをご紹介しました。もっとスマートなやり方があるような気がしていますが、問題は解決できたのでひとまずこれで運用しています。もし「こうすればいいんじゃない?」といったツッコミがあればぜひお願いします!

…というような流れで様々な改善活動を少人数で行っていますが、正直エンジニアが少なすぎて思うように進んでいません。副業で関わる形でも大丈夫ですので、少しでも興味がある方は連絡いただけるとめちゃくちゃ嬉しいです!Wantedlyのリンクはインフラエンジニアですが、他の職種も大募集していますので気軽に連絡ください!


SREエンジニア
SREとして新規事業のプラットフォームを支えるエンジニア募集!!
【世の中に新しい価値を生み出し続ける事業創造ファーム】 私たちは自社事業と共創事業の2つを軸に、技術・知識・経験を相互に作用させることで、新しい事業を生み出し、常にアップデートし続けることで、ひとりひとりの「当たり前」に変化をもたらす価値を創造し続けます。 ①共創事業 大企業様の新規事業戦略立案から開発まで、先方の持っている課題感や構想をヒヤリングすることから始め、事業を成功に導くためのお手伝いをしています。大企業の社会的影響力や利用可能な資源の量、dotDの強みである発想力や敏捷性・柔軟性と、その両方を掛け合わせることでひとりひとりの「当たり前」に変化をもたらす価値を創造し続けます。 ▼直近の案件例 “クルマのデジタルキー”から“スマートシティを支えるサービス”へ「TOKAI RIKA Digitalkey」 https://digitalkey.jp/ TOKAI RIKA Digitalkeyは、クルマ以外のモビリティ、オフィスなどの建物、駐車場などもデジタルキー化を促進させ、物理的な「鍵」のない安全で便利な社会の実現を目指します。様々な場所をデジタルキー化することで、それぞれが繋がり、お客様へのシームレスなサービス提供と課金、事業者様における人員削減などに貢献します。 ②自社事業 dotDには、社会や身近に課題意識を持ったメンバーがおり、課題が深く難易度が高い領域への事業化に可能性やビジョンをもってチャレンジしながらサービス開発をしています。 現在はPetTech(ペット x テクノロジー)、EdTech(エデュケーション x テクノロジー)での新サービス開発を中心に取り組んでいますが、今後もどんどんチャレンジする領域を広げていきます。 愛犬とオーナーに幸せを届けるプラットフォーム「onedog」 https://onedog.io/ja/app/index.html 子どもの本当の好きに出会える課外活動マッチングサービス「meepa」 https://meepa.io/ フィジタル空間を実現し、新たなオンライン体験を模索するプロジェクト「iDovatar」 https://project.idovatar.com/
株式会社dotD


採用情報 | 株式会社dotD
株式会社dotDの採用情報ページです。世の中に必要とされるサービスを生み出し続け、社会インフラとなる企業を目指す私たちと一緒に働く仲間を募集しています。
https://dotd-inc.com/ja/careers/



Invitation from 株式会社dotD
If this story triggered your interest, have a chat with the team?
株式会社dotD's job postings

AWS

Weekly ranking

Show other rankings
Like funabashi yukiko's Story
Let funabashi yukiko's company know you're interested in their content