Feat/job config #767

Open
wants to merge 7 commits into
base: develop
16 changes: 16 additions & 0 deletions migrations/20240419100359_config_job.ts
@@ -0,0 +1,16 @@
import { Knex } from 'knex';
import { ConfigJob } from '../src/models';

export async function up(knex: Knex): Promise<void> {
await knex.schema.createTable(ConfigJob.tableName, (table) => {
table.increments();
table.string('job_name').unique().notNullable();
table.integer('expected_row').notNullable();
table.integer('block_range').notNullable();
table.integer('acceptance_error').notNullable();
table.integer('block_balance').notNullable();
table.boolean('is_sync').defaultTo(false).notNullable();
});
}

export async function down(knex: Knex): Promise<void> {}
9 changes: 7 additions & 2 deletions src/models/block_checkpoint.ts
@@ -33,7 +33,7 @@ export class BlockCheckpoint extends BaseModel {
jobName: string,
lastHeightJobNames: string[],
configName?: string
- ): Promise<[number, number, BlockCheckpoint]> {
+ ): Promise<[number, number, BlockCheckpoint, BlockCheckpoint | undefined]> {
Member

So the last param isn't used yet?

Member

Ah, it only returns one more value.

Contributor Author

That's right. I extended Phong's function to also handle the case where job A depends on jobs B, C, and D.
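
To illustrate the reply above, here is a minimal sketch of why adding a fourth tuple element leaves existing callers untouched, and how a dependent job can read the checkpoint it depends on. `Checkpoint`, `getCheckpointSketch`, the job names, and the heights are hypothetical stand-ins, not the project's `BlockCheckpoint.getCheckpoint`. Picking the dependency with the lowest height is an assumption, since that part of the function is not visible in this diff.

```typescript
// Hypothetical stand-in for a BlockCheckpoint row.
interface Checkpoint {
  job_name: string;
  height: number;
}

// Hypothetical stand-in for getCheckpoint; returns the same 4-tuple shape as the PR.
function getCheckpointSketch(
  own: Checkpoint,
  dependencies: Checkpoint[],
  blocksPerCall: number
): [number, number, Checkpoint, Checkpoint | undefined] {
  // Assumption: the dependency with the lowest height is the binding one.
  const slowest =
    dependencies.length > 0
      ? dependencies.reduce((a, b) => (b.height < a.height ? b : a))
      : undefined;
  const startHeight = own.height;
  // Never run past the slowest job this one depends on.
  const endHeight = slowest
    ? Math.min(startHeight + blocksPerCall, slowest.height)
    : startHeight + blocksPerCall;
  return [startHeight, endHeight, { ...own, height: endHeight }, slowest];
}

const jobA: Checkpoint = { job_name: 'job_a', height: 100 };
const jobsBCD: Checkpoint[] = [
  { job_name: 'job_b', height: 130 },
  { job_name: 'job_c', height: 120 },
  { job_name: 'job_d', height: 150 },
];

// Callers that destructure only three elements keep compiling unchanged.
const [startHeight, endHeight, updatedCheckpoint] = getCheckpointSketch(jobA, jobsBCD, 50);

// New callers can additionally read the checkpoint of the job they depend on.
const [, , , dependingCheckpoint] = getCheckpointSketch(jobA, jobsBCD, 50);
```

Because the new element is appended at the end of the tuple, three-element destructuring at existing call sites is still valid TypeScript.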

const [jobCheckpoint, lastHeightCheckpoint] = await Promise.all([
BlockCheckpoint.query().select('*').where('job_name', jobName).first(),
BlockCheckpoint.query()
@@ -64,6 +64,11 @@ export class BlockCheckpoint extends BaseModel {
)
: lastHeightCheckpoint.height;

- return [startHeight, endHeight, updateBlockCheckpoint];
+ return [
+ startHeight,
+ endHeight,
+ updateBlockCheckpoint,
+ lastHeightCheckpoint,
+ ];
}
}
115 changes: 115 additions & 0 deletions src/models/config_job.ts
@@ -0,0 +1,115 @@
import BaseModel from './base';
import { BlockCheckpoint } from './block_checkpoint';

export class ConfigJob extends BaseModel {
id!: number;

job_name!: string;

// Expected number of rows the job handles on each run
expected_row!: number;

// Number of blocks queried on each run
block_range!: number;

// ABS(rows retrieved - expected_row) should not exceed acceptance_error; otherwise block_range is adjusted by block_balance below
acceptance_error!: number;

// Step added to or subtracted from block_range when the rows retrieved are too far from expected_row
block_balance!: number;

// false: the job runs as before; true: the job sizes its block range from this config
is_sync!: boolean;

static get tableName() {
return 'config_jobs';
}

/**
* @description: Get the config of a job by its name
* @param jobName Name of the job whose config is queried
*/
static async getConfigJob(jobName: string): Promise<ConfigJob | undefined> {
return ConfigJob.query().select('*').where('job_name', jobName).first();
}

// The job processes the range from currentStartBlock to the returned bestEndBlock
static determineBestRangeBlockForRunJob(
currentStartBlock: number,
currentEndBlock: number,
dependingCheckPointJob: BlockCheckpoint | undefined,
configJob: ConfigJob | undefined
): number {
let bestEndBlock = currentEndBlock;

// No config job (or is_sync is off): keep the end block from the block checkpoint
if (!configJob || !configJob.is_sync) return bestEndBlock;

if (dependingCheckPointJob) {
if (
currentStartBlock + configJob.block_range <=
dependingCheckPointJob.height
) {
bestEndBlock = currentStartBlock + configJob.block_range;
} else {
bestEndBlock = dependingCheckPointJob.height;
}
} else {
bestEndBlock = currentStartBlock + configJob.block_range;
}

return bestEndBlock;
}

// Adjust block_range so the rows fetched per run move toward expected_row
static prepareBalanceJob(
bestEndBlock: number,
dependingCheckPointJob: BlockCheckpoint | undefined,
configJob: ConfigJob | undefined,
currentTotalRowFetch: number
): ConfigJob | null {
// No job config (or is_sync is off), so leave everything unchanged
if (!configJob || !configJob.is_sync) return null;

// The range was capped by the job this one depends on, so keep the current config
if (bestEndBlock === dependingCheckPointJob?.height) return null;

const errorRow = configJob.expected_row - currentTotalRowFetch;
// The error is within the accepted tolerance
if (Math.abs(errorRow) <= configJob.acceptance_error) return null;

if (errorRow > 0) {
// Increase block_range when fewer rows than expected were fetched
// eslint-disable-next-line no-param-reassign
configJob.block_range += configJob.block_balance;
} else {
// Decrease block_range when more rows than expected were fetched
// eslint-disable-next-line no-param-reassign
configJob.block_range -= configJob.block_balance;
}

return configJob;
}

static get jsonSchema() {
return {
type: 'object',
required: [
'job_name',
'expected_row',
'block_range',
'acceptance_error',
'block_balance',
],
properties: {
id: { type: 'number' },
job_name: { type: 'string' },
expected_row: { type: 'number' },
block_range: { type: 'number' },
acceptance_error: { type: 'number' },
block_balance: { type: 'number' },
},
};
}
}
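
The two static helpers in this file can be checked in isolation. Below is a minimal sketch that restates their logic as pure functions over plain objects, so the arithmetic can be verified without a database. `JobConfig`, `bestEndBlock`, `nextBlockRange`, and all numbers are hypothetical names and values introduced for illustration. The `Math.max(1, ...)` floor is an added assumption that is not part of this PR; without it, repeated decreases could push `block_range` to zero or below.

```typescript
// Hypothetical plain-object mirror of the ConfigJob columns used by the helpers.
interface JobConfig {
  expected_row: number;
  block_range: number;
  acceptance_error: number;
  block_balance: number;
  is_sync: boolean;
}

// Mirrors determineBestRangeBlockForRunJob: cap the range at the dependency height.
function bestEndBlock(
  startBlock: number,
  endBlock: number,
  dependingHeight: number | undefined,
  config: JobConfig | undefined
): number {
  if (!config || !config.is_sync) return endBlock;
  const target = startBlock + config.block_range;
  return dependingHeight === undefined ? target : Math.min(target, dependingHeight);
}

// Mirrors prepareBalanceJob, but returns the next block_range instead of mutating.
// Returns null when the config should be left untouched.
function nextBlockRange(
  endBlockUsed: number,
  dependingHeight: number | undefined,
  config: JobConfig | undefined,
  rowsFetched: number
): number | null {
  if (!config || !config.is_sync) return null;
  // The range was capped by the dependency, so the row count says nothing about block_range.
  if (endBlockUsed === dependingHeight) return null;
  const errorRow = config.expected_row - rowsFetched;
  if (Math.abs(errorRow) <= config.acceptance_error) return null;
  const next =
    errorRow > 0
      ? config.block_range + config.block_balance
      : config.block_range - config.block_balance;
  // Assumption (not in the PR): never let the range drop below one block.
  return Math.max(1, next);
}

const sampleConfig: JobConfig = {
  expected_row: 1000,
  block_range: 100,
  acceptance_error: 50,
  block_balance: 10,
  is_sync: true,
};

const cappedEnd = bestEndBlock(500, 520, 560, sampleConfig); // dependency at 560 caps the range
const freeEnd = bestEndBlock(500, 520, 900, sampleConfig); // dependency far ahead: 500 + 100
const grown = nextBlockRange(freeEnd, 900, sampleConfig, 700); // too few rows
const shrunk = nextBlockRange(freeEnd, 900, sampleConfig, 1400); // too many rows
const kept = nextBlockRange(freeEnd, 900, sampleConfig, 980); // within tolerance
const skipped = nextBlockRange(cappedEnd, 560, sampleConfig, 300); // capped by dependency
```

Returning the new value instead of mutating the argument also removes the need for the `no-param-reassign` suppressions, at the cost of one extra assignment at the call site.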
1 change: 1 addition & 0 deletions src/models/index.ts
@@ -41,3 +41,4 @@ export * from './erc20_activity';
export * from './evm_proxy_history';
export * from './evm_internal_transaction';
export * from './account_balance';
+ export * from './config_job';
32 changes: 30 additions & 2 deletions src/services/crawl-tx/crawl_tx.service.ts
@@ -22,6 +22,7 @@ import {
import {
Block,
BlockCheckpoint,
+ ConfigJob,
Event,
Transaction,
TransactionMessage,
@@ -85,7 +86,7 @@ export default class CrawlTxService extends BullableService {
jobName: BULL_JOB_NAME.HANDLE_TRANSACTION,
})
public async jobHandlerCrawlTx(): Promise<void> {
- const [startBlock, endBlock, blockCheckpoint] =
+ const [startBlock, endBlock, blockCheckpoint, dependBlockCheckPoint] =
await BlockCheckpoint.getCheckpoint(
BULL_JOB_NAME.HANDLE_TRANSACTION,
[BULL_JOB_NAME.CRAWL_TRANSACTION],
@@ -99,11 +100,30 @@ export default class CrawlTxService extends BullableService {
if (startBlock >= endBlock) {
return;
}

+ const configJob = await ConfigJob.getConfigJob(
+ BULL_JOB_NAME.HANDLE_TRANSACTION
+ );
+ const bestEndBlock = ConfigJob.determineBestRangeBlockForRunJob(
+ startBlock,
+ endBlock,
+ dependBlockCheckPoint,
+ configJob
+ );

const listTxRaw = await Transaction.query()
.where('height', '>', startBlock)
- .andWhere('height', '<=', endBlock)
+ .andWhere('height', '<=', bestEndBlock)
.orderBy('height', 'asc')
.orderBy('index', 'asc');

+ const balanceConfigJob = ConfigJob.prepareBalanceJob(
+ bestEndBlock,
+ dependBlockCheckPoint,
+ configJob,
+ listTxRaw.length
+ );

await knex.transaction(async (trx) => {
await this.insertRelatedTx(listTxRaw, trx);
if (blockCheckpoint) {
@@ -115,6 +135,14 @@ export default class CrawlTxService extends BullableService {
.returning('id')
.transacting(trx);
}
+ if (balanceConfigJob) {
+ await ConfigJob.query()
+ .insert(balanceConfigJob)
+ .onConflict('job_name')
+ .merge()
+ .returning('id')
+ .transacting(trx);
+ }
});
}
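
Taken over several runs, the handler in this file behaves like a simple feedback loop: fetch rows for the current range, compare the count with `expected_row`, nudge `block_range` by `block_balance`, and persist the result. A minimal sketch of that loop follows, assuming a constant density of rows per block and no cap from a dependency. `LoopConfig`, `simulateRuns`, and every number are made up for illustration and are not taken from the project.

```typescript
// Hypothetical subset of the ConfigJob columns that drive the balancing.
interface LoopConfig {
  expected_row: number;
  block_range: number;
  acceptance_error: number;
  block_balance: number;
}

// Simulates consecutive job runs and returns the block_range used by each run.
function simulateRuns(initial: LoopConfig, rowsPerBlock: number, runs: number): number[] {
  const config = { ...initial };
  const rangePerRun: number[] = [];
  for (let i = 0; i < runs; i += 1) {
    rangePerRun.push(config.block_range);
    // Assumption: every block contributes the same number of rows.
    const rowsFetched = config.block_range * rowsPerBlock;
    const errorRow = config.expected_row - rowsFetched;
    if (Math.abs(errorRow) > config.acceptance_error) {
      // Too few rows: widen the range. Too many rows: narrow it.
      config.block_range += errorRow > 0 ? config.block_balance : -config.block_balance;
    }
  }
  return rangePerRun;
}

const rangeHistory = simulateRuns(
  { expected_row: 1000, block_range: 100, acceptance_error: 100, block_balance: 25 },
  4, // hypothetical rows per block
  8
);
console.log(rangeHistory); // [100, 125, 150, 175, 200, 225, 225, 225]
```

In this sketch the range settles once the fetched row count is within `acceptance_error` of the target. If a single `block_balance` step changes the row count by more than twice `acceptance_error`, the range can jump over the tolerance window and oscillate instead of settling, so the two values are best chosen together.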
