Skip to content

Commit

Permalink
Merge pull request #627 from TNG/mongo-errors-624
Browse files Browse the repository at this point in the history
Mongo errors 624
  • Loading branch information
YaniKolev authored Oct 20, 2023
2 parents 9885c08 + 6702f29 commit 2dc574f
Show file tree
Hide file tree
Showing 12 changed files with 225 additions and 236 deletions.
4 changes: 0 additions & 4 deletions src/repository/JobRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ export class JobRepository extends Repository<JobEntity> {
return job?.executionInfo;
}

async clear(): Promise<void> {
await this.delete();
}

async define(job: JobDefinition): Promise<void> {
const { name, schedule, concurrency, maxRunning } = job;

Expand Down
4 changes: 0 additions & 4 deletions src/repository/Repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ export class Repository<ENTITY extends { _id?: ObjectId }> {
return result.deletedCount;
}

async deleteOne(filter: Filter<ENTITY>): Promise<void> {
await this.collection.deleteOne(filter);
}

// mongodb returns null instead of undefined for optional fields
private mapNullToUndefined(entity: WithId<ENTITY>): WithId<ENTITY> {
const keys = Object.keys(entity);
Expand Down
144 changes: 95 additions & 49 deletions src/repository/SchedulesRepository.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Filter, FindOneAndUpdateOptions, MongoClient, MongoServerError } from 'mongodb';
import { DateTime } from 'luxon';
import { FindOneAndUpdateOptions, MongoClient } from 'mongodb';

import { ScheduleEntity } from './ScheduleEntity';
import { Repository } from './Repository';
import { Logger } from '../logging/Logger';
import { MomoErrorType } from '../logging/error/MomoErrorType';
import { MomoEventData } from '../logging/MomoEvents';

export const SCHEDULES_COLLECTION_NAME = 'schedules';

Expand All @@ -13,6 +14,8 @@ const mongoOptions: FindOneAndUpdateOptions & { includeResultMetadata: true } =
includeResultMetadata: true, // ensures backwards compatibility with mongodb <6
};

const duplicateKeyErrorCode = 11000;

