Skip to content

Commit

Permalink
feat: add upstream lineage dependencies for view/materialized view in bq extractor (#37)
Browse files Browse the repository at this point in the history

* feat: add upstream lineage dependencies for view/materialized view in bq extractor

* fix: review comments (lint error and test case)

* fix: add test case for view lineage in bigquery plugin

* fix: test case

* fix: remove bigquery test

---------

Co-authored-by: Utsav Agarwal <[email protected]>
  • Loading branch information
utsav14nov and utsav14nov authored Sep 14, 2023
1 parent d3b9647 commit 37da4b0
Show file tree
Hide file tree
Showing 5 changed files with 601 additions and 2 deletions.
1 change: 1 addition & 0 deletions plugins/extractors/bigquery/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ source:
- dataset_c.table_a
max_page_size: 100
profile_column: true
build_view_lineage: true
# Only one of service_account_base64 / service_account_json is needed.
# If both are present, service_account_base64 takes precedence
service_account_base64: _________BASE64_ENCODED_SERVICE_ACCOUNT_________________
Expand Down
43 changes: 41 additions & 2 deletions plugins/extractors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
v1beta2 "github.com/goto/meteor/models/gotocompany/assets/v1beta2"
"github.com/goto/meteor/plugins"
"github.com/goto/meteor/plugins/extractors/bigquery/auditlog"
"github.com/goto/meteor/plugins/extractors/bigquery/upstream"
"github.com/goto/meteor/registry"
"github.com/goto/meteor/utils"
"github.com/goto/salt/log"
Expand Down Expand Up @@ -53,6 +54,7 @@ type Config struct {
IsCollectTableUsage bool `mapstructure:"collect_table_usage" default:"false"`
UsagePeriodInDay int64 `mapstructure:"usage_period_in_day" default:"7"`
UsageProjectIDs []string `mapstructure:"usage_project_ids"`
BuildViewLineage bool `mapstructure:"build_view_lineage" default:"false"`
}

type Exclude struct {
Expand Down Expand Up @@ -83,6 +85,7 @@ exclude:
- dataset_c.table_a
max_page_size: 100
include_column_profile: true
build_view_lineage: true
# Only one of service_account_base64 / service_account_json is needed.
# If both are present, service_account_base64 takes precedence
service_account_base64: ____base64_encoded_service_account____
Expand Down Expand Up @@ -432,15 +435,25 @@ func (e *Extractor) buildAsset(ctx context.Context, t *bigquery.Table, md *bigqu
e.logger.Warn("error creating Any struct", "error", err)
}

return &v1beta2.Asset{
asset := &v1beta2.Asset{
Urn: tableURN,
Name: t.TableID,
Type: "table",
Description: md.Description,
Service: "bigquery",
Data: table,
Labels: md.Labels,
}, nil
}

if e.config.BuildViewLineage && (md.Type == bigquery.ViewTable || md.Type == bigquery.MaterializedView) {
query := getViewQuery(md)
upstreamResources := getUpstreamResources(query)
asset.Lineage = &v1beta2.Lineage{
Upstreams: upstreamResources,
}
}

return asset, nil
}

// Extract table schema
Expand Down Expand Up @@ -744,6 +757,32 @@ func (e *Extractor) fetchTableMetadata(ctx context.Context, tbl *bigquery.Table)
return tbl.Metadata(ctx)
}

// getViewQuery returns the defining SQL of a view or materialized view.
// It returns an empty string for any other table type, or when the
// materialized-view metadata is unexpectedly absent.
func getViewQuery(md *bigquery.TableMetadata) string {
	switch md.Type {
	case bigquery.ViewTable:
		return md.ViewQuery
	case bigquery.MaterializedView:
		// Guard against a nil MaterializedView block. The API is expected to
		// populate it for this table type, but dereferencing a nil here would
		// otherwise panic and abort the whole extraction run.
		if md.MaterializedView == nil {
			return ""
		}
		return md.MaterializedView.Query
	}
	return ""
}

// getUpstreamResources parses the given SQL query and converts every unique
// top-level upstream table reference into a v1beta2 Resource suitable for
// attaching to an asset's lineage. It returns nil when the query references
// no upstream tables.
func getUpstreamResources(query string) []*v1beta2.Resource {
	upstreamDependencies := upstream.ParseTopLevelUpstreamsFromQuery(query)
	uniqueUpstreamDependencies := upstream.UniqueFilterResources(upstreamDependencies)
	if len(uniqueUpstreamDependencies) == 0 {
		// Preserve a nil result for the empty case.
		return nil
	}

	// Pre-size the slice: exactly one entry per unique dependency.
	upstreams := make([]*v1beta2.Resource, 0, len(uniqueUpstreamDependencies))
	for _, dependency := range uniqueUpstreamDependencies {
		urn := plugins.BigQueryURN(dependency.Project, dependency.Dataset, dependency.Name)
		upstreams = append(upstreams, &v1beta2.Resource{
			Urn:     urn,
			Name:    dependency.Name,
			Type:    "table",
			Service: "bigquery",
		})
	}
	return upstreams
}

