Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 55 additions & 44 deletions app/buck2_test/src/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,14 +543,14 @@ struct TestExecutionKey {
#[derive(Clone, Dupe, Debug, Eq, Hash, PartialEq, Allocative)]
enum TestExecutionPrefix {
Listing,
Testing(Arc<ForwardRelativePathBuf>),
Testing,
}

impl TestExecutionPrefix {
fn new(stage: &TestStage, session: &TestSession) -> Self {
fn new(stage: &TestStage, _session: &TestSession) -> Self {
match stage {
TestStage::Listing { .. } => TestExecutionPrefix::Listing,
TestStage::Testing { .. } => TestExecutionPrefix::Testing(session.prefix().dupe()),
TestStage::Testing { .. } => TestExecutionPrefix::Testing,
}
}
}
Expand All @@ -559,7 +559,7 @@ impl Display for TestExecutionPrefix {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TestExecutionPrefix::Listing => write!(f, "Listing"),
TestExecutionPrefix::Testing(prefix) => write!(f, "Testing({prefix})"),
TestExecutionPrefix::Testing => write!(f, "Testing"),
}
}
}
Expand Down Expand Up @@ -609,7 +609,7 @@ async fn prepare_and_execute(
) -> Result<ExecuteData, ExecuteError> {
let execute_on_dice = match key.stage.as_ref() {
TestStage::Listing { cacheable, .. } => *cacheable,
TestStage::Testing { .. } => false,
TestStage::Testing { .. } => true,
};
if execute_on_dice {
let result = tokio::select! {
Expand Down Expand Up @@ -1003,8 +1003,6 @@ impl BuckTestOrchestrator<'_> {
action_key_suffix: create_action_key_suffix(&stage),
};

// For test execution, we currently do not do any cache queries

let prepared_action = match executor.prepare_action(&request, digest_config, false) {
Ok(prepared_action) => prepared_action,
Err(e) => return Err(ExecuteError::Error(e.into())),
Expand Down Expand Up @@ -1089,7 +1087,6 @@ impl BuckTestOrchestrator<'_> {
TestStage::Testing {
suite, testcases, ..
} => {
let command = executor.exec_cmd(manager, &prepared_command, cancellation);
let test_suite = Some(TestSuite {
suite_name: suite.clone(),
test_names: testcases.clone(),
Expand All @@ -1098,9 +1095,27 @@ impl BuckTestOrchestrator<'_> {
let start = TestRunStart {
suite: test_suite.clone(),
};
events
let (result, cached) = events
.span_async(start, async move {
let result = command.await;
let (result, cached) = if re_cache_enabled {
match executor
.action_cache(manager, &prepared_command, cancellation)
.await
{
ControlFlow::Continue(manager) => {
let result = executor
.exec_cmd(manager, &prepared_command, cancellation)
.await;
(result, false)
}
ControlFlow::Break(result) => (result, true),
}
} else {
let result = executor
.exec_cmd(manager, &prepared_command, cancellation)
.await;
(result, false)
};
let end = TestRunEnd {
suite: test_suite,
command_report: Some(
Expand All @@ -1114,9 +1129,31 @@ impl BuckTestOrchestrator<'_> {
)
.ok(),
};
(result, end)
((result, cached), end)
})
.await
.await;
if !cached && re_cache_enabled {
let info = CacheUploadInfo {
target: &test_target as _,
digest_config,
mergebase: &None,
re_platform: executor.re_platform(),
};
let _result = match executor
.cache_upload(
&info,
&result,
None,
None,
&prepared_action.action_and_blobs,
)
.await
{
Ok(result) => result,
Err(e) => return Err(ExecuteError::Error(e.into())),
};
}
result
}
};

Expand Down Expand Up @@ -1213,7 +1250,7 @@ impl BuckTestOrchestrator<'_> {
fn executor_config_with_remote_cache_override<'a>(
test_target_node: &'a ConfiguredTargetNode,
executor_override: Option<&'a CommandExecutorConfig>,
stage: &TestStage,
_stage: &TestStage,
) -> buck2_error::Result<Cow<'a, CommandExecutorConfig>> {
let executor_config = match executor_override {
Some(o) => o,
Expand All @@ -1223,31 +1260,14 @@ impl BuckTestOrchestrator<'_> {
.buck_error_context("Error accessing executor config")?,
};

if let TestStage::Listing { .. } = &stage {
return Ok(Cow::Borrowed(executor_config));
}

match &executor_config.executor {
Executor::RemoteEnabled(options) if options.remote_cache_enabled => {
let mut exec_options = options.clone();
exec_options.remote_cache_enabled = false;
let executor_config = CommandExecutorConfig {
executor: Executor::RemoteEnabled(exec_options),
options: executor_config.options.dupe(),
};
Ok(Cow::Owned(executor_config))
}
Executor::Local(_) | Executor::RemoteEnabled(_) | Executor::None => {
Ok(Cow::Borrowed(executor_config))
}
}
Ok(Cow::Borrowed(executor_config))
}

async fn get_command_executor(
dice: &mut DiceComputations<'_>,
fs: &ArtifactFs,
executor_config: &CommandExecutorConfig,
stage: &TestStage,
_stage: &TestStage,
) -> buck2_error::Result<CommandExecutor> {
let CommandExecutorResponse {
executor,
Expand All @@ -1258,15 +1278,6 @@ impl BuckTestOrchestrator<'_> {
output_trees_download_config: _,
} = dice.get_command_executor_from_dice(executor_config).await?;

// Caching is enabled only for listings
let (cache_uploader, action_cache_checker) = match stage {
TestStage::Listing { .. } => (cache_uploader, action_cache_checker),
TestStage::Testing { .. } => (
Arc::new(NoOpCacheUploader {}) as _,
Arc::new(NoOpCommandOptionalExecutor {}) as _,
),
};

let executor = CommandExecutor::new(
executor,
action_cache_checker,
Expand Down Expand Up @@ -1999,9 +2010,9 @@ async fn resolve_output_root(
.resolve_test_discovery(test_target)?
.into_forward_relative_path_buf()
}
TestExecutionPrefix::Testing(prefix) => prefix.join(ForwardRelativePathBuf::unchecked_new(
Uuid::new_v4().to_string(),
)),
TestExecutionPrefix::Testing => {
ForwardRelativePathBuf::unchecked_new(Uuid::new_v4().to_string())
}
};
Ok(output_root)
}
Expand Down