Skip to content

Commit

Permalink
WX-1873 Replace output move option with "copy & update output locatio…
Browse files Browse the repository at this point in the history
…n" (#7565)
  • Loading branch information
aednichols authored Oct 1, 2024
1 parent e54b32e commit 90ff21b
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 73 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: gcpWdlResultsMoving
name: gcpWdlResultsCopying_DestinationInMetadata
testFormat: workflowsuccess
tags: ["copyGcp"]

Expand All @@ -8,51 +8,51 @@ backends: [Papi, Papiv2, GCPBatch]

files {
workflow: wdlResultsCopying/simpleWorkflow.wdl
options: wdlResultsMoving/gcp/options.json
options: wdlResultsCopying_DestinationInMetadata/gcp/options.json
}

metadata {
status: Succeeded
"outputs.simpleWorkflow.outFile": "gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/output.txt"
"outputs.simpleWorkflow.outGlob.0": "gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/glob-"~~
"outputs.simpleWorkflow.outGlob.1": "gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/glob-"~~
"outputs.simpleWorkflow.outGlob.2": "gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/glob-"~~
"outputs.simpleWorkflow.outFile": "gs://centaur-ci-us-east1/copy_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/output.txt"
"outputs.simpleWorkflow.outGlob.0": "gs://centaur-ci-us-east1/copy_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/glob-"~~
"outputs.simpleWorkflow.outGlob.1": "gs://centaur-ci-us-east1/copy_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/glob-"~~
"outputs.simpleWorkflow.outGlob.2": "gs://centaur-ci-us-east1/copy_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/glob-"~~

# TODO: Centaur's JSON flattening does not support nested arrays (`centaur.json.JsonUtils`)
# Consider using JsonPath (XPath for JSON) to traverse the tree, or check in a JSON file to use as expectation.
# "outputs.simpleWorkflow.nested_array.0.0": "gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/arraytest1.txt"
# "outputs.simpleWorkflow.nested_array.0.1": "gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/arraytest2.txt"
# "outputs.simpleWorkflow.nested_array.1.0": "gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/arraytest3.txt"
# "outputs.simpleWorkflow.nested_array.0.0": "gs://centaur-ci-us-east1/copy_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/arraytest1.txt"
# "outputs.simpleWorkflow.nested_array.0.1": "gs://centaur-ci-us-east1/copy_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/arraytest2.txt"
# "outputs.simpleWorkflow.nested_array.1.0": "gs://centaur-ci-us-east1/copy_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/arraytest3.txt"

"outputs.simpleWorkflow.myFoo.complex.right.t": "gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/object_out.txt"
"outputs.simpleWorkflow.myFoo.complex.right.t": "gs://centaur-ci-us-east1/copy_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/object_out.txt"
}

