Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Major memory control refactoring #199

Merged
merged 16 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
54ec016
JBAI-4393 [core, ndarray] Edited an output allocation marking mechani…
dmitriyb Aug 5, 2024
2d7c310
JBAI-4393 [core, ndarray] Removed ArrayContainer and refactored memor…
dmitriyb Aug 12, 2024
9caf75c
JBAI-4393 [core, ndarray] Refactored memory management and array hand…
dmitriyb Aug 15, 2024
f1a9296
JBAI-4393 [core, ndarray] Refactored memory management and array hand…
dmitriyb Aug 19, 2024
f334632
JBAI-4393 [core, ndarray] Refactored memory management and array hand…
dmitriyb Aug 21, 2024
e900ca8
Refactor NDArray storage and retrieval methods.
dmitriyb Aug 22, 2024
954f6cc
JBAI-4393 [core, ndarray, utils] Major allocator refactoring
dmitriyb Aug 27, 2024
b83f7f8
JBAI-4393 [core, ndarray] Added getPrimitiveBlock extension functions…
dmitriyb Aug 29, 2024
450a39e
JBAI-4393 [ndarray] Added Fastutil support for more efficient primiti…
dmitriyb Aug 29, 2024
d25ecff
JBAI-4393 [buildSrc] Configured JVM benchmark tests to disable corout…
dmitriyb Aug 29, 2024
8dfd6eb
Fixed broadcasting shape logic in matrix multiplication for 1D.
dmitriyb Aug 30, 2024
a19fc9c
JBAI-4393 [core, ndarray] Streamlined memory size calculations using …
dmitriyb Sep 2, 2024
c942273
JBAI-4393 [core, ndarray] Refactored coroutine contexts to be polymor…
dmitriyb Sep 2, 2024
3caa0bc
JBAI-4393 [core] Rework context keys
cupertank Sep 2, 2024
9d67670
JBAI-4393 [core] Optimize imports
cupertank Sep 2, 2024
61011f8
JBAI-4393 [ndarray] Functional interface to streamline parallelizatio…
dmitriyb Sep 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ fun KotlinJvmTarget.configureBenchmarkTests() {
group = "verification"

maxHeapSize = "4G"
systemProperty("kotlinx.coroutines.debug", "off")

useJUnitPlatform()

Expand Down
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ okio = "3.6.0"
onnxruntime = "1.17.0.patched-1"
slf4j = "2.0.9"
wire = "4.9.3"
fastutil = "8.5.14"

# JS Dependencies
loglevel = "1.8.1"
Expand All @@ -36,3 +37,4 @@ onnxruntime-gpu = { module = "com.microsoft.onnxruntime:onnxruntime_gpu", versio
slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }
wire-runtime = { module = "com.squareup.wire:wire-runtime", version.ref = "wire" }
fastutil-core = { module = "it.unimi.dsi:fastutil-core", version.ref = "fastutil" }
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,19 @@ import io.kinference.core.optimizer.rules.OptimizerRuleSet
import io.kinference.data.ONNXData
import io.kinference.data.ONNXDataType
import io.kinference.model.IrOptimizableEngine
import io.kinference.ndarray.arrays.memory.MemoryLimiter
import io.kinference.ndarray.arrays.memory.MemoryLimiters
import io.kinference.optimizer.GraphOptimizer
import io.kinference.optimizer.OptimizerRule
import io.kinference.protobuf.*
import io.kinference.protobuf.message.*
import io.kinference.utils.CommonDataLoader
import io.kinference.utils.PlatformUtils
import io.kinference.utils.PredictionConfig
import io.kinference.utils.PredictionConfigs
import okio.Buffer
import okio.Path
import okio.Path.Companion.toPath

typealias KIONNXData<T> = ONNXData<T, CoreBackend>

// Define an interface for allocation control marking output
internal interface KIONNXDataArraysReleaser {
fun markOutput()
}

internal fun <T> KIONNXData<T>.markOutput() {
if (this is KIONNXDataArraysReleaser)
this.markOutput()
}

object CoreBackend : BackendInfo(name = "KInference Core CPU Backend")

/**
Expand All @@ -51,37 +40,37 @@ object KIEngine : IrOptimizableEngine<KIONNXData<*>> {

fun protoReader(bytes: ByteArray) = ProtobufReader(Buffer().write(bytes), KI_READER_CONFIG)

suspend fun loadModel(bytes: ByteArray, optimize: Boolean, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel {
suspend fun loadModel(bytes: ByteArray, optimize: Boolean, predictionConfig: PredictionConfig): KIModel {
val rules = if (optimize) OptimizerRuleSet.DEFAULT_OPT_RULES else emptyList()
return loadModel(bytes, rules, memoryLimiter, parallelismLimit)
return loadModel(bytes, rules, predictionConfig)
}

override suspend fun loadModel(bytes: ByteArray, optimize: Boolean): KIModel {
return loadModel(bytes, optimize, MemoryLimiters.NoAllocator, PlatformUtils.cores)
return loadModel(bytes, optimize, PredictionConfigs.NoAllocator)
}

override suspend fun loadModel(bytes: ByteArray, rules: List<OptimizerRule<KIONNXData<*>>>): KIModel = loadModel(bytes, rules, MemoryLimiters.NoAllocator, PlatformUtils.cores)
override suspend fun loadModel(bytes: ByteArray, rules: List<OptimizerRule<KIONNXData<*>>>): KIModel = loadModel(bytes, rules, PredictionConfigs.NoAllocator)

suspend fun loadModel(bytes: ByteArray, rules: List<OptimizerRule<KIONNXData<*>>>, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel {
suspend fun loadModel(bytes: ByteArray, rules: List<OptimizerRule<KIONNXData<*>>>, predictionConfig: PredictionConfig): KIModel {
val modelScheme = ModelProto.decode(protoReader(bytes))
val model = KIModel(modelScheme, memoryLimiter)
val model = KIModel(modelScheme, predictionConfig)

return if (rules.isNotEmpty()) {
val newGraph = GraphOptimizer(model.graph).run(rules) as KIGraph
KIModel(model.id, model.name, model.opSet, newGraph, memoryLimiter, parallelismLimit)
KIModel(model.id, model.name, model.opSet, newGraph, predictionConfig)
} else {
model
}
}

override suspend fun loadModel(bytes: ByteArray): KIModel = loadModel(bytes, optimize = true)

suspend fun loadModel(path: Path, optimize: Boolean, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel {
return loadModel(CommonDataLoader.bytes(path), optimize, memoryLimiter, parallelismLimit)
suspend fun loadModel(path: Path, optimize: Boolean, predictionConfig: PredictionConfig): KIModel {
return loadModel(CommonDataLoader.bytes(path), optimize, predictionConfig)
}

override suspend fun loadModel(path: Path, optimize: Boolean): KIModel {
return loadModel(path, optimize, MemoryLimiters.NoAllocator, PlatformUtils.cores)
return loadModel(path, optimize, PredictionConfigs.NoAllocator)
}

override suspend fun loadModel(path: Path): KIModel = loadModel(path, optimize = true)
Expand All @@ -90,12 +79,12 @@ object KIEngine : IrOptimizableEngine<KIONNXData<*>> {
return loadModel(CommonDataLoader.bytes(path), rules)
}

suspend fun loadModel(path: String, optimize: Boolean, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel {
return loadModel(CommonDataLoader.bytes(path.toPath()), optimize, memoryLimiter, parallelismLimit)
suspend fun loadModel(path: String, optimize: Boolean, predictionConfig: PredictionConfig): KIModel {
return loadModel(CommonDataLoader.bytes(path.toPath()), optimize, predictionConfig)
}

override suspend fun loadModel(path: String, optimize: Boolean): KIModel {
return loadModel(path, optimize, MemoryLimiters.NoAllocator, PlatformUtils.cores)
return loadModel(path, optimize, PredictionConfigs.NoAllocator)
}

override suspend fun loadModel(path: String): KIModel = loadModel(path, optimize = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import io.kinference.protobuf.message.TensorProto
import io.kinference.types.ValueInfo
import io.kinference.types.ValueTypeInfo

class KIONNXMap(name: String?, data: Map<Any, KIONNXData<*>>, val info: ValueTypeInfo.MapTypeInfo) : ONNXMap<Map<Any, KIONNXData<*>>, CoreBackend>(name, data), KIONNXDataArraysReleaser {
class KIONNXMap(name: String?, data: Map<Any, KIONNXData<*>>, val info: ValueTypeInfo.MapTypeInfo) : ONNXMap<Map<Any, KIONNXData<*>>, CoreBackend>(name, data) {
constructor(data: Map<Any, KIONNXData<*>>, info: ValueInfo) : this(info.name, data, info.typeInfo as ValueTypeInfo.MapTypeInfo)

override val backend = CoreBackend
Expand All @@ -26,10 +26,6 @@ class KIONNXMap(name: String?, data: Map<Any, KIONNXData<*>>, val info: ValueTyp

override fun rename(name: String): KIONNXMap = KIONNXMap(name, data, info)

override fun markOutput() {
data.values.forEach { it.markOutput() }
}

override suspend fun clone(newName: String?): KIONNXMap {
val newMap = HashMap<Any, KIONNXData<*>>(data.size)
for ((key, value) in data.entries) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import io.kinference.data.ONNXSequence
import io.kinference.protobuf.message.SequenceProto
import io.kinference.types.*

class KIONNXSequence(name: String?, data: List<KIONNXData<*>>, val info: ValueTypeInfo.SequenceTypeInfo) : ONNXSequence<List<KIONNXData<*>>, CoreBackend>(name, data), KIONNXDataArraysReleaser {
class KIONNXSequence(name: String?, data: List<KIONNXData<*>>, val info: ValueTypeInfo.SequenceTypeInfo) : ONNXSequence<List<KIONNXData<*>>, CoreBackend>(name, data) {
constructor(name: String?, info: ValueTypeInfo.SequenceTypeInfo, size: Int, init: (Int) -> KIONNXData<*>) : this(name, List(size, init), info)
constructor(data: List<KIONNXData<*>>, info: ValueInfo) : this(info.name, data, info.typeInfo as ValueTypeInfo.SequenceTypeInfo)

Expand All @@ -23,10 +23,6 @@ class KIONNXSequence(name: String?, data: List<KIONNXData<*>>, val info: ValueTy

override fun rename(name: String): KIONNXSequence = KIONNXSequence(name, data, info)

override fun markOutput() {
data.forEach { it.markOutput() }
}

val length: Int = data.size

companion object {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.kinference.core.data.tensor

import io.kinference.core.CoreBackend
import io.kinference.core.KIONNXDataArraysReleaser
import io.kinference.core.*
import io.kinference.data.ONNXTensor
import io.kinference.ndarray.arrays.*
import io.kinference.ndarray.arrays.memory.contexts.ManualAllocatorContext
import io.kinference.ndarray.arrays.tiled.*
import io.kinference.protobuf.FLOAT_TENSOR_TYPES
import io.kinference.protobuf.message.TensorProto
Expand All @@ -13,22 +13,18 @@ import io.kinference.types.ValueTypeInfo

//TODO: support segments
//TODO: support external data
class KITensor(name: String?, override val data: NDArrayCore, val info: ValueTypeInfo.TensorTypeInfo) : ONNXTensor<NDArrayCore, CoreBackend>(name, data), KIONNXDataArraysReleaser {
class KITensor(name: String?, override val data: NDArrayCore, val info: ValueTypeInfo.TensorTypeInfo, private var context: ManualAllocatorContext? = null) : ONNXTensor<NDArrayCore, CoreBackend>(name, data) {
constructor(data: NDArrayCore, info: ValueInfo) : this(info.name, data, info.typeInfo as ValueTypeInfo.TensorTypeInfo)

override suspend fun close() {
context?.returnNDArray(data)
data.close()
}

override suspend fun clone(newName: String?): KITensor {
return KITensor(newName, data.clone(), info)
}

override fun markOutput() {
if (this.data is MemoryControlledArray)
data.markOutput()
}

suspend operator fun minus(other: KITensor): KITensor {
require(this.data is NumberNDArrayCore && other.data is NumberNDArrayCore)
return (this.data - other.data).asTensor()
Expand All @@ -47,7 +43,7 @@ class KITensor(name: String?, override val data: NDArrayCore, val info: ValueTyp
override val backend = CoreBackend

override fun rename(name: String): KITensor {
return KITensor(name, data, info)
return KITensor(name, data, info, context)
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package io.kinference.core.data.tensor

import io.kinference.ndarray.arrays.*
import io.kinference.ndarray.arrays.memory.contexts.ManualAllocatorContext
import io.kinference.ndarray.extensions.concat
import io.kinference.ndarray.extensions.splitWithAxis
import io.kinference.primitives.types.DataType
import io.kinference.protobuf.resolveProtoDataType
import io.kinference.types.TensorShape
import io.kinference.types.ValueTypeInfo

fun NDArrayCore.asTensor(name: String? = null) = KITensor(name, this, ValueTypeInfo.TensorTypeInfo(TensorShape(this.shape), type.resolveProtoDataType()))
fun NDArrayCore.asTensor(name: String? = null, context: ManualAllocatorContext? = null) = KITensor(name, this, ValueTypeInfo.TensorTypeInfo(TensorShape(this.shape), type.resolveProtoDataType()), context)

internal fun <T : NDArray> T.asTensor(name: String? = null) = (this as NDArrayCore).asTensor(name)
internal fun <T : NDArray> T.asTensor(name: String? = null, context: ManualAllocatorContext? = null) = (this as NDArrayCore).asTensor(name, context)

internal fun <T : NDArray> Collection<T>.asONNXTensors(names: List<String>): List<KITensor> {
return this.zip(names).map { (data, name) -> data.asTensor(name) }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package io.kinference.core.model

import io.kinference.core.KIONNXData
import io.kinference.core.*
import io.kinference.core.graph.KIGraph
import io.kinference.core.markOutput
import io.kinference.graph.Contexts
import io.kinference.model.Model
import io.kinference.ndarray.arrays.memory.*
import io.kinference.ndarray.arrays.memory.contexts.finalizeAllocatorContext
import io.kinference.operator.OperatorSetRegistry
import io.kinference.profiler.*
import io.kinference.protobuf.message.ModelProto
Expand All @@ -18,14 +18,10 @@ class KIModel(
val name: String,
val opSet: OperatorSetRegistry,
val graph: KIGraph,
memoryLimiter: MemoryLimiter = MemoryLimiters.NoAllocator,
parallelismLimit: Int = PlatformUtils.cores,
predictionConfig: PredictionConfig = PredictionConfigs.NoAllocator,
) : Model<KIONNXData<*>>, Profilable, Cacheable {
private val profiles: MutableList<ProfilingContext> = ArrayList()

@OptIn(ExperimentalCoroutinesApi::class)
private val dispatcher: CoroutineDispatcher = Dispatchers.Default.limitedParallelism(parallelismLimit)
private val modelArrayStorage: ModelArrayStorage = ModelArrayStorage(memoryLimiter)
private val predictionContextDispatcher: PredictionContextDispatcher = PredictionContextDispatcher(predictionConfig)

override fun addProfilingContext(name: String): ProfilingContext = ProfilingContext(name).apply { profiles.add(this) }
override fun analyzeProfilingResults(): ProfileAnalysisEntry = profiles.analyze("Model $name")
Expand All @@ -37,24 +33,23 @@ class KIModel(
if (profile) addProfilingContext("Model $name") else null
)

val limiterContext = ParallelismLimiterContext(dispatcher)
var coreReserved = false
val results = try {
withContext(NonCancellable) {
ResourcesDispatcher.reserveCore()
coreReserved = true
}

val allocatorContext = modelArrayStorage.createAllocatorContext()
val mixedContext = allocatorContext + limiterContext

withContext(mixedContext) {
val coroutineContext = coroutineContext[AllocatorContext.Key]!!
val execResult = graph.execute(input, contexts)
execResult.forEach { it.markOutput() }
coroutineContext.closeAllocated()
execResult
val predictionContext = predictionContextDispatcher.getPredictionContext()
val output = if (predictionContextDispatcher.allocationMode != AllocationMode.Auto) withContext(predictionContext) {
return@withContext graph.execute(input, contexts)
} else withContext(predictionContext) {
return@withContext graph.execute(input, contexts).map { it.clone(it.name) }.toList()
}

predictionContext.finalizeAllocatorContext()
predictionContextDispatcher.returnStorage(predictionContext)
output
} finally {
if (coreReserved) {
ResourcesDispatcher.releaseCore()
Expand All @@ -66,11 +61,11 @@ class KIModel(

override suspend fun close() {
graph.close()
modelArrayStorage.close()
predictionContextDispatcher.close()
}

override fun clearCache() {
modelArrayStorage.clearCache()
predictionContextDispatcher.clearCache()
}

companion object {
Expand All @@ -80,14 +75,13 @@ class KIModel(

suspend operator fun invoke(
proto: ModelProto,
memoryLimiter: MemoryLimiter = MemoryLimiters.NoAllocator,
limiterParallelismCounter: Int = PlatformUtils.cores,
predictionConfig: PredictionConfig = PredictionConfigs.NoAllocator,
): KIModel {
val name = "${proto.domain}:${proto.modelVersion}"
val id = "$name:${generateModelId()}"
val opSet = OperatorSetRegistry(proto.opSetImport)
val graph = KIGraph(proto.graph!!, opSet)
return KIModel(id, name, opSet, graph, memoryLimiter, limiterParallelismCounter)
return KIModel(id, name, opSet, graph, predictionConfig)
}
}
}
Loading
Loading