From 87afa63269ae386a4e47dc5c299e8227772d7a66 Mon Sep 17 00:00:00 2001 From: Keith Mayoral Date: Mon, 17 Mar 2025 14:47:23 -0700 Subject: [PATCH] Update to support latest versions of ktor, exposed, kotlin, etc --- .gitignore | 1 + gradle/libs.versions.toml | 40 +++++---- .../TaskSchedulingPluginTest.kt | 4 +- .../managers/lock/database/JdbcLockManager.kt | 38 ++++++--- .../build.gradle.kts | 2 + .../lock/database/MongoDBLockManager.kt | 81 ++++++++++++++++++- .../build.gradle.kts | 3 + .../managers/lock/redis/RedisLockManager.kt | 51 +++++++----- 8 files changed, 170 insertions(+), 50 deletions(-) diff --git a/.gitignore b/.gitignore index 40fa70c5..4c9bb8c3 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,4 @@ out/ .actrc /.run/publish all to local.run.xml /.run/publish kafka and run example.run.xml +/.kotlin diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index bad7ea35..6b5bbacd 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,22 +1,22 @@ [versions] # Core technologies -kotlin = "2.0.20" -java = "11" -kotlinx-serialization = "1.7.3" -kotlinx-io = "0.3.0" -ksp = "1.9.10-1.0.13" +kotlin = "2.1.10" +java = "17" +kotlinx-serialization = "1.8.0" +kotlinx-io = "0.7.0" +ksp = "2.1.10-1.0.31" # Web Framework -ktor = "3.0.0" +ktor = "3.1.1" # Android android = "8.1.0" # Database -h2 = "2.2.224" -postgres = "42.6.0" -exposed = "0.44.0" -mongodb = "5.2.0" +h2 = "2.3.232" +postgres = "42.7.5" +exposed = "0.60.0" +mongodb = "5.3.1" # Logging logback = "1.5.12" @@ -24,8 +24,8 @@ kotlin-logging = "5.1.0" logging_capabilities = "0.11.1" # Asynchronous and Concurrency -atomicfu = "0.25.0" -kotlinx-coroutines = "1.9.0" +atomicfu = "0.27.0" +kotlinx-coroutines = "1.10.1" # Testing #kotest = "6.0.0.M1" @@ -35,7 +35,7 @@ kotest-test-containers = "2.0.2" mockk = "1.13.4" mockative = "2.0.1" kmock = "0.3.0-rc08" -testcontainers = "1.19.8" +testcontainers = "1.20.6" redis-testcontainers = "1.6.4" # Code Quality and Coverage @@ -46,10 +46,11 @@ koverBadge = "0.0.6" detekt = "1.23.1" # Date and Time -kotlinx-datetime = "0.4.0" +kotlinx-datetime = "0.6.2" # Functional Programming arrow = "1.2.0" +reactor = "3.7.4" # Messaging kafka = "3.5.1" @@ -60,6 +61,8 @@ avro = "1.12.0" # Redis kreds = "0.9.1" redis-mp-client = "0.0.3" +microutils = "3.0.5" # required for kreds lib that doesn't expose via api() - https://github.com/crackthecodeabhi/kreds/blob/main/build.gradle.kts#L74-L76 +netty = "4.1.104.Final" # required for kreds lib that doesn't expose via api() - https://github.com/crackthecodeabhi/kreds/blob/main/build.gradle.kts#L74-L76 # Documentation dokka = "1.9.10" @@ -71,8 +74,8 @@ gradle-release = "3.0.2" nexusPublish = "2.0.0-rc-1" # Miscellaneous -krontab = "2.2.1" -uuid = "0.8.1" +krontab = "2.7.2" +uuid = "0.8.4" [libraries] # Core libraries @@ -83,9 +86,11 @@ kotlinx-serialization-core = { module = "org.jetbrains.kotlinx:kotlinx-serializa kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json", version.ref = "kotlinx-serialization" } kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "kotlinx-coroutines" } kotlinx-coroutines-test = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-test", version.ref = "kotlinx-coroutines" } +kotlinx-coroutines-reactive = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-reactive", version.ref = "kotlinx-coroutines" } kotlinx-datetime = { module = "org.jetbrains.kotlinx:kotlinx-datetime", version.ref = "kotlinx-datetime" } kotlinx-io-core = { module = "org.jetbrains.kotlinx:kotlinx-io-core", version.ref = "kotlinx-io" } kotlin-reflect = { module = "org.jetbrains.kotlin:kotlin-reflect", version.ref = "kotlin" } +reactor-core = { module = "io.projectreactor:reactor-core", version.ref = "reactor" } # Web Framework ktor-server-core = { module = "io.ktor:ktor-server-core", version.ref = "ktor" } @@ -188,6 +193,9 @@ gradle-release-gradlePlugin = { module = "net.researchgate:gradle-release", vers krontab = { module = "dev.inmo:krontab", version.ref = "krontab" } uuid = { module = "com.benasher44:uuid", version.ref = "uuid" } kreds = { module = "io.github.crackthecodeabhi:kreds", version.ref = "kreds" } +microutils-logging = { module = "io.github.microutils:kotlin-logging-jvm", version.ref = "microutils" } +netty-codec-redis = { module = "io.netty:netty-codec-redis", version.ref = "netty" } +netty-handler = { module = "io.netty:netty-handler", version.ref = "netty" } redis-mp-client = { module = "io.github.flaxoos:redis-client-multiplatform", version.ref = "redis-mp-client" } [plugins] diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingPluginTest.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingPluginTest.kt index 3ab87019..3b160247 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingPluginTest.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingPluginTest.kt @@ -7,7 +7,7 @@ import io.kotest.assertions.fail import io.kotest.core.spec.style.FunSpec import io.kotest.core.spec.style.scopes.ContainerScope import io.kotest.datatest.withData -import io.kotest.matchers.ints.shouldBeGreaterThan +import io.kotest.matchers.ints.shouldBeGreaterThanOrEqual import io.ktor.server.application.log import io.ktor.server.config.MapApplicationConfig import io.ktor.server.config.mergeWith @@ -77,7 +77,7 @@ abstract class TaskSchedulingPluginTest : FunSpec() { try { with(taskLogsAndApplications.map { it.first }.flatten()) { - size shouldBeGreaterThan executions - 2 + size shouldBeGreaterThanOrEqual executions - 2 with(groupingBy { it }.eachCount()) { val errors = this.mapNotNull { diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/JdbcLockManager.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/JdbcLockManager.kt index 8082a44a..b4bedfa8 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/JdbcLockManager.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/JdbcLockManager.kt @@ -16,7 +16,7 @@ import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.LiteralOp import org.jetbrains.exposed.sql.SchemaUtils import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq -import org.jetbrains.exposed.sql.SqlExpressionBuilder.neq +import org.jetbrains.exposed.sql.SqlExpressionBuilder.isNull import org.jetbrains.exposed.sql.Table import org.jetbrains.exposed.sql.and import org.jetbrains.exposed.sql.insertIgnore @@ -59,12 +59,12 @@ public class JdbcLockManager( database, transactionIsolation = Connection.TRANSACTION_READ_COMMITTED, ) { - repetitionAttempts = 0 + maxAttempts = 1 debug = true taskLockTable.insertIgnore { it[name] = task.name it[concurrencyIndex] = taskConcurrencyIndex - it[lockedAt] = Instant.fromEpochMilliseconds(0) + it[lockedAt] = null } }.insertedCount == 1 @@ -84,7 +84,6 @@ public class JdbcLockManager( selectClause( task, concurrencyIndex, - taskExecutionInstant, ) }, ) { @@ -103,16 +102,37 @@ public class JdbcLockManager( } } - override suspend fun releaseLockKey(key: JdbcTaskLock) {} + override suspend fun releaseLockKey(key: JdbcTaskLock) { + newSuspendedTransaction( + application.coroutineContext, + db = database, + transactionIsolation = Connection.TRANSACTION_READ_COMMITTED, + ) { + taskLockTable.update( + where = { + selectClauseForRelease( + key, + ) + }, + ) { + it[lockedAt] = null + } + } + } override fun close() {} private fun selectClause( task: Task, concurrencyIndex: Int, - taskExecutionInstant: Instant, ) = (taskLockTable.name eq task.name and taskLockTable.concurrencyIndex.eq(concurrencyIndex)) and - lockedAt.neq(LiteralOp(KotlinInstantColumnType(), taskExecutionInstant)) + lockedAt.isNull() + + private fun selectClauseForRelease(lock: JdbcTaskLock) = + taskLockTable.name eq lock.name and taskLockTable.concurrencyIndex.eq(lock.concurrencyIndex) and + lockedAt.eq( + LiteralOp(KotlinInstantColumnType(), Instant.fromEpochMilliseconds(lock.lockedAt.unixMillisLong)), + ) } public class JdbcTaskLock( @@ -128,13 +148,13 @@ public abstract class ExposedTaskLockTable( ) : Table(tableName) { public abstract val name: Column public abstract val concurrencyIndex: Column - public abstract val lockedAt: Column + public abstract val lockedAt: Column } public object DefaultTaskLockTable : ExposedTaskLockTable("task_locks") { override val name: Column = text("_name") override val concurrencyIndex: Column = integer("concurrency_index") - override val lockedAt: Column = timestamp("locked_at").index() + override val lockedAt: Column = timestamp("locked_at").nullable().index() override val primaryKey: PrimaryKey = PrimaryKey(firstColumn = name, concurrencyIndex, name = "pk_task_locks") } diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/build.gradle.kts b/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/build.gradle.kts index eac18589..22cefe8d 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/build.gradle.kts +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/build.gradle.kts @@ -14,6 +14,8 @@ kotlin { api(projects.ktorServerTaskScheduling.ktorServerTaskSchedulingCore) api(libs.mongodb.driver.kotlin.coroutine) api(libs.mongodb.bson.kotlinx) + api(libs.reactor.core) + implementation(libs.kotlinx.coroutines.reactive) } jvmTestDependencies { implementation(projects.ktorServerTaskScheduling.ktorServerTaskSchedulingCore.test) diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/MongoDBLockManager.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/MongoDBLockManager.kt index 6abd215c..6a52caba 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/MongoDBLockManager.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/MongoDBLockManager.kt @@ -61,8 +61,8 @@ public class MongoDBLockManager( Filters.and( Filters.eq(MongoDbTaskLock::name.name, task.name), Filters.eq(MongoDbTaskLock::concurrencyIndex.name, concurrencyIndex), + Filters.eq(MongoDbTaskLock::lockedAt.name, DateTime.EPOCH), ), - Filters.ne(MongoDbTaskLock::lockedAt.name, executionTime), ) val updates = Updates.combine( @@ -86,6 +86,7 @@ public class MongoDBLockManager( Indexes.compoundIndex( Indexes.ascending(MongoDbTaskLock::name.name), Indexes.ascending(MongoDbTaskLock::concurrencyIndex.name), + Indexes.ascending(MongoDbTaskLock::lockedAt.name), ), IndexOptions().unique(true), ) @@ -103,6 +104,7 @@ public class MongoDBLockManager( Filters.and( Filters.eq(MongoDbTaskLock::name.name, task.name), Filters.eq(MongoDbTaskLock::concurrencyIndex.name, taskConcurrencyIndex), + Filters.eq(MongoDbTaskLock::lockedAt.name, DateTime.EPOCH), ), ).firstOrNull() ?.let { false } ?: runCatching { @@ -127,7 +129,32 @@ public class MongoDBLockManager( } } - protected override suspend fun releaseLockKey(key: MongoDbTaskLock) {} + protected override suspend fun releaseLockKey(key: MongoDbTaskLock) { + val query = + Filters.and( + Filters.and( + Filters.eq(MongoDbTaskLock::name.name, key.name), + Filters.eq(MongoDbTaskLock::concurrencyIndex.name, key.concurrencyIndex), + Filters.eq(MongoDbTaskLock::lockedAt.name, key.lockedAt), + ), + ) + val updates = + Updates.combine( + Updates.set(MongoDbTaskLock::lockedAt.name, DateTime.EPOCH), + ) + val options = FindOneAndUpdateOptions().upsert(false) + client.startSession().use { session -> + session.startTransaction(transactionOptions = majorityJTransaction()) + runCatching { + collection.findOneAndUpdate(query, updates, options).also { + session.commitTransaction() + } + }.onFailure { + session.abortTransaction() + if (it !is MongoWriteException) throw it + } + } + } override fun close() { client.close() @@ -143,7 +170,7 @@ public class MongoDBLockManager( .build() } -public data class MongoDbTaskLock( +public class MongoDbTaskLock( override val name: String, override val concurrencyIndex: Int, override var lockedAt: DateTime, @@ -151,6 +178,49 @@ public data class MongoDbTaskLock( override fun toString(): String = "MongoDbTaskLockKey(name=$name, concurrencyIndex=$concurrencyIndex, lockedAt=${lockedAt.format2()})" } +internal class MongoDbTaskLockCodec : Codec { + override fun encode( + writer: BsonWriter, + value: MongoDbTaskLock, + encoderContext: EncoderContext, + ) { + writer.writeStartDocument() + writer.writeString("name", value.name) + writer.writeInt32("concurrencyIndex", value.concurrencyIndex) + writer.writeString("lockedAt", value.lockedAt.format2()) + writer.writeEndDocument() + } + + override fun decode( + reader: BsonReader, + decoderContext: DecoderContext, + ): MongoDbTaskLock { + reader.readStartDocument() + var name: String? = null + var concurrencyIndex: Int? = null + var lockedAt: DateTime? = null + + while (reader.readBsonType() != org.bson.BsonType.END_OF_DOCUMENT) { + when (reader.readName()) { + "_id" -> reader.skipValue() + "name" -> name = reader.readString() + "concurrencyIndex" -> concurrencyIndex = reader.readInt32() + "lockedAt" -> lockedAt = reader.readString().format2ToDateTime() + else -> reader.skipValue() // Skip unknown fields + } + } + reader.readEndDocument() + + if (name != null && concurrencyIndex != null && lockedAt != null) { + return MongoDbTaskLock(name = name, concurrencyIndex = concurrencyIndex, lockedAt = lockedAt) + } else { + throw IllegalStateException("Missing required fields in MongoDbTaskLock document") + } + } + + override fun getEncoderClass(): Class = MongoDbTaskLock::class.java +} + internal class DateTimeCodec : Codec { override fun encode( writer: BsonWriter, @@ -170,7 +240,10 @@ internal class DateTimeCodec : Codec { internal val codecRegistry = CodecRegistries.fromRegistries( - CodecRegistries.fromCodecs(DateTimeCodec()), + CodecRegistries.fromCodecs( + DateTimeCodec(), + MongoDbTaskLockCodec(), + ), MongoClientSettings.getDefaultCodecRegistry(), ) diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/build.gradle.kts b/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/build.gradle.kts index 94daa597..fc095e8b 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/build.gradle.kts +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/build.gradle.kts @@ -23,6 +23,9 @@ kotlin { implementation(libs.mockk) implementation(libs.testcontainers.redis) implementation(libs.kreds) + implementation(libs.microutils.logging) + implementation(libs.netty.handler) + implementation(libs.netty.codec.redis) } nativeMainDependencies { api(projects.common) diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/redis/RedisLockManager.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/redis/RedisLockManager.kt index ae7ed2a2..a6a4b616 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/redis/RedisLockManager.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/redis/RedisLockManager.kt @@ -14,6 +14,8 @@ import io.github.oshai.kotlinlogging.KotlinLogging import io.ktor.server.application.Application import korlibs.time.DateTime import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlin.jvm.JvmInline internal val logger = KotlinLogging.logger { } @@ -28,29 +30,42 @@ public class RedisLockManager( private val lockExpirationMs: Long, private val connectionAcquisitionTimeoutMs: Long, ) : TaskLockManager() { + private val mutex = Mutex() + override suspend fun init(tasks: List) {} override suspend fun acquireLockKey( task: Task, executionTime: DateTime, concurrencyIndex: Int, - ): RedisTaskLock? = - connectionPool.withConnection(connectionAcquisitionTimeoutMs) { redisConnection -> - logger.debug { "${application.host()}: ${executionTime.format2()}: Acquiring lock for ${task.name} - $concurrencyIndex" } - val key = task.toRedisLockKey(executionTime, concurrencyIndex) - if (redisConnection.setNx(key.value, "1", lockExpirationMs) != null) { - logger.debug { "${application.host()}: ${executionTime.format2()}: Acquired lock for ${task.name} - $concurrencyIndex" } - return@withConnection key - } - null - } ?: run { - logger.debug { - "${application.host()}: ${executionTime.format2()}: Failed to acquire lock for ${task.name} - $concurrencyIndex" + ): RedisTaskLock? { + logger.debug { "${application.host()}: ${executionTime.format2()}: Acquiring lock for ${task.name} - $concurrencyIndex" } + val key = task.toRedisLockKey(concurrencyIndex) + return mutex.withLock(key) { + connectionPool.withConnection(connectionAcquisitionTimeoutMs) { redisConnection -> + if (redisConnection.setNx(key.value, executionTime.format2(), lockExpirationMs) != null) { + logger.debug { "${application.host()}: ${executionTime.format2()}: Acquired lock for ${task.name} - $concurrencyIndex" } + return@withConnection key + } else { + return@withConnection null + } + } ?: run { + logger.debug { + "${application.host()}: ${executionTime.format2()}: Failed to acquire lock for ${task.name} - $concurrencyIndex" + } + null } - null } + } - override suspend fun releaseLockKey(key: RedisTaskLock) {} + override suspend fun releaseLockKey(key: RedisTaskLock) { + mutex.withLock(key) { + connectionPool.withConnection(connectionAcquisitionTimeoutMs) { redisConnection -> + logger.debug { "${application.host()}: Released lock for ${key.name} - ${key.concurrencyIndex}" } + redisConnection.del(key.value) + } + } + } override fun close() { runBlocking { @@ -70,12 +85,10 @@ public value class RedisTaskLock internal constructor( public val value: String, ) : TaskLock { public companion object { - private const val DELIMITER = "-" + private const val DELIMITER = "-***-" - public fun Task.toRedisLockKey( - executionTime: DateTime, - concurrencyIndex: Int, - ): RedisTaskLock = RedisTaskLock("${name.replace(DELIMITER, "_")}-$concurrencyIndex at ${executionTime.format2()}") + public fun Task.toRedisLockKey(concurrencyIndex: Int): RedisTaskLock = + RedisTaskLock("${name.replace(DELIMITER, "_")}$DELIMITER$concurrencyIndex") } override val name: String