diff --git a/btm/pom.xml b/btm/pom.xml
index 1322cd9e..2d5fddd6 100644
--- a/btm/pom.xml
+++ b/btm/pom.xml
@@ -12,7 +12,7 @@
bundle
-
+
javax.transaction
jta
provided
@@ -70,6 +70,24 @@
provided
true
+
+ org.apache.commons
+ commons-io
+ 1.3.2
+ test
+
+
+ org.apache.commons
+ commons-lang3
+ 3.4
+ test
+
+
+ org.easytesting
+ fest-assert-core
+ 2.0M10
+ test
+
diff --git a/btm/src/main/java/bitronix/tm/gui/TransactionLogHeaderPanel.java b/btm/src/main/java/bitronix/tm/gui/TransactionLogHeaderPanel.java
index 0d334a3f..4d5a4a01 100644
--- a/btm/src/main/java/bitronix/tm/gui/TransactionLogHeaderPanel.java
+++ b/btm/src/main/java/bitronix/tm/gui/TransactionLogHeaderPanel.java
@@ -16,6 +16,7 @@
package bitronix.tm.gui;
import bitronix.tm.journal.TransactionLogHeader;
+import bitronix.tm.journal.InterruptibleLockedRandomAccessFile;
import bitronix.tm.utils.Decoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,8 +75,8 @@ public void setPosition(long position) {
}
public void read(File logFile, boolean active) throws IOException {
- RandomAccessFile raf = new RandomAccessFile(logFile, "r");
- TransactionLogHeader header = new TransactionLogHeader(raf.getChannel(), 0L);
+ InterruptibleLockedRandomAccessFile raf = new InterruptibleLockedRandomAccessFile(logFile, "r");
+ TransactionLogHeader header = new TransactionLogHeader(raf, 0L);
raf.close();
if (log.isDebugEnabled()) { log.debug("read header: " + header); }
setLogFile(logFile);
diff --git a/btm/src/main/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFile.java b/btm/src/main/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFile.java
new file mode 100644
index 00000000..a10c967d
--- /dev/null
+++ b/btm/src/main/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFile.java
@@ -0,0 +1,104 @@
+package bitronix.tm.journal;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+
+public class InterruptibleLockedRandomAccessFile {
+
+ private final File file;
+ private final String mode;
+ private RandomAccessFile openedFile;
+ private FileChannel fileChannel;
+ private FileLock fileLock;
+ private long currentPosition = 0;
+ private boolean closed;
+
+ public InterruptibleLockedRandomAccessFile(final File file, final String mode)
+ throws IOException {
+ this.file = file;
+ this.mode = mode;
+ open();
+ }
+
+ private synchronized void open() throws IOException {
+ openedFile = new RandomAccessFile(file, mode);
+ fileChannel = openedFile.getChannel();
+
+ final boolean shared = false;
+ this.fileLock = fileChannel
+ .tryLock(0, TransactionLogHeader.TIMESTAMP_HEADER, shared);
+ if (this.fileLock == null) {
+ throw new IOException("File " + file.getAbsolutePath()
+ + " is locked. Is another instance already running?");
+ }
+ }
+
+ public synchronized final void close() throws IOException {
+ try {
+ if (!fileLock.isValid()) {
+ checkState(!fileChannel.isOpen(), "invalid/unhandled state");
+ return;
+ }
+ fileLock.release();
+ fileChannel.close();
+ openedFile.close();
+ } finally {
+ closed = true;
+ }
+ }
+
+ public synchronized void position(final long newPosition) throws IOException {
+ checkNotClosed();
+ reopenFileChannelIfClosed();
+
+ fileChannel.position(newPosition);
+ currentPosition = newPosition;
+ }
+
+ private void checkNotClosed() {
+ checkState(!closed, "File has been closed");
+ }
+
+ private static void checkState(final boolean expression, final String errorMessage) {
+ if (!expression) {
+ throw new IllegalStateException(errorMessage);
+ }
+ }
+
+ public synchronized void force(final boolean metaData) throws IOException {
+ checkNotClosed();
+ reopenFileChannelIfClosed();
+
+ fileChannel.force(metaData);
+ }
+
+ public synchronized int write(final ByteBuffer src, final long position)
+ throws IOException {
+ checkNotClosed();
+ reopenFileChannelIfClosed();
+
+ return fileChannel.write(src, position);
+ }
+
+ public synchronized void read(final ByteBuffer buffer) throws IOException {
+ checkNotClosed();
+ reopenFileChannelIfClosed();
+
+ fileChannel.read(buffer);
+ currentPosition = fileChannel.position();
+ }
+
+ private void reopenFileChannelIfClosed() throws IOException {
+ if (!fileChannel.isOpen()) {
+ open();
+ }
+
+ if (fileChannel.position() != currentPosition) {
+ fileChannel.position(currentPosition);
+ }
+ }
+}
\ No newline at end of file
diff --git a/btm/src/main/java/bitronix/tm/journal/TransactionLogAppender.java b/btm/src/main/java/bitronix/tm/journal/TransactionLogAppender.java
index 55338cc7..7a96b130 100644
--- a/btm/src/main/java/bitronix/tm/journal/TransactionLogAppender.java
+++ b/btm/src/main/java/bitronix/tm/journal/TransactionLogAppender.java
@@ -15,17 +15,9 @@
*/
package bitronix.tm.journal;
-import bitronix.tm.utils.Uid;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.transaction.Status;
import java.io.File;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -35,6 +27,13 @@
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.transaction.Status;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import bitronix.tm.utils.Uid;
+
/**
* Used to write {@link TransactionLogRecord} objects to a log file.
*
@@ -53,9 +52,7 @@ public class TransactionLogAppender {
public static final int END_RECORD = 0x786e7442;
private final File file;
- private final RandomAccessFile randomeAccessFile;
- private final FileChannel fc;
- private final FileLock lock;
+ private final InterruptibleLockedRandomAccessFile randomAccessFile;
private final TransactionLogHeader header;
private final long maxFileLength;
private final AtomicInteger outstandingWrites;
@@ -70,14 +67,10 @@ public class TransactionLogAppender {
*/
public TransactionLogAppender(File file, long maxFileLength) throws IOException {
this.file = file;
- this.randomeAccessFile = new RandomAccessFile(file, "rw");
- this.fc = randomeAccessFile.getChannel();
- this.header = new TransactionLogHeader(fc, maxFileLength);
+ this.randomAccessFile = new InterruptibleLockedRandomAccessFile(file, "rw");
+ this.header = new TransactionLogHeader(randomAccessFile, maxFileLength);
this.maxFileLength = maxFileLength;
- this.lock = fc.tryLock(0, TransactionLogHeader.TIMESTAMP_HEADER, false);
- if (this.lock == null)
- throw new IOException("transaction log file " + file.getName() + " is locked. Is another instance already running?");
-
+
this.outstandingWrites = new AtomicInteger();
this.danglingRecords = new HashMap>();
@@ -144,7 +137,7 @@ protected void writeLog(TransactionLogRecord tlog) throws IOException {
final long writePosition = tlog.getWritePosition();
while (buf.hasRemaining()) {
- fc.write(buf, writePosition + buf.position());
+ randomAccessFile.write(buf, writePosition + buf.position());
}
trackOutstanding(status, gtrid, uniqueNames);
@@ -273,18 +266,14 @@ public long getPosition() {
return position;
}
-
/**
* Close the appender and the underlying file.
* @throws IOException if an I/O error occurs.
*/
protected void close() throws IOException {
header.setState(TransactionLogHeader.CLEAN_LOG_STATE);
- fc.force(false);
- if (lock != null)
- lock.release();
- fc.close();
- randomeAccessFile.close();
+ randomAccessFile.force(false);
+ randomAccessFile.close();
}
/**
@@ -304,7 +293,7 @@ protected TransactionLogCursor getCursor() throws IOException {
*/
protected void force() throws IOException {
if (log.isDebugEnabled()) { log.debug("forcing log writing"); }
- fc.force(false);
+ randomAccessFile.force(false);
if (log.isDebugEnabled()) { log.debug("done forcing log"); }
}
diff --git a/btm/src/main/java/bitronix/tm/journal/TransactionLogHeader.java b/btm/src/main/java/bitronix/tm/journal/TransactionLogHeader.java
index 748ab635..9f681aac 100644
--- a/btm/src/main/java/bitronix/tm/journal/TransactionLogHeader.java
+++ b/btm/src/main/java/bitronix/tm/journal/TransactionLogHeader.java
@@ -69,7 +69,7 @@ public class TransactionLogHeader {
*/
public final static byte UNCLEAN_LOG_STATE = -1;
- private final FileChannel fc;
+ private final InterruptibleLockedRandomAccessFile file;
private final long maxFileLength;
private volatile int formatId;
@@ -79,25 +79,25 @@ public class TransactionLogHeader {
/**
* TransactionLogHeader are used to control headers of the specified RandomAccessFile.
- * @param fc the file channel to read from.
+ * @param randomAccessFile the file to read from.
* @param maxFileLength the max file length.
* @throws IOException if an I/O error occurs.
*/
- public TransactionLogHeader(FileChannel fc, long maxFileLength) throws IOException {
- this.fc = fc;
+ public TransactionLogHeader(InterruptibleLockedRandomAccessFile randomAccessFile, long maxFileLength) throws IOException {
+ this.file = randomAccessFile;
this.maxFileLength = maxFileLength;
- fc.position(FORMAT_ID_HEADER);
+ randomAccessFile.position(FORMAT_ID_HEADER);
ByteBuffer buf = ByteBuffer.allocate(4 + 8 + 1 + 8);
while (buf.hasRemaining()) {
- this.fc.read(buf);
+ this.file.read(buf);
}
buf.flip();
formatId = buf.getInt();
timestamp = buf.getLong();
state = buf.get();
position = buf.getLong();
- fc.position(position);
+ randomAccessFile.position(position);
if (log.isDebugEnabled()) { log.debug("read header " + this); }
}
@@ -149,7 +149,7 @@ public void setFormatId(int formatId) throws IOException {
buf.putInt(formatId);
buf.flip();
while (buf.hasRemaining()) {
- fc.write(buf, FORMAT_ID_HEADER + buf.position());
+ file.write(buf, FORMAT_ID_HEADER + buf.position());
}
this.formatId = formatId;
}
@@ -165,7 +165,7 @@ public void setTimestamp(long timestamp) throws IOException {
buf.putLong(timestamp);
buf.flip();
while (buf.hasRemaining()) {
- fc.write(buf, TIMESTAMP_HEADER + buf.position());
+ file.write(buf, TIMESTAMP_HEADER + buf.position());
}
this.timestamp = timestamp;
}
@@ -181,7 +181,7 @@ public void setState(byte state) throws IOException {
buf.put(state);
buf.flip();
while (buf.hasRemaining()) {
- fc.write(buf, STATE_HEADER + buf.position());
+ file.write(buf, STATE_HEADER + buf.position());
}
this.state = state;
}
@@ -202,11 +202,11 @@ public void setPosition(long position) throws IOException {
buf.putLong(position);
buf.flip();
while (buf.hasRemaining()) {
- fc.write(buf, CURRENT_POSITION_HEADER + buf.position());
+ file.write(buf, CURRENT_POSITION_HEADER + buf.position());
}
this.position = position;
- fc.position(position);
+ file.position(position);
}
/**
diff --git a/btm/src/test/java/bitronix/tm/journal/ByteBufferUtil.java b/btm/src/test/java/bitronix/tm/journal/ByteBufferUtil.java
new file mode 100644
index 00000000..92f02eda
--- /dev/null
+++ b/btm/src/test/java/bitronix/tm/journal/ByteBufferUtil.java
@@ -0,0 +1,18 @@
+package bitronix.tm.journal;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+
+public final class ByteBufferUtil {
+
+ private ByteBufferUtil() {
+ }
+
+ public static ByteBuffer createByteBuffer(final String input)
+ throws UnsupportedEncodingException {
+ final byte[] inputArray = input.getBytes("UTF-8");
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(inputArray);
+ return byteBuffer;
+ }
+
+}
diff --git a/btm/src/test/java/bitronix/tm/journal/DiskJournalInterruptTest.java b/btm/src/test/java/bitronix/tm/journal/DiskJournalInterruptTest.java
new file mode 100644
index 00000000..d04b99a7
--- /dev/null
+++ b/btm/src/test/java/bitronix/tm/journal/DiskJournalInterruptTest.java
@@ -0,0 +1,73 @@
+package bitronix.tm.journal;
+
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.transaction.Status;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import bitronix.tm.utils.UidGenerator;
+
+/**
+ * Source: http://bitronix-transaction-manager.10986.n7.nabble.com/Fix-for-BTM-138-td1701.html
+ *
+ * @author Kazuya Uno
+ */
+public class DiskJournalInterruptTest {
+
+ private DiskJournal diskJournal = new DiskJournal();
+ private Set names = new HashSet();
+
+ @Before
+ public void setUp() throws IOException {
+ diskJournal.open();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ diskJournal.close();
+ }
+
+ @Test
+ public void testShouldInterruptOnAThreadDontCauseOtherThreadToFail()
+ throws Exception {
+ // given: a thread writing logs
+ Thread thread = new Thread() {
+
+ @Override
+ public void run() {
+ try {
+ writeLog();
+ } catch (IOException e) {
+ // normal
+ }
+ };
+ };
+ thread.start();
+
+ // when thread is interrupted
+ thread.interrupt();
+
+ // this detect closed channel and reopen logs
+ try {
+ writeLog();
+ } catch (ClosedChannelException cce) {
+ // this is expected.
+ }
+
+ // then writing logs should work
+ writeLog();
+
+ }
+
+ private void writeLog() throws IOException {
+ diskJournal.log(Status.STATUS_COMMITTED, UidGenerator.generateUid(),
+ names);
+ }
+
+}
\ No newline at end of file
diff --git a/btm/src/test/java/bitronix/tm/journal/InterruptService.java b/btm/src/test/java/bitronix/tm/journal/InterruptService.java
new file mode 100644
index 00000000..0de8ea52
--- /dev/null
+++ b/btm/src/test/java/bitronix/tm/journal/InterruptService.java
@@ -0,0 +1,53 @@
+package bitronix.tm.journal;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class InterruptService {
+
+ private final InterruptableThreadList interruptableThreadList;
+ private final AtomicLong successfulInterrupts = new AtomicLong();
+ private ExecutorService executorService;
+ private final AtomicBoolean run = new AtomicBoolean(true);
+
+ public InterruptService(final InterruptableThreadList interruptableThreadList) {
+ if (interruptableThreadList == null) {
+ throw new NullPointerException("threadList cannot be null");
+ }
+ this.interruptableThreadList = interruptableThreadList;
+ }
+
+ public void start() {
+ executorService = Executors.newSingleThreadExecutor();
+ final Runnable interrupter = new Runnable() {
+ @Override
+ public void run() {
+ while (run.get()) {
+ final boolean successfulInterrupt = interruptableThreadList
+ .interruptRandomThread();
+ if (successfulInterrupt) {
+ successfulInterrupts.incrementAndGet();
+ }
+ }
+ }
+ };
+ executorService.submit(interrupter);
+ }
+
+ public void stop() throws InterruptedException {
+ run.set(false);
+ executorService.shutdown();
+ final boolean terminated = executorService.awaitTermination(2,
+ TimeUnit.SECONDS);
+ if (!terminated) {
+ throw new IllegalStateException("termination");
+ }
+ }
+
+ public long getSuccessfulInterrupts() {
+ return successfulInterrupts.get();
+ }
+}
diff --git a/btm/src/test/java/bitronix/tm/journal/InterruptableThreadList.java b/btm/src/test/java/bitronix/tm/journal/InterruptableThreadList.java
new file mode 100644
index 00000000..3066942b
--- /dev/null
+++ b/btm/src/test/java/bitronix/tm/journal/InterruptableThreadList.java
@@ -0,0 +1,40 @@
+package bitronix.tm.journal;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+public class InterruptableThreadList {
+
+ private final List threads = new ArrayList();
+
+ public InterruptableThreadList() {
+ }
+
+ public synchronized void addCurrentThread() {
+ final Thread currentThread = Thread.currentThread();
+ threads.add(currentThread);
+ Thread.yield();
+ }
+
+ public synchronized void removeCurrentThread() {
+ final Thread currentThread = Thread.currentThread();
+ threads.remove(currentThread);
+ }
+
+ /**
+ *
+ * @return true on successful interruption
+ */
+ public synchronized boolean interruptRandomThread() {
+ if (threads.isEmpty()) {
+ return false;
+ }
+ final Random random = new Random();
+ final int threadIndex = random.nextInt(threads.size());
+
+ final Thread thread = threads.get(threadIndex);
+ thread.interrupt();
+ return true;
+ }
+}
diff --git a/btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileStressTest.java b/btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileStressTest.java
new file mode 100644
index 00000000..4bf6c746
--- /dev/null
+++ b/btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileStressTest.java
@@ -0,0 +1,149 @@
+package bitronix.tm.journal;
+
+import static bitronix.tm.journal.ByteBufferUtil.createByteBuffer;
+import static org.fest.assertions.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class InterruptibleLockedRandomAccessFileStressTest {
+
+ @Rule
+ public final TemporaryFolder folder = new TemporaryFolder();
+
+ private File inputFile;
+
+ private final InterruptableThreadList threadList = new InterruptableThreadList();
+
+ private final InterruptService interruptService = new InterruptService(
+ threadList);
+
+ @Before
+ public void setUp() throws Exception {
+ inputFile = folder.newFile("bitronix-stresstest.log");
+ interruptService.start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ interruptService.stop();
+ }
+
+ @Test
+ public void stressTestWriteInterrupts() throws Exception {
+ final int recordLength = 15;
+ final int taskNumber = 10000;
+ initializeFileContent(recordLength, taskNumber);
+
+ final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile(
+ inputFile, "rw");
+
+ final ExecutorService executorService = Executors.newFixedThreadPool(4);
+
+ final AtomicLong successfulWrites = new AtomicLong();
+ final AtomicLong writeErrors = new AtomicLong();
+ for (int i = 0; i < taskNumber; i++) {
+ final int taskId = i;
+ final String data = createRecord(taskId);
+ assertThat(data.length()).isLessThanOrEqualTo(recordLength);
+ executorService.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ threadList.addCurrentThread();
+ try {
+ final int position = taskId * recordLength;
+ file.write(createByteBuffer(data), position);
+ successfulWrites.incrementAndGet();
+ } catch (final Exception expected) {
+ writeErrors.incrementAndGet();
+ } finally {
+ threadList.removeCurrentThread();
+ }
+ }
+ });
+ }
+
+ shutdownExecutor(executorService, 30, TimeUnit.SECONDS);
+
+ file.close();
+
+ assertThat(successfulWrites.get() + writeErrors.get()).isEqualTo(
+ taskNumber);
+
+ final long writtenRecords = countWrittenRecords(taskNumber);
+ final long missingRecords = countMissingRecords(taskNumber);
+ assertThat(writtenRecords + missingRecords).isEqualTo(taskNumber);
+
+ // System.out.println("written: " + writtenRecords);
+ // System.out.println("missing: " + missingRecords);
+ // System.out.println("successful writes: " + successfulWrites);
+ // System.out.println("write errors: " + writeErrors);
+ // System.out.println("interrupts: "
+ // + interruptService.getSuccessfulInterrupts());
+
+ assertThat(writtenRecords).isGreaterThanOrEqualTo(
+ successfulWrites.get());
+ assertThat(missingRecords).isLessThanOrEqualTo(writeErrors.get());
+ assertThat(interruptService.getSuccessfulInterrupts())
+ .isGreaterThanOrEqualTo(missingRecords);
+ }
+
+ private void initializeFileContent(final int recordLength,
+ final int taskNumber) throws Exception {
+ final String initialFileConent = StringUtils.repeat(".", recordLength
+ * taskNumber);
+ FileUtils.writeStringToFile(inputFile, initialFileConent);
+ }
+
+ private String createRecord(final int recordId) {
+ return String.format("data%5dX", recordId);
+ }
+
+ private long countMissingRecords(final int taskNumber) throws Exception {
+ final String writtenContent = FileUtils.readFileToString(inputFile,
+ "UTF-8");
+ long missingRecords = 0;
+ for (int taskId = 0; taskId < taskNumber; taskId++) {
+ final String data = createRecord(taskId);
+ if (!writtenContent.contains(data)) {
+ missingRecords++;
+ }
+ }
+ return missingRecords;
+ }
+
+ private long countWrittenRecords(final int taskNumber) throws Exception {
+ final String writtenContent = FileUtils.readFileToString(inputFile,
+ "UTF-8");
+ long writtenRecords = 0;
+ for (int taskId = 0; taskId < taskNumber; taskId++) {
+ final String data = createRecord(taskId);
+ if (writtenContent.contains(data)) {
+ writtenRecords++;
+ }
+ }
+ return writtenRecords;
+ }
+
+ private void shutdownExecutor(final ExecutorService executorService,
+ final long timeout, final TimeUnit timeoutUnit)
+ throws InterruptedException {
+ executorService.shutdown();
+ final boolean terminated = executorService.awaitTermination(timeout,
+ timeoutUnit);
+ assertTrue("termination", terminated);
+ }
+}
diff --git a/btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileTest.java b/btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileTest.java
new file mode 100644
index 00000000..e5f8a25f
--- /dev/null
+++ b/btm/src/test/java/bitronix/tm/journal/InterruptibleLockedRandomAccessFileTest.java
@@ -0,0 +1,387 @@
+package bitronix.tm.journal;
+
+import static bitronix.tm.journal.ByteBufferUtil.createByteBuffer;
+import static org.fest.assertions.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class InterruptibleLockedRandomAccessFileTest {
+
+ @Rule
+ public final TemporaryFolder folder = new TemporaryFolder();
+
+ private File inputFile;
+
+ @Before
+ public void setUp() throws Exception {
+ inputFile = folder.newFile("btmlog-test.log");
+ }
+
+ @Test
+ public void testOpenClose() throws Exception {
+ final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile(
+ inputFile, "rw");
+ file.close();
+ }
+
+ @Test
+ public void testLockedOpen() throws Exception {
+ final RandomAccessFile firstFile = new RandomAccessFile(inputFile, "rw");
+ final FileChannel fileChannel = firstFile.getChannel();
+ final FileLock lock = fileChannel.tryLock();
+ assertNotNull("null lock", lock);
+
+ try {
+ new InterruptibleLockedRandomAccessFile(inputFile, "rw");
+ fail("should not open a locked file");
+ } catch (OverlappingFileLockException expected) {
+ } finally {
+ lock.release();
+ fileChannel.close();
+ firstFile.close();
+ }
+ }
+
+ @Test
+ public void testReadAfterClose() throws Exception {
+ final String data = "testdata";
+ FileUtils.writeStringToFile(inputFile, data, "UTF-8");
+
+ final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile(
+ inputFile, "rw");
+ file.close();
+
+ try {
+ readFile(file, 1);
+ fail("should not read after close");
+ } catch (final IllegalStateException expected) {
+ }
+ }
+
+ @Test
+ public void testWriteAfterClose() throws Exception {
+ final String data = "testdata";
+ FileUtils.writeStringToFile(inputFile, data, "UTF-8");
+
+ final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile(
+ inputFile, "rw");
+ file.close();
+
+ final ByteBuffer buffer = createByteBuffer("testdata");
+ final long position = 0L;
+ try {
+ file.write(buffer, position);
+ fail("should not write after close");
+ } catch (final IllegalStateException expected) {
+ }
+ }
+
+ @Test
+ public void testForceAfterClose() throws Exception {
+ final String data = "testdata";
+ FileUtils.writeStringToFile(inputFile, data, "UTF-8");
+
+ final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile(
+ inputFile, "rw");
+ file.close();
+
+ try {
+ final boolean metaData = true;
+ file.force(metaData);
+ fail("should not force after close");
+ } catch (final IllegalStateException expected) {
+ }
+ }
+
+ @Test
+ public void testPositionAfterClose() throws Exception {
+ final String data = "testdata";
+ FileUtils.writeStringToFile(inputFile, data, "UTF-8");
+
+ final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile(
+ inputFile, "rw");
+ file.close();
+
+ try {
+ file.position(1L);
+ fail("should not position after close");
+ } catch (final IllegalStateException expected) {
+ }
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ final String data = "testdata";
+ FileUtils.writeStringToFile(inputFile, data, "UTF-8");
+
+ final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile(
+ inputFile, "rw");
+ verifyRead(data, file);
+ file.close();
+ }
+
+ @Test
+ public void testReadTwice() throws Exception {
+ final String data = "testdata";
+ FileUtils.writeStringToFile(inputFile, data, "UTF-8");
+
+ final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile(
+ inputFile, "rw");
+
+ verifyRead(data, file);
+ file.position(0L);
+ verifyRead(data, file);
+
+ file.close();
+ }
+
+ @Test
+ public void testReadAfterInterrupt() throws Exception {
+ final String data = "testdataTESTDATA";
+ FileUtils.writeStringToFile(inputFile, data, "UTF-8");
+
+ final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile(
+ inputFile, "rw");
+
+ verifyRead(data, file);
+ file.position(0L);
+ interruptCurrentThread();
+ try {
+ verifyRead(data, file);
+ fail("interrupt should close the FileChannel");
+ } catch (final ClosedByInterruptException expected) {
+ }
+
+ clearInterruptedFlag();
+
+ verifyRead(data, file);
+ file.close();
+ }
+
+ private void verifyRead(final String expectedData,
+ final InterruptibleLockedRandomAccessFile file) throws Exception {
+ final int bytesToRead = expectedData.getBytes("UTF-8").length;
+ final String readData = readFile(file, bytesToRead);
+
+ assertThat(readData).isEqualTo(expectedData);
+ }
+
+ private String readFile(final InterruptibleLockedRandomAccessFile file,
+ final int bytesToRead) throws IOException {
+ final ByteBuffer inputBuffer = ByteBuffer.allocate(bytesToRead);
+ file.read(inputBuffer);
+ return toString(inputBuffer);
+ }
+
+ private String toString(ByteBuffer buffer)
+ throws UnsupportedEncodingException {
+ return new String(buffer.array(), "UTF-8");
+ }
+
+ @Test
+ public void testWrite() throws Exception {
+ final String data = "testdata";
+
+ final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile(
+ inputFile, "rw");
+
+ final ByteBuffer outputBuffer = createByteBuffer(data);
+
+ final long position = 0L;
+ file.write(outputBuffer, position);
+
+ file.close();
+
+ verifyFileContent(inputFile, data);
+ }
+
+ @Test
+ public void testWriteAndForce() throws Exception {
+ final String data = "testdata";
+
+ final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile(
+ inputFile, "rw");
+
+ final ByteBuffer outputBuffer = createByteBuffer(data);
+
+ final long position = 0L;
+ file.write(outputBuffer, position);
+ file.force(true);
+
+ file.close();
+
+ verifyFileContent(inputFile, data);
+ }
+
+ private void verifyFileContent(final File file, final String expectedData)
+ throws IOException {
+ final String fileContent = FileUtils.readFileToString(inputFile,
+ "UTF-8");
+ assertEquals(expectedData, fileContent);
+ }
+
+ @Test
+ public void testTwoWrites() throws Exception {
+ final String dataOne = "testdata";
+ final String dataTwo = "TESTDATA";
+
+ final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile(
+ inputFile, "rw");
+
+ final ByteBuffer dataOneBuffer = createByteBuffer(dataOne);
+ file.write(dataOneBuffer, 0L);
+ file.write(createByteBuffer(dataTwo), dataOneBuffer.capacity());
+
+ file.close();
+
+ verifyFileContent(inputFile, dataOne + dataTwo);
+ }
+
+ @Test
+ public void testWriteAfterInterrupt() throws Exception {
+ final String dataOne = "testdata";
+ final String dataTwo = "TESTDATA";
+
+ final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile(
+ inputFile, "rw");
+ file.write(createByteBuffer(dataOne), 0L);
+
+ interruptCurrentThread();
+
+ try {
+ file.write(createByteBuffer(dataTwo), dataOne.length());
+ } catch (final ClosedByInterruptException expected) {
+ }
+
+ clearInterruptedFlag();
+
+ final String dataThree = "__third__";
+ file.write(createByteBuffer(dataThree), dataOne.length());
+
+ file.close();
+
+ verifyFileContent(inputFile, dataOne + dataThree);
+ }
+
+ @Test
+ public void testForceAfterInterrupt() throws Exception {
+ final String dataOne = "testdata";
+ final String dataTwo = "TESTDATA";
+
+ final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile(
+ inputFile, "rw");
+ file.write(createByteBuffer(dataOne), 0L);
+
+ interruptCurrentThread();
+
+ try {
+ file.write(createByteBuffer(dataTwo), dataOne.length());
+ } catch (final ClosedByInterruptException expected) {
+ }
+
+ clearInterruptedFlag();
+
+ file.force(true);
+
+ file.close();
+
+ verifyFileContent(inputFile, dataOne);
+ }
+
+ @Test
+ public void testFilePositionSetOnWriteInterrupt() throws Exception {
+ final String dataOne = "testdata";
+ FileUtils.writeStringToFile(inputFile, dataOne, "UTF-8");
+
+ final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile(
+ inputFile, "rw");
+
+ final ByteBuffer outputBuffer = createByteBuffer(dataOne);
+
+ long position = 2;
+ file.position(position);
+
+ interruptCurrentThread();
+ try {
+ file.write(outputBuffer, 1L);
+ fail("writing a FileChannel should fail on an interrupted thread");
+ } catch (ClosedChannelException expected) {
+ }
+ clearInterruptedFlag();
+
+ final String readData = readFile(file, 5);
+ file.close();
+
+ assertEquals("read from file", "stdat", readData);
+ }
+
+ @Test
+ public void testFilePositionSetOnReadInterrupt() throws Exception {
+ final String dataOne = "testdata";
+ FileUtils.writeStringToFile(inputFile, dataOne, "UTF-8");
+
+ final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile(
+ inputFile, "rw");
+
+ readFile(file, 4);
+
+ interruptCurrentThread();
+ try {
+ readFile(file, 2);
+ fail("reading a FileChannel should fail on an interrupted thread");
+ } catch (ClosedChannelException expected) {
+ }
+ clearInterruptedFlag();
+
+ final String readData = readFile(file, 4);
+ file.close();
+
+ assertEquals("read from file", "data", readData);
+ }
+
+ @Test
+ public void testCloseAfterInterrupt() throws Exception {
+ final String data = "testdata";
+ FileUtils.writeStringToFile(inputFile, data, "UTF-8");
+
+ final InterruptibleLockedRandomAccessFile file = new InterruptibleLockedRandomAccessFile(
+ inputFile, "rw");
+
+ interruptCurrentThread();
+ try {
+ readFile(file, 1);
+ fail("should not read after interrupt");
+ } catch (final ClosedChannelException expected) {
+ }
+ clearInterruptedFlag();
+
+ file.close();
+ }
+
+ private void interruptCurrentThread() {
+ Thread.currentThread().interrupt();
+ }
+
+ private void clearInterruptedFlag() {
+ Thread.interrupted();
+ }
+
+}