
Commit 86d872e

Author: AnatolyUss committed:
improved disaster recovery mechanism
1 parent 46f1848 commit 86d872e

File tree

6 files changed: +130 -32 lines changed


README.md

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ from MySQL to PostgreSQL as easy and smooth as possible.</p>
 <a href="mailto:[email protected]?subject=NMIG">[email protected]</a></p>

 <h3>VERSION</h3>
-<p>Current version is 2.4.0<br />
+<p>Current version is 2.5.0<br />
 (major version . improvements . bug fixes)</p>


migration/fmtp/ConsistencyEnforcer.js

Lines changed: 108 additions & 8 deletions
@@ -20,6 +20,9 @@
  */
 'use strict';

+const generateError = require('./ErrorGenerator');
+const extraConfigProcessor = require('./ExtraConfigProcessor');
+
 /**
  * Update consistency state.
  *
@@ -53,14 +56,15 @@ const updateConsistencyState = (self, dataPoolId) => {
 }

 /**
- * Get consistency state.
+ * Get the `is_started` value of current chunk.
  *
  * @param {Conversion} self
  * @param {Number} dataPoolId
  *
  * @returns {Promise}
  */
-const getConsistencyState = (self, dataPoolId) => {
+
+const getIsStarted = (self, dataPoolId) => {
     return new Promise(resolve => {
         self._pg.connect((error, client, done) => {
             if (error) {
@@ -85,27 +89,123 @@ const getConsistencyState = (self, dataPoolId) => {
     });
 }

+/**
+ * Current data chunk runs after a disaster recovery.
+ * Must determine if current chunk has already been loaded.
+ * This is in order to prevent possible data duplications.
+ *
+ * @param {Conversion} self
+ * @param {Object} chunk
+ *
+ * @returns {Promise}
+ */
+const hasCurrentChunkLoaded = (self, chunk) => {
+    return new Promise(resolve => {
+        self._pg.connect((pgError, client, done) => {
+            if (pgError) {
+                generateError(self, '\t--[ConsistencyEnforcer::hasCurrentChunkLoaded] Cannot connect to PostgreSQL server...\n' + pgError);
+                resolve(true);
+            } else {
+                const originalTableName = extraConfigProcessor.getTableName(self, chunk._tableName, true);
+                const sql = 'SELECT EXISTS(SELECT 1 FROM "' + self._schema + '"."' + chunk._tableName
+                    + '" WHERE "' + self._schema + '_' + originalTableName + '_data_chunk_id_temp" = ' + chunk._id + ');';
+
+                client.query(sql, (err, result) => {
+                    done();
+
+                    if (err) {
+                        generateError(self, '\t--[ConsistencyEnforcer::hasCurrentChunkLoaded] ' + err, sql);
+                        resolve(true);
+                    } else {
+                        resolve(!!result.rows[0].exists);
+                    }
+                });
+            }
+        });
+    });
+}
+
+/**
+ * Get consistency state.
+ *
+ * @param {Conversion} self
+ * @param {Object} chunk
+ *
+ * @returns {Promise}
+ */
+const getConsistencyState = (self, chunk) => {
+    return new Promise(resolve => {
+        getIsStarted(self, chunk._id).then(isStarted => {
+            if (isStarted) {
+                hasCurrentChunkLoaded(self, chunk).then(result => resolve(result));
+            } else {
+                // Normal migration flow.
+                resolve(false);
+            }
+        });
+    });
+}
+
 /**
  * Enforce consistency before processing a chunk of data.
  * Ensure there are no any data duplications.
  * In case of normal execution - it is a good practice.
  * In case of rerunning nmig after unexpected failure - it is absolutely mandatory.
  *
  * @param {Conversion} self
- * @param {Number} chunkId
+ * @param {Object} chunk
  *
  * @returns {Promise}
  */
-module.exports = (self, chunkId) => {
+module.exports.enforceConsistency = (self, chunk) => {
     return new Promise(resolve => {
-        getConsistencyState(self, chunkId).then(isStarted => {
-            if (isStarted) {
-                // Current data chunk runs after a disaster recovery.
+        getConsistencyState(self, chunk).then(hasAlreadyBeenLoaded => {
+            if (hasAlreadyBeenLoaded) {
+                /*
+                 * Current data chunk runs after a disaster recovery.
+                 * It has already been loaded.
+                 */
                 resolve(false);
             } else {
                 // Normal migration flow.
-                updateConsistencyState(self, chunkId).then(() => resolve(true));
+                updateConsistencyState(self, chunk._id).then(() => resolve(true));
             }
         })
     });
 };
+
+/**
+ * Drop the {self._schema + '_' + originalTableName + '_data_chunk_id_temp'} column from current table.
+ *
+ * @param {Conversion} self
+ * @param {String} tableName
+ *
+ * @returns {Promise}
+ */
+module.exports.dropDataChunkIdColumn = (self, tableName) => {
+    return new Promise(resolve => {
+        self._pg.connect((pgError, client, done) => {
+            if (pgError) {
+                generateError(self, '\t--[ConsistencyEnforcer::dropDataChunkIdColumn] Cannot connect to PostgreSQL server...\n' + pgError);
+                resolve();
+            } else {
+                const originalTableName = extraConfigProcessor.getTableName(self, tableName, true);
+                const columnToDrop = self._schema + '_' + originalTableName + '_data_chunk_id_temp';
+                const sql = 'ALTER TABLE "' + self._schema + '"."' + tableName + '" DROP COLUMN "' + columnToDrop + '";';
+
+                client.query(sql, (err, result) => {
+                    done();
+
+                    if (err) {
+                        const errMsg = '\t--[ConsistencyEnforcer::dropDataChunkIdColumn] Failed to drop column "' + columnToDrop + '"\n'
+                            + '\t--[ConsistencyEnforcer::dropDataChunkIdColumn] ' + err;
+
+                        generateError(self, errMsg, sql);
+                    }
+
+                    resolve();
+                });
+            }
+        });
+    });
+};
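
Taken together, `enforceConsistency` now resolves `true` only for chunks that still need loading: a chunk whose `is_started` flag was already set (i.e., a run interrupted mid-chunk) is re-checked against the target table through the temporary `*_data_chunk_id_temp` column before being skipped. A minimal usage sketch of the decision flow; the `conversion` object and the chunk descriptor shape here are illustrative assumptions, not part of this commit:

    'use strict';

    const consistencyEnforcer = require('./ConsistencyEnforcer');

    // Illustrative chunk descriptor; real ones are read from the data pool.
    const chunk = { _id: 42, _tableName: 'orders' };

    consistencyEnforcer.enforceConsistency(conversion, chunk).then(isNormalFlow => {
        if (isNormalFlow) {
            // Chunk 42 was never started, or was started but left no rows:
            // safe to (re)load it.
        } else {
            // Chunk 42 already landed in PostgreSQL before the crash:
            // skip it to avoid duplicating data.
        }
    });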

migration/fmtp/ConstraintsProcessor.js

Lines changed: 10 additions & 2 deletions
@@ -33,6 +33,8 @@ const processIndexAndKey = require('./IndexAndKeyProcessor');
 const processComments = require('./CommentsProcessor');
 const processForeignKey = require('./ForeignKeyProcessor');
 const processViews = require('./ViewGenerator');
+const consistencyEnforcer = require('./ConsistencyEnforcer');
+const dropDataChunkIdColumn = consistencyEnforcer.dropDataChunkIdColumn;

 /**
  * Continues migration process after data loading, when migrate_only_data is true.
@@ -46,7 +48,11 @@ const continueProcessAfterDataLoadingShort = self => {

     for (let i = 0; i < self._tablesToMigrate.length; ++i) {
         const tableName = self._tablesToMigrate[i];
-        promises.push(sequencesProcessor.setSequenceValue(self, tableName));
+        promises.push(
+            dropDataChunkIdColumn(self, tableName).then(() => {
+                return sequencesProcessor.setSequenceValue(self, tableName);
+            })
+        );
     }

     Promise.all(promises).then(() => {
@@ -77,7 +83,9 @@ const continueProcessAfterDataLoadingLong = self => {
     for (let i = 0; i < self._tablesToMigrate.length; ++i) {
         const tableName = self._tablesToMigrate[i];
         promises.push(
-            processEnum(self, tableName).then(() => {
+            dropDataChunkIdColumn(self, tableName).then(() => {
+                return processEnum(self, tableName);
+            }).then(() => {
                 return processNull(self, tableName);
             }).then(() => {
                 return processDefault(self, tableName);
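
Both post-load paths (the `migrate_only_data` shortcut and the full flow) now start each table's promise chain with `dropDataChunkIdColumn`, so the helper column is removed before sequences, enums, and constraints are processed and never survives into the final schema. The pattern, sketched generically (`nextStep` is a placeholder for the real per-table processors):

    promises.push(
        dropDataChunkIdColumn(self, tableName).then(() => {
            // The temp column is gone; continue with the usual steps.
            return nextStep(self, tableName);
        })
    );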

migration/fmtp/DataLoader.js

Lines changed: 8 additions & 18 deletions
@@ -29,7 +29,8 @@ const generateError = require('./ErrorGenerator');
 const connect = require('./Connector');
 const Conversion = require('./Conversion');
 const MessageToMaster = require('./MessageToMaster');
-const enforceConsistency = require('./ConsistencyEnforcer');
+const consistencyEnforcer = require('./ConsistencyEnforcer');
+const enforceConsistency = consistencyEnforcer.enforceConsistency;
 const extraConfigProcessor = require('./ExtraConfigProcessor');
 const copyFrom = pgCopyStreams.from;
 const getBuffer = +process.version.split('.')[0].slice(1) < 6
@@ -44,7 +45,7 @@ process.on('message', signal => {
     for (let i = 0; i < signal.chunks.length; ++i) {
         promises.push(
             connect(self).then(() => {
-                return enforceConsistency(self, signal.chunks[i]._id);
+                return enforceConsistency(self, signal.chunks[i]);
             }).then(isNormalFlow => {
                 if (isNormalFlow) {
                     return populateTableWorker(
@@ -58,19 +59,6 @@ process.on('message', signal => {
                     );
                 }

-                const sql = buildChunkQuery(
-                    extraConfigProcessor.getTableName(self, signal.chunks[i]._tableName, true),
-                    signal.chunks[i]._selectFieldList,
-                    signal.chunks[i]._offset,
-                    signal.chunks[i]._rowsInChunk
-                );
-
-                const strTwelveSpaces = '            ';
-                const rejectedData = '\n\t--[loadData] Possible data duplication alert!\n\t ' + strTwelveSpaces
-                    + 'Data, retrievable by following MySQL query:\n' + sql + '\n\t ' + strTwelveSpaces
-                    + 'may already be migrated.\n\t' + strTwelveSpaces + ' Please, check it.';
-
-                log(self, rejectedData, path.join(self._logsDirPath, signal.chunks[i]._tableName + '.log'));
                 return deleteChunk(self, signal.chunks[i]._id);
             })
         );
@@ -225,8 +213,9 @@ const populateTableWorker = (self, tableName, strSelectFieldList, offset, rowsIn
                 generateError(self, '\t--[populateTableWorker] Cannot connect to MySQL server...\n\t' + error);
                 resolvePopulateTableWorker();
             } else {
-                const csvAddr = path.join(self._tempDirPath, tableName + offset + '.csv');
-                const sql = buildChunkQuery(extraConfigProcessor.getTableName(self, tableName, true), strSelectFieldList, offset, rowsInChunk);
+                const csvAddr = path.join(self._tempDirPath, tableName + offset + '.csv');
+                const originalTableName = extraConfigProcessor.getTableName(self, tableName, true);
+                const sql = buildChunkQuery(originalTableName, strSelectFieldList, offset, rowsInChunk);

                 connection.query(sql, (err, rows) => {
                     connection.release();
@@ -235,7 +224,8 @@ const populateTableWorker = (self, tableName, strSelectFieldList, offset, rowsIn
                         generateError(self, '\t--[populateTableWorker] ' + err, sql);
                         resolvePopulateTableWorker();
                     } else {
-                        rowsInChunk = rows.length;
+                        rowsInChunk = rows.length;
+                        rows[0][self._schema + '_' + originalTableName + '_data_chunk_id_temp'] = dataPoolId;

                         csvStringify(rows, (csvError, csvString) => {
                             rows = null;
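
Note that only `rows[0]` is stamped with the chunk id before the CSV is copied in: since `hasCurrentChunkLoaded` probes with `SELECT EXISTS(...)`, a single marked row per chunk is enough to prove the chunk landed, and the remaining rows leave the temp column empty. Roughly, the probe this write enables (schema, table, and id values are illustrative; the real query is built in ConsistencyEnforcer.js):

    // Example of the generated probe for chunk 42 of "orders" in schema "public":
    const sql = 'SELECT EXISTS(SELECT 1 FROM "public"."orders"'
        + ' WHERE "public_orders_data_chunk_id_temp" = 42);';
    // result.rows[0].exists === true  =>  chunk 42 already migrated; skip it.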

migration/fmtp/TableProcessor.js

Lines changed: 2 additions & 2 deletions
@@ -126,8 +126,8 @@ module.exports.createTable = (self, tableName) => {
                     + '" ' + mapDataTypes(self._dataTypesMap, rows[i].Type) + ',';
             }

-            rows = null;
-            sql = sql.slice(0, -1) + ');';
+            sql += '"' + self._schema + '_' + originalTableName + '_data_chunk_id_temp" BIGINT);';
+
             client.query(sql, err => {
                 done();
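
Previously the trailing comma left by the column loop was stripped with `sql.slice(0, -1)`; now that comma is kept and the temporary chunk-id column is appended as the final definition, so every target table is created ready for recovery bookkeeping. For a hypothetical `orders` table in schema `public`, the generated DDL would end like:

    // Illustrative output of createTable after this change:
    // CREATE TABLE "public"."orders"("id" BIGINT,"name" CHARACTER VARYING(255),
    //     "public_orders_data_chunk_id_temp" BIGINT);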

package.json

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 {
   "name": "nmig",
-  "version": "2.4.0",
+  "version": "2.5.0",
   "description": "The database migration app",
   "author": "Anatoly Khaytovich<[email protected]>",
   "dependencies": {
