Skip to content

Commit

Permalink
transform keys that arent accepted by bigQuery in schema
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikolaos Veneti authored and Nikolaos Veneti committed Nov 25, 2019
1 parent 5f37f9d commit bc191cf
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions target_bigquery/schema.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
from google.cloud.bigquery import SchemaField
import re

from google.cloud.bigquery import SchemaField

def defineArrayType(field, name):
schema_type = field.get("items").get("type")
schema_mode = "REPEATED"
schema_description = None
schema_fields = ()

if schema_type == 'array':
return defineArrayType(field['items'], name)
if isinstance(schema_type, list):
if "null" in schema_type and schema_type.index('null') != 0:
schema_type.remove('null')
schema_type.insert(0, 'null')
schema_type = schema_type[-1]
else:
else:
schema_type = schema_type[-1]
if schema_type == "object":
schema_type = "RECORD"
Expand Down Expand Up @@ -69,6 +70,12 @@ def define_schema(field, name):
return (schema_name, schema_type, schema_mode, schema_description, schema_fields)


def bigquery_transformed_key(key):
if re.search('\.|-', key):
return re.sub('\.|-', '_', key)
else:
return key

def build_schema(schema):
SCHEMA = []
for key in schema["properties"].keys():
Expand All @@ -78,7 +85,7 @@ def build_schema(schema):
continue

schema_name, schema_type, schema_mode, schema_description, schema_fields = define_schema(
schema["properties"][key], key
schema["properties"][key], bigquery_transformed_key(key)
)
SCHEMA.append(
SchemaField(
Expand Down

0 comments on commit bc191cf

Please sign in to comment.