Skip to content

Commit

Permalink
Support sending a data quality health status to data catalogs.
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrczarnas committed Sep 26, 2024
1 parent 7c4294a commit e7ff4da
Show file tree
Hide file tree
Showing 17 changed files with 460 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,10 @@ public DqoRootCliCommand(BeanFactory beanFactory,
description = "Sets the password of the SMTP server that is used to send email notifications.")
private String dqoSmtpServerPassword;

@CommandLine.Option(names = {"--dqo.integrations.table-health-webhook-urls"},
description = "A comma separated list of webhook URLs where DQOps sends updates of the table data quality status changes.", defaultValue = "")
private Integer dqoIntegrationsTableHealthWebhookUrls;

@CommandLine.Option(names = {"--dqo.queue.max-concurrent-jobs"},
description = "Sets the maximum number of concurrent jobs that the job queue can process at once (running data quality checks, importing metadata, etc.). " +
"The maximum number of threads is also limited by the DQOps license.")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright © 2021 DQOps ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dqops.core.catalogsync;

import com.dqops.data.checkresults.models.currentstatus.TableCurrentDataQualityStatusModel;
import com.dqops.data.checkresults.statuscache.DomainConnectionTableKey;

/**
* Data catalog synchronization service that sends data quality health statuses to a rest api that will upload it into a data catalog.
*/
public interface DataCatalogHealthSendService {
/**
* Sends a serialized data quality status JSON message to a webhook that will load it to a data catalog.
*
* @param tableKey Table key.
* @param dataQualityStatusModel Data quality status model.
*/
void sendTableQualityStatusToCatalog(DomainConnectionTableKey tableKey, TableCurrentDataQualityStatusModel dataQualityStatusModel);

/**
* Checks if the instance is configured to support synchronization with a data catalog.
* @return True when synchronization is possible.
*/
boolean isSynchronizationSupported();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright © 2021 DQOps ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dqops.core.catalogsync;

import com.dqops.core.configuration.DqoIntegrationsConfigurationProperties;
import com.dqops.data.checkresults.models.currentstatus.TableCurrentDataQualityStatusModel;
import com.dqops.data.checkresults.statuscache.DomainConnectionTableKey;
import com.dqops.utils.http.OutboundHttpCallQueue;
import com.dqops.utils.http.OutboundHttpMessage;
import com.dqops.utils.serialization.JsonSerializer;
import org.apache.commons.lang3.StringUtils;
import org.apache.parquet.Strings;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Data catalog synchronization service that sends data quality health statuses to a rest api that will upload it into a data catalog.
*/
@Component
public class DataCatalogHealthSendServiceImpl implements DataCatalogHealthSendService {
private final DqoIntegrationsConfigurationProperties dqoIntegrationsConfigurationProperties;
private final OutboundHttpCallQueue outboundHttpCallQueue;
private final JsonSerializer jsonSerializer;
private final Object lock = new Object();
private final Map<DomainConnectionTableKey, TableCurrentDataQualityStatusModel> currentSendBatch = new LinkedHashMap<>();
private final Map<DomainConnectionTableKey, TableCurrentDataQualityStatusModel> nextSendBatch = new LinkedHashMap<>();
private String[] notificationUrls;

/**
* Dependency injection constructor.
* @param dqoIntegrationsConfigurationProperties Configuration parameters for this service.
* @param outboundHttpCallQueue HTTP call queue.
* @param jsonSerializer Json serializer.
*/
@Autowired
public DataCatalogHealthSendServiceImpl(
DqoIntegrationsConfigurationProperties dqoIntegrationsConfigurationProperties,
OutboundHttpCallQueue outboundHttpCallQueue,
JsonSerializer jsonSerializer) {
this.dqoIntegrationsConfigurationProperties = dqoIntegrationsConfigurationProperties;
this.outboundHttpCallQueue = outboundHttpCallQueue;
this.jsonSerializer = jsonSerializer;
if (!Strings.isNullOrEmpty(dqoIntegrationsConfigurationProperties.getTableHealthWebhookUrls())) {
this.notificationUrls = StringUtils.split(dqoIntegrationsConfigurationProperties.getTableHealthWebhookUrls(), ',');
} else {
this.notificationUrls = new String[0];
}
}

/**
* Sends a serialized data quality status JSON message to a webhook that will load it to a data catalog.
* @param tableKey Table key.
* @param dataQualityStatusModel Data quality status model.
*/
@Override
public void sendTableQualityStatusToCatalog(DomainConnectionTableKey tableKey, TableCurrentDataQualityStatusModel dataQualityStatusModel) {
if (this.notificationUrls.length == 0 || dataQualityStatusModel == null) {
return;
}

synchronized (this.lock) {
this.currentSendBatch.remove(tableKey); // table status is under changes, do not send it in the next batch yet
this.nextSendBatch.put(tableKey, dataQualityStatusModel);
}
}

/**
* Called by a scheduler - moves the operations up from the next batch to the current batch, and sends values in teh current batch.
*/
@Scheduled(fixedRate = 10, timeUnit = TimeUnit.SECONDS)
public void moveSendQueueUpOnSchedule() {
List<TableCurrentDataQualityStatusModel> currentTableStatusesToSent = null;
synchronized (this.lock) {
currentTableStatusesToSent = this.currentSendBatch.values().stream().collect(Collectors.toList());
this.currentSendBatch.clear();

for (Map.Entry<DomainConnectionTableKey, TableCurrentDataQualityStatusModel> nextBatchKeyValue : this.nextSendBatch.entrySet()) {
this.currentSendBatch.put(nextBatchKeyValue.getKey(), nextBatchKeyValue.getValue());
}

this.nextSendBatch.clear();
}

for (TableCurrentDataQualityStatusModel dataQualityStatusModel : currentTableStatusesToSent) {
String serializedModel = this.jsonSerializer.serialize(dataQualityStatusModel);

for (String healthApiUrl : this.notificationUrls) {
this.outboundHttpCallQueue.sendMessage(new OutboundHttpMessage(healthApiUrl, serializedModel));
}
}
}

/**
* Checks if the instance is configured to support synchronization with a data catalog.
*
* @return True when synchronization is possible.
*/
@Override
public boolean isSynchronizationSupported() {
return this.notificationUrls.length > 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright © 2021 DQOps ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dqops.core.catalogsync;

import com.dqops.metadata.search.TableSearchFilters;
import com.dqops.metadata.userhome.UserHome;

/**
* Service that initiates sending updated health statuses to the data catalog for requested tables.
*/
public interface DataCatalogHealthSyncService {
/**
* Retrieves and sends the current data quality health status to the data catalog for each table in the given user home
* that matches the table filter.
*
* @param userHome User home with tables to iterate over and synchronize.
* @param tableSearchFilters Table search filter for matching tables.
*/
void synchronizeDataCatalog(UserHome userHome, TableSearchFilters tableSearchFilters);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright © 2021 DQOps ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dqops.core.catalogsync;

import com.dqops.data.checkresults.statuscache.DomainConnectionTableKey;
import com.dqops.data.checkresults.statuscache.TableStatusCache;
import com.dqops.metadata.search.HierarchyNodeTreeSearcher;
import com.dqops.metadata.search.TableSearchFilters;
import com.dqops.metadata.sources.TableSpec;
import com.dqops.metadata.sources.TableWrapper;
import com.dqops.metadata.userhome.UserHome;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Collection;

/**
* Service that initiates sending updated health statuses to the data catalog for requested tables.
*/
@Component
public class DataCatalogHealthSyncServiceImpl implements DataCatalogHealthSyncService {
private final TableStatusCache tableStatusCache;
private final HierarchyNodeTreeSearcher hierarchyNodeTreeSearcher;

/**
* Dependency injection constructor.
* @param tableStatusCache Table status cache.
* @param hierarchyNodeTreeSearcher Hierarchy search service.
*/
@Autowired
public DataCatalogHealthSyncServiceImpl(TableStatusCache tableStatusCache,
HierarchyNodeTreeSearcher hierarchyNodeTreeSearcher) {
this.tableStatusCache = tableStatusCache;
this.hierarchyNodeTreeSearcher = hierarchyNodeTreeSearcher;
}

/**
* Retrieves and sends the current data quality health status to the data catalog for each table in the given user home
* that matches the table filter.
* @param userHome User home with tables to iterate over and synchronize.
* @param tableSearchFilters Table search filter for matching tables.
*/
@Override
public void synchronizeDataCatalog(UserHome userHome, TableSearchFilters tableSearchFilters) {
String dataDomain = userHome.getUserIdentity().getDataDomainCloud();
Collection<TableWrapper> tableWrappers = this.hierarchyNodeTreeSearcher.findTables(userHome.getConnections(), tableSearchFilters);

for (TableWrapper tableWrapper : tableWrappers) {
TableSpec tableSpec = tableWrapper.getSpec();
if (tableSpec == null) {
continue;
}

DomainConnectionTableKey domainConnectionTableKey = new DomainConnectionTableKey(
dataDomain, tableSpec.getHierarchyId().getConnectionName(), tableSpec.getPhysicalTableName());

this.tableStatusCache.sendCurrentTableStatusToDataCatalog(domainConnectionTableKey);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright © 2021 DQOps ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dqops.core.configuration;

import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
* Configuration POJO with the configuration for the dqo.integrations. - for configuring REST API calls where the table health status should be sent.
*/
@Configuration
@ConfigurationProperties(prefix = "dqo.integrations")
@EqualsAndHashCode(callSuper = false)
@Data
public class DqoIntegrationsConfigurationProperties implements Cloneable {
/**
* The URLs of webhooks that should receive information about changes of a table healths status.
*/
private String tableHealthWebhookUrls;

/**
* Creates a clone of the object.
* @return Cloned instance.
*/
@Override
public DqoIntegrationsConfigurationProperties clone() {
try {
DqoIntegrationsConfigurationProperties cloned = (DqoIntegrationsConfigurationProperties) super.clone();
return cloned;
}
catch (CloneNotSupportedException ex) {
throw new RuntimeException("Cannot clone object", ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ public class TableCurrentDataQualityStatusModel implements CurrentDataQualitySta
*/
public static final String UPSTREAM_FAKE_COLUMN_NAME = "__upstream_columns_combined_status";

/**
* Data domain name.
*/
@JsonPropertyDescription("Data domain name.")
private String dataDomain;

/**
* The connection name in DQOps.
*/
Expand Down Expand Up @@ -499,6 +505,7 @@ public TableCurrentDataQualityStatusModel createSample() {
(validResultsAggregate + warningResults) * 100.0 / totalExecutedChecksWithNoExecutionErrors : null;

TableCurrentDataQualityStatusModel result = new TableCurrentDataQualityStatusModel() {{
setDataDomain("");
setConnectionName(SampleStringsRegistry.getConnectionName());
setSchemaName(SampleStringsRegistry.getSchemaName());
setTableName(SampleStringsRegistry.getTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,7 @@ public TableCurrentDataQualityStatusModel analyzeTableMostRecentQualityStatus(
PhysicalTableName physicalTableName = tableCurrentDataQualityStatusFilterParameters.getPhysicalTableName();

TableCurrentDataQualityStatusModel statusModel = new TableCurrentDataQualityStatusModel();
statusModel.setDataDomain(userDomainIdentity.getDataDomainCloud());
statusModel.setConnectionName(connectionName);
statusModel.setSchemaName(physicalTableName.getSchemaName());
statusModel.setTableName(physicalTableName.getTableName());
Expand Down
Loading

0 comments on commit e7ff4da

Please sign in to comment.