Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,15 @@ public void writeString(String s, String tag) throws IOException {
out.write(bb.array(), bb.position(), bb.limit());
}

/**
 * Writes the contents of {@code b} to the underlying output exactly as given; no length
 * prefix or other framing is added for a non-null array.
 *
 * <p>NOTE(review): a {@code null} array is written as the int {@code -1}, whereas a non-null
 * array carries no length prefix, so the two cases are not symmetric on the wire. Confirm
 * that callers never pass {@code null}, or that readers expect this marker.
 *
 * @param b the bytes to write, or {@code null}
 * @param string tag of the field; not used by this implementation
 * @throws IOException if the underlying output reports a failure
 */
public void writeBytes(byte[] b, String string) throws IOException {
    if (b != null) {
        // Regular case: copy the whole array straight to the output.
        out.write(b, 0, b.length);
        return;
    }
    // Null input: emit the -1 marker only.
    out.writeInt(-1);
}


public void writeBuffer(byte[] barr, String tag)
throws IOException {
if (barr == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public interface OutputArchive {

void writeString(String s, String tag) throws IOException;

/**
 * Writes the given byte array to the archive under the given tag.
 *
 * <p>How the bytes are framed or rendered is left to the implementation.
 *
 * @param b the bytes to write
 * @param string tag of the field being written
 * @throws IOException if the write fails
 */
public void writeBytes(byte[] b, String string) throws IOException;

void writeBuffer(byte[] buf, String tag)
throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public void writeString(String s, String tag) throws IOException {
throwExceptionOnError(tag);
}

/**
 * Writes {@code b} by delegating to {@code writeBuffer(b, string)}, so the bytes are
 * rendered the same way a buffer field is.
 *
 * @param b the bytes to write
 * @param string tag of the field, passed through to {@code writeBuffer}
 * @throws IOException if {@code writeBuffer} fails
 */
public void writeBytes(byte[] b, String string) throws IOException {
writeBuffer(b, string);
}


public void writeBuffer(byte[] buf, String tag)
throws IOException {
printCommaUnlessFirst();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.zookeeper.server;

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.PrintWriter;
Expand All @@ -35,6 +36,9 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.jute.BinaryInputArchive;
Expand Down Expand Up @@ -1572,6 +1576,116 @@ private void setupQuota() {
traverseNode(quotaPath);
}

/**
 * Serializes all nodes of the tree to {@code oa} using a dedicated pool of worker threads.
 *
 * <p>The work happens in two phases:
 * <ol>
 *   <li>every entry of {@code nodes} is grouped by the number of {@code '/'} characters in
 *       its path;</li>
 *   <li>the groups are written in increasing order of that count. Each group is split across
 *       the workers with a fixed stride, every worker serializes its share into a private
 *       buffer, and the buffers are appended to {@code oa} through {@code flushBuffer}. All
 *       workers of a group are joined before the next group starts, so shallower paths always
 *       precede deeper ones in the output (presumably so that a parent is written before its
 *       children); the order inside a group is unspecified.</li>
 * </ol>
 *
 * <p>The entry keyed {@code "/"} is not written here because {@code "/"} is the end-of-stream
 * marker emitted by {@code serializeNodes}.
 *
 * @param oa
 *            OutputArchive to write to.
 * @throws IOException
 *             if serializing a node or writing to {@code oa} fails
 * @throws RuntimeException
 *             if the calling thread is interrupted (the interrupt flag is restored) or the
 *             grouping phase fails
 */
void serializeNode(OutputArchive oa) throws IOException {
    // Pool size, number of per-worker buffers and the stride used to split a level must agree.
    final int parallelism = 8;
    long startTime = System.currentTimeMillis();
    ForkJoinPool customThreadPool = new ForkJoinPool(parallelism);

    Map<Integer, SegmentedList<Map.Entry<String, DataNode>>> sortMap = new ConcurrentHashMap<>();
    try {
        // Submitting the parallel stream as a pool task is intended to run it on this pool's
        // workers; get() blocks until the grouping is complete.
        customThreadPool.submit(() ->
            nodes.entrySet().parallelStream().forEach(entry -> {
                int keyCount = countOccurrences(entry.getKey(), '/');
                sortMap.computeIfAbsent(keyCount, k -> new SegmentedList<>(64))
                        .add(entry);
            })
        ).get();

        long midTime = System.currentTimeMillis();

        ByteArrayOutputStream[] baos = new ByteArrayOutputStream[parallelism];
        BinaryOutputArchive[] localArchive = new BinaryOutputArchive[parallelism];
        for (int i = 0; i < parallelism; i++) {
            baos[i] = new ByteArrayOutputStream(256 * 1024);
            localArchive[i] = BinaryOutputArchive.getArchive(baos[i]);
        }

        // Walk the depths by value instead of assuming the keys are exactly 0..size-1; a gap
        // in the keys would otherwise cause a NullPointerException or skip the deepest levels.
        int maxDepth = -1;
        for (Integer depth : sortMap.keySet()) {
            maxDepth = Math.max(maxDepth, depth);
        }

        for (int depth = 0; depth <= maxDepth; depth++) {
            SegmentedList<Map.Entry<String, DataNode>> level = sortMap.get(depth);
            if (level == null) {
                continue;
            }
            List<Entry<String, DataNode>> nodeList = level.toListSnapshot();

            List<ForkJoinTask<?>> tasks = new ArrayList<>(parallelism);
            for (int i = 0; i < parallelism; i++) {
                int taskId = i;
                tasks.add(customThreadPool.submit(() -> {
                    try {
                        serializeSlice(oa, nodeList, taskId, parallelism, localArchive[taskId], baos[taskId]);
                    } catch (IOException e) {
                        // Runnable cannot throw checked exceptions; unwrapped again below.
                        throw new java.io.UncheckedIOException(e);
                    }
                }));
            }

            // Wait for every worker of this level, even after a failure, so that no worker is
            // still appending to oa once this method has returned.
            IOException failure = null;
            for (ForkJoinTask<?> task : tasks) {
                try {
                    task.join();
                } catch (java.io.UncheckedIOException e) {
                    if (failure == null) {
                        failure = e.getCause();
                    }
                }
            }
            if (failure != null) {
                throw failure;
            }
        }

        long endTime = System.currentTimeMillis();
        LOG.info("serialize took: {} ms, writing: {} ms", midTime - startTime, endTime - midTime);
    } catch (InterruptedException e) {
        // Preserve the interrupt for callers further up the stack.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        throw new RuntimeException(e);
    } finally {
        customThreadPool.shutdown();
    }
}

/**
 * Serializes the entries {@code offset, offset + stride, offset + 2 * stride, ...} of
 * {@code nodeList} into {@code localArchive} and appends the bytes accumulated in
 * {@code buffer} to {@code oa} whenever the buffer exceeds 128 KiB and once more at the end.
 */
private void serializeSlice(OutputArchive oa, List<Entry<String, DataNode>> nodeList, int offset, int stride,
        BinaryOutputArchive localArchive, ByteArrayOutputStream buffer) throws IOException {
    for (int k = offset; k < nodeList.size(); k += stride) {
        Entry<String, DataNode> entry = nodeList.get(k);
        String path = entry.getKey();
        if ("/".equals(path)) {
            // "/" is reserved as the end-of-stream marker and must not appear as a node path.
            continue;
        }
        DataNode node = entry.getValue();
        DataNode nodeCopy;
        synchronized (node) {
            StatPersisted statCopy = new StatPersisted();
            copyStatPersisted(node.stat, statCopy);
            //we do not need to make a copy of node.data because the contents
            //are never changed
            nodeCopy = new DataNode(node.data, node.acl, statCopy);
        }
        serializeNodeData(localArchive, path, nodeCopy);
        if (buffer.size() > 128 * 1024) {
            flushBuffer(oa, buffer);
        }
    }
    flushBuffer(oa, buffer);
}

/**
 * Appends the bytes currently held in {@code baos} to {@code oa} as one {@code "batch"} write
 * and then empties {@code baos}. Does nothing when the buffer is empty.
 *
 * <p>The method is synchronized on this instance, so concurrent callers append their batches
 * to {@code oa} one at a time.
 *
 * @param oa archive that receives the buffered bytes
 * @param baos buffer to drain; reset after its contents have been written
 * @throws IOException if writing to {@code oa} fails
 */
public synchronized void flushBuffer(OutputArchive oa, ByteArrayOutputStream baos) throws IOException {
    if (baos.size() <= 0) {
        return;
    }
    byte[] pending = baos.toByteArray();
    oa.writeBytes(pending, "batch");
    baos.reset();
}

/**
 * Counts how many times {@code needle} occurs in {@code haystack}.
 *
 * @param haystack the string to scan; must not be {@code null}
 * @param needle the character to look for
 * @return the number of positions in {@code haystack} holding {@code needle}
 */
public static int countOccurrences(String haystack, char needle) {
    int hits = 0;
    // Hop from one match to the next until indexOf reports no further occurrence.
    for (int at = haystack.indexOf(needle); at >= 0; at = haystack.indexOf(needle, at + 1)) {
        hits++;
    }
    return hits;
}

/**
* this method uses a stringbuilder to create a new path for children. This
* is faster than string appends ( str1 + str2).
Expand Down Expand Up @@ -1612,7 +1726,7 @@ void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
}
}

// visiable for test
// visible for test
public void serializeNodeData(OutputArchive oa, String path, DataNode node) throws IOException {
oa.writeString(path, "path");
oa.writeRecord(node, "node");
Expand All @@ -1622,18 +1736,22 @@ public void serializeAcls(OutputArchive oa) throws IOException {
aclCache.serialize(oa);
}

public void serializeNodes(OutputArchive oa) throws IOException {
serializeNode(oa, new StringBuilder());
public void serializeNodes(OutputArchive oa, boolean isLeaderBootupSnapshot) throws IOException {
if(isLeaderBootupSnapshot) {
serializeNode(oa);
} else {
serializeNode(oa, new StringBuilder());
}
// / marks end of stream
// we need to check if clear had been called in between the snapshot.
if (root != null) {
oa.writeString("/", "path");
}
}

public void serialize(OutputArchive oa, String tag) throws IOException {
public void serialize(OutputArchive oa, String tag, boolean isLeaderBootupSnapshot) throws IOException {
serializeAcls(oa);
serializeNodes(oa);
serializeNodes(oa, isLeaderBootupSnapshot);
}

public void deserialize(InputArchive ia, String tag) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.apache.zookeeper.server;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A minimal append-only collection that spreads its elements over a fixed number of segments,
 * each guarded by its own lock, so that concurrent {@link #add} calls for elements hashing to
 * different segments do not contend with each other.
 *
 * <p>The segment of an element is derived from its {@code hashCode()}; {@code null} elements
 * go to segment 0. Iteration order of {@link #toListSnapshot()} is segment by segment and is
 * therefore unrelated to insertion order across segments.
 *
 * @param <E> element type
 */
public class SegmentedList<E> {

    private final int segmentCount;
    private final List<E>[] segments;
    private final ReentrantLock[] locks;

    /**
     * Creates a list with the given number of segments.
     *
     * @param segmentCount number of independently locked segments; must be positive
     * @throws IllegalArgumentException if {@code segmentCount} is zero or negative
     */
    @SuppressWarnings("unchecked")
    public SegmentedList(int segmentCount) {
        // Fail here rather than with an ArithmeticException on the first add().
        if (segmentCount <= 0) {
            throw new IllegalArgumentException("segmentCount must be positive: " + segmentCount);
        }
        this.segmentCount = segmentCount;
        // Generic array creation is not allowed; the array only ever holds List<E> instances.
        this.segments = (List<E>[]) new List[segmentCount];
        this.locks = new ReentrantLock[segmentCount];

        for (int i = 0; i < segmentCount; i++) {
            segments[i] = new ArrayList<>();
            locks[i] = new ReentrantLock();
        }
    }

    /** Maps an element to its segment; the mask keeps the hash non-negative before the modulo. */
    private int segmentIndex(Object element) {
        return (element == null ? 0 : (element.hashCode() & 0x7fffffff) % segmentCount);
    }

    /** Acquires every segment lock in ascending index order. */
    private void lockAll() {
        for (int i = 0; i < segmentCount; i++) {
            locks[i].lock();
        }
    }

    /** Releases every segment lock in descending index order. */
    private void unlockAll() {
        for (int i = segmentCount - 1; i >= 0; i--) {
            locks[i].unlock();
        }
    }

    /**
     * Appends {@code element} to its segment while holding only that segment's lock.
     *
     * @param element the element to add; may be {@code null}
     */
    public void add(E element) {
        int seg = segmentIndex(element);
        locks[seg].lock();
        try {
            segments[seg].add(element);
        } finally {
            locks[seg].unlock();
        }
    }

    /**
     * Returns a new list containing all elements, copied while every segment lock is held so
     * that the copy is consistent across segments.
     *
     * @return an independent, mutable copy of the current contents
     */
    public List<E> toListSnapshot() {
        lockAll();
        try {
            int total = 0;
            for (List<E> segment : segments) {
                total += segment.size();
            }
            // Presize so that copying does not repeatedly grow the backing array.
            List<E> snapshot = new ArrayList<>(total);
            for (List<E> segment : segments) {
                snapshot.addAll(segment);
            }
            return snapshot;
        } finally {
            unlockAll();
        }
    }

    /**
     * Returns the total number of elements, computed while every segment lock is held.
     *
     * @return the number of elements across all segments
     */
    public int size() {
        lockAll();
        try {
            int size = 0;
            for (List<E> segment : segments) {
                size += segment.size();
            }
            return size;
        } finally {
            unlockAll();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public void run() {
new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
zks.takeSnapshot(false);
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ public void deserializeSnapshot(InputArchive ia) throws IOException {
* @throws InterruptedException
*/
public void serializeSnapshot(OutputArchive oa) throws IOException, InterruptedException {
SerializeUtils.serializeSnapshot(getDataTree(), oa, getSessionWithTimeOuts());
SerializeUtils.serializeSnapshot(getDataTree(), oa, getSessionWithTimeOuts(), false);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move the API changes into a separate PR and keep only the core snapshot changes in this one.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to check just the logic change, you can checkout the older commit - d6474f8

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,17 +530,17 @@ public void loadData() throws IOException, InterruptedException {
}

// Make a clean snapshot
takeSnapshot();
takeSnapshot(true);
}

public void takeSnapshot() {
takeSnapshot(false);
public void takeSnapshot(boolean isLeaderBootupSnapshot) {
takeSnapshot(false, isLeaderBootupSnapshot);
}

public void takeSnapshot(boolean syncSnap) {
public void takeSnapshot(boolean syncSnap, boolean isLeaderBootupSnapshot) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, add a config option that determines whether parallel serialization is enabled. When it is disabled, the regular snapshot mechanism should be used.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, parallel serialization will be config-driven, along with the number of threads.

long start = Time.currentElapsedTime();
try {
txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap, isLeaderBootupSnapshot);
} catch (IOException e) {
LOG.error("Severe unrecoverable error, exiting", e);
// This is a severe error that we cannot recover from,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,15 @@ protected void serialize(
DataTree dt,
Map<Long, Integer> sessions,
OutputArchive oa,
FileHeader header) throws IOException {
FileHeader header,
boolean isLeaderBootupSnapshot) throws IOException {
// this is really a programmatic error and not something that can
// happen at runtime
if (header == null) {
throw new IllegalStateException("Snapshot's not open for writing: uninitialized header");
}
header.serialize(oa, "fileheader");
SerializeUtils.serializeSnapshot(dt, oa, sessions);
SerializeUtils.serializeSnapshot(dt, oa, sessions, isLeaderBootupSnapshot);
}

/**
Expand All @@ -261,12 +262,13 @@ public synchronized void serialize(
DataTree dt,
Map<Long, Integer> sessions,
File snapShot,
boolean fsync) throws IOException {
boolean fsync,
boolean isLeaderBootupSnapshot) throws IOException {
if (!close) {
try (CheckedOutputStream snapOS = SnapStream.getOutputStream(snapShot, fsync)) {
OutputArchive oa = BinaryOutputArchive.getArchive(snapOS);
FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
serialize(dt, sessions, oa, header);
serialize(dt, sessions, oa, header, isLeaderBootupSnapshot);
SnapStream.sealStream(snapOS, oa);

// Digest feature was added after the CRC to make it backward
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener l
if (trustEmptyDB) {
/* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
* or use Map on save() */
save(dt, (ConcurrentHashMap<Long, Integer>) sessions, false);
save(dt, (ConcurrentHashMap<Long, Integer>) sessions, false, false);

/* return a zxid of 0, since we know the database is empty */
return 0L;
Expand Down Expand Up @@ -482,12 +482,12 @@ public TxnHeader getLastLoggedTxnHeader() {
public void save(
DataTree dataTree,
ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
boolean syncSnap) throws IOException {
boolean syncSnap, boolean isLeaderBootupSnapshot) throws IOException {
long lastZxid = dataTree.lastProcessedZxid;
File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile);
try {
snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap, isLeaderBootupSnapshot);
} catch (IOException e) {
if (snapshotFile.length() == 0) {
/* This may be caused by a full disk. In such a case, the server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public interface SnapShot {
* @param fsync sync the snapshot immediately after write
* @throws IOException
*/
void serialize(DataTree dt, Map<Long, Integer> sessions, File name, boolean fsync) throws IOException;
void serialize(DataTree dt, Map<Long, Integer> sessions, File name, boolean fsync, boolean isLeaderBootupSnapshot) throws IOException;

/**
* find the most recent snapshot file
Expand Down
Loading