Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logical type : TIME #143

Merged
merged 30 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
fb4b59a
minimal changes, passing test
saraswatpuneet Sep 5, 2024
1ca98b8
handle possible exceptions on conversion
saraswatpuneet Sep 5, 2024
8fcafe2
set the primitive type for TIME as INT64
saraswatpuneet Sep 5, 2024
9949fd4
define field definition for time support and helper function
saraswatpuneet Sep 6, 2024
679c96c
declare additional fields for TIME
saraswatpuneet Sep 6, 2024
d3a1382
update jsonSchema to support TIME
saraswatpuneet Sep 6, 2024
e22fbe4
add field tests for time
saraswatpuneet Sep 6, 2024
ddfcc0e
annotate as int32 for MILLIS and int64 for rest
saraswatpuneet Sep 6, 2024
05e2414
spec says int32 for milli, bigint for micro and nano
saraswatpuneet Sep 6, 2024
38a1d57
redo based on generated types
saraswatpuneet Sep 9, 2024
dddb09a
add TIME related primitive converters
saraswatpuneet Sep 9, 2024
9401f67
support time field for json schema
saraswatpuneet Sep 9, 2024
13007a5
finalize implementation side of TIME logical type
saraswatpuneet Sep 9, 2024
ae58c96
set converted types for backward compatibility
saraswatpuneet Sep 9, 2024
42d72ad
set example field tests
saraswatpuneet Sep 9, 2024
510f402
set an example time schema
saraswatpuneet Sep 10, 2024
bc9e11c
set schema test
saraswatpuneet Sep 10, 2024
44f64aa
update schema files for testing
saraswatpuneet Sep 10, 2024
97b1475
schema test result file
saraswatpuneet Sep 10, 2024
6485753
adjust types based on ongoing file tests
saraswatpuneet Sep 11, 2024
6927f31
logical types from parquet file cannot be reconstructed back without …
saraswatpuneet Sep 11, 2024
bd1cb5d
Merge branch 'main' into logical_type_time
saraswatpuneet Sep 11, 2024
abf7f0b
remove debug logs
saraswatpuneet Sep 11, 2024
0a5040f
add schema test for millis, macros and nanos
saraswatpuneet Sep 12, 2024
119bc70
update existing test to include logicalType
saraswatpuneet Sep 12, 2024
6958dab
revert and fix decodeSchema.js test inline with recent changes
saraswatpuneet Sep 12, 2024
6ec10b8
pick default or 0 whichever
saraswatpuneet Sep 12, 2024
96ec9f7
pick default unit for now,
saraswatpuneet Sep 12, 2024
06b7441
fix typo
saraswatpuneet Sep 12, 2024
8ca323a
better schema
saraswatpuneet Sep 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/codec/types.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { PrimitiveType } from '../declare';
import { ParquetCodec, OriginalType, ParquetField } from '../declare';
import { Statistics } from '../../gen-nodejs/parquet_types';
import { LogicalType, Statistics } from '../../gen-nodejs/parquet_types';

