Skip to content

Commit 0808fcc

Browse files
authored
feat(couchbase): Support collections (#3396)
1 parent 10d0835 commit 0808fcc

31 files changed

+2059
-1744
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ProblemFilters.exclude[Problem]("akka.stream.alpakka.couchbase.*")
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/*
2+
* Copyright (C) since 2016 Lightbend Inc. <https://akka.io>
3+
*/
4+
5+
package akka.stream.alpakka.couchbase
6+
7+
final class CouchbaseDocument[T](val id: String, val document: T) {
8+
def getId: String = id;
9+
def getDocument: T = document;
10+
}

couchbase/src/main/scala/akka/stream/alpakka/couchbase/CouchbaseResponseException.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
package akka.stream.alpakka.couchbase
66
import akka.annotation.InternalApi
7-
import com.couchbase.client.java.document.json.JsonObject
7+
import com.couchbase.client.java.json.JsonObject
88

99
/**
1010
* Describes a Couchbase related failure with an error code.

couchbase/src/main/scala/akka/stream/alpakka/couchbase/CouchbaseSessionRegistry.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,16 @@ object CouchbaseSessionRegistry extends ExtensionId[CouchbaseSessionRegistry] wi
4545

4646
final class CouchbaseSessionRegistry(system: ExtendedActorSystem) extends Extension {
4747

48-
import CouchbaseSessionRegistry._
49-
5048
private val blockingDispatcher = system.dispatchers.lookup("akka.actor.default-blocking-io-dispatcher")
5149

50+
import CouchbaseSessionRegistry._
51+
5252
private val clusterRegistry = new CouchbaseClusterRegistry(system)
5353

5454
private val sessions = new AtomicReference(Map.empty[SessionKey, Future[CouchbaseSession]])
5555

5656
/**
57-
* Scala API: Get an existing session or start a new one with the given settings and bucket name,
57+
* Scala API: Get an existing session or start a new one with the given settings and bucket, scope and collection names,
5858
* makes it possible to share one session across plugins.
5959
*
6060
* Note that the session must not be stopped manually, it is shut down when the actor system is shutdown,
@@ -90,9 +90,7 @@ final class CouchbaseSessionRegistry(system: ExtendedActorSystem) extends Extens
9090
// we won cas, initialize session
9191
val session = clusterRegistry
9292
.clusterFor(key.settings)
93-
.flatMap(cluster => CouchbaseSession(cluster, key.bucketName)(blockingDispatcher))(
94-
ExecutionContext.parasitic
95-
)
93+
.flatMap(cluster => CouchbaseSession(cluster, key.bucketName))(blockingDispatcher)
9694
promise.completeWith(session)
9795
promise.future
9896
} else {
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/*
2+
* Copyright (C) since 2016 Lightbend Inc. <https://akka.io>
3+
*/
4+
5+
package akka.stream.alpakka.couchbase.impl
6+
7+
import akka.annotation.InternalApi
8+
import akka.stream.alpakka.couchbase.CouchbaseDocument
9+
import akka.stream.alpakka.couchbase.scaladsl.{CouchbaseCollectionSession, CouchbaseSession}
10+
import akka.stream.scaladsl.Source
11+
import akka.{Done, NotUsed}
12+
import com.couchbase.client.java.codec.{RawBinaryTranscoder, RawStringTranscoder, Transcoder}
13+
import com.couchbase.client.java.json.JsonValue
14+
import com.couchbase.client.java.kv._
15+
import com.couchbase.client.java.manager.query.{CreateQueryIndexOptions, QueryIndex}
16+
import com.couchbase.client.java.{AsyncCollection, AsyncScope}
17+
import rx.{Observable, RxReactiveStreams}
18+
19+
import java.util
20+
import java.util.concurrent.TimeUnit
21+
import scala.concurrent.duration.FiniteDuration
22+
import scala.concurrent.{ExecutionContext, Future}
23+
import scala.jdk.FutureConverters.CompletionStageOps
24+
import scala.reflect.ClassTag
25+
26+
/**
27+
* INTERNAL API
28+
*/
29+
@InternalApi
30+
private[couchbase] class CouchbaseCollectionSessionImpl(bucketSession: CouchbaseSession,
31+
scopeName: String,
32+
collectionName: String)
33+
extends CouchbaseCollectionSession {
34+
35+
override def bucket: CouchbaseSession = bucketSession
36+
37+
override def scope: AsyncScope = bucket.underlying.scope(scopeName)
38+
override def underlying: AsyncCollection = scope.collection(collectionName)
39+
40+
override def asJava = new CouchbaseCollectionSessionJavaAdapter(this)
41+
42+
override def insert[T](id: String, document: T): Future[Done] = {
43+
underlying
44+
.insert(id,
45+
document,
46+
InsertOptions
47+
.insertOptions()
48+
.transcoder(chooseTranscoder(document.getClass)))
49+
.asScala
50+
.map(_ => Done)(ExecutionContext.parasitic)
51+
}
52+
53+
override def insert[T](id: String, document: T, insertOptions: InsertOptions): Future[Done] = {
54+
underlying
55+
.insert(id, document, insertOptions)
56+
.asScala
57+
.map(_ => Done)(ExecutionContext.parasitic)
58+
}
59+
60+
override def get[T](id: String)(implicit ct: ClassTag[T]): Future[CouchbaseDocument[T]] = {
61+
val target: Class[T] = ct.runtimeClass.asInstanceOf[Class[T]]
62+
underlying
63+
.get(id, GetOptions.getOptions.transcoder(chooseTranscoder(target)))
64+
.asScala
65+
.map(gr => new CouchbaseDocument[T](id, gr.contentAs(target)))(ExecutionContext.parasitic)
66+
}
67+
68+
override def getDocument(id: String): Future[CouchbaseDocument[JsonValue]] =
69+
underlying
70+
.get(id)
71+
.asScala
72+
.map(gr => new CouchbaseDocument[JsonValue](id, asJsonValue(gr)))(ExecutionContext.parasitic)
73+
74+
private def asJsonValue(gr: GetResult) =
75+
try {
76+
gr.contentAsObject().asInstanceOf[JsonValue]
77+
} catch {
78+
case ex: Exception => gr.contentAsArray().asInstanceOf[JsonValue]
79+
}
80+
81+
override def getBytes(id: String): Future[CouchbaseDocument[Array[Byte]]] =
82+
underlying
83+
.get(id)
84+
.asScala
85+
.map(gr => new CouchbaseDocument[Array[Byte]](id, gr.contentAsBytes()))(ExecutionContext.parasitic)
86+
87+
override def getDocument(id: String, timeout: FiniteDuration): Future[CouchbaseDocument[JsonValue]] =
88+
underlying
89+
.get(id)
90+
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
91+
.asScala
92+
.map(gr => new CouchbaseDocument[JsonValue](id, asJsonValue(gr)))(ExecutionContext.parasitic)
93+
94+
override def getBytes(id: String, timeout: FiniteDuration): Future[CouchbaseDocument[Array[Byte]]] =
95+
underlying
96+
.get(id)
97+
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
98+
.asScala
99+
.map(gr => new CouchbaseDocument[Array[Byte]](id, gr.contentAsBytes()))(ExecutionContext.parasitic)
100+
101+
override def upsert[T](id: String, document: T): Future[Done] = {
102+
underlying
103+
.upsert(id,
104+
document,
105+
UpsertOptions
106+
.upsertOptions()
107+
.transcoder(chooseTranscoder(document.getClass)))
108+
.asScala
109+
.map(_ => Done)(ExecutionContext.parasitic)
110+
}
111+
112+
override def upsert[T](id: String, document: T, upsertOptions: UpsertOptions): Future[Done] = {
113+
underlying
114+
.upsert(id, document, upsertOptions)
115+
.asScala
116+
.map(_ => Done)(ExecutionContext.parasitic)
117+
}
118+
119+
override def upsert[T](id: String,
120+
document: T,
121+
upsertOptions: UpsertOptions,
122+
timeout: FiniteDuration): Future[Done] = {
123+
underlying
124+
.upsert(id, document, upsertOptions)
125+
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
126+
.asScala
127+
.map(_ => Done)(ExecutionContext.parasitic)
128+
}
129+
130+
override def replace[T](id: String, document: T): Future[Done] = {
131+
val replaceOptions = ReplaceOptions.replaceOptions()
132+
replaceOptions.transcoder(chooseTranscoder(document.getClass))
133+
underlying
134+
.replace(id, document, replaceOptions)
135+
.asScala
136+
.map(_ => Done)(ExecutionContext.parasitic)
137+
}
138+
139+
override def replace[T](id: String, document: T, replaceOptions: ReplaceOptions): Future[Done] = {
140+
underlying
141+
.replace(id, document, replaceOptions)
142+
.asScala
143+
.map(_ => Done)(ExecutionContext.parasitic)
144+
}
145+
146+
override def replace[T](id: String,
147+
document: T,
148+
replaceOptions: ReplaceOptions,
149+
timeout: FiniteDuration): Future[Done] = {
150+
underlying
151+
.replace(id, document, replaceOptions)
152+
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
153+
.asScala
154+
.map(_ => Done)(ExecutionContext.parasitic)
155+
}
156+
157+
override def remove(id: String): Future[Done] =
158+
underlying
159+
.remove(id)
160+
.asScala
161+
.map(_ => Done)(ExecutionContext.parasitic)
162+
163+
override def remove(id: String, removeOptions: RemoveOptions): Future[Done] =
164+
underlying
165+
.remove(id, removeOptions)
166+
.asScala
167+
.map(_ => Done)(ExecutionContext.parasitic)
168+
169+
override def remove(id: String, removeOptions: RemoveOptions, timeout: FiniteDuration): Future[Done] =
170+
underlying
171+
.remove(id, removeOptions)
172+
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
173+
.asScala
174+
.map(_ => Done)(ExecutionContext.parasitic)
175+
176+
override def createIndex(indexName: String,
177+
createQueryIndexOptions: CreateQueryIndexOptions,
178+
fields: String*): Future[Done] =
179+
underlying
180+
.queryIndexes()
181+
.createIndex(indexName, util.Arrays.asList(fields: _*), createQueryIndexOptions)
182+
.asScala
183+
.map(_ => Done)(ExecutionContext.parasitic)
184+
185+
override def listIndexes(): Source[QueryIndex, NotUsed] =
186+
Source.fromPublisher(
187+
RxReactiveStreams.toPublisher(
188+
Observable
189+
.from(underlying.queryIndexes().getAllIndexes())
190+
.flatMap(indexes => Observable.from(indexes))
191+
)
192+
)
193+
194+
private def chooseTranscoder[T](target: Class[T]): Transcoder = {
195+
if (target == classOf[Array[Byte]]) RawBinaryTranscoder.INSTANCE
196+
else if (target == classOf[String]) RawStringTranscoder.INSTANCE
197+
else bucketSession.cluster().environment().transcoder()
198+
}
199+
}

0 commit comments

Comments
 (0)