// Register the extractor to catalog
func init() {
if err := registry.Extractors.Register("bigquery", func() plugins.Extractor {
Expand Down
106 changes: 106 additions & 0 deletions plugins/extractors/bigquery/upstream/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package upstream

import (
"regexp"
"strings"
)

// QueryParser is the signature of a function that extracts upstream table
// references from a SQL query string.
type QueryParser func(query string) []Resource

var (
	// topLevelUpstreamsPattern matches fully-qualified BigQuery table
	// references (project.dataset.table, optionally backtick-quoted) that
	// appear after a FROM, JOIN, WITH ... AS, or VIEW clause, or as a bare
	// backticked reference. Each alternative also allows an optional
	// /* ... */ comment before the reference, used to carry the
	// @ignoreupstream marker. Because each alternative has its own capture
	// groups, the group indices depend on which alternative matched:
	// FROM -> 1-4, JOIN -> 5-8, WITH -> 9-12, VIEW -> 13-16, bare -> 17-20.
	// ParseTopLevelUpstreamsFromQuery relies on this numbering.
	topLevelUpstreamsPattern = regexp.MustCompile("" +
		"(?i)(?:FROM)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-\\*?]+)`?" +
		"|" +
		"(?i)(?:JOIN)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?" +
		"|" +
		"(?i)(?:WITH)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?\\s+(?:AS)" +
		"|" +
		"(?i)(?:VIEW)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?" +
		"|" +
		"(?i)(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`\\s*(?:AS)?")

	// singleLineCommentsPattern matches SQL line comments (-- ...).
	singleLineCommentsPattern = regexp.MustCompile(`(--.*)`)
	// multiLineCommentsPattern matches SQL block comments (/* ... */).
	multiLineCommentsPattern = regexp.MustCompile(`(((/\*)+?[\w\W]*?(\*/)+))`)
	// specialCommentPattern matches block comments that carry an @-marker
	// (e.g. /* @ignoreupstream */); these must survive comment stripping.
	specialCommentPattern = regexp.MustCompile(`(/\*\s*(@[a-zA-Z0-9_-]+)\s*\*/)`)
)

// ParseTopLevelUpstreamsFromQuery extracts the unique fully-qualified
// upstream tables (project.dataset.table) referenced by a SQL query.
// References preceded by a /* @ignoreupstream */ comment are skipped,
// tables named by a VIEW clause are excluded, and tables introduced by
// WITH ... AS are treated as local pseudo resources that shadow any real
// upstream of the same name.
func ParseTopLevelUpstreamsFromQuery(query string) []Resource {
	// Strip ordinary comments first so commented-out SQL is not parsed;
	// special /* @... */ markers are preserved by cleanQueryFromComment.
	cleanedQuery := cleanQueryFromComment(query)

	resourcesFound := make(map[Resource]bool)
	pseudoResources := make(map[Resource]bool)

	matches := topLevelUpstreamsPattern.FindAllStringSubmatch(cleanedQuery, -1)

	for _, match := range matches {
		var projectIdx, datasetIdx, nameIdx, ignoreUpstreamIdx int
		// The first whitespace-separated token of the whole match tells us
		// which regex alternative fired (from/join/with/view, or something
		// else for the bare-backtick alternative).
		tokens := strings.Fields(match[0])
		clause := strings.ToLower(tokens[0])

		// Each alternative in topLevelUpstreamsPattern has its own capture
		// groups, so the indices of (ignore-marker, project, dataset, name)
		// depend on which clause matched: FROM -> 1-4, JOIN -> 5-8,
		// WITH -> 9-12, VIEW -> 13-16, bare backticked reference -> 17-20.
		switch clause {
		case "from":
			ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 1, 2, 3, 4
		case "join":
			ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 5, 6, 7, 8
		case "with":
			ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 9, 10, 11, 12
		case "view":
			ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 13, 14, 15, 16
		default:
			ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 17, 18, 19, 20
		}

		project := match[projectIdx]
		dataset := match[datasetIdx]
		name := match[nameIdx]

		// Empty groups mean a different alternative matched; skip.
		if project == "" || dataset == "" || name == "" {
			continue
		}

		// Honor the explicit @ignoreupstream opt-out marker.
		if strings.TrimSpace(match[ignoreUpstreamIdx]) == "@ignoreupstream" {
			continue
		}

		// A VIEW clause names the view being defined, not an upstream.
		if clause == "view" {
			continue
		}

		resource := Resource{
			Project: project,
			Dataset: dataset,
			Name:    name,
		}

		// WITH introduces a locally-defined (pseudo) resource; record it so
		// later references to it are not reported as real upstreams.
		if clause == "with" {
			pseudoResources[resource] = true
		} else {
			resourcesFound[resource] = true
		}
	}

	var output []Resource

	// Emit only real upstreams, dropping any shadowed by a WITH-defined
	// pseudo resource of the same fully-qualified name.
	for resource := range resourcesFound {
		if pseudoResources[resource] {
			continue
		}
		output = append(output, resource)
	}

	return output
}

// cleanQueryFromComment strips SQL comments from query so the upstream
// parser does not pick up table references inside commented-out code.
// Line comments (-- ...) are always removed; block comments (/* ... */)
// are removed unless they carry a special @-marker, which must survive
// for @ignoreupstream handling.
func cleanQueryFromComment(query string) string {
	result := singleLineCommentsPattern.ReplaceAllString(query, "")

	for _, comment := range multiLineCommentsPattern.FindAllString(query, -1) {
		if !specialCommentPattern.MatchString(comment) {
			result = strings.ReplaceAll(result, comment, "")
		}
	}

	return result
}
Loading

0 comments on commit 37da4b0

Please sign in to comment.