Skip to content

Commit

Permalink
AN-214 Redirect workflow existence queries from metadata to summary (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
aednichols authored Oct 25, 2024
1 parent 3b75b19 commit c4093a0
Show file tree
Hide file tree
Showing 12 changed files with 17 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,6 @@ class MetadataSlickDatabase(originalDatabaseConfig: Config)
} yield ()
}

override def metadataEntryExists(workflowExecutionUuid: String)(implicit ec: ExecutionContext): Future[Boolean] = {
val action = dataAccess.metadataEntryExistsForWorkflowExecutionUuid(workflowExecutionUuid).result
runTransaction(action)
}

override def metadataSummaryEntryExists(
workflowExecutionUuid: String
)(implicit ec: ExecutionContext): Future[Boolean] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,6 @@ trait MetadataEntryComponent {
}.size
)

val metadataEntryExistsForWorkflowExecutionUuid = Compiled((workflowExecutionUuid: Rep[String]) =>
(for {
metadataEntry <- metadataEntries
if metadataEntry.workflowExecutionUuid === workflowExecutionUuid
} yield metadataEntry).exists
)

def metadataEntryExistsForWorkflowExecutionUuid(workflowId: Rep[String], key: Rep[String]): Rep[Boolean] =
metadataEntries
.filter(metadataEntry =>
metadataEntry.workflowExecutionUuid === workflowId &&
metadataEntry.metadataKey === key &&
metadataEntry.metadataValue.isDefined
)
.exists

