Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reindex erc721 from history #894

Open
wants to merge 2 commits into
base: evm
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ci/config.json.ci
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,10 @@
"chunkSizeInsert": 1000,
"mediaPerBatch": 10,
"concurrencyHandleTokenMedia": 10,
"timeRefreshErc721Stats": "1 * * * *"
"timeRefreshErc721Stats": "1 * * * *",
"reindex": {
"limitRecordGet": 2
}
},
"jobRefreshMViewAccountBalanceStatistic": {
"timeRefreshMViewAccountBalanceStatistic": "*/10 * * * *"
Expand Down
5 changes: 4 additions & 1 deletion config.json
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,10 @@
"chunkSizeInsert": 1000,
"mediaPerBatch": 10,
"concurrencyHandleTokenMedia": 10,
"timeRefreshErc721Stats": "1 * * * *"
"timeRefreshErc721Stats": "1 * * * *",
"reindex": {
"limitRecordGet": 2
}
},
"crawlEvmProxyHistory": {
"key": "crawlEvmProxyHistory",
Expand Down
8 changes: 8 additions & 0 deletions src/services/api-gateways/erc721_admin.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Post, Service } from '@ourparentcenter/moleculer-decorators-extended';
import { Context, ServiceBroker } from 'moleculer';
import networks from '../../../network.json' assert { type: 'json' };
import BaseService from '../../base/base.service';
import { REINDEX_TYPE } from '../evm/erc721_reindex';

@Service({
name: 'erc721-admin',
Expand All @@ -25,13 +26,19 @@ export default class Erc721AdminService extends BaseService {
optional: false,
items: 'string',
},
type: {
type: 'enum',
optional: false,
values: Object.values(REINDEX_TYPE),
},
},
})
async erc721Reindexing(
ctx: Context<
{
chainid: string;
addresses: string[];
type: string;
},
Record<string, unknown>
>
Expand All @@ -43,6 +50,7 @@ export default class Erc721AdminService extends BaseService {
`v1.Erc721.reindexing@${selectedChain?.moleculerNamespace}`,
{
addresses: ctx.params.addresses,
type: ctx.params.type,
}
);
}
Expand Down
43 changes: 29 additions & 14 deletions src/services/evm/erc721.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { Erc721Contract } from '../../models/erc721_contract';
import { BULL_JOB_NAME, SERVICE } from './constant';
import { Erc721Handler } from './erc721_handler';
import * as Erc721MediaHandler from './erc721_media_handler';
import { Erc721Reindexer } from './erc721_reindex';
import { Erc721Reindexer, REINDEX_TYPE } from './erc721_reindex';

