Add get_log_urls util function in PCService (#451)
Summary:
Pull Request resolved: #451

# What
* Add a `get_log_urls` util function in PCService (per the title)
# Why
* We want to log container log URLs in prod for debugging

Reviewed By: jrodal98

Differential Revision: D32763113

fbshipit-source-id: 976119aefabd7e849dad21c6917adab44e2213d1
Logan Gore authored and facebook-github-bot committed Dec 9, 2021
1 parent 769d1f6 commit e262f85
Showing 6 changed files with 89 additions and 4 deletions.
2 changes: 2 additions & 0 deletions fbpcs/data_processing/sharding/sharding.py
@@ -45,6 +45,7 @@ def shard_on_container(
tmp_directory: str = "/tmp/",
hmac_key: Optional[str] = None,
wait_for_containers: bool = True,
should_log_container_urls: bool = False,
) -> ContainerInstance:
pass

@@ -61,5 +62,6 @@ async def shard_on_container_async(
tmp_directory: str = "/tmp/",
hmac_key: Optional[str] = None,
wait_for_containers: bool = True,
should_log_container_urls: bool = False,
) -> ContainerInstance:
pass
16 changes: 16 additions & 0 deletions fbpcs/data_processing/sharding/sharding_cpp.py
@@ -18,6 +18,7 @@
from fbpcp.service.storage import PathType, StorageService
from fbpcs.common.util.wait_for_containers import wait_for_containers_async
from fbpcs.data_processing.sharding.sharding import ShardingService, ShardType
from fbpcs.experimental.cloud_logs.log_retriever import CloudProvider, LogRetriever
from fbpcs.onedocker_binary_names import OneDockerBinaryNames
from fbpcs.pid.service.pid_service.pid_stage import PIDStage

@@ -139,6 +140,7 @@ def shard_on_container(
hmac_key: Optional[str] = None,
container_timeout: Optional[int] = None,
wait_for_containers: bool = True,
should_log_container_urls: bool = False,
) -> ContainerInstance:
return asyncio.run(
self.shard_on_container_async(
@@ -153,6 +155,7 @@ def shard_on_container(
hmac_key,
container_timeout,
wait_for_containers,
should_log_container_urls,
)
)

@@ -169,6 +172,7 @@ async def shard_on_container_async(
hmac_key: Optional[str] = None,
container_timeout: Optional[int] = None,
wait_for_containers: bool = True,
should_log_container_urls: bool = False,
) -> ContainerInstance:
logger = logging.getLogger(__name__)
timeout = container_timeout or DEFAULT_CONTAINER_TIMEOUT_IN_SEC
@@ -212,6 +216,18 @@ async def shard_on_container_async(
)
)[0]

if should_log_container_urls:
# Log the URL once... since the DataProcessingStage doesn't expose
# the containers, we handle the logic directly in each stage like
# so. It's kind of weird. T107574607 is tracking this.
# Hope we're using AWS!
try:
log_retriever = LogRetriever(CloudProvider.AWS)
log_url = log_retriever.get_log_url(container.instance_id)
logging.info(f"Container URL -> {log_url}")
except Exception:
logging.warning("Failed to retrieve log URL")

logger.info("Task started")
if wait_for_containers:
# Busy wait until the container is finished
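
For readers unfamiliar with the LogRetriever used above, here is a minimal, self-contained sketch of the general idea: turning an AWS ECS task identifier into a CloudWatch console URL. It is illustrative only; the build_cloudwatch_url name, the assumed ARN layout, the log group name, and the URL format are assumptions, not the actual fbpcs LogRetriever implementation.

from urllib.parse import quote


def build_cloudwatch_url(task_arn: str, log_group: str = "/ecs/onedocker") -> str:
    """Hypothetical helper: derive a CloudWatch console URL from an ECS task ARN.

    Assumes an ARN of the form
    arn:aws:ecs:<region>:<account>:task/<cluster>/<task-id>; the real
    LogRetriever may parse identifiers and format URLs differently.
    """
    region = task_arn.split(":")[3]
    task_id = task_arn.split("/")[-1]
    # The CloudWatch console expects log group and stream names to be
    # double-URL-encoded inside the fragment.
    encoded_group = quote(quote(log_group, safe=""), safe="")
    encoded_stream = quote(quote(f"ecs/onedocker/{task_id}", safe=""), safe="")
    return (
        f"https://{region}.console.aws.amazon.com/cloudwatch/home"
        f"?region={region}#logsV2:log-groups/log-group/{encoded_group}"
        f"/log-events/{encoded_stream}"
    )


# Example with a made-up ARN:
# build_cloudwatch_url("arn:aws:ecs:us-west-2:123456789012:task/pce/0123abcd")
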
@@ -203,6 +203,7 @@ async def _run_sharder_service(
onedocker_svc=self._onedocker_svc,
binary_version=binary_config.binary_version,
tmp_directory=binary_config.tmp_directory,
should_log_container_urls=True,
)
coros.append(coro)

11 changes: 10 additions & 1 deletion fbpcs/private_computation/service/private_computation.py
@@ -59,6 +59,7 @@
PrivateComputationStageService,
PrivateComputationStageServiceArgs,
)
from fbpcs.private_computation.service.utils import get_log_urls
from fbpcs.utils.optional import unwrap_or_default

T = TypeVar("T")
@@ -306,10 +307,18 @@ async def run_stage_async(
raise e
finally:
self.instance_repository.update(pc_instance)

try:
log_urls = get_log_urls(pc_instance)
for key, url in log_urls.items():
self.logger.info(f"Log for {key} at {url}")
except Exception:
self.logger.warning("Failed to retrieve log URLs for instance")

return pc_instance

# TODO T88759390: make an async version of this function
# Optioinal stage, validate the correctness of aggregated results for injected synthetic data
# Optional stage, validate the correctness of aggregated results for injected synthetic data
def validate_metrics(
self,
instance_id: str,
13 changes: 13 additions & 0 deletions fbpcs/private_computation/service/run_binary_base_service.py
@@ -10,6 +10,7 @@

from fbpcp.entity.container_instance import ContainerInstanceStatus
from fbpcp.service.onedocker import OneDockerService
from fbpcs.experimental.cloud_logs.log_retriever import CloudProvider, LogRetriever
from fbpcs.private_computation.service.constants import DEFAULT_CONTAINER_TIMEOUT_IN_SEC


@@ -36,6 +37,18 @@ async def start_and_wait_for_containers(
[container.instance_id for container in pending_containers]
)

# Log the URL once... since the DataProcessingStage doesn't expose the
# containers, we handle the logic directly in each stage like so.
# It's kind of weird. T107574607 is tracking this.
# Hope we're using AWS!
log_retriever = LogRetriever(CloudProvider.AWS)
for i, container in enumerate(containers):
try:
log_url = log_retriever.get_log_url(container.instance_id)
logger.info(f"Container[{i}] URL -> {log_url}")
except Exception:
logger.warning(f"Could not look up URL for container[{i}]")

# Busy wait until all containers are finished
any_failed = False
for shard, container in enumerate(containers):
50 changes: 47 additions & 3 deletions fbpcs/private_computation/service/utils.py
@@ -8,6 +8,7 @@


import functools
import logging
import warnings
from typing import Any, Dict, List, Optional

@@ -16,6 +17,8 @@
from fbpcp.entity.mpc_instance import MPCInstanceStatus
from fbpcp.service.mpc import MPCService
from fbpcs.common.entity.pcs_mpc_instance import PCSMPCInstance
from fbpcs.experimental.cloud_logs.log_retriever import CloudProvider, LogRetriever
from fbpcs.pid.entity.pid_instance import PIDInstance
from fbpcs.private_computation.entity.private_computation_instance import (
PrivateComputationInstance,
PrivateComputationInstanceStatus,
@@ -201,6 +204,45 @@ def get_updated_pc_status_mpc_game(
return status


def get_log_urls(
private_computation_instance: PrivateComputationInstance,
) -> Dict[str, str]:
"""Get log urls for most recently run containers
Arguments:
private_computation_instance: The PC instance that is being updated
Returns:
The latest status for private_computation_instance as an ordered dict
"""
# Get the last pid or mpc instance
last_instance = private_computation_instance.instances[-1]

# TODO - hope we're using AWS!
log_retriever = LogRetriever(CloudProvider.AWS)

res = {}
if isinstance(last_instance, PIDInstance):
pid_current_stage = last_instance.current_stage
if not pid_current_stage:
logging.warning("Unreachable block: no stage has run yet")
return res
containers = last_instance.stages_containers[pid_current_stage]
for i, container in enumerate(containers):
res[f"{pid_current_stage}_{i}"] = log_retriever.get_log_url(container.instance_id)
elif isinstance(last_instance, PCSMPCInstance):
containers = last_instance.containers
for i, container in enumerate(containers):
res[str(i)] = log_retriever.get_log_url(container.instance_id)
else:
logging.warning(
"The last instance of PrivateComputationInstance "
f"{private_computation_instance.instance_id} has no supported way "
"of retrieving log URLs"
)
return res
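
A short usage sketch of the new helper, mirroring the call site added in private_computation.py above; the log_instance_urls wrapper name and the logger setup are illustrative, not part of this commit.

import logging

from fbpcs.private_computation.entity.private_computation_instance import (
    PrivateComputationInstance,
)
from fbpcs.private_computation.service.utils import get_log_urls

logger = logging.getLogger(__name__)


def log_instance_urls(pc_instance: PrivateComputationInstance) -> None:
    # Best effort: get_log_urls only covers PID and MPC instances and assumes
    # AWS, so failures are logged rather than raised, just as in
    # run_stage_async above.
    try:
        for key, url in get_log_urls(pc_instance).items():
            logger.info(f"Log for {key} at {url}")
    except Exception:
        logger.warning("Failed to retrieve log URLs for instance")
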


# decorators are a serious pain to add typing for, so I'm not going to bother...
# pyre-ignore return typing
def deprecated(reason: str):
@@ -210,9 +252,11 @@ def deprecated(reason: str):

# pyre-ignore return typing
def wrap(func):
warning_color = '\033[93m' # orange/yellow ascii escape sequence
end = '\033[0m' # end ascii escape sequence
explanation: str = f"{warning_color}{func.__name__} is deprecated! explanation: {reason}{end}"
warning_color = "\033[93m" # orange/yellow ascii escape sequence
end = "\033[0m" # end ascii escape sequence
explanation: str = (
f"{warning_color}{func.__name__} is deprecated! explanation: {reason}{end}"
)

@functools.wraps(func)
# pyre-ignore typing on args, kwargs, and return
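
Finally, a tiny usage sketch for the reformatted deprecated decorator; the decorated function and the reason string below are made up for illustration.

from fbpcs.private_computation.service.utils import deprecated


@deprecated("prefer get_log_urls on the PrivateComputationInstance")
def legacy_log_url_lookup(container_instance_id: str) -> str:
    """Illustrative placeholder for an old helper slated for removal."""
    return ""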
