|
@@ -159,19 +159,25 @@ inline fun <T> Flow<T>.scope(
|
|
|
*
|
|
|
* @param deferredArray 一系列并发任务
|
|
|
*/
|
|
|
+@OptIn(ExperimentalCoroutinesApi::class)
|
|
|
@Suppress("SuspendFunctionOnCoroutineScope")
|
|
|
suspend fun <T> CoroutineScope.fastest(vararg deferredArray: Deferred<T>): T {
|
|
|
val chan = Channel<T>()
|
|
|
+ val mutex = Mutex()
|
|
|
deferredArray.forEach {
|
|
|
- launch {
|
|
|
+ launch(Dispatchers.IO) {
|
|
|
try {
|
|
|
val result = it.await()
|
|
|
- NetCancel.cancel(coroutineContext[CoroutineExceptionHandler])
|
|
|
- chan.send(result)
|
|
|
+ mutex.withLock {
|
|
|
+ NetCancel.cancel(coroutineContext[CoroutineExceptionHandler])
|
|
|
+ chan.send(result)
|
|
|
+ }
|
|
|
} catch (e: Exception) {
|
|
|
it.cancel()
|
|
|
val allFail = deferredArray.all { it.isCancelled }
|
|
|
- if (allFail) throw e else e.printStackTrace()
|
|
|
+ if (allFail) throw e else {
|
|
|
+ if (e !is CancellationException) e.printStackTrace()
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -187,27 +193,26 @@ suspend fun <T> CoroutineScope.fastest(vararg deferredArray: Deferred<T>): T {
|
|
|
*/
|
|
|
@OptIn(ExperimentalCoroutinesApi::class)
|
|
|
@Suppress("SuspendFunctionOnCoroutineScope")
|
|
|
-suspend fun <T> CoroutineScope.fastest(vararg deferredArray: DeferredTransform<T>): T {
|
|
|
- val chan = Channel<T>()
|
|
|
+suspend fun <T, R> CoroutineScope.fastest(vararg deferredArray: DeferredTransform<T, R>): R {
|
|
|
+ val chan = Channel<R>()
|
|
|
val mutex = Mutex()
|
|
|
deferredArray.forEach {
|
|
|
launch(Dispatchers.IO) {
|
|
|
try {
|
|
|
val result = it.deferred.await()
|
|
|
- NetCancel.cancel(coroutineContext[CoroutineExceptionHandler])
|
|
|
mutex.withLock {
|
|
|
+ NetCancel.cancel(coroutineContext[CoroutineExceptionHandler])
|
|
|
if (!chan.isClosedForSend) {
|
|
|
val transformResult = it.block(result)
|
|
|
chan.send(transformResult)
|
|
|
- chan.close()
|
|
|
}
|
|
|
}
|
|
|
} catch (e: Exception) {
|
|
|
it.deferred.cancel()
|
|
|
val allFail = deferredArray.all { it.deferred.isCancelled }
|
|
|
- if (allFail) throw e else e.printStackTrace()
|
|
|
- } finally {
|
|
|
- chan.close()
|
|
|
+ if (allFail) throw e else {
|
|
|
+ if (e !is CancellationException) e.printStackTrace()
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|