From 7b989323bd3adc97da2560123d64f4883764f25e Mon Sep 17 00:00:00 2001 From: phamphong9981 Date: Fri, 30 Aug 2024 15:27:41 +0700 Subject: [PATCH] feat: reindex erc721 from history --- ci/config.json.ci | 5 +- config.json | 5 +- .../api-gateways/erc721_admin.service.ts | 8 + src/services/evm/erc721.service.ts | 43 ++++-- src/services/evm/erc721_handler.ts | 17 ++- src/services/evm/erc721_reindex.ts | 139 +++++++++++++++++- .../services/erc721/erc721_handler.spec.ts | 4 +- 7 files changed, 199 insertions(+), 22 deletions(-) diff --git a/ci/config.json.ci b/ci/config.json.ci index 66a70ee50..2e5790318 100644 --- a/ci/config.json.ci +++ b/ci/config.json.ci @@ -417,7 +417,10 @@ "chunkSizeInsert": 1000, "mediaPerBatch": 10, "concurrencyHandleTokenMedia": 10, - "timeRefreshErc721Stats": "1 * * * *" + "timeRefreshErc721Stats": "1 * * * *", + "reindex": { + "limitRecordGet": 2 + } }, "jobRefreshMViewAccountBalanceStatistic": { "timeRefreshMViewAccountBalanceStatistic": "*/10 * * * *" diff --git a/config.json b/config.json index 2997cbb74..0b641f42c 100644 --- a/config.json +++ b/config.json @@ -415,7 +415,10 @@ "chunkSizeInsert": 1000, "mediaPerBatch": 10, "concurrencyHandleTokenMedia": 10, - "timeRefreshErc721Stats": "1 * * * *" + "timeRefreshErc721Stats": "1 * * * *", + "reindex": { + "limitRecordGet": 2 + } }, "crawlEvmProxyHistory": { "key": "crawlEvmProxyHistory", diff --git a/src/services/api-gateways/erc721_admin.service.ts b/src/services/api-gateways/erc721_admin.service.ts index 8a817fcb6..951f6989f 100644 --- a/src/services/api-gateways/erc721_admin.service.ts +++ b/src/services/api-gateways/erc721_admin.service.ts @@ -2,6 +2,7 @@ import { Post, Service } from '@ourparentcenter/moleculer-decorators-extended'; import { Context, ServiceBroker } from 'moleculer'; import networks from '../../../network.json' assert { type: 'json' }; import BaseService from '../../base/base.service'; +import { REINDEX_TYPE } from '../evm/erc721_reindex'; @Service({ name: 'erc721-admin', @@ -25,6 +26,11 @@ export default class Erc721AdminService extends BaseService { optional: false, items: 'string', }, + type: { + type: 'enum', + optional: false, + values: Object.values(REINDEX_TYPE), + }, }, }) async erc721Reindexing( @@ -32,6 +38,7 @@ export default class Erc721AdminService extends BaseService { { chainid: string; addresses: string[]; + type: string; }, Record > @@ -43,6 +50,7 @@ export default class Erc721AdminService extends BaseService { `v1.Erc721.reindexing@${selectedChain?.moleculerNamespace}`, { addresses: ctx.params.addresses, + type: ctx.params.type, } ); } diff --git a/src/services/evm/erc721.service.ts b/src/services/evm/erc721.service.ts index 6f396bf2c..bfa299e93 100644 --- a/src/services/evm/erc721.service.ts +++ b/src/services/evm/erc721.service.ts @@ -22,7 +22,7 @@ import { Erc721Contract } from '../../models/erc721_contract'; import { BULL_JOB_NAME, SERVICE } from './constant'; import { Erc721Handler } from './erc721_handler'; import * as Erc721MediaHandler from './erc721_media_handler'; -import { Erc721Reindexer } from './erc721_reindex'; +import { Erc721Reindexer, REINDEX_TYPE } from './erc721_reindex'; const { NODE_ENV } = Config; @Service({ @@ -85,14 +85,13 @@ export default class Erc721Service extends BullableService { ], config.erc721.key ); - const erc721Activities: Erc721Activity[] = - await Erc721Handler.getErc721Activities( - startBlock, - endBlock, - this.logger, - undefined, - trx - ); + const { erc721Activities } = await Erc721Handler.getErc721Activities( + startBlock, + endBlock, + this.logger, + undefined, + trx + ); await this.handleMissingErc721Contract(erc721Activities, trx); if (erc721Activities.length > 0) { const erc721Tokens = _.keyBy( @@ -239,10 +238,17 @@ export default class Erc721Service extends BullableService { queueName: BULL_JOB_NAME.REINDEX_ERC721, jobName: BULL_JOB_NAME.REINDEX_ERC721, }) - async reindexErc721(_payload: { address: `0x${string}` }): Promise { - const { address } = _payload; + async reindexErc721(_payload: { + address: `0x${string}`; + type: string; + }): Promise { + const { address, type } = _payload; const erc721Reindexer = new Erc721Reindexer(this.viemClient, this.logger); - await erc721Reindexer.reindex(address); + if (type === REINDEX_TYPE.HISTORY) { + await erc721Reindexer.reindexFromHistory(address); + } else { + await erc721Reindexer.reindex(address); + } this.logger.info(`Reindex erc721 contract ${address} done.`); } @@ -289,16 +295,24 @@ export default class Erc721Service extends BullableService { items: 'string', optional: false, }, + type: { + type: 'string', + optional: false, + }, }, }) public async reindexing( ctx: Context<{ addresses: `0x${string}`[]; + type: string; }> ) { let { addresses } = ctx.params; - const erc721Reindexer = new Erc721Reindexer(this.viemClient, this.logger); - addresses = await erc721Reindexer.filterReindex(addresses); + const { type } = ctx.params; + if (type === REINDEX_TYPE.CURRENT) { + const erc721Reindexer = new Erc721Reindexer(this.viemClient, this.logger); + addresses = await erc721Reindexer.filterReindex(addresses); + } if (addresses.length > 0) { await Promise.all( addresses.map((address) => @@ -307,6 +321,7 @@ export default class Erc721Service extends BullableService { BULL_JOB_NAME.REINDEX_ERC721, { address, + type, }, { jobId: address, diff --git a/src/services/evm/erc721_handler.ts b/src/services/evm/erc721_handler.ts index 287e33617..242a8f51a 100644 --- a/src/services/evm/erc721_handler.ts +++ b/src/services/evm/erc721_handler.ts @@ -196,7 +196,11 @@ export class Erc721Handler { endBlock: number, logger: Moleculer.LoggerInstance, addresses?: string[], - trx?: Knex.Transaction + trx?: Knex.Transaction, + page?: { + prevEvmEventId?: number; + limitRecordGet: number; + } ) { const [fromTx, toTx] = await Promise.all([ EVMTransaction.query() @@ -230,9 +234,15 @@ export class Erc721Handler { if (trx) { builder.transacting(trx); } + if (page && page.prevEvmEventId !== undefined) { + builder + .andWhere('evm_event.id', '>', page.prevEvmEventId) + .limit(page.limitRecordGet); + } }) .where('evm_event.evm_tx_id', '>=', fromTx.id) .andWhere('evm_event.evm_tx_id', '<=', toTx.id) + .orderBy('evm_event.evm_tx_id', 'asc') .orderBy('evm_event.id', 'asc') .select( 'evm_event.*', @@ -262,7 +272,10 @@ export class Erc721Handler { } } }); - return erc721Activities; + return { + erc721Activities, + prevEvmEventId: erc721Events[erc721Events.length - 1]?.id, + }; } static async updateErc721( diff --git a/src/services/evm/erc721_reindex.ts b/src/services/evm/erc721_reindex.ts index 99d56be04..4b09a2a06 100644 --- a/src/services/evm/erc721_reindex.ts +++ b/src/services/evm/erc721_reindex.ts @@ -1,5 +1,8 @@ +/* eslint-disable no-await-in-loop */ +import _ from 'lodash'; import Moleculer from 'moleculer'; import { PublicClient, getContract } from 'viem'; +import config from '../../../config.json' assert { type: 'json' }; import knex from '../../common/utils/db_connection'; import { Erc721Activity, @@ -9,6 +12,10 @@ import { } from '../../models'; import { Erc721Handler } from './erc721_handler'; +export const REINDEX_TYPE = { + CURRENT: 'current', + HISTORY: 'history', +}; export class Erc721Reindexer { viemClient: PublicClient; @@ -113,14 +120,14 @@ export class Erc721Reindexer { ) .transacting(trx); const [tokens, height] = await this.getCurrentTokens(address); - const activities = await Erc721Handler.getErc721Activities( + const { erc721Activities } = await Erc721Handler.getErc721Activities( 0, height, this.logger, [address], trx ); - await Erc721Handler.updateErc721(activities, tokens, trx); + await Erc721Handler.updateErc721(erc721Activities, tokens, trx); }); const erc721Stats = await Erc721Handler.calErc721Stats([address]); if (erc721Stats.length > 0) { @@ -178,4 +185,132 @@ export class Erc721Reindexer { Number(height), ]; } + + async reindexFromHistory(address: `0x${string}`) { + // stop tracking => if start reindexing, track will be false (although error when reindex) + await Erc721Contract.query() + .patch({ track: false }) + .where('address', address); + // reindex + await knex.transaction(async (trx) => { + const erc721Contract = await Erc721Contract.query() + .transacting(trx) + .joinRelated('evm_smart_contract') + .where('erc721_contract.address', address) + .select( + 'evm_smart_contract.id as evm_smart_contract_id', + 'erc721_contract.id' + ) + .first() + .throwIfNotFound(); + await Erc721Stats.query() + .delete() + .where('erc721_contract_id', erc721Contract.id) + .transacting(trx); + await Erc721Activity.query() + .delete() + .where('erc721_contract_address', address) + .transacting(trx); + await Erc721Token.query() + .delete() + .where('erc721_contract_address', address) + .transacting(trx); + await Erc721Contract.query() + .delete() + .where('address', address) + .transacting(trx); + const contract = getContract({ + address, + abi: Erc721Contract.ABI, + client: this.viemClient, + }); + const [blockHeight, ...contractInfo] = await Promise.all([ + this.viemClient.getBlockNumber(), + contract.read.name().catch(() => Promise.resolve(undefined)), + contract.read.symbol().catch(() => Promise.resolve(undefined)), + ]); + await Erc721Contract.query() + .insert( + Erc721Contract.fromJson({ + evm_smart_contract_id: erc721Contract.evm_smart_contract_id, + address, + symbol: contractInfo[1], + name: contractInfo[0], + track: true, + last_updated_height: Number(blockHeight), + }) + ) + .transacting(trx); + const { limitRecordGet } = config.erc721.reindex; + let prevEvmEventId = 0; + let numChunk = 1; + while (true) { + const resultBuildErc721Activities = + await Erc721Handler.getErc721Activities( + 0, + Number(blockHeight), + this.logger, + [address], + trx, + { + prevEvmEventId, + limitRecordGet, + } + ); + const { erc721Activities } = resultBuildErc721Activities; + if (erc721Activities.length > 0) { + const erc721Tokens = _.keyBy( + await Erc721Token.query() + .whereIn( + ['erc721_contract_address', 'token_id'], + erc721Activities.map((e) => [ + e.erc721_contract_address, + // if token_id undefined (case approval_all), replace by null => not get any token (because token must have token_id) + e.token_id || null, + ]) + ) + .transacting(trx), + (o) => `${o.erc721_contract_address}_${o.token_id}` + ); + const erc721Handler = new Erc721Handler( + erc721Tokens, + erc721Activities + ); + erc721Handler.process(); + await Erc721Handler.updateErc721( + erc721Activities, + Object.values(erc721Handler.erc721Tokens), + trx + ); + prevEvmEventId = erc721Activities[erc721Activities.length - 1].id; + this.logger.info( + `Reindex erc721 contract ${address}: Chunk ${numChunk} done` + ); + numChunk += 1; + } + prevEvmEventId = resultBuildErc721Activities.prevEvmEventId; + if (prevEvmEventId === undefined) { + break; + } + } + this.logger.info(`Reindex erc721 ${address} done.`); + }); + const erc721Stats = await Erc721Handler.calErc721Stats([address]); + if (erc721Stats.length > 0) { + // Upsert erc721 stats + await Erc721Stats.query() + .insert( + erc721Stats.map((e) => + Erc721Stats.fromJson({ + total_activity: e.total_activity, + transfer_24h: e.transfer_24h, + erc721_contract_id: e.erc721_contract_id, + }) + ) + ) + .onConflict('erc721_contract_id') + .merge() + .returning('id'); + } + } } diff --git a/test/unit/services/erc721/erc721_handler.spec.ts b/test/unit/services/erc721/erc721_handler.spec.ts index 8bdee2bdb..b384146dc 100644 --- a/test/unit/services/erc721/erc721_handler.spec.ts +++ b/test/unit/services/erc721/erc721_handler.spec.ts @@ -162,7 +162,7 @@ export default class Erc721HandlerTest { }), ]; await EvmEvent.query().insert(erc721Events).transacting(trx); - const erc721Activities = await Erc721Handler.getErc721Activities( + const { erc721Activities } = await Erc721Handler.getErc721Activities( 21937979, 21937985, this.broker.logger, @@ -193,7 +193,7 @@ export default class Erc721HandlerTest { from: '0x1317df02a4e712265f5376a9d34156f73ebad640', to: '0xe39633931ec4a1841e438b15005a6f141d30789e', }); - const erc721ActivitiesByContract = + const { erc721Activities: erc721ActivitiesByContract } = await Erc721Handler.getErc721Activities( 21937979, 21937985,