Skip to content

Commit

Permalink
Use arrow-csv for CSV I/O
Browse files Browse the repository at this point in the history
  • Loading branch information
kylebarron committed Oct 10, 2024
1 parent fda2133 commit e8fb954
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ categories = ["science::geo"]
rust-version = "1.80"

[features]
csv = ["geozero/with-csv"]
csv = ["dep:arrow-csv", "geozero/with-csv"]
flatgeobuf = ["dep:flatgeobuf"]
flatgeobuf_async = [
"flatgeobuf/http",
Expand Down Expand Up @@ -48,6 +48,7 @@ arrow = { version = "53", features = ["ffi"] }
arrow-array = { version = "53", features = ["chrono-tz"] }
arrow-buffer = "53"
arrow-cast = { version = "53" }
arrow-csv = { version = "53", optional = true }
arrow-data = "53"
arrow-ipc = "53"
arrow-schema = "53"
Expand Down
44 changes: 40 additions & 4 deletions src/io/csv/writer.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,52 @@
use crate::array::NativeArrayDyn;
use crate::error::Result;
use crate::io::stream::RecordBatchReader;
use geozero::csv::CsvWriter;
use geozero::GeozeroDatasource;
use crate::io::wkt::ToWKT;
use crate::{ArrayBase, NativeArray};
use arrow_array::RecordBatch;
use arrow_schema::Schema;
use std::io::Write;
use std::sync::Arc;

// TODO: add CSV writer options

/// Write a Table to CSV
///
/// Drains the stream of record batches and serializes each one as CSV via
/// `arrow_csv::Writer`. Geometry columns are first re-encoded as WKT string
/// columns (see `encode_batch`) so they have a textual representation.
///
/// # Errors
///
/// Returns an error if a batch cannot be read from the stream, if the
/// WKT-encoded batch cannot be reassembled, or if the CSV writer fails.
///
/// # Panics
///
/// Panics if the underlying reader has already been consumed out of `stream`.
pub fn write_csv<W: Write, S: Into<RecordBatchReader>>(stream: S, writer: W) -> Result<()> {
    let mut stream: RecordBatchReader = stream.into();
    // `take` hands us the inner iterator of batches; it is only absent if the
    // reader was already consumed, which is a caller bug — state that clearly.
    let reader = stream.take().expect("RecordBatchReader already consumed");

    let mut csv_writer = arrow_csv::Writer::new(writer);
    for batch in reader {
        // Re-encode geometry columns as WKT before handing off to arrow-csv,
        // which only understands plain Arrow types.
        csv_writer.write(&encode_batch(batch?)?)?;
    }

    Ok(())
}

/// Re-encode any geometry columns in `batch` as WKT string columns.
///
/// Columns that parse as native geometry arrays are converted to WKT; all
/// other columns (and their fields) pass through unchanged. The returned
/// batch carries a rebuilt schema with the original schema metadata.
fn encode_batch(batch: RecordBatch) -> Result<RecordBatch> {
    let schema = batch.schema();
    let num_columns = schema.fields().len();

    let mut out_fields = Vec::with_capacity(num_columns);
    let mut out_columns = Vec::with_capacity(num_columns);

    for (field, column) in schema.fields().iter().zip(batch.columns()) {
        match NativeArrayDyn::from_arrow_array(&column, &field) {
            Ok(geo_arr) => {
                // Geometry column: replace with its WKT string encoding so it
                // can be written as CSV text.
                let wkt_arr = geo_arr.as_ref().to_wkt::<i32>();
                out_fields.push(wkt_arr.extension_field());
                out_columns.push(wkt_arr.into_array_ref());
            }
            Err(_) => {
                // Not a geometry column — keep field and data as-is.
                out_fields.push(field.clone());
                out_columns.push(column.clone());
            }
        }
    }

    let out_schema = Schema::new(out_fields).with_metadata(schema.metadata().clone());
    Ok(RecordBatch::try_new(Arc::new(out_schema), out_columns)?)
}

#[cfg(test)]
mod test {
use super::*;
Expand Down

0 comments on commit e8fb954

Please sign in to comment.