Skip to content

Commit

Permalink
Load TsFile version upon openning a TsFileSequenceReader (#254)
Browse files Browse the repository at this point in the history
* Load TsFile version upon openning a TsFileSequenceReader

* handle exception
  • Loading branch information
jt2594838 authored Sep 30, 2024
1 parent efb39a6 commit 89cfae7
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,8 @@ public class NotCompatibleTsFileException extends TsFileRuntimeException {
public NotCompatibleTsFileException(String message) {
super(message);
}

public NotCompatibleTsFileException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,8 @@ public TsFileRuntimeException(String message) {
public TsFileRuntimeException(Throwable cause) {
super(cause);
}

public TsFileRuntimeException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.tsfile.encrypt.EncryptUtils;
import org.apache.tsfile.encrypt.IDecryptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.NotCompatibleTsFileException;
import org.apache.tsfile.exception.StopReadTsFileByInterruptException;
import org.apache.tsfile.exception.TsFileRuntimeException;
import org.apache.tsfile.exception.TsFileStatisticsMistakesException;
Expand Down Expand Up @@ -157,7 +158,9 @@ public TsFileSequenceReader(String file, boolean loadMetadataSize) throws IOExce
}
this.file = file;
tsFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file);

try {
loadFileVersion();
if (loadMetadataSize) {
loadMetadataSize();
}
Expand Down Expand Up @@ -221,17 +224,24 @@ public TsFileSequenceReader(TsFileInput input, long fileMetadataPos, int fileMet
}

private void loadFileVersion() throws IOException {
tsFileInput.position(TSFileConfig.MAGIC_STRING.getBytes(TSFileConfig.STRING_CHARSET).length);
final ByteBuffer buffer = ByteBuffer.allocate(1);
tsFileInput.read(buffer);
buffer.flip();
fileVersion = buffer.get();
try {
tsFileInput.position(TSFileConfig.MAGIC_STRING.getBytes(TSFileConfig.STRING_CHARSET).length);
final ByteBuffer buffer = ByteBuffer.allocate(1);
tsFileInput.read(buffer);
buffer.flip();
fileVersion = buffer.get();

checkFileVersion();
configDeserializer();
checkFileVersion();
configDeserializer();

tsFileInput.position(0);
} catch (Exception e) {
tsFileInput.close();
throw new NotCompatibleTsFileException(e);
}
}

private void configDeserializer() throws IOException {
private void configDeserializer() {
if (fileVersion == TSFileConfig.VERSION_NUMBER_V3) {
deserializeConfig = CompatibilityUtils.v3DeserializeConfig;
}
Expand All @@ -244,8 +254,6 @@ private void checkFileVersion() throws FileVersionTooOldException {
}

public void loadMetadataSize() throws IOException {
loadFileVersion();

ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES);
if (readTailMagic().equals(TSFileConfig.MAGIC_STRING)) {
tsFileInput.read(
Expand Down Expand Up @@ -1863,10 +1871,11 @@ public long selfCheck(
if (fileSize < headerLength) {
return TsFileCheckStatus.INCOMPATIBLE_FILE;
}
if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic())
|| (TSFileConfig.VERSION_NUMBER != readVersionNumber())) {
if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic())) {
return TsFileCheckStatus.INCOMPATIBLE_FILE;
}
fileVersion = readVersionNumber();
checkFileVersion();

tsFileInput.position(headerLength);
boolean isComplete = isComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,29 +107,35 @@ public RestorableTsFileIOWriter(File file, boolean truncate) throws IOException
return;
}

if (file.exists()) {
try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
schema.setEnabledUpdateSchema(false);
truncatedSize = reader.selfCheck(schema, chunkGroupMetadataList, true);
minPlanIndex = reader.getMinPlanIndex();
maxPlanIndex = reader.getMaxPlanIndex();
if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) {
crashed = false;
canWrite = false;
out.close();
} else if (truncatedSize == TsFileCheckStatus.INCOMPATIBLE_FILE) {
out.close();
throw new NotCompatibleTsFileException(
String.format("%s is not in TsFile format.", file.getAbsolutePath()));
} else {
crashed = true;
canWrite = true;
// remove broken data
if (truncate) {
out.truncate(truncatedSize);
try {
if (file.exists()) {
try (TsFileSequenceReader reader =
new TsFileSequenceReader(file.getAbsolutePath(), false)) {
schema.setEnabledUpdateSchema(false);
truncatedSize = reader.selfCheck(schema, chunkGroupMetadataList, true);
minPlanIndex = reader.getMinPlanIndex();
maxPlanIndex = reader.getMaxPlanIndex();
if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) {
crashed = false;
canWrite = false;
out.close();
} else if (truncatedSize == TsFileCheckStatus.INCOMPATIBLE_FILE) {
out.close();
throw new NotCompatibleTsFileException(
String.format("%s is not in TsFile format.", file.getAbsolutePath()));
} else {
crashed = true;
canWrite = true;
// remove broken data
if (truncate) {
out.truncate(truncatedSize);
}
}
}
}
} catch (Exception e) {
out.close();
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,26 @@

package org.apache.tsfile.compatibility;

import org.apache.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.apache.tsfile.read.TsFileReader;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.read.expression.QueryExpression;
import org.apache.tsfile.read.query.dataset.QueryDataSet;
import org.apache.tsfile.write.schema.Schema;

import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.apache.tsfile.read.TsFileCheckStatus.COMPLETE_FILE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class CompatibilityTest {
Expand All @@ -45,6 +52,19 @@ public void testReadV3() {
readOneRow();
}

@Test
public void testV3SelfCheck() throws IOException {
Schema schema = new Schema();
List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>();
try (TsFileSequenceReader sequenceReader = new TsFileSequenceReader(fileName)) {
assertEquals(COMPLETE_FILE, sequenceReader.selfCheck(schema, chunkGroupMetadataList, false));
assertTrue(sequenceReader.isComplete());
}
assertTrue(schema.containsDevice(Factory.DEFAULT_FACTORY.create("d1")));
assertTrue(schema.containsDevice(Factory.DEFAULT_FACTORY.create("d2")));
assertEquals(2, chunkGroupMetadataList.size());
}

private void readOneRow() {
readOneRow(5);
}
Expand Down

0 comments on commit 89cfae7

Please sign in to comment.