Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import org.http4s._
import org.http4s.client.Client
import org.http4s.client.middleware.{Logger, Retry, RetryPolicy}
import org.http4s.headers.Authorization
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.ember.client.EmberClientBuilder

import scala.concurrent.duration._

Expand All @@ -30,9 +30,10 @@ object GeneratedLeonardoClient {

val client: Resource[IO, Client[IO]] =
for {
blockingEc <- ExecutionContexts.cachedThreadPool[IO]
retryPolicy = RetryPolicy[IO](RetryPolicy.exponentialBackoff(30 seconds, 5))
client <- BlazeClientBuilder[IO](blockingEc).resource.map(c => Retry(retryPolicy)(c))
client <- EmberClientBuilder
.default[IO]
.build
.map(c => Retry(RetryPolicy[IO](RetryPolicy.exponentialBackoff(30 seconds, 5)))(c))
} yield Logger[IO](logHeaders = true, logBody = true)(client)

def runtimeInStateOrError(status: ClusterStatus): DoneCheckable[GetRuntimeResponse] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ import org.broadinstitute.dsde.workbench.leonardo.http.DiskRoutesTestJsonCodec._
import org.broadinstitute.dsde.workbench.leonardo.http.RuntimeRoutesTestJsonCodec._
import org.broadinstitute.dsde.workbench.leonardo.http._
import org.broadinstitute.dsde.workbench.model.google.GoogleProject
import org.broadinstitute.dsde.workbench.util2.ExecutionContexts
import org.http4s._
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.circe.CirceEntityEncoder._
import org.http4s.circe.CirceEntityCodec._
import org.http4s.client.Client
import org.http4s.client.middleware.{Logger, Retry, RetryPolicy}
import org.http4s.ember.client.EmberClientBuilder
import org.http4s.headers._
import org.typelevel.log4cats.StructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jLogger
Expand Down Expand Up @@ -53,9 +52,10 @@ object LeonardoApiClient {

val client: Resource[IO, Client[IO]] =
for {
blockingEc <- ExecutionContexts.cachedThreadPool[IO]
retryPolicy = RetryPolicy[IO](RetryPolicy.exponentialBackoff(30 seconds, 5))
client <- BlazeClientBuilder[IO](blockingEc).resource.map(c => Retry(retryPolicy)(c))
client <- EmberClientBuilder
.default[IO]
.build
.map(c => Retry(RetryPolicy[IO](RetryPolicy.exponentialBackoff(30 seconds, 5)))(c))
} yield Logger[IO](logHeaders = true, logBody = true)(client)

val defaultCreateRequestZone = ZoneName("us-east1-b")
Expand Down Expand Up @@ -149,9 +149,8 @@ object LeonardoApiClient {
headers = Headers(authHeader, defaultMediaType, traceIdHeader),
uri = rootUri.withPath(
Uri.Path.unsafeFromString(s"/api/google/v1/runtimes/${googleProject.value}/${runtimeName.asString}")
),
entity = createRuntime2Request
)
)
).withEntity(createRuntime2Request)
)
.use { resp =>
if (!resp.status.isSuccess) {
Expand Down Expand Up @@ -222,9 +221,8 @@ object LeonardoApiClient {
headers = Headers(authHeader, defaultMediaType, traceIdHeader),
uri = rootUri.withPath(
Uri.Path.unsafeFromString(s"/api/google/v1/runtimes/${googleProject.value}/${runtimeName.asString}")
),
entity = req
)
)
).withEntity(req)
)
.use { resp =>
if (!resp.status.isSuccess) {
Expand All @@ -235,7 +233,7 @@ object LeonardoApiClient {
}
} yield r

// This line causes the body to be decoded as JSON, which will prevent error messagges from being seen
// This line causes the body to be decoded as JSON, which will prevent error messages from being seen
// If you care about the error message, place the function before this line
import org.http4s.circe.CirceEntityDecoder._

Expand Down Expand Up @@ -364,9 +362,8 @@ object LeonardoApiClient {
method = Method.POST,
headers = Headers(authHeader, defaultMediaType, traceIdHeader),
uri = rootUri
.withPath(Uri.Path.unsafeFromString(s"/api/google/v1/disks/${googleProject.value}/${diskName.value}")),
entity = createDiskRequest
)
.withPath(Uri.Path.unsafeFromString(s"/api/google/v1/disks/${googleProject.value}/${diskName.value}"))
).withEntity(createDiskRequest)
)
.use { resp =>
if (!resp.status.isSuccess) {
Expand All @@ -391,9 +388,8 @@ object LeonardoApiClient {
method = Method.PATCH,
headers = Headers(authHeader, defaultMediaType, traceIdHeader),
uri = rootUri
.withPath(Uri.Path.unsafeFromString(s"/api/google/v1/disks/${googleProject.value}/${diskName.value}")),
entity = req
)
.withPath(Uri.Path.unsafeFromString(s"/api/google/v1/disks/${googleProject.value}/${diskName.value}"))
).withEntity(req)
)
.use { resp =>
if (!resp.status.isSuccess) {
Expand Down Expand Up @@ -519,9 +515,8 @@ object LeonardoApiClient {
method = Method.POST,
headers = Headers(authHeader, defaultMediaType, traceIdHeader),
uri = rootUri
.withPath(Uri.Path.unsafeFromString(s"/api/google/v1/apps/${googleProject.value}/${appName.value}")),
entity = createAppRequest
)
.withPath(Uri.Path.unsafeFromString(s"/api/google/v1/apps/${googleProject.value}/${appName.value}"))
).withEntity(createAppRequest)
)
.use { resp =>
if (resp.status.isSuccess)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package org.broadinstitute.dsde.workbench.leonardo

import cats.effect.{IO, Ref}
import cats.implicits._
import com.comcast.ip4s.IpLiteralSyntax
import fs2._
import fs2.io.net.Network
import org.broadinstitute.dsde.workbench.leonardo.BillingProjectFixtureSpec.proxyRedirectServerPortKey
import org.http4s.blaze.server.BlazeServerBuilder
import org.http4s.ember.server.EmberServerBuilder
import org.http4s.client.Client
import org.http4s.dsl.io._
import org.http4s.headers.{`Content-Type`, Referer}
Expand Down Expand Up @@ -34,7 +36,7 @@ object ProxyRedirectClient {
def startServer(): IO[Int] =
for {
serverAndShutDown <- ProxyRedirectClient.server
port = serverAndShutDown._1.address.port.value
port = serverAndShutDown._1.address.getPort
_ <- serverRef.modify(mp => (mp, mp + (port -> serverAndShutDown)))
} yield port

Expand Down Expand Up @@ -87,16 +89,22 @@ object ProxyRedirectClient {
.intersperse("\n")
.through(text.utf8.encode)

private def server: IO[(Server, IO[Unit])] = {
private def server(implicit network: Network[IO]): IO[(Server, IO[Unit])] = {
val route = HttpRoutes
.of[IO] { case GET -> Root / "proxyRedirectClient" :? Rurl(rurl) =>
Ok(getContent(rurl), `Content-Type`(MediaType.text.html))
}
.orNotFound
for {
// Note this uses `bindAny` which will bind to an arbitrary port. We can't use a dedicated port
// Note this uses port 0, which will bind to an arbitrary port. We can't use a dedicated port
// because multiple test suites may be running on the same host in different class loaders.
server <- BlazeServerBuilder[IO].bindAny("0.0.0.0").withHttpApp(route).resource.allocated
server <- EmberServerBuilder
.default[IO]
.withHost(ipv4"0.0.0.0")
.withPort(port"0")
.withHttpApp(route)
.build
.allocated
} yield server
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.broadinstitute.dsde.workbench.leonardo
package dao

import _root_.fs2._
import _root_.io.circe._
import _root_.org.typelevel.log4cats.StructuredLogger
import akka.http.scaladsl.model.StatusCode._
Expand Down Expand Up @@ -69,9 +68,8 @@ class HttpSamDAO[F[_]](httpClient: Client[F],
Request[F](
method = Method.POST,
uri = config.samUri.withPath(Uri.Path.unsafeFromString(s"/register/user/v2/self")),
entity = Entity.strict(Chunk.array("app.terra.bio/#terms-of-service".getBytes(UTF_8)).toByteVector),
headers = Headers(leoToken)
)
).withEntity("app.terra.bio/#terms-of-service")
)
.whenA(!isRegistered.enabled)
} yield ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import com.github.benmanes.caffeine.cache.Caffeine
import com.google.api.gax.longrunning.OperationFuture
import com.google.cloud.compute.v1.Operation
import fs2.Stream
import fs2.io.net.Network
import com.comcast.ip4s.{Ipv4Address => Ip4sIpv4Address}
import io.kubernetes.client.openapi.ApiClient
import org.broadinstitute.dsde.workbench.google2.GKEModels.KubernetesClusterId
import org.broadinstitute.dsde.workbench.google2.{GooglePublisher, GoogleSubscriber}
Expand Down Expand Up @@ -46,19 +48,20 @@ import org.broadinstitute.dsde.workbench.openTelemetry.OpenTelemetryMetrics
import org.broadinstitute.dsde.workbench.util2.messaging.{CloudPublisher, CloudSubscriber, ReceivedMessage}
import org.broadinstitute.dsp.HelmInterpreter
import org.http4s.Request
import org.http4s.blaze.client
import org.http4s.client.RequestKey
import org.http4s.client.middleware.{Logger => Http4sLogger, Metrics, Retry, RetryPolicy}
import org.typelevel.log4cats.StructuredLogger
import scalacache.Cache
import scalacache.caffeine.CaffeineCache
import org.http4s.Uri.Ipv4Address

import java.net.{InetSocketAddress, SocketException}
import java.time.Instant
import java.util.concurrent.TimeUnit
import javax.net.ssl.SSLContext
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import fs2.io.net.tls.TLSContext

/**
* This class builds the baseline dependencies for the Leo App.
Expand All @@ -72,6 +75,7 @@ class BaselineDependenciesBuilder {
)(implicit
logger: StructuredLogger[F],
F: Async[F],
network: Network[F],
ec: ExecutionContext,
as: ActorSystem,
dbRef: DbReference[F],
Expand Down Expand Up @@ -294,14 +298,12 @@ class BaselineDependenciesBuilder {
.recordStats()
.build[K, V]()

private def buildHttpClient[F[_]: Async: StructuredLogger](
private def buildHttpClient[F[_]: Async: StructuredLogger: Network](
sslContext: SSLContext,
dnsResolver: RequestKey => Either[Throwable, InetSocketAddress],
metricsPrefix: Option[String],
withRetry: Boolean
): Resource[F, org.http4s.client.Client[F]] = {
// Retry all SocketExceptions to deal with pooled HTTP connections getting closed.
// See https://broadworkbench.atlassian.net/browse/IA-4069.
val retryPolicy = RetryPolicy[F](
RetryPolicy.exponentialBackoff(30 seconds, 5),
(req, result) =>
Expand All @@ -311,24 +313,55 @@ class BaselineDependenciesBuilder {
}
)

val tlsContext = TLSContext.Builder.forAsync[F].fromSSLContext(sslContext)

// Middleware to apply custom DNS resolution
val dnsMiddleware: org.http4s.client.Client[F] => org.http4s.client.Client[F] = { client =>
org.http4s.client.Client[F] { req =>
val resolvedReq = req.uri.host match {
case Some(host) =>
val requestKey = RequestKey.fromRequest(req)
dnsResolver(requestKey) match {
case Right(addr) =>
// Use ip4s conversion and wrap with org.http4s.Uri.Ipv4Address
val maybeHost: Option[org.http4s.Uri.Host] = addr.getAddress match {
case inet4: java.net.Inet4Address =>
val ip4sAddr = Ip4sIpv4Address.fromInet4Address(inet4)
Some(Ipv4Address(ip4sAddr))
case _ => None
}
val newUri = req.uri.copy(
authority = req.uri.authority.map(a => a.copy(host = maybeHost.getOrElse(a.host)))
)
req.withUri(newUri).putHeaders(org.http4s.headers.Host(host.renderString))
case Left(err) =>
StructuredLogger[F].warn(err)(s"DNS resolution failed for ${host.renderString}, using default")
req
}
case None => req
}
client.run(resolvedReq)
}
}

for {
httpClient <- client
.BlazeClientBuilder[F]
.withSslContext(sslContext)
// Note a custom resolver is needed for making requests through the Leo proxy
// (for example HttpJupyterDAO). Otherwise the proxyResolver falls back to default
// hostname resolution, so it's okay to use for all clients.
.withCustomDnsResolver(dnsResolver)
.withConnectTimeout(30 seconds)
.withRequestTimeout(60 seconds)
.withMaxTotalConnections(100)
.withMaxWaitQueueLimit(1024)
.withMaxIdleDuration(30 seconds)
.resource
httpClient <- org.http4s.ember.client.EmberClientBuilder.default
.withTLSContext(tlsContext)
.withTimeout(60 seconds)
.withMaxTotal(100)
.withIdleConnectionTime(30 seconds)
.build
.map { baseClient =>
// Wrap the client with DNS middleware to produce a client that resolves per-request
dnsMiddleware(baseClient)
}

httpClientWithLogging = Http4sLogger[F](logHeaders = true, logBody = false, logAction = Some(s => logAction(s)))(
httpClient
)

clientWithRetry = if (withRetry) Retry(retryPolicy)(httpClientWithLogging) else httpClientWithLogging

finalClient <- metricsPrefix match {
case None => Resource.pure[F, org.http4s.client.Client[F]](clientWithRetry)
case Some(prefix) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.http4s.client.Client
import org.http4s._
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.ember.client.EmberClientBuilder
import org.broadinstitute.dsde.workbench.leonardo.TestUtils.appContext
import scala.concurrent.ExecutionContext.global

Expand Down Expand Up @@ -86,10 +86,14 @@ class HTTPAppDescriptorDAOSpec extends AnyFlatSpec with Matchers with BeforeAndA

// allows http retrieval
def withAppDescriptorDAO(testCode: HttpAppDescriptorDAO[IO] => Any): Unit = {
val daoResource = for {
client <- BlazeClientBuilder[IO](global).resource
clientWithLogging = Logger[IO](logHeaders = true, logBody = false)(client)
} yield new HttpAppDescriptorDAO[IO](clientWithLogging)
val daoResource: cats.effect.Resource[IO, HttpAppDescriptorDAO[IO]] =
EmberClientBuilder
.default[IO]
.build
.map { client =>
val clientWithLogging = Logger[IO](logHeaders = true, logBody = false)(client)
new HttpAppDescriptorDAO[IO](clientWithLogging)
}

daoResource.use(dao => IO(testCode(dao))).unsafeRunSync()(cats.effect.unsafe.IORuntime.global)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.broadinstitute.dsde.workbench.leonardo.ContainerRegistry.{DockerHub,
import org.broadinstitute.dsde.workbench.leonardo.RuntimeImageType.{Jupyter, RStudio}
import org.broadinstitute.dsde.workbench.leonardo.dao.HttpDockerDAO._
import org.broadinstitute.dsde.workbench.leonardo.model.InvalidImage
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.ember.client.EmberClientBuilder
import org.http4s.client.middleware.Logger
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
Expand Down Expand Up @@ -64,11 +64,14 @@ class HttpDockerDAOSpec extends AnyFlatSpec with Matchers with BeforeAndAfterAll
)

def withDockerDAO(testCode: HttpDockerDAO[IO] => Any): Unit = {
val dockerDAOResource = for {
client <- BlazeClientBuilder[IO](scala.concurrent.ExecutionContext.global).resource
clientWithLogging = Logger[IO](logHeaders = true, logBody = false)(client)
dockerDAO = HttpDockerDAO[IO](clientWithLogging)
} yield dockerDAO
val dockerDAOResource: cats.effect.Resource[IO, HttpDockerDAO[IO]] =
EmberClientBuilder
.default[IO]
.build
.map { client =>
val clientWithLogging = Logger[IO](logHeaders = true, logBody = false)(client)
HttpDockerDAO[IO](clientWithLogging)
}

dockerDAOResource.use(dao => IO(testCode(dao))).unsafeRunSync()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ class AppDependenciesBuilderSpec

private def createBaselineDependenciesBuilderMock = {
val baselineDependenciesBuilder = mock[BaselineDependenciesBuilder]
when(baselineDependenciesBuilder.createBaselineDependencies[IO]()(any(), any(), any(), any(), any(), any(), any()))
when(
baselineDependenciesBuilder
.createBaselineDependencies[IO]()(any(), any(), any(), any(), any(), any(), any(), any())
)
.thenReturn(Resource.pure[IO, BaselineDependencies[IO]](baselineDependenciesMock))

baselineDependenciesBuilder
Expand Down
Loading
Loading