Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ProblemFilters.exclude[Problem]("akka.stream.alpakka.couchbase.*")
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.couchbase

import scala.reflect.ClassTag

class CouchbaseDocument[T: ClassTag](val id: String, val document: T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the constructor part of the public API also for Java? (That could make sense for example for users constructing CouchBaseDocument instances in their tests) In that case T : ClassTag is problematic and there should be an explicit constructor for Java accepting Class<T>.

Also, make it final if not intended for user subclassing as public API.

def getId: String = id;
def getDocument: T = document;

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package akka.stream.alpakka.couchbase
import akka.annotation.InternalApi
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.json.JsonObject

/**
* Describes a Couchbase related failure with an error code.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ object CouchbaseSessionRegistry extends ExtensionId[CouchbaseSessionRegistry] wi

final class CouchbaseSessionRegistry(system: ExtendedActorSystem) extends Extension {

import CouchbaseSessionRegistry._

private val blockingDispatcher = system.dispatchers.lookup("akka.actor.default-blocking-io-dispatcher")

import CouchbaseSessionRegistry._

private val clusterRegistry = new CouchbaseClusterRegistry(system)

private val sessions = new AtomicReference(Map.empty[SessionKey, Future[CouchbaseSession]])

/**
* Scala API: Get an existing session or start a new one with the given settings and bucket name,
* Scala API: Get an existing session or start a new one with the given settings and bucket, scope and collection names,
* makes it possible to share one session across plugins.
*
* Note that the session must not be stopped manually, it is shut down when the actor system is shutdown,
Expand Down Expand Up @@ -90,9 +90,7 @@ final class CouchbaseSessionRegistry(system: ExtendedActorSystem) extends Extens
// we won cas, initialize session
val session = clusterRegistry
.clusterFor(key.settings)
.flatMap(cluster => CouchbaseSession(cluster, key.bucketName)(blockingDispatcher))(
ExecutionContext.parasitic
)
.flatMap(cluster => CouchbaseSession(cluster, key.bucketName))(blockingDispatcher)
promise.completeWith(session)
promise.future
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.couchbase.impl

import akka.annotation.InternalApi
import akka.stream.alpakka.couchbase.CouchbaseDocument
import akka.stream.alpakka.couchbase.scaladsl.{CouchbaseCollectionSession, CouchbaseSession}
import akka.stream.scaladsl.Source
import akka.{Done, NotUsed}
import com.couchbase.client.java.codec.{RawBinaryTranscoder, RawStringTranscoder, Transcoder}
import com.couchbase.client.java.json.JsonValue
import com.couchbase.client.java.kv._
import com.couchbase.client.java.manager.query.{CreateQueryIndexOptions, QueryIndex}
import com.couchbase.client.java.{AsyncCollection, AsyncScope}
import rx.{Observable, RxReactiveStreams}

import java.util
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.FutureConverters.CompletionStageOps
import scala.reflect.ClassTag

/**
* INTERNAL API
*/
@InternalApi
private[couchbase] class CouchbaseCollectionSessionImpl(bucketSession: CouchbaseSession,
scopeName: String,
collectionName: String)
extends CouchbaseCollectionSession {

override def bucket: CouchbaseSession = bucketSession

override def scope: AsyncScope = bucket.underlying.scope(scopeName)
override def underlying: AsyncCollection = scope.collection(collectionName)

override def asJava = new CouchbaseCollectionSessionJavaAdapter(this)

override def insert[T: ClassTag](id: String, document: T): Future[Done] = {
underlying
.insert(id,
document,
InsertOptions
.insertOptions()
.transcoder(chooseTranscoder[T]()))
.asScala
.map(_ => Done)(ExecutionContext.parasitic)
}

override def insert[T: ClassTag](id: String, document: T, insertOptions: InsertOptions): Future[Done] = {
if (insertOptions.build.transcoder() == null) {
insertOptions.transcoder(chooseTranscoder[T]())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not great mutating a parameter passed in through a public API like this but since InsertOptions is mutable and there is no copy method in that API I guess there is now way around it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for pointing this out. I think it is better to remove this altogether and have the client error out if it is unable to select a transcoder or if it was not provided — the developers can always fix that by adding an explicit transcoder into the options.

}
underlying
.insert(id, document, insertOptions)
.asScala
.map(_ => Done)(ExecutionContext.parasitic)
}

override def get[T: ClassTag](id: String): Future[CouchbaseDocument[T]] = {
val target: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
underlying
.get(id, GetOptions.getOptions.transcoder(chooseTranscoder[T]()))
.thenApply(gr => new CouchbaseDocument[T](id, gr.contentAs(target)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will likely run on the common FJP if not already completed when thenApply is invoked (in which case it executes on the calling thread), better do it as a map on the Scala Future where execution context is explicit, to keep it on the same thread pool as the other stuff here.

Same for the other thenApplys in this class.

.asScala
}

override def getDocument(id: String): Future[CouchbaseDocument[JsonValue]] =
underlying.get(id).thenApply(gr => new CouchbaseDocument[JsonValue](id, asJsonValue(gr))).asScala

private def asJsonValue(gr: GetResult) =
try {
gr.contentAsObject().asInstanceOf[JsonValue]
} catch {
case ex: Exception => gr.contentAsArray().asInstanceOf[JsonValue]
}

override def getBytes(id: String): Future[CouchbaseDocument[Array[Byte]]] =
underlying.get(id).thenApply(gr => new CouchbaseDocument[Array[Byte]](id, gr.contentAsBytes())).asScala

override def getDocument(id: String, timeout: FiniteDuration): Future[CouchbaseDocument[JsonValue]] =
underlying
.get(id)
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
.thenApply(gr => new CouchbaseDocument[JsonValue](id, asJsonValue(gr)))
.asScala

override def getBytes(id: String, timeout: FiniteDuration): Future[CouchbaseDocument[Array[Byte]]] =
underlying
.get(id)
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
.thenApply(gr => new CouchbaseDocument[Array[Byte]](id, gr.contentAsBytes()))
.asScala

override def upsert[T: ClassTag](id: String, document: T): Future[Done] = {
underlying
.upsert(id,
document,
UpsertOptions
.upsertOptions()
.transcoder(chooseTranscoder[T]()))
.asScala
.map(_ => Done)(ExecutionContext.parasitic)
}

override def upsert[T: ClassTag](id: String, document: T, upsertOptions: UpsertOptions): Future[Done] = {
if (upsertOptions.build().transcoder() == null) {
upsertOptions.transcoder(chooseTranscoder[T]())
}
underlying
.upsert(id, document, upsertOptions)
.thenApply(_ => Done)
.asScala
}

override def upsert[T: ClassTag](id: String,
document: T,
upsertOptions: UpsertOptions,
timeout: FiniteDuration): Future[Done] = {
if (upsertOptions.build().transcoder() == null) {
upsertOptions.transcoder(chooseTranscoder[T]())
}
underlying
.upsert(id, document, upsertOptions)
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
.thenApply(_ => Done)
.asScala
}

override def replace[T: ClassTag](id: String, document: T): Future[Done] =
underlying
.replace(id, document)
.thenApply(_ => Done)
.asScala

override def replace[T: ClassTag](id: String, document: T, replaceOptions: ReplaceOptions): Future[Done] = {
if (replaceOptions.build.transcoder() == null) {
replaceOptions.transcoder(chooseTranscoder[T]())
}
underlying
.replace(id, document, replaceOptions)
.thenApply(_ => Done)
.asScala
}

override def replace[T: ClassTag](id: String,
document: T,
replaceOptions: ReplaceOptions,
timeout: FiniteDuration): Future[Done] = {
if (replaceOptions.build.transcoder() == null) {
replaceOptions.transcoder(chooseTranscoder[T]())
}
underlying
.replace(id, document, replaceOptions)
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
.thenApply(_ => Done)
.asScala
}

override def remove(id: String): Future[Done] =
underlying
.remove(id)
.thenApply(_ => Done)
.asScala

override def remove(id: String, removeOptions: RemoveOptions): Future[Done] =
underlying
.remove(id, removeOptions)
.thenApply(_ => Done)
.asScala

override def remove(id: String, removeOptions: RemoveOptions, timeout: FiniteDuration): Future[Done] =
underlying
.remove(id, removeOptions)
.orTimeout(timeout.toMillis, TimeUnit.MILLISECONDS)
.thenApply(_ => Done)
.asScala

override def createIndex(indexName: String,
createQueryIndexOptions: CreateQueryIndexOptions,
fields: String*): Future[Done] =
underlying
.queryIndexes()
.createIndex(indexName, util.Arrays.asList(fields: _*), createQueryIndexOptions)
.thenApply(_ => Done)
.asScala

override def listIndexes(): Source[QueryIndex, NotUsed] =
Source.fromPublisher(
RxReactiveStreams.toPublisher(
Observable
.from(underlying.queryIndexes().getAllIndexes())
.flatMap(indexes => Observable.from(indexes))
)
)

private def chooseTranscoder[T: ClassTag](): Transcoder = {
val target: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
if (target == classOf[Array[Byte]]) RawBinaryTranscoder.INSTANCE
else if (target == classOf[String]) RawStringTranscoder.INSTANCE
else bucketSession.cluster().environment().transcoder()
}
}
Loading
Loading