@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -31,6 +32,7 @@
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -65,11 +67,23 @@ public void preCommitStoreFile(final ObserverContext<? extends RegionCoprocessor
+ "data replication.");
return;
}
TableName tableName = env.getRegionInfo().getTable();
if (
env.getRegion().getTableDescriptor().getColumnFamily(family).getScope()
!= HConstants.REPLICATION_SCOPE_GLOBAL
) {
LOG
.debug("Skipping recording bulk load entries in preCommitStoreFile for table:{}, family:{},"
+ " Because the replication is not enabled", tableName, Bytes.toString(family));
return;
}

// This is completely cheating AND getting a HRegionServer from a RegionServerEnvironment is
// just going to break. This is all private. Not allowed. Regions shouldn't assume they are
// hosted in a RegionServer. TODO: fix.
RegionServerServices rss = ((HasRegionServerServices) env).getRegionServerServices();
Replication rep = (Replication) ((HRegionServer) rss).getReplicationSourceService();
rep.addHFileRefsToQueue(env.getRegionInfo().getTable(), family, pairs);

rep.addHFileRefsToQueue(tableName, family, pairs);
}
}
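The guard added above means that bulk load HFile references are queued for replication only when the column family's replication scope is REPLICATION_SCOPE_GLOBAL. As a hedged sketch of the table setup that check assumes (the table name, family names and the "admin" handle below are hypothetical, not part of this patch), a family whose bulk loads should be replicated would be declared like this:

// Sketch only: assumes an existing org.apache.hadoop.hbase.client.Admin handle named "admin".
// Bulk loads into "replicated_cf" pass the new scope check in preCommitStoreFile and are queued;
// bulk loads into "local_cf" are skipped and never reach the replication queue.
TableDescriptor demoTable = TableDescriptorBuilder.newBuilder(TableName.valueOf("demo_table"))
  .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("replicated_cf"))
    .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
  .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("local_cf"))
    .setScope(HConstants.REPLICATION_SCOPE_LOCAL).build())
  .build();
admin.createTable(demoTable);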
@@ -20,12 +20,15 @@
import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -60,14 +63,15 @@
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -121,6 +125,10 @@ public class TestBulkLoadReplication extends TestReplicationBase {
@ClassRule
public static TemporaryFolder testFolder = new TemporaryFolder();

private static ReplicationQueueStorage queueStorage;

private static boolean replicationPeersAdded = false;

@BeforeClass
public static void setUpBeforeClass() throws Exception {
setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
@@ -129,6 +137,8 @@ public static void setUpBeforeClass() throws Exception {
setupConfig(UTIL3, "/3");
TestReplicationBase.setUpBeforeClass();
startThirdCluster();
queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getConnection(),
UTIL1.getConfiguration());
}

private static void startThirdCluster() throws Exception {
@@ -152,22 +162,27 @@ private static void startThirdCluster() throws Exception {
@Before
@Override
public void setUpBase() throws Exception {
// "super.setUpBase()" already sets replication from 1->2,
// then on the subsequent lines, sets 2->1, 2->3 and 3->2.
// So we have following topology: "1 <-> 2 <->3"
super.setUpBase();
ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
// adds cluster1 as a remote peer on cluster2
UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
// adds cluster3 as a remote peer on cluster2
UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
// adds cluster2 as a remote peer on cluster3
UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
setupCoprocessor(UTIL1);
setupCoprocessor(UTIL2);
setupCoprocessor(UTIL3);
// Removing the peers and adding them again causes the previously completed bulk load jobs to be
// submitted again, so add the peers only once.
if (!replicationPeersAdded) {
// "super.setUpBase()" already sets replication from 1->2,
// then on the subsequent lines, sets 2->1, 2->3 and 3->2.
// So we have the following topology: "1 <-> 2 <-> 3"
super.setUpBase();
ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
// adds cluster1 as a remote peer on cluster2
UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
// adds cluster3 as a remote peer on cluster2
UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
// adds cluster2 as a remote peer on cluster3
UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
setupCoprocessor(UTIL1);
setupCoprocessor(UTIL2);
setupCoprocessor(UTIL3);
replicationPeersAdded = true;
}
BULK_LOADS_COUNT = new AtomicInteger(0);
}

@@ -195,15 +210,6 @@ private void setupCoprocessor(HBaseTestingUtil cluster) {
});
}

@After
@Override
public void tearDownBase() throws Exception {
super.tearDownBase();
UTIL2.getAdmin().removeReplicationPeer(PEER_ID1);
UTIL2.getAdmin().removeReplicationPeer(PEER_ID3);
UTIL3.getAdmin().removeReplicationPeer(PEER_ID2);
}

protected static void setupBulkLoadConfigsForCluster(Configuration config,
String clusterReplicationId) throws Exception {
config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
@@ -322,4 +328,81 @@ public void postBulkLoadHFile(ObserverContext<? extends RegionCoprocessorEnviron
});
}
}

@Test
public void testBulkloadReplicationActiveActiveForNoRepFamily() throws Exception {
Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
byte[] row = Bytes.toBytes("004");
byte[] value = Bytes.toBytes("v4");
assertBulkLoadConditionsForNoRepFamily(row, value, UTIL1, peer1TestTable, peer2TestTable,
peer3TestTable);
// additional wait to make sure no extra bulk load happens
Thread.sleep(400);
assertEquals(1, BULK_LOADS_COUNT.get());
assertEquals(0, queueStorage.getAllHFileRefs().size());
}

private void assertBulkLoadConditionsForNoRepFamily(byte[] row, byte[] value,
HBaseTestingUtil utility, Table... tables) throws Exception {
BULK_LOAD_LATCH = new CountDownLatch(1);
bulkLoadOnClusterForNoRepFamily(row, value, utility);
assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
assertTableHasValue(tables[0], row, value);
assertTableNotHasValue(tables[1], row, value);
assertTableNotHasValue(tables[2], row, value);
}

private void bulkLoadOnClusterForNoRepFamily(byte[] row, byte[] value, HBaseTestingUtil cluster)
throws Exception {
String bulkloadFile = createHFileForNoRepFamilies(row, value, cluster.getConfiguration());
Path bulkLoadFilePath = new Path(bulkloadFile);
copyToHdfsForNoRepFamily(bulkloadFile, cluster.getDFSCluster());
BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
Map<byte[], List<Path>> family2Files = new HashMap<>();
List<Path> files = new ArrayList<>();
files.add(new Path(
BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/" + bulkLoadFilePath.getName()));
family2Files.put(noRepfamName, files);
bulkLoadHFilesTool.bulkLoad(tableName, family2Files);
}

private String createHFileForNoRepFamilies(byte[] row, byte[] value, Configuration clusterConfig)
throws IOException {
ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
cellBuilder.setRow(row).setFamily(TestReplicationBase.noRepfamName)
.setQualifier(Bytes.toBytes("1")).setValue(value).setType(Cell.Type.Put);

HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
// TODO We need a way to do this without creating files
File hFileLocation = testFolder.newFile();
FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
try {
hFileFactory.withOutputStream(out);
hFileFactory.withFileContext(new HFileContextBuilder().build());
HFile.Writer writer = hFileFactory.create();
try {
writer.append(new KeyValue(cellBuilder.build()));
} finally {
writer.close();
}
} finally {
out.close();
}
return hFileLocation.getAbsoluteFile().getAbsolutePath();
}

private void copyToHdfsForNoRepFamily(String bulkLoadFilePath, MiniDFSCluster cluster)
throws Exception {
Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/");
cluster.getFileSystem().mkdirs(bulkLoadDir);
cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
}

private void assertTableNotHasValue(Table table, byte[] row, byte[] value) throws IOException {
Get get = new Get(row);
Result result = table.get(get);
assertNotEquals(Bytes.toString(value), Bytes.toString(result.value()));
}
}