Skip to content

Commit

Permalink
Added OOM message pointing to new CSV implementation
Browse files (browse the repository at this point in the history)
  • Loading branch information
Jolanrensen committed Nov 1, 2024
1 parent d40fe77 commit 0b0776c
Showing 1 changed file with 54 additions and 45 deletions.
99 changes: 54 additions & 45 deletions core/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/csv.kt
Original file line number Diff line number Diff line change
Expand Up @@ -351,61 +351,70 @@ public fun DataFrame.Companion.readDelim(
readLines: Int? = null,
parserOptions: ParserOptions? = null,
): AnyFrame {
var reader = reader
if (skipLines > 0) {
reader = BufferedReader(reader)
repeat(skipLines) { reader.readLine() }
}

val csvParser = format.parse(reader)
val records = if (readLines == null) {
csvParser.records
} else {
require(readLines >= 0) { "`readLines` must not be negative" }
val records = ArrayList<CSVRecord>(readLines)
val iter = csvParser.iterator()
var count = readLines ?: 0
while (iter.hasNext() && 0 < count--) {
records.add(iter.next())
try {
var reader = reader
if (skipLines > 0) {
reader = BufferedReader(reader)
repeat(skipLines) { reader.readLine() }
}
records
}

val columnNames = csvParser.headerNames.takeIf { it.isNotEmpty() }
?: (1..(records.firstOrNull()?.count() ?: 0)).map { index -> "X$index" }

val generator = ColumnNameGenerator()
val uniqueNames = columnNames.map { generator.addUnique(it) }
val csvParser = format.parse(reader)
val records = if (readLines == null) {
csvParser.records
} else {
require(readLines >= 0) { "`readLines` must not be negative" }
val records = ArrayList<CSVRecord>(readLines)
val iter = csvParser.iterator()
var count = readLines ?: 0
while (iter.hasNext() && 0 < count--) {
records.add(iter.next())
}
records
}

val cols = uniqueNames.mapIndexed { colIndex, colName ->
val defaultColType = colTypes[".default"]
val colType = colTypes[colName] ?: defaultColType
var hasNulls = false
val values = records.map {
if (it.isSet(colIndex)) {
it[colIndex].ifEmpty {
val columnNames = csvParser.headerNames.takeIf { it.isNotEmpty() }
?: (1..(records.firstOrNull()?.count() ?: 0)).map { index -> "X$index" }

val generator = ColumnNameGenerator()
val uniqueNames = columnNames.map { generator.addUnique(it) }

val cols = uniqueNames.mapIndexed { colIndex, colName ->
val defaultColType = colTypes[".default"]
val colType = colTypes[colName] ?: defaultColType
var hasNulls = false
val values = records.map {
if (it.isSet(colIndex)) {
it[colIndex].ifEmpty {
hasNulls = true
null
}
} else {
hasNulls = true
null
}
} else {
hasNulls = true
null
}
}
val column = DataColumn.createValueColumn(colName, values, typeOf<String>().withNullability(hasNulls))
when (colType) {
null -> column.tryParse(parserOptions)

else -> {
column.tryParse(
(parserOptions ?: ParserOptions()).copy(
skipTypes = ParserOptions.allTypesExcept(colType.toKType()),
),
)
val column = DataColumn.createValueColumn(colName, values, typeOf<String>().withNullability(hasNulls))
when (colType) {
null -> column.tryParse(parserOptions)

else -> {
column.tryParse(
(parserOptions ?: ParserOptions()).copy(
skipTypes = ParserOptions.allTypesExcept(colType.toKType()),
),
)
}
}
}
return cols.toDataFrame()
} catch (e: OutOfMemoryError) {
throw OutOfMemoryError(
"Ran out of memory reading this CSV-like file. " +
"You can try our new experimental CSV reader by adding the dependency " +
"\"org.jetbrains.kotlinx:dataframe-csv:{VERSION}\" and using `DataFrame.readCsv()` instead of " +
"`DataFrame.readCSV()`. This requires `@OptIn(ExperimentalCsv::class)`.",
)
}
return cols.toDataFrame()
}

public fun AnyFrame.writeCSV(file: File, format: CSVFormat = CSVFormat.DEFAULT): Unit =
Expand Down

0 comments on commit 0b0776c

Please sign in to comment.