Skip to content

Commit 154ceab

Browse files
committed
Addresses PR comments for Couchbase integration
1 parent 8ddfda8 commit 154ceab

File tree

11 files changed

+231
-203
lines changed

11 files changed

+231
-203
lines changed

couchbase/src/main/scala/akka/stream/alpakka/couchbase/CouchbaseDocument.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44

55
package akka.stream.alpakka.couchbase
66

7-
class CouchbaseDocument[T](id: String, document: T) {
7+
import scala.reflect.ClassTag
8+
9+
class CouchbaseDocument[T: ClassTag](val id: String, val document: T) {
810
def getId: String = id;
911
def getDocument: T = document;
12+
1013
}

couchbase/src/main/scala/akka/stream/alpakka/couchbase/CouchbaseSessionRegistry.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ object CouchbaseSessionRegistry extends ExtensionId[CouchbaseSessionRegistry] wi
4545

4646
final class CouchbaseSessionRegistry(system: ExtendedActorSystem) extends Extension {
4747

48+
private val blockingDispatcher = system.dispatchers.lookup("akka.actor.default-blocking-io-dispatcher")
49+
4850
import CouchbaseSessionRegistry._
4951

5052
private val clusterRegistry = new CouchbaseClusterRegistry(system)
@@ -88,9 +90,7 @@ final class CouchbaseSessionRegistry(system: ExtendedActorSystem) extends Extens
8890
// we won cas, initialize session
8991
val session = clusterRegistry
9092
.clusterFor(key.settings)
91-
.flatMap(cluster => CouchbaseSession(cluster, key.bucketName))(
92-
ExecutionContext.parasitic
93-
)
93+
.flatMap(cluster => CouchbaseSession(cluster, key.bucketName))(blockingDispatcher)
9494
promise.completeWith(session)
9595
promise.future
9696
} else {

couchbase/src/main/scala/akka/stream/alpakka/couchbase/impl/CouchbaseCollectionSessionImpl.scala

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
2121
import scala.concurrent.duration.FiniteDuration
2222
import scala.concurrent.{ExecutionContext, Future}
2323
import scala.jdk.FutureConverters.CompletionStageOps
24+
import scala.reflect.ClassTag
2425

2526
/**
2627
* INTERNAL API
@@ -38,30 +39,31 @@ private[couchbase] class CouchbaseCollectionSessionImpl(bucketSession: Couchbase
3839

3940
override def asJava = new CouchbaseCollectionSessionJavaAdapter(this)
4041

41-
override def insert[T](id: String, document: T): Future[Done] = {
42+
override def insert[T: ClassTag](id: String, document: T): Future[Done] = {
4243
underlying
4344
.insert(id,
4445
document,
4546
InsertOptions
4647
.insertOptions()
47-
.transcoder(chooseTranscoder(document.getClass)))
48+
.transcoder(chooseTranscoder[T]()))
4849
.asScala
4950
.map(_ => Done)(ExecutionContext.parasitic)
5051
}
5152

52-
override def insert[T](id: String, document: T, insertOptions: InsertOptions): Future[Done] = {
53+
override def insert[T: ClassTag](id: String, document: T, insertOptions: InsertOptions): Future[Done] = {
5354
if (insertOptions.build.transcoder() == null) {
54-
insertOptions.transcoder(chooseTranscoder(document.getClass))
55+
insertOptions.transcoder(chooseTranscoder[T]())
5556
}
5657
underlying
5758
.insert(id, document, insertOptions)
5859
.asScala
5960
.map(_ => Done)(ExecutionContext.parasitic)
6061
}
6162

62-
override def get[T](id: String, target: Class[T]): Future[CouchbaseDocument[T]] = {
63+
override def get[T: ClassTag](id: String): Future[CouchbaseDocument[T]] = {
64+
val target: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
6365
underlying
64-
.get(id, GetOptions.getOptions.transcoder(chooseTranscoder(target)))
66+
.get(id, GetOptions.getOptions.transcoder(chooseTranscoder[T]()))
6567
.thenApply(gr => new CouchbaseDocument[T](id, gr.contentAs(target)))
6668
.asScala
6769
}
@@ -93,33 +95,33 @@ private[couchbase] class CouchbaseCollectionSessionImpl(bucketSession: Couchbase
9395
.thenApply(gr => new CouchbaseDocument[Array[Byte]](id, gr.contentAsBytes()))
9496
.asScala
9597

96-
override def upsert[T](id: String, document: T): Future[Done] = {
98+
override def upsert[T: ClassTag](id: String, document: T): Future[Done] = {
9799
underlying
98100
.upsert(id,
99101
document,
100102
UpsertOptions
101103
.upsertOptions()
102-
.transcoder(chooseTranscoder(document.getClass)))
104+
.transcoder(chooseTranscoder[T]()))
103105
.asScala
104106
.map(_ => Done)(ExecutionContext.parasitic)
105107
}
106108

107-
override def upsert[T](id: String, document: T, upsertOptions: UpsertOptions): Future[Done] = {
109+
override def upsert[T: ClassTag](id: String, document: T, upsertOptions: UpsertOptions): Future[Done] = {
108110
if (upsertOptions.build().transcoder() == null) {
109-
upsertOptions.transcoder(chooseTranscoder(document.getClass))
111+
upsertOptions.transcoder(chooseTranscoder[T]())
110112
}
111113
underlying
112114
.upsert(id, document, upsertOptions)
113115
.thenApply(_ => Done)
114116
.asScala
115117
}
116118

117-
override def upsert[T](id: String,
118-
document: T,
119-
upsertOptions: UpsertOptions,
120-
timeout: FiniteDuration): Future[Done] = {
119+
override def upsert[T: ClassTag](id: String,
120+
document: T,
121+
upsertOptions: UpsertOptions,
122+
timeout: FiniteDuration): Future[Done] = {
121123
if (upsertOptions.build().transcoder() == null) {
122-
upsertOptions.transcoder(chooseTranscoder(document.getClass))
124+
upsertOptions.transcoder(chooseTranscoder[T]())
123125
}
124126
underlying
125127
.upsert(id, document, upsertOptions)
@@ -128,28 +130,28 @@ private[couchbase] class CouchbaseCollectionSessionImpl(bucketSession: Couchbase
128130
.asScala
129131
}
130132

131-
override def replace[T](id: String, document: T): Future[Done] =
133+
override def replace[T: ClassTag](id: String, document: T): Future[Done] =
132134
underlying
133135
.replace(id, document)
134136
.thenApply(_ => Done)
135137
.asScala
136138

137-
override def replace[T](id: String, document: T, replaceOptions: ReplaceOptions): Future[Done] = {
139+
override def replace[T: ClassTag](id: String, document: T, replaceOptions: ReplaceOptions): Future[Done] = {
138140
if (replaceOptions.build.transcoder() == null) {
139-
replaceOptions.transcoder(chooseTranscoder(document.getClass))
141+
replaceOptions.transcoder(chooseTranscoder[T]())
140142
}
141143
underlying
142144
.replace(id, document, replaceOptions)
143145
.thenApply(_ => Done)
144146
.asScala
145147
}
146148

147-
override def replace[T](id: String,
148-
document: T,
149-
replaceOptions: ReplaceOptions,
150-
timeout: FiniteDuration): Future[Done] = {
149+
override def replace[T: ClassTag](id: String,
150+
document: T,
151+
replaceOptions: ReplaceOptions,
152+
timeout: FiniteDuration): Future[Done] = {
151153
if (replaceOptions.build.transcoder() == null) {
152-
replaceOptions.transcoder(chooseTranscoder(document.getClass))
154+
replaceOptions.transcoder(chooseTranscoder[T]())
153155
}
154156
underlying
155157
.replace(id, document, replaceOptions)
@@ -195,7 +197,8 @@ private[couchbase] class CouchbaseCollectionSessionImpl(bucketSession: Couchbase
195197
)
196198
)
197199

198-
private def chooseTranscoder[T](target: Class[T]): Transcoder = {
200+
private def chooseTranscoder[T: ClassTag](): Transcoder = {
201+
val target: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
199202
if (target == classOf[Array[Byte]]) RawBinaryTranscoder.INSTANCE
200203
else if (target == classOf[String]) RawStringTranscoder.INSTANCE
201204
else bucketSession.cluster().environment().transcoder()

couchbase/src/main/scala/akka/stream/alpakka/couchbase/impl/CouchbaseCollectionSessionJavaAdapter.scala

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import java.time.Duration
1818
import java.util.concurrent.{CompletionStage, TimeUnit}
1919
import scala.concurrent.duration.FiniteDuration
2020
import scala.jdk.FutureConverters._
21+
import scala.reflect.ClassTag
2122

2223
/**
2324
* INTERNAL API
@@ -40,10 +41,10 @@ private[couchbase] final class CouchbaseCollectionSessionJavaAdapter(delegate: s
4041
* @param document A tuple where first element is id of the document and second is its value
4142
* @return A Future that completes with the id of the written document when the write is done
4243
*/
43-
override def insert[T](id: String, document: T): CompletionStage[Done] =
44+
override def insert[T: ClassTag](id: String, document: T): CompletionStage[Done] =
4445
delegate.insert(id, document).asJava
4546

46-
override def insert[T](id: String, document: T, insertOptions: InsertOptions): CompletionStage[Done] =
47+
override def insert[T: ClassTag](id: String, document: T, insertOptions: InsertOptions): CompletionStage[Done] =
4748
delegate.insert(id, document, insertOptions).asJava
4849

4950
override def getJsonObject(id: String): CompletionStage[CouchbaseDocument[JsonObject]] =
@@ -52,8 +53,8 @@ private[couchbase] final class CouchbaseCollectionSessionJavaAdapter(delegate: s
5253
override def getJsonArray(id: String): CompletionStage[CouchbaseDocument[JsonArray]] =
5354
delegate.getJsonArray(id).asJava
5455

55-
override def get[T](id: String, target: Class[T]): CompletionStage[CouchbaseDocument[T]] =
56-
delegate.get(id, target).asJava
56+
override def get[T: ClassTag](id: String, target: Class[T]): CompletionStage[CouchbaseDocument[T]] =
57+
delegate.get[T](id).asJava
5758

5859
/**
5960
* @return A document if found or none if there is no document for the id
@@ -86,7 +87,7 @@ private[couchbase] final class CouchbaseCollectionSessionJavaAdapter(delegate: s
8687
*
8788
* @return a future that completes when the upsert is done
8889
*/
89-
override def upsert[T](id: String, document: T): CompletionStage[Done] =
90+
override def upsert[T: ClassTag](id: String, document: T): CompletionStage[Done] =
9091
delegate.upsert(id, document).asJava
9192

9293
/**
@@ -96,7 +97,7 @@ private[couchbase] final class CouchbaseCollectionSessionJavaAdapter(delegate: s
9697
*
9798
* @return a future that completes when the upsert is done
9899
*/
99-
override def upsert[T](id: String, document: T, upsertOptions: UpsertOptions): CompletionStage[Done] =
100+
override def upsert[T: ClassTag](id: String, document: T, upsertOptions: UpsertOptions): CompletionStage[Done] =
100101
delegate.upsert(id, document, upsertOptions).asJava
101102

102103
/**
@@ -108,10 +109,10 @@ private[couchbase] final class CouchbaseCollectionSessionJavaAdapter(delegate: s
108109
* @param timeout timeout for the operation
109110
* @return a future that completes after the operation is done
110111
*/
111-
override def upsert[T](id: String,
112-
document: T,
113-
upsertOptions: UpsertOptions,
114-
timeout: Duration): CompletionStage[Done] =
112+
override def upsert[T: ClassTag](id: String,
113+
document: T,
114+
upsertOptions: UpsertOptions,
115+
timeout: Duration): CompletionStage[Done] =
115116
delegate.upsert(id, document, upsertOptions, FiniteDuration.apply(timeout.toNanos, TimeUnit.NANOSECONDS)).asJava
116117

117118
/**
@@ -121,7 +122,7 @@ private[couchbase] final class CouchbaseCollectionSessionJavaAdapter(delegate: s
121122
*
122123
* @return a future that completes when the replace is done
123124
*/
124-
override def replace[T](id: String, document: T): CompletionStage[Done] =
125+
override def replace[T: ClassTag](id: String, document: T): CompletionStage[Done] =
125126
delegate.replace(id, document).asJava
126127

127128
/**
@@ -131,7 +132,7 @@ private[couchbase] final class CouchbaseCollectionSessionJavaAdapter(delegate: s
131132
*
132133
* @return a future that completes when the replace is done
133134
*/
134-
override def replace[T](id: String, document: T, replaceOptions: ReplaceOptions): CompletionStage[Done] =
135+
override def replace[T: ClassTag](id: String, document: T, replaceOptions: ReplaceOptions): CompletionStage[Done] =
135136
delegate.replace(id, document, replaceOptions).asJava
136137

137138
/**
@@ -143,10 +144,10 @@ private[couchbase] final class CouchbaseCollectionSessionJavaAdapter(delegate: s
143144
* @param timeout timeout for the operation
144145
* @return a future that completes after the operation is done
145146
*/
146-
override def replace[T](id: String,
147-
document: T,
148-
replaceOptions: ReplaceOptions,
149-
timeout: Duration): CompletionStage[Done] =
147+
override def replace[T: ClassTag](id: String,
148+
document: T,
149+
replaceOptions: ReplaceOptions,
150+
timeout: Duration): CompletionStage[Done] =
150151
delegate.replace(id, document, replaceOptions, FiniteDuration.apply(timeout.toNanos, TimeUnit.NANOSECONDS)).asJava
151152

152153
/**

couchbase/src/main/scala/akka/stream/alpakka/couchbase/javadsl/CouchbaseCollectionSession.scala

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import com.couchbase.client.java.{AsyncCollection, AsyncScope}
1515

1616
import java.time.Duration
1717
import java.util.concurrent.{CompletionStage, Executor}
18+
import scala.reflect.ClassTag
1819

1920
/**
2021
* Java API: Gives access to Couchbase Collection.
@@ -50,12 +51,12 @@ abstract class CouchbaseCollectionSession {
5051
* @param document A tuple where first element is id of the document and second is its value
5152
* @return A Future that completes with the id of the written document when the write is done
5253
*/
53-
def insert[T](id: String, document: T): CompletionStage[Done]
54-
def insert[T](id: String, document: T, insertOptions: InsertOptions): CompletionStage[Done]
54+
def insert[T: ClassTag](id: String, document: T): CompletionStage[Done]
55+
def insert[T: ClassTag](id: String, document: T, insertOptions: InsertOptions): CompletionStage[Done]
5556

5657
def getJsonObject(id: String): CompletionStage[CouchbaseDocument[JsonObject]]
5758
def getJsonArray(id: String): CompletionStage[CouchbaseDocument[JsonArray]]
58-
def get[T](id: String, target: Class[T]): CompletionStage[CouchbaseDocument[T]]
59+
def get[T: ClassTag](id: String, target: Class[T]): CompletionStage[CouchbaseDocument[T]]
5960

6061
/**
6162
* @return A document if found or none if there is no document for the id
@@ -83,7 +84,7 @@ abstract class CouchbaseCollectionSession {
8384
* Upsert using the default write settings.
8485
* @return a future that completes when the upsert is done
8586
*/
86-
def upsert[T](id: String, document: T): CompletionStage[Done]
87+
def upsert[T: ClassTag](id: String, document: T): CompletionStage[Done]
8788

8889
/**
8990
* Upsert using the given write settings
@@ -92,7 +93,7 @@ abstract class CouchbaseCollectionSession {
9293
*
9394
* @return a future that completes when the upsert is done
9495
*/
95-
def upsert[T](id: String, document: T, upsertOptions: UpsertOptions): CompletionStage[Done]
96+
def upsert[T: ClassTag](id: String, document: T, upsertOptions: UpsertOptions): CompletionStage[Done]
9697

9798
/**
9899
* Upsert using given write settings and timeout
@@ -102,7 +103,10 @@ abstract class CouchbaseCollectionSession {
102103
* @param timeout timeout for the operation
103104
* @return the document id and value
104105
*/
105-
def upsert[T](id: String, document: T, upsertOptions: UpsertOptions, timeout: Duration): CompletionStage[Done]
106+
def upsert[T: ClassTag](id: String,
107+
document: T,
108+
upsertOptions: UpsertOptions,
109+
timeout: Duration): CompletionStage[Done]
106110

107111
/**
108112
* Replace using the default write settings.
@@ -111,7 +115,7 @@ abstract class CouchbaseCollectionSession {
111115
*
112116
* @return a future that completes when the replace is done
113117
*/
114-
def replace[T](id: String, document: T): CompletionStage[Done]
118+
def replace[T: ClassTag](id: String, document: T): CompletionStage[Done]
115119

116120
/**
117121
* Replace using the given replace options
@@ -120,7 +124,7 @@ abstract class CouchbaseCollectionSession {
120124
*
121125
* @return a future that completes when the replace is done
122126
*/
123-
def replace[T](id: String, document: T, replaceOptions: ReplaceOptions): CompletionStage[Done]
127+
def replace[T: ClassTag](id: String, document: T, replaceOptions: ReplaceOptions): CompletionStage[Done]
124128

125129
/**
126130
* Replace using write settings and timeout
@@ -130,7 +134,10 @@ abstract class CouchbaseCollectionSession {
130134
* @param timeout timeout for the operation
131135
* @return the document id and value
132136
*/
133-
def replace[T](id: String, document: T, replaceOptions: ReplaceOptions, timeout: Duration): CompletionStage[Done]
137+
def replace[T: ClassTag](id: String,
138+
document: T,
139+
replaceOptions: ReplaceOptions,
140+
timeout: Duration): CompletionStage[Done]
134141

135142
/**
136143
* Remove a document by id using the default write settings.

0 commit comments

Comments
 (0)