Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
jerqi committed Oct 30, 2024
1 parent 5d01018 commit 442bdb9
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.gravitino.integration.test.container.RangerContainer;
import org.apache.gravitino.integration.test.util.BaseIT;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.gravitino.rel.TableChange;
import org.apache.kyuubi.plugin.spark.authz.AccessControlException;
import org.apache.spark.SparkUnsupportedOperationException;
import org.apache.spark.sql.AnalysisException;
Expand Down Expand Up @@ -253,6 +254,9 @@ void testCreateSchema() throws InterruptedException {
// Third, succeed to create the schema
sparkSession.sql(SQL_CREATE_SCHEMA);

// Fourth, fail to create the table
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_CREATE_TABLE));

// Clean up
catalog.asSchemas().dropSchema(schemaName, true);
metalake.deleteRole(roleName);
Expand Down Expand Up @@ -346,10 +350,16 @@ void testReadWriteTableWithMetalakeLevelRole() throws InterruptedException {
// case 6: Fail to drop the table
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_DROP_TABLE));

// case 7: If we don't have the role, we can't insert and select from data.
metalake.deleteRole(readWriteRole);
waitForUpdatingPolicies();
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_INSERT_TABLE));
Assertions.assertThrows(
AccessControlException.class, () -> sparkSession.sql(SQL_SELECT_TABLE).collectAsList());

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, true);
metalake.deleteRole(readWriteRole);
}

@Test
Expand Down Expand Up @@ -404,10 +414,16 @@ void testReadWriteTableWithTableLevelRole() throws InterruptedException {
// case 6: Fail to drop the table
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_DROP_TABLE));

// case 7: If we don't have the role, we can't insert and select from data.
metalake.deleteRole(roleName);
waitForUpdatingPolicies();
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_INSERT_TABLE));
Assertions.assertThrows(
AccessControlException.class, () -> sparkSession.sql(SQL_SELECT_TABLE).collectAsList());

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, true);
metalake.deleteRole(roleName);
}

@Test
Expand Down Expand Up @@ -452,16 +468,22 @@ void testReadOnlyTable() throws InterruptedException {
// case 6: Fail to drop the table
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_DROP_TABLE));

// case 7: If we don't have the role, we can't insert and select from data.
metalake.deleteRole(readOnlyRole);
waitForUpdatingPolicies();
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_INSERT_TABLE));
Assertions.assertThrows(
AccessControlException.class, () -> sparkSession.sql(SQL_SELECT_TABLE).collectAsList());

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, true);
metalake.deleteRole(readOnlyRole);
}

@Test
void testWriteOnlyTable() throws InterruptedException {
// First, create a role for creating a database and grant role to the user
String readOnlyRole = currentFunName();
String writeOnlyRole = currentFunName();
SecurableObject securableObject =
SecurableObjects.ofMetalake(
metalakeName,
Expand All @@ -471,8 +493,8 @@ void testWriteOnlyTable() throws InterruptedException {
Privileges.CreateTable.allow(),
Privileges.ModifyTable.allow()));
String userName1 = System.getenv(HADOOP_USER_NAME);
metalake.createRole(readOnlyRole, Collections.emptyMap(), Lists.newArrayList(securableObject));
metalake.grantRolesToUser(Lists.newArrayList(readOnlyRole), userName1);
metalake.createRole(writeOnlyRole, Collections.emptyMap(), Lists.newArrayList(securableObject));
metalake.grantRolesToUser(Lists.newArrayList(writeOnlyRole), userName1);
waitForUpdatingPolicies();
// Second, create a schema
sparkSession.sql(SQL_CREATE_SCHEMA);
Expand Down Expand Up @@ -501,10 +523,16 @@ void testWriteOnlyTable() throws InterruptedException {
// case 6: Fail to drop the table
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_DROP_TABLE));

// case 7: If we don't have the role, we can't insert and select from data.
metalake.deleteRole(writeOnlyRole);
waitForUpdatingPolicies();
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_INSERT_TABLE));
Assertions.assertThrows(
AccessControlException.class, () -> sparkSession.sql(SQL_SELECT_TABLE).collectAsList());

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, true);
metalake.deleteRole(readOnlyRole);
}

@Test
Expand Down Expand Up @@ -553,6 +581,9 @@ void testCreateAllPrivilegesRole() throws InterruptedException {

@Test
void testDeleteAndRecreateRole() throws InterruptedException {
// Fail to create schema
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_DROP_SCHEMA));

// Create a role with CREATE_SCHEMA privilege
String roleName = currentFunName();
SecurableObject securableObject =
Expand Down Expand Up @@ -613,6 +644,8 @@ void testDeleteAndRecreateMetadataObject() throws InterruptedException {
// Create a schema
sparkSession.sql(SQL_CREATE_SCHEMA);

Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_DROP_SCHEMA));

