Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.github.ambry.clustermap;

public class FileStoreException extends RuntimeException{

private static final long serialVersionUID = 1L;
private final FileStoreErrorCode error;

public FileStoreException(String s, FileStoreErrorCode error) {
super(s);
this.error = error;
}

public FileStoreException(String s, FileStoreErrorCode error, Throwable throwable) {
super(s, throwable);
this.error = error;
}

public enum FileStoreErrorCode{
FileStoreRunningFailure
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public interface ReplicaSyncUpManager {
* @throws InterruptedException
*/
void waitBootstrapCompleted(String partitionName) throws InterruptedException;
void initiateFileCopy(ReplicaId replicaId);
void waitForFileCopyCompleted(String partitionName) throws InterruptedException;

/**
* Update replica lag (in byte) between two replicas (local and peer replica) and check sync-up status.
Expand Down Expand Up @@ -64,6 +66,8 @@ boolean updateReplicaLagAndCheckSyncStatus(ReplicaId localReplica, ReplicaId pee
*/
void onBootstrapComplete(ReplicaId replicaId);

void onFileCopyComplete(ReplicaId replicaId);

/**
* Deactivation on given replica is complete.
* @param replicaId the replica which completes deactivation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,13 @@ public enum StateModelListenerType {
* leadership hand-off occurs. For example, if any replica becomes LEADER from STANDBY, it is supposed to replicate
* data from VCR nodes. This is part of two-way replication between Ambry and cloud.
*/
CloudToStoreReplicationManagerListener
CloudToStoreReplicationManagerListener,

/**
* The partition state change listener owned by Helix participant. It takes actions when partition state transition
* occurs.
*/
FileCopyManagerListener


}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public enum TransitionErrorCode {
/**
* If the resource name is not a numeric number.
*/
InvalidResourceName
InvalidResourceName,

FileCopyFailure
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.github.ambry.config;

public class FileCopyConfig {

public static final String PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK = "parallel.partition.hydration.count.per.disk";
@Config(PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK)
public final int parallelPartitionHydrationCountPerDisk;

public static final String NUMBER_OF_FILE_COPY_THREADS = "number.of.file.copy.threads";
@Config(NUMBER_OF_FILE_COPY_THREADS)
public final int numberOfFileCopyThreads;

public static final String FILE_CHUNK_TIMEOUT_IN_MINUTES = "file.chunk.timeout.in.minutes";
@Config(FILE_CHUNK_TIMEOUT_IN_MINUTES)
public final long fileChunkTimeoutInMins;

/**
* The frequency at which the data gets flushed to disk
*/
public static final String STORE_DATA_FLUSH_INTERVAL_IN_MBS = "store.data.flush.interval.In.MBs";
@Config(STORE_DATA_FLUSH_INTERVAL_IN_MBS)
@Default("1000")
public final long storeDataFlushIntervalInMbs;

public static final String File_COPY_META_DATA_FILE_NAME = "file.copy.meta.data.file.name";
@Config(File_COPY_META_DATA_FILE_NAME)
@Default("sealed_logs_metadata_file")
public final String fileCopyMetaDataFileName;

public FileCopyConfig(VerifiableProperties verifiableProperties) {
fileCopyMetaDataFileName = verifiableProperties.getString(File_COPY_META_DATA_FILE_NAME, "sealed_logs_metadata_file");
parallelPartitionHydrationCountPerDisk = verifiableProperties.getInt(PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK, 1);
numberOfFileCopyThreads = verifiableProperties.getInt(NUMBER_OF_FILE_COPY_THREADS, 4);
fileChunkTimeoutInMins = verifiableProperties.getInt(FILE_CHUNK_TIMEOUT_IN_MINUTES, 5);
storeDataFlushIntervalInMbs = verifiableProperties.getLong(STORE_DATA_FLUSH_INTERVAL_IN_MBS, 1000);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.github.ambry.config;

public enum ServerReplicationMode {
BLOB_BASED,
FILE_BASED;
}
23 changes: 23 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/config/StoreConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -674,15 +674,38 @@ public class StoreConfig {
public final boolean storeBlockStaleBlobStoreToStart;
public final static String storeBlockStaleBlobStoreToStartName = "store.block.stale.blob.store.to.start";

/**
* Config to Decide Replication Protocol For Hydration Of Newly Added Replicas
*/
public static final String SERVER_REPLICATION_PROTOCOL_FOR_HYDRATION = "server.replication.protocol.for.hydration";
@Config(SERVER_REPLICATION_PROTOCOL_FOR_HYDRATION)
public final ServerReplicationMode serverReplicationProtocolForHydration;


/**
* Whether to attempt reshuffling of reordered disks and subsequent process termination.
*/
@Config("store.reshuffle.disks.on.reorder")
@Default("false")
public final boolean storeReshuffleDisksOnReorder;

public static final String FILE_COPY_IN_PROGRESS_FILE_NAME = "file.copy.in.progress.file.name";
@Config(FILE_COPY_IN_PROGRESS_FILE_NAME)
@Default("file_copy_in_progress")
public final String fileCopyInProgressFileName;

public static final String BOOTSTRAP_IN_PROGRESS_FILE = "bootstrap.in.progress.file.name";
@Config(BOOTSTRAP_IN_PROGRESS_FILE)
@Default("bootstrap_in_progress")
public final String bootstrapInProgressFile;

public final static String storeReshuffleDisksOnReorderName = "store.reshuffle.disks.on.reorder";

public StoreConfig(VerifiableProperties verifiableProperties) {
bootstrapInProgressFile = verifiableProperties.getString(BOOTSTRAP_IN_PROGRESS_FILE, "bootstrap_in_progress");
fileCopyInProgressFileName = verifiableProperties.getString(FILE_COPY_IN_PROGRESS_FILE_NAME, "file_copy_in_progress");
serverReplicationProtocolForHydration = verifiableProperties.getEnum(SERVER_REPLICATION_PROTOCOL_FOR_HYDRATION,
ServerReplicationMode.class, ServerReplicationMode.BLOB_BASED);
storeKeyFactory = verifiableProperties.getString("store.key.factory", "com.github.ambry.commons.BlobIdFactory");
storeDataFlushIntervalSeconds = verifiableProperties.getLong("store.data.flush.interval.seconds", 60);
storeIndexMaxMemorySizeBytes = verifiableProperties.getInt("store.index.max.memory.size.bytes", 20 * 1024 * 1024);
Expand Down
15 changes: 15 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ public interface RequestAPI {
*/
void handleReplicaMetadataRequest(NetworkRequest request) throws IOException, InterruptedException;

/**
*
* @param request
* @throws IOException
* @throws InterruptedException
*/

/**
* Replicate one specific Blob from a remote host to the local store.
* @param request The request that contains the remote host information and the blob id to be replicated.
Expand Down Expand Up @@ -116,4 +123,12 @@ default void handleAdminRequest(NetworkRequest request) throws InterruptedExcept
default void handleUndeleteRequest(NetworkRequest request) throws InterruptedException, IOException {
throw new UnsupportedOperationException("Undelete request not supported on this node");
}

default void handleFileMetaDataRequest(NetworkRequest request) throws InterruptedException, IOException{
throw new UnsupportedOperationException("File Meta Data request not supported on this node");
}

default void handleFileChunkRequest(NetworkRequest request) throws InterruptedException, IOException{
throw new UnsupportedOperationException("File Chunk request not supported on this node");
}
}
15 changes: 15 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/server/StoreManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import com.github.ambry.store.FileStore;



/**
Expand All @@ -34,6 +36,17 @@ public interface StoreManager {
*/
boolean addBlobStore(ReplicaId replica);

boolean addFileStore(ReplicaId replicaId);

void setUpReplica(String partitionName);


/**
* Build state after filecopy is completed
* @param partitionName the partition id for which state is to be built..
*/
void buildStateForFileCopy(String partitionName);

/**
* Remove store from storage manager.
* @param id the {@link PartitionId} associated with store
Expand Down Expand Up @@ -62,6 +75,8 @@ public interface StoreManager {
*/
Store getStore(PartitionId id);

FileStore getFileStore(PartitionId id);

/**
* Get replicaId on current node by partition name. (There should be at most one replica belonging to specific
* partition on single node)
Expand Down
34 changes: 34 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/store/FileMetaData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.github.ambry.store;

import java.util.List;


public class FileMetaData {
SealedFileInfo sealedSegments;
List<SealedFileInfo> indexSegments;
List<SealedFileInfo> bloomFilters;

public SealedFileInfo getSealedSegments() {
return sealedSegments;
}

public void setSealedSegments(SealedFileInfo sealedSegments) {
this.sealedSegments = sealedSegments;
}

public List<SealedFileInfo> getIndexSegments() {
return indexSegments;
}

public void setIndexSegments(List<SealedFileInfo> indexSegments) {
this.indexSegments = indexSegments;
}

public List<SealedFileInfo> getBloomFilters() {
return bloomFilters;
}

public void setBloomFilters(List<SealedFileInfo> bloomFilters) {
this.bloomFilters = bloomFilters;
}
}
19 changes: 19 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/store/SealedFileInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.github.ambry.store;

public class SealedFileInfo {
private String fileName;
private final long fileSize;

public SealedFileInfo(String fileName, Long fileSize) {
this.fileName = fileName;
this.fileSize = fileSize;
}
public String getFileName() {
return fileName;
}

public Long getFileSize() {
return fileSize;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public CloudStorageManager(VerifiableProperties properties, VcrMetrics vcrMetric
public boolean addBlobStore(ReplicaId replica) {
return createAndStartBlobStoreIfAbsent(replica.getPartitionId()) != null;
}
@Override
public void buildStateForFileCopy(String partitionName){
// no-op
}

@Override
public boolean shutdownBlobStore(PartitionId id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
public class AmbryReplicaSyncUpManager implements ReplicaSyncUpManager {
private static final Logger logger = LoggerFactory.getLogger(AmbryReplicaSyncUpManager.class);
private final ConcurrentHashMap<String, CountDownLatch> partitionToBootstrapLatch = new ConcurrentHashMap<>();

private final ConcurrentHashMap<String, CountDownLatch> partitionToFileCopyLatch = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Boolean> partitionToFileCopySuccessLatch = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, CountDownLatch> partitionToDeactivationLatch = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, CountDownLatch> partitionToDisconnectionLatch = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Boolean> partitionToBootstrapSuccess = new ConcurrentHashMap<>();
Expand All @@ -63,6 +66,12 @@ public void initiateBootstrap(ReplicaId replicaId) {
ReplicaState.BOOTSTRAP));
}

@Override
public void initiateFileCopy(ReplicaId replicaId){
partitionToFileCopyLatch.put(replicaId.getPartitionId().toPathString(), new CountDownLatch(1));
partitionToFileCopySuccessLatch.put(replicaId.getPartitionId().toPathString(), false);
}

@Override
public void initiateDeactivation(ReplicaId replicaId) {
partitionToDeactivationLatch.put(replicaId.getPartitionId().toPathString(), new CountDownLatch(1));
Expand Down Expand Up @@ -101,6 +110,22 @@ public void waitBootstrapCompleted(String partitionName) throws InterruptedExcep
}
}

@Override
public void waitForFileCopyCompleted(String partitionName) throws InterruptedException {
CountDownLatch latch = partitionToFileCopyLatch.get(partitionName);
if(latch == null) {
logger.info("Skipping file copy for existing partition {}", partitionName);
} else{
logger.info("Waiting for new partition to {} to comeplete FileCopy", partitionName);
latch.await();
partitionToFileCopyLatch.remove(partitionName);
if(!partitionToFileCopySuccessLatch.remove(partitionName)){
throw new StateTransitionException("Partition " + partitionName + " failed to copy files.", FileCopyFailure);
}
logger.info("File Copy is complete on partition {}", partitionName);
}
}

@Override
public void waitDeactivationCompleted(String partitionName) throws InterruptedException {
CountDownLatch latch = partitionToDeactivationLatch.get(partitionName);
Expand Down Expand Up @@ -192,6 +217,12 @@ public void onBootstrapComplete(ReplicaId replicaId) {
countDownLatch(partitionToBootstrapLatch, replicaId.getPartitionId().toPathString());
}

@Override
public void onFileCopyComplete(ReplicaId replicaId){
partitionToFileCopySuccessLatch.put(replicaId.getPartitionId().toPathString(), true);
countDownLatch(partitionToFileCopyLatch, replicaId.getPartitionId().toPathString());
}

@Override
public void onDeactivationComplete(ReplicaId replicaId) {
partitionToDeactivationSuccess.put(replicaId.getPartitionId().toPathString(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,18 @@ public void onPartitionBecomeBootstrapFromOffline(String partitionName) {
if (storageManagerListener != null) {
storageManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName);
}

/**
* Should be invoked after storage manager listener to ensure that the replica is added to the store.
* Conditional execution based on requirement for File Copy.
*/
PartitionStateChangeListener fileCopyManagerListener =
partitionStateChangeListeners.get(StateModelListenerType.FileCopyManagerListener);
if(fileCopyManagerListener != null){
fileCopyManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName);
replicaSyncUpManager.waitForFileCopyCompleted(partitionName);
}

// 2. take actions in replication manager (add new replica if necessary)
PartitionStateChangeListener replicationManagerListener =
partitionStateChangeListeners.get(StateModelListenerType.ReplicationManagerListener);
Expand All @@ -882,6 +894,11 @@ public void onPartitionBecomeBootstrapFromOffline(String partitionName) {
if (statsManagerListener != null) {
statsManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName);
}
} catch (InterruptedException e) {
//TODO: Handle the exception more gracefully.
logger.error("Bootstrap was interrupted on partition {}", partitionName);
localPartitionAndState.put(partitionName, ReplicaState.ERROR);
throw new StateTransitionException("Bootstrap failed or was interrupted", BootstrapFailure);
} catch (Exception e) {
localPartitionAndState.put(partitionName, ReplicaState.ERROR);
throw e;
Expand Down
19 changes: 19 additions & 0 deletions ambry-file-transfer/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
plugins {
id 'java'
}

group = 'com.github.ambry'
version = '0.4.512'

repositories {
mavenCentral()
}

dependencies {
testImplementation platform('org.junit:junit-bom:5.9.1')
testImplementation 'org.junit.jupiter:junit-jupiter'
}

test {
useJUnitPlatform()
}
Loading