diff --git a/fbpcs/data_processing/sharding/sharding.py b/fbpcs/data_processing/sharding/sharding.py index 224b1c354..2a9afb6bc 100644 --- a/fbpcs/data_processing/sharding/sharding.py +++ b/fbpcs/data_processing/sharding/sharding.py @@ -45,6 +45,7 @@ def shard_on_container( tmp_directory: str = "/tmp/", hmac_key: Optional[str] = None, wait_for_containers: bool = True, + should_log_container_urls: bool = False, ) -> ContainerInstance: pass @@ -61,5 +62,6 @@ async def shard_on_container_async( tmp_directory: str = "/tmp/", hmac_key: Optional[str] = None, wait_for_containers: bool = True, + should_log_container_urls: bool = False, ) -> ContainerInstance: pass diff --git a/fbpcs/data_processing/sharding/sharding_cpp.py b/fbpcs/data_processing/sharding/sharding_cpp.py index db63192c9..f83d2bc4b 100644 --- a/fbpcs/data_processing/sharding/sharding_cpp.py +++ b/fbpcs/data_processing/sharding/sharding_cpp.py @@ -18,6 +18,7 @@ from fbpcp.service.storage import PathType, StorageService from fbpcs.common.util.wait_for_containers import wait_for_containers_async from fbpcs.data_processing.sharding.sharding import ShardingService, ShardType +from fbpcs.experimental.cloud_logs.log_retriever import CloudProvider, LogRetriever from fbpcs.onedocker_binary_names import OneDockerBinaryNames from fbpcs.pid.service.pid_service.pid_stage import PIDStage @@ -139,6 +140,7 @@ def shard_on_container( hmac_key: Optional[str] = None, container_timeout: Optional[int] = None, wait_for_containers: bool = True, + should_log_container_urls: bool = False, ) -> ContainerInstance: return asyncio.run( self.shard_on_container_async( @@ -153,6 +155,7 @@ def shard_on_container( hmac_key, container_timeout, wait_for_containers, + should_log_container_urls, ) ) @@ -169,6 +172,7 @@ async def shard_on_container_async( hmac_key: Optional[str] = None, container_timeout: Optional[int] = None, wait_for_containers: bool = True, + should_log_container_urls: bool = False, ) -> ContainerInstance: logger = 
logging.getLogger(__name__) timeout = container_timeout or DEFAULT_CONTAINER_TIMEOUT_IN_SEC @@ -212,6 +216,18 @@ async def shard_on_container_async( ) )[0] + if should_log_container_urls: + # Log the URL once... since the DataProcessingStage doesn't expose + # the containers, we handle the logic directly in each stage like + # so. It's kind of weird. T107574607 is tracking this. + # Hope we're using AWS! + try: + log_retriever = LogRetriever(CloudProvider.AWS) + log_url = log_retriever.get_log_url(container.instance_id) + logger.info(f"Container URL -> {log_url}") + except Exception: + logger.warning("Failed to retrieve log URL") + logger.info("Task started") if wait_for_containers: # Busy wait until the container is finished diff --git a/fbpcs/private_computation/service/prepare_data_stage_service.py b/fbpcs/private_computation/service/prepare_data_stage_service.py index 798042b7d..34c8ff4d9 100644 --- a/fbpcs/private_computation/service/prepare_data_stage_service.py +++ b/fbpcs/private_computation/service/prepare_data_stage_service.py @@ -203,6 +203,7 @@ async def _run_sharder_service( onedocker_svc=self._onedocker_svc, binary_version=binary_config.binary_version, tmp_directory=binary_config.tmp_directory, + should_log_container_urls=True, ) coros.append(coro) diff --git a/fbpcs/private_computation/service/private_computation.py b/fbpcs/private_computation/service/private_computation.py index 1de663513..5716ed239 100644 --- a/fbpcs/private_computation/service/private_computation.py +++ b/fbpcs/private_computation/service/private_computation.py @@ -59,6 +59,7 @@ PrivateComputationStageService, PrivateComputationStageServiceArgs, ) +from fbpcs.private_computation.service.utils import get_log_urls from fbpcs.utils.optional import unwrap_or_default T = TypeVar("T") @@ -306,10 +307,18 @@ async def run_stage_async( raise e finally: self.instance_repository.update(pc_instance) + + try: + log_urls = get_log_urls(pc_instance) + for key, url in log_urls.items(): + 
self.logger.info(f"Log for {key} at {url}") + except Exception: + self.logger.warning("Failed to retrieve log URLs for instance") + return pc_instance # TODO T88759390: make an async version of this function - # Optioinal stage, validate the correctness of aggregated results for injected synthetic data + # Optional stage, validate the correctness of aggregated results for injected synthetic data def validate_metrics( self, instance_id: str, diff --git a/fbpcs/private_computation/service/run_binary_base_service.py b/fbpcs/private_computation/service/run_binary_base_service.py index d6b479ee9..e7204afbd 100644 --- a/fbpcs/private_computation/service/run_binary_base_service.py +++ b/fbpcs/private_computation/service/run_binary_base_service.py @@ -10,6 +10,7 @@ from fbpcp.entity.container_instance import ContainerInstanceStatus from fbpcp.service.onedocker import OneDockerService +from fbpcs.experimental.cloud_logs.log_retriever import CloudProvider, LogRetriever from fbpcs.private_computation.service.constants import DEFAULT_CONTAINER_TIMEOUT_IN_SEC @@ -36,6 +37,18 @@ async def start_and_wait_for_containers( [container.instance_id for container in pending_containers] ) + # Log the URL once... since the DataProcessingStage doesn't expose the + # containers, we handle the logic directly in each stage like so. + # It's kind of weird. T107574607 is tracking this. + # Hope we're using AWS! 
+ log_retriever = LogRetriever(CloudProvider.AWS) + for i, container in enumerate(containers): + try: + log_url = log_retriever.get_log_url(container.instance_id) + logger.info(f"Container[{i}] URL -> {log_url}") + except Exception: + logger.warning(f"Could not look up URL for container[{i}]") + # Busy wait until all containers are finished any_failed = False for shard, container in enumerate(containers): diff --git a/fbpcs/private_computation/service/utils.py b/fbpcs/private_computation/service/utils.py index f8d3ed1ce..b65c24441 100644 --- a/fbpcs/private_computation/service/utils.py +++ b/fbpcs/private_computation/service/utils.py @@ -8,6 +8,7 @@ import functools +import logging import warnings from typing import Any, Dict, List, Optional @@ -16,6 +17,8 @@ from fbpcp.entity.mpc_instance import MPCInstanceStatus from fbpcp.service.mpc import MPCService from fbpcs.common.entity.pcs_mpc_instance import PCSMPCInstance +from fbpcs.experimental.cloud_logs.log_retriever import CloudProvider, LogRetriever +from fbpcs.pid.entity.pid_instance import PIDInstance from fbpcs.private_computation.entity.private_computation_instance import ( PrivateComputationInstance, PrivateComputationInstanceStatus, ) @@ -201,6 +204,45 @@ def get_updated_pc_status_mpc_game( return status +def get_log_urls( + private_computation_instance: PrivateComputationInstance, +) -> Dict[str, str]: + """Get log urls for most recently run containers + + Arguments: + private_computation_instance: The PC instance whose most recent run's containers are looked up + + Returns: + A dict mapping a container key (stage name and/or shard index) to that container's log URL + """ + # Get the last pid or mpc instance + last_instance = private_computation_instance.instances[-1] + + # TODO - hope we're using AWS! 
+ log_retriever = LogRetriever(CloudProvider.AWS) + + res = {} + if isinstance(last_instance, PIDInstance): + pid_current_stage = last_instance.current_stage + if not pid_current_stage: + logging.warning("Unreachable block: no stage has run yet") + return res + containers = last_instance.stages_containers[pid_current_stage] + for i, container in enumerate(containers): + res[f"{pid_current_stage}_{i}"] = log_retriever.get_log_url(container.instance_id) + elif isinstance(last_instance, PCSMPCInstance): + containers = last_instance.containers + for i, container in enumerate(containers): + res[str(i)] = log_retriever.get_log_url(container.instance_id) + else: + logging.warning( + "The last instance of PrivateComputationInstance " + f"{private_computation_instance.instance_id} has no supported way " + "of retrieving log URLs" + ) + return res + + # decorators are a serious pain to add typing for, so I'm not going to bother... # pyre-ignore return typing def deprecated(reason: str): @@ -210,9 +252,11 @@ def deprecated(reason: str): # pyre-ignore return typing def wrap(func): - warning_color = '\033[93m' # orange/yellow ascii escape sequence - end = '\033[0m' # end ascii escape sequence - explanation: str = f"{warning_color}{func.__name__} is deprecated! explanation: {reason}{end}" + warning_color = "\033[93m" # orange/yellow ascii escape sequence + end = "\033[0m" # end ascii escape sequence + explanation: str = ( + f"{warning_color}{func.__name__} is deprecated! explanation: {reason}{end}" + ) @functools.wraps(func) # pyre-ignore typing on args, kwargs, and return