export interface Options {
typeLength: number;
bitWidth: number;
disableEnvelope?: boolean;
primitiveType?: PrimitiveType;
originalType?: OriginalType;
logicalType?: LogicalType;
saraswatpuneet marked this conversation as resolved.
Show resolved Hide resolved
encoding?: ParquetCodec;
compression?: string;
column?: ParquetField;
Expand Down
4 changes: 3 additions & 1 deletion lib/declare.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Thanks to https://github.com/kbajalc/parquets

import parquet_thrift from '../gen-nodejs/parquet_types';
import parquet_thrift, { LogicalType } from '../gen-nodejs/parquet_types';
import {
Statistics,
OffsetIndex,
Expand Down Expand Up @@ -61,6 +61,7 @@ export type SchemaDefinition = Record<string, FieldDefinition>;
export interface FieldDefinition {
type?: ParquetType;
typeLength?: number;
logicalType?: LogicalType;
encoding?: ParquetCodec;
compression?: ParquetCompression;
optional?: boolean;
Expand All @@ -80,6 +81,7 @@ export interface ParquetField {
primitiveType?: PrimitiveType;
originalType?: OriginalType;
repetitionType: RepetitionType;
logicalType?: LogicalType;
typeLength?: number;
encoding?: ParquetCodec;
compression?: ParquetCompression;
Expand Down
24 changes: 23 additions & 1 deletion lib/fields.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Helper functions for creating fields

import { FieldDefinition, ParquetType, SchemaDefinition } from './declare';
import { LogicalType, TimeType } from '../gen-nodejs/parquet_types';
import { FieldDefinition, ParquetType, PrimitiveType, SchemaDefinition } from './declare';

export function createStringField(optional = true, fieldOptions: FieldDefinition = {}): FieldDefinition {
return { ...fieldOptions, optional, type: 'UTF8' };
Expand Down Expand Up @@ -80,3 +81,24 @@ export function createListField(
},
};
}

export function createTimeField(
logicalType: TimeType,
optional = true,
fieldOptions: FieldDefinition = {}
): FieldDefinition {
let primitiveType: PrimitiveType;
if (logicalType.unit.MILLIS) {
primitiveType = 'INT32'; // TIME_MILLIS uses INT32
} else if (logicalType.unit.MICROS || logicalType.unit.NANOS) {
primitiveType = 'INT64'; // TIME_MICROS and TIME_NANOS use INT64
} else {
throw new Error('Unsupported time unit in logicalType');
}
return {
...fieldOptions,
optional,
type: primitiveType,
logicalType: new LogicalType({ TIME: logicalType }),
};
}
64 changes: 62 additions & 2 deletions lib/jsonSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import { JSONSchema4 } from 'json-schema';
import { FieldDefinition, SchemaDefinition } from './declare';
import * as fields from './fields';
import { TimeUnit } from '../gen-nodejs/parquet_types';
import { TimeType } from '../gen-nodejs/parquet_types';

type SupportedJSONSchema4 = Omit<
JSONSchema4,
Expand Down Expand Up @@ -70,18 +72,49 @@ const fromJsonSchemaArray = (fieldValue: SupportedJSONSchema4, optionalFieldList

switch (fieldValue.items.type) {
case 'string':
if (fieldValue.items.format && fieldValue.items.format == 'date-time') {
if (fieldValue.items.format && fieldValue.items.format === 'date-time') {
return fields.createListField('TIMESTAMP_MILLIS', optionalFieldList);
}
return fields.createListField('UTF8', optionalFieldList);

case 'integer':
return fields.createListField('INT64', optionalFieldList);

case 'number':
return fields.createListField('DOUBLE', optionalFieldList);

case 'boolean':
return fields.createListField('BOOLEAN', optionalFieldList);

case 'object':
// Handle array of time fields
if (
fieldValue.items.properties &&
fieldValue.items.properties.unit &&
fieldValue.items.properties.isAdjustedToUTC
) {
const unit = fieldValue.items.properties.unit.default?.toString() || 'MILLIS';
const isAdjustedToUTC = !!fieldValue.items.properties.isAdjustedToUTC.default;
let timeUnit: TimeUnit;

switch (unit) {
case 'MICROS':
timeUnit = new TimeUnit({ MICROS: true });
break;
case 'NANOS':
timeUnit = new TimeUnit({ NANOS: true });
break;
default:
timeUnit = new TimeUnit({ MILLIS: true });
break;
}

const timeLogicalType = new TimeType({ isAdjustedToUTC, unit: timeUnit });
return fields.createTimeField(timeLogicalType, optionalFieldList);
}

return fields.createStructListField(fromJsonSchema(fieldValue.items), optionalFieldList);

default:
throw new UnsupportedJsonSchemaError(`Array field type ${JSON.stringify(fieldValue.items)} is unsupported.`);
}
Expand All @@ -100,20 +133,47 @@ const fromJsonSchemaField =

switch (fieldValue.type) {
case 'string':
if (fieldValue.format && fieldValue.format == 'date-time') {
if (fieldValue.format && fieldValue.format === 'date-time') {
shannonwells marked this conversation as resolved.
Show resolved Hide resolved
return fields.createTimestampField(optional);
}
return fields.createStringField(optional);

case 'integer':
return fields.createIntField(64, optional);

case 'number':
return fields.createDoubleField(optional);

case 'boolean':
return fields.createBooleanField(optional);

case 'array':
return fromJsonSchemaArray(fieldValue, optional);

case 'object':
if (fieldValue.properties && fieldValue.properties.unit && fieldValue.properties.isAdjustedToUTC) {
const unit = fieldValue.properties.unit.default?.toString() || 'MILLIS';
const isAdjustedToUTC = !!fieldValue.properties.isAdjustedToUTC.default;
let timeUnit: TimeUnit;

switch (unit) {
case 'MICROS':
timeUnit = new TimeUnit({ MICROS: true });
break;
case 'NANOS':
timeUnit = new TimeUnit({ NANOS: true });
break;
default:
timeUnit = new TimeUnit({ MILLIS: true });
break;
}

const timeLogicalType = new TimeType({ isAdjustedToUTC, unit: timeUnit });
return fields.createTimeField(timeLogicalType, optional);
}

return fields.createStructField(fromJsonSchema(fieldValue), optional);

default:
throw new UnsupportedJsonSchemaError(
`Unable to convert "${fieldName}" with JSON Schema type "${fieldValue.type}" to a Parquet Schema.`
Expand Down
2 changes: 2 additions & 0 deletions lib/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ function buildFields(schema: SchemaDefinition, rLevelParentMax?: number, dLevelP
statistics: opts.statistics,
fieldCount: Object.keys(opts.fields).length,
fields: buildFields(opts.fields, rLevelMax, dLevelMax, path.concat(name)),
logicalType: opts.logicalType,
};

if (opts.type == 'LIST' || opts.type == 'MAP') fieldList[name].originalType = opts.type;
Expand Down Expand Up @@ -174,6 +175,7 @@ function buildFields(schema: SchemaDefinition, rLevelParentMax?: number, dLevelP
name: name,
primitiveType: typeDef.primitiveType,
originalType: typeDef.originalType,
logicalType: opts.logicalType,
path: path.concat([name]),
repetitionType: repetitionType,
encoding: opts.encoding,
Expand Down
73 changes: 70 additions & 3 deletions lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,28 @@ interface INTERVAL {
milliseconds: number;
}

interface TIME {
value: string | bigint | number;
unit: 'MILLIS' | 'MICROS' | 'NANOS';
isAdjustedToUTC: boolean;
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Type for handling incoming time type and handle accordingly


export function getParquetTypeDataObject(
type: ParquetType,
field?: ParquetField | Options | FieldDefinition
): ParquetTypeDataObject {
if (type === 'DECIMAL') {
if (field?.typeLength !== undefined && field?.typeLength !== null) {
if (field?.typeLength !== undefined) {
return {
primitiveType: 'FIXED_LEN_BYTE_ARRAY',
originalType: 'DECIMAL',
typeLength: field.typeLength,
toPrimitive: toPrimitive_FIXED_LEN_BYTE_ARRAY_DECIMAL,
};
} else if (field?.precision !== undefined && field?.precision !== null && field.precision > 18) {
} else if (field?.precision && field.precision > 18) {
saraswatpuneet marked this conversation as resolved.
Show resolved Hide resolved
return {
primitiveType: 'BYTE_ARRAY',
originalType: 'DECIMAL',
typeLength: field.typeLength,
toPrimitive: toPrimitive_BYTE_ARRAY_DECIMAL,
};
} else {
Expand All @@ -47,6 +52,29 @@ export function getParquetTypeDataObject(
toPrimitive: toPrimitive_INT64,
};
}
} else if (field?.logicalType?.TIME) {
const unit = field.logicalType.TIME.unit;
if (unit.MILLIS) {
return {
originalType: 'TIME_MILLIS',
primitiveType: 'INT32',
toPrimitive: toPrimitive_TIME,
};
}
if (unit.MICROS) {
return {
originalType: 'TIME_MICROS',
primitiveType: 'INT64',
toPrimitive: toPrimitive_TIME,
};
}
if (unit.NANOS) {
return {
primitiveType: 'INT64',
toPrimitive: toPrimitive_TIME,
};
}
throw new Error('TIME type must have a valid unit (MILLIS, MICROS, NANOS).');
} else {
return PARQUET_LOGICAL_TYPE_DATA[type];
}
Expand Down Expand Up @@ -560,3 +588,42 @@ function checkValidValue(lowerRange: number | bigint, upperRange: number | bigin
throw 'invalid value';
}
}

function toPrimitive_TIME(time: TIME): bigint | number {
console.log('time', time);
const { value, unit, isAdjustedToUTC } = time;

const timeValue = typeof value === 'string' ? BigInt(value) : BigInt(value);

if (isAdjustedToUTC) {
return unit === 'MILLIS' ? Number(timeValue) : timeValue;
} else {
switch (unit) {
case 'MILLIS':
return Number(adjustToLocalTimestamp(timeValue, { MILLIS: true }));
case 'MICROS':
return adjustToLocalTimestamp(timeValue, { MICROS: true });
case 'NANOS':
return adjustToLocalTimestamp(timeValue, { NANOS: true });
default:
throw new Error(`Unsupported time unit: ${unit}`);
}
}
}

function adjustToLocalTimestamp(
timestamp: bigint,
unit: { MILLIS?: boolean; MICROS?: boolean; NANOS?: boolean }
): bigint {
const localOffset = BigInt(new Date().getTimezoneOffset()) * 60n * 1000n; // Offset in milliseconds

if (unit.MILLIS) {
return timestamp - localOffset;
} else if (unit.MICROS) {
return timestamp - localOffset * 1000n;
} else if (unit.NANOS) {
return timestamp - localOffset * 1000000n;
}

throw new Error('Unsupported time unit');
}
saraswatpuneet marked this conversation as resolved.
Show resolved Hide resolved
17 changes: 16 additions & 1 deletion test/decodeSchema.js
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,21 @@ describe('ParquetSchema', function () {
};

const reader = new parquet.ParquetReader(metadata, {});
assert.deepEqual(reader.schema.fields, expected);
const removeUndefinedFields = (obj) => {
Object.keys(obj).forEach((key) => {
if (obj[key] === undefined || obj[key] === null || (Array.isArray(obj[key]) && obj[key].length === 0)) {
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
delete obj[key];
} else if (typeof obj[key] === 'object') {
removeUndefinedFields(obj[key]);
}
});
return obj;
};

const sanitizedExpected = removeUndefinedFields(expected);
const sanitizedActual = removeUndefinedFields(reader.schema.fields);

assert.deepEqual(sanitizedActual, sanitizedExpected);
saraswatpuneet marked this conversation as resolved.
Show resolved Hide resolved
});
});
Loading
Loading