Skip to content

Commit

Permalink
kotlin coroutines test finished
Browse files Browse the repository at this point in the history
  • Loading branch information
sheinbergon committed Sep 20, 2020
1 parent 4a3bbd4 commit 65d9a42
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 30 deletions.
4 changes: 3 additions & 1 deletion knitter/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ plugins {
dependencies {
api project(':needle-core')
implementation project(':needle-concurrent')
testImplementation project(':needle-core').sourceSets.test.output

implementation "org.jetbrains.kotlin:kotlin-stdlib"
compileOnly 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9'

testImplementation project(':needle-core').sourceSets.test.output
testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9'
}

task sourcesJar(type: Jar, dependsOn: classes) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,50 +1,45 @@
package org.sheinbergon.needle.knitter.coroutines

import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.asCoroutineDispatcher
import org.sheinbergon.needle.AffinityDescriptor
import org.sheinbergon.needle.concurrent.FixedAffinityPinnedThreadFactory
import org.sheinbergon.needle.concurrent.GovernedAffinityPinnedThreadFactory
import org.sheinbergon.needle.concurrent.PinnedThreadPoolExecutor
import kotlin.coroutines.CoroutineContext

object PinnedDispatchers {
private const val `1` = 1

private const val `1` = 1
private class GovernedAffinityDelegatingDispatcher(
parallelism: Int,
affinity: AffinityDescriptor
) : GovernedAffinityDispatcher() {

private class GovernedAffinityDelegatingDispatcher(
parallelism: Int,
affinity: AffinityDescriptor
) : GovernedAffinityDispatcher() {
val factory: GovernedAffinityPinnedThreadFactory = GovernedAffinityPinnedThreadFactory(affinity)

val factory: GovernedAffinityPinnedThreadFactory
private val delegate: CoroutineDispatcher

val delegate: CoroutineDispatcher

init {
factory = GovernedAffinityPinnedThreadFactory(affinity)
val executor = PinnedThreadPoolExecutor.newFixedPinnedThreadPool(parallelism, factory)
delegate = executor.asCoroutineDispatcher()
}
init {
val executor = PinnedThreadPoolExecutor.newFixedPinnedThreadPool(parallelism, factory)
delegate = executor.asCoroutineDispatcher()
}

override fun alter(affinity: AffinityDescriptor) = factory.alter(affinity, true)
override fun alter(affinity: AffinityDescriptor) = factory.alter(affinity, true)

override fun dispatch(context: CoroutineContext, block: Runnable) = delegate.dispatch(context, block)
}
override fun dispatch(context: CoroutineContext, block: Runnable) = delegate.dispatch(context, block)
}

fun governedAffinitySingleThread(affinity: AffinityDescriptor): GovernedAffinityDispatcher =
governedAffinityThreadPool(`1`, affinity)
fun governedAffinitySingleThread(affinity: AffinityDescriptor): GovernedAffinityDispatcher =
governedAffinityThreadPool(`1`, affinity)

fun governedAffinityThreadPool(parallelism: Int, affinity: AffinityDescriptor): GovernedAffinityDispatcher =
GovernedAffinityDelegatingDispatcher(parallelism, affinity)
fun governedAffinityThreadPool(parallelism: Int, affinity: AffinityDescriptor): GovernedAffinityDispatcher =
GovernedAffinityDelegatingDispatcher(parallelism, affinity)

fun fixedAffinitySingleThread(affinity: AffinityDescriptor) =
fixedAffinityThreadPool(`1`, affinity)
fun fixedAffinitySingleThread(affinity: AffinityDescriptor) =
fixedAffinityThreadPool(`1`, affinity)

fun fixedAffinityThreadPool(parallelism: Int, affinity: AffinityDescriptor): CoroutineDispatcher {
val factory = FixedAffinityPinnedThreadFactory(affinity)
val executor = PinnedThreadPoolExecutor.newFixedPinnedThreadPool(parallelism, factory)
return executor.asCoroutineDispatcher()
}
fun fixedAffinityThreadPool(parallelism: Int, affinity: AffinityDescriptor): CoroutineDispatcher {
val factory = FixedAffinityPinnedThreadFactory(affinity)
val executor = PinnedThreadPoolExecutor.newFixedPinnedThreadPool(parallelism, factory)
return executor.asCoroutineDispatcher()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package org.sheinbergon.needle.knitter.coroutines

import kotlinx.coroutines.*
import org.amshove.kluent.shouldBeEqualTo
import org.amshove.kluent.shouldBeLessOrEqualTo
import org.junit.jupiter.api.Test
import org.sheinbergon.needle.*

class PinnedDispatchersTest {

@Test
fun `Fixed affinity single threaded dispatcher`() {
val dispatcher = fixedAffinitySingleThread(testAffinityDescriptor)
val deferred = deferredAffinitySingleAsync(dispatcher)
runBlocking { blockingAssertSingle(deferred, binaryTestMask, textTestMask) }
}

@Test
fun `Fixed affinity thread-pool dispatcher`() {
val dispatcher = fixedAffinityThreadPool(availableCores, negatedTestAffinityDescriptor)
val deferred = deferredAffinityPoolAsync(availableCores, dispatcher)
runBlocking { blockingAssertPool(availableCores, deferred, negatedBinaryTestMask, negatedTextTestMask) }
}

@Test
fun `Governed affinity single threaded dispatcher`() {
val dispatcher = governedAffinitySingleThread(testAffinityDescriptor)
val deferred1 = deferredAffinitySingleAsync(dispatcher)
runBlocking { blockingAssertSingle(deferred1, binaryTestMask, textTestMask) }
dispatcher.alter(negatedTestAffinityDescriptor)
val deferred2 = deferredAffinitySingleAsync(dispatcher)
runBlocking { blockingAssertSingle(deferred2, negatedBinaryTestMask, negatedTextTestMask) }
}

@Test
fun `Governed affinity thread-pool dispatcher`() {
val dispatcher = governedAffinityThreadPool(availableCores, negatedTestAffinityDescriptor)
val deferred1 = deferredAffinityPoolAsync(availableCores, dispatcher)
runBlocking { blockingAssertPool(availableCores, deferred1, negatedBinaryTestMask, negatedTextTestMask) }
dispatcher.alter(testAffinityDescriptor)
val deferred2 = deferredAffinityPoolAsync(availableCores, dispatcher)
runBlocking { blockingAssertPool(availableCores, deferred2, binaryTestMask, textTestMask) }
}

private fun deferredAffinitySingleAsync(dispatcher: CoroutineDispatcher) =
GlobalScope.async(dispatcher) { Needle.affinity() }

private fun deferredAffinityPoolAsync(cores: Int, dispatcher: CoroutineDispatcher) = (`1`..cores)
.map {
GlobalScope.async(dispatcher) {
Thread.currentThread() to Needle.affinity()
}
}

private suspend fun blockingAssertSingle(
deferred: Deferred<AffinityDescriptor>,
binaryMask: Long,
textMask: String
) {
val affinity = deferred.await()
affinity.mask() shouldBeEqualTo binaryMask
affinity.toString() shouldBeEqualTo textMask
}

private suspend fun blockingAssertPool(
cores: Int,
deferred: List<Deferred<Pair<Thread, AffinityDescriptor>>>,
binaryMask: Long,
textMask: String
) {
val results = deferred.awaitAll()
val threads = results.mapTo(mutableSetOf(), Pair<Thread, *>::first)
threads.size shouldBeLessOrEqualTo cores
results.forEach { (_, affinity) ->
affinity.mask() shouldBeEqualTo binaryMask
affinity.toString() shouldBeEqualTo textMask
}
}
}

0 comments on commit 65d9a42

Please sign in to comment.