# The `centaur-ci-us-east1` bucket is in a different region than the workflow runs
fileSystemCheck: "gcs"
outputExpectations: {
### Destination

"gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/output.txt": 1
"gs://centaur-ci-us-east1/copy_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/output.txt": 1

# We don't currently have a way to check for glob contents, so do the next best thing by asserting the expected count
"gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/": 8
# "gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/glob-3860fac5129fd76b802f49dcd7f9a9f6/foo.zardoz": 1
# "gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/glob-3860fac5129fd76b802f49dcd7f9a9f6/bar.zardoz": 1
# "gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/glob-3860fac5129fd76b802f49dcd7f9a9f6/baz.zardoz": 1
"gs://centaur-ci-us-east1/copy_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/": 8
# "gs://centaur-ci-us-east1/copy_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/glob-3860fac5129fd76b802f49dcd7f9a9f6/foo.zardoz": 1
# "gs://centaur-ci-us-east1/copy_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/glob-3860fac5129fd76b802f49dcd7f9a9f6/bar.zardoz": 1
# "gs://centaur-ci-us-east1/copy_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/glob-3860fac5129fd76b802f49dcd7f9a9f6/baz.zardoz": 1

"gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/arraytest1.txt": 1
"gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/arraytest2.txt": 1
"gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/arraytest3.txt": 1
"gs://centaur-ci-us-east1/copy_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/arraytest1.txt": 1
"gs://centaur-ci-us-east1/copy_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/arraytest2.txt": 1
"gs://centaur-ci-us-east1/copy_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/arraytest3.txt": 1

"gs://centaur-ci-us-east1/move_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/object_out.txt": 1
"gs://centaur-ci-us-east1/copy_destination/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/object_out.txt": 1

### Source

"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/output.txt": 0
"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/output.txt": 1

"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/arraytest1.txt": 0
"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/arraytest2.txt": 0
"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/simpleWorkflow<<UUID>>/call-simpleStdoutTask/arraytest3.txt": 0
"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/arraytest1.txt": 1
"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/arraytest2.txt": 1
"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/arraytest3.txt": 1

"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/object_out.txt": 0
"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/simpleWorkflow/<<UUID>>/call-simpleStdoutTask/object_out.txt": 1
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: gcpWdlResultsMovingFail
name: gcpWdlResultsCopying_DestinationInMetadata_Fail
testFormat: workflowfailure
tags: ["copyGcp"]

Expand All @@ -8,7 +8,7 @@ backends: [Papi, Papiv2, GCPBatch]

files {
workflow: wdlResultsCopying/simpleWorkflow.wdl
options: wdlResultsMoving/gcp/options_fail.json
options: wdlResultsCopying_DestinationInMetadata/gcp/options_fail.json
}

metadata {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"use_relative_output_paths": false,
"final_workflow_outputs_dir": "gs://centaur-ci-us-east1/wf_results",
"final_workflow_outputs_mode": "copy",
"final_workflow_outputs_dir_metadata": "source",
"final_workflow_log_dir": "gs://centaur-ci-us-east1/wf_logs",
"final_call_logs_dir": "gs://centaur-ci-us-east1/cl_logs",
"read_from_cache": false,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"jes_gcs_root": "gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci",
"final_workflow_outputs_dir": "gs://centaur-ci-us-east1/move_destination",
"final_workflow_outputs_mode": "move",
"final_workflow_outputs_dir": "gs://centaur-ci-us-east1/copy_destination",
"final_workflow_outputs_dir_metadata": "destination",
"read_from_cache": false,
"write_to_cache": false
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"jes_gcs_root": "gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci",
"final_workflow_outputs_dir": "gs://non-existent-bucket/move_destination",
"final_workflow_outputs_mode": "move",
"final_workflow_outputs_dir": "gs://non-existent-bucket/copy_destination",
"final_workflow_outputs_dir_metadata": "destination",
"read_from_cache": false,
"write_to_cache": false
}
20 changes: 11 additions & 9 deletions core/src/main/scala/cromwell/core/WorkflowOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,21 @@ object WorkflowOptions {
case object FinalCallLogsDir extends WorkflowOption("final_call_logs_dir")
case object FinalWorkflowOutputsDir extends WorkflowOption("final_workflow_outputs_dir")
case object UseRelativeOutputPaths extends WorkflowOption(name = "use_relative_output_paths")
case object FinalWorkflowOutputsMode extends WorkflowOption("final_workflow_outputs_mode") {
// Default to Copy because that was originally the only behavior
def fromString(s: Option[String]): FinalWorkflowOutputsMode =
case object FinalWorkflowOutputsDirMetadata extends WorkflowOption("final_workflow_outputs_dir_metadata") {
// Default to Source because that was originally the only behavior
def fromString(s: Option[String]): FinalWorkflowOutputsDirMetadata =
s match {
case Some("copy") => Copy
case Some("move") => Move
case _ => Copy
case Some("source") => Source
case Some("destination") => Destination
case _ => Source
}
}

sealed trait FinalWorkflowOutputsMode
case object Copy extends FinalWorkflowOutputsMode
case object Move extends FinalWorkflowOutputsMode
sealed trait FinalWorkflowOutputsDirMetadata
// Metadata points to original location (copy source)
case object Source extends FinalWorkflowOutputsDirMetadata
// Metadata points to new location (copy destination)
case object Destination extends FinalWorkflowOutputsDirMetadata

// Misc.
case object DefaultRuntimeOptions extends WorkflowOption("default_runtime_attributes")
Expand Down
16 changes: 8 additions & 8 deletions docs/wf_options/Overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,21 @@ Example `options.json`:
```

## Output Copying
|Option| Value | Description |
|---|-----------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|`final_workflow_outputs_dir`| A directory available to Cromwell | Specifies a path where final workflow outputs will be written. If this is not specified, workflow outputs will not be copied out of the Cromwell workflow execution directory/path. |
|`final_workflow_outputs_mode`| `"copy"` or `"move"` | `"copy"` is the default and preserves the source files. `"move"` performs a copy-delete sequence to clean up the source.<br/><br/>For the `"move"` option only, the `/outputs` endpoint points to the destination.
|`use_relative_output_paths`| A boolean | When set to `true` this will copy all the outputs relative to their execution directory. my_final_workflow_outputs_dir/~~MyWorkflow/af76876d8-6e8768fa/call-MyTask/execution/~~output_of_interest . Cromwell will throw an exception when this leads to collisions. When the option is not set it will default to `false`. |
|`final_workflow_log_dir`| A directory available to Cromwell | Specifies a path where per-workflow logs will be written. If this is not specified, per-workflow logs will not be copied out of the Cromwell workflow log temporary directory/path before they are deleted. |
|`final_call_logs_dir`| A directory available to Cromwell | Specifies a path where final call logs will be written. If this is not specified, call logs will not be copied out of the Cromwell workflow execution directory/path. |
| Option | Value | Description |
|--------------------------------------|-----------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `final_workflow_outputs_dir` | A directory available to Cromwell | Specifies a path where final workflow outputs will be written. If this is not specified, workflow outputs will not be copied out of the Cromwell workflow execution directory/path. |
| `final_workflow_outputs_dir_metadata` | `"source"` or `"destination"` | Specifies whether the `/outputs` endpoint returns the source or destination of the copy. `"source"` is the original behavior, and the default when the parameter is not supplied. |
| `use_relative_output_paths` | A boolean | When set to `true` this will copy all the outputs relative to their execution directory. my_final_workflow_outputs_dir/~~MyWorkflow/af76876d8-6e8768fa/call-MyTask/execution/~~output_of_interest . Cromwell will throw an exception when this leads to collisions. When the option is not set it will default to `false`. |
| `final_workflow_log_dir` | A directory available to Cromwell | Specifies a path where per-workflow logs will be written. If this is not specified, per-workflow logs will not be copied out of the Cromwell workflow log temporary directory/path before they are deleted. |
| `final_call_logs_dir` | A directory available to Cromwell | Specifies a path where final call logs will be written. If this is not specified, call logs will not be copied out of the Cromwell workflow execution directory/path. |

Note that these directories should use the same filesystem as the workflow; e.g., if you run on Google's PAPI, you should provide `gs://...` paths.

Example `options.json`:
```json
{
"final_workflow_outputs_dir": "/Users/michael_scott/cromwell/outputs",
"final_workflow_outputs_mode": "copy",
"final_workflow_outputs_dir_metadata": "source",
"use_relative_output_paths": true,
"final_workflow_log_dir": "/Users/michael_scott/cromwell/wf_logs",
"final_call_logs_dir": "/Users/michael_scott/cromwell/call_logs"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package cromwell.engine

import cromwell.backend.BackendWorkflowDescriptor
import cromwell.core.WorkflowOptions.{FinalWorkflowOutputsDir, FinalWorkflowOutputsMode, WorkflowOption}
import cromwell.core.WorkflowOptions.{FinalWorkflowOutputsDir, FinalWorkflowOutputsDirMetadata, WorkflowOption}
import cromwell.core.callcaching.CallCachingMode
import cromwell.core.path.PathBuilder
import wom.callable.Callable
Expand Down Expand Up @@ -35,6 +35,6 @@ case class EngineWorkflowDescriptor(topLevelCallable: Callable,
def finalWorkflowOutputsDir: Option[String] =
getWorkflowOption(FinalWorkflowOutputsDir)

def finalWorkflowOutputsMode: FinalWorkflowOutputsMode =
FinalWorkflowOutputsMode.fromString(getWorkflowOption(FinalWorkflowOutputsMode))
def finalWorkflowOutputsDirMetadata: FinalWorkflowOutputsDirMetadata =
FinalWorkflowOutputsDirMetadata.fromString(getWorkflowOption(FinalWorkflowOutputsDirMetadata))
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import cromwell.backend._
import cromwell.backend.standard.callcaching.BlacklistCache
import cromwell.core.Dispatcher._
import cromwell.core.ExecutionStatus._
import cromwell.core.WorkflowOptions.Move
import cromwell.core.WorkflowOptions.Destination
import cromwell.core._
import cromwell.core.io.AsyncIo
import cromwell.core.logging.WorkflowLogging
Expand Down Expand Up @@ -417,8 +417,8 @@ case class WorkflowExecutionActor(params: WorkflowExecutionActorParams)

def handleSuccessfulWorkflowOutputs(outputs: Map[GraphOutputNode, WomValue]) = {
val fileMap: FileRelocationMap =
(workflowDescriptor.finalWorkflowOutputsDir, workflowDescriptor.finalWorkflowOutputsMode) match {
case (Some(outputDir), Move) =>
(workflowDescriptor.finalWorkflowOutputsDir, workflowDescriptor.finalWorkflowOutputsDirMetadata) match {
case (Some(outputDir), Destination) =>
outputFilePathMapping(outputDir, workflowDescriptor, params.initializationData, outputs.values.toSeq) map {
case (src, dst) =>
(src, dst)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import cromwell.backend.BackendWorkflowFinalizationActor.{
}
import cromwell.backend.AllBackendInitializationData
import cromwell.core.Dispatcher.IoDispatcher
import cromwell.core.WorkflowOptions.{Copy, Move}
import cromwell.core._
import cromwell.core.io.AsyncIoActorClient
import cromwell.core.path.Path
Expand Down Expand Up @@ -94,28 +93,12 @@ class CopyWorkflowOutputsActor(workflowId: WorkflowId,
Future.sequence(copies)
}

/**
 * Moves the workflow's output files into `outputsDir` as a copy followed by a delete of the source.
 *
 * @param outputsDir destination directory handed to `outputFilePathMapping`
 * @return a future over the results of all copy-then-delete operations, combined with `Future.sequence`
 */
private def moveWorkflowOutputs(outputsDir: String): Future[Seq[Unit]] = {
// Source -> destination pairs computed from the workflow's output values
val outputFilePaths =
outputFilePathMapping(outputsDir, workflowDescriptor, initializationData, workflowOutputs.outputs.values.toSeq)

markDuplicates(outputFilePaths)

// The delete is chained via flatMap, so a source is only removed after its copy has succeeded
val moves = outputFilePaths.toList map { case (srcPath, dstPath) =>
asyncIo.copyAsync(srcPath, dstPath) flatMap { _ =>
asyncIo.deleteAsync(srcPath)
}
}

Future.sequence(moves)
}

/**
* Happens after everything else runs
*/
final def afterAll()(implicit ec: ExecutionContext): Future[FinalizationResponse] =
(workflowDescriptor.finalWorkflowOutputsDir, workflowDescriptor.finalWorkflowOutputsMode) match {
case (Some(outputsDir), Copy) => copyWorkflowOutputs(outputsDir) map { _ => FinalizationSuccess }
case (Some(outputsDir), Move) => moveWorkflowOutputs(outputsDir) map { _ => FinalizationSuccess }
workflowDescriptor.finalWorkflowOutputsDir match {
case Some(outputsDir) => copyWorkflowOutputs(outputsDir) map { _ => FinalizationSuccess }
case _ => Future.successful(FinalizationSuccess)
}
}

0 comments on commit 90ff21b

Please sign in to comment.