diff --git a/migrations/20240419100359_config_job.ts b/migrations/20240419100359_config_job.ts new file mode 100644 index 000000000..abf23197f --- /dev/null +++ b/migrations/20240419100359_config_job.ts @@ -0,0 +1,16 @@ +import { Knex } from 'knex'; +import { ConfigJob } from '../src/models'; + +export async function up(knex: Knex): Promise { + await knex.schema.createTable(ConfigJob.tableName, (table) => { + table.increments(); + table.string('job_name').index().notNullable(); + table.integer('expected_row').notNullable(); + table.integer('block_range').notNullable(); + table.integer('acceptance_error').notNullable(); + table.integer('block_balance').notNullable(); + table.boolean('is_sync').defaultTo(false).notNullable(); + }); +} + +export async function down(knex: Knex): Promise {} diff --git a/src/models/block_checkpoint.ts b/src/models/block_checkpoint.ts index 95027fc41..10186f258 100644 --- a/src/models/block_checkpoint.ts +++ b/src/models/block_checkpoint.ts @@ -33,7 +33,7 @@ export class BlockCheckpoint extends BaseModel { jobName: string, lastHeightJobNames: string[], configName?: string - ): Promise<[number, number, BlockCheckpoint]> { + ): Promise<[number, number, BlockCheckpoint, BlockCheckpoint | undefined]> { const [jobCheckpoint, lastHeightCheckpoint] = await Promise.all([ BlockCheckpoint.query().select('*').where('job_name', jobName).first(), BlockCheckpoint.query() @@ -64,6 +64,11 @@ export class BlockCheckpoint extends BaseModel { ) : lastHeightCheckpoint.height; - return [startHeight, endHeight, updateBlockCheckpoint]; + return [ + startHeight, + endHeight, + updateBlockCheckpoint, + lastHeightCheckpoint, + ]; } } diff --git a/src/models/config_job.ts b/src/models/config_job.ts new file mode 100644 index 000000000..c0846b80a --- /dev/null +++ b/src/models/config_job.ts @@ -0,0 +1,115 @@ +import BaseModel from './base'; +import { BlockCheckpoint } from './block_checkpoint'; + +export class ConfigJob extends BaseModel { + id!: number; + + job_name!: string; + + // Expected row get for handle when each time job run + expected_row!: number; + + // Range query by block + block_range!: number; + + // ABS(rows retrieve - expected_row) should less than acceptance_error, if not do the block_balance below + acceptance_error!: number; + + // When get rows by block range not fit with expected_row, then increase/decrease block_range for fit with + block_balance!: number; + + // Keep job run normal or use config job and sync + is_sync!: boolean; + + static get tableName() { + return 'config_jobs'; + } + + /** + * @description: + * @param jobName Job name want to query to get config + * job crawl block complete on that block we want to crawl + */ + static async getConfigJob(jobName: string): Promise { + return ConfigJob.query().select('*').where('job_name', jobName).first(); + } + + // Range job will from currentStartBlock to bestEndBlock + static determineBestRangeBlockForRunJob( + currentStartBlock: number, + currentEndBlock: number, + dependingCheckPointJob: BlockCheckpoint | undefined, + configJob: ConfigJob | undefined + ): number { + let bestEndBlock = currentEndBlock; + + // No config job, then use default config from block checkpoint + if (!configJob || !configJob.is_sync) return bestEndBlock; + + if (dependingCheckPointJob) { + if ( + currentStartBlock + configJob.block_range <= + dependingCheckPointJob.height + ) { + bestEndBlock = currentStartBlock + configJob.block_range; + } else { + bestEndBlock = dependingCheckPointJob.height; + } + } else { + bestEndBlock = currentStartBlock + configJob.block_range; + } + + return bestEndBlock; + } + + // Balance config job + static prepareBalanceJob( + bestEndBlock: number, + dependingCheckPointJob: BlockCheckpoint | undefined, + configJob: ConfigJob | undefined, + currentTotalRowFetch: number + ): ConfigJob | null { + // No job config so keep every thing normal + if (!configJob || !configJob.is_sync) return null; + + // Job depending on another job, so keep current config job + if (bestEndBlock === dependingCheckPointJob?.height) return null; + + const errorRow = configJob.expected_row - currentTotalRowFetch; + // error acceptance + if (Math.abs(errorRow) <= configJob.acceptance_error) return null; + + if (errorRow > 0) { + // increase range block if total row fetch too low with expected row + // eslint-disable-next-line no-param-reassign + configJob.block_range += configJob.block_balance; + } else { + // decrease range block if total row fetch too low with expected row + // eslint-disable-next-line no-param-reassign + configJob.block_range -= configJob.block_balance; + } + + return configJob; + } + + static get jsonSchema() { + return { + type: 'object', + required: [ + 'job_name', + 'expected_row', + 'block_range', + 'acceptance_error', + 'block_balance', + ], + properties: { + id: { type: 'number' }, + job_name: { type: 'string' }, + expected_row: { type: 'number' }, + block_range: { type: 'number' }, + acceptance_error: { type: 'number' }, + block_balance: { type: 'number' }, + }, + }; + } +} diff --git a/src/models/index.ts b/src/models/index.ts index e73204b67..a4ca51d36 100644 --- a/src/models/index.ts +++ b/src/models/index.ts @@ -41,3 +41,4 @@ export * from './erc20_activity'; export * from './evm_proxy_history'; export * from './evm_internal_transaction'; export * from './account_balance'; +export * from './config_job'; diff --git a/src/services/crawl-tx/crawl_tx.service.ts b/src/services/crawl-tx/crawl_tx.service.ts index 5ecefebe7..f459bf854 100644 --- a/src/services/crawl-tx/crawl_tx.service.ts +++ b/src/services/crawl-tx/crawl_tx.service.ts @@ -22,6 +22,7 @@ import { import { Block, BlockCheckpoint, + ConfigJob, Event, Transaction, TransactionMessage, @@ -85,7 +86,7 @@ export default class CrawlTxService extends BullableService { jobName: BULL_JOB_NAME.HANDLE_TRANSACTION, }) public async jobHandlerCrawlTx(): Promise { - const [startBlock, endBlock, blockCheckpoint] = + const [startBlock, endBlock, blockCheckpoint, dependBlockCheckPoint] = await BlockCheckpoint.getCheckpoint( BULL_JOB_NAME.HANDLE_TRANSACTION, [BULL_JOB_NAME.CRAWL_TRANSACTION], @@ -99,11 +100,30 @@ export default class CrawlTxService extends BullableService { if (startBlock >= endBlock) { return; } + + const configJob = await ConfigJob.getConfigJob( + BULL_JOB_NAME.HANDLE_TRANSACTION + ); + const bestEndBlock = ConfigJob.determineBestRangeBlockForRunJob( + startBlock, + endBlock, + dependBlockCheckPoint, + configJob + ); + const listTxRaw = await Transaction.query() .where('height', '>', startBlock) - .andWhere('height', '<=', endBlock) + .andWhere('height', '<=', bestEndBlock) .orderBy('height', 'asc') .orderBy('index', 'asc'); + + const balanceConfigJob = ConfigJob.prepareBalanceJob( + bestEndBlock, + dependBlockCheckPoint, + configJob, + listTxRaw.length + ); + await knex.transaction(async (trx) => { await this.insertRelatedTx(listTxRaw, trx); if (blockCheckpoint) { @@ -115,6 +135,14 @@ export default class CrawlTxService extends BullableService { .returning('id') .transacting(trx); } + if (balanceConfigJob) { + await ConfigJob.query() + .insert(balanceConfigJob) + .onConflict('job_name') + .merge() + .returning('id') + .transacting(trx); + } }); } diff --git a/test/unit/models/config-job.spec.ts b/test/unit/models/config-job.spec.ts new file mode 100644 index 000000000..1f59cb9cc --- /dev/null +++ b/test/unit/models/config-job.spec.ts @@ -0,0 +1,248 @@ +import { Describe, Test } from '@jest-decorated/core'; +import { ServiceBroker } from 'moleculer'; +import { ConfigJob, BlockCheckpoint } from '../../../src/models'; + +@Describe('Test cw721 admin api service') +export default class Cw721AdminTest { + broker = new ServiceBroker({ + logger: false, + }); + + private mockBlockCheckPoint( + jobName: string, + expectedHeight: number + ): BlockCheckpoint { + const newBlockCheckpoint = new BlockCheckpoint(); + newBlockCheckpoint.job_name = jobName; + newBlockCheckpoint.height = expectedHeight; + return newBlockCheckpoint; + } + + private mockConfigJob( + jobName: string, + expectedRow: number, + blockRange: number, + acceptanceError: number, + blockBalance: number + ): ConfigJob { + const newConfigJob = new ConfigJob(); + newConfigJob.job_name = jobName; + newConfigJob.expected_row = expectedRow; + newConfigJob.block_range = blockRange; + newConfigJob.acceptance_error = acceptanceError; + newConfigJob.block_balance = blockBalance; + newConfigJob.is_sync = true; + return newConfigJob; + } + + @Test('Test get best end block with no config job') + public async test1() { + const startBlock = 1; + const endBlock = 100; + const bestEndBlock = ConfigJob.determineBestRangeBlockForRunJob( + startBlock, + endBlock, + undefined, + undefined + ); + expect(bestEndBlock).toBe(endBlock); + } + + @Test( + 'Test get best end block with depend block checkpoint and no config job' + ) + public async test2() { + const startBlock = 1; + const endBlock = 100; + const dependBlockCheckPoint = this.mockBlockCheckPoint( + 'exampleDependingJob', + 50 + ); + + const bestEndBlock = ConfigJob.determineBestRangeBlockForRunJob( + startBlock, + endBlock, + dependBlockCheckPoint, + undefined + ); + + expect(bestEndBlock).toBe(endBlock); + } + + @Test( + 'Test get best end block with config job and no depend block checkpoint' + ) + public async test3() { + const startBlock = 1; + const endBlock = 100; + const configJob = this.mockConfigJob('exampleConfigJob', 1000, 50, 100, 10); + + const bestEndBlock = ConfigJob.determineBestRangeBlockForRunJob( + startBlock, + endBlock, + undefined, + configJob + ); + + expect(bestEndBlock).toBe(startBlock + configJob.block_range); + } + + @Test('Test get best end block with config job and depend block checkpoint') + public async test4() { + const startBlock = 1; + const endBlock = 100; + const configJob = this.mockConfigJob('exampleConfigJob', 1000, 50, 100, 10); + const dependBlockCheckPoint = this.mockBlockCheckPoint( + 'exampleDependingJob', + 40 + ); + + const bestEndBlock = ConfigJob.determineBestRangeBlockForRunJob( + startBlock, + endBlock, + dependBlockCheckPoint, + configJob + ); + + expect(bestEndBlock).toBe(dependBlockCheckPoint.height); + } + + @Test('Test config job with no sync') + public async test5() { + const startBlock = 1; + const endBlock = 100; + const configJob = this.mockConfigJob('exampleConfigJob', 1000, 50, 100, 10); + configJob.is_sync = false; + const dependBlockCheckPoint = this.mockBlockCheckPoint( + 'exampleDependingJob', + 50 + ); + const bestEndBlock = ConfigJob.determineBestRangeBlockForRunJob( + startBlock, + endBlock, + dependBlockCheckPoint, + configJob + ); + expect(bestEndBlock).toBe(endBlock); + } + + @Test('Test prepare balance job with config not sync') + public async test6() { + const bestEndBlock = 100; + const configJob = this.mockConfigJob('exampleConfigJob', 1000, 50, 100, 10); + configJob.is_sync = false; + const dependBlockCheckPoint = this.mockBlockCheckPoint( + 'exampleDependingJob', + 500 + ); + const result = ConfigJob.prepareBalanceJob( + bestEndBlock, + dependBlockCheckPoint, + configJob, + 100 + ); + expect(result).toBe(null); + } + + @Test('Test prepare balance job need to increase range') + public async test7() { + const bestEndBlock = 100; + const baseBlockRange = 50; + const balanceBlock = 10; + const configJob = this.mockConfigJob( + 'exampleConfigJob', + 1000, + baseBlockRange, + 100, + balanceBlock + ); + const dependBlockCheckPoint = this.mockBlockCheckPoint( + 'exampleDependingJob', + 500 + ); + const result = ConfigJob.prepareBalanceJob( + bestEndBlock, + dependBlockCheckPoint, + configJob, + 100 + ); + expect(result).toBeDefined(); + expect(result?.block_range).toEqual(baseBlockRange + balanceBlock); + } + + @Test('Test prepare balance job need to keep config') + public async test8() { + const bestEndBlock = 100; + const baseBlockRange = 50; + const balanceBlock = 10; + const configJob = this.mockConfigJob( + 'exampleConfigJob', + 1000, + baseBlockRange, + 100, + balanceBlock + ); + const dependBlockCheckPoint = this.mockBlockCheckPoint( + 'exampleDependingJob', + 100 + ); + const result = ConfigJob.prepareBalanceJob( + bestEndBlock, + dependBlockCheckPoint, + configJob, + 100 + ); + expect(result).toBe(null); + } + + @Test('Test prepare balance job need to decrease') + public async test9() { + const bestEndBlock = 100; + const baseBlockRange = 50; + const balanceBlock = 10; + const configJob = this.mockConfigJob( + 'exampleConfigJob', + 1000, + baseBlockRange, + 100, + balanceBlock + ); + const dependBlockCheckPoint = this.mockBlockCheckPoint( + 'exampleDependingJob', + 500 + ); + const result = ConfigJob.prepareBalanceJob( + bestEndBlock, + dependBlockCheckPoint, + configJob, + 1200 + ); + expect(result).toBeDefined(); + expect(result?.block_range).toBe(baseBlockRange - balanceBlock); + } + + @Test('Test prepare balance job need to keep because acceptance error') + public async test10() { + const bestEndBlock = 100; + const baseBlockRange = 50; + const balanceBlock = 10; + const configJob = this.mockConfigJob( + 'exampleConfigJob', + 1000, + baseBlockRange, + 100, + balanceBlock + ); + const dependBlockCheckPoint = this.mockBlockCheckPoint( + 'exampleDependingJob', + 500 + ); + const result = ConfigJob.prepareBalanceJob( + bestEndBlock, + dependBlockCheckPoint, + configJob, + 1100 + ); + expect(result).toBe(null); + } +} diff --git a/test/unit/services/cw721/cw721.spec.ts b/test/unit/services/cw721/cw721.spec.ts index af65cad80..ada211afc 100644 --- a/test/unit/services/cw721/cw721.spec.ts +++ b/test/unit/services/cw721/cw721.spec.ts @@ -1878,6 +1878,7 @@ export default class AssetIndexerTest { job_name: 'dfdsfgsg', height: 100, }), + undefined, ]); const msgs = [ SmartContractEvent.fromJson({