diff --git a/temerity/src/commonMain/kotlin/edu/ucsc/its/temerity/core/Temerity.kt b/temerity/src/commonMain/kotlin/edu/ucsc/its/temerity/core/Temerity.kt index 6fedf33c4489f5a7e29b573a1464872ba4640354..fdb62b1f0f8ba35ade6ebcc589146d360375d016 100644 --- a/temerity/src/commonMain/kotlin/edu/ucsc/its/temerity/core/Temerity.kt +++ b/temerity/src/commonMain/kotlin/edu/ucsc/its/temerity/core/Temerity.kt @@ -33,9 +33,8 @@ import edu.ucsc.its.temerity.api.PlatformApi import edu.ucsc.its.temerity.core.Temerity.Companion.DEFAULT_WEB_TIMEOUT import edu.ucsc.its.temerity.di.LibModule.libModule import edu.ucsc.its.temerity.di.platformModule -import edu.ucsc.its.temerity.extensions.coroutines.calculateMaxThreads +import edu.ucsc.its.temerity.extensions.coroutines.availableThreads import edu.ucsc.its.temerity.extensions.coroutines.createJobScope -import edu.ucsc.its.temerity.extensions.coroutines.setThreadCount import edu.ucsc.its.temerity.extensions.datetime.DateTimeExt import edu.ucsc.its.temerity.extensions.time.applyAuditLogFormat import edu.ucsc.its.temerity.extensions.time.applyScheduledSessionDateFormat @@ -107,15 +106,20 @@ public class Temerity internal constructor( KoinComponent { /** * Constructor for Temerity + * @param temApiToken The [String] API token to use for the Temerity client */ internal constructor(temApiToken: String) : this(TemClientConfig.withToken(temApiToken)) public companion object { + @JvmStatic public val DEFAULT_WEB_TIMEOUT: Duration = 2.minutes // TODO: Use this as cache4k expiration time + @JvmStatic public val DEFAULT_CACHE_EXPIRATION: Duration = 15.minutes - internal const val DEFAULT_MINIMUM_THREAD_COUNT: Int = 2 + + @JvmStatic + internal val DEFAULT_MINIMUM_THREAD_COUNT: Int = 2 @JvmStatic internal fun createLogger( @@ -185,6 +189,9 @@ public class Temerity internal constructor( private var cachedUserRoleList: ConcurrentMutableMap<Int, String> private var libraryCoroutineDispatcher: CoroutineDispatcher private var libraryCoroutineScope: CoroutineScope + private var jsonProcessingDispatcher: CoroutineDispatcher + private var webRequestDispatcher: CoroutineDispatcher + private var fileProcessDispatcher: CoroutineDispatcher init { check(!config.serviceToken.isNullOrBlank()) { @@ -196,18 +203,18 @@ public class Temerity internal constructor( } libraryCoroutineDispatcher = get<CoroutineDispatcher>(named("libraryCoroutineDispatcher")) { - val dispatcherName = "Temerity Library Dispatcher" - when (val optThreadCount = config.threadCount) { - null -> parametersOf(calculateMaxThreads(DEFAULT_MINIMUM_THREAD_COUNT), dispatcherName) - else -> parametersOf(setThreadCount(optThreadCount), dispatcherName) - } + parametersOf(availableThreads(config.threadCount), "Temerity Library Dispatcher") } libraryCoroutineScope = get<CoroutineScope>(named("libraryCoroutineScope")) { parametersOf(libraryCoroutineDispatcher) } + jsonProcessingDispatcher = get<CoroutineDispatcher>(named("childDispatcher")) { parametersOf(libraryCoroutineDispatcher, 2, "TemerityJsonProcessingCoroutineDispatcher") } + webRequestDispatcher = get<CoroutineDispatcher>(named("childDispatcher")) { parametersOf(libraryCoroutineDispatcher, 2, "TemerityWebRequestCoroutineDispatcher") } + fileProcessDispatcher = get<CoroutineDispatcher>(named("childDispatcher")) { parametersOf(libraryCoroutineDispatcher, 1, "TemerityFileProcessCoroutineDispatcher") } + platformApi = get<PlatformApi>(named("ktorfitApi")) { - parametersOf(config) + parametersOf(config, webRequestDispatcher) } cachedUserRoleList = ConcurrentMutableMap() @@ -234,10 +241,13 @@ public class Temerity internal constructor( else -> Unit } } - val apiResponsePayload = response.getOrThrow() + val apiResponsePayload = createJobScope(webRequestDispatcher).run { + response.getOrThrow() + } + if (apiResponsePayload == "[]") throw BreakException(page - 1) // Handle deserialization exceptions. Queue up decode: - val deserializedJson = runCatching { + val deserializedJson = createJobScope(jsonProcessingDispatcher).runCatching { json.decodeFromString<T>(apiResponsePayload) } // Attempt, returning the deserialized object if successful @@ -245,8 +255,12 @@ public class Temerity internal constructor( when (exception) { is SerializationException -> { // TODO: Implement platform API version checking - get<KermitLogger> { parametersOf("TemerityLib") }.d { "Returned JSON object: ${exception.message}" } - error("Encountered error decoding response from platform API. You likely need to choose a Temerity release that supports the API version implemented by your instance. \nPlatform API response data : $apiResponsePayload") + get<KermitLogger>(named("libraryLogger")).d { "Returned JSON object: ${exception.message}" } + val error = "Encountered error decoding response from platform API. You likely need to choose a Temerity release that supports the API version implemented by your instance. \nPlatform API response data : $apiResponsePayload" + error.run { + get<KermitLogger>(named("libraryLogger")).e { this } + error(this) + } } else -> throw exception @@ -254,55 +268,68 @@ public class Temerity internal constructor( } } - public override suspend fun getUsers(): List<User> = withContext(libraryCoroutineDispatcher) { - val userRequest = platformApi.getUsers() - decodeResponseCatching(userRequest.executeApiResponse<String>()) - } + public override suspend fun getUsers(): List<User> = + withContext(libraryCoroutineDispatcher) { + val userRequest = platformApi.getUsers() + decodeResponseCatching(userRequest.executeApiResponse<String>()) + } public override suspend fun getUser(userId: Long): User = - getUsers().first { it.userId == userId } - - public override suspend fun createUser(primaryIdentifier: String, newUser: NewUser): HttpResponse { - val validationMessage = newUser.validate() - if (validationMessage != "Valid") { - error(validationMessage) - } else { - return platformApi.createUser(primaryIdentifier, json.encodeToString(newUser)).getOrThrow() + withContext(libraryCoroutineDispatcher) { + getUsers().first { it.userId == userId } + } + + public override suspend fun createUser(primaryIdentifier: String, newUser: NewUser): HttpResponse = + withContext(libraryCoroutineDispatcher) { + val validationMessage = newUser.validate() + if (validationMessage != "Valid") { + error(validationMessage) + } else { + platformApi.createUser(primaryIdentifier, json.encodeToString(newUser)).getOrThrow() + } } - } public override suspend fun updateUser(userId: Long, userUpdate: UserUpdate): HttpResponse = - platformApi.setUser(userId, json.encodeToString(userUpdate)).getOrThrow() + withContext(libraryCoroutineDispatcher) { + platformApi.setUser(userId, json.encodeToString(userUpdate)).getOrThrow() + } public override suspend fun deleteUser(userId: Long): HttpResponse = - platformApi.deleteUser(userId).getOrThrow() - - public override suspend fun refreshCachedUserRoles(): List<String> { - val returnedUsersResponse = platformApi.getUsers().executeApiResponse<String>().getOrThrow() - val returnedUserList = json.decodeFromString<List<User>>(returnedUsersResponse) - val roleTypes = returnedUserList.map { it.userType }.distinct() - cachedUserRoleList.clear() - roleTypes.forEach { - cachedUserRoleList[it.hashCode()] = it + withContext(libraryCoroutineDispatcher) { + platformApi.deleteUser(userId).getOrThrow() } - return roleTypes - } - public override suspend fun getCachedUserRoles(refresh: Boolean): List<String> { - if (refresh) { - return refreshCachedUserRoles() + public override suspend fun refreshCachedUserRoles(): List<String> = + withContext(libraryCoroutineDispatcher) { + val returnedUsersResponse = platformApi.getUsers().executeApiResponse<String>().getOrThrow() + val returnedUserList = json.decodeFromString<List<User>>(returnedUsersResponse) + val roleTypes = returnedUserList.map { it.userType }.distinct() + cachedUserRoleList.clear() + roleTypes.forEach { + cachedUserRoleList[it.hashCode()] = it + } + roleTypes } - val returnedUserRoles = cachedUserRoleList.values.toList() - return returnedUserRoles.ifEmpty { - refreshCachedUserRoles() + + public override suspend fun getCachedUserRoles(refresh: Boolean): List<String> = + withContext(libraryCoroutineDispatcher) { + if (refresh) { + refreshCachedUserRoles() + } + val returnedUserRoles = cachedUserRoleList.values.toList() + returnedUserRoles.ifEmpty { + refreshCachedUserRoles() + } } - } public override suspend fun getUserGroups(userId: Long): List<UserGroup> = - decodeResponseCatching(platformApi.getUserGroups(userId).executeApiResponse<String>()) + withContext(libraryCoroutineDispatcher) { + decodeResponseCatching(platformApi.getUserGroups(userId).executeApiResponse<String>()) + } - public override suspend fun getUserGroupsOwned(userId: Long): List<UserGroup> = + public override suspend fun getUserGroupsOwned(userId: Long): List<UserGroup> = withContext(libraryCoroutineDispatcher) { decodeResponseCatching<List<UserGroup>>(platformApi.getUserGroupsOwned(userId).executeApiResponse()) + } public override suspend fun createUserWithExternalId(primaryIdentifier: String, newUser: NewUser): HttpResponse { TODO("Not yet implemented") @@ -320,7 +347,7 @@ public class Temerity internal constructor( TODO("Not yet implemented") } - public override suspend fun getGroups(paginated: Boolean): List<Group> { + public override suspend fun getGroups(paginated: Boolean): List<Group> = withContext(libraryCoroutineDispatcher) { when (paginated) { true -> { val returnedGroups = ArrayList<Group>() @@ -335,28 +362,29 @@ public class Temerity internal constructor( when (e) { is BreakException -> { if (config.optDebugEnabled) { - get<KermitLogger>().d("Caught BreakException() from decodeResponseCatching() notifying we're done reading groups from API: ${e.page} pages read") + get<KermitLogger>(named("libraryLogger")).d("Caught BreakException() from decodeResponseCatching() notifying we're done reading groups from API: ${e.page} pages read") } - return returnedGroups + return@withContext returnedGroups } else -> throw e } } - return returnedGroups + returnedGroups } else -> { - return emptyList() + emptyList() } } } - public override suspend fun getCourse(courseCode: String): Course = + public override suspend fun getCourse(courseCode: String): Course = withContext(libraryCoroutineDispatcher) { decodeResponseCatching(platformApi.getCourse(courseCode).executeApiResponse<String>()) + } - public override suspend fun getCourses(paginated: Boolean): List<Course> { - return when (paginated) { + public override suspend fun getCourses(paginated: Boolean): List<Course> = withContext(libraryCoroutineDispatcher) { + when (paginated) { true -> { val returnedCourses = ArrayList<Course>() try { @@ -371,15 +399,15 @@ public class Temerity internal constructor( when (e) { is BreakException -> { if (config.optDebugEnabled) { - get<KermitLogger>().d("Caught BreakException() from decodeResponseCatching() notifying we're done reading groups from API: $e") + get<KermitLogger>(named("libraryLogger")).d("Caught BreakException() from decodeResponseCatching() notifying we're done reading groups from API: $e") } - return returnedCourses + return@withContext returnedCourses } else -> throw e } } - return returnedCourses + returnedCourses } false -> { @@ -396,7 +424,7 @@ public class Temerity internal constructor( endTime: LocalDate, eventTypeList: List<EventType>?, sortOrder: AuditLogSortOrder?, - ): List<AuditLogEntry> { + ): List<AuditLogEntry> = withContext(libraryCoroutineDispatcher) { // TODO: Error out if user inputs time window > 1 month, since the platform API doesn't support val returnedEntries = ArrayList<AuditLogEntry>() val jobScope = createJobScope(libraryCoroutineScope.coroutineContext) @@ -413,7 +441,7 @@ public class Temerity internal constructor( // Apply passed sortOrder, or default to NEW_FIRST TODO: Read from user settings returnedEntries.applyOrDefault(sortOrder ?: NEW_FIRST) - return returnedEntries + returnedEntries } // Get audit log entries for a specific event type within window @@ -424,7 +452,7 @@ public class Temerity internal constructor( eventType: EventType, sortOrder: AuditLogSortOrder?, paginated: Boolean, - ): List<AuditLogEntry> { + ): List<AuditLogEntry> = withContext(libraryCoroutineDispatcher) { require(startTime <= endTime) { "End date must be same or later than start date" } @@ -468,7 +496,7 @@ public class Temerity internal constructor( } // Apply passed sortOrder, or default to NEW_FIRST TODO: Read from user settings returnedEntries.applyOrDefault(sortOrder ?: NEW_FIRST) - return returnedEntries + returnedEntries } false -> { @@ -491,17 +519,19 @@ public class Temerity internal constructor( // TODO: Implement setUser() function - public override suspend fun getDevices(): List<Device> = + public override suspend fun getDevices(): List<Device> = withContext(libraryCoroutineDispatcher) { decodeResponseCatching(platformApi.getDevices().executeApiResponse<String>()) + } - public override suspend fun getDevice(deviceId: Long): Device = + public override suspend fun getDevice(deviceId: Long): Device = withContext(libraryCoroutineDispatcher) { decodeResponseCatching(platformApi.getDeviceById(deviceId).executeApiResponse<String>()) + } public override suspend fun getUserSessions( userId: String, startTime: LocalDate, endTime: LocalDate, - ): List<UserRecordingSession> = + ): List<UserRecordingSession> = withContext(libraryCoroutineDispatcher) { decodeResponseCatching( platformApi.getUserSessions( userId, @@ -509,12 +539,13 @@ public class Temerity internal constructor( endTime.applyScheduledSessionDateFormat(), ).executeApiResponse<String>(), ) + } public override suspend fun getDeviceSchedule( deviceId: Long, startTime: LocalDate, endTime: LocalDate, - ): List<DeviceRecordingSession> = + ): List<DeviceRecordingSession> = withContext(libraryCoroutineDispatcher) { decodeResponseCatching( platformApi.getDeviceSchedule( deviceId, @@ -522,10 +553,17 @@ public class Temerity internal constructor( endTime.applyScheduledSessionDateFormat(), ).executeApiResponse<String>(), ) + } - public override suspend fun getStorageAnalyticsReport(groupId: Long): ByteArray = - platformApi.getStorageAnalyticsReport(groupId).executeApiResponse<ByteReadChannel>().getOrThrow() - .toByteArray(limit = 2000000000) // Limit file downloads to 2 GB + public override suspend fun getStorageAnalyticsReport(groupId: Long): ByteArray = withContext(libraryCoroutineDispatcher) { + val apiRequest = platformApi.getStorageAnalyticsReport(groupId) + val responseBytes = createJobScope(webRequestDispatcher).run { + apiRequest.executeApiResponse<ByteReadChannel>().getOrThrow() + } + withContext(fileProcessDispatcher) { + responseBytes.toByteArray(limit = 2000000000) + } // Limit file downloads to 2 GB + } } /** diff --git a/temerity/src/commonMain/kotlin/edu/ucsc/its/temerity/di/LibModule.kt b/temerity/src/commonMain/kotlin/edu/ucsc/its/temerity/di/LibModule.kt index 95de7e10c5db7a34ff987e93e3163c075aa19b46..76891c922dce04a93a95ce3ef1b2a17e629a85f4 100644 --- a/temerity/src/commonMain/kotlin/edu/ucsc/its/temerity/di/LibModule.kt +++ b/temerity/src/commonMain/kotlin/edu/ucsc/its/temerity/di/LibModule.kt @@ -24,11 +24,11 @@ import edu.ucsc.its.temerity.api.PlatformApi import edu.ucsc.its.temerity.api.createPlatformApi import edu.ucsc.its.temerity.core.buildHttpClient import edu.ucsc.its.temerity.core.createCommonLogger -import edu.ucsc.its.temerity.extensions.coroutines.createDispatcher import edu.ucsc.its.temerity.extensions.coroutines.createLibraryScope import io.ktor.client.HttpClient import io.ktor.client.engine.HttpClientEngine import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import org.koin.core.parameter.parametersOf import org.koin.core.qualifier.named @@ -41,26 +41,29 @@ internal object LibModule { * It includes the platform module and provides factories for [HttpClient] and [PlatformApi]. */ internal fun libModule() = module { - single(named("libraryLogger")) { (tag: String?, config: TemClientConfig?) -> + single<KermitLogger>(named("libraryLogger")) { (tag: String?, config: TemClientConfig?) -> createCommonLogger(tag = tag, config = config) } - single(named("libraryCoroutineDispatcher")) { (threadCount: Int, dispatcherName: String) -> - createDispatcher(Dispatchers.IO, threadCount, dispatcherName) + single<CoroutineDispatcher>(named("libraryCoroutineDispatcher")) { (threadCount: Int, dispatcherName: String) -> + Dispatchers.IO.limitedParallelism(threadCount, dispatcherName) } - single(named("libraryCoroutineScope")) { (dispatcher: CoroutineDispatcher) -> + factory<CoroutineDispatcher>(named("childDispatcher")) { (parentDispatcher: CoroutineDispatcher, threadCount: Int, dispatcherName: String) -> + parentDispatcher.limitedParallelism(threadCount, dispatcherName) + } + single<CoroutineScope>(named("libraryCoroutineScope")) { (dispatcher: CoroutineDispatcher) -> createLibraryScope(dispatcher) } - factory(named("httpClient")) { (config: TemClientConfig, kermit: co.touchlab.kermit.Logger) -> + factory<HttpClient>(named("httpClient")) { (dispatcher: CoroutineDispatcher, config: TemClientConfig, kermit: co.touchlab.kermit.Logger) -> buildHttpClient( - httpClientEngine = get<HttpClientEngine>(named("httpClientEngine"), parameters = { parametersOf(get<CoroutineDispatcher>(named("libraryCoroutineDispatcher"))) }), + httpClientEngine = get<HttpClientEngine>(named("httpClientEngine")) { parametersOf(dispatcher) }, config = config, logger = kermit, ) } - factory(named("ktorfitApi")) { (config: TemClientConfig) -> + factory<PlatformApi>(named("ktorfitApi")) { (config: TemClientConfig, webRequestDispatcher: CoroutineDispatcher) -> val logger: KermitLogger = get<KermitLogger>(named("libraryLogger")) val client: HttpClient = - get<HttpClient>(named("httpClient")) { parametersOf(config, logger) } + get<HttpClient>(named("httpClient")) { parametersOf(webRequestDispatcher, config, logger) } val ktorfit = ktorfit { config.serviceUrl?.let { baseUrl(it) } httpClient(client) diff --git a/temerity/src/commonMain/kotlin/edu/ucsc/its/temerity/extensions/coroutines/CoroutinesExt.kt b/temerity/src/commonMain/kotlin/edu/ucsc/its/temerity/extensions/coroutines/CoroutinesExt.kt index 90e834b2b2707be5926024ddb8620d1df20b2a56..d9790ad6d8dabfda8000926b290b7a7a507279ac 100644 --- a/temerity/src/commonMain/kotlin/edu/ucsc/its/temerity/extensions/coroutines/CoroutinesExt.kt +++ b/temerity/src/commonMain/kotlin/edu/ucsc/its/temerity/extensions/coroutines/CoroutinesExt.kt @@ -29,26 +29,26 @@ internal fun createJobScope(coroutineContext: CoroutineContext, allowIndependent return CoroutineScope(coroutineContext + parentJob) } -// This function is used to create a dispatcher with a limited number of threads from a platform-specific thread pool -internal fun createDispatcher(dispatcherThreadPool: CoroutineDispatcher, threadCount: Int, dispatcherName: String): CoroutineDispatcher = dispatcherThreadPool.limitedParallelism(threadCount, dispatcherName) - internal fun createLibraryScope(dispatcher: CoroutineDispatcher): CoroutineScope = createJobScope(dispatcher, allowIndependentFailure = true) -private fun availableThreads() = Runtime.getRuntime().availableProcessors().minus(1) - -internal fun calculateMaxThreads(defaultMinimumThreadCount: Int) = availableThreads().coerceAtLeast(defaultMinimumThreadCount) - -internal fun setThreadCount(maximumThreadCount: Int? = null): Int = when (maximumThreadCount) { - null -> calculateMaxThreads(DEFAULT_MINIMUM_THREAD_COUNT) - else -> { - if (maximumThreadCount > availableThreads()) { - // TODO: Log warning that the configured thread count is higher than the number of available threads - calculateMaxThreads(DEFAULT_MINIMUM_THREAD_COUNT) - } - if (maximumThreadCount < DEFAULT_MINIMUM_THREAD_COUNT) { - // TODO: Log warning that the configured thread count is lower than the default minimum thread pool size - calculateMaxThreads(DEFAULT_MINIMUM_THREAD_COUNT) +internal fun availableThreads(maximumThreadCount: Int? = null): Int { + val availableThreadCount = Runtime.getRuntime().availableProcessors().plus(1) // +1 to maintain full utilization in case one coroutine blocks. Never less than 2 (DEFAULT_MINIMUM_THREAD_COUNT). See: https://github.com/Kotlin/kotlinx.coroutines/issues/261 + with(maximumThreadCount) { + return when { + this == null -> availableThreadCount + this > availableThreadCount -> { + // TODO: Log warning that the configured thread count is higher than the number of available threads + availableThreadCount + } + this < DEFAULT_MINIMUM_THREAD_COUNT -> { + // TODO: Log warning that the configured thread count is lower than the default minimum thread pool size + DEFAULT_MINIMUM_THREAD_COUNT + } + this < availableThreadCount -> { + // TODO: Log warning that the configured thread count is lower than the optimal thread pool size + this + } + else -> availableThreadCount } - maxOf(maximumThreadCount, calculateMaxThreads(DEFAULT_MINIMUM_THREAD_COUNT)) } }