Skip to content

Commit 884c976

Browse files
committed
Fixes a compilation error under jdk11; updates unit tests to work with docker-compose.yaml docker couchbase instance; updates couchbase_prep compose service to create required collection
1 parent 228f14b commit 884c976

File tree

10 files changed

+34
-27
lines changed

10 files changed

+34
-27
lines changed

couchbase/src/main/scala/akka/stream/alpakka/couchbase/impl/CouchbaseCollectionSessionImpl.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44

55
package akka.stream.alpakka.couchbase.impl
66

7-
import akka.{Done, NotUsed}
87
import akka.annotation.InternalApi
98
import akka.stream.alpakka.couchbase.CouchbaseDocument
109
import akka.stream.alpakka.couchbase.scaladsl.{CouchbaseCollectionSession, CouchbaseSession}
1110
import akka.stream.scaladsl.Source
11+
import akka.{Done, NotUsed}
1212
import com.couchbase.client.java.codec.{RawBinaryTranscoder, RawStringTranscoder, Transcoder}
1313
import com.couchbase.client.java.json.JsonValue
1414
import com.couchbase.client.java.kv._
@@ -18,8 +18,8 @@ import rx.{Observable, RxReactiveStreams}
1818

1919
import java.util
2020
import java.util.concurrent.TimeUnit
21-
import scala.concurrent.{ExecutionContext, Future}
2221
import scala.concurrent.duration.FiniteDuration
22+
import scala.concurrent.{ExecutionContext, Future}
2323
import scala.jdk.FutureConverters.CompletionStageOps
2424

