Skip to content

Commit

Permalink
refactor: minor simplification in incremental backup logic
Browse files Browse the repository at this point in the history
  • Loading branch information
tglman committed Dec 13, 2023
1 parent 8b0d772 commit f999237
Showing 1 changed file with 75 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -482,111 +482,97 @@ private OLogSequenceNumber incrementalBackup(
else freezeId = -1;

try {
final BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(stream);
final ZipOutputStream zipOutputStream =
new ZipOutputStream(
new BufferedOutputStream(stream), Charset.forName(configuration.getCharset()));
try {
final ZipOutputStream zipOutputStream =
new ZipOutputStream(
bufferedOutputStream, Charset.forName(configuration.getCharset()));
try {
final long startSegment;
final OLogSequenceNumber freezeLsn;

if (fromLsn == null) {
try {
UUID databaseInstanceUUID = super.readDatabaseInstanceId();
if (databaseInstanceUUID == null) {
atomicOperationsManager.executeInsideAtomicOperation(
null,
atomicOperation -> {
generateDatabaseInstanceId(atomicOperation);
});
databaseInstanceUUID = super.readDatabaseInstanceId();
}
final ZipEntry zipEntry = new ZipEntry("database_instance.uuid");

zipOutputStream.putNextEntry(zipEntry);
zipOutputStream.flush();
DataOutputStream dos = new DataOutputStream(zipOutputStream);
dos.writeUTF(databaseInstanceUUID.toString());
dos.flush();
// dos.close();
} finally {
zipOutputStream.flush();
}
final long startSegment;
final OLogSequenceNumber freezeLsn;

if (fromLsn == null) {
UUID databaseInstanceUUID = super.readDatabaseInstanceId();
if (databaseInstanceUUID == null) {
atomicOperationsManager.executeInsideAtomicOperation(
null,
atomicOperation -> {
generateDatabaseInstanceId(atomicOperation);
});
databaseInstanceUUID = super.readDatabaseInstanceId();
}
final ZipEntry zipEntry = new ZipEntry("database_instance.uuid");

final long newSegmentFreezeId =
atomicOperationsManager.freezeAtomicOperations(null, null);
try {
final OLogSequenceNumber startLsn = writeAheadLog.end();
zipOutputStream.putNextEntry(zipEntry);
DataOutputStream dos = new DataOutputStream(zipOutputStream);
dos.writeUTF(databaseInstanceUUID.toString());
dos.flush();
}

if (startLsn != null) freezeLsn = startLsn;
else freezeLsn = new OLogSequenceNumber(0, 0);
final long newSegmentFreezeId =
atomicOperationsManager.freezeAtomicOperations(null, null);
try {
final OLogSequenceNumber startLsn = writeAheadLog.end();

writeAheadLog.addCutTillLimit(freezeLsn);
if (startLsn != null) freezeLsn = startLsn;
else freezeLsn = new OLogSequenceNumber(0, 0);

writeAheadLog.appendNewSegment();
startSegment = writeAheadLog.activeSegment();
writeAheadLog.addCutTillLimit(freezeLsn);

getLastMetadata()
.ifPresent(
metadata -> {
try {
writeAheadLog.log(new MetaDataRecord(metadata));
} catch (final IOException e) {
throw new IllegalStateException("Error during write of metadata", e);
}
});
} finally {
atomicOperationsManager.releaseAtomicOperations(newSegmentFreezeId);
}
writeAheadLog.appendNewSegment();
startSegment = writeAheadLog.activeSegment();

try {
backupIv(zipOutputStream);

final byte[] encryptionIv = new byte[16];
final SecureRandom secureRandom = new SecureRandom();
secureRandom.nextBytes(encryptionIv);

backupEncryptedIv(zipOutputStream, encryptionIv);

final String aesKeyEncoded =
getConfiguration()
.getContextConfiguration()
.getValueAsString(OGlobalConfiguration.STORAGE_ENCRYPTION_KEY);
final byte[] aesKey =
aesKeyEncoded == null ? null : Base64.getDecoder().decode(aesKeyEncoded);

if (aesKey != null
&& aesKey.length != 16
&& aesKey.length != 24
&& aesKey.length != 32) {
throw new OInvalidStorageEncryptionKeyException(
"Invalid length of the encryption key, provided size is " + aesKey.length);
}
getLastMetadata()
.ifPresent(
metadata -> {
try {
writeAheadLog.log(new MetaDataRecord(metadata));
} catch (final IOException e) {
throw new IllegalStateException("Error during write of metadata", e);
}
});
} finally {
atomicOperationsManager.releaseAtomicOperations(newSegmentFreezeId);
}

lastLsn = backupPagesWithChanges(fromLsn, zipOutputStream, encryptionIv, aesKey);
final OLogSequenceNumber lastWALLsn =
copyWALToIncrementalBackup(zipOutputStream, startSegment);
try {
backupIv(zipOutputStream);

final byte[] encryptionIv = new byte[16];
final SecureRandom secureRandom = new SecureRandom();
secureRandom.nextBytes(encryptionIv);

backupEncryptedIv(zipOutputStream, encryptionIv);

final String aesKeyEncoded =
getConfiguration()
.getContextConfiguration()
.getValueAsString(OGlobalConfiguration.STORAGE_ENCRYPTION_KEY);
final byte[] aesKey =
aesKeyEncoded == null ? null : Base64.getDecoder().decode(aesKeyEncoded);

if (aesKey != null
&& aesKey.length != 16
&& aesKey.length != 24
&& aesKey.length != 32) {
throw new OInvalidStorageEncryptionKeyException(
"Invalid length of the encryption key, provided size is " + aesKey.length);
}

if (lastWALLsn != null && (lastLsn == null || lastWALLsn.compareTo(lastLsn) > 0)) {
lastLsn = lastWALLsn;
}
} finally {
writeAheadLog.removeCutTillLimit(freezeLsn);
lastLsn = backupPagesWithChanges(fromLsn, zipOutputStream, encryptionIv, aesKey);
final OLogSequenceNumber lastWALLsn =
copyWALToIncrementalBackup(zipOutputStream, startSegment);

if (lastWALLsn != null && (lastLsn == null || lastWALLsn.compareTo(lastLsn) > 0)) {
lastLsn = lastWALLsn;
}
} finally {
try {
zipOutputStream.flush();
} catch (IOException e) {
OLogManager.instance().warn(this, "Failed to flush resource " + zipOutputStream);
}
writeAheadLog.removeCutTillLimit(freezeLsn);
}
} finally {
try {
bufferedOutputStream.flush();
zipOutputStream.finish();
zipOutputStream.flush();
} catch (IOException e) {
OLogManager.instance().warn(this, "Failed to flush resource " + bufferedOutputStream);
OLogManager.instance().warn(this, "Failed to flush resource " + zipOutputStream);
}
}
} finally {
Expand Down

0 comments on commit f999237

Please sign in to comment.