Skip to content

Commit

Permalink
Logical type : TIME (#143)
Browse files Browse the repository at this point in the history
# Problem

Part of #99 

![image](https://github.com/user-attachments/assets/3526d69a-b89b-4513-b02c-39ff03e91af3)

Add support for logical types to parquetjs, starting with `TIME`.

Solution
========
Implementation following the parquet
[spec](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#deprecated-time-convertedtype)

## Change summary:

- Added a `logicalType` option to field definitions to support Parquet logical types
- Implemented type conversions from the logical `TIME` type to its primitive type
according to the spec
- Added field, schema and file tests to ensure everything works
together

## Steps to Verify:

1. Run `npm run test` and confirm that all tests pass
  • Loading branch information
saraswatpuneet authored Sep 13, 2024
1 parent 6797c99 commit 6b7bea9
Show file tree
Hide file tree
Showing 19 changed files with 956 additions and 11 deletions.
3 changes: 2 additions & 1 deletion lib/codec/types.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { PrimitiveType } from '../declare';
import { ParquetCodec, OriginalType, ParquetField } from '../declare';
import { Statistics } from '../../gen-nodejs/parquet_types';
import { LogicalType, Statistics } from '../../gen-nodejs/parquet_types';

export interface Options {
typeLength: number;
bitWidth: number;
disableEnvelope?: boolean;
primitiveType?: PrimitiveType;
originalType?: OriginalType;
logicalType?: LogicalType;
encoding?: ParquetCodec;
compression?: string;
column?: ParquetField;
Expand Down
4 changes: 3 additions & 1 deletion lib/declare.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Thanks to https://github.com/kbajalc/parquets

import parquet_thrift from '../gen-nodejs/parquet_types';
import parquet_thrift, { LogicalType } from '../gen-nodejs/parquet_types';
import {
Statistics,
OffsetIndex,
Expand Down Expand Up @@ -61,6 +61,7 @@ export type SchemaDefinition = Record<string, FieldDefinition>;
export interface FieldDefinition {
type?: ParquetType;
typeLength?: number;
logicalType?: LogicalType;
encoding?: ParquetCodec;
compression?: ParquetCompression;
optional?: boolean;
Expand All @@ -80,6 +81,7 @@ export interface ParquetField {
primitiveType?: PrimitiveType;
originalType?: OriginalType;
repetitionType: RepetitionType;
logicalType?: LogicalType;
typeLength?: number;
encoding?: ParquetCodec;
compression?: ParquetCompression;
Expand Down
24 changes: 23 additions & 1 deletion lib/fields.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Helper functions for creating fields

import { FieldDefinition, ParquetType, SchemaDefinition } from './declare';
import { LogicalType, TimeType } from '../gen-nodejs/parquet_types';
import { FieldDefinition, ParquetType, PrimitiveType, SchemaDefinition } from './declare';

export function createStringField(optional = true, fieldOptions: FieldDefinition = {}): FieldDefinition {
return { ...fieldOptions, optional, type: 'UTF8' };
Expand Down Expand Up @@ -80,3 +81,24 @@ export function createListField(
},
};
}

/**
 * Builds a field definition for a column carrying the TIME logical type.
 *
 * The physical type is picked from the unit set on `logicalType.unit`:
 * MILLIS maps to INT32, MICROS and NANOS map to INT64.
 *
 * @param logicalType - TIME logical type whose `unit` selects the physical type.
 * @param optional - Value stored in the `optional` property of the result (defaults to `true`).
 * @param fieldOptions - Extra field options; spread first, so `optional`, `type`
 *   and `logicalType` computed here take precedence.
 * @returns The field definition, with the TIME type wrapped in a `LogicalType`.
 * @throws Error when none of MILLIS, MICROS or NANOS is set on the unit.
 */
export function createTimeField(
  logicalType: TimeType,
  optional = true,
  fieldOptions: FieldDefinition = {}
): FieldDefinition {
  const { unit } = logicalType;

  // MILLIS is checked first, so it wins if several unit members are set.
  const storageType: PrimitiveType | undefined = unit.MILLIS
    ? 'INT32' // TIME_MILLIS uses INT32
    : unit.MICROS || unit.NANOS
      ? 'INT64' // TIME_MICROS and TIME_NANOS use INT64
      : undefined;

  if (storageType === undefined) {
    throw new Error('Unsupported time unit in logicalType');
  }

  const wrappedType = new LogicalType({ TIME: logicalType });
  return { ...fieldOptions, optional, type: storageType, logicalType: wrappedType };
}
69 changes: 67 additions & 2 deletions lib/jsonSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import { JSONSchema4 } from 'json-schema';
import { FieldDefinition, SchemaDefinition } from './declare';
import * as fields from './fields';
import { TimeUnit } from '../gen-nodejs/parquet_types';
import { TimeType } from '../gen-nodejs/parquet_types';

type SupportedJSONSchema4 = Omit<
JSONSchema4,
Expand Down Expand Up @@ -70,18 +72,52 @@ const fromJsonSchemaArray = (fieldValue: SupportedJSONSchema4, optionalFieldList

switch (fieldValue.items.type) {
case 'string':
if (fieldValue.items.format && fieldValue.items.format == 'date-time') {
if (fieldValue.items.format && fieldValue.items.format === 'date-time') {
return fields.createListField('TIMESTAMP_MILLIS', optionalFieldList);
}
return fields.createListField('UTF8', optionalFieldList);

case 'integer':
return fields.createListField('INT64', optionalFieldList);

case 'number':
return fields.createListField('DOUBLE', optionalFieldList);

case 'boolean':
return fields.createListField('BOOLEAN', optionalFieldList);

case 'object':
// Handle array of time fields
if (
fieldValue.items.properties &&
fieldValue.items.properties.unit &&
fieldValue.items.properties.isAdjustedToUTC
) {
if (!fieldValue.items.properties.unit.enum) {
throw new UnsupportedJsonSchemaError('Unit enum is not defined');
}
const unit = fieldValue.items.properties.unit.default || fieldValue.items.properties.unit.enum[0];
const isAdjustedToUTC = !!fieldValue.items.properties.isAdjustedToUTC.default;
let timeUnit: TimeUnit;

switch (unit) {
case 'MICROS':
timeUnit = new TimeUnit({ MICROS: true });
break;
case 'NANOS':
timeUnit = new TimeUnit({ NANOS: true });
break;
default:
timeUnit = new TimeUnit({ MILLIS: true });
break;
}

const timeLogicalType = new TimeType({ isAdjustedToUTC, unit: timeUnit });
return fields.createTimeField(timeLogicalType, optionalFieldList);
}

return fields.createStructListField(fromJsonSchema(fieldValue.items), optionalFieldList);

default:
throw new UnsupportedJsonSchemaError(`Array field type ${JSON.stringify(fieldValue.items)} is unsupported.`);
}
Expand All @@ -100,20 +136,49 @@ const fromJsonSchemaField =

switch (fieldValue.type) {
case 'string':
if (fieldValue.format && fieldValue.format == 'date-time') {
if (fieldValue.format && fieldValue.format === 'date-time') {
return fields.createTimestampField(optional);
}
return fields.createStringField(optional);

case 'integer':
return fields.createIntField(64, optional);

case 'number':
return fields.createDoubleField(optional);

case 'boolean':
return fields.createBooleanField(optional);

case 'array':
return fromJsonSchemaArray(fieldValue, optional);

case 'object':
if (fieldValue.properties && fieldValue.properties.unit && fieldValue.properties.isAdjustedToUTC) {
if (!fieldValue.properties.unit.enum) {
throw new UnsupportedJsonSchemaError('Unit enum is not defined');
}
const unit = fieldValue.properties.unit.default || fieldValue.properties.unit.enum[0];
const isAdjustedToUTC = !!fieldValue.properties.isAdjustedToUTC.default;
let timeUnit: TimeUnit;
switch (unit) {
case 'MICROS':
timeUnit = new TimeUnit({ MICROS: true });
break;
case 'NANOS':
timeUnit = new TimeUnit({ NANOS: true });
break;
default:
timeUnit = new TimeUnit({ MILLIS: true });
break;
}

const timeLogicalType = new TimeType({ isAdjustedToUTC, unit: timeUnit });
return fields.createTimeField(timeLogicalType, optional);
}

return fields.createStructField(fromJsonSchema(fieldValue), optional);

default:
throw new UnsupportedJsonSchemaError(
`Unable to convert "${fieldName}" with JSON Schema type "${fieldValue.type}" to a Parquet Schema.`
Expand Down
2 changes: 2 additions & 0 deletions lib/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ function buildFields(schema: SchemaDefinition, rLevelParentMax?: number, dLevelP
statistics: opts.statistics,
fieldCount: Object.keys(opts.fields).length,
fields: buildFields(opts.fields, rLevelMax, dLevelMax, path.concat(name)),
logicalType: opts.logicalType,
};

if (opts.type == 'LIST' || opts.type == 'MAP') fieldList[name].originalType = opts.type;
Expand Down Expand Up @@ -174,6 +175,7 @@ function buildFields(schema: SchemaDefinition, rLevelParentMax?: number, dLevelP
name: name,
primitiveType: typeDef.primitiveType,
originalType: typeDef.originalType,
logicalType: opts.logicalType,
path: path.concat([name]),
repetitionType: repetitionType,
encoding: opts.encoding,
Expand Down
72 changes: 69 additions & 3 deletions lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,28 @@ interface INTERVAL {
milliseconds: number;
}

/**
 * Input shape accepted by `toPrimitive_TIME` when writing a TIME value.
 */
interface TIME {
// Raw time value; converted with `BigInt(...)` before being written.
value: string | bigint | number;
// Resolution of `value`. MILLIS results are returned as a number, the others as a bigint.
unit: 'MILLIS' | 'MICROS' | 'NANOS';
// When false, the value is shifted by the local timezone offset before being written.
isAdjustedToUTC: boolean;
}

export function getParquetTypeDataObject(
type: ParquetType,
field?: ParquetField | Options | FieldDefinition
): ParquetTypeDataObject {
if (type === 'DECIMAL') {
if (field?.typeLength !== undefined && field?.typeLength !== null) {
if (field?.typeLength !== undefined) {
return {
primitiveType: 'FIXED_LEN_BYTE_ARRAY',
originalType: 'DECIMAL',
typeLength: field.typeLength,
toPrimitive: toPrimitive_FIXED_LEN_BYTE_ARRAY_DECIMAL,
};
} else if (field?.precision !== undefined && field?.precision !== null && field.precision > 18) {
} else if (field?.precision && field.precision > 18) {
return {
primitiveType: 'BYTE_ARRAY',
originalType: 'DECIMAL',
typeLength: field.typeLength,
toPrimitive: toPrimitive_BYTE_ARRAY_DECIMAL,
};
} else {
Expand All @@ -47,6 +52,29 @@ export function getParquetTypeDataObject(
toPrimitive: toPrimitive_INT64,
};
}
} else if (field?.logicalType?.TIME) {
const unit = field.logicalType.TIME.unit;
if (unit.MILLIS) {
return {
originalType: 'TIME_MILLIS',
primitiveType: 'INT32',
toPrimitive: toPrimitive_TIME,
};
}
if (unit.MICROS) {
return {
originalType: 'TIME_MICROS',
primitiveType: 'INT64',
toPrimitive: toPrimitive_TIME,
};
}
if (unit.NANOS) {
return {
primitiveType: 'INT64',
toPrimitive: toPrimitive_TIME,
};
}
throw new Error('TIME type must have a valid unit (MILLIS, MICROS, NANOS).');
} else {
return PARQUET_LOGICAL_TYPE_DATA[type];
}
Expand Down Expand Up @@ -560,3 +588,41 @@ function checkValidValue(lowerRange: number | bigint, upperRange: number | bigin
throw 'invalid value';
}
}

/**
 * Converts a TIME value into the primitive that is written to the column.
 *
 * The value is first converted to a bigint. When `isAdjustedToUTC` is false it
 * is shifted by the local timezone offset via `adjustToLocalTimestamp`.
 * MILLIS results are returned as a number; MICROS and NANOS results stay bigint.
 *
 * @param time - The value, its unit and whether it is already UTC-adjusted.
 * @returns A number for MILLIS, a bigint for MICROS and NANOS.
 * @throws Error when `unit` is not MILLIS, MICROS or NANOS. This is now checked
 *   regardless of `isAdjustedToUTC`; previously an unknown unit slipped through
 *   the UTC path and was returned as a bigint.
 * @throws If `value` cannot be converted by `BigInt` (for example a
 *   non-integer number or a non-numeric string).
 */
function toPrimitive_TIME(time: TIME): bigint | number {
  const { value, unit, isAdjustedToUTC } = time;

  // BigInt accepts string, number and bigint alike, so no type branching is needed.
  const timeValue = BigInt(value);

  switch (unit) {
    case 'MILLIS':
      return Number(isAdjustedToUTC ? timeValue : adjustToLocalTimestamp(timeValue, { MILLIS: true }));
    case 'MICROS':
      return isAdjustedToUTC ? timeValue : adjustToLocalTimestamp(timeValue, { MICROS: true });
    case 'NANOS':
      return isAdjustedToUTC ? timeValue : adjustToLocalTimestamp(timeValue, { NANOS: true });
    default:
      // Reached only by untyped callers: reject the unit on both the UTC and local paths.
      throw new Error(`Unsupported time unit: ${unit}`);
  }
}

/**
 * Shifts a timestamp by the current local timezone offset.
 *
 * The offset comes from `new Date().getTimezoneOffset()` (minutes), is converted
 * to milliseconds and then scaled to the resolution of the given unit before
 * being subtracted from `timestamp`.
 *
 * @param timestamp - Value to shift, expressed in the given unit.
 * @param unit - Flags selecting the resolution; the first set flag wins, in the
 *   order MILLIS, MICROS, NANOS.
 * @returns `timestamp` minus the local offset, in the same unit.
 * @throws Error when no unit flag is set.
 */
function adjustToLocalTimestamp(
  timestamp: bigint,
  unit: { MILLIS?: boolean; MICROS?: boolean; NANOS?: boolean }
): bigint {
  // How many of the requested unit fit into one millisecond.
  let unitsPerMilli: bigint;
  if (unit.MILLIS) {
    unitsPerMilli = 1n;
  } else if (unit.MICROS) {
    unitsPerMilli = 1000n;
  } else if (unit.NANOS) {
    unitsPerMilli = 1000000n;
  } else {
    throw new Error('Unsupported time unit');
  }

  const offsetMinutes = BigInt(new Date().getTimezoneOffset());
  const offsetMillis = offsetMinutes * 60n * 1000n;
  return timestamp - offsetMillis * unitsPerMilli;
}
8 changes: 8 additions & 0 deletions test/decodeSchema.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ describe('ParquetSchema', function () {
dLevelMax: 0,
isNested: true,
fieldCount: 2,
logicalType: undefined,
fields: {
b: {
name: 'b',
Expand All @@ -130,6 +131,7 @@ describe('ParquetSchema', function () {
dLevelMax: 0,
isNested: true,
fieldCount: 2,
logicalType: undefined,
fields: {
c: {
name: 'c',
Expand All @@ -140,6 +142,7 @@ describe('ParquetSchema', function () {
dLevelMax: 0,
isNested: true,
fieldCount: 1,
logicalType: undefined,
fields: {
d: {
name: 'd',
Expand All @@ -150,6 +153,7 @@ describe('ParquetSchema', function () {
statistics: undefined,
typeLength: undefined,
encoding: 'PLAIN',
logicalType: undefined,
compression: 'UNCOMPRESSED',
rLevelMax: 0,
dLevelMax: 0,
Expand All @@ -167,6 +171,7 @@ describe('ParquetSchema', function () {
dLevelMax: 0,
isNested: true,
fieldCount: 2,
logicalType: undefined,
fields: {
f: {
name: 'f',
Expand All @@ -177,6 +182,7 @@ describe('ParquetSchema', function () {
statistics: undefined,
typeLength: undefined,
encoding: 'PLAIN',
logicalType: undefined,
compression: 'UNCOMPRESSED',
rLevelMax: 0,
dLevelMax: 0,
Expand All @@ -192,6 +198,7 @@ describe('ParquetSchema', function () {
statistics: undefined,
typeLength: undefined,
encoding: 'PLAIN',
logicalType: undefined,
compression: 'UNCOMPRESSED',
rLevelMax: 0,
dLevelMax: 0,
Expand All @@ -211,6 +218,7 @@ describe('ParquetSchema', function () {
statistics: undefined,
typeLength: undefined,
encoding: 'PLAIN',
logicalType: undefined,
compression: 'UNCOMPRESSED',
rLevelMax: 0,
dLevelMax: 0,
Expand Down
Loading

0 comments on commit 6b7bea9

Please sign in to comment.