2525
/**

couchbase/src/main/scala/akka/stream/alpakka/couchbase/impl/CouchbaseCollectionSessionJavaAdapter.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44

55
package akka.stream.alpakka.couchbase.impl
66

7-
import akka.{Done, NotUsed}
87
import akka.annotation.InternalApi
98
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession
109
import akka.stream.alpakka.couchbase.{javadsl, scaladsl, CouchbaseDocument}
1110
import akka.stream.javadsl.Source
11+
import akka.{Done, NotUsed}
1212
import com.couchbase.client.java.json.{JsonArray, JsonObject, JsonValue}
1313
import com.couchbase.client.java.kv.{InsertOptions, RemoveOptions, ReplaceOptions, UpsertOptions}
1414
import com.couchbase.client.java.manager.query.{CreateQueryIndexOptions, QueryIndex}

couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseCollectionSession.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44

55
package akka.stream.alpakka.couchbase.scaladsl
66

7-
import akka.{Done, NotUsed}
87
import akka.annotation.DoNotInherit
98
import akka.stream.alpakka.couchbase.CouchbaseDocument
109
import akka.stream.alpakka.couchbase.impl.CouchbaseCollectionSessionImpl
1110
import akka.stream.scaladsl.Source
11+
import akka.{Done, NotUsed}
1212
import com.couchbase.client.java.json.{JsonArray, JsonObject, JsonValue}
1313
import com.couchbase.client.java.kv.{InsertOptions, RemoveOptions, ReplaceOptions, UpsertOptions}
1414
import com.couchbase.client.java.manager.query.{CreateQueryIndexOptions, QueryIndex}

couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseSession.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import akka.{Done, NotUsed}
1313
import com.couchbase.client.java._
1414
import com.couchbase.client.java.json.JsonObject
1515
import com.couchbase.client.java.query._
16+
import org.slf4j.LoggerFactory
1617

1718
import java.util.concurrent.atomic.AtomicReference
1819
import scala.collection.mutable
@@ -24,6 +25,7 @@ import scala.concurrent.{ExecutionContext, Future}
2425
* @see [[akka.stream.alpakka.couchbase.CouchbaseSessionRegistry]]
2526
*/
2627
object CouchbaseSession {
28+
private val log = LoggerFactory.getLogger(classOf[CouchbaseSession])
2729

2830
/**
2931
* Create a session against the given bucket. The couchbase client used to connect will be created and then closed when
@@ -68,7 +70,12 @@ object CouchbaseSession {
6870
enrichedSettings.nodes.mkString(","),
6971
clusterOptions
7072
)
71-
})
73+
}).andThen(c => {
74+
log.debug("created couchbase cluster client for " + enrichedSettings.username)
75+
}).recover(err => {
76+
log.error("failed to create couchbase cluster", err)
77+
throw err
78+
})
7279
}
7380

7481
}

couchbase/src/test/java/docs/javadsl/CouchbaseExamplesTest.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
import akka.Done;
88
import akka.actor.ActorSystem;
9-
import akka.http.scaladsl.server.util.Tuple;
109
import akka.stream.Materializer;
1110
// #deleteWithResult
1211
import akka.stream.alpakka.couchbase.*;
@@ -16,7 +15,6 @@
1615
import akka.stream.alpakka.couchbase.javadsl.CouchbaseFlow;
1716
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSource;
1817
import akka.stream.alpakka.couchbase.testing.CouchbaseSupportClass;
19-
import akka.stream.alpakka.couchbase.testing.TestObject;
2018
import akka.stream.alpakka.testkit.javadsl.LogCapturingJunit4;
2119
import akka.stream.javadsl.Sink;
2220
import akka.stream.javadsl.Source;
@@ -26,28 +24,21 @@
2624
import com.couchbase.client.java.env.ClusterEnvironment;
2725

2826
import com.couchbase.client.java.json.JsonObject;
29-
import com.couchbase.client.java.json.JsonValue;
3027
import org.junit.*;
3128

32-
import java.time.Duration;
3329
import java.util.ArrayList;
3430
import java.util.Arrays;
3531
import java.util.List;
36-
import java.util.Optional;
3732
import java.util.concurrent.*;
33+
import java.util.stream.Collectors;
3834
// #registry
3935
// #session
4036
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession;
4137
// #session
4238
// #registry
43-
import java.util.stream.Collectors;
4439
// #sessionFromBucket
45-
import com.couchbase.client.java.Bucket;
4640
// #sessionFromBucket
4741

48-
import scala.Tuple2;
49-
import scala.concurrent.duration.FiniteDuration;
50-
5142
import static org.hamcrest.CoreMatchers.*;
5243
import static org.hamcrest.MatcherAssert.assertThat;
5344
import static org.junit.Assert.assertEquals;
@@ -221,8 +212,8 @@ public void upsertWithResult() throws Exception {
221212
List<CouchbaseWriteFailure> failedDocs =
222213
writeResults.stream()
223214
.filter(CouchbaseWriteResult::isFailure)
224-
.map(res -> (CouchbaseWriteFailure) res)
225-
.toList();
215+
.map(CouchbaseWriteFailure.class::cast)
216+
.collect(Collectors.toUnmodifiableList());
226217
// #upsertDocWithResult
227218

228219
assertThat(writeResults.size(), is(sampleSequence.size()));
@@ -294,7 +285,7 @@ public void replaceWithResult() throws Exception {
294285
writeResults.stream()
295286
.filter(CouchbaseWriteResult::isFailure)
296287
.map(CouchbaseWriteFailure.class::cast)
297-
.toList();
288+
.collect(Collectors.toUnmodifiableList());
298289
// #replaceDocWithResult
299290

300291
assertThat(writeResults.size(), is(list.size()));

couchbase/src/test/resources/discovery.conf

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ alpakka.couchbase {
1111
name = couchbase-service
1212
lookup-timeout = 1 s
1313
}
14-
username = "anotherUser"
15-
password = "differentPassword"
14+
username = "Administrator"
15+
password = "password"
1616
}
1717
}
1818

couchbase/src/test/scala/akka/stream/alpakka/couchbase/testing/CouchbaseSupport.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ trait CouchbaseSupport {
3939
val sessionSettings = CouchbaseSessionSettings(actorSystem)
4040
val bucketName = "akka"
4141
val scopeName = "alpakka"
42-
val collectionName = "karakalpaka"
42+
val collectionName = "alpakka-collection"
4343

4444
var session: CouchbaseSession = _
4545

couchbase/src/test/scala/docs/scaladsl/CouchbaseFlowSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ import akka.stream.alpakka.couchbase.testing.CouchbaseSupport
1010
import akka.stream.alpakka.couchbase.{CouchbaseDeleteFailure, CouchbaseDeleteResult, CouchbaseDocument}
1111
import akka.stream.alpakka.testkit.scaladsl.LogCapturing
1212
import akka.stream.scaladsl.{Sink, Source}
13+
import akka.stream.testkit.scaladsl.StreamTestKit._
1314
import com.couchbase.client.core.error.{DocumentNotFoundException, DurabilityImpossibleException}
1415
import com.couchbase.client.core.msg.kv.DurabilityLevel
16+
import com.couchbase.client.java.codec.RawBinaryTranscoder
1517
import com.couchbase.client.java.json.JsonObject
1618
import com.couchbase.client.java.kv.{RemoveOptions, UpsertOptions}
1719
import org.scalatest._
1820
import org.scalatest.concurrent.ScalaFutures
19-
import akka.stream.testkit.scaladsl.StreamTestKit._
20-
import com.couchbase.client.java.codec.RawBinaryTranscoder
2121

2222
import scala.collection.immutable
2323
import scala.concurrent.Future

couchbase/src/test/scala/docs/scaladsl/CouchbaseSessionExamplesSpec.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,15 @@ class CouchbaseSessionExamplesSpec
2828

2929
override implicit def patienceConfig: PatienceConfig = PatienceConfig(10.seconds, 250.millis)
3030

31-
override def beforeAll(): Unit = super.beforeAll()
32-
override def afterAll(): Unit = super.afterAll()
31+
override def beforeAll(): Unit = {
32+
super.beforeAll()
33+
upsertSampleData(bucketName, scopeName, collectionName)
34+
}
3335

36+
override def afterAll(): Unit = {
37+
cleanAllInCollection(bucketName, scopeName, collectionName)
38+
super.afterAll()
39+
}
3440
"a Couchbasesession" should {
3541
"be managed by the registry" in {
3642
// #registry

docker-compose.yml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@ services:
2626
ports:
2727
- "9042:9042"
2828
couchbase:
29-
image: couchbase:community-5.1.1
29+
image: couchbase:community-7.6.2
3030
ports:
3131
- "8091-8094:8091-8094"
3232
- "11210:11210"
3333
couchbase_prep:
34-
image: couchbase:community-5.1.1
34+
image: couchbase:community-7.6.2
3535
links:
3636
- "couchbase"
3737
entrypoint: ""
@@ -46,6 +46,9 @@ services:
4646
couchbase-cli bucket-create -c couchbase -u Administrator -p password --bucket akka --bucket-type couchbase --bucket-ramsize 100 --bucket-replica 1 --wait
4747
couchbase-cli bucket-create -c couchbase -u Administrator -p password --bucket akkaquery --bucket-type couchbase --bucket-ramsize 100 --bucket-replica 1 --wait
4848
sleep 2 # just wait a tiny bit more after creating the bucket
49+
couchbase-cli collection-manage -c couchbase -u Administrator -p password --bucket akka --create-scope alpakka
50+
couchbase-cli collection-manage -c couchbase -u Administrator -p password --bucket akka --create-collection alpakka.alpakka-collection
51+
sleep 2 # just wait a tiny bit more after creating the collection
4952
echo 'CREATE PRIMARY INDEX ON akkaquery USING GSI;' | cbq -c Administrator:password -e http://couchbase:8093
5053
"
5154
elasticmq:

0 commit comments

Comments
 (0)