val metadataEntriesForWorkflowExecutionUuidAndMetadataKey =
Compiled((workflowExecutionUuid: Rep[String], metadataKey: Rep[String]) =>
(for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ trait MetadataSqlDatabase extends SqlDatabase {
labelMetadataKey: String
)(implicit ec: ExecutionContext): Future[Unit]

def metadataEntryExists(workflowExecutionUuid: String)(implicit ec: ExecutionContext): Future[Boolean]

def metadataSummaryEntryExists(workflowExecutionUuid: String)(implicit ec: ExecutionContext): Future[Boolean]

def queryMetadataEntries(workflowExecutionUuid: String, timeout: Duration)(implicit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ trait CromwellApiService
} ~
path("workflows" / Segment / Segment / "timing") { (_, possibleWorkflowId) =>
instrumentRequest {
onComplete(validateWorkflowIdInMetadata(possibleWorkflowId, serviceRegistryActor)) {
onComplete(validateWorkflowIdInMetadataSummaries(possibleWorkflowId, serviceRegistryActor)) {
case Success(workflowId) => completeTimingRouteResponse(metadataLookupForTimingRoute(workflowId))
case Failure(e: UnrecognizedWorkflowException) => e.failRequest(StatusCodes.NotFound)
case Failure(e: InvalidWorkflowException) => e.failRequest(StatusCodes.BadRequest)
Expand Down Expand Up @@ -159,7 +159,7 @@ trait CromwellApiService
path("workflows" / Segment / Segment / "releaseHold") { (_, possibleWorkflowId) =>
post {
instrumentRequest {
val response = validateWorkflowIdInMetadata(possibleWorkflowId, serviceRegistryActor) flatMap {
val response = validateWorkflowIdInMetadataSummaries(possibleWorkflowId, serviceRegistryActor) flatMap {
workflowId =>
workflowStoreActor
.ask(WorkflowStoreActor.WorkflowOnHoldToSubmittedCommand(workflowId))
Expand Down Expand Up @@ -319,20 +319,6 @@ object CromwellApiService {
case e: Exception => e.errorRequest(StatusCodes.InternalServerError)
}

def validateWorkflowIdInMetadata(possibleWorkflowId: String, serviceRegistryActor: ActorRef)(implicit
timeout: Timeout,
executor: ExecutionContext
): Future[WorkflowId] =
Try(WorkflowId.fromString(possibleWorkflowId)) match {
case Success(w) =>
serviceRegistryActor.ask(ValidateWorkflowIdInMetadata(w)).mapTo[WorkflowValidationResponse] flatMap {
case RecognizedWorkflowId => Future.successful(w)
case UnrecognizedWorkflowId => validateWorkflowIdInMetadataSummaries(possibleWorkflowId, serviceRegistryActor)
case FailedToCheckWorkflowId(t) => Future.failed(t)
}
case Failure(_) => Future.failed(InvalidWorkflowException(possibleWorkflowId))
}

def validateWorkflowIdInMetadataSummaries(possibleWorkflowId: String, serviceRegistryActor: ActorRef)(implicit
timeout: Timeout,
executor: ExecutionContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ object MetadataRouteSupport {
case FailedToGetArchiveStatusAndEndTime(e) => Future.failed(e)
}

validateWorkflowIdInMetadata(possibleWorkflowId, serviceRegistryActor) flatMap { id =>
validateWorkflowIdInMetadataSummaries(possibleWorkflowId, serviceRegistryActor) flatMap { id =>
/*
for requests made to one of /metadata, /logs or /outputs endpoints, perform an additional check to see
if metadata for the workflow has been archived and deleted or not (as they interact with metadata table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import cromwell.services.{FailedMetadataJsonResponse, SuccessfulMetadataJsonResp
import cromwell.webservice.PartialWorkflowSources
import cromwell.webservice.WebServiceUtils.{completeResponse, materializeFormData, EnhancedThrowable}
import cromwell.webservice.routes.CromwellApiService
import cromwell.webservice.routes.CromwellApiService.{validateWorkflowIdInMetadata, UnrecognizedWorkflowException}
import cromwell.webservice.routes.CromwellApiService.{
validateWorkflowIdInMetadataSummaries,
UnrecognizedWorkflowException
}
import cromwell.webservice.routes.MetadataRouteSupport.{metadataBuilderActorRequest, metadataQueryRequest}
import cromwell.webservice.routes.wes.WesResponseJsonSupport._
import cromwell.webservice.routes.wes.WesRouteSupport.{respondWithWesError, _}
Expand Down Expand Up @@ -94,9 +97,10 @@ trait WesRouteSupport extends HttpInstrumentation {
}
},
path("runs" / Segment / "status") { possibleWorkflowId =>
val response = validateWorkflowIdInMetadata(possibleWorkflowId, serviceRegistryActor).flatMap(w =>
serviceRegistryActor.ask(GetStatus(w)).mapTo[MetadataServiceResponse]
)
val response =
validateWorkflowIdInMetadataSummaries(possibleWorkflowId, serviceRegistryActor).flatMap(w =>
serviceRegistryActor.ask(GetStatus(w)).mapTo[MetadataServiceResponse]
)
// WES can also return a 401 or a 403 but that requires user auth knowledge which Cromwell doesn't currently have
onComplete(response) {
case Success(SuccessfulMetadataJsonResponse(_, jsObject)) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,15 +574,13 @@ object CromwellApiServiceSpec {
val SucceededWorkflowId = WorkflowId.fromString("0cb43b8c-0259-4a19-b7fe-921ced326738")
val FailedWorkflowId = WorkflowId.fromString("df501790-cef5-4df7-9b48-8760533e3136")
val SummarizedWorkflowId = WorkflowId.fromString("f0000000-0000-0000-0000-000000000000")
val UnsummarizedWorkflowId = WorkflowId.fromString("00001111-aaaa-bbbb-cccc-ddddffffeeee")
val WorkflowIdExistingOnlyInSummaryTable = WorkflowId.fromString("f0000000-0000-0000-0000-000000000011")
val ArchivedWorkflowId = WorkflowId.fromString("c4c6339c-2145-47fb-acc5-b5cb8d2809f5")
val ArchivedAndDeletedWorkflowId = WorkflowId.fromString("abc1234d-2145-47fb-acc5-b5cb8d2809f5")
val wesWorkflowId = WorkflowId.randomId()
val SummarizedWorkflowIds = Set(
SummarizedWorkflowId,
WorkflowIdExistingOnlyInSummaryTable,
ArchivedWorkflowId,
ArchivedAndDeletedWorkflowId
val UnsummarizedWorkflowIds = Set(
UnsummarizedWorkflowId
)
val RecognizedWorkflowIds = Set(
ExistingWorkflowId,
Expand All @@ -593,6 +591,7 @@ object CromwellApiServiceSpec {
SucceededWorkflowId,
FailedWorkflowId,
SummarizedWorkflowId,
WorkflowIdExistingOnlyInSummaryTable,
ArchivedWorkflowId,
ArchivedAndDeletedWorkflowId,
wesWorkflowId
Expand Down Expand Up @@ -684,11 +683,8 @@ object CromwellApiServiceSpec {
None
)
sender() ! response
case ValidateWorkflowIdInMetadata(id) =>
if (RecognizedWorkflowIds.contains(id)) sender() ! MetadataService.RecognizedWorkflowId
else sender() ! MetadataService.UnrecognizedWorkflowId
case ValidateWorkflowIdInMetadataSummaries(id) =>
if (SummarizedWorkflowIds.contains(id)) sender() ! MetadataService.RecognizedWorkflowId
if (RecognizedWorkflowIds.contains(id)) sender() ! MetadataService.RecognizedWorkflowId
else sender() ! MetadataService.UnrecognizedWorkflowId
case FetchWorkflowMetadataArchiveStatusAndEndTime(id) =>
id match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ class MetadataRouteSupportSpec extends AsyncFlatSpec with ScalatestRouteTest wit
|}
""".stripMargin

val unsummarizedId = CromwellApiServiceSpec.ExistingWorkflowId
val unsummarizedId = CromwellApiServiceSpec.UnsummarizedWorkflowId
Patch(s"/workflows/$version/$unsummarizedId/labels",
HttpEntity(ContentTypes.`application/json`, validLabelsJson)
) ~>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ object MetadataService {
case object RefreshSummary extends MetadataServiceAction
case object SendMetadataTableSizeMetrics extends MetadataServiceAction

final case class ValidateWorkflowIdInMetadata(possibleWorkflowId: WorkflowId) extends MetadataServiceAction
final case class ValidateWorkflowIdInMetadataSummaries(possibleWorkflowId: WorkflowId) extends MetadataServiceAction
final case class FetchWorkflowMetadataArchiveStatusAndEndTime(workflowId: WorkflowId) extends MetadataServiceAction
final case class FetchFailedJobsMetadataWithWorkflowId(workflowId: WorkflowId) extends BuildWorkflowMetadataJsonAction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,6 @@ trait MetadataDatabaseAccess {
_ map { case (id, labelsForId) => WorkflowId.fromString(id) -> labelsForId }
}

def workflowWithIdExistsInMetadata(possibleWorkflowId: String)(implicit ec: ExecutionContext): Future[Boolean] =
metadataDatabaseInterface.metadataEntryExists(possibleWorkflowId)

def workflowWithIdExistsInMetadataSummaries(possibleWorkflowId: String)(implicit
ec: ExecutionContext
): Future[Boolean] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,6 @@ case class MetadataServiceActor(serviceConfig: Config, globalConfig: Config, ser
None
}

private def validateWorkflowIdInMetadata(possibleWorkflowId: WorkflowId, sender: ActorRef): Unit =
workflowWithIdExistsInMetadata(possibleWorkflowId.toString) onComplete {
case Success(true) => sender ! RecognizedWorkflowId
case Success(false) => sender ! UnrecognizedWorkflowId
case Failure(e) =>
sender ! FailedToCheckWorkflowId(
new RuntimeException(s"Failed lookup attempt for workflow ID $possibleWorkflowId", e)
)
}

private def validateWorkflowIdInMetadataSummaries(possibleWorkflowId: WorkflowId, sender: ActorRef): Unit =
workflowWithIdExistsInMetadataSummaries(possibleWorkflowId.toString) onComplete {
case Success(true) => sender ! RecognizedWorkflowId
Expand Down Expand Up @@ -258,7 +248,6 @@ case class MetadataServiceActor(serviceConfig: Config, globalConfig: Config, ser
case action: PutMetadataActionAndRespond => writeActor forward action
// Assume that listen messages are directed to the write metadata actor
case listen: Listen => writeActor forward listen
case v: ValidateWorkflowIdInMetadata => validateWorkflowIdInMetadata(v.possibleWorkflowId, sender())
case v: ValidateWorkflowIdInMetadataSummaries =>
validateWorkflowIdInMetadataSummaries(v.possibleWorkflowId, sender())
case g: FetchWorkflowMetadataArchiveStatusAndEndTime =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,9 +372,6 @@ class WriteMetadataActorSpec extends TestKitSuite with AnyFlatSpecLike with Matc
Future.failed(WriteMetadataActorSpec.IntermittentException)
}

override def metadataEntryExists(workflowExecutionUuid: String)(implicit ec: ExecutionContext): Nothing =
notImplemented()

override def metadataSummaryEntryExists(workflowExecutionUuid: String)(implicit ec: ExecutionContext): Nothing =
notImplemented()

Expand Down

0 comments on commit c4093a0

Please sign in to comment.