Skip to content

Commit

Permalink
refactor: change setActiveSchedule to call isActiveSchedule internally
Browse files Browse the repository at this point in the history
Signed-off-by: Ute Weiss <[email protected]>
  • Loading branch information
weissu42 committed Oct 6, 2023
1 parent eeaadd7 commit 327f351
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 133 deletions.
36 changes: 25 additions & 11 deletions src/repository/SchedulesRepository.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Filter, FindOneAndUpdateOptions, MongoClient, MongoServerError } from 'mongodb';
import { DateTime } from 'luxon';

import { ScheduleEntity } from './ScheduleEntity';
import { Repository } from './Repository';
Expand Down Expand Up @@ -43,15 +44,10 @@ export class SchedulesRepository extends Repository<ScheduleEntity> {
/**
* Checks the state of the schedule represented by this repository.
*
* INACTIVE: There is currently no active instance for a schedule with this name.
* DIFFERENT_INSTANCE_ACTIVE: Another instance (but not this one) is active for the schedule with this name.
* THIS_INSTANCE_ACTIVE: This instance is active for the schedule with this name.
*
* @param now timestamp in milliseconds
* @param threshold a schedule older than (i.e. timestamp below) the threshold is considered dead and will be replaced
* @returns the schedule's state
*/
async isActiveSchedule(now: number): Promise<boolean> {
const threshold = now - this.deadScheduleThreshold;
private async isActiveSchedule(threshold: number): Promise<boolean> {
const activeSchedule = await this.collection.findOne({ name: this.name });

if (activeSchedule === null || activeSchedule.scheduleId === this.scheduleId) {
Expand All @@ -62,14 +58,30 @@ export class SchedulesRepository extends Repository<ScheduleEntity> {
}

/**
* Tries to set this instance as active
* Tries to set this instance as active schedule of this name.
*
* There are 4 possible cases:
*
* 1) Another instance already is active for this name. This instance does not need to become active. Nothing is done.
*
* 2) Another instance was active, but it's last ping was before the threshold. Hence, it is considered dead and this instance will take over and become active. The DB is updated accordingly.
*
* @param now timestamp in milliseconds
* @returns true if this instance is now active for the schedule with this name, false otherwise
* 3) There is currently no active schedule with this name. In this case, this instance will become the active schedule. The DB is updated accordingly.
*
* 4) This instance already is active. It will stay active and send a ping to the DB to indicate that it is still alive.
*
* @returns true if this instance is now active for the schedule with this name (cases 2-4), false otherwise (case 1)
*/
async setActiveSchedule(now: number): Promise<boolean> {
async setActiveSchedule(): Promise<boolean> {
const now = DateTime.now().toMillis();
const threshold = now - this.deadScheduleThreshold;

const active = await this.isActiveSchedule(now);
if (!active) {
this.logger?.debug('This schedule is not active', this.getLogData());
return false;
}

const deadSchedule: Filter<ScheduleEntity> = { name: this.name, lastAlive: { $lt: threshold } };
const thisSchedule: Filter<ScheduleEntity> = { scheduleId: this.scheduleId };

Expand All @@ -82,6 +94,8 @@ export class SchedulesRepository extends Repository<ScheduleEntity> {

try {
await this.collection.updateOne(
// we already checked with isActiveSchedule that this instance should be the active one, but to prevent
// concurrent modification, we use a filter to ensure that we only overwrite a dead schedule or ping this schedule
{ $or: [deadSchedule, thisSchedule] },
{ $set: updatedSchedule },
{ ...mongoOptions, upsert: true },
Expand Down
10 changes: 1 addition & 9 deletions src/schedule/SchedulePing.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { DateTime } from 'luxon';

import { SchedulesRepository } from '../repository/SchedulesRepository';
import { Logger } from '../logging/Logger';
import { setSafeInterval } from '../timeout/safeTimeouts';
Expand Down Expand Up @@ -36,14 +34,8 @@ export class SchedulePing {
}

private async checkActiveSchedule(): Promise<void> {
const now = DateTime.now().toMillis();
const active = await this.schedulesRepository.isActiveSchedule(now);
const active = await this.schedulesRepository.setActiveSchedule();
if (!active) {
this.logger.debug('This schedule is not active', this.schedulesRepository.getLogData());
return;
}

if (!(await this.schedulesRepository.setActiveSchedule(now))) {
return;
}

Expand Down
98 changes: 18 additions & 80 deletions test/repository/SchedulesRepository.integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { DateTime } from 'luxon';

import { Connection } from '../../src/Connection';
import { SchedulesRepository } from '../../src/repository/SchedulesRepository';
import { sleep } from '../utils/sleep';

describe('SchedulesRepository', () => {
const scheduleName = 'schedule';
Expand Down Expand Up @@ -32,27 +33,26 @@ describe('SchedulesRepository', () => {

describe('setActiveSchedule', () => {
it('sets single schedule as active', async () => {
await schedulesRepository.setActiveSchedule(DateTime.now().toMillis());
await schedulesRepository.setActiveSchedule();

const schedule = await schedulesRepository.findOne({ name: scheduleName, scheduleId });
expect(schedule).not.toBeNull();
});

it('updates timestamp', async () => {
const now = DateTime.now().toMillis();
await schedulesRepository.setActiveSchedule(now - pingInterval);
await schedulesRepository.setActiveSchedule();
const schedulesEntity = await schedulesRepository.findOne({ scheduleId });

await schedulesRepository.setActiveSchedule(now);
await sleep(pingInterval);
await schedulesRepository.setActiveSchedule();

const schedulesEntityAfterPing = await schedulesRepository.findOne({ scheduleId });
expect(schedulesEntityAfterPing?.lastAlive).toBeGreaterThan(schedulesEntity!.lastAlive);
});

it('refuses to set another active schedule', async () => {
const now = DateTime.now().toMillis();
const lastAlive = now - 10;
await schedulesRepository.setActiveSchedule(lastAlive);
await schedulesRepository.setActiveSchedule();
const lastAlive = DateTime.now().toMillis();

const anotherScheduleId = 'not active';
const anotherInstance = await Connection.create(
Expand All @@ -65,7 +65,7 @@ describe('SchedulesRepository', () => {
const debug = jest.fn();
anotherSchedulesRepository.setLogger({ debug, error: jest.fn() });

const active = await anotherSchedulesRepository.setActiveSchedule(now);
const active = await anotherSchedulesRepository.setActiveSchedule();
await anotherInstance.disconnect();

expect(debug).toHaveBeenCalled();
Expand All @@ -74,20 +74,18 @@ describe('SchedulesRepository', () => {
const schedules = await schedulesRepository.find({ name: scheduleName });
expect(schedules).toHaveLength(1);
expect(schedules[0]?.scheduleId).toEqual(scheduleId);
expect(schedules[0]?.lastAlive).toEqual(lastAlive);
expect(schedules[0]?.lastAlive).toBeLessThanOrEqual(lastAlive);
});

it('only one schedule of many concurrent ones is active', async () => {
it('sets only one schedule of many concurrent ones as active', async () => {
const connections = await Promise.all(
['a', 'b', 'c', 'd', 'e'].map(async (id) =>
Connection.create({ url: mongo.getUri() }, pingInterval, id, scheduleName),
),
);

const active = await Promise.all(
connections.map(async (connection) =>
connection.getSchedulesRepository().setActiveSchedule(DateTime.now().toMillis()),
),
connections.map(async (connection) => connection.getSchedulesRepository().setActiveSchedule()),
);

await Promise.all(connections.map(async (connection) => connection.disconnect()));
Expand All @@ -97,15 +95,15 @@ describe('SchedulesRepository', () => {
expect(schedules).toHaveLength(1);
});

it('should replace dead schedules', async () => {
const now = DateTime.now().toMillis();
await schedulesRepository.setActiveSchedule(now - 2 * pingInterval - 10);
it('replaces a dead schedule', async () => {
await schedulesRepository.setActiveSchedule();
const otherScheduleId = 'other schedule ID';

secondConnection = await Connection.create({ url: mongo.getUri() }, pingInterval, otherScheduleId, scheduleName);
const secondSchedulesRepository = secondConnection.getSchedulesRepository();

const active = await secondSchedulesRepository.setActiveSchedule(now);
await sleep(2 * pingInterval + 10);
const active = await secondSchedulesRepository.setActiveSchedule();

expect(active).toEqual(true);
});
Expand All @@ -117,76 +115,16 @@ describe('SchedulesRepository', () => {
secondConnection = await Connection.create({ url: mongo.getUri() }, pingInterval, otherScheduleId, otherName);
const secondSchedulesRepository = secondConnection.getSchedulesRepository();

const active = await schedulesRepository.setActiveSchedule(DateTime.now().toMillis());
const secondActive = await secondSchedulesRepository.setActiveSchedule(DateTime.now().toMillis());
const active = await schedulesRepository.setActiveSchedule();
const secondActive = await secondSchedulesRepository.setActiveSchedule();

expect(active).toEqual(true);
expect(secondActive).toEqual(true);
});
});

describe('getScheduleState', () => {
it('detects active schedule', async () => {
await schedulesRepository.setActiveSchedule(DateTime.now().toMillis());

const active = await schedulesRepository.isActiveSchedule(DateTime.now().toMillis());

expect(active).toBe(true);
});

it('detects active schedule after ping interval elapsed', async () => {
const now = DateTime.now().toMillis();
await schedulesRepository.setActiveSchedule(now - 2 * pingInterval - 10);

const active = await schedulesRepository.isActiveSchedule(now);

expect(active).toBe(true);
});

it('detects other active schedule with identical name', async () => {
secondConnection = await Connection.create({ url: mongo.getUri() }, pingInterval, 'other schedule', scheduleName);
const secondSchedulesRepository = secondConnection.getSchedulesRepository();

const now = DateTime.now().toMillis();
await schedulesRepository.setActiveSchedule(now);

const active = await secondSchedulesRepository.isActiveSchedule(now);

expect(active).toBe(false);
});

it('does not consider dead schedule with identical name', async () => {
secondConnection = await Connection.create({ url: mongo.getUri() }, pingInterval, 'other schedule', scheduleName);
const secondSchedulesRepository = secondConnection.getSchedulesRepository();
const now = DateTime.now().toMillis();

await schedulesRepository.setActiveSchedule(now - 2 * pingInterval - 10);

const active = await secondSchedulesRepository.isActiveSchedule(now);

expect(active).toBe(true);
});

it('does not consider schedule with different name', async () => {
secondConnection = await Connection.create(
{ url: mongo.getUri() },
pingInterval,
'other schedule',
'other schedule',
);
const secondSchedulesRepository = secondConnection.getSchedulesRepository();

const now = DateTime.now().toMillis();
await schedulesRepository.setActiveSchedule(now);

const active = await secondSchedulesRepository.isActiveSchedule(now);

expect(active).toBe(true);
});
});

describe('with active schedule', () => {
beforeEach(async () => schedulesRepository.setActiveSchedule(DateTime.now().toMillis()));
beforeEach(async () => schedulesRepository.setActiveSchedule());

describe('removeJob', () => {
it('can remove a job', async () => {
Expand Down
8 changes: 4 additions & 4 deletions test/schedule/MongoSchedule.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { anyNumber, instance, mock, verify, when } from 'ts-mockito';
import { instance, mock, verify, when } from 'ts-mockito';

import { SchedulesRepository } from '../../src/repository/SchedulesRepository';
import { JobRepository } from '../../src/repository/JobRepository';
Expand Down Expand Up @@ -27,7 +27,7 @@ describe('MongoSchedule', () => {
});

it('connects and starts the ping and disconnects and stops the ping', async () => {
when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(true);
when(schedulesRepository.setActiveSchedule()).thenResolve(true);

const mongoSchedule = await MongoSchedule.connect({ scheduleName: 'schedule', url: 'mongodb://does.not/matter' });
const secondSchedule = await MongoSchedule.connect({
Expand All @@ -36,9 +36,9 @@ describe('MongoSchedule', () => {
});

await mongoSchedule.start();
verify(schedulesRepository.setActiveSchedule(anyNumber())).once();
verify(schedulesRepository.setActiveSchedule()).once();
await secondSchedule.start();
verify(schedulesRepository.setActiveSchedule(anyNumber())).twice();
verify(schedulesRepository.setActiveSchedule()).twice();

await mongoSchedule.disconnect();
await secondSchedule.disconnect();
Expand Down
5 changes: 2 additions & 3 deletions test/schedule/Schedule.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { anyNumber, deepEqual, instance, mock, when } from 'ts-mockito';
import { deepEqual, instance, mock, when } from 'ts-mockito';
import { ObjectId } from 'mongodb';

import { ExecutionStatus, MomoEvent, MomoJob, MomoOptions, MongoSchedule } from '../../src';
Expand Down Expand Up @@ -39,8 +39,7 @@ describe('Schedule', () => {
jest.clearAllMocks();

when(jobRepository.find(deepEqual({ name: momoJob.name }))).thenResolve([]);
when(schedulesRepository.isActiveSchedule(anyNumber())).thenResolve(true);
when(schedulesRepository.setActiveSchedule(anyNumber())).thenResolve(true);
when(schedulesRepository.setActiveSchedule()).thenResolve(true);

mongoSchedule = await MongoSchedule.connect({ scheduleName, url: 'mongodb://does.not/matter' });
initLoggingForTests(mongoSchedule);
Expand Down
Loading

0 comments on commit 327f351

Please sign in to comment.