Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package akka.stream.alpakka.couchbase
import akka.annotation.InternalApi
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.json.JsonObject

/**
* Describes a Couchbase related failure with an error code.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,12 @@ final class CouchbaseSessionRegistry(system: ExtendedActorSystem) extends Extens

import CouchbaseSessionRegistry._

private val blockingDispatcher = system.dispatchers.lookup("akka.actor.default-blocking-io-dispatcher")

private val clusterRegistry = new CouchbaseClusterRegistry(system)

private val sessions = new AtomicReference(Map.empty[SessionKey, Future[CouchbaseSession]])

/**
* Scala API: Get an existing session or start a new one with the given settings and bucket name,
* Scala API: Get an existing session or start a new one with the given settings and bucket, scope and collection names,
* makes it possible to share one session across plugins.
*
* Note that the session must not be stopped manually, it is shut down when the actor system is shutdown,
Expand Down Expand Up @@ -90,7 +88,7 @@ final class CouchbaseSessionRegistry(system: ExtendedActorSystem) extends Extens
// we won cas, initialize session
val session = clusterRegistry
.clusterFor(key.settings)
.flatMap(cluster => CouchbaseSession(cluster, key.bucketName)(blockingDispatcher))(
.flatMap(cluster => CouchbaseSession(cluster, key.bucketName))(
ExecutionContext.parasitic
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's important to only use ExecutionContext.parasitic when it's absolutely certain that there will be no exceptions. That might not be the case here? createClusterClient?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've returned the code that was using blockingDispatcher, please lmk if you want to use something else here.
Thank you!

)
promise.completeWith(session)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package akka.stream.alpakka.couchbase.impl

import akka.NotUsed
import akka.annotation.InternalApi
import akka.stream.alpakka.couchbase.scaladsl.{CouchbaseCollectionSession, CouchbaseSession}
import akka.stream.scaladsl.Source
import com.couchbase.client.java.codec.{JsonTranscoder, RawBinaryTranscoder, RawStringTranscoder, Transcoder}
import com.couchbase.client.java.json.JsonValue
import com.couchbase.client.java.kv._
import com.couchbase.client.java.manager.query.{CreateQueryIndexOptions, QueryIndex}
import com.couchbase.client.java.{AsyncCollection, AsyncScope}
import rx.{Observable, RxReactiveStreams}

import java.util
import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration
import scala.jdk.FutureConverters.CompletionStageOps

@InternalApi
class CouchbaseCollectionSessionImpl(bucketSession: CouchbaseSession, scopeName: String, collectionName: String) extends CouchbaseCollectionSession{

override def bucket: CouchbaseSession = bucketSession

override def scope: AsyncScope = bucket.underlying.scope(scopeName)
override def underlying: AsyncCollection = scope.collection(collectionName)

override def asJava = new CouchbaseCollectionSessionJavaAdapter(this)

override def insert[T] (document: (String, T)): Future[(String, T)] = {
underlying
.insert(document._1, document._2,
InsertOptions.insertOptions()
.transcoder(chooseTranscoder(document._2.getClass))
)
.asScala.map(_ => document)(ExecutionContext.parasitic)
}

override def insert[T] (document: (String, T), insertOptions: InsertOptions): Future[(String, T)] = {
underlying.insert(document._1, document._2, insertOptions)
.asScala
.map(r => document)(ExecutionContext.parasitic)
}

override def get[T](id: String, target: Class[T]): Future[(String, T)] = {
underlying.get(id, GetOptions.getOptions.transcoder(chooseTranscoder(target)))
.thenApply(gr => {
(id, gr.contentAs(target))
})
.asScala

}

override def getDocument(id: String): Future[(String, JsonValue)] =
underlying.get(id).thenApply(gr => (id, asJsonValue(gr))).asScala

private def asJsonValue(gr: GetResult) =
try {
gr.contentAsObject().asInstanceOf[JsonValue]
} catch {
case ex: Exception => gr.contentAsArray().asInstanceOf[JsonValue]
}

override def getBytes(id: String): Future[(String, Array[Byte])] =
underlying.get(id).thenApply(gr => (id, gr.contentAsBytes())).asScala

override def getDocument(id: String, timeout: FiniteDuration): Future[(String, JsonValue)] =
underlying.get(id)
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
.thenApply(gr => (id, asJsonValue(gr)))
.asScala

override def getBytes(id: String, timeout: FiniteDuration): Future[(String, Array[Byte])] =
underlying.get(id)
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
.thenApply(gr => (id, gr.contentAsBytes()))
.asScala

override def upsert[T](document: (String, T)): Future[(String, T)] = {
underlying.upsert(document._1, document._2,
UpsertOptions.upsertOptions()
.transcoder(chooseTranscoder(document._2.getClass))
)
.asScala.map(_ => document)(ExecutionContext.parasitic)
}

override def upsert[T](document: (String, T), upsertOptions: UpsertOptions): Future[(String, T)] =
underlying.upsert(document._1, document._2, upsertOptions)
.thenApply(_ => document)
.asScala

override def upsert[T](document: (String, T), upsertOptions: UpsertOptions, timeout: FiniteDuration): Future[(String, T)] =
underlying.upsert(document._1, document._2, upsertOptions)
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
.thenApply(_ => document)
.asScala


override def replace[T](document: (String, T)): Future[(String, T)] =
underlying.replace(document._1, document._2)
.thenApply(_ => document)
.asScala

override def replace[T](document: (String, T), replaceOptions: ReplaceOptions): Future[(String, T)] =
underlying.replace(document._1, document._2, replaceOptions)
.thenApply(_ => document)
.asScala

override def replace[T](document: (String, T), replaceOptions: ReplaceOptions, timeout: FiniteDuration): Future[(String, T)] =
underlying.replace(document._1, document._2, replaceOptions)
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
.thenApply(_ => document)
.asScala

override def remove(id: String): Future[String] =
underlying.remove(id)
.thenApply(_ => id)
.asScala

override def remove(id: String, removeOptions: RemoveOptions): Future[String] =
underlying.remove(id, removeOptions)
.thenApply(_ => id)
.asScala

override def remove(id: String, removeOptions: RemoveOptions, timeout: FiniteDuration): Future[String] =
underlying.remove(id, removeOptions)
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
.thenApply(_ => id)
.asScala

override def createIndex(indexName: String, createQueryIndexOptions: CreateQueryIndexOptions, fields: String*): Future[Void] =
underlying
.queryIndexes()
.createIndex(indexName, util.Arrays.asList(fields: _*), createQueryIndexOptions)
.asScala

override def listIndexes(): Source[QueryIndex, NotUsed] =
Source.fromPublisher(
RxReactiveStreams.toPublisher(
Observable.from(underlying.queryIndexes().getAllIndexes())
.flatMap(indexes => Observable.from(indexes))
)
)

private def chooseTranscoder[T](target: Class[T]): Transcoder =
target match {
case _: Class[Array[Byte]] => RawBinaryTranscoder.INSTANCE
case _: Class[String] => RawStringTranscoder.INSTANCE
case _ => bucketSession.cluster().environment().transcoder()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.couchbase.impl

import akka.NotUsed
import akka.annotation.InternalApi
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession
import akka.stream.alpakka.couchbase.{javadsl, scaladsl}
import akka.stream.javadsl.Source
import com.couchbase.client.java.json.{JsonArray, JsonObject, JsonValue}
import com.couchbase.client.java.kv.{InsertOptions, RemoveOptions, ReplaceOptions, UpsertOptions}
import com.couchbase.client.java.manager.query.{CreateQueryIndexOptions, QueryIndex}
import com.couchbase.client.java.{AsyncCollection, AsyncScope}

import java.time.Duration
import java.util.concurrent.{CompletionStage, TimeUnit}
import scala.concurrent.duration.FiniteDuration
import scala.jdk.FutureConverters._

/**
* INTERNAL API
*/
@InternalApi
private[couchbase] final class CouchbaseCollectionSessionJavaAdapter(delegate: scaladsl.CouchbaseCollectionSession)
extends javadsl.CouchbaseCollectionSession {

override def asScala: scaladsl.CouchbaseCollectionSession = delegate

override def bucket: CouchbaseSession = delegate.bucket.asJava

override def scope: AsyncScope = delegate.scope

override def underlying: AsyncCollection = delegate.underlying

/**
* Insert a JSON document using the default write settings
*
* @param document A tuple where first element is id of the document and second is its value
* @return A Future that completes with the id of the written document when the write is done
*/
override def insert[T](document: (String, T)): CompletionStage[(String, T)] =
delegate.insert(document).asJava

override def insert[T](document: (String, T), insertOptions: InsertOptions): CompletionStage[(String, T)] =
delegate.insert(document, insertOptions).asJava

override def getJsonObject(id: String): CompletionStage[(String, JsonObject)] =
delegate.getJsonObject(id).asJava

override def getJsonArray(id: String): CompletionStage[(String, JsonArray)] =
delegate.getJsonArray(id).asJava

override def get[T](id: String, target: Class[T]): CompletionStage[(String, T)] =
delegate.get(id, target).asJava
/**
* @return A document if found or none if there is no document for the id
*/
override def getDocument(id: String): CompletionStage[(String, JsonValue)] =
delegate.getDocument(id).asJava

/**
* @param id Identifier of the document to fetch
* @return Raw data for the document or none
*/
override def getBytes(id: String): CompletionStage[(String, Array[Byte])] =
delegate.getBytes(id).asJava

/**
* @param timeout fail the returned future with a TimeoutException if it takes longer than this
* @return A document if found or none if there is no document for the id
*/
override def getDocument(id: String, timeout: Duration): CompletionStage[(String, JsonValue)] =
delegate.getDocument(id, FiniteDuration.apply(timeout.toNanos, TimeUnit.NANOSECONDS)).asJava

/**
* @return A raw document data if found or none if there is no document for the id
*/
override def getBytes(id: String, timeout: Duration): CompletionStage[(String, Array[Byte])] =
delegate.getBytes(id, FiniteDuration.apply(timeout.toNanos, TimeUnit.NANOSECONDS)).asJava

/**
* Upsert using the default write settings.
*
* @return a future that completes when the upsert is done
*/
override def upsert[T](document: (String, T)): CompletionStage[(String, T)] =
delegate.upsert(document).asJava

/**
* Upsert using the given write settings
*
* Separate from `upsert` to make the most common case smoother with the type inference
*
* @return a future that completes when the upsert is done
*/
override def upsert[T](document: (String, T), upsertOptions: UpsertOptions): CompletionStage[(String, T)] =
delegate.upsert(document, upsertOptions).asJava

/**
* Upsert using given write settings and timeout
*
* @param document document id and value to upsert
* @param upsertOptions Couchbase UpsertOptions
* @param timeout timeout for the operation
* @return the document id and value
*/
override def upsert[T](document: (String, T), upsertOptions: UpsertOptions, timeout: Duration): CompletionStage[(String, T)] =
delegate.upsert(document, upsertOptions, FiniteDuration.apply(timeout.toNanos, TimeUnit.NANOSECONDS)).asJava

/**
* Replace using the default write settings.
*
* For replacing other types of documents see `replaceDoc`.
*
* @return a future that completes when the replace is done
*/
override def replace[T](document: (String, T)): CompletionStage[(String, T)] =
delegate.replace(document).asJava

/**
* Replace using the given replace options
*
* For replacing other types of documents see `replaceDoc`.
*
* @return a future that completes when the replace is done
*/
override def replace[T](document: (String, T), replaceOptions: ReplaceOptions): CompletionStage[(String, T)] =
delegate.replace(document, replaceOptions).asJava

/**
* Replace using write settings and timeout
*
* @param document document id and value to replace
* @param replaceOptions Couchbase replace options
* @param timeout timeout for the operation
* @return the document id and value
*/
override def replace[T](document: (String, T), replaceOptions: ReplaceOptions, timeout: Duration): CompletionStage[(String, T)] =
delegate.replace(document, replaceOptions, FiniteDuration.apply(timeout.toNanos, TimeUnit.NANOSECONDS)).asJava

/**
* Remove a document by id using the default write settings.
*
* @return Future that completes when the document has been removed, if there is no such document
* the future is failed with a `DocumentDoesNotExistException`
*/
override def remove(id: String): CompletionStage[String] =
delegate.remove(id).asJava

/**
* Remove a document by id using the remove settings.
*
* @return Future that completes when the document has been removed, if there is no such document
* the future is failed with a `DocumentDoesNotExistException`
*/
override def remove(id: String, removeOptions: RemoveOptions): CompletionStage[String] =
delegate.remove(id, removeOptions).asJava

/**
* Removes document with given id, remove options and timeout
*
* @param id id of the document to remove
* @param removeOptions Couchbase remove options
* @param timeout timeout
* @return the id
*/
override def remove(id: String, removeOptions: RemoveOptions, timeout: Duration) =
delegate.remove(id, removeOptions, FiniteDuration.apply(timeout.toNanos, TimeUnit.NANOSECONDS)).asJava

/**
* Create a secondary index for the current collection.
*
* @param indexName the name of the index.
* @param createQueryIndexOptions Couchbase index options
* @param fields the JSON fields to index
* @return a [[scala.concurrent.Future]] of `true` if the index was/will be effectively created, `false`
* if the index existed and `ignoreIfExist` is `true`. Completion of the future does not guarantee the index is online
* and ready to be used.
*/
override def createIndex(indexName: String, createQueryIndexOptions: CreateQueryIndexOptions, fields: String*): CompletionStage[Void] =
delegate.createIndex(indexName, createQueryIndexOptions, fields: _*).asJava

/**
* List the existing secondary indexes for the collection
*/
override def listIndexes(): Source[QueryIndex, NotUsed] =
delegate.listIndexes().asJava
}
Loading