Skip to content

Conversation

@sugihara1997
Copy link

Elasticsearch への output を追加しました。
環境変数 OPENSEARCH_SYNC_ENABLED で、

  • mongo の対象コレクションの状態を Elasticsearch に同期する
  • change stream を Elasticsearch に log としてそのまま保存する
    かを選択できるようにしています。

また、

  • バッファリングを行い bulk api による index をするかどうか
  • バッファリングする場合のバッファサイズ
  • バッファリングかつ同期する場合に集約してから実行するか
    を環境変数で制御できます。

@sugihara1997 sugihara1997 requested a review from syama666 January 31, 2024 01:01

bi := openSearchBulkIndexerImpl{
osClient: client,
queue: make(chan *bulkIndexerItem),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

queueってどこで使ってますか?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

すみません、消し忘れだったので remove しました!

}
)

func (o *OpenSearchImpl) ExportToOpenSearch(ctx context.Context, cs primitive.M) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ここのbulkってflushにも正常でリターンすると思うんですが、その場合ってsaveTokenまで走りますか?
そうなるとメモリに溜めてる最中にresume-tokenの位置が進んでいくので、仮にためてたものが正常にflushされなかった場合って欠損しませんか?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

たしかに flush しなかった時に欠損してしまうので少しロジック調整します

id, err := json.Marshal(cs["_id"])
if err != nil {
return errors.InternalServerErrorJsonMarshal.Wrap("Failed to marshal change streams json _id parameter.", err)
return false, errors.InternalServerErrorJsonMarshal.Wrap("Failed to marshal change streams json _id parameter.", err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ここ saveRtFlags だから true が正しいのでは?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

あ、間違えた、ごめんなさい。

Copy link
Collaborator

@syama666 syama666 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants