Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make code more Scala 2.13 friendly #172

Merged
merged 13 commits into from
Nov 17, 2019
20 changes: 11 additions & 9 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
language: scala

services:
- rabbitmq

sudo: false

scala:
- 2.12.6
- 2.12.8
- 2.11.12

jdk:
- oraclejdk7
- oraclejdk8
- openjdk7
- openjdk8

# inspired by https://github.com/hashicorp/nomad-scala-sdk/blob/master/.travis.yml
matrix:
exclude:
# We build Scala 2.11 on Java 7, so let's not bother with Java 8
- jdk: oraclejdk8
- jdk: openjdk8
scala: 2.11.12

services:
- rabbitmq

sudo: false
- jdk: openjdk7
scala: 2.12.8

cache:
directories:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package com.spingo.op_rabbit

import java.util

import airbrake.{AirbrakeNoticeBuilder, AirbrakeNotifier}
import com.rabbitmq.client.Envelope
import com.rabbitmq.client.AMQP.BasicProperties
import org.slf4j.LoggerFactory
import com.typesafe.config.ConfigFactory
import scala.collection.JavaConversions._

import scala.collection.JavaConverters._
import scala.language.postfixOps

/**
== BATTERIES NOT INCLUDED ==
Expand Down Expand Up @@ -42,30 +46,36 @@ class AirbrakeLogger(appName: String, airbrakeKey: String, environment: String)
val notice = new AirbrakeNoticeBuilder(airbrakeKey, exception, environment) {
setRequest(s"consumer://$appName/$name", "consume") // it's a faux URL, but reduces weirdness when clicking in airbrake webapp

val headerProperties: Map[String, String] = Option(properties.getHeaders) map { _.map { case (k,v) => (s"HEADER:$k", v.toString) }.toMap } getOrElse Map.empty
val headerProperties: Map[String, String] = Option(properties.getHeaders.asScala) map { _.map { case (k,v) => (s"HEADER:$k", v.toString) }.toMap } getOrElse Map.empty

session(
Map("consumerTag" -> consumerTag))
asJavaMap(Map("consumerTag" -> consumerTag)))

request(
Map(
asJavaMap(Map(
"body" -> bodyAsString(body, properties),
"deliveryTag" -> envelope.getDeliveryTag.toString,
"redeliver" -> envelope.isRedeliver.toString,
"exchange" -> envelope.getExchange,
"routingKey" -> envelope.getRoutingKey) ++ headerProperties)
"routingKey" -> envelope.getRoutingKey) ++ headerProperties))

environment(
Map(
asJavaMap(Map(
"host" -> java.net.InetAddress.getLocalHost.getHostName,
"consumer" -> name))
"consumer" -> name)))

projectRoot(appName)
}
new AirbrakeNotifier().notify(notice.newNotice())
} catch {
case e: Throwable => log.error("Unable to send airbrake notification for error", e)
}

private def asJavaMap(map: Map[String, String]): java.util.Map[String, Object] = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this method needed? Isn't it sufficient just to call "Map().asJava" ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asJava was returning Map<String, String> when the Airbrake APIs require Map<String, Object> so I thought it was better to create a function to produce maps of the required type

val hmap = new util.HashMap[String, Object]()
hmap.putAll(map.asJava)
hmap
}
}

/**
Expand Down
22 changes: 11 additions & 11 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import java.util.Properties

val json4sVersion = "3.5.3"
val circeVersion = "0.9.0"
val akkaVersion = "2.5.9"
val playVersion = "2.6.10"
val json4sVersion = "3.6.6"
val circeVersion = "0.12.0-M3"
val akkaVersion = "2.5.23"
val playVersion = "2.7.4"

val appProperties = {
val prop = new Properties()
Expand All @@ -16,15 +16,15 @@ val assertNoApplicationConf = taskKey[Unit]("Makes sure application.conf isn't p
val commonSettings = Seq(
organization := "com.spingo",
version := appProperties.getProperty("version"),
scalaVersion := "2.12.6",
crossScalaVersions := Seq("2.12.6", "2.11.12"),
scalaVersion := "2.12.8",
crossScalaVersions := Seq("2.12.8", "2.11.12", "2.13.0"),
libraryDependencies ++= Seq(
"com.chuusai" %% "shapeless" % "2.3.3",
"com.typesafe" % "config" % "1.3.2",
"com.newmotion" %% "akka-rabbitmq" % "5.0.0",
"org.slf4j" % "slf4j-api" % "1.7.25",
"com.typesafe" % "config" % "1.3.4",
"com.newmotion" %% "akka-rabbitmq" % "5.1.1",
"org.slf4j" % "slf4j-api" % "1.7.26",
"ch.qos.logback" % "logback-classic" % "1.2.3" % "test",
"org.scalatest" %% "scalatest" % "3.0.4" % "test",
"org.scalatest" %% "scalatest" % "3.0.8" % "test",
"com.spingo" %% "scoped-fixtures" % "2.0.0" % "test",
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
Expand Down Expand Up @@ -110,7 +110,7 @@ lazy val `spray-json` = (project in file("./addons/spray-json")).
settings(commonSettings: _*).
settings(
name := "op-rabbit-spray-json",
libraryDependencies += "io.spray" %% "spray-json" % "1.3.4").
libraryDependencies += "io.spray" %% "spray-json" % "1.3.5").
dependsOn(core)

lazy val airbrake = (project in file("./addons/airbrake/")).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ object ConnectionParams {
}

private def readHosts(config: Config): Seq[String] = {
Try(config.getStringList(HostsParamPath).asScala).getOrElse(readCommaSeparatedHosts(config))
Try(config.getStringList(HostsParamPath).asScala.toSeq).getOrElse(readCommaSeparatedHosts(config))
}

private def readCommaSeparatedHosts(config: Config): Seq[String] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private [op_rabbit] class SubscriptionActor(

initialized.tryFailure(new RuntimeException("Subscription stopped before it had a chance to initialize"))
closed.tryComplete(
payload.shutdownCause.map(Failure(_)).getOrElse(Success(Unit))
payload.shutdownCause.map(Failure(_)).getOrElse(Success(()))
)
stop()
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/com/spingo/op_rabbit/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ package object op_rabbit {
type Directive1[T] = Directive[::[T, HNil]]
type Deserialized[T] = Either[Rejection.ExtractRejection, T]

protected val futureUnit: Future[Unit] = Future.successful(Unit)
protected val futureUnit: Future[Unit] = Future.successful(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ object HeaderValue {
val serializable = value
}
case class MapHeaderValue(value: Map[String, HeaderValue]) extends HeaderValue {
lazy val serializable = value.mapValues(_.serializable).asJava
lazy val serializable = value.mapValues(_.serializable).toMap.asJava
override def asString(sourceCharset: Charset) = {
val b = new StringBuilder()
b += '{'
Expand Down Expand Up @@ -119,9 +119,9 @@ object HeaderValue {
implicit val convertFromBoolean : ToHeaderValue[Boolean , BooleanHeaderValue] = BooleanHeaderValue(_)
implicit val convertFromJavaBoolean : ToHeaderValue[java.lang.Boolean , BooleanHeaderValue] = BooleanHeaderValue(_)
implicit val convertFromByteArray : ToHeaderValue[Array[Byte] , ByteArrayHeaderValue] = ByteArrayHeaderValue(_)
implicit def convertFromMap[T](implicit converter: ToHeaderValue[T, HeaderValue]): ToHeaderValue[Map[String, T], MapHeaderValue] = { m => MapHeaderValue(m.mapValues(converter)) }
implicit def convertFromMap[T](implicit converter: ToHeaderValue[T, HeaderValue]): ToHeaderValue[Map[String, T], MapHeaderValue] = { m => MapHeaderValue(m.mapValues(converter).toMap) }
implicit def convertFromSeq[T](implicit converter: ToHeaderValue[T, HeaderValue]): ToHeaderValue[Seq[T], SeqHeaderValue] = { s => SeqHeaderValue(s.map(converter)) }
implicit def convertFromJavaList[T](implicit converter: ToHeaderValue[T, HeaderValue]): ToHeaderValue[java.util.List[T], SeqHeaderValue] = { list => SeqHeaderValue(list.asScala.map(converter)) }
implicit def convertFromJavaList[T](implicit converter: ToHeaderValue[T, HeaderValue]): ToHeaderValue[java.util.List[T], SeqHeaderValue] = { list => SeqHeaderValue(list.asScala.toSeq.map(converter)) }

def apply[T](value: T)(implicit converter: ToHeaderValue[T, HeaderValue]): HeaderValue =
if (value == null) NullHeaderValue else converter(value)
Expand All @@ -148,7 +148,7 @@ object HeaderValue {
case v: Array[Byte] => apply(v)
case null => NullHeaderValue
case v: java.util.List[_] =>
SeqHeaderValue(v.asScala.map { case v: Object => from(v) })
SeqHeaderValue(v.asScala.toSeq.map { case v: Object => from(v) })
case v: Array[Object] =>
SeqHeaderValue(v.map(from))
case otherwise =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.spingo.op_rabbit

import com.rabbitmq.client.Address
import com.rabbitmq.client.{Address, ConnectionFactory}
import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
import org.scalatest.{FunSpec, Matchers}

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

class ConnectionParamsUriSpec extends FunSpec with Matchers {
private val defaultConfig = ConfigFactory.load()
Expand All @@ -25,7 +25,7 @@ class ConnectionParamsUriSpec extends FunSpec with Matchers {

describe("URI configuration parameters") {
it("compose single host configuration") {
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(Map("uri" -> "amqp://user:secret@localhost:5672/vhost")))
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(Map("uri" -> "amqp://user:secret@localhost:5672/vhost").asJava))
val params = ConnectionParams.fromConfig(config.getConfig(connectionPath))

params.hosts should contain theSameElementsAs Seq(new Address("localhost", 5672))
Expand All @@ -37,7 +37,7 @@ class ConnectionParamsUriSpec extends FunSpec with Matchers {
}

it("compose single host configuration with default credentials") {
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(Map("uri" -> "amqp://localhost:5672/vhost")))
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(Map("uri" -> "amqp://localhost:5672/vhost").asJava))
val params = ConnectionParams.fromConfig(config.getConfig(connectionPath))

params.hosts should contain theSameElementsAs Seq(new Address("localhost", 5672))
Expand All @@ -49,7 +49,7 @@ class ConnectionParamsUriSpec extends FunSpec with Matchers {
}

it("compose multiple host configuration") {
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(Map("uri" -> "amqp://user:secret@localhost:5672,127.0.0.1:5672/vhost")))
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(Map("uri" -> "amqp://user:secret@localhost:5672,127.0.0.1:5672/vhost").asJava))
val params = ConnectionParams.fromConfig(config.getConfig(connectionPath))

params.hosts should contain theSameElementsAs Seq(new Address("localhost", 5672), new Address("127.0.0.1", 5672))
Expand All @@ -61,7 +61,7 @@ class ConnectionParamsUriSpec extends FunSpec with Matchers {
}

it("compose multiple host configuration with default credentials") {
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(Map("uri" -> "amqp://localhost:5672,127.0.0.1:5672/vhost")))
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(Map("uri" -> "amqp://localhost:5672,127.0.0.1:5672/vhost").asJava))
val params = ConnectionParams.fromConfig(config.getConfig(connectionPath))

params.hosts should contain theSameElementsAs Seq(new Address("localhost", 5672), new Address("127.0.0.1", 5672))
Expand All @@ -73,7 +73,7 @@ class ConnectionParamsUriSpec extends FunSpec with Matchers {
}

it("compose multiple host configuration with TLS protection") {
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(Map("uri" -> "amqps://user:secret@localhost:5672,127.0.0.1:5672/vhost")))
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(Map("uri" -> "amqps://user:secret@localhost:5672,127.0.0.1:5672/vhost").asJava))
val params = ConnectionParams.fromConfig(config.getConfig(connectionPath))

params.hosts should contain theSameElementsAs Seq(new Address("localhost", 5672), new Address("127.0.0.1", 5672))
Expand All @@ -83,11 +83,12 @@ class ConnectionParamsUriSpec extends FunSpec with Matchers {
params.virtualHost should equal("vhost")
params.connectionTimeout should equal(10000)
params.requestedHeartbeat should equal(60)
params.requestedChannelMax should equal(0)
params.requestedChannelMax should equal(ConnectionFactory.DEFAULT_CHANNEL_MAX)
}

it("compose multiple host configuration with TLS protection and additional URL parameters") {
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(Map("uri" -> "amqps://user:secret@localhost:5672,127.0.0.1:5672/vhost?connection_timeout=20000&heartbeat=30&channel_max=2&auth_mechanism=external")))
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(
Map("uri" -> "amqps://user:secret@localhost:5672,127.0.0.1:5672/vhost?connection_timeout=20000&heartbeat=30&channel_max=2&auth_mechanism=external").asJava))
val params = ConnectionParams.fromConfig(config.getConfig(connectionPath))

params.hosts should contain theSameElementsAs Seq(new Address("localhost", 5672), new Address("127.0.0.1", 5672))
Expand All @@ -103,31 +104,34 @@ class ConnectionParamsUriSpec extends FunSpec with Matchers {

it("failed to compose multiple host configuration with TLS protection and additional URL parameters cause unsupported parameter specified") {
val e = intercept[IllegalArgumentException] {
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(Map("uri" -> "amqps://user:secret@localhost:5672,127.0.0.1:5672/vhost?connection_timeout=20000&heartbeat=30&channel_max=2&auth_mechanism=external&unsupported_parameter=1")))
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(
Map("uri" -> "amqps://user:secret@localhost:5672,127.0.0.1:5672/vhost?connection_timeout=20000&heartbeat=30&channel_max=2&auth_mechanism=external&unsupported_parameter=1").asJava))
ConnectionParams.fromConfig(config.getConfig(connectionPath))
}
e.getMessage should equal("The URL parameter [unsupported_parameter] is not supported")
}

it("failed to compose multiple host configuration with TLS protection and additional URL parameters cause parameter value has incorrect format") {
val e = intercept[IllegalArgumentException] {
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(Map("uri" -> "amqps://user:secret@localhost:5672,127.0.0.1:5672/vhost?connection_timeout=20000&heartbeat=30&channel_max=two&auth_mechanism=external")))
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(
Map("uri" -> "amqps://user:secret@localhost:5672,127.0.0.1:5672/vhost?connection_timeout=20000&heartbeat=30&channel_max=two&auth_mechanism=external").asJava))
ConnectionParams.fromConfig(config.getConfig(connectionPath))
}
e.getMessage should equal("The URL parameter [channel_max] value should be integer")
}

it("failed to compose multiple host configuration with TLS protection and additional URL parameters cause auth mechanism incorrect value") {
val e = intercept[IllegalArgumentException] {
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(Map("uri" -> "amqps://user:secret@localhost:5672,127.0.0.1:5672/vhost?connection_timeout=20000&heartbeat=30&channel_max=2&auth_mechanism=custom")))
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(
Map("uri" -> "amqps://user:secret@localhost:5672,127.0.0.1:5672/vhost?connection_timeout=20000&heartbeat=30&channel_max=2&auth_mechanism=custom").asJava))
ConnectionParams.fromConfig(config.getConfig(connectionPath))
}
e.getMessage should equal("The URL parameter [auth_mechanism] supports PLAIN or EXTERNAL values only")
}

it("failed to compose multiple host configuration partial credentials") {
val e = intercept[IllegalArgumentException] {
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(Map("uri" -> "amqps://user@localhost:5672,127.0.0.1:5672/vhost")))
val config = defaultConfig.withValue(connectionPath, ConfigValueFactory.fromMap(Map("uri" -> "amqps://user@localhost:5672,127.0.0.1:5672/vhost").asJava))
ConnectionParams.fromConfig(config.getConfig(connectionPath))
}
e.getMessage should equal("The URL authority should contains user and password")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import com.rabbitmq.client.Channel
import com.spingo.op_rabbit.RabbitControl
import com.spingo.op_rabbit.{MessageForPublicationLike, RabbitMarshaller, RabbitUnmarshaller}
import com.spingo.scoped_fixtures.ScopedFixtures

import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.language.postfixOps

trait RabbitTestHelpers extends ScopedFixtures {
implicit val timeout = Timeout(5 seconds)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package com.spingo.op_rabbit.properties

import org.scalatest.{FunSpec, Matchers}
import com.spingo.op_rabbit.properties._
import com.rabbitmq.client.impl.LongStringHelper
import com.rabbitmq.client.AMQP.BasicProperties.Builder
import com.rabbitmq.client.AMQP.BasicProperties

import scala.language.postfixOps

class PropertiesSpec extends FunSpec with Matchers {
describe("Setting properties") {
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.1.5
sbt.version=1.2.8