const { NODE_ENV } = Config;
@Service({
Expand Down Expand Up @@ -85,14 +85,13 @@ export default class Erc721Service extends BullableService {
],
config.erc721.key
);
const erc721Activities: Erc721Activity[] =
await Erc721Handler.getErc721Activities(
startBlock,
endBlock,
this.logger,
undefined,
trx
);
const { erc721Activities } = await Erc721Handler.getErc721Activities(
startBlock,
endBlock,
this.logger,
undefined,
trx
);
await this.handleMissingErc721Contract(erc721Activities, trx);
if (erc721Activities.length > 0) {
const erc721Tokens = _.keyBy(
Expand Down Expand Up @@ -239,10 +238,17 @@ export default class Erc721Service extends BullableService {
queueName: BULL_JOB_NAME.REINDEX_ERC721,
jobName: BULL_JOB_NAME.REINDEX_ERC721,
})
async reindexErc721(_payload: { address: `0x${string}` }): Promise<void> {
const { address } = _payload;
async reindexErc721(_payload: {
address: `0x${string}`;
type: string;
}): Promise<void> {
const { address, type } = _payload;
const erc721Reindexer = new Erc721Reindexer(this.viemClient, this.logger);
await erc721Reindexer.reindex(address);
if (type === REINDEX_TYPE.HISTORY) {
await erc721Reindexer.reindexFromHistory(address);
} else {
await erc721Reindexer.reindex(address);
}
this.logger.info(`Reindex erc721 contract ${address} done.`);
}

Expand Down Expand Up @@ -289,16 +295,24 @@ export default class Erc721Service extends BullableService {
items: 'string',
optional: false,
},
type: {
type: 'string',
optional: false,
},
},
})
public async reindexing(
ctx: Context<{
addresses: `0x${string}`[];
type: string;
}>
) {
let { addresses } = ctx.params;
const erc721Reindexer = new Erc721Reindexer(this.viemClient, this.logger);
addresses = await erc721Reindexer.filterReindex(addresses);
const { type } = ctx.params;
if (type === REINDEX_TYPE.CURRENT) {
const erc721Reindexer = new Erc721Reindexer(this.viemClient, this.logger);
addresses = await erc721Reindexer.filterReindex(addresses);
}
if (addresses.length > 0) {
await Promise.all(
addresses.map((address) =>
Expand All @@ -307,6 +321,7 @@ export default class Erc721Service extends BullableService {
BULL_JOB_NAME.REINDEX_ERC721,
{
address,
type,
},
{
jobId: address,
Expand Down
17 changes: 15 additions & 2 deletions src/services/evm/erc721_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,11 @@ export class Erc721Handler {
endBlock: number,
logger: Moleculer.LoggerInstance,
addresses?: string[],
trx?: Knex.Transaction
trx?: Knex.Transaction,
page?: {
prevEvmEventId?: number;
limitRecordGet: number;
}
) {
const [fromTx, toTx] = await Promise.all([
EVMTransaction.query()
Expand Down Expand Up @@ -230,9 +234,15 @@ export class Erc721Handler {
if (trx) {
builder.transacting(trx);
}
if (page && page.prevEvmEventId !== undefined) {
builder
.andWhere('evm_event.id', '>', page.prevEvmEventId)
.limit(page.limitRecordGet);
}
})
.where('evm_event.evm_tx_id', '>=', fromTx.id)
.andWhere('evm_event.evm_tx_id', '<=', toTx.id)
.orderBy('evm_event.evm_tx_id', 'asc')
.orderBy('evm_event.id', 'asc')
.select(
'evm_event.*',
Expand Down Expand Up @@ -262,7 +272,10 @@ export class Erc721Handler {
}
}
});
return erc721Activities;
return {
erc721Activities,
prevEvmEventId: erc721Events[erc721Events.length - 1]?.id,
};
}

static async updateErc721(
Expand Down
139 changes: 137 additions & 2 deletions src/services/evm/erc721_reindex.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
/* eslint-disable no-await-in-loop */
import _ from 'lodash';
import Moleculer from 'moleculer';
import { PublicClient, getContract } from 'viem';
import config from '../../../config.json' assert { type: 'json' };
import knex from '../../common/utils/db_connection';
import {
Erc721Activity,
Expand All @@ -9,6 +12,10 @@ import {
} from '../../models';
import { Erc721Handler } from './erc721_handler';

export const REINDEX_TYPE = {
CURRENT: 'current',
HISTORY: 'history',
};
export class Erc721Reindexer {
viemClient: PublicClient;

Expand Down Expand Up @@ -113,14 +120,14 @@ export class Erc721Reindexer {
)
.transacting(trx);
const [tokens, height] = await this.getCurrentTokens(address);
const activities = await Erc721Handler.getErc721Activities(
const { erc721Activities } = await Erc721Handler.getErc721Activities(
0,
height,
this.logger,
[address],
trx
);
await Erc721Handler.updateErc721(activities, tokens, trx);
await Erc721Handler.updateErc721(erc721Activities, tokens, trx);
});
const erc721Stats = await Erc721Handler.calErc721Stats([address]);
if (erc721Stats.length > 0) {
Expand Down Expand Up @@ -178,4 +185,132 @@ export class Erc721Reindexer {
Number(height),
];
}

async reindexFromHistory(address: `0x${string}`) {
// stop tracking => if start reindexing, track will be false (although error when reindex)
await Erc721Contract.query()
.patch({ track: false })
.where('address', address);
// reindex
await knex.transaction(async (trx) => {
const erc721Contract = await Erc721Contract.query()
.transacting(trx)
.joinRelated('evm_smart_contract')
.where('erc721_contract.address', address)
.select(
'evm_smart_contract.id as evm_smart_contract_id',
'erc721_contract.id'
)
.first()
.throwIfNotFound();
await Erc721Stats.query()
.delete()
.where('erc721_contract_id', erc721Contract.id)
.transacting(trx);
await Erc721Activity.query()
.delete()
.where('erc721_contract_address', address)
.transacting(trx);
await Erc721Token.query()
.delete()
.where('erc721_contract_address', address)
.transacting(trx);
await Erc721Contract.query()
.delete()
.where('address', address)
.transacting(trx);
const contract = getContract({
address,
abi: Erc721Contract.ABI,
client: this.viemClient,
});
const [blockHeight, ...contractInfo] = await Promise.all([
this.viemClient.getBlockNumber(),
contract.read.name().catch(() => Promise.resolve(undefined)),
contract.read.symbol().catch(() => Promise.resolve(undefined)),
]);
await Erc721Contract.query()
.insert(
Erc721Contract.fromJson({
evm_smart_contract_id: erc721Contract.evm_smart_contract_id,
address,
symbol: contractInfo[1],
name: contractInfo[0],
track: true,
last_updated_height: Number(blockHeight),
})
)
.transacting(trx);
const { limitRecordGet } = config.erc721.reindex;
let prevEvmEventId = 0;
let numChunk = 1;
while (true) {
const resultBuildErc721Activities =
await Erc721Handler.getErc721Activities(
0,
Number(blockHeight),
this.logger,
[address],
trx,
{
prevEvmEventId,
limitRecordGet,
}
);
const { erc721Activities } = resultBuildErc721Activities;
if (erc721Activities.length > 0) {
const erc721Tokens = _.keyBy(
await Erc721Token.query()
.whereIn(
['erc721_contract_address', 'token_id'],
erc721Activities.map((e) => [
e.erc721_contract_address,
// if token_id undefined (case approval_all), replace by null => not get any token (because token must have token_id)
e.token_id || null,
])
)
.transacting(trx),
(o) => `${o.erc721_contract_address}_${o.token_id}`
);
const erc721Handler = new Erc721Handler(
erc721Tokens,
erc721Activities
);
erc721Handler.process();
await Erc721Handler.updateErc721(
erc721Activities,
Object.values(erc721Handler.erc721Tokens),
trx
);
prevEvmEventId = erc721Activities[erc721Activities.length - 1].id;
this.logger.info(
`Reindex erc721 contract ${address}: Chunk ${numChunk} done`
);
numChunk += 1;
}
prevEvmEventId = resultBuildErc721Activities.prevEvmEventId;
if (prevEvmEventId === undefined) {
break;
}
}
this.logger.info(`Reindex erc721 ${address} done.`);
});
const erc721Stats = await Erc721Handler.calErc721Stats([address]);
if (erc721Stats.length > 0) {
// Upsert erc721 stats
await Erc721Stats.query()
.insert(
erc721Stats.map((e) =>
Erc721Stats.fromJson({
total_activity: e.total_activity,
transfer_24h: e.transfer_24h,
erc721_contract_id: e.erc721_contract_id,
})
)
)
.onConflict('erc721_contract_id')
.merge()
.returning('id');
}
}
}
4 changes: 2 additions & 2 deletions test/unit/services/erc721/erc721_handler.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ export default class Erc721HandlerTest {
}),
];
await EvmEvent.query().insert(erc721Events).transacting(trx);
const erc721Activities = await Erc721Handler.getErc721Activities(
const { erc721Activities } = await Erc721Handler.getErc721Activities(
this.evmTx.height - 1,
this.evmTx.height,
this.broker.logger,
Expand Down Expand Up @@ -193,7 +193,7 @@ export default class Erc721HandlerTest {
from: '0x1317df02a4e712265f5376a9d34156f73ebad640',
to: '0xe39633931ec4a1841e438b15005a6f141d30789e',
});
const erc721ActivitiesByContract =
const { erc721Activities: erc721ActivitiesByContract } =
await Erc721Handler.getErc721Activities(
this.evmTx.height - 1,
this.evmTx.height,
Expand Down
Loading