feat: Optionally store content marked as "base16" encoded into `bytea` columns (#287)

Motivation: more efficient `bytea` storage of data marked with
`"contentEncoding": "base16"`

JSON Schema References:
- https://datatracker.ietf.org/doc/html/rfc4648#section-8
- https://json-schema.org/draft/2020-12/draft-bhutton-json-schema-validation-00#rfc.section.8.3

`bytea` is at least twice as efficient as a string for storing hex data:
```
select 
    octet_length('\x2BdfBd329984Cf0DC9027734681A16f542cF3bB4'::bytea) as "bytea", 
    octet_length('0x2BdfBd329984Cf0DC9027734681A16f542cF3bB4') as "string"
;
 bytea | string 
-------+--------
    20 |     42
```
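
The same comparison can be reproduced in Python (a quick illustration; the address is the sample value from the query above):

```python
addr = "2BdfBd329984Cf0DC9027734681A16f542cF3bB4"  # 40 hex characters

# bytea stores the raw bytes: 20 of them
assert len(bytes.fromhex(addr)) == 20
# text stores every character, including the "0x" prefix
assert len("0x" + addr) == 42
```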

It's probably a good idea to make this behaviour opt-in. I'm not sure
how to implement that since most of the type detection code is inside
static methods of `PostgresConnector`. I can't find a way to inject the
target config without making a big change.

---------

Co-authored-by: Edgar Ramírez Mondragón <[email protected]>
prevostc and edgarrmondragon authored Feb 6, 2024
1 parent 1f7b73d commit 557c9da
Showing 6 changed files with 221 additions and 9 deletions.
50 changes: 49 additions & 1 deletion README.md
@@ -174,7 +174,7 @@ The below table shows how this target will map between jsonschema datatypes and Postgres datatypes
| UNSUPPORTED | bit varying [ (n) ] |
| boolean | boolean |
| UNSUPPORTED | box |
| UNSUPPORTED | bytea |
| string with contentEncoding="base16" ([opt-in feature](#content-encoding-support)) | bytea |
| UNSUPPORTED | character [ (n) ] |
| UNSUPPORTED | character varying [ (n) ] |
| UNSUPPORTED | cidr |
@@ -215,6 +215,7 @@ The below table shows how this target will map between jsonschema datatypes and Postgres datatypes
Note that while object types are mapped directly to jsonb, array types are mapped to a jsonb array.

If a column has multiple jsonschema types, the following order is used to order Postgres types, from highest priority to lowest priority.
- BYTEA
- ARRAY(JSONB)
- JSONB
- TEXT
@@ -227,3 +228,50 @@ If a column has multiple jsonschema types, the following order is used to order
- INTEGER
- BOOLEAN
- NOTYPE

## Content Encoding Support

JSON Schema supports the [`contentEncoding` keyword](https://json-schema.org/draft/2020-12/draft-bhutton-json-schema-validation-00#rfc.section.8.3), which can be used to specify the encoding of string values.

This target can detect content encoding clues in the schema and use them to store the data in Postgres more efficiently.

Content encoding interpretation is disabled by default, because the default config is meant to be as permissive as possible and to make no assumptions about the data that could lead to data loss.

However, if you know your data respects the advertised content encoding, you can enable this feature for better performance and storage efficiency.

To enable it, set the `interpret_content_encoding` option to `true`.
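
For example, in a JSON config file (a minimal sketch; combine it with your usual connection settings):

```json
{
  "interpret_content_encoding": true
}
```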

### base16

The string is encoded using the base16 encoding, as defined in [RFC 4648](https://datatracker.ietf.org/doc/html/rfc4648#section-8).

Example schema:
```json
{
"type": "object",
"properties": {
"my_hex": {
"type": "string",
"contentEncoding": "base16"
}
}
}
```

Data will be stored as a `bytea` in the database.
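
For instance, querying the column after loading would show the compact representation (a sketch; the table name `my_stream` is illustrative):

```sql
SELECT my_hex, octet_length(my_hex) AS bytes FROM my_stream;
--  my_hex | bytes
-- --------+-------
--  \x01af |     2
```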

Example data:
```json
# valid data
{ "my_hex": "01AF" }
{ "my_hex": "01af" }
{ "my_hex": "1af" }
{ "my_hex": "0x1234" }

# invalid data
{ "my_hex": " 0x1234 " }
{ "my_hex": "House" }
```

For convenience, data prefixed with `0x` or containing an odd number of characters (which is left-padded with a `0`) is supported, although neither is part of the standard.
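
A minimal sketch of this normalization (a hypothetical helper mirroring what the target does internally via `bytes.fromhex`):

```python
def hex_to_bytes(value: str) -> bytes:
    """Normalize a base16 string the way this target does before storing it."""
    if value.startswith(("\\x", "0x")):  # strip an optional prefix
        value = value[2:]
    if len(value) % 2:  # left-pad odd-length input with a zero
        value = f"0{value}"
    return bytes.fromhex(value)

assert hex_to_bytes("0x1af") == b"\x01\xaf"
assert hex_to_bytes("01AF") == hex_to_bytes("01af") == b"\x01\xaf"
```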
73 changes: 65 additions & 8 deletions target_postgres/connector.py
@@ -7,6 +7,7 @@
import signal
import typing as t
from contextlib import contextmanager
from functools import cached_property
from os import chmod, path
from typing import cast

@@ -15,7 +16,7 @@
import sqlalchemy as sa
from singer_sdk import SQLConnector
from singer_sdk import typing as th
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, JSONB
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, BYTEA, JSONB
from sqlalchemy.engine import URL
from sqlalchemy.engine.url import make_url
from sqlalchemy.types import (
@@ -79,6 +80,18 @@ def __init__(self, config: dict) -> None:
sqlalchemy_url=url.render_as_string(hide_password=False),
)

@cached_property
def interpret_content_encoding(self) -> bool:
"""Whether to interpret schema contentEncoding to set the column type.
It is an opt-in feature because it might result in data loss if the
actual data does not match the schema's advertised encoding.
Returns:
True if the feature is enabled, False otherwise.
"""
return self.config.get("interpret_content_encoding", False)

def prepare_table( # type: ignore[override]
self,
full_table_name: str,
@@ -205,8 +218,7 @@ def clone_table(
new_table.create(bind=connection)
return new_table

@staticmethod
def to_sql_type(jsonschema_type: dict) -> sa.types.TypeEngine:
def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine: # type: ignore[override]
"""Return a JSON Schema representation of the provided type.
By default will call `typing.to_sql_type()`.
@@ -232,6 +244,8 @@ def to_sql_type(jsonschema_type: dict) -> sa.types.TypeEngine:
json_type_dict = {"type": entry}
if jsonschema_type.get("format", False):
json_type_dict["format"] = jsonschema_type["format"]
if encoding := jsonschema_type.get("contentEncoding", False):
json_type_dict["contentEncoding"] = encoding
json_type_array.append(json_type_dict)
else:
msg = "Invalid format for jsonschema type: not str or list."
@@ -246,16 +260,13 @@ def to_sql_type(jsonschema_type: dict) -> sa.types.TypeEngine:
return NOTYPE()
sql_type_array = []
for json_type in json_type_array:
picked_type = PostgresConnector.pick_individual_type(
jsonschema_type=json_type
)
picked_type = self.pick_individual_type(jsonschema_type=json_type)
if picked_type is not None:
sql_type_array.append(picked_type)

return PostgresConnector.pick_best_sql_type(sql_type_array=sql_type_array)

@staticmethod
def pick_individual_type(jsonschema_type: dict):
def pick_individual_type(self, jsonschema_type: dict):
"""Select the correct sql type assuming jsonschema_type has only a single type.
Args:
@@ -272,8 +283,15 @@ def pick_individual_type(jsonschema_type: dict):
return JSONB()
if "array" in jsonschema_type["type"]:
return ARRAY(JSONB())

# string formats
if jsonschema_type.get("format") == "date-time":
return TIMESTAMP()
if (
self.interpret_content_encoding
and jsonschema_type.get("contentEncoding") == "base16"
):
return HexByteString()
individual_type = th.to_sql_type(jsonschema_type)
if isinstance(individual_type, VARCHAR):
return TEXT()
Expand All @@ -290,6 +308,7 @@ def pick_best_sql_type(sql_type_array: list):
An instance of the best SQL type class based on defined precedence order.
"""
precedence_order = [
HexByteString,
ARRAY,
JSONB,
TEXT,
@@ -834,3 +853,41 @@ def python_type(self):
def as_generic(self, *args: t.Any, **kwargs: t.Any):
"""Return the generic type for this column."""
return TEXT()


class HexByteString(TypeDecorator):
"""Convert Python string representing Hex data to bytes and vice versa.
This is used to store binary data in more efficient format in the database.
The string is encoded using the base16 encoding, as defined in RFC 4648
https://json-schema.org/draft/2020-12/draft-bhutton-json-schema-validation-00#rfc.section.8.3
For convenience, data prefixed with `0x` or containing an odd number of characters
is supported although it's not part of the standard.
"""

impl = BYTEA

def process_bind_param(self, value, dialect):
"""Convert hex string to bytes."""
if value is None:
return None

if isinstance(value, str):
if value.startswith("\\x") or value.startswith("0x"):
value = value[2:]

if len(value) % 2:
value = f"0{value}"

try:
value = bytes.fromhex(value)
except ValueError as ex:
raise ValueError(f"Invalid hexadecimal string: {value}") from ex

if not isinstance(value, bytearray | memoryview | bytes):
raise TypeError(
"HexByteString columns support only bytes or hex string values. "
f"{type(value)} is not supported"
)

return value
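
A quick sanity check of the bind-param conversion (a sketch; the `dialect` argument is unused by this implementation, so `None` is passed):

```python
ts = HexByteString()
assert ts.process_bind_param("0xA1B2", None) == b"\xa1\xb2"
assert ts.process_bind_param("A12", None) == b"\x0a\x12"  # odd length is left-padded
assert ts.process_bind_param(None, None) is None          # NULLs pass through
```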
11 changes: 11 additions & 0 deletions target_postgres/target.py
@@ -189,6 +189,17 @@ def __init__(
+ "for more information."
),
),
th.Property(
"interpret_content_encoding",
th.BooleanType,
default=False,
description=(
"If set to true, the target will interpret the content encoding of the "
"schema to determine how to store the data. Using this option may "
"result in a more efficient storage of the data but may also result "
"in an error if the data is not encoded as expected."
),
),
th.Property(
"ssl_enable",
th.BooleanType,
8 changes: 8 additions & 0 deletions target_postgres/tests/data_files/base16_content_encoding_interpreted.singer
@@ -0,0 +1,8 @@
{"type":"SCHEMA","stream":"test_base_16_encoding_interpreted","schema":{"type":"object","properties":{"id":{"type":"string"},"contract_address":{"type":"string","contentEncoding":"base16"},"raw_event_data":{"type":["string","null"],"contentEncoding":"base16"}},"required":["id","contract_address","raw_event_data"]},"key_properties":["id"]}
{"type":"RECORD","stream":"test_base_16_encoding_interpreted","record":{"id":"test_handle_an_hex_str","contract_address":"0xA1B2C3D4E5F607080910","raw_event_data":"0xA1B2C3D4E5F60708091001020304050607080910010203040506070809100102030405060708091001020304050607080910"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_base_16_encoding_interpreted","record":{"id":"empty_0x_str","contract_address":"0x","raw_event_data":"0x"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_base_16_encoding_interpreted","record":{"id":"empty_str","contract_address":"","raw_event_data":""},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_base_16_encoding_interpreted","record":{"id":"test_nullable_field","contract_address":"","raw_event_data":null},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_base_16_encoding_interpreted","record":{"id":"test_handle_hex_without_the_0x_prefix","contract_address":"A1B2C3D4E5F607080910","raw_event_data":"A1B2C3D4E5F6070809100102030405060"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_base_16_encoding_interpreted","record":{"id":"test_handle_odd_and_even_number_of_chars","contract_address":"0xA1","raw_event_data":"A12"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_base_16_encoding_interpreted","record":{"id":"test_handle_upper_and_lowercase_hex","contract_address":"0xa1","raw_event_data":"A12b"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
8 changes: 8 additions & 0 deletions target_postgres/tests/data_files/base16_content_encoding_not_interpreted.singer
@@ -0,0 +1,8 @@
{"type":"SCHEMA","stream":"test_base_16_encoding_not_interpreted","schema":{"type":"object","properties":{"id":{"type":"string"},"contract_address":{"type":"string","contentEncoding":"base16"},"raw_event_data":{"type":["string","null"],"contentEncoding":"base16"}},"required":["id","contract_address","raw_event_data"]},"key_properties":["id"]}
{"type":"RECORD","stream":"test_base_16_encoding_not_interpreted","record":{"id":"test_handle_an_hex_str","contract_address":"0xA1B2C3D4E5F607080910","raw_event_data":"0xA1B2C3D4E5F60708091001020304050607080910010203040506070809100102030405060708091001020304050607080910"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_base_16_encoding_not_interpreted","record":{"id":"empty_0x_str","contract_address":"0x","raw_event_data":"0x"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_base_16_encoding_not_interpreted","record":{"id":"empty_str","contract_address":"","raw_event_data":""},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_base_16_encoding_not_interpreted","record":{"id":"test_nullable_field","contract_address":"","raw_event_data":null},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_base_16_encoding_not_interpreted","record":{"id":"test_handle_hex_without_the_0x_prefix","contract_address":"A1B2C3D4E5F607080910","raw_event_data":"A1B2C3D4E5F6070809100102030405060"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_base_16_encoding_not_interpreted","record":{"id":"test_handle_odd_and_even_number_of_chars","contract_address":"0xA1","raw_event_data":"A12"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_base_16_encoding_not_interpreted","record":{"id":"test_handle_upper_and_lowercase_hex","contract_address":"0xa1","raw_event_data":"A12b"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
80 changes: 80 additions & 0 deletions target_postgres/tests/test_target_postgres.py
@@ -125,6 +125,14 @@ def verify_data(
result_dict = [
remove_metadata_columns(row._asdict()) for row in result.all()
]

# bytea columns are returned as memoryview objects
# we need to convert them to bytes to allow comparison with check_data
for row in result_dict:
for col in row:
if isinstance(row[col], memoryview):
row[col] = bytes(row[col])

assert result_dict == check_data
else:
raise ValueError("Invalid check_data - not dict or list of dicts")
@@ -500,6 +508,78 @@ def test_new_array_column(postgres_target):
singer_file_to_target(file_name, postgres_target)


def test_base16_content_encoding_not_interpreted(postgres_config_no_ssl):
"""Make sure we can insert base16 encoded data into the database without interpretation"""
postgres_config_modified = copy.deepcopy(postgres_config_no_ssl)
postgres_config_modified["interpret_content_encoding"] = False
target = TargetPostgres(config=postgres_config_modified)

singer_file_to_target("base16_content_encoding_not_interpreted.singer", target)

rows = [
{"id": "empty_0x_str", "contract_address": "0x", "raw_event_data": "0x"},
{"id": "empty_str", "contract_address": "", "raw_event_data": ""},
{
"id": "test_handle_an_hex_str",
"contract_address": "0xA1B2C3D4E5F607080910",
"raw_event_data": "0xA1B2C3D4E5F60708091001020304050607080910010203040506070809100102030405060708091001020304050607080910",
},
{
"id": "test_handle_hex_without_the_0x_prefix",
"contract_address": "A1B2C3D4E5F607080910",
"raw_event_data": "A1B2C3D4E5F6070809100102030405060",
},
{
"id": "test_handle_odd_and_even_number_of_chars",
"contract_address": "0xA1",
"raw_event_data": "A12",
},
{
"id": "test_handle_upper_and_lowercase_hex",
"contract_address": "0xa1",
"raw_event_data": "A12b",
},
{"id": "test_nullable_field", "contract_address": "", "raw_event_data": None},
]
verify_data(target, "test_base_16_encoding_not_interpreted", 7, "id", rows)


def test_base16_content_encoding_interpreted(postgres_config_no_ssl):
"""Make sure we can insert base16 encoded data into the database with interpretation"""
postgres_config_modified = copy.deepcopy(postgres_config_no_ssl)
postgres_config_modified["interpret_content_encoding"] = True
target = TargetPostgres(config=postgres_config_modified)

singer_file_to_target("base16_content_encoding_interpreted.singer", target)

rows = [
{"id": "empty_0x_str", "contract_address": b"", "raw_event_data": b""},
{"id": "empty_str", "contract_address": b"", "raw_event_data": b""},
{
"id": "test_handle_an_hex_str",
"contract_address": b"\xa1\xb2\xc3\xd4\xe5\xf6\x07\x08\x09\x10",
"raw_event_data": b"\xa1\xb2\xc3\xd4\xe5\xf6\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10",
},
{
"id": "test_handle_hex_without_the_0x_prefix",
"contract_address": b"\xa1\xb2\xc3\xd4\xe5\xf6\x07\x08\x09\x10",
"raw_event_data": b"\x0a\x1b\x2c\x3d\x4e\x5f\x60\x70\x80\x91\x00\x10\x20\x30\x40\x50\x60",
},
{
"id": "test_handle_odd_and_even_number_of_chars",
"contract_address": b"\xa1",
"raw_event_data": b"\x0a\x12",
},
{
"id": "test_handle_upper_and_lowercase_hex",
"contract_address": b"\xa1",
"raw_event_data": b"\xa1\x2b",
},
{"id": "test_nullable_field", "contract_address": b"", "raw_event_data": None},
]
verify_data(target, "test_base_16_encoding_interpreted", 7, "id", rows)


def test_activate_version_hard_delete(postgres_config_no_ssl):
"""Activate Version Hard Delete Test"""
table_name = "test_activate_version_hard"