export class SchedulesRepository extends Repository<ScheduleEntity> {
private logger: Logger | undefined;

Expand All @@ -30,97 +33,140 @@ export class SchedulesRepository extends Repository<ScheduleEntity> {
this.logger = logger;
}

getLogData(): MomoEventData {
return { name: this.name, scheduleId: this.scheduleId };
}

async deleteOne(): Promise<void> {
await this.collection.deleteOne({ scheduleId: this.scheduleId });
}

/**
* Checks the state of the schedule represented by this repository.
*
* @param threshold a schedule older than (i.e. timestamp below) the threshold is considered dead and will be replaced
* @returns the schedule's state
*/
private async isActiveSchedule(threshold: number): Promise<boolean> {
const activeSchedule = await this.collection.findOne({ name: this.name });

if (activeSchedule === null || activeSchedule.scheduleId === this.scheduleId) {
return true;
}

return activeSchedule.lastAlive < threshold; // if activeSchedule is too old, take over and make this one active
}

/**
* Checks if there is an alive active schedule in the database for the given name.
* Tries to set this instance as active schedule of this name.
*
* There are 4 possible cases:
*
* If there is one -> return true if it is this schedule, otherwise false.
* If there is no such schedule -> we try inserting this schedule as the active one.
* ↳ If it worked return true, otherwise false.
* 1) Another instance already is active for this name. This instance does not need to become active. Nothing is done.
*
* 2) Another instance was active, but it's last ping was before the threshold. Hence, it is considered dead and this instance will take over and become active. The DB is updated accordingly.
*
* 3) There is currently no active schedule with this name. In this case, this instance will become the active schedule. The DB is updated accordingly.
*
* 4) This instance already is active. It will stay active and send a ping to the DB to indicate that it is still alive.
*
* @returns true if this instance is now active for the schedule with this name (cases 2-4), false otherwise (case 1)
*/
async isActiveSchedule(name: string): Promise<boolean> {
const lastAlive = DateTime.now().toMillis();
const threshold = lastAlive - this.deadScheduleThreshold;
async setActiveSchedule(): Promise<boolean> {
const now = DateTime.now().toMillis();
const threshold = now - this.deadScheduleThreshold;

const active = await this.isActiveSchedule(now);
if (!active) {
this.logger?.debug('This schedule is not active', this.getLogData());
return false;
}

const deadSchedule: Filter<ScheduleEntity> = { name: this.name, lastAlive: { $lt: threshold } };
const thisSchedule: Filter<ScheduleEntity> = { scheduleId: this.scheduleId };

const updatedSchedule: Partial<ScheduleEntity> = {
name: this.name,
scheduleId: this.scheduleId,
lastAlive: now,
executions: {},
};

try {
const result = await this.collection.findOneAndUpdate(
{ lastAlive: { $lt: threshold }, name },
{
$set: {
name,
scheduleId: this.scheduleId,
lastAlive,
executions: {},
},
},
{
...mongoOptions,
upsert: true,
},
await this.collection.updateOne(
// we already checked with isActiveSchedule that this instance should be the active one, but to prevent
// concurrent modification, we use a filter to ensure that we only overwrite a dead schedule or ping this schedule
{ $or: [deadSchedule, thisSchedule] },
{ $set: updatedSchedule },
{ upsert: true },
);

return result.value === null ? false : result.value.scheduleId === this.scheduleId;
return true;
} catch (error) {
// We seem to have a schedule that's alive. The unique name index probably prevented the upsert. Is this one the active schedule?
const aliveSchedule = await this.collection.findOne({ name });
if (aliveSchedule === null) {
if (error instanceof MongoServerError && error.code === duplicateKeyErrorCode) {
this.logger?.debug(
'Cannot set active schedule - another schedule with this name is already active',
this.getLogData(),
);
} else {
this.logger?.error(
'The database reported an unexpected error',
MomoErrorType.internal,
{ scheduleId: this.scheduleId },
this.getLogData(),
error,
);
}
return aliveSchedule?.scheduleId === this.scheduleId;
}
}

async ping(scheduleId = this.scheduleId): Promise<void> {
await this.updateOne({ scheduleId }, { $set: { lastAlive: DateTime.now().toMillis() } });
return false;
}
}

/**
* This unique index ensures that we do not insert more than one active schedule per schedule name
* into the repository.
*/
async createIndex(): Promise<void> {
// this unique index ensures that we do not insert more than one active schedule
// in the repository per schedule name
await this.collection.createIndex({ name: 1 }, { unique: true });
}

async removeJob(scheduleId: string, name: string): Promise<void> {
const schedulesEntity = await this.findOne({ scheduleId });
async removeJob(jobName: string): Promise<void> {
const schedulesEntity = await this.findOne({ scheduleId: this.scheduleId });
if (!schedulesEntity) {
throw new Error(`schedulesEntity not found for scheduleId=${scheduleId}`);
throw new Error(`schedulesEntity not found for scheduleId=${this.scheduleId}`);
}

const executions = schedulesEntity.executions;
delete executions[name];
await this.updateOne({ scheduleId }, { $set: { executions } });
delete executions[jobName];
await this.updateOne({ scheduleId: this.scheduleId }, { $set: { executions } });
}

async addExecution(name: string, maxRunning: number): Promise<{ added: boolean; running: number }> {
async addExecution(jobName: string, maxRunning: number): Promise<{ added: boolean; running: number }> {
if (maxRunning < 1) {
const schedule = await this.collection.findOneAndUpdate(
{ name: this.name },
{ $inc: { [`executions.${name}`]: 1 } },
{ $inc: { [`executions.${jobName}`]: 1 } },
mongoOptions,
);
return { added: schedule.value !== null, running: schedule.value?.executions[name] ?? 0 };
return { added: schedule.value !== null, running: schedule.value?.executions[jobName] ?? 0 };
}

const schedule = await this.collection.findOneAndUpdate(
{
name: this.name,
$or: [{ [`executions.${name}`]: { $lt: maxRunning } }, { [`executions.${name}`]: { $exists: false } }],
$or: [{ [`executions.${jobName}`]: { $lt: maxRunning } }, { [`executions.${jobName}`]: { $exists: false } }],
},
{ $inc: { [`executions.${name}`]: 1 } },
{ $inc: { [`executions.${jobName}`]: 1 } },
mongoOptions,
);

return { added: schedule.value !== null, running: schedule.value?.executions[name] ?? maxRunning };
return { added: schedule.value !== null, running: schedule.value?.executions[jobName] ?? maxRunning };
}

async removeExecution(name: string): Promise<void> {
await this.updateOne({ name: this.name }, { $inc: { [`executions.${name}`]: -1 } });
async removeExecution(jobName: string): Promise<void> {
await this.updateOne({ name: this.name }, { $inc: { [`executions.${jobName}`]: -1 } });
}

async countRunningExecutions(name: string): Promise<number> {
return (await this.findOne({ scheduleId: this.scheduleId }))?.executions[name] ?? 0;
async countRunningExecutions(jobName: string): Promise<number> {
return (await this.findOne({ scheduleId: this.scheduleId }))?.executions[jobName] ?? 0;
}
}
18 changes: 1 addition & 17 deletions src/schedule/MongoSchedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ export class MongoSchedule extends Schedule {
protected readonly scheduleId: string,
protected readonly connection: Connection,
pingIntervalMs: number,
private readonly scheduleName: string,
) {
const schedulesRepository = connection.getSchedulesRepository();
const jobRepository = connection.getJobRepository();
Expand All @@ -39,8 +38,6 @@ export class MongoSchedule extends Schedule {

this.disconnectFct = connection.disconnect.bind(connection);
this.schedulePing = new SchedulePing(
scheduleId,
scheduleName,
schedulesRepository,
this.logger,
pingIntervalMs,
Expand All @@ -61,7 +58,7 @@ export class MongoSchedule extends Schedule {
const scheduleId = uuid();
const connection = await Connection.create(connectionOptions, pingIntervalMs, scheduleId, scheduleName);

return new MongoSchedule(scheduleId, connection, pingIntervalMs, scheduleName);
return new MongoSchedule(scheduleId, connection, pingIntervalMs);
}

/**
Expand All @@ -87,19 +84,6 @@ export class MongoSchedule extends Schedule {
return this.schedulePing.start();
}

/**
* Returns whether this schedule is currently active.
*
* Only the active schedule will schedule jobs and try to execute them.
* There is always only one active schedule per mongo database.
*
* @throws if the database throws
*/
public async isActiveSchedule(): Promise<boolean> {
const schedulesRepository = this.connection.getSchedulesRepository();
return schedulesRepository.isActiveSchedule(this.scheduleName);
}

private async startAllJobs(): Promise<void> {
await Promise.all(Object.values(this.getJobSchedulers()).map(async (jobScheduler) => jobScheduler.start()));
}
Expand Down
8 changes: 1 addition & 7 deletions src/schedule/Schedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,7 @@ export class Schedule extends LogEmitter {

await this.jobRepository.define(job);

this.jobSchedulers[job.name] = JobScheduler.forJob(
this.scheduleId,
job,
this.logger,
this.schedulesRepository,
this.jobRepository,
);
this.jobSchedulers[job.name] = JobScheduler.forJob(job, this.logger, this.schedulesRepository, this.jobRepository);

return true;
}
Expand Down
37 changes: 19 additions & 18 deletions src/schedule/SchedulePing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ export class SchedulePing {
private startJobsStatus: StartJobsStatus = StartJobsStatus.notStarted;

constructor(
private readonly scheduleId: string,
private readonly scheduleName: string,
private readonly schedulesRepository: SchedulesRepository,
private readonly logger: Logger,
private readonly interval: number,
Expand All @@ -30,33 +28,36 @@ export class SchedulePing {
try {
await this.checkActiveSchedule();
} catch (e) {
this.logger.error(errorMessage, MomoErrorType.internal, {}, e);
this.logger.error(errorMessage, MomoErrorType.internal, this.schedulesRepository.getLogData(), e);
}
this.handle = setSafeInterval(this.checkActiveSchedule.bind(this), this.interval, this.logger, errorMessage);
}

private async checkActiveSchedule(): Promise<void> {
const active = await this.schedulesRepository.isActiveSchedule(this.scheduleName);
this.logger.debug(`This schedule is ${active ? '' : 'not '}active`);
if (active) {
await this.schedulesRepository.ping(this.scheduleId);
if (this.startJobsStatus === StartJobsStatus.notStarted) {
this.startJobsStatus = StartJobsStatus.inProgress;
this.logger.debug('This schedule just turned active');

await this.startAllJobs();

this.startJobsStatus = StartJobsStatus.finished;
this.logger.debug('Finished starting scheduled jobs');
}
const active = await this.schedulesRepository.setActiveSchedule();
if (!active) {
return;
}

this.logger.debug('This schedule is active', this.schedulesRepository.getLogData());

if (this.startJobsStatus !== StartJobsStatus.notStarted) {
return;
}

this.startJobsStatus = StartJobsStatus.inProgress;
this.logger.debug('This schedule just turned active', this.schedulesRepository.getLogData());
await this.startAllJobs();
this.startJobsStatus = StartJobsStatus.finished;
this.logger.debug('Finished starting scheduled jobs', this.schedulesRepository.getLogData());
}

async stop(): Promise<void> {
if (this.handle) {
this.logger.debug('stop SchedulePing', { scheduleId: this.scheduleId });
this.logger.debug('stop SchedulePing', this.schedulesRepository.getLogData());
clearInterval(this.handle);
}
await this.schedulesRepository.deleteOne({ scheduleId: this.scheduleId });

await this.schedulesRepository.deleteOne();
}
}
6 changes: 2 additions & 4 deletions src/scheduler/JobScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,19 @@ export class JobScheduler {
constructor(
private readonly jobName: string,
private readonly jobExecutor: JobExecutor,
private readonly scheduleId: string,
private readonly schedulesRepository: SchedulesRepository,
private readonly jobRepository: JobRepository,
private readonly logger: Logger,
) {}

static forJob(
scheduleId: string,
job: Job,
logger: Logger,
schedulesRepository: SchedulesRepository,
jobRepository: JobRepository,
): JobScheduler {
const executor = new JobExecutor(job.handler, schedulesRepository, jobRepository, logger);
return new JobScheduler(job.name, executor, scheduleId, schedulesRepository, jobRepository, logger);
return new JobScheduler(job.name, executor, schedulesRepository, jobRepository, logger);
}

getUnexpectedErrorCount(): number {
Expand Down Expand Up @@ -102,7 +100,7 @@ export class JobScheduler {
if (this.executableSchedule) {
this.executableSchedule.stop();
this.jobExecutor.stop();
await this.schedulesRepository.removeJob(this.scheduleId, this.jobName);
await this.schedulesRepository.removeJob(this.jobName);
this.executableSchedule = undefined;
}
}
Expand Down
Loading

0 comments on commit 2dc574f

Please sign in to comment.