-
Notifications
You must be signed in to change notification settings - Fork 637
Couchbase: Updates the integration to support collections #3396
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…se dependencies; Tests are passing but I need to add more coverage.
|
Hi @chedim, Thank you for your contribution! We really value the time you've taken to put this together. Before we proceed with reviewing this pull request, please sign the Akka Contributors License Agreement: |
Working on it. Hopefully, it'll not take long 🤞 |
...hbase/src/main/scala/akka/stream/alpakka/couchbase/impl/CouchbaseCollectionSessionImpl.scala
Outdated
Show resolved
Hide resolved
|
Hi @chedim, Thank you for your contribution! We really value the time you've taken to put this together. Before we proceed with reviewing this pull request, please sign the Akka Contributors License Agreement: |
|
We have received the CLA. |
|
We signed the CLA. Authorized logins: chedim and deniswrosa |
|
Closing and re-opening to trigger the CLA validator again. |
|
@ennru @sebastian-alfers @leviramsey Could somebody please review the PR whenever they get a chance? Thank you! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few initial comments/questions...
...hbase/src/main/scala/akka/stream/alpakka/couchbase/impl/CouchbaseCollectionSessionImpl.scala
Outdated
Show resolved
Hide resolved
...hbase/src/main/scala/akka/stream/alpakka/couchbase/impl/CouchbaseCollectionSessionImpl.scala
Outdated
Show resolved
Hide resolved
...hbase/src/main/scala/akka/stream/alpakka/couchbase/impl/CouchbaseCollectionSessionImpl.scala
Outdated
Show resolved
Hide resolved
...hbase/src/main/scala/akka/stream/alpakka/couchbase/impl/CouchbaseCollectionSessionImpl.scala
Outdated
Show resolved
Hide resolved
couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseFlow.scala
Outdated
Show resolved
Hide resolved
|
@chedim We will likely do an Alpakka release in the near future, is this something you would be able to get back to? |
99e1633 to
982701c
Compare
|
@ennru , Updated the PR according to comments — improved method signatures, replaced tuples with CouchbaseDocument and put futureFlows as requested. Please re-review and LMK if any additional changes are needed. Thank you! |
|
I pushed a few formalities forward, but there are even compilation errors. The docs still try to include snippets that don't exist in your new implementation The last obvious bit is MiMa, which triggers as the API changed. That is OK and we can add filters to silence it. |
|
Looking into the compilation issues, sorry for missing them. |
- Fixes a compilation error under jdk11; - updates unit tests to work with docker-compose.yaml docker couchbase instance; - updates couchbase_prep compose service to create required collection - updates to documentation related to the new APIs
884c976 to
55c35fc
Compare
|
HI @ennru, I've updated the PR — compilation errors were on jdk11 (and I was running tests with jdk24, silly me). I've also updated the documentation and docker-compose definitions for ci so that they use newer CB images and create required for tests collections. Tested under jdk11 with please lmk if there are any suggestions or changes you'd like me to make. Thank you! |
|
Updated the PR with |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looking good, only a few minor things
couchbase/src/main/scala/akka/stream/alpakka/couchbase/CouchbaseDocument.scala
Outdated
Show resolved
Hide resolved
| .clusterFor(key.settings) | ||
| .flatMap(cluster => CouchbaseSession(cluster, key.bucketName)(blockingDispatcher))( | ||
| .flatMap(cluster => CouchbaseSession(cluster, key.bucketName))( | ||
| ExecutionContext.parasitic |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's important to only use ExecutionContext.parasitic when it's absolutely certain that there will be no exceptions. That might not be the case here? createClusterClient?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've returned the code that was using blockingDispatcher, please lmk if you want to use something else here.
Thank you!
| * @tparam T type of the object to return | ||
| * @return a future that completes with created object or errors if the operation failed | ||
| */ | ||
| def get[T](id: String, target: Class[T]): Future[CouchbaseDocument[T]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Scala it would be more idiomatic to use:
def get[T: ClassTag](id: String): Future[CouchbaseDocument[T]]
If you need the actual class of that ClassTag you can use:
val clazz = implicitly[ClassTag[T]].runtimeClass
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went ahead and applied this suggestion everywhere I was using generics, please lmk if that's not what you meant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps that was going too far. I haven't looked at all usages, but in general it should only be used when you actually need the Class or don't have the T as another parameter. This get[T] is a good example. I wouldn't use it as a general replacement for [T].
Also, it must not leak to any javadsl, since this is a Scala only thing.
couchbase/src/main/scala/akka/stream/alpakka/couchbase/scaladsl/CouchbaseFlow.scala
Outdated
Show resolved
Hide resolved
154ceab to
ec0bbed
Compare
|
@patriknw , updated the PR according to the comments, had couple of questions if I understood the comments correctly. |
|
A compiler warning in the example needs to be avoided: |
|
Should be fixed now. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did a round of reviewing with some feedback
|
|
||
| override def insert[T: ClassTag](id: String, document: T, insertOptions: InsertOptions): Future[Done] = { | ||
| if (insertOptions.build.transcoder() == null) { | ||
| insertOptions.transcoder(chooseTranscoder[T]()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not great mutating a parameter passed in through a public API like this but since InsertOptions is mutable and there is no copy method in that API I guess there is now way around it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for pointing this out. I think it is better to remove this altogether and have the client error out if it is unable to select a transcoder or if it was not provided — the developers can always fix that by adding an explicit transcoder into the options.
| val target: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]] | ||
| underlying | ||
| .get(id, GetOptions.getOptions.transcoder(chooseTranscoder[T]())) | ||
| .thenApply(gr => new CouchbaseDocument[T](id, gr.contentAs(target))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will likely run on the common FJP if not already completed when thenApply is invoked (in which case it executes on the calling thread), better do it as a map on the Scala Future where execution context is explicit, to keep it on the same thread pool as the other stuff here.
Same for the other thenApplys in this class.
| document: T, | ||
| replaceOptions: ReplaceOptions, | ||
| timeout: Duration): CompletionStage[Done] = | ||
| delegate.replace(id, document, replaceOptions, FiniteDuration.apply(timeout.toNanos, TimeUnit.NANOSECONDS)).asJava |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use .toScala for the Duration instead, from import scala.jdk.DurationConverters._
| override def underlying: AsyncBucket = cluster.bucket(bucketName) | ||
|
|
||
| override def streamedQuery(query: String): Source[JsonObject, NotUsed] = { | ||
| Source.fromIterator( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was an async stream, but now switched to a blocking iterator, is there no way to get an asynchronous stream/publisher from the client anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm updating it to use RxReactiveStreams as it was before. Unfortunately, there is no way to stream results from the server as they arrive, only after all of them are received and collected into a list.
| override def singleResponseQuery(query: String): Future[Option[JsonObject]] = | ||
| cluster | ||
| .query(query) | ||
| .thenApply(getSingleResult(_)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as abovve with thenApply here
| enrichedSettings.nodes.mkString(","), | ||
| clusterOptions | ||
| ) | ||
| }).andThen(c => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be intentional, but just to make sure c in andThen is a Try and this will execute regardless if it failed or succeeded.
Also a few style things: this is Java syntax for lambdas, in Scala we'd generally use
.andThen { c =>
...
...
}
or
.andThen({ c =>
...
...
})
When a parameter is not actually used, use _ to mark it as ignored.
| clusterOptions | ||
| ) | ||
| }).andThen(c => { | ||
| log.debug("created couchbase cluster client for " + enrichedSettings.username) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use placeholders for logging:
| log.debug("created couchbase cluster client for " + enrichedSettings.username) | |
| log.debug("created couchbase cluster client for {}", enrichedSettings.username) |
This avoids the string concatenation in case debug logging is disabled.
| .recover(err => { | ||
| log.error("failed to create couchbase cluster", err) | ||
| throw err | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the only intent is to side effect (the logging) this could potentially be covered by a singe andThen:
.andThen { result =>
log.debug("created couchbase cluster client for {}", enrichedSettings.username)
result.failed.foreach { exception =>
log.error("failed to create couchbase cluster", exception)
}
}We are already returning a failed future with the exception though, so logging at error here seems superflous?
| def counter(id: String, delta: Long, initial: Long, writeSettings: CouchbaseWriteSettings): Future[Long] | ||
| private val collectionSessions = new AtomicReference( | ||
| mutable.WeakHashMap.empty[(String, String), CouchbaseCollectionSession] | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The general pattern for doing this kind of compare and swap is that the collection inside would be immutable.
I see this is safe since it is probably safe since cloned before adding an entry on each try, but better to use Java concurrent hash map in this case.
WeakHashMap and weak references are problematic for GC in general so I'd recommend not to try to use that as a cache eviction strategy.
If a pool is really needed it'd be better be done as an akka extension, but I think we should take a step back and consider if this is really worth it. And if it is worth a pool of clients, let's do that a s a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My thinking here was very similar — the number of collection sessions probably would be low and they will be reused often so, this shouldn't result in GC issues and having a pool of sessions feels a little bit as overkill here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problematic aspect, as I remember it is not the number of weak references, but using weak references at all, messing with GC. Better push that choice to user space and not do it in the library.
Static singleton caches like this also has problems for testing and if multiple actor systems are used in the same JVM for some reason, so we generally avoid that anyway.
|
|
||
| import scala.reflect.ClassTag | ||
|
|
||
| class CouchbaseDocument[T: ClassTag](val id: String, val document: T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the constructor part of the public API also for Java? (That could make sense for example for users constructing CouchBaseDocument instances in their tests) In that case T : ClassTag is problematic and there should be an explicit constructor for Java accepting Class<T>.
Also, make it final if not intended for user subclassing as public API.
References #3395
This PR introduces new
CouchbaseCollectionSessionclass that can be accessed fromCouchbaseSessionviaCouchbaseSession::collectionmethod. It also updates sinks, sources and flow constructors and their signatures to work with collections.All of these are breaking changes.
Please note that, although fascinated by the framework and some language features, I am not very familiar with either alpakka or scala and more of a Java dev. So, any suggestions on how to improve my changes are greatly appreciated.
Cheers!