Skip to content

Commit

Permalink
Fix issues in CREATE DATA LINK implementation (hazelcast#23941)
Browse files Browse the repository at this point in the history
- race when creating/dropping
- wrong field names in information_schema
- `DataLink` class name used twice for different things
- broadcasting the datalink change to all members
- format datalink options as JSON in information_schema table
- weird prefix for datalinks in SQL catalog
- fix formatting broken in the original PR

---------

Co-authored-by: Krzysztof Jamróz <[email protected]>
  • Loading branch information
viliam-durina and k-jamroz authored Mar 16, 2023
1 parent 2133d66 commit c6f0fe6
Show file tree
Hide file tree
Showing 26 changed files with 403 additions and 269 deletions.
27 changes: 13 additions & 14 deletions docs/design/sql/18-data-links.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ This can then be used when creating a Java pipeline JDBC source:

```java
@Beta
public static<T> BatchSource<T> jdbc(
@Nonnull ExternalDataStoreRef externalDataStoreRef,
@Nonnull ToResultSetFunction resultSetFn,
@Nonnull FunctionEx<? super ResultSet,?extends T>createOutputFn
)
public static <T> BatchSource<T> jdbc(
@Nonnull ExternalDataStoreRef externalDataStoreRef,
@Nonnull ToResultSetFunction resultSetFn,
@Nonnull FunctionEx<? super ResultSet,?extends T>createOutputFn
)
```

Or in a mapping:
Expand Down Expand Up @@ -117,14 +117,13 @@ Particular Jet source/sink then uses the `ExternalDataStoreRef` to look the
factory up:

```java
ExternalDataStoreFactory<?> dataStoreFactory=nodeEngine.getExternalDataStoreService().getExternalDataStoreFactory(name);
if(!(dataStoreFactory instanceof JdbcDataStoreFactory)){
String className=JdbcDataStoreFactory.class.getSimpleName();
throw new HazelcastException("Data store factory '"+name+"' must be an instance of "+className);
}
return(JdbcDataStoreFactory)dataStoreFactory;
ExternalDataStoreFactory<?> dataStoreFactory = nodeEngine.getExternalDataStoreService().getExternalDataStoreFactory(name);
if (!(dataStoreFactory instanceof JdbcDataStoreFactory)) {
String className = JdbcDataStoreFactory.class.getSimpleName();
throw new HazelcastException("Data store factory '" + name + "' must be an instance of " + className);
}
return (JdbcDataStoreFactory) dataStoreFactory;
```

We considered 2 use cases of a data source - shared and non-shared:

* Shared - an instance is created at startup - e.g. a single instance of the
Expand Down Expand Up @@ -210,7 +209,7 @@ links, all connecting to the same remote system.

```sql
CREATE DATA LINK <name>
TYPE <connector name>
[CONNECTOR] TYPE <connector name>
OPTIONS ( /* connector-specific options */ );
```

Expand Down Expand Up @@ -662,7 +661,7 @@ these privileges related to data links:
the `get_ddl` function, or views in information_schema, if we have them. Also
note that the `__sql.catalog` IMap exposes the data link options in a
non-documented way, so access to this map must be denied.
* To DROP data link, we require all both `view-datalink` and `drop-datalink`.
* To DROP data link, we require both `view-datalink` and `drop-datalink`.

There will be no way to grant/revoke access to individual data links, or to
individual remote resources, every user will be able to access every data link.
Expand Down
4 changes: 2 additions & 2 deletions hazelcast-sql/src/main/codegen/config.fmpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ data: {
"INDEX"
"JOB"
"JOBS"
"LINK"
"MAPPING"
"MAPPINGS"
"LINK"
"RESUME"
"SINK"
"SNAPSHOT"
Expand Down Expand Up @@ -398,9 +398,9 @@ data: {
"EXTERNAL"
"JOB"
"JOBS"
"LINK"
"MAPPING"
"MAPPINGS"
"LINK"
"RESUME"
"SINK"
"SNAPSHOT"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public CalciteSqlOptimizer(NodeEngine nodeEngine, QueryResultRegistry resultRegi
this.planExecutor = new PlanExecutor(
tableResolverImpl,
dataLinksResolver,
nodeEngine.getHazelcastInstance(),
nodeEngine,
resultRegistry
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.hazelcast.jet.sql.impl.opt.FieldCollation;
import com.hazelcast.jet.sql.impl.opt.physical.AggregateAbstractPhysicalRule;
import com.hazelcast.jet.sql.impl.processors.RootResultConsumerSink;
import com.hazelcast.jet.sql.impl.validate.UpdateDataLinkOperation;
import com.hazelcast.nio.serialization.DataSerializableFactory;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.sql.impl.exec.scan.MapIndexScanMetadata;
Expand Down Expand Up @@ -70,11 +71,10 @@ public class JetSqlSerializerHook implements DataSerializerHook {
public static final int ROW_IDENTITY_FN = 22;
public static final int AGGREGATE_EXPORT_FUNCTION = 23;
public static final int AGGREGATE_JSON_OBJECT_AGG_SUPPLIER = 24;
// reserved until 6.0
public static final int TO_ROW = 25;
public static final int UDT_OBJECT_TO_JSON = 26;
public static final int UPDATE_DATA_LINK_OPERATION = 27;

public static final int LEN = UDT_OBJECT_TO_JSON + 1;
public static final int LEN = UPDATE_DATA_LINK_OPERATION + 1;

@Override
public int getFactoryId() {
Expand Down Expand Up @@ -115,6 +115,7 @@ public DataSerializableFactory createFactory() {
constructors[AGGREGATE_EXPORT_FUNCTION] = arg -> AggregateAbstractPhysicalRule.AggregateExportFunction.INSTANCE;
constructors[AGGREGATE_JSON_OBJECT_AGG_SUPPLIER] = arg -> new AggregateAbstractPhysicalRule.AggregateObjectAggSupplier();
constructors[UDT_OBJECT_TO_JSON] = arg -> new UdtObjectToJsonFunction();
constructors[UPDATE_DATA_LINK_OPERATION] = arg -> new UpdateDataLinkOperation();

return new ArrayDataSerializableFactory(constructors);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.hazelcast.jet.sql.impl;

import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;
import com.hazelcast.config.BitmapIndexOptions;
import com.hazelcast.config.IndexConfig;
import com.hazelcast.config.IndexType;
Expand All @@ -27,6 +29,7 @@
import com.hazelcast.jet.JobStateSnapshot;
import com.hazelcast.jet.RestartableException;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.impl.AbstractJetInstance;
import com.hazelcast.jet.impl.JetServiceBackend;
import com.hazelcast.jet.impl.util.ReflectionUtils;
Expand Down Expand Up @@ -58,6 +61,8 @@
import com.hazelcast.jet.sql.impl.schema.TableResolverImpl;
import com.hazelcast.jet.sql.impl.schema.TypeDefinitionColumn;
import com.hazelcast.jet.sql.impl.schema.TypesUtils;
import com.hazelcast.jet.sql.impl.validate.UpdateDataLinkOperation;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
import com.hazelcast.map.impl.EntryRemovingProcessor;
import com.hazelcast.map.impl.MapContainer;
Expand All @@ -67,7 +72,7 @@
import com.hazelcast.nio.serialization.ClassDefinition;
import com.hazelcast.query.impl.getters.Extractors;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import com.hazelcast.sql.SqlColumnMetadata;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlRowMetadata;
Expand All @@ -80,7 +85,7 @@
import com.hazelcast.sql.impl.expression.ExpressionEvalContext;
import com.hazelcast.sql.impl.row.EmptyRow;
import com.hazelcast.sql.impl.row.JetSqlRow;
import com.hazelcast.sql.impl.schema.datalink.DataLink;
import com.hazelcast.sql.impl.schema.datalink.DataLinkCatalogEntry;
import com.hazelcast.sql.impl.schema.type.Type;
import com.hazelcast.sql.impl.schema.type.TypeKind;
import com.hazelcast.sql.impl.schema.view.View;
Expand All @@ -90,6 +95,7 @@
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.SqlNode;

import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -105,11 +111,12 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.hazelcast.cluster.memberselector.MemberSelectors.DATA_MEMBER_SELECTOR;
import static com.hazelcast.config.BitmapIndexOptions.UniqueKeyTransformation;
import static com.hazelcast.jet.config.JobConfigArguments.KEY_SQL_QUERY_TEXT;
import static com.hazelcast.jet.config.JobConfigArguments.KEY_SQL_UNBOUNDED;
import static com.hazelcast.jet.datamodel.Tuple2.tuple2;
import static com.hazelcast.jet.impl.util.ExceptionUtil.isTopologyException;
import static com.hazelcast.jet.impl.util.Util.getNodeEngine;
import static com.hazelcast.jet.impl.util.Util.getSerializationService;
import static com.hazelcast.jet.sql.impl.SqlPlanImpl.CreateDataLinkPlan;
import static com.hazelcast.jet.sql.impl.parse.SqlCreateIndex.UNIQUE_KEY;
Expand All @@ -129,21 +136,27 @@ public class PlanExecutor {
private final TableResolverImpl catalog;
private final DataLinksResolver dataLinksCatalog;
private final HazelcastInstance hazelcastInstance;
private final NodeEngine nodeEngine;
private final QueryResultRegistry resultRegistry;

private final ILogger logger;

// test-only
private final AtomicLong directIMapQueriesExecuted = new AtomicLong();

public PlanExecutor(
TableResolverImpl catalog,
DataLinksResolver dataLinksCatalog,
HazelcastInstance hazelcastInstance,
DataLinksResolver dataLinksResolver,
NodeEngine nodeEngine,
QueryResultRegistry resultRegistry
) {
this.catalog = catalog;
this.dataLinksCatalog = dataLinksCatalog;
this.hazelcastInstance = hazelcastInstance;
this.dataLinksCatalog = dataLinksResolver;
this.nodeEngine = nodeEngine;
this.hazelcastInstance = nodeEngine.getHazelcastInstance();
this.resultRegistry = resultRegistry;

logger = nodeEngine.getLogger(getClass());
}

SqlResult execute(CreateMappingPlan plan) {
Expand All @@ -157,28 +170,29 @@ SqlResult execute(DropMappingPlan plan) {
}

SqlResult execute(CreateDataLinkPlan plan) {
InternalDataLinkService dlService = getNodeEngine(hazelcastInstance).getDataLinkService();
if (plan.ifNotExists() && dlService.existsDataLink(plan.name())) {
throw new HazelcastException("Data link '" + plan.name() + "' already exists");
}
InternalDataLinkService dlService = nodeEngine.getDataLinkService();
assert !plan.ifNotExists() || !plan.isReplace();

dlService.createSqlDataLink(plan.name(), plan.type(), plan.options(), plan.isReplace());
dataLinksCatalog.createDataLink(
new DataLink(plan.name(), plan.type(), plan.options()),
if (dlService.existsConfigDataLink(plan.name())) {
throw new HazelcastException("Cannot replace a data link created from configuration");
}
boolean added = dataLinksCatalog.createDataLink(
new DataLinkCatalogEntry(plan.name(), plan.type(), plan.options()),
plan.isReplace(),
plan.ifNotExists());
if (added) {
broadcastUpdateDataLinkOperations(plan.name());
}
return UpdateSqlResultImpl.createUpdateCountResult(0);
}

SqlResult execute(DropDataLinkPlan plan) {
InternalDataLinkService dlService = getNodeEngine(hazelcastInstance).getDataLinkService();
if (plan.ifExists()) {
if (!dlService.existsDataLink(plan.name())) {
throw new HazelcastException("Data link '" + plan.name() + "' not found");
}
InternalDataLinkService dlService = nodeEngine.getDataLinkService();
if (dlService.existsConfigDataLink(plan.name())) {
throw new HazelcastException("Data link '" + plan.name() + "' is configured via Config and can't be removed");
}
dlService.removeDataLink(plan.name());
dataLinksCatalog.removeDataLink(plan.name(), plan.ifExists());
broadcastUpdateDataLinkOperations(plan.name());
return UpdateSqlResultImpl.createUpdateCountResult(0);
}

Expand Down Expand Up @@ -358,7 +372,6 @@ SqlResult execute(ShowStatementPlan plan) {
rows = catalog.getViewNames().stream();
break;
case JOBS:
NodeEngine nodeEngine = getNodeEngine(hazelcastInstance);
JetServiceBackend jetServiceBackend = nodeEngine.getService(JetServiceBackend.SERVICE_NAME);
rows = jetServiceBackend.getJobRepository().getActiveJobNames().stream();
break;
Expand Down Expand Up @@ -537,7 +550,6 @@ SqlResult execute(IMapDeletePlan plan, List<Object> arguments, long timeout) {
}

SqlResult execute(CreateTypePlan plan) {
NodeEngineImpl nodeEngine = getNodeEngine(hazelcastInstance);
if (!nodeEngine.getProperties().getBoolean(SQL_CUSTOM_TYPES_ENABLED)) {
throw QueryException.error("Experimental feature of creating custom types isn't enabled. To enable, set "
+ SQL_CUSTOM_TYPES_ENABLED + " to true");
Expand Down Expand Up @@ -691,6 +703,31 @@ private static <K, V> MapContainer getMapContainer(IMap<K, V> map) {
return mapServiceContext.getMapContainer(map.getName());
}

/**
 * Sends an {@link UpdateDataLinkOperation} for the given data link to every
 * data member of the cluster, then waits for each invocation to complete.
 * <p>
 * Failures of individual member invocations are logged and otherwise ignored
 * (per the log message, a background process is expected to reconcile the
 * member later). An {@link InterruptedException} re-interrupts the current
 * thread and stops waiting for the remaining members.
 *
 * @param dataLinkName name of the data link whose change is broadcast
 */
private void broadcastUpdateDataLinkOperations(@Nonnull String dataLinkName) {
// Fire off one invocation per data member; collect (address, future) pairs
// so failures can be attributed to a specific member below.
List<Tuple2<Address, CompletableFuture<?>>> futures = new ArrayList<>();
for (Member m : nodeEngine.getClusterService().getMembers(DATA_MEMBER_SELECTOR)) {
UpdateDataLinkOperation op = new UpdateDataLinkOperation(dataLinkName);
Address target = m.getAddress();
InvocationFuture<Object> future = nodeEngine.getOperationService()
.createInvocationBuilder(JetServiceBackend.SERVICE_NAME, op, target)
.invoke();
futures.add(tuple2(target, future));
}

// Wait for all invocations sequentially.
for (Tuple2<Address, CompletableFuture<?>> tuple : futures) {
try {
assert tuple.f1() != null;
tuple.f1().get();
} catch (InterruptedException e) {
// Restore the interrupt flag and stop waiting for remaining members.
Thread.currentThread().interrupt();
break;
} catch (ExecutionException e) {
// Best-effort broadcast: log and continue with the remaining members.
logger.warning("Failed to update datalink '" + dataLinkName + "' on member '" + tuple.f0()
+ "'. Background process should resolve this");
}
}
}

/**
 * Returns the number of IMap queries executed directly (bypassing a Jet job).
 * Backed by a test-only counter (see the {@code directIMapQueriesExecuted} field).
 */
public long getDirectIMapQueriesExecuted() {
return directIMapQueriesExecuted.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,50 +16,52 @@

package com.hazelcast.jet.sql.impl.connector.infoschema;

import com.hazelcast.jet.json.JsonUtil;
import com.hazelcast.sql.impl.schema.ConstantTableStatistics;
import com.hazelcast.sql.impl.schema.TableField;
import com.hazelcast.sql.impl.schema.datalink.DataLink;
import com.hazelcast.sql.impl.schema.datalink.DataLinkCatalogEntry;
import com.hazelcast.sql.impl.type.QueryDataType;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import static com.hazelcast.jet.impl.util.Util.uncheckCall;
import static java.util.Arrays.asList;

public class DataLinksTable extends InfoSchemaTable {
private static final String NAME = "datalinks";

private static final List<TableField> FIELDS = asList(
new TableField("table_catalog", QueryDataType.VARCHAR, false),
new TableField("table_schema", QueryDataType.VARCHAR, false),
new TableField("datalink_catalog", QueryDataType.VARCHAR, false),
new TableField("datalink_schema", QueryDataType.VARCHAR, false),
new TableField("datalink_name", QueryDataType.VARCHAR, false),
new TableField("datalink_type", QueryDataType.VARCHAR, false),
new TableField("datalink_options", QueryDataType.VARCHAR, false)
);

private final String dataLinkSchema;
private final Collection<DataLink> dataLinks;
private final Collection<DataLinkCatalogEntry> dataLinkCatalogEntries;

public DataLinksTable(String catalog,
String schemaName,
String dataLinkSchema,
Collection<DataLink> dataLinks) {
Collection<DataLinkCatalogEntry> dataLinkCatalogEntries) {
super(FIELDS, catalog, schemaName, NAME, new ConstantTableStatistics(0));
this.dataLinkSchema = dataLinkSchema;
this.dataLinks = dataLinks;
this.dataLinkCatalogEntries = dataLinkCatalogEntries;
}

@Override
protected List<Object[]> rows() {
List<Object[]> rows = new ArrayList<>(dataLinks.size());
for (DataLink dl : dataLinks) {
List<Object[]> rows = new ArrayList<>(dataLinkCatalogEntries.size());
for (DataLinkCatalogEntry dl : dataLinkCatalogEntries) {
Object[] row = new Object[]{
catalog(),
dataLinkSchema,
dl.getName(),
dl.getType(),
dl.getOptions().toString()
uncheckCall(() -> JsonUtil.toJson(dl.getOptions()))
};
rows.add(row);
}
Expand Down
Loading

0 comments on commit c6f0fe6

Please sign in to comment.