Skip to content

Commit

Permalink
Fixes to running statistics collection under a scheduler.
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrczarnas committed Oct 1, 2024
1 parent cf23fd1 commit 4eeb3e2
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,8 @@ public void applyScheduleDeltaToJob(JobSchedulesDelta schedulesDelta, JobKey job
try {
Trigger triggerToAdd = this.triggerFactory.createTrigger(scheduleToAdd, jobKey, dataDomainName);

if (!this.scheduler.checkExists(triggerToAdd.getKey())) {
TriggerKey triggerKey = triggerToAdd.getKey();
if (!this.scheduler.checkExists(triggerKey)) {
this.scheduler.scheduleJob(triggerToAdd);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ public Trigger createTrigger(CronScheduleSpec schedule, JobKey jobKey, String da
jobDataMapAdapter.setSchedule(triggerJobData, schedule);
jobDataMapAdapter.setDataDomain(triggerJobData, dataDomainName);

String key = "job=" + jobKey.getName() + ",cron=" + schedule.getCronExpression() + ",domain=" + dataDomainName;
TriggerBuilder<Trigger> triggerBuilder = newTrigger()
.withIdentity(schedule.toString() + "/" + dataDomainName)
.withIdentity(key)
.usingJobData(triggerJobData);

ScheduleBuilder<?> scheduleBuilder = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public StatisticsCollectionExecutionSummary executeStatisticsCollectorsOnTable(E
* @return Collection of table wrappers.
*/
public Collection<TableWrapper> listTargetTables(UserHome userHome, StatisticsCollectorSearchFilters statisticsCollectorSearchFilters) {
Collection<TableWrapper> tables = this.hierarchyNodeTreeSearcher.findTables(userHome.getConnections(), statisticsCollectorSearchFilters);
Collection<TableWrapper> tables = this.hierarchyNodeTreeSearcher.findTablesForStatisticsCollection(userHome.getConnections(), statisticsCollectorSearchFilters);
return tables;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public StatisticsCollectionExecutionSummary executeCollectorsOnTable(ExecutionCo
StatisticsCollectionExecutionSummary statisticsCollectionExecutionSummary = new StatisticsCollectionExecutionSummary();
CollectorExecutionStatistics executionStatistics = new CollectorExecutionStatistics();

Collection<AbstractStatisticsCollectorSpec<?>> collectors = this.hierarchyNodeTreeSearcher.findStatisticsCollectors(targetTable, statisticsCollectorSearchFilters);
Collection<AbstractStatisticsCollectorSpec<?>> collectors = this.hierarchyNodeTreeSearcher.findStatisticsCollectors(targetTable.getSpec(), statisticsCollectorSearchFilters);
if (collectors.size() == 0) {
statisticsCollectionExecutionSummary.reportTableStats(connectionWrapper, targetTable.getSpec(), executionStatistics);
return statisticsCollectionExecutionSummary; // no checks for this table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public interface HierarchyNodeTreeSearcher {
* @param statisticsCollectorSearchFilters Search filters.
* @return Collection of tables that passed the filter and will be analyzed to collect statistics.
*/
Collection<TableSpec> findTablesForStatisticsCollection(HierarchyNode startNode, StatisticsCollectorSearchFilters statisticsCollectorSearchFilters);
Collection<TableWrapper> findTablesForStatisticsCollection(HierarchyNode startNode, StatisticsCollectorSearchFilters statisticsCollectorSearchFilters);

/**
* Search for connection in the tree.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,16 @@ public Collection<AbstractStatisticsCollectorSpec<?>> findStatisticsCollectors(H
* @return Collection of tables that passed the filter and will be analyzed to collect statistics.
*/
@Override
public Collection<TableSpec> findTablesForStatisticsCollection(HierarchyNode startNode, StatisticsCollectorSearchFilters statisticsCollectorSearchFilters) {
StatisticsCollectorTableSearchFiltersVisitor searchFilterVisitor = statisticsCollectorSearchFilters.createTableCollectorSearchFilterVisitor();
public Collection<TableWrapper> findTablesForStatisticsCollection(HierarchyNode startNode, StatisticsCollectorSearchFilters statisticsCollectorSearchFilters) {
StatisticsCollectorTargetTableSearchFiltersVisitor searchFilterVisitor = statisticsCollectorSearchFilters.createTargetTableCollectorSearchFilterVisitor();
ArrayList<HierarchyNode> matchingNodes = new ArrayList<>();
LabelsSearcherObject labelsSearcherObject = new LabelsSearcherObject();
DataGroupingConfigurationSearcherObject dataGroupingConfigurationSearcherObject = new DataGroupingConfigurationSearcherObject();
SearchParameterObject searchParameterObject = new SearchParameterObject(matchingNodes, dataGroupingConfigurationSearcherObject, labelsSearcherObject);
this.hierarchyNodeTreeWalker.traverseHierarchyNodeTree(startNode, node -> node.visit(searchFilterVisitor,
searchParameterObject));

return (List<TableSpec>)(ArrayList<?>)matchingNodes;
return (List<TableWrapper>)(ArrayList<?>)matchingNodes;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ public StatisticsCollectorSearchFiltersVisitor createCollectorSearchFilterVisito
* Create a hierarchy tree node traversal visitor that will search for tables matching the filters.
* @return Search visitor.
*/
public StatisticsCollectorTableSearchFiltersVisitor createTableCollectorSearchFilterVisitor() {
return new StatisticsCollectorTableSearchFiltersVisitor(this);
public StatisticsCollectorTargetTableSearchFiltersVisitor createTargetTableCollectorSearchFilterVisitor() {
return new StatisticsCollectorTargetTableSearchFiltersVisitor(this);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.dqops.metadata.groupings.DataGroupingConfigurationSpec;
import com.dqops.metadata.id.HierarchyId;
import com.dqops.metadata.labels.LabelSetSpec;
import com.dqops.metadata.scheduling.CronScheduleSpec;
import com.dqops.metadata.scheduling.CronSchedulesSpec;
import com.dqops.metadata.sources.*;
import com.dqops.metadata.traversal.TreeNodeTraversalResult;
import com.dqops.sensors.AbstractSensorParametersSpec;
Expand All @@ -36,13 +38,174 @@
/**
* Visitor for {@link StatisticsCollectorSearchFilters} that finds statistics collectors to execute.
*/
public class StatisticsCollectorSearchFiltersVisitor extends StatisticsCollectorTableSearchFiltersVisitor {
public class StatisticsCollectorSearchFiltersVisitor extends AbstractSearchVisitor<SearchParameterObject> {
private final StatisticsCollectorSearchFilters filters;

/**
* Creates a visitor for the given filters.
* @param filters Statistics collector search filters.
*/
public StatisticsCollectorSearchFiltersVisitor(StatisticsCollectorSearchFilters filters) {
super(filters);
this.filters = filters;
}

/**
 * Visits the list of connections and, when the connection filter names exactly one connection,
 * restricts the traversal to that connection only.
 *
 * @param connectionList List of connections.
 * @param parameter Target object where found hierarchy nodes, dimensions and labels should be added (not read here).
 * @return A result selecting the single matching connection when the filter is an exact name that was found,
 *         otherwise {@code TRAVERSE_CHILDREN} so that every connection is visited.
 */
@Override
public TreeNodeTraversalResult accept(ConnectionList connectionList, SearchParameterObject parameter) {
    String connectionFilter = this.filters.getConnection();

    // Without a filter, or with a wildcard pattern, there is no shortcut: each connection must be inspected.
    boolean mustVisitEveryConnection = Strings.isNullOrEmpty(connectionFilter)
            || StringPatternComparer.isSearchPattern(connectionFilter);
    if (mustVisitEveryConnection) {
        return TreeNodeTraversalResult.TRAVERSE_CHILDREN;
    }

    // The filter is an exact connection name, try a direct lookup first.
    ConnectionWrapper exactMatch = connectionList.getByObjectName(connectionFilter, true);
    if (exactMatch != null) {
        return TreeNodeTraversalResult.traverseSelectedChildNodes(exactMatch);
    }

    // Direct lookup failed; iterate over all connections as a second attempt (the name may be case-sensitive).
    return TreeNodeTraversalResult.TRAVERSE_CHILDREN;
}

/**
 * Visits a single connection wrapper (lazy loader).
 * <p>
 * When the filters carry a cron schedule expression, this method also updates the
 * {@code ignoreTablesWithoutSchedule} flag on the filters: the flag is turned on unless the connection
 * has an enabled profiling schedule whose cron expression equals the requested one. The connection labels
 * are stored in the labels searcher object for use by deeper nodes.
 *
 * @param connectionWrapper Connection wrapper.
 * @param parameter Target object where found hierarchy nodes, dimensions and labels should be added.
 * @return {@code TRAVERSE_CHILDREN} when no connection filter is set or the connection name matches it,
 *         {@code SKIP_CHILDREN} otherwise.
 */
@Override
public TreeNodeTraversalResult accept(ConnectionWrapper connectionWrapper, SearchParameterObject parameter) {
    String requiredCron = this.filters.getEnabledCronScheduleExpression();
    if (!Strings.isNullOrEmpty(requiredCron)) {
        CronSchedulesSpec connectionSchedules = connectionWrapper.getSpec().getSchedules();
        CronScheduleSpec connectionProfilingSchedule = null;
        if (connectionSchedules != null) {
            connectionProfilingSchedule = connectionSchedules.getProfiling();
        }

        boolean connectionRunsOnRequiredCron = connectionProfilingSchedule != null
                && !connectionProfilingSchedule.isDisabled()
                && Objects.equals(connectionProfilingSchedule.getCronExpression(), requiredCron);

        // Tables can rely on the connection level schedule only when it matches the requested cron expression.
        this.filters.setIgnoreTablesWithoutSchedule(!connectionRunsOnRequiredCron);
    }

    String connectionFilter = this.filters.getConnection();
    parameter.getLabelsSearcherObject().setConnectionLabels(connectionWrapper.getSpec().getLabels());

    boolean connectionAccepted = Strings.isNullOrEmpty(connectionFilter)
            || StringPatternComparer.matchSearchPattern(connectionWrapper.getName(), connectionFilter);
    return connectionAccepted
            ? TreeNodeTraversalResult.TRAVERSE_CHILDREN
            : TreeNodeTraversalResult.SKIP_CHILDREN;
}

/**
 * Visits the collection of tables inside a connection and, when the table filter names exactly one table,
 * restricts the traversal to that table only.
 *
 * @param tableList Table list.
 * @param parameter Target object where found hierarchy nodes, dimensions and labels should be added (not read here).
 * @return A result selecting the single matching table when the filter is an exact name that was found,
 *         otherwise {@code TRAVERSE_CHILDREN} so that every table is visited.
 */
@Override
public TreeNodeTraversalResult accept(TableList tableList, SearchParameterObject parameter) {
    String tableFilter = this.filters.getFullTableName();

    if (!Strings.isNullOrEmpty(tableFilter)) {
        PhysicalTableName requestedTable = PhysicalTableName.fromSchemaTableFilter(tableFilter);

        // Only an exact (non-pattern) name allows a direct lookup; patterns require a full iteration.
        if (!requestedTable.isSearchPattern()) {
            TableWrapper exactMatch = tableList.getByObjectName(requestedTable, true);
            if (exactMatch != null) {
                return TreeNodeTraversalResult.traverseSelectedChildNodes(exactMatch);
            }
            // Not found directly: fall through and iterate, maybe the name is case-sensitive.
        }
    }

    return TreeNodeTraversalResult.TRAVERSE_CHILDREN;
}

/**
 * Visits a table wrapper (lazy loader) and decides whether the nodes below the table should be searched.
 * <p>
 * The table is skipped when any of the following holds:
 * <ul>
 *   <li>tables without their own schedule are ignored and the table has no enabled profiling schedule
 *       override with the requested cron expression,</li>
 *   <li>a cron expression filter is set and the table has an enabled profiling schedule override
 *       with a different cron expression,</li>
 *   <li>the combined table and connection labels do not match the label filters,</li>
 *   <li>the physical table name does not match the table name filter,</li>
 *   <li>the table is disabled,</li>
 *   <li>the {@code enabled} filter is explicitly {@code false}.</li>
 * </ul>
 *
 * @param tableWrapper Table wrapper.
 * @param parameter Target object where found hierarchy nodes, dimensions and labels should be added.
 * @return {@code SKIP_CHILDREN} when the table is filtered out, {@code TRAVERSE_CHILDREN} otherwise.
 */
@Override
public TreeNodeTraversalResult accept(TableWrapper tableWrapper, SearchParameterObject parameter) {
    TableSpec tableSpec = tableWrapper.getSpec();

    if (this.filters.isIgnoreTablesWithoutSchedule()) {
        // The connection level schedule does not apply, so the table needs its own matching schedule.
        if (tableSpec.getSchedulesOverride() == null || tableSpec.getSchedulesOverride().getProfiling() == null ||
                tableSpec.getSchedulesOverride().getProfiling().isDisabled() ||
                !Objects.equals(tableSpec.getSchedulesOverride().getProfiling().getCronExpression(), this.filters.getEnabledCronScheduleExpression())) {
            return TreeNodeTraversalResult.SKIP_CHILDREN;
        }
    }

    if (!Strings.isNullOrEmpty(this.filters.getEnabledCronScheduleExpression())) {
        // An enabled table level override with a different cron expression takes the table out of this run.
        if (tableSpec.getSchedulesOverride() != null && tableSpec.getSchedulesOverride().getProfiling() != null &&
                !tableSpec.getSchedulesOverride().getProfiling().isDisabled() &&
                !Objects.equals(tableSpec.getSchedulesOverride().getProfiling().getCronExpression(), this.filters.getEnabledCronScheduleExpression())) {
            return TreeNodeTraversalResult.SKIP_CHILDREN;
        }
    }

    // The labels searcher object is dereferenced unconditionally, so no null guard is applied to it
    // (a guard placed after the first use could never prevent a NullPointerException).
    LabelsSearcherObject labelsSearcherObject = parameter.getLabelsSearcherObject();
    labelsSearcherObject.setTableLabels(tableSpec.getLabels());

    // Label filters are matched against the union of the table labels and the connection labels.
    LabelSetSpec overriddenLabels = new LabelSetSpec();
    if (labelsSearcherObject.getTableLabels() != null) {
        overriddenLabels.addAll(labelsSearcherObject.getTableLabels());
    }
    if (labelsSearcherObject.getConnectionLabels() != null) {
        overriddenLabels.addAll(labelsSearcherObject.getConnectionLabels());
    }

    if (!LabelsSearchMatcher.matchTableLabels(this.filters, overriddenLabels)) {
        return TreeNodeTraversalResult.SKIP_CHILDREN;
    }

    PhysicalTableName physicalTableName = this.filters.getPhysicalTableName();
    if (physicalTableName != null && !tableWrapper.getPhysicalTableName().matchPattern(physicalTableName)) {
        return TreeNodeTraversalResult.SKIP_CHILDREN;
    }

    if (tableSpec.isDisabled()) {
        return TreeNodeTraversalResult.SKIP_CHILDREN;
    }

    // The table is known to be enabled at this point, so the only remaining "enabled" filter case
    // is an explicit request for disabled tables, which this table cannot satisfy.
    // NOTE(review): combined with the unconditional skip of disabled tables above, an enabled=false
    // filter selects no tables at all - confirm that this is intended.
    if (Boolean.FALSE.equals(this.filters.getEnabled())) {
        return TreeNodeTraversalResult.SKIP_CHILDREN;
    }

    return TreeNodeTraversalResult.TRAVERSE_CHILDREN;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,28 @@
*/
package com.dqops.metadata.search;

import com.dqops.metadata.groupings.DataGroupingConfigurationSpec;
import com.dqops.metadata.id.HierarchyId;
import com.dqops.metadata.labels.LabelSetSpec;
import com.dqops.metadata.policies.column.ColumnQualityPolicyList;
import com.dqops.metadata.policies.table.TableQualityPolicyList;
import com.dqops.metadata.scheduling.CronScheduleSpec;
import com.dqops.metadata.scheduling.CronSchedulesSpec;
import com.dqops.metadata.sources.*;
import com.dqops.metadata.traversal.TreeNodeTraversalResult;
import com.dqops.sensors.AbstractSensorParametersSpec;
import com.dqops.statistics.AbstractRootStatisticsCollectorsContainerSpec;
import com.dqops.statistics.AbstractStatisticsCollectorCategorySpec;
import com.dqops.statistics.AbstractStatisticsCollectorSpec;
import com.dqops.statistics.StatisticsCollectorTarget;
import com.dqops.statistics.column.ColumnStatisticsCollectorsRootCategoriesSpec;
import com.dqops.statistics.table.TableStatisticsCollectorsRootCategoriesSpec;
import com.google.common.base.Strings;

import java.util.Collection;
import java.util.Objects;
import java.util.Set;

/**
* Visitor for {@link StatisticsCollectorSearchFilters} that finds statistics collectors to execute.
* Visitor for {@link StatisticsCollectorSearchFilters} that finds target tables on which statistics collectors are run.
*/
public class StatisticsCollectorTableSearchFiltersVisitor extends AbstractSearchVisitor<SearchParameterObject> {
public class StatisticsCollectorTargetTableSearchFiltersVisitor extends AbstractSearchVisitor<SearchParameterObject> {
protected final StatisticsCollectorSearchFilters filters;

/**
* Creates a visitor for the given filters.
* @param filters Statistics collector search filters.
*/
public StatisticsCollectorTableSearchFiltersVisitor(StatisticsCollectorSearchFilters filters) {
public StatisticsCollectorTargetTableSearchFiltersVisitor(StatisticsCollectorSearchFilters filters) {
this.filters = filters;
}

Expand Down Expand Up @@ -147,33 +136,7 @@ public TreeNodeTraversalResult accept(TableList tableList, SearchParameterObject
*/
@Override
public TreeNodeTraversalResult accept(TableWrapper tableWrapper, SearchParameterObject parameter) {
String schemaTableName = this.filters.getFullTableName();

if (Strings.isNullOrEmpty(schemaTableName)) {
return TreeNodeTraversalResult.TRAVERSE_CHILDREN;
}

PhysicalTableName physicalTableName = PhysicalTableName.fromSchemaTableFilter(schemaTableName);
if (physicalTableName.isSearchPattern()) {
return TreeNodeTraversalResult.TRAVERSE_CHILDREN; // we need to iterate anyway
}

if (tableWrapper.getPhysicalTableName().matchPattern(physicalTableName)) {
return TreeNodeTraversalResult.TRAVERSE_CHILDREN;
}

return TreeNodeTraversalResult.SKIP_CHILDREN;
}

/**
* Accepts a table specification.
*
* @param tableSpec Table specification.
* @param parameter Target object where found hierarchy nodes, dimensions and labels should be added.
* @return Accept's result.
*/
@Override
public TreeNodeTraversalResult accept(TableSpec tableSpec, SearchParameterObject parameter) {
TableSpec tableSpec = tableWrapper.getSpec();
Boolean enabledFilter = this.filters.getEnabled();

if (this.filters.isIgnoreTablesWithoutSchedule()) {
Expand All @@ -195,6 +158,35 @@ public TreeNodeTraversalResult accept(TableSpec tableSpec, SearchParameterObject
LabelsSearcherObject labelsSearcherObject = parameter.getLabelsSearcherObject();
labelsSearcherObject.setTableLabels(tableSpec.getLabels());

if (labelsSearcherObject != null) {
labelsSearcherObject.setTableLabels(tableWrapper.getSpec().getLabels());
}

LabelSetSpec overriddenLabels = new LabelSetSpec();

if (labelsSearcherObject.getTableLabels() != null) {
overriddenLabels.addAll(labelsSearcherObject.getTableLabels());
}

if (labelsSearcherObject.getConnectionLabels() != null) {
overriddenLabels.addAll(labelsSearcherObject.getConnectionLabels());
}

if (!LabelsSearchMatcher.matchTableLabels(this.filters, overriddenLabels)) {
return TreeNodeTraversalResult.SKIP_CHILDREN;
}

PhysicalTableName physicalTableName = this.filters.getPhysicalTableName();
if (physicalTableName != null) {
if (!tableWrapper.getPhysicalTableName().matchPattern(physicalTableName)) {
return TreeNodeTraversalResult.SKIP_CHILDREN;
}
}
else {
parameter.getNodes().add(tableWrapper);
return TreeNodeTraversalResult.SKIP_CHILDREN;
}

if (tableSpec.isDisabled()) {
return TreeNodeTraversalResult.SKIP_CHILDREN;
}
Expand All @@ -208,7 +200,7 @@ public TreeNodeTraversalResult accept(TableSpec tableSpec, SearchParameterObject
}
}

parameter.getNodes().add(tableSpec);
parameter.getNodes().add(tableWrapper);

return TreeNodeTraversalResult.SKIP_CHILDREN;
}
Expand Down

0 comments on commit 4eeb3e2

Please sign in to comment.