// Set owner
MetadataObject schemaObject =
MetadataObjects.of(catalogName, schemaName, MetadataObject.Type.SCHEMA);
Expand All @@ -621,13 +654,28 @@ void testDeleteAndRecreateMetadataObject() throws InterruptedException {

// Delete a schema
sparkSession.sql(SQL_DROP_SCHEMA);
catalog.asSchemas().dropSchema(schemaName, true);
waitForUpdatingPolicies();

// Recreate a schema
sparkSession.sql(SQL_CREATE_SCHEMA);

Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_DROP_SCHEMA));

// Set owner
schemaObject = MetadataObjects.of(catalogName, schemaName, MetadataObject.Type.SCHEMA);
metalake.setOwner(schemaObject, userName1, Owner.Type.USER);
waitForUpdatingPolicies();
sparkSession.sql(SQL_DROP_SCHEMA);

// Delete the role and fail to create schema
metalake.deleteRole(roleName);
waitForUpdatingPolicies();
;
sparkSession.sql(SQL_CREATE_SCHEMA);

// Clean up
catalog.asSchemas().dropSchema(schemaName, true);
metalake.deleteRole(roleName);
}

@Test
Expand Down Expand Up @@ -665,6 +713,52 @@ void testRenameMetadataObject() throws InterruptedException {
metalake.deleteRole(roleName);
}

@Test
void testRenameMetadataObjectPrivilege() throws InterruptedException {
// Verifies that privileges granted on a table keep working across a rename and a
// rename-back: the authorization policies must follow the table's new name.
// Create a role with CREATE_SCHEMA and CREATE_TABLE privilege
String roleName = currentFunName();
SecurableObject securableObject =
SecurableObjects.parse(
String.format("%s", catalogName),
MetadataObject.Type.CATALOG,
Lists.newArrayList(
Privileges.UseCatalog.allow(),
Privileges.CreateSchema.allow(),
Privileges.CreateTable.allow(),
Privileges.ModifyTable.allow()));
metalake.createRole(roleName, Collections.emptyMap(), Lists.newArrayList(securableObject));
// Grant this role to the Spark execution user `HADOOP_USER_NAME`
String userName1 = System.getenv(HADOOP_USER_NAME);
metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);

// Wait until the Ranger policies reflect the new role before exercising them.
waitForUpdatingPolicies();

// Create a schema and a table
sparkSession.sql(SQL_CREATE_SCHEMA);
sparkSession.sql(SQL_USE_SCHEMA);
sparkSession.sql(SQL_CREATE_TABLE);

// Rename the table via the Gravitino catalog API; the ModifyTable privilege
// should transparently apply to the renamed table.
catalog
.asTableCatalog()
.alterTable(NameIdentifier.of(schemaName, tableName), TableChange.rename("new_table"));

// Succeed to insert data under the new table name
sparkSession.sql("INSERT INTO new_table (a, b, c) VALUES (1, 'a', 'b')");

// Rename the table back to its original name.
catalog
.asTableCatalog()
.alterTable(NameIdentifier.of(schemaName, "new_table"), TableChange.rename(tableName));

// Succeed to insert data under the original name again
sparkSession.sql(SQL_INSERT_TABLE);

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, true);
metalake.deleteRole(roleName);
}

@Test
void testChangeOwner() throws InterruptedException {
// Create a schema and a table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,24 +104,30 @@ public Catalog createCatalog(
@Override
public Catalog alterCatalog(NameIdentifier ident, CatalogChange... changes)
    throws NoSuchCatalogException, IllegalArgumentException {
  // Apply the changes through the dispatcher first, so a failed alteration leaves
  // the authorization plugin's state untouched.
  Catalog alteredCatalog = dispatcher.alterCatalog(ident, changes);

  // If the catalog was renamed (possibly multiple times in one call), only the final
  // name is relevant to the authorization plugin, so keep just the last rename change.
  CatalogChange.RenameCatalog lastRenameChange = null;
  for (CatalogChange change : changes) {
    if (change instanceof CatalogChange.RenameCatalog) {
      lastRenameChange = (CatalogChange.RenameCatalog) change;
    }
  }
  if (lastRenameChange != null) {
    // Propagate the rename so plugin-managed privileges track the catalog's new name.
    AuthorizationUtils.renameAuthorizationPluginPrivileges(
        ident, Entity.EntityType.CATALOG, lastRenameChange.getNewName());
  }
  return alteredCatalog;
}

@Override
public boolean dropCatalog(NameIdentifier ident) {
// For a catalog, we don't clear all of the privileges held by the catalog's
// authorization plugin; we only remove the catalog entry itself.
AuthorizationUtils.removeAuthorizationPluginPrivileges(ident, Entity.EntityType.CATALOG);
return dispatcher.dropCatalog(ident);
}

@Override
public boolean dropCatalog(NameIdentifier ident, boolean force)
throws NonEmptyEntityException, CatalogInUseException {
// Same as the non-forced overload: we don't clear all of the privileges held by the
// catalog's authorization plugin; we only remove the catalog entry itself.
AuthorizationUtils.removeAuthorizationPluginPrivileges(ident, Entity.EntityType.CATALOG);
return dispatcher.dropCatalog(ident, force);
}

Expand Down

0 comments on commit 442bdb9

Please sign in to comment.