diff --git a/tap_dynamodb/__init__.py b/tap_dynamodb/__init__.py index 36da0d6..8650a16 100644 --- a/tap_dynamodb/__init__.py +++ b/tap_dynamodb/__init__.py @@ -43,6 +43,14 @@ def do_sync(config, catalog, state): key_properties = metadata.get(mdata, (), 'table-key-properties') singer.write_schema(stream_name, stream['schema'], key_properties) + filter_expression = metadata.get(mdata, (), 'FilterExpression') + filter_value = metadata.get(mdata, (), 'ExpressionAttributeValues') + scan_params = {} + if filter_expression and filter_value: + scan_params = { 'FilterExpression': filter_expression, 'ExpressionAttributeValues': json.loads(filter_value) } + config = { **config, **scan_params } + LOGGER.info("Applying scan_params: %s for stream: %s", str(scan_params), stream_name) + LOGGER.info("%s: Starting sync", stream_name) counts[stream_name] = sync_stream(config, state, stream) sync_times[stream_name] = time.time() - start_time diff --git a/tap_dynamodb/sync_strategies/full_table.py b/tap_dynamodb/sync_strategies/full_table.py index 7403c29..92e0f5d 100644 --- a/tap_dynamodb/sync_strategies/full_table.py +++ b/tap_dynamodb/sync_strategies/full_table.py @@ -13,6 +13,9 @@ def scan_table(table_name, projection, last_evaluated_key, config): 'TableName': table_name, 'Limit': 1000 } + if 'FilterExpression' in config: + scan_params['FilterExpression'] = config['FilterExpression'] + scan_params['ExpressionAttributeValues'] = config['ExpressionAttributeValues'] if projection is not None: scan_params['ProjectionExpression'] = projection