diff --git a/ambry-api/src/main/java/com/github/ambry/config/FileCopyBasedReplicationConfig.java b/ambry-api/src/main/java/com/github/ambry/config/FileCopyBasedReplicationConfig.java index 5f62253010..32a05ca22d 100644 --- a/ambry-api/src/main/java/com/github/ambry/config/FileCopyBasedReplicationConfig.java +++ b/ambry-api/src/main/java/com/github/ambry/config/FileCopyBasedReplicationConfig.java @@ -91,7 +91,7 @@ public class FileCopyBasedReplicationConfig { public FileCopyBasedReplicationConfig(VerifiableProperties verifiableProperties) { fileCopyMetaDataFileName = verifiableProperties.getString(FILE_COPY_META_DATA_FILE_NAME, "segments_metadata_file"); fileCopyParallelPartitionHydrationCountPerDisk = verifiableProperties.getInt(FILE_COPY_PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK, 1); - fileCopyNumberOfFileCopyThreads = verifiableProperties.getInt(FILE_COPY_NUMBER_OF_FILE_COPY_THREADS, 4); + fileCopyNumberOfFileCopyThreads = verifiableProperties.getInt(FILE_COPY_NUMBER_OF_FILE_COPY_THREADS, 1); fileCopyDataFlushIntervalInMbs = verifiableProperties.getLong(FILE_COPY_DATA_FLUSH_INTERVAL_IN_MBS, 1000); fileCopyReplicaTimeoutSecs = verifiableProperties.getLong(FILE_COPY_REPLICA_TIMEOUT_SECS, 36000); fileCopySchedulerWaitTimeSecs = verifiableProperties.getLong(FILE_COPY_SCHEDULER_WAIT_TIME_SECS, 30); diff --git a/ambry-api/src/main/java/com/github/ambry/store/PartitionFileStore.java b/ambry-api/src/main/java/com/github/ambry/store/PartitionFileStore.java index 15e0c4a093..0760381b56 100644 --- a/ambry-api/src/main/java/com/github/ambry/store/PartitionFileStore.java +++ b/ambry-api/src/main/java/com/github/ambry/store/PartitionFileStore.java @@ -53,6 +53,11 @@ StoreFileChunk readStoreFileChunkFromDisk(String fileName, long offset, long siz */ void moveAllRegularFiles(String srcDirPath, String destDirPath) throws IOException; + /** + * delete all contents of the given directory, leaving the directory itself in place + * @param srcPath path of the directory to clean up + */ + void cleanUpDirectory(String srcPath); /** * move a pre-allocated file in the target path from diskSpaceAllocator's pre-allocated files for the store * diff --git a/ambry-api/src/main/java/com/github/ambry/store/SegmentTracker.java b/ambry-api/src/main/java/com/github/ambry/store/SegmentTracker.java new file mode 100644 index 0000000000..91338cb15e --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/store/SegmentTracker.java @@ -0,0 +1,23 @@ +package com.github.ambry.store; + +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; + +/** + * Tracks the {@link RandomAccessFile} backing a log segment and exposes its {@link FileChannel}. + */ +public class SegmentTracker { + private final RandomAccessFile randomAccessFile; + + public SegmentTracker(RandomAccessFile randomAccessFile) { + this.randomAccessFile = randomAccessFile; + } + + public boolean isOpen() { + return this.randomAccessFile.getChannel().isOpen(); + } + + public FileChannel getRandomAccessFileChannel() { + return randomAccessFile.getChannel(); + } +} diff --git a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java index 792c118c0f..282781cc8c 100644 --- a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java +++ b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java @@ -878,10 +878,35 @@ public void onPartitionBecomeBootstrapFromPreBootstrap(String partitionName) { @Override public void onPartitionBecomeBootstrapFromOffline(String partitionName) { + // FCH TEST: test-only artificial delay + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new 
RuntimeException(e); + } participantMetrics.incStateTransitionMetric(partitionName, ReplicaState.OFFLINE, ReplicaState.BOOTSTRAP); try { + logger.info("FCH TEST: Transition For Partition PreBootstrap From Offline {}", partitionName); // 1. take actions in storage manager (add new replica if necessary) - onPartitionBecomePreBootstrapFromOffline(partitionName); + try { + onPartitionBecomePreBootstrapFromOffline(partitionName); + } catch (Exception e) { + logger.error("Error in onPartitionBecomePreBootstrapFromOffline", e); + localPartitionAndState.put(partitionName, ReplicaState.ERROR); + throw e; + } + logger.info("FCH TEST: Transition For Partition FileCopy from PreBootstrap {}", partitionName); + PartitionStateChangeListener fileCopyStateChangeListener = + partitionStateChangeListeners.get(StateModelListenerType.FileCopyManagerListener); + + if (fileCopyStateChangeListener != null) { + logger.info("FCH TEST: Listener invoked for partition {}", partitionName); + fileCopyStateChangeListener.onPartitionBecomeBootstrapFromOffline(partitionName); + replicaSyncUpManager.waitForFileCopyCompleted(partitionName); + } + logger.info("FCH TEST: Transition For Partition Bootstrap from PreBootstrap {}", partitionName); + onPartitionBecomeBootstrapFromPreBootstrap(partitionName); // 2. take actions in replication manager (add new replica if necessary) PartitionStateChangeListener replicationManagerListener = @@ -897,7 +920,13 @@ public void onPartitionBecomeBootstrapFromOffline(String partitionName) { } - } catch (Exception e) { - localPartitionAndState.put(partitionName, ReplicaState.ERROR); - throw e; + } catch (InterruptedException e) { + // waitForFileCopyCompleted can be interrupted; restore the interrupt flag and fail the transition + localPartitionAndState.put(partitionName, ReplicaState.ERROR); + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (Exception e) { + localPartitionAndState.put(partitionName, ReplicaState.ERROR); + throw e; } logger.info("Before setting partition {} to bootstrap", partitionName); localPartitionAndState.put(partitionName, ReplicaState.BOOTSTRAP); @@ -905,7 +932,7 @@ public void onPartitionBecomeBootstrapFromOffline(String partitionName) { } @Override public void onPartitionBecomeStandbyFromBootstrap(String partitionName) { participantMetrics.incStateTransitionMetric(partitionName, ReplicaState.BOOTSTRAP, ReplicaState.STANDBY); try { // 1. 
take actions in replication manager (wait for replica to finish bootstrapping) diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/DiskAwareFileCopyThreadPoolManager.java b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/DiskAwareFileCopyThreadPoolManager.java index 423958123b..92c78560e0 100644 --- a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/DiskAwareFileCopyThreadPoolManager.java +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/DiskAwareFileCopyThreadPoolManager.java @@ -110,6 +110,7 @@ public List getDiskIdsToHydrate() { public void submitReplicaForHydration(ReplicaId replicaId, FileCopyStatusListener fileCopyStatusListener, FileCopyHandler fileCopyHandler) { try { + logger.info("FCH TEST: Submit"); threadQueueLock.lock(); DiskId diskId = replicaId.getDiskId(); FileCopyThread fileCopyThread = new FileCopyThread(fileCopyHandler, fileCopyStatusListener); @@ -134,18 +135,18 @@ public void submitReplicaForHydration(ReplicaId replicaId, FileCopyStatusListene @Override public void stopAndRemoveReplicaFromThreadPool(ReplicaId replicaId) throws InterruptedException { threadQueueLock.lock(); - logger.info("Stopping and removing replica {} from thread pool", replicaId); + logger.info("FCH TEST: Stopping and removing replica {} from thread pool", replicaId); FileCopyThread fileCopyThread = replicaToFileCopyThread.get(replicaId); if (fileCopyThread == null || !fileCopyThread.isAlive()) { - logger.info("No thread found for replica {}. Nothing to stop.", replicaId); + logger.info("FCH TEST: No thread found for replica {}. Nothing to stop.", replicaId); threadQueueLock.unlock(); return; } long threadShutDownInitiationTime = System.currentTimeMillis(); - logger.info("Stopping thread for replica {}", replicaId); + logger.info("FCH TEST: Stopping thread for replica {}", replicaId); fileCopyThread.shutDown(); - logger.info("Thread for replica {} stopped in {} ms", replicaId, + logger.info("FCH TEST: Thread for replica {} stopped in {} ms", replicaId, System.currentTimeMillis() - threadShutDownInitiationTime); threadQueueLock.unlock(); } diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyBasedReplicationManager.java b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyBasedReplicationManager.java index 61ef41a3db..35513413de 100644 --- a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyBasedReplicationManager.java +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyBasedReplicationManager.java @@ -16,6 +16,7 @@ import com.codahale.metrics.MetricRegistry; import com.github.ambry.clustermap.ClusterMap; import com.github.ambry.clustermap.ClusterParticipant; +import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.PartitionStateChangeListener; import com.github.ambry.clustermap.ReplicaId; import com.github.ambry.clustermap.ReplicaSyncUpManager; @@ -30,8 +31,16 @@ import com.github.ambry.replica.prioritization.PrioritizationManager; import com.github.ambry.replica.prioritization.PrioritizationManagerFactory; import com.github.ambry.server.StoreManager; +import java.io.BufferedReader; +import java.io.FileReader; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -81,7 +90,7 @@ public FileCopyBasedReplicationManager(FileCopyBasedReplicationConfig fileCopyBa if (clusterParticipant != null) { clusterParticipant.registerPartitionStateChangeListener(StateModelListenerType.FileCopyManagerListener, new PartitionStateChangeListenerImpl()); - logger.info("File Copy Manager's state change listener registered!"); + logger.info("FCH TEST: File Copy Manager's state change listener registered!"); } else { throw new InstantiationException("File Copy Manager cannot be instantiated without a ClusterParticipant"); } @@ -90,6 +99,7 @@ public FileCopyBasedReplicationManager(FileCopyBasedReplicationConfig fileCopyBa this.prioritizationManager = prioritizationManager; this.fileCopyBasedReplicationScheduler = fileCopyBasedReplicationSchedulerFactory.getFileCopyBasedReplicationScheduler(); this.fileCopyBasedReplicationSchedulerThread = new Thread(fileCopyBasedReplicationScheduler); + if(!prioritizationManager.isRunning()) { throw new InstantiationException("File Copy cannot run when Prioritization Manager is not running"); } @@ -103,23 +113,79 @@ public void start() throws InterruptedException, IOException { logger.info("Starting FileCopyBasedReplicationManager"); fileCopyBasedReplicationSchedulerThread.start(); isRunning = true; - logger.info("FileCopyBasedReplicationManager started"); + logger.info("FCH TEST: FileCopyBasedReplicationManager started"); +// PartitionStateChangeListenerImpl partitionStateChangeListener = new PartitionStateChangeListenerImpl(); +// List partitionIds = Arrays.asList(20l, 127l); +// +// logger.info("FCH TEST: All Partitions to be hydrated up: {}", storeManager.getLocalPartitions().stream().map( +// PartitionId::getId).collect(Collectors.toList())); +// +// List partitionIdList = +// storeManager.getLocalPartitions().stream().filter(p -> partitionIds.contains(p.getId())).collect(Collectors.toList()); +// +// logger.info("FCH TEST: Partitions to be hydrated up: {}", partitionIdList); +// //Integrate clean up. 
+// ExecutorService executor = Executors.newFixedThreadPool(30); // use appropriate number of threads +// +// for (PartitionId partitionId : partitionIdList) { +// executor.execute(() -> { +// try { +// partitionStateChangeListener.onPartitionBecomeBootstrapFromOffline(String.valueOf(partitionId.getId())); +// } catch (Exception e) { +// logger.error("FCH TEST: Failed to build state for file copy for partition {}", partitionId, e); +// } +// }); +// } } public void shutdown() throws InterruptedException { - logger.info("Shutting down FileCopyBasedReplicationManager"); + logger.info("FCH TEST: Shutting down FileCopyBasedReplicationManager"); fileCopyBasedReplicationScheduler.shutdown(); fileCopyBasedReplicationSchedulerThread.join(); isRunning = false; - logger.info("FileCopyBasedReplicationManager shutdown"); + logger.info("FCH TEST: FileCopyBasedReplicationManager shutdown"); + } + + /** + * Reads comma-separated values from the given file, accumulating the trimmed values from every row. + */ + public static List<String> readFromFile(String fileName) { + List<String> values = new ArrayList<>(); + try (BufferedReader br = new BufferedReader(new FileReader(fileName))) { + String line; + while ((line = br.readLine()) != null) { + // Split each row by comma and accumulate; previously only the last row's values were returned + for (String value : line.split(",")) { + values.add(value.trim()); + } + } + } catch (IOException e) { + logger.error("FCH TEST: Failed to read partition list from {}", fileName, e); + } + return values; } class PartitionStateChangeListenerImpl implements PartitionStateChangeListener { @Override public void onPartitionBecomeBootstrapFromOffline(String partitionName) { + //List<Long> partitionIds = Arrays.asList();//Arrays.asList(419l); + List<Long> fileCopyPartitionNames = readFromFile("/mnt/u001/partitions.txt").stream().map(Long::valueOf).collect( + Collectors.toList()); + logger.info("FCH TEST: Filtered partitions {}", + fileCopyPartitionNames); + logger.info("FCH TEST: FileCopyBasedReplicationManager: onPartitionBecomeBootstrapFromOffline for partition {}", + partitionName); + logger.info("FCH TEST: All Partitions to be hydrated are: {}", storeManager.getLocalPartitions().stream().map( + PartitionId::getId).collect(Collectors.toList())); + if (!fileCopyPartitionNames.contains(Long.valueOf(partitionName))) { + logger.warn("FCH TEST: Partition {} is not part of the list of partitions to be hydrated. Ignoring state change", partitionName); + return; + } + + logger.info("Started hydration for partition {}", partitionName); + if (!isRunning) { - logger.info("FileCopyBasedReplicationManager is not running. Ignoring state change for partition: {}", partitionName); + logger.info("FCH TEST: FileCopyBasedReplicationManager is not running. Ignoring state change for partition: {}", partitionName); throw new StateTransitionException("FileCopyBasedReplicationManager is not running. Ignoring state " + "change for partition: " + partitionName, StateTransitionException.TransitionErrorCode.FileCopyBasedReplicationManagerNotRunning); @@ -138,28 +204,30 @@ public void onPartitionBecomeBootstrapFromOffline(String partitionName) { * If the file copy was already completed, then no need to do it again. 
*/ if(storeManager.isFileExists(replicaId.getPartitionId(), storeConfig.storeFileCopyCompletedFileName)){ - logger.info("File Copy Was Completed For Replica: " + replicaId.getPartitionId().toPathString()); + logger.info("FCH TEST: File Copy Was Completed For Replica: " + replicaId.getPartitionId().toPathString()); return; } - logger.info("Initiated File Copy Wait On ReplicaSyncUpManager for Replica: {}", replicaId.getPartitionId().toPathString()); + logger.info("FCH TEST: Initiated File Copy Wait On ReplicaSyncUpManager for Replica: {}", replicaId.getPartitionId().toPathString()); + replicaSyncUpManager.initiateFileCopy(replicaId); - logger.info("Adding Replica to Prioritization Manager For Replica: {}", replicaId.getPartitionId().toPathString()); + logger.info("FCH TEST: Adding Replica to Prioritization Manager For Replica: {}", replicaId.getPartitionId().toPathString()); prioritizationManager.addReplica(replicaId); try { - logger.info("Waiting for File Copy to be completed for Replica: {}", replicaId.getPartitionId().toPathString()); - replicaSyncUpManager.waitForFileCopyCompleted(partitionName); - logger.info("File Copy Completed for Replica: {}", replicaId.getPartitionId().toPathString()); - } catch (InterruptedException e) { + logger.info("FCH TEST: Waiting for File Copy to be completed for Replica: {}", replicaId.getPartitionId().toPathString()); + //replicaSyncUpManager.waitForFileCopyCompleted(partitionName); + logger.info("FCH TEST: File Copy Completed for Replica: {}", replicaId.getPartitionId().toPathString()); + } catch (Exception e) { logger.error("File copy for partition {} was interrupted", partitionName); throw new StateTransitionException("File copy for partition " + partitionName + " was interrupted", StateTransitionException.TransitionErrorCode.FileCopyProtocolFailure); - } catch (StateTransitionException e){ - logger.error("File copy for partition {} failed", partitionName); - throw e; } +// } catch (StateTransitionException e){ +// logger.error("File copy for partition {} failed", partitionName); +// throw e; +// } } @Override public void onPartitionBecomeStandbyFromBootstrap(String partitionName) { diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyBasedReplicationSchedulerImpl.java b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyBasedReplicationSchedulerImpl.java index 72e6a2ee1c..b47de24f00 100644 --- a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyBasedReplicationSchedulerImpl.java +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyBasedReplicationSchedulerImpl.java @@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory; -class FileCopyBasedReplicationSchedulerImpl implements FileCopyBasedReplicationScheduler{ +class FileCopyBasedReplicationSchedulerImpl implements FileCopyBasedReplicationScheduler { private final FileCopyBasedReplicationConfig fileCopyBasedReplicationConfig; private final FileCopyHandlerFactory fileCopyHandlerFactory; private final ClusterMap clusterMap; @@ -88,6 +88,7 @@ public FileCopyBasedReplicationSchedulerImpl(@Nonnull FileCopyHandlerFactory fil this.replicaToStatusListenerMap = new ConcurrentHashMap<>(); } + @Override public void run(){ isRunning = true; logger.info("FileCopyBasedReplicationSchedulerImpl Started"); @@ -113,7 +114,7 @@ List findStarvedReplicas() { && System.currentTimeMillis() / 1000 - replicaToStartTimeMap.get(replica) > fileCopyBasedReplicationConfig.fileCopyReplicaTimeoutSecs) { - logger.info("Replica: {} is starved 
for hydration. Time since start: {} seconds", + logger.info("FCH TEST: Replica: {} is starved for hydration. Time since start: {} seconds", replica.getPartitionId().toPathString(), System.currentTimeMillis() / 1000 - replicaToStartTimeMap.get(replica)); replicasToDropFromHydration.add(replica); @@ -140,17 +141,17 @@ public void shutdown() throws InterruptedException { @Override public void scheduleFileCopy() throws InterruptedException { - logger.info("Starting File Copy Scheduler"); + logger.info("FCH TEST: Starting File Copy Scheduler"); while(isRunning){ - + logger.info("FCH TEST: Sleeping For File Copy Scheduler Wait Time: " + fileCopyBasedReplicationConfig.fileCopySchedulerWaitTimeSecs); Thread.sleep(fileCopyBasedReplicationConfig.fileCopySchedulerWaitTimeSecs*1000); List replicasToDropForHydration = findStarvedReplicas(); if(!replicasToDropForHydration.isEmpty()){ - logger.info("Found Replicas To Drop From Hydration: " + replicasToDropForHydration.stream() + logger.info("FCH TEST: Found Replicas To Drop From Hydration: " + replicasToDropForHydration.stream() .map(replicaId -> replicaId.getPartitionId().toPathString()).collect(Collectors.toList())); } else{ - logger.info("No Replicas To Drop From Hydration In Current Cycle"); + logger.info("FCH TEST: No Replicas To Drop From Hydration In Current Cycle"); } for(ReplicaId replica: replicasToDropForHydration){ @@ -171,9 +172,14 @@ public void scheduleFileCopy() throws InterruptedException { } List disksToHydrate = fileCopyBasedReplicationThreadPoolManager.getDiskIdsToHydrate(); + for(DiskId diskId: disksToHydrate){ List replicaIds = getNextReplicaToHydrate(diskId, fileCopyBasedReplicationConfig.fileCopyParallelPartitionHydrationCountPerDisk); - logger.info("Starting Hydration For Disk: {} with ReplicaId: {}", diskId, replicaIds.stream().map(replicaId -> replicaId.getPartitionId().toPathString())); + if(replicaIds == null || replicaIds.isEmpty()){ + logger.info("FCH TEST: No Replicas To Hydrate For Disk: " + diskId.getMountPath()); + continue; + } + logger.info("FCH TEST: Starting Hydration For Disk: {} with ReplicaId: {}", diskId, replicaIds.stream().map(replicaId -> replicaId.getPartitionId().toPathString())); if(!replicaIds.isEmpty()){ for(ReplicaId replicaId: replicaIds) { @@ -193,6 +199,18 @@ public void scheduleFileCopy() throws InterruptedException { fileCopyStatusListener.onFileCopyFailure(e); continue; } + try{ + /** + * Use FileCopyTemporaryDirectoryName to create a temporary directory for file copy. 
+ * This will be used to write the files which are not yet written, and can be cleaned + * up on restart without affecting the store's main directory. + */ + createTemporaryDirectoryForFileCopyIfAbsent(replicaId, fileCopyBasedReplicationConfig); + } catch (IOException e) { + logger.error("Error Creating Temporary Directory For Replica: " + replicaId.getPartitionId().toPathString(), e); + fileCopyStatusListener.onFileCopyFailure(e); + continue; + } fileCopyBasedReplicationThreadPoolManager.submitReplicaForHydration(replicaId, fileCopyStatusListener, fileCopyHandler); @@ -202,7 +220,7 @@ public void scheduleFileCopy() throws InterruptedException { replicaToStartTimeMap.put(replicaId, System.currentTimeMillis()/1000); } } else { - logger.info("No Replicas To Hydrate For Disk: " + diskId); + logger.info("FCH TEST: No Replicas To Hydrate For Disk: " + diskId); } } } @@ -222,6 +240,15 @@ void createFileCopyInProgressFileIfAbsent(ReplicaId replica) throws IOException } } + void createTemporaryDirectoryForFileCopyIfAbsent(ReplicaId replica, FileCopyBasedReplicationConfig fileCopyBasedReplicationConfig) throws IOException { + logger.info("FCH TEST: Creating Temporary Directory For File Copy: " + fileCopyBasedReplicationConfig.fileCopyTemporaryDirectoryName); + File fileCopyTemporaryDirectory = new File(replica.getReplicaPath(), fileCopyBasedReplicationConfig.fileCopyTemporaryDirectoryName); + if (!fileCopyTemporaryDirectory.exists() && !fileCopyTemporaryDirectory.mkdirs()) { + throw new IOException("Failed to create temporary directory: " + fileCopyTemporaryDirectory.getAbsolutePath()); + } + } + + @Override public int getThreadPoolSize() { return fileCopyBasedReplicationThreadPoolManager.getThreadPoolSize(); @@ -243,6 +270,7 @@ public ReplicaId getReplicaId() { @Override public void onFileCopySuccess() { + logger.info("FCH TEST: Hydration Completed For Replica: " + replicaId.getPartitionId().toPathString()); removeReplicaFromFileCopy(replicaId); replicaSyncUpManager.onFileCopyComplete(replicaId); } diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyThread.java b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyThread.java index 49e973378a..cecdc670da 100644 --- a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyThread.java +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyThread.java @@ -63,7 +63,7 @@ public class FileCopyThread extends Thread { @Override public void run() { - logger.info("Starting FileCopyThread: {} for replicaId: {}", threadName, fileCopyStatusListener.getReplicaId()); + logger.info("FCH TEST: Starting FileCopyThread: {} for replicaId: {}", threadName, fileCopyStatusListener.getReplicaId()); try { ReplicaId replicaId = fileCopyStatusListener.getReplicaId(); @@ -71,13 +71,20 @@ public void run() { throw new IllegalStateException("ReplicaId cannot be null"); } + logger.info("FCH TEST: ReplicaId Mount Path Is {}", replicaId.getMountPath()); + //TODO add logic to get the source and target replica id ReplicaId targetReplicaId = FileCopyUtils.getPeerForFileCopy(replicaId.getPartitionId(), replicaId.getDataNodeId().getDatacenterName()); - - if(targetReplicaId == null) { - throw new IllegalStateException("Target ReplicaId cannot be null"); + if (targetReplicaId == null) { + logger.warn("No peer replica found for file copy for replicaId: {}", replicaId); + fileCopyStatusListener.onFileCopyFailure(new IOException("No peer replica found for file copy")); + return; } + logger.info("FCH TEST: Starting file copy from {} to {}", replicaId.getDataNodeId(), targetReplicaId.getDataNodeId()); + + FileCopyInfo fileCopyInfo = new 
FileCopyInfo(START_CORRELATION_ID, CLIENT_ID, replicaId, targetReplicaId); fileCopyHandler.start(); // Start the file copy process diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/handler/StoreFileCopyHandler.java b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/handler/StoreFileCopyHandler.java index 7a39c9d9d0..67ea6bf0fc 100644 --- a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/handler/StoreFileCopyHandler.java +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/handler/StoreFileCopyHandler.java @@ -79,6 +79,9 @@ public class StoreFileCopyHandler implements FileCopyHandler { private static final Logger logger = LoggerFactory.getLogger(StoreFileCopyHandler.class); + // Tracks whether a non-zero chunk size is configured for this handler. + private final boolean chunkSizeConfigured; + /** * Constructor to create StoreFileCopyHandler * @param connectionPool the {@link ConnectionPool} to use for making requests. @@ -101,6 +104,7 @@ public StoreFileCopyHandler( this.clusterMap = clusterMap; this.config = config; this.operationRetryHandler = new OperationRetryHandler(config); + chunkSizeConfigured = (config.getFileCopyHandlerChunkSize != 0); } /** @@ -108,6 +112,7 @@ public StoreFileCopyHandler( * @throws StoreException */ public void start() throws StoreException { + logger.info("FCH TEST: File Copy handler is running for Replica"); isRunning = true; } @@ -156,6 +161,7 @@ void setOperationRetryHandler(OperationRetryHandler operationRetryHandler) { */ @Override public void copy(@Nonnull FileCopyInfo fileCopyInfo) throws Exception { + logger.info("FCH TEST: Mount Path is {}, DataNode is {}", fileCopyInfo.getSourceReplicaId().getMountPath(), fileCopyInfo.getTargetReplicaId().getDataNodeId()); Objects.requireNonNull(fileCopyInfo, "fileCopyReplicaInfo param cannot be null"); validateIfStoreFileCopyHandlerIsRunning(); @@ -317,7 +322,7 @@ private void processLogSegment(LogInfo logInfo, String partitionToMountFilePath, FileInfo logFileInfo = new StoreFileInfo(logInfo.getLogSegment().getFileName() + "_log", logInfo.getLogSegment().getFileSize()); int chunksInLogSegment = (int) Math.ceil((double) logFileInfo.getFileSize() / config.getFileCopyHandlerChunkSize); - logger.info("Number of chunks in log segment: {} for filename {}", chunksInLogSegment, logFileInfo.getFileName()); + logger.info("FCH TEST: Number of chunks in log segment: {} for filename {}", chunksInLogSegment, logFileInfo.getFileName()); for (int i = 0; i < chunksInLogSegment; i++) { long startOffset = (long) i * config.getFileCopyHandlerChunkSize; diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/utils/OperationRetryHandler.java b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/utils/OperationRetryHandler.java index f31e0c5da5..dada5d27b6 100644 --- a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/utils/OperationRetryHandler.java +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/utils/OperationRetryHandler.java @@ -69,7 +69,7 @@ public <T> T executeWithRetry(@Nonnull RetryableOperation<T> operation, while (true) { try { result = operation.execute(); - logger.info("{} succeeded after {} attempts", operationName, attempts); + logger.info("FCH TEST: {} succeeded after {} attempts", operationName, attempts); return result; } catch (InterruptedException e) { Thread.currentThread().interrupt(); // Preserve the interrupt status diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/workflow/GetChunkDataWorkflow.java 
b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/workflow/GetChunkDataWorkflow.java index eacb115a4e..bc19da3572 100644 --- a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/workflow/GetChunkDataWorkflow.java +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/workflow/GetChunkDataWorkflow.java @@ -106,7 +106,7 @@ public FileCopyGetChunkResponse execute() throws Exception { fileCopyInfo.getSourceReplicaId().getDataNodeId().getHostname(), snapshotId, fileChunkInfo.getStartOffset(), fileChunkInfo.getChunkLengthInBytes(), fileChunkInfo.isChunked()); - logger.info("Sending FileCopyGetChunkRequest: {}", request); + logger.info("FCH TEST: Sending FileCopyGetChunkRequest: {}", request); long startTimeMs = System.currentTimeMillis(); ConnectedChannel connectedChannel = getChannel(fileCopyInfo.getTargetReplicaId().getDataNodeId()); @@ -114,7 +114,7 @@ public FileCopyGetChunkResponse execute() throws Exception { ChannelOutput channelOutput = connectedChannel.sendAndReceive(request); FileCopyGetChunkResponse response = FileCopyGetChunkResponse.readFrom(channelOutput.getInputStream(), clusterMap); - logger.info("Received FileCopyGetChunkResponse in {} ms", System.currentTimeMillis() - startTimeMs); + logger.info("FCH TEST: Received FileCopyGetChunkResponse in {} ms", System.currentTimeMillis() - startTimeMs); return response; } } diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/workflow/GetMetadataWorkflow.java b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/workflow/GetMetadataWorkflow.java index 1ab1ffd955..9dcf91d4d1 100644 --- a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/workflow/GetMetadataWorkflow.java +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/workflow/GetMetadataWorkflow.java @@ -73,11 +73,13 @@ public FileCopyGetMetaDataResponse execute() throws Exception { ConnectedChannel connectedChannel = getChannel(fileCopyInfo.getTargetReplicaId().getDataNodeId()); - logger.info("Sending FileCopyGetMetaDataRequest: {}", request); + logger.info("FCH TEST: Sending FileCopyGetMetaDataRequest: {}", request); long startTimeMs = System.currentTimeMillis(); ChannelOutput channelOutput = connectedChannel.sendAndReceive(request); + FileCopyGetMetaDataResponse response = FileCopyGetMetaDataResponse.readFrom(channelOutput.getInputStream()); - logger.info("Received FileCopyGetMetaDataResponse in {} ms", System.currentTimeMillis() - startTimeMs); + logger.info("FCH TEST: Response Is: {}", response); + logger.info("FCH TEST: Received FileCopyGetMetaDataResponse in {} ms", System.currentTimeMillis() - startTimeMs); return response; } diff --git a/ambry-prioritization/src/main/java/com/github/ambry/replica/prioritization/FCFSPrioritizationManager.java b/ambry-prioritization/src/main/java/com/github/ambry/replica/prioritization/FCFSPrioritizationManager.java index 37f6e0f985..5db7829838 100644 --- a/ambry-prioritization/src/main/java/com/github/ambry/replica/prioritization/FCFSPrioritizationManager.java +++ b/ambry-prioritization/src/main/java/com/github/ambry/replica/prioritization/FCFSPrioritizationManager.java @@ -42,11 +42,13 @@ public class FCFSPrioritizationManager implements PrioritizationManager { public FCFSPrioritizationManager() { + logger.info("FCFS Initialized"); diskToReplicaMap = new ConcurrentHashMap<>(); inProgressReplicas = new ConcurrentHashMap<>(); } @Override public void start() { + logger.info("FCH TEST: FCFS Pz Manager has 
Started"); isRunning = true; } @@ -62,7 +64,7 @@ public boolean isRunning() { } @Override - public List getPartitionListForDisk(@Nonnull DiskId diskId, @Nonnegative int numberOfReplicasPerDisk) { + public synchronized List getPartitionListForDisk(@Nonnull DiskId diskId, @Nonnegative int numberOfReplicasPerDisk) { if(!isRunning){ logger.error("Failed to get partition list for disk {}", diskId); throw new StateTransitionException("Failed to get partition list for disk " + diskId, PrioritizationManagerRunningFailure); @@ -76,7 +78,8 @@ public List getPartitionListForDisk(@Nonnull DiskId diskId, @Nonnegat } int numberOfReplicasToBeRemoved = Math.min(numberOfReplicasPerDisk, replicaListForDisk.size()); - logger.info("Getting {} replicas for disk {}", numberOfReplicasToBeRemoved, diskId.getMountPath()); + logger.info("FCH TEST: Getting {} replicas for disk {}", numberOfReplicasToBeRemoved, diskId.getMountPath()); + List replicasToReturn = new LinkedList<>(replicaListForDisk.subList(0, numberOfReplicasToBeRemoved)); @@ -97,10 +100,12 @@ public List getInProgressReplicaIdsForDisk(DiskId diskId) { @Override public synchronized boolean addReplica(ReplicaId replicaId) { + logger.info("FCH TEST: Trying to Add Replica to FCFS"); validateIfPzManagerIsRunningOrThrowException(replicaId); + logger.info("FCH TEST: Prioritizer is Running"); diskToReplicaMap.putIfAbsent(replicaId.getDiskId(), new LinkedList<>()); diskToReplicaMap.get(replicaId.getDiskId()).add(replicaId); - logger.info("Added partition {} to prioritization Manager For Disk {}", replicaId.getReplicaPath(), + logger.info("FCH TEST: Added partition {} to prioritization Manager For Disk {}", replicaId.getReplicaPath(), replicaId.getDiskId().getMountPath()); return true; } diff --git a/ambry-prioritization/src/main/java/com/github/ambry/replica/prioritization/PrioritizationManager.java b/ambry-prioritization/src/main/java/com/github/ambry/replica/prioritization/PrioritizationManager.java index 47ebae6537..7139ccb445 100644 --- a/ambry-prioritization/src/main/java/com/github/ambry/replica/prioritization/PrioritizationManager.java +++ b/ambry-prioritization/src/main/java/com/github/ambry/replica/prioritization/PrioritizationManager.java @@ -51,6 +51,7 @@ public interface PrioritizationManager { */ List getPartitionListForDisk(DiskId diskId, int numberOfReplicasPerDisk); + /** * Get the list of partitions that are in progress for the given disk. * @param diskId the {@link DiskId} for which the list of partitions are in progress. 
diff --git a/ambry-prioritization/src/main/java/com/github/ambry/replica/prioritization/ReplicationPrioritizationManager.java b/ambry-prioritization/src/main/java/com/github/ambry/replica/prioritization/ReplicationPrioritizationManager.java index 8eaffd8a67..43611e1e04 100644 --- a/ambry-prioritization/src/main/java/com/github/ambry/replica/prioritization/ReplicationPrioritizationManager.java +++ b/ambry-prioritization/src/main/java/com/github/ambry/replica/prioritization/ReplicationPrioritizationManager.java @@ -117,7 +117,6 @@ public ReplicationPrioritizationManager(ReplicationEngine replicationEngine, Clu long initialDelay = replicationConfig.prioritizationSchedulerInitialDelayMinutes; // Schedule periodic runs for prioritization run this.scheduler.scheduleAtFixedRate(this, initialDelay, scheduleIntervalMinutes, TimeUnit.MINUTES); - this.replicationPrioritizationMetrics = new ReplicationPrioritizationMetrics(metricRegistry); logger.info("ReplicationPrioritizationManager initialized with prioritization window of {} hours, schedule interval of {} minutes, " + "and min batch size of {} partitions", prioritizationWindowMs, scheduleIntervalMinutes, @@ -143,7 +142,7 @@ public void run() { public void shutdown() { if (!scheduler.isTerminated()) { - logger.info("Shutting down ReplicationPrioritizationManager"); + logger.info("FCH TEST: Shutting down ReplicationPrioritizationManager"); Utils.shutDownExecutorService(scheduler, 5, TimeUnit.SECONDS); // clear all maps currentlyReplicatingPriorityPartitions.clear(); @@ -153,7 +152,7 @@ public void shutdown() { prioritizedPartitions.clear(); isHighPriorityReplicationRunning.set(false); } else { - logger.info("ReplicationPrioritizationManager already shut down"); + logger.info("FCH TEST: ReplicationPrioritizationManager already shut down"); } } @@ -169,7 +168,7 @@ public void shutdown() { */ void startPrioritizationCycle() { try { - logger.info("Starting partition prioritization run"); + logger.info("FCH TEST: Starting partition prioritization run"); // 1. Get all bootstrapping partitions from StorageManager allBootstrappingPartitions = getAllBootstrappingPartitionsForNode(); @@ -187,31 +186,34 @@ void startPrioritizationCycle() { partitionIds.removeAll(currentlyReplicatingPriorityPartitions); if (partitionIds.isEmpty()) { - logger.info("All bootstrapping partitions are already being replicated"); + logger.info("FCH TEST: All bootstrapping partitions are already being replicated"); } Map> disruptionsByPartition = fetchDisruptions(new ArrayList<>(partitionIds)); if (disruptionsByPartition == null || disruptionsByPartition.isEmpty()) { - logger.info("No disruptions found for any partitions"); + logger.info("FCH TEST: No disruptions found for any partitions"); } else { - logger.info("Fetched disruptions for {} partitions", disruptionsByPartition.size()); + logger.info("FCH TEST: Fetched disruptions for {} partitions", disruptionsByPartition.size()); } // 4. Create prioritized partition lists prioritizedPartitions = prioritizePartitions(partitionIds, disruptionsByPartition); replicationPrioritizationMetrics.updatePrioritizedPartitions(prioritizedPartitions); - prioritizedPartitions.keySet().forEach(priorityTier -> logger.info("Found {} partitions in {} category", prioritizedPartitions.get(priorityTier).size(), priorityTier)); + prioritizedPartitions.keySet().forEach(priorityTier -> logger.info("FCH TEST: Found {} partitions in {} category", prioritizedPartitions.get(priorityTier).size(), priorityTier)); // 5. 
Update replication priorities - if (shouldResetReplication()) { - logger.info("No new high-priority partitions identified " + if (prioritizedPartitions.entrySet().stream() + .filter(entry -> entry.getKey() != PriorityTier.NORMAL) + .allMatch(entry -> entry.getValue().isEmpty()) + && !isHighPriorityReplicationRunning.get()) { + logger.info("FCH TEST: No new high-priority partitions identified " + "and no existing high priority run, enabling replication for disabled partitions"); if (disabledReplicationPartitions.isEmpty()) { - logger.info("No disabled partitions to enable"); + logger.info("FCH TEST: No disabled partitions to enable"); } else { - logger.info("Found {} disabled partitions to enable", disabledReplicationPartitions.size()); + logger.info("FCH TEST: Found {} disabled partitions to enable", disabledReplicationPartitions.size()); resetToNormalReplication(); } } else { @@ -230,7 +232,7 @@ void startPrioritizationCycle() { } } - logger.info("Identified {} high-priority partitions for replication", highPriorityPartitions.size()); + logger.info("FCH TEST: Identified {} high-priority partitions for replication", highPriorityPartitions.size()); updateReplicationSet(highPriorityPartitions); } @@ -264,7 +266,7 @@ private Map<PartitionId, List<Disruption>> fetchDisruptions(List<PartitionId> partitionIds) { disruptionsByPartition = disruptionService.batchDisruptionsByPartition(new ArrayList<>(partitionIds)); } catch (Exception e) { logger.error("Error fetching disruptions from DisruptionService", e); - logger.info("Proceeding with prioritization based on MIN_ACTIVE_REPLICA", e); + logger.info("FCH TEST: Proceeding with prioritization based on MIN_ACTIVE_REPLICA", e); } return disruptionsByPartition; @@ -316,19 +318,19 @@ private Map<PriorityTier, Set<PartitionId>> assignPriorityTierToPartitions(Set<PartitionId> partitionIds, Map<PartitionId, List<Disruption>> disruptionsByPartition) { @@ private void populateDisruptionData @@ Set<PartitionId> copyOfDisabledReplicationPartitions = new HashSet<>(disabledReplicationPartitions); @@ -400,7 +402,7 @@ private void processCompletedPartitions() { logger.info("Removed {} completed partitions from replication set", completedPartitions.size()); if (currentlyReplicatingPriorityPartitions.isEmpty()) { - logger.info("No currently replicating partitions, disabling high-priority replication"); + logger.info("FCH TEST: No currently replicating partitions, disabling high-priority replication"); isHighPriorityReplicationRunning.set(false); } completedPartitions.clear(); @@ -422,7 +424,7 @@ private void updateReplicationSet(Set<PartitionId> highPriorityPartitions) { newHighPriorityPartitions.removeAll(currentlyReplicatingPriorityPartitions); if (newHighPriorityPartitions.isEmpty() && isHighPriorityReplicationRunning.get()) { - logger.info("No new high-priority partitions identified, continuing with current replication set"); + logger.info("FCH TEST: No new high-priority partitions identified, continuing with current replication set"); return; } @@ -434,7 +436,7 @@ private void updateReplicationSet(Set<PartitionId> highPriorityPartitions) { } else { // We're already in high-priority mode, just add the new partitions if (!newHighPriorityPartitions.isEmpty()) { - logger.info("Adding {} new high-priority partitions to existing replication set", + logger.info("FCH TEST: Adding {} new high-priority partitions to existing replication set", newHighPriorityPartitions.size()); // Record new high priority partitions @@ -489,7 +491,7 @@ private void startHighPriorityReplication(Set<PartitionId> highPriorityPartition isHighPriorityReplicationRunning.set(true); lastReplicationActivityMs = time.milliseconds(); - logger.info("Started high-priority replication for {} partitions", currentlyReplicatingPriorityPartitions.size()); + logger.info("FCH TEST: Started high-priority replication for {} partitions", currentlyReplicatingPriorityPartitions.size()); } catch (Exception e) { logger.error("Error starting high-priority replication", e); } @@ -506,7 +508,7 @@ private void addNewPartitions() { return; } - logger.info("Adding {} additional partitions to reach minimum batch size of {}", + logger.info("FCH TEST: Adding {} additional partitions to reach minimum batch size of {}", additionalPartitionsNeeded, minBatchSizeForHighPriorityPartitions); // Get all partitions for this node @@ -515,7 +517,7 @@ private void addNewPartitions() { remainingPartitions.removeAll(currentlyReplicatingPriorityPartitions); if (remainingPartitions.isEmpty()) { - logger.info("No additional partitions available to add to batch"); + logger.info("FCH TEST: No additional partitions available to add to batch"); return; } @@ -548,7 +550,7 @@ private void fillUptoBatchSize(int additionalPartitionsNeeded, Set<PartitionId> currentlyReplicatingPriorityPartitions.addAll(partitionsToAdd); added += partitionsToAdd.size(); - logger.info("Added {} partitions from {} category", partitionsToAdd.size(), priorityTier); + logger.info("FCH TEST: Added {} partitions from {} category", partitionsToAdd.size(), priorityTier); if (added >= additionalPartitionsNeeded) { break; @@ -568,14 +570,14 @@ private void controlReplicationThreads() { // Disable non-priority partitions if (!partitionsToDisable.isEmpty()) { - logger.info("Disabling replication for {} non-priority partitions", partitionsToDisable.size()); + logger.info("FCH TEST: Disabling replication for {} non-priority partitions", partitionsToDisable.size()); replicationEngine.controlReplicationForPartitions(partitionsToDisable, 
Collections.emptyList(), false); disabledReplicationPartitions.addAll(partitionsToDisable); } // Enable high-priority partitions - logger.info("Enabling replication for {} high-priority partitions", currentlyReplicatingPriorityPartitions.size()); + logger.info("FCH TEST: Enabling replication for {} high-priority partitions", currentlyReplicatingPriorityPartitions.size()); replicationEngine.controlReplicationForPartitions(currentlyReplicatingPriorityPartitions, Collections.emptyList(), true); disabledReplicationPartitions.removeAll(currentlyReplicatingPriorityPartitions); replicationPrioritizationMetrics.updateDisabledReplicationPartitions(disabledReplicationPartitions); @@ -614,7 +616,7 @@ private int calculateLocalReplicaCount(PartitionId partition) { Set<ReplicaState> states = new HashSet<>(Arrays.asList(ReplicaState.LEADER, ReplicaState.STANDBY)); Map<ReplicaState, List<ReplicaId>> localDCReplicas = (Map<ReplicaState, List<ReplicaId>>) partition.getReplicaIdsByStates(states, datacenterName); - logger.info("Found {} local replicas for partition {}", localDCReplicas.values().stream().mapToInt(List::size).sum(), + logger.info("FCH TEST: Found {} local replicas for partition {}", localDCReplicas.values().stream().mapToInt(List::size).sum(), partition.toPathString()); return localDCReplicas.values().stream().mapToInt(List::size).sum(); } diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetChunkResponse.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetChunkResponse.java index 1d9696be16..a86ef02539 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetChunkResponse.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetChunkResponse.java @@ -253,6 +253,7 @@ public boolean release() { */ @Override public void prepareBuffer() { + long startTime = System.currentTimeMillis(); super.prepareBuffer(); bufferToSend.writeBytes(partitionId.getBytes()); Utils.serializeString(bufferToSend, fileName, Charset.defaultCharset()); @@ -261,6 +262,7 @@ public void prepareBuffer() { bufferToSend.writeBoolean(isLastChunk); try { bufferToSend.writeBytes(chunkStream, chunkStream.available()); + logger.info("FileCopyGetChunkResponse Buffer Preparation: {} ms", System.currentTimeMillis() - startTime); } catch (IOException e) { logger.error("Error while writing chunkStream", e); throw new RuntimeException(e); diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java index 5da8a2132f..7c63004f65 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java @@ -1699,9 +1699,11 @@ public void handleFileCopyGetMetaDataRequest(NetworkRequest request) throws Inte try { fileCopyGetMetaDataRequest = FileCopyGetMetaDataRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap); + logger.info("FCH TEST: Trying to get Meta Data For Partition: " + fileCopyGetMetaDataRequest.getPartitionId().toPathString()); ServerErrorCode error = validateRequest(fileCopyGetMetaDataRequest.getPartitionId(), RequestOrResponseType.FileCopyGetMetaDataRequest, false); + logger.info("FCH TEST: Request is Validated"); if (error != ServerErrorCode.NoError) { logger.error("Validating FileCopyGetMetaDataRequest failed with error {} for request {}", error, fileCopyGetMetaDataRequest); @@ -1816,6 +1818,8 @@ public void handleFileCopyGetChunkRequest(NetworkRequest request) throws Interru new 
ServerNetworkResponseMetrics(metrics.fileCopyGetChunkResponseQueueTimeInMs, metrics.fileCopyGetChunkSendTimeInMs, metrics.fileCopyGetChunkTotalTimeInMs, null, null, totalTimeSpent)); + publicAccessLogger.info("{} {} totalProcessingTime {}", fileCopyGetChunkRequest, response, SystemTime.getInstance().milliseconds()- startTime); + } /** diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java index 7b07f2ad03..6067b7c280 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java @@ -41,9 +41,11 @@ import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.config.ConnectionPoolConfig; import com.github.ambry.config.DiskManagerConfig; +import com.github.ambry.config.FileCopyBasedReplicationConfig; import com.github.ambry.config.Http2ClientConfig; import com.github.ambry.config.NettyConfig; import com.github.ambry.config.NetworkConfig; +import com.github.ambry.config.ReplicaPrioritizationConfig; import com.github.ambry.config.ReplicationConfig; import com.github.ambry.config.SSLConfig; import com.github.ambry.config.ServerConfig; @@ -51,6 +53,11 @@ import com.github.ambry.config.StatsManagerConfig; import com.github.ambry.config.StoreConfig; import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.filetransfer.FileCopyBasedReplicationManager; +import com.github.ambry.filetransfer.FileCopyBasedReplicationSchedulerFactory; +import com.github.ambry.filetransfer.FileCopyBasedReplicationSchedulerFactoryImpl; +import com.github.ambry.filetransfer.handler.FileCopyHandlerFactory; +import com.github.ambry.filetransfer.handler.StoreFileCopyHandlerFactory; import com.github.ambry.messageformat.BlobStoreHardDelete; import com.github.ambry.messageformat.BlobStoreRecovery; import com.github.ambry.network.BlockingChannelConnectionPool; @@ -74,6 +81,10 @@ import com.github.ambry.protocol.RequestHandlerPool; import com.github.ambry.repair.RepairRequestsDb; import com.github.ambry.repair.RepairRequestsDbFactory; +import com.github.ambry.replica.prioritization.FCFSPrioritizationManager; +import com.github.ambry.replica.prioritization.FileBasedReplicationPrioritizationManagerFactory; +import com.github.ambry.replica.prioritization.PrioritizationManager; +import com.github.ambry.replica.prioritization.PrioritizationManagerFactory; import com.github.ambry.replica.prioritization.ReplicationPrioritizationManager; import com.github.ambry.replica.prioritization.disruption.DisruptionService; import com.github.ambry.replica.prioritization.disruption.factory.DisruptionServiceFactory; @@ -228,6 +239,8 @@ public void startup() throws InstantiationException { SSLConfig sslConfig = new SSLConfig(properties); ClusterMapConfig clusterMapConfig = new ClusterMapConfig(properties); StatsManagerConfig statsConfig = new StatsManagerConfig(properties); + FileCopyBasedReplicationConfig fileCopyBasedReplicationConfig = new FileCopyBasedReplicationConfig(properties); + ReplicaPrioritizationConfig replicaPrioritizationConfig = new ReplicaPrioritizationConfig(properties); // verify the configs properties.verify(); @@ -492,6 +505,22 @@ public void startup() throws InstantiationException { } logger.info("started"); long processingTime = SystemTime.getInstance().milliseconds() - startTime; + + FileCopyHandlerFactory + fileCopyHandlerFactory = new StoreFileCopyHandlerFactory(connectionPool, storageManager, clusterMap, 
fileCopyBasedReplicationConfig); + + PrioritizationManager prioritizationManager = new FCFSPrioritizationManager(); + logger.info("starting FCFS PZ MANAGER"); + prioritizationManager.start(); + logger.info("started FCFS PZ MANAGER"); + DataNodeId nodeId = clusterMap.getDataNodeId(networkConfig.hostName, networkConfig.port); + + FileCopyBasedReplicationSchedulerFactory fileCopyBasedReplicationSchedulerFactory = new FileCopyBasedReplicationSchedulerFactoryImpl(fileCopyHandlerFactory, + fileCopyBasedReplicationConfig, clusterMap, prioritizationManager, storageManager, storeConfig, nodeId, clusterParticipant ); + FileCopyBasedReplicationManager fileCopyBasedReplicationManager = new FileCopyBasedReplicationManager(fileCopyBasedReplicationConfig, clusterMapConfig, + storageManager, clusterMap, networkClientFactory, new MetricRegistry(), clusterParticipant, fileCopyBasedReplicationSchedulerFactory, fileCopyHandlerFactory, + prioritizationManager, storeConfig, replicaPrioritizationConfig); + testE2EFlow(fileCopyBasedReplicationManager); metrics.serverStartTimeInMs.update(processingTime); logger.info("Server startup time in Ms {}", processingTime); } catch (Exception e) { @@ -500,6 +529,15 @@ public void startup() throws InstantiationException { } } + + public void testE2EFlow(FileCopyBasedReplicationManager fileCopyBasedReplicationManager){ + try { + fileCopyBasedReplicationManager.start(); + } catch (Exception e) { + logger.error("FCBRM Failed"); + logger.error(e.toString()); + } + } /** * This method is expected to be called in the exit path as long as the AmbryServer instance construction was * successful. This is expected to be called even if {@link #startup()} did not succeed. diff --git a/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java b/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java index 9b02322aee..0c9462c1b9 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java +++ b/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java @@ -1423,6 +1423,7 @@ public List getLogSegmentMetadataFiles(boolean includeActiveLogSegment) * Param includeActiveLogSegment is used to determine if the active log segment should be included in the result. 
*/ private List<LogSegment> getLogSegments(boolean includeActiveLogSegment) { + logger.info("FCH TEST: Returning Log Segments: {}", log.getAllLogSegmentNames().stream().map(x -> x.toString()).collect(Collectors.toList())); return log.getAllLogSegmentNames().stream() .filter(segmentName -> includeActiveLogSegment || !segmentName.equals(log.getActiveSegment().getName())) .map(segmentName -> log.getSegment(segmentName)) diff --git a/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java b/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java index 4c3535cff4..0593c0ac06 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java +++ b/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java @@ -24,7 +24,9 @@ import com.github.ambry.utils.Throttler; import com.github.ambry.utils.Time; import com.github.ambry.utils.Utils; +import java.io.BufferedReader; import java.io.File; +import java.io.FileReader; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; @@ -52,6 +54,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -171,6 +174,34 @@ public class DiskManager { compactionManager = new CompactionManager(disk.getMountPath(), storeConfig, stores.values(), metrics, time); } + + /** + * Reads comma-separated values from the given file, accumulating the trimmed values from every row. + */ + public List<String> readFromFile(String fileName) { + List<String> allValues = new ArrayList<>(); + logger.info("FCH TEST: Reading from file: " + fileName); + File file = new File(fileName); + if (file.exists()) { + logger.info("FCH TEST: File Exists: " + fileName); + } + try (BufferedReader br = new BufferedReader(new FileReader(fileName))) { + String line; + while ((line = br.readLine()) != null) { + logger.info("FCH TEST: Values read from file are: " + line); + String[] values = line.split(","); + for (String value : values) { + allValues.add(value.trim()); + } + } + } catch (IOException e) { + logger.error("FCH TEST: Failed to read from file: " + fileName, e); + } + return allValues; + } + /** * Starts all the stores on this disk. 
* @throws InterruptedException @@ -200,7 +231,22 @@ void start(boolean shouldRemoveUnexpectedDirs) throws InterruptedException { } Thread thread = Utils.newThread("store-startup-" + partitionAndStore.getKey(), () -> { try { - partitionAndStore.getValue().start(); + List<String> partitionNames = readFromFile("/mnt/u001/partitions.txt"); + logger.info("FCH TEST: Partition Names To Be Filtered In Disk Manager Are: {}", partitionNames); + List<Long> longPartitionNames = partitionNames.stream().map(Long::valueOf).collect( + Collectors.toList()); + logger.info("FCH TEST: Partition Names To Be Filtered As Long In Disk Manager Are: {}", longPartitionNames); + + if (longPartitionNames.contains(partitionAndStore.getKey().getId())) { + partitionAndStore.getValue().initialize(); + logger.info("FCH TEST: Contents Of Directory {} Before Clean Up Is: {}", partitionAndStore.getValue().getDataDir(), Arrays.stream(new File(partitionAndStore.getValue().getDataDir()).listFiles()).map(File::getName).collect( + Collectors.toList())); + partitionAndStore.getValue().getFileStore().cleanUpDirectory(partitionAndStore.getValue().getDataDir()); + + logger.info("FCH TEST: Contents Of Directory {} After Clean Up Is: {}", partitionAndStore.getValue().getDataDir(), Arrays.stream(new File(partitionAndStore.getValue().getDataDir()).listFiles()).map(File::getName).collect( + Collectors.toList())); + } else { + partitionAndStore.getValue().start(); + } } catch (Exception e) { numStoreFailures.incrementAndGet(); logger.error("Exception while starting store for the {}", partitionAndStore.getKey(), e); diff --git a/ambry-store/src/main/java/com/github/ambry/store/FileStore.java b/ambry-store/src/main/java/com/github/ambry/store/FileStore.java index 522377a661..bdd593801b 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/FileStore.java +++ b/ambry-store/src/main/java/com/github/ambry/store/FileStore.java @@ -32,11 +32,15 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.nio.file.StandardCopyOption; import java.nio.file.StandardOpenOption; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,6 +91,12 @@ public class FileStore implements PartitionFileStore { // Size of each log segment private long segmentSize; + // Single-threaded executor so chunk writes for this store are applied in submission order. + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + + // Tracks the most recently submitted write so the next write can wait for it. + private Future<?> currentFuture = null; + +// private SegmentFileTracker segmentTracker; + /** * Creates a new FileStore instance. 
* @param partitionToMountPath partition path for Filestore to access @@ -145,7 +155,7 @@ public StoreFileChunk readStoreFileChunkFromDisk(String fileName, long offset, l throws StoreException, IOException { // Verify service is running before proceeding validateIfFileStoreIsRunning(); - + long startTime = System.currentTimeMillis(); File file = validateAndGetFile(fileName); if (!isChunked) { return new StoreFileChunk(new DataInputStream(Files.newInputStream(file.toPath())), file.length()); @@ -157,7 +167,6 @@ public StoreFileChunk readStoreFileChunkFromDisk(String fileName, long offset, l } // Seek to the specified offset randomAccessFile.seek(offset); - // Allocate buffer for reading data ByteBuffer buf = ByteBuffer.allocate((int) size); @@ -170,6 +179,7 @@ public StoreFileChunk readStoreFileChunkFromDisk(String fileName, long offset, l } // Prepare buffer for reading buf.flip(); + logger.info("Time taken For Chunk Read is {} milliseconds", System.currentTimeMillis() - startTime); // return file chunk buffer read return StoreFileChunk.from(buf); } catch (FileNotFoundException e) { @@ -184,6 +194,20 @@ public StoreFileChunk readStoreFileChunkFromDisk(String fileName, long offset, l } } +// public SegmentFileTracker createFileAndReturnSegmentTracker(String fileName){ +// RandomAccessFile randomAccessFile = null; +// try { +// File file = new File(partitionToMountPath + File.separator + fileName); +// if (!file.exists()) { +// file.createNewFile(); +// } +// randomAccessFile = new RandomAccessFile(file, "rw"); +// } catch (IOException e) { +// logger.error("Error while creating file: {}", fileName, e); +// } +// return segmentTracker; +// } + /** * Writes data from an input stream to a file. * @param outputFilePath The path where the file should be written @@ -195,15 +219,18 @@ public StoreFileChunk readStoreFileChunkFromDisk(String fileName, long offset, l public void writeStoreFileChunkToDisk(String outputFilePath, StoreFileChunk storeFileChunk) throws IOException { // Verify service is running validateIfFileStoreIsRunning(); - + long startTime = System.currentTimeMillis(); // Validate input Objects.requireNonNull(storeFileChunk, "storeFileChunk must not be null"); Objects.requireNonNull(storeFileChunk.getStream(), "dataInputStream in storeFileChunk must not be null"); + // Can add buffered streaming to avoid memory overusage if multiple threads calling FileStore. 
     // Read the entire file content into memory
     int fileSize = storeFileChunk.getStream().available();
+    logger.info("FCH TEST: Size of chunk to write is {} bytes", fileSize);
     byte[] content = Utils.readBytesFromStream(storeFileChunk.getStream(), fileSize);
+    logger.info("FCH TEST: Time taken for chunk conversion is {} milliseconds", System.currentTimeMillis() - startTime);
 
     try {
       synchronized (storeWriteLock) {
@@ -215,9 +242,24 @@ public void writeStoreFileChunkToDisk(String outputFilePath, StoreFileChunk stor
           // Throwing IOException if the parent directory does not exist
           throw new IOException("Parent directory does not exist: " + parentDir);
         }
+        // Block until the previously submitted chunk write completes, so that at most one
+        // chunk is in flight and chunks are appended in arrival order.
+        if (currentFuture != null) {
+          long startTimeBlocker = System.currentTimeMillis();
+          currentFuture.get();
+          logger.info("Thread was blocked for {} milliseconds waiting on the previous chunk write", System.currentTimeMillis() - startTimeBlocker);
+        }
         // Write content to file with create and append options, which will create a new file if file doesn't exist
         // and append to the existing file if file exists
-        Files.write(outputPath, content, StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.SYNC);
+        currentFuture = executor.submit(() -> {
+          try (FileOutputStream fos = new FileOutputStream(outputPath.toFile(), true)) {
+            fos.write(content);
+            fos.flush();
+          } catch (IOException e) {
+            // Failure is logged by the write task; callers only observe it indirectly.
+            logger.error("Error while writing chunk to file: {}", outputFilePath, e);
+          }
+        });
+        logger.info("FCH TEST: Time taken for file write submission is {} milliseconds", System.currentTimeMillis() - startTime);
       }
     } catch (Exception e) {
       logger.error("Error while writing chunk to file: {}", outputFilePath, e);
@@ -250,7 +292,7 @@ public void moveAllRegularFiles(String srcDirPath, String destDirPath) throws IO
       if (!srcDirPath.startsWith(partitionToMountPath + File.separator)) {
         throw new IOException("Source directory is not under mount path: " + partitionToMountPath);
       }
-      if (!destDirPath.startsWith(partitionToMountPath + File.separator)) {
+      if (!destDirPath.startsWith(partitionToMountPath)) {
         throw new IOException("Destination directory is not under mount path: " + partitionToMountPath);
       }
@@ -296,6 +338,45 @@ public void moveAllRegularFiles(String srcDirPath, String destDirPath) throws IO
     logger.info("All regular files are moved from: {} to: {}", srcDirPath, destDirPath);
   }
 
+  @Override
+  public void cleanUpDirectory(String srcPath) {
+    // Verify service is running.
+    validateIfFileStoreIsRunning();
+
+    logger.info("FCH TEST: Cleaning up directory: {}", srcPath);
+    // Validate input.
+    Objects.requireNonNull(srcPath, "srcPath must not be null");
+
+    try {
+      synchronized (storeWriteLock) {
+        Path source = Paths.get(srcPath);
+        if (!Files.exists(source)) {
+          throw new IOException("Source directory does not exist: " + srcPath);
+        }
+        try (Stream<Path> entries = Files.list(source)) {
+          entries.forEach(path -> {
+            try {
+              // Recursively walk and delete each child path
+              Files.walk(path)
+                  .sorted(Comparator.reverseOrder()) // Delete children before parents
+                  .forEach(p -> {
+                    try {
+                      Files.delete(p);
+                    } catch (IOException e) {
+                      logger.error("Failed to delete: {}", p, e);
+                    }
+                  });
+            } catch (IOException e) {
+              logger.error("Failed to walk path: {}", path, e);
+            }
+          });
+        }
+      }
+    } catch (Exception e) {
+      logger.error("Unexpected error while cleaning up directory: {}", srcPath, e);
+    }
+  }
+
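cleanUpDirectory above relies on delete-children-before-parents ordering. A minimal standalone sketch of the same pattern (hypothetical helper and class name, not part of this change): Files.walk emits parents before their children, so reverse-sorting the stream guarantees every directory is empty by the time Files.delete reaches it.

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Comparator;
    import java.util.stream.Stream;

    final class DirectoryCleaner {
      // Deletes root and everything under it; children are deleted before parents.
      static void deleteRecursively(Path root) throws IOException {
        try (Stream<Path> walk = Files.walk(root)) {
          walk.sorted(Comparator.reverseOrder()).forEach(p -> {
            try {
              Files.delete(p);
            } catch (IOException e) {
              // Wrap so the checked exception can escape the lambda.
              throw new UncheckedIOException(e);
            }
          });
        }
      }
    }
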
   /**
    * Allocates a file in the specified path.
    * @param path The path where the file should be allocated
@@ -545,5 +626,6 @@ public List retrieve(InputStream inputStream) throws IOException {
         stream.close();
       }
     }
+
   }
 }
diff --git a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java
index 7d3ed97a4d..f8edbfd954 100644
--- a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java
+++ b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java
@@ -949,7 +949,7 @@ public void onPartitionBecomePreBootstrapFromOffline(String partitionName) {
       // For case 3, we should throw exception to make replica stay in ERROR state (thus, frontends won't pick this replica)
       // For case 4, we check its current used capacity and put it in BOOTSTRAP state if necessary. This is to ensure
       // it catches up with peers before serving PUT traffic (or being selected as LEADER)
-      store = getStore(replica.getPartitionId(), false);
+      store = getStore(replica.getPartitionId(), true);
       // store should be in started state if this is not a first-time added replica
       // as we will start all stores on the host during a restart
@@ -973,19 +973,19 @@ public void onPartitionBecomePreBootstrapFromOffline(String partitionName) {
       // if store's used capacity is less than or equal to header size, we create a bootstrap_in_progress file and force
       // it to stay in BOOTSTRAP state when catching up with peers.
-      long storeUsedCapacity = store.getSizeInBytes();
-      if (storeUsedCapacity <= HEADER_SIZE) {
-        logger.info(
-            "Store {} has used capacity {} less than or equal to {} bytes, consider it recently created and make it go through bootstrap process.",
-            partitionName, storeUsedCapacity, HEADER_SIZE);
-        try {
-          createBootstrapFileIfAbsent(replica);
-        } catch (IOException e) {
-          logger.error("Failed to create bootstrap file for store {}", partitionName);
-          throw new StateTransitionException("Failed to create bootstrap file for " + partitionName,
-              ReplicaOperationFailure);
-        }
-      }
+//      long storeUsedCapacity = store.getSizeInBytes();
+//      if (storeUsedCapacity <= HEADER_SIZE) {
+//        logger.info(
+//            "Store {} has used capacity {} less than or equal to {} bytes, consider it recently created and make it go through bootstrap process.",
+//            partitionName, storeUsedCapacity, HEADER_SIZE);
+//        try {
+//          createBootstrapFileIfAbsent(replica);
+//        } catch (IOException e) {
+//          logger.error("Failed to create bootstrap file for store {}", partitionName);
+//          throw new StateTransitionException("Failed to create bootstrap file for " + partitionName,
+//              ReplicaOperationFailure);
+//        }
+//      }
     }
   }
diff --git a/tempFile b/tempFile
new file mode 100644
index 0000000000..e69de29bb2
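For reference, the ordering contract behind the executor/currentFuture pattern introduced in FileStore.writeStoreFileChunkToDisk can be reduced to the following sketch. The class and member names here are illustrative assumptions, not Ambry APIs: chunk writes are handed to a single-threaded executor, and each caller blocks on the previous write's Future before submitting the next one.

    import java.io.IOException;
    import java.io.OutputStream;
    import java.io.UncheckedIOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    class SerialChunkWriter {
      // One worker thread: submitted tasks execute strictly in arrival order.
      private final ExecutorService executor = Executors.newSingleThreadExecutor();
      // Most recently submitted write; the next caller blocks on it before submitting.
      private Future<?> previous = null;

      synchronized void append(OutputStream out, byte[] chunk) throws Exception {
        if (previous != null) {
          previous.get(); // wait until the prior chunk is fully flushed
        }
        previous = executor.submit(() -> {
          try {
            out.write(chunk);
            out.flush();
          } catch (IOException e) {
            // Surfaces as an ExecutionException on the next caller's get().
            throw new UncheckedIOException(e);
          }
        });
      }
    }

Note that a single-threaded executor already runs tasks in submission order, so the blocking get() does not add ordering; its effect is to bound memory to at most one in-flight chunk and to surface a failed write to the caller of the next append.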