Skip to content

Commit 982701c

Browse files
committed
Updates the integration to not use tuples and addresses other PR comments
1 parent 824b61b commit 982701c

File tree

13 files changed

+356
-335
lines changed

13 files changed

+356
-335
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package akka.stream.alpakka.couchbase
2+
3+
class CouchbaseDocument[T](id: String, document: T) {
4+
def getId: String = id;
5+
def getDocument: T = document;
6+
}
Lines changed: 56 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package akka.stream.alpakka.couchbase.impl
22

3-
import akka.NotUsed
3+
import akka.{Done, NotUsed}
44
import akka.annotation.InternalApi
5+
import akka.stream.alpakka.couchbase.CouchbaseDocument
56
import akka.stream.alpakka.couchbase.scaladsl.{CouchbaseCollectionSession, CouchbaseSession}
67
import akka.stream.scaladsl.Source
78
import com.couchbase.client.java.codec.{JsonTranscoder, RawBinaryTranscoder, RawStringTranscoder, Transcoder}
@@ -17,8 +18,11 @@ import scala.concurrent.{ExecutionContext, Future}
1718
import scala.concurrent.duration.FiniteDuration
1819
import scala.jdk.FutureConverters.CompletionStageOps
1920

21+
/**
22+
* INTERNAL API
23+
*/
2024
@InternalApi
21-
class CouchbaseCollectionSessionImpl(bucketSession: CouchbaseSession, scopeName: String, collectionName: String) extends CouchbaseCollectionSession{
25+
private[couchbase] class CouchbaseCollectionSessionImpl(bucketSession: CouchbaseSession, scopeName: String, collectionName: String) extends CouchbaseCollectionSession{
2226

2327
override def bucket: CouchbaseSession = bucketSession
2428

@@ -27,32 +31,30 @@ class CouchbaseCollectionSessionImpl(bucketSession: CouchbaseSession, scopeName:
2731

2832
override def asJava = new CouchbaseCollectionSessionJavaAdapter(this)
2933

30-
override def insert[T] (document: (String, T)): Future[(String, T)] = {
34+
override def insert[T] (id: String, document: T): Future[Done] = {
3135
underlying
32-
.insert(document._1, document._2,
36+
.insert(id, document,
3337
InsertOptions.insertOptions()
34-
.transcoder(chooseTranscoder(document._2.getClass))
38+
.transcoder(chooseTranscoder(document.getClass))
3539
)
36-
.asScala.map(_ => document)(ExecutionContext.parasitic)
40+
.asScala
41+
.map(_ => Done)(ExecutionContext.parasitic)
3742
}
3843

39-
override def insert[T] (document: (String, T), insertOptions: InsertOptions): Future[(String, T)] = {
40-
underlying.insert(document._1, document._2, insertOptions)
44+
override def insert[T] (id: String, document: T, insertOptions: InsertOptions): Future[Done] = {
45+
underlying.insert(id, document, insertOptions)
4146
.asScala
42-
.map(r => document)(ExecutionContext.parasitic)
47+
.map(_ => Done)(ExecutionContext.parasitic)
4348
}
4449

45-
override def get[T](id: String, target: Class[T]): Future[(String, T)] = {
50+
override def get[T](id: String, target: Class[T]): Future[CouchbaseDocument[T]] = {
4651
underlying.get(id, GetOptions.getOptions.transcoder(chooseTranscoder(target)))
47-
.thenApply(gr => {
48-
(id, gr.contentAs(target))
49-
})
52+
.thenApply(gr => new CouchbaseDocument[T](id, gr.contentAs(target)))
5053
.asScala
51-
5254
}
5355

54-
override def getDocument(id: String): Future[(String, JsonValue)] =
55-
underlying.get(id).thenApply(gr => (id, asJsonValue(gr))).asScala
56+
override def getDocument(id: String): Future[CouchbaseDocument[JsonValue]] =
57+
underlying.get(id).thenApply(gr => new CouchbaseDocument[JsonValue](id, asJsonValue(gr))).asScala
5658

5759
private def asJsonValue(gr: GetResult) =
5860
try {
@@ -61,77 +63,78 @@ class CouchbaseCollectionSessionImpl(bucketSession: CouchbaseSession, scopeName:
6163
case ex: Exception => gr.contentAsArray().asInstanceOf[JsonValue]
6264
}
6365

64-
override def getBytes(id: String): Future[(String, Array[Byte])] =
65-
underlying.get(id).thenApply(gr => (id, gr.contentAsBytes())).asScala
66+
override def getBytes(id: String): Future[CouchbaseDocument[Array[Byte]]] =
67+
underlying.get(id).thenApply(gr => new CouchbaseDocument[Array[Byte]](id, gr.contentAsBytes())).asScala
6668

67-
override def getDocument(id: String, timeout: FiniteDuration): Future[(String, JsonValue)] =
69+
override def getDocument(id: String, timeout: FiniteDuration): Future[CouchbaseDocument[JsonValue]] =
6870
underlying.get(id)
6971
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
70-
.thenApply(gr => (id, asJsonValue(gr)))
72+
.thenApply(gr => new CouchbaseDocument[JsonValue](id, asJsonValue(gr)))
7173
.asScala
7274

73-
override def getBytes(id: String, timeout: FiniteDuration): Future[(String, Array[Byte])] =
75+
override def getBytes(id: String, timeout: FiniteDuration): Future[CouchbaseDocument[Array[Byte]]] =
7476
underlying.get(id)
7577
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
76-
.thenApply(gr => (id, gr.contentAsBytes()))
78+
.thenApply(gr => new CouchbaseDocument[Array[Byte]](id, gr.contentAsBytes()))
7779
.asScala
7880

79-
override def upsert[T](document: (String, T)): Future[(String, T)] = {
80-
underlying.upsert(document._1, document._2,
81+
override def upsert[T](id: String, document: T): Future[Done] = {
82+
underlying.upsert(id, document,
8183
UpsertOptions.upsertOptions()
82-
.transcoder(chooseTranscoder(document._2.getClass))
84+
.transcoder(chooseTranscoder(document.getClass))
8385
)
84-
.asScala.map(_ => document)(ExecutionContext.parasitic)
86+
.asScala.map(_ => Done)(ExecutionContext.parasitic)
8587
}
8688

87-
override def upsert[T](document: (String, T), upsertOptions: UpsertOptions): Future[(String, T)] =
88-
underlying.upsert(document._1, document._2, upsertOptions)
89-
.thenApply(_ => document)
89+
override def upsert[T](id: String, document: T, upsertOptions: UpsertOptions): Future[Done] =
90+
underlying.upsert(id, document, upsertOptions)
91+
.thenApply(_ => Done)
9092
.asScala
9193

92-
override def upsert[T](document: (String, T), upsertOptions: UpsertOptions, timeout: FiniteDuration): Future[(String, T)] =
93-
underlying.upsert(document._1, document._2, upsertOptions)
94+
override def upsert[T](id: String, document: T, upsertOptions: UpsertOptions, timeout: FiniteDuration): Future[Done] =
95+
underlying.upsert(id, document, upsertOptions)
9496
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
95-
.thenApply(_ => document)
97+
.thenApply(_ => Done)
9698
.asScala
9799

98100

99-
override def replace[T](document: (String, T)): Future[(String, T)] =
100-
underlying.replace(document._1, document._2)
101-
.thenApply(_ => document)
101+
override def replace[T](id: String, document: T): Future[Done] =
102+
underlying.replace(id, document)
103+
.thenApply(_ => Done)
102104
.asScala
103105

104-
override def replace[T](document: (String, T), replaceOptions: ReplaceOptions): Future[(String, T)] =
105-
underlying.replace(document._1, document._2, replaceOptions)
106-
.thenApply(_ => document)
106+
override def replace[T](id: String, document: T, replaceOptions: ReplaceOptions): Future[Done] =
107+
underlying.replace(id, document, replaceOptions)
108+
.thenApply(_ => Done)
107109
.asScala
108110

109-
override def replace[T](document: (String, T), replaceOptions: ReplaceOptions, timeout: FiniteDuration): Future[(String, T)] =
110-
underlying.replace(document._1, document._2, replaceOptions)
111+
override def replace[T](id: String, document: T, replaceOptions: ReplaceOptions, timeout: FiniteDuration): Future[Done] =
112+
underlying.replace(id, document, replaceOptions)
111113
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
112-
.thenApply(_ => document)
114+
.thenApply(_ => Done)
113115
.asScala
114116

115-
override def remove(id: String): Future[String] =
117+
override def remove(id: String): Future[Done] =
116118
underlying.remove(id)
117-
.thenApply(_ => id)
119+
.thenApply(_ => Done)
118120
.asScala
119121

120-
override def remove(id: String, removeOptions: RemoveOptions): Future[String] =
122+
override def remove(id: String, removeOptions: RemoveOptions): Future[Done] =
121123
underlying.remove(id, removeOptions)
122-
.thenApply(_ => id)
124+
.thenApply(_ => Done)
123125
.asScala
124126

125-
override def remove(id: String, removeOptions: RemoveOptions, timeout: FiniteDuration): Future[String] =
127+
override def remove(id: String, removeOptions: RemoveOptions, timeout: FiniteDuration): Future[Done] =
126128
underlying.remove(id, removeOptions)
127129
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
128-
.thenApply(_ => id)
130+
.thenApply(_ => Done)
129131
.asScala
130132

131-
override def createIndex(indexName: String, createQueryIndexOptions: CreateQueryIndexOptions, fields: String*): Future[Void] =
133+
override def createIndex(indexName: String, createQueryIndexOptions: CreateQueryIndexOptions, fields: String*): Future[Done] =
132134
underlying
133135
.queryIndexes()
134136
.createIndex(indexName, util.Arrays.asList(fields: _*), createQueryIndexOptions)
137+
.thenApply(_ => Done)
135138
.asScala
136139

137140
override def listIndexes(): Source[QueryIndex, NotUsed] =
@@ -142,10 +145,9 @@ class CouchbaseCollectionSessionImpl(bucketSession: CouchbaseSession, scopeName:
142145
)
143146
)
144147

145-
private def chooseTranscoder[T](target: Class[T]): Transcoder =
146-
target match {
147-
case _: Class[Array[Byte]] => RawBinaryTranscoder.INSTANCE
148-
case _: Class[String] => RawStringTranscoder.INSTANCE
149-
case _ => bucketSession.cluster().environment().transcoder()
150-
}
148+
private def chooseTranscoder[T](target: Class[T]): Transcoder = {
149+
if (target == classOf[Array[Byte]]) RawBinaryTranscoder.INSTANCE
150+
else if (target == classOf[String]) RawStringTranscoder.INSTANCE
151+
else bucketSession.cluster().environment().transcoder()
152+
}
151153
}

couchbase/src/main/scala/akka/stream/alpakka/couchbase/impl/CouchbaseCollectionSessionJavaAdapter.scala

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44

55
package akka.stream.alpakka.couchbase.impl
66

7-
import akka.NotUsed
7+
import akka.{Done, NotUsed}
88
import akka.annotation.InternalApi
99
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession
10-
import akka.stream.alpakka.couchbase.{javadsl, scaladsl}
10+
import akka.stream.alpakka.couchbase.{CouchbaseDocument, javadsl, scaladsl}
1111
import akka.stream.javadsl.Source
1212
import com.couchbase.client.java.json.{JsonArray, JsonObject, JsonValue}
1313
import com.couchbase.client.java.kv.{InsertOptions, RemoveOptions, ReplaceOptions, UpsertOptions}
@@ -16,6 +16,7 @@ import com.couchbase.client.java.{AsyncCollection, AsyncScope}
1616

1717
import java.time.Duration
1818
import java.util.concurrent.{CompletionStage, TimeUnit}
19+
import scala.concurrent.Future
1920
import scala.concurrent.duration.FiniteDuration
2021
import scala.jdk.FutureConverters._
2122

@@ -40,53 +41,53 @@ private[couchbase] final class CouchbaseCollectionSessionJavaAdapter(delegate: s
4041
* @param document A tuple where first element is id of the document and second is its value
4142
* @return A Future that completes with the id of the written document when the write is done
4243
*/
43-
override def insert[T](document: (String, T)): CompletionStage[(String, T)] =
44-
delegate.insert(document).asJava
44+
override def insert[T](id: String, document: T): CompletionStage[Done] =
45+
delegate.insert(id, document).asJava
4546

46-
override def insert[T](document: (String, T), insertOptions: InsertOptions): CompletionStage[(String, T)] =
47-
delegate.insert(document, insertOptions).asJava
47+
override def insert[T](id: String, document: T, insertOptions: InsertOptions): CompletionStage[Done] =
48+
delegate.insert(id, document, insertOptions).asJava
4849

49-
override def getJsonObject(id: String): CompletionStage[(String, JsonObject)] =
50+
override def getJsonObject(id: String): CompletionStage[CouchbaseDocument[JsonObject]] =
5051
delegate.getJsonObject(id).asJava
5152

52-
override def getJsonArray(id: String): CompletionStage[(String, JsonArray)] =
53+
override def getJsonArray(id: String): CompletionStage[CouchbaseDocument[JsonArray]] =
5354
delegate.getJsonArray(id).asJava
5455

55-
override def get[T](id: String, target: Class[T]): CompletionStage[(String, T)] =
56+
override def get[T](id: String, target: Class[T]): CompletionStage[CouchbaseDocument[T]] =
5657
delegate.get(id, target).asJava
5758
/**
5859
* @return A document if found or none if there is no document for the id
5960
*/
60-
override def getDocument(id: String): CompletionStage[(String, JsonValue)] =
61+
override def getDocument(id: String): CompletionStage[CouchbaseDocument[JsonValue]] =
6162
delegate.getDocument(id).asJava
6263

6364
/**
6465
* @param id Identifier of the document to fetch
6566
* @return Raw data for the document or none
6667
*/
67-
override def getBytes(id: String): CompletionStage[(String, Array[Byte])] =
68+
override def getBytes(id: String): CompletionStage[CouchbaseDocument[Array[Byte]]] =
6869
delegate.getBytes(id).asJava
6970

7071
/**
7172
* @param timeout fail the returned future with a TimeoutException if it takes longer than this
7273
* @return A document if found or none if there is no document for the id
7374
*/
74-
override def getDocument(id: String, timeout: Duration): CompletionStage[(String, JsonValue)] =
75+
override def getDocument(id: String, timeout: Duration): CompletionStage[CouchbaseDocument[JsonValue]] =
7576
delegate.getDocument(id, FiniteDuration.apply(timeout.toNanos, TimeUnit.NANOSECONDS)).asJava
7677

7778
/**
7879
* @return A raw document data if found or none if there is no document for the id
7980
*/
80-
override def getBytes(id: String, timeout: Duration): CompletionStage[(String, Array[Byte])] =
81+
override def getBytes(id: String, timeout: Duration): CompletionStage[CouchbaseDocument[Array[Byte]]] =
8182
delegate.getBytes(id, FiniteDuration.apply(timeout.toNanos, TimeUnit.NANOSECONDS)).asJava
8283

8384
/**
8485
* Upsert using the default write settings.
8586
*
8687
* @return a future that completes when the upsert is done
8788
*/
88-
override def upsert[T](document: (String, T)): CompletionStage[(String, T)] =
89-
delegate.upsert(document).asJava
89+
override def upsert[T](id: String, document: T): CompletionStage[Done] =
90+
delegate.upsert(id, document).asJava
9091

9192
/**
9293
* Upsert using the given write settings
@@ -95,19 +96,20 @@ private[couchbase] final class CouchbaseCollectionSessionJavaAdapter(delegate: s
9596
*
9697
* @return a future that completes when the upsert is done
9798
*/
98-
override def upsert[T](document: (String, T), upsertOptions: UpsertOptions): CompletionStage[(String, T)] =
99-
delegate.upsert(document, upsertOptions).asJava
99+
override def upsert[T](id: String, document: T, upsertOptions: UpsertOptions): CompletionStage[Done] =
100+
delegate.upsert(id, document, upsertOptions).asJava
100101

101102
/**
102103
* Upsert using given write settings and timeout
103104
*
104-
* @param document document id and value to upsert
105+
* @param id document id
106+
* @param document document value to upsert
105107
* @param upsertOptions Couchbase UpsertOptions
106108
* @param timeout timeout for the operation
107-
* @return the document id and value
109+
* @return a future that completes after the operation is done
108110
*/
109-
override def upsert[T](document: (String, T), upsertOptions: UpsertOptions, timeout: Duration): CompletionStage[(String, T)] =
110-
delegate.upsert(document, upsertOptions, FiniteDuration.apply(timeout.toNanos, TimeUnit.NANOSECONDS)).asJava
111+
override def upsert[T](id: String, document: T, upsertOptions: UpsertOptions, timeout: Duration): CompletionStage[Done] =
112+
delegate.upsert(id, document, upsertOptions, FiniteDuration.apply(timeout.toNanos, TimeUnit.NANOSECONDS)).asJava
111113

112114
/**
113115
* Replace using the default write settings.
@@ -116,8 +118,8 @@ private[couchbase] final class CouchbaseCollectionSessionJavaAdapter(delegate: s
116118
*
117119
* @return a future that completes when the replace is done
118120
*/
119-
override def replace[T](document: (String, T)): CompletionStage[(String, T)] =
120-
delegate.replace(document).asJava
121+
override def replace[T](id: String, document: T): CompletionStage[Done] =
122+
delegate.replace(id, document).asJava
121123

122124
/**
123125
* Replace using the given replace options
@@ -126,27 +128,28 @@ private[couchbase] final class CouchbaseCollectionSessionJavaAdapter(delegate: s
126128
*
127129
* @return a future that completes when the replace is done
128130
*/
129-
override def replace[T](document: (String, T), replaceOptions: ReplaceOptions): CompletionStage[(String, T)] =
130-
delegate.replace(document, replaceOptions).asJava
131+
override def replace[T](id: String, document: T, replaceOptions: ReplaceOptions): CompletionStage[Done] =
132+
delegate.replace(id, document, replaceOptions).asJava
131133

132134
/**
133135
* Replace using write settings and timeout
134136
*
135-
* @param document document id and value to replace
137+
* @param id id of the document to replace
138+
* @param document document value to replace
136139
* @param replaceOptions Couchbase replace options
137140
* @param timeout timeout for the operation
138-
* @return the document id and value
141+
* @return a future that completes after the operation is done
139142
*/
140-
override def replace[T](document: (String, T), replaceOptions: ReplaceOptions, timeout: Duration): CompletionStage[(String, T)] =
141-
delegate.replace(document, replaceOptions, FiniteDuration.apply(timeout.toNanos, TimeUnit.NANOSECONDS)).asJava
143+
override def replace[T](id: String, document: T, replaceOptions: ReplaceOptions, timeout: Duration): CompletionStage[Done] =
144+
delegate.replace(id, document, replaceOptions, FiniteDuration.apply(timeout.toNanos, TimeUnit.NANOSECONDS)).asJava
142145

143146
/**
144147
* Remove a document by id using the default write settings.
145148
*
146149
* @return Future that completes when the document has been removed, if there is no such document
147150
* the future is failed with a `DocumentDoesNotExistException`
148151
*/
149-
override def remove(id: String): CompletionStage[String] =
152+
override def remove(id: String): CompletionStage[Done] =
150153
delegate.remove(id).asJava
151154

152155
/**
@@ -155,7 +158,7 @@ private[couchbase] final class CouchbaseCollectionSessionJavaAdapter(delegate: s
155158
* @return Future that completes when the document has been removed, if there is no such document
156159
* the future is failed with a `DocumentDoesNotExistException`
157160
*/
158-
override def remove(id: String, removeOptions: RemoveOptions): CompletionStage[String] =
161+
override def remove(id: String, removeOptions: RemoveOptions): CompletionStage[Done] =
159162
delegate.remove(id, removeOptions).asJava
160163

161164
/**
@@ -179,7 +182,7 @@ private[couchbase] final class CouchbaseCollectionSessionJavaAdapter(delegate: s
179182
* if the index existed and `ignoreIfExist` is `true`. Completion of the future does not guarantee the index is online
180183
* and ready to be used.
181184
*/
182-
override def createIndex(indexName: String, createQueryIndexOptions: CreateQueryIndexOptions, fields: String*): CompletionStage[Void] =
185+
override def createIndex(indexName: String, createQueryIndexOptions: CreateQueryIndexOptions, fields: String*): CompletionStage[Done] =
183186
delegate.createIndex(indexName, createQueryIndexOptions, fields: _*).asJava
184187

185188
/**

0 commit comments

Comments
 (0)