Skip to content

Commit

Permalink
[#5146] fix(core): Support renaming and deleting metadata objects in the authorization plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
jerqi committed Oct 30, 2024
1 parent b2730a9 commit 1cc7a19
Show file tree
Hide file tree
Showing 18 changed files with 798 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.auth.AuthConstants;
import org.apache.gravitino.auth.AuthenticatorType;
import org.apache.gravitino.authorization.Owner;
import org.apache.gravitino.authorization.Privileges;
import org.apache.gravitino.authorization.SecurableObject;
import org.apache.gravitino.authorization.SecurableObjects;
Expand Down Expand Up @@ -93,6 +94,8 @@ public class RangerHiveE2EIT extends BaseIT {

private static final String SQL_CREATE_SCHEMA = String.format("CREATE DATABASE %s", schemaName);

private static final String SQL_DROP_SCHEMA = String.format("DROP DATABASE %s", schemaName);

private static final String SQL_USE_SCHEMA = String.format("USE SCHEMA %s", schemaName);

private static final String SQL_CREATE_TABLE =
Expand All @@ -112,6 +115,12 @@ public class RangerHiveE2EIT extends BaseIT {
private static final String SQL_ALTER_TABLE =
String.format("ALTER TABLE %s ADD COLUMN d string", tableName);

private static final String SQL_RENAME_TABLE =
String.format("ALTER TABLE %s RENAME TO new_table", tableName);

private static final String SQL_RENAME_BACK_TABLE =
String.format("ALTER TABLE new_table RENAME TO %s", tableName);

private static final String SQL_DROP_TABLE = String.format("DROP TABLE %s", tableName);

private static String RANGER_ADMIN_URL = null;
Expand Down Expand Up @@ -471,10 +480,10 @@ void testCreateAllPrivilegesRole() throws InterruptedException {

waitForUpdatingPolicies();

// Test to create a schema
// Test to create the schema
sparkSession.sql(SQL_CREATE_SCHEMA);

// Test to creat a table
// Test to create a table
sparkSession.sql(SQL_USE_SCHEMA);
sparkSession.sql(SQL_CREATE_TABLE);

Expand Down Expand Up @@ -527,6 +536,208 @@ void testDeleteAndRecreateRole() throws InterruptedException {
metalake.deleteRole(roleName);
}

@Test
void testDeleteAndRecreateMetadataObject() throws InterruptedException {
  // Build a catalog-scoped role that only carries the CREATE_SCHEMA privilege.
  String role = currentFunName();
  SecurableObject catalogSecurable =
      SecurableObjects.parse(
          catalogName,
          MetadataObject.Type.CATALOG,
          Lists.newArrayList(Privileges.CreateSchema.allow()));
  metalake.createRole(role, Collections.emptyMap(), Lists.newArrayList(catalogSecurable));

  // Grant the role to the Spark execution user taken from `HADOOP_USER_NAME`.
  String sparkUser = System.getenv(HADOOP_USER_NAME);
  metalake.grantRolesToUser(Lists.newArrayList(role), sparkUser);
  waitForUpdatingPolicies();

  // Materialize the schema via Spark SQL.
  sparkSession.sql(SQL_CREATE_SCHEMA);

  // Make the Spark user the owner of the freshly created schema.
  MetadataObject schema =
      MetadataObjects.of(catalogName, schemaName, MetadataObject.Type.SCHEMA);
  metalake.setOwner(schema, sparkUser, Owner.Type.USER);
  waitForUpdatingPolicies();

  // Drop the schema, then create it again: the delete must not leave stale
  // authorization state behind that would block the re-creation.
  sparkSession.sql(SQL_DROP_SCHEMA);
  sparkSession.sql(SQL_CREATE_SCHEMA);

  // Tear down the schema and the role.
  catalog.asSchemas().dropSchema(schemaName, true);
  metalake.deleteRole(role);
}

@Test
void testRenameMetadataObject() throws InterruptedException {
  // Build a catalog-scoped role carrying the privileges needed to create and
  // modify schemas and tables.
  String role = currentFunName();
  SecurableObject catalogSecurable =
      SecurableObjects.parse(
          catalogName,
          MetadataObject.Type.CATALOG,
          Lists.newArrayList(
              Privileges.UseCatalog.allow(),
              Privileges.CreateSchema.allow(),
              Privileges.CreateTable.allow(),
              Privileges.ModifyTable.allow()));
  metalake.createRole(role, Collections.emptyMap(), Lists.newArrayList(catalogSecurable));
  // Grant the role to the Spark execution user taken from `HADOOP_USER_NAME`.
  String sparkUser = System.getenv(HADOOP_USER_NAME);
  metalake.grantRolesToUser(Lists.newArrayList(role), sparkUser);

  waitForUpdatingPolicies();

  // Materialize a schema plus a table inside it.
  sparkSession.sql(SQL_CREATE_SCHEMA);
  sparkSession.sql(SQL_USE_SCHEMA);
  sparkSession.sql(SQL_CREATE_TABLE);

  // Rename the table away and back again; both directions must be authorized,
  // which proves the plugin follows the object through the rename.
  sparkSession.sql(SQL_RENAME_TABLE);
  sparkSession.sql(SQL_RENAME_BACK_TABLE);

  // Tear down the table, the schema and the role.
  catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
  catalog.asSchemas().dropSchema(schemaName, true);
  metalake.deleteRole(role);
}

@Test
void testChangeOwner() throws InterruptedException {
  // Bootstrap: use a temporary helper role to create a schema and a table,
  // then revoke it so the user enters the owner checks with no privileges.
  String helperRole = currentFunName();
  SecurableObject securableObject =
      SecurableObjects.ofMetalake(
          metalakeName,
          Lists.newArrayList(
              Privileges.UseSchema.allow(),
              Privileges.CreateSchema.allow(),
              Privileges.CreateTable.allow(),
              Privileges.ModifyTable.allow()));
  String userName1 = System.getenv(HADOOP_USER_NAME);
  metalake.createRole(helperRole, Collections.emptyMap(), Lists.newArrayList(securableObject));
  metalake.grantRolesToUser(Lists.newArrayList(helperRole), userName1);
  waitForUpdatingPolicies();

  // Create a schema and a table, and populate the table
  sparkSession.sql(SQL_CREATE_SCHEMA);
  sparkSession.sql(SQL_USE_SCHEMA);
  sparkSession.sql(SQL_CREATE_TABLE);
  sparkSession.sql(SQL_INSERT_TABLE);

  // Strip the helper role so only ownership (set below) grants anything.
  metalake.revokeRolesFromUser(Lists.newArrayList(helperRole), userName1);
  metalake.deleteRole(helperRole);
  waitForUpdatingPolicies();

  // case 1. Have none of privileges of the table

  // - a. Fail to insert data into the table
  Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_INSERT_TABLE));

  // - b. Fail to select data from the table
  Assertions.assertThrows(
      AccessControlException.class, () -> sparkSession.sql(SQL_SELECT_TABLE).collectAsList());

  // - c: Fail to update data in the table (Hive doesn't support UPDATE, so the
  // failure is SparkUnsupportedOperationException rather than an auth error)
  Assertions.assertThrows(
      SparkUnsupportedOperationException.class, () -> sparkSession.sql(SQL_UPDATE_TABLE));

  // - d: Fail to delete data from the table (Hive doesn't support DELETE either)
  Assertions.assertThrows(AnalysisException.class, () -> sparkSession.sql(SQL_DELETE_TABLE));

  // - e: Fail to alter the table
  Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_ALTER_TABLE));

  // - f: Fail to drop the table
  Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_DROP_TABLE));

  // case 2. user is the table owner
  MetadataObject tableObject =
      MetadataObjects.of(
          Lists.newArrayList(catalogName, schemaName, tableName), MetadataObject.Type.TABLE);
  metalake.setOwner(tableObject, userName1, Owner.Type.USER);
  waitForUpdatingPolicies();

  // Owner has all the privileges except for creating table
  checkTableAllPrivilegesExceptForCreating();

  // Delete Gravitino's meta data (the check above already dropped the Hive table)
  catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
  waitForUpdatingPolicies();

  // Fail to create the table: table ownership doesn't imply CREATE_TABLE on the schema
  Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_CREATE_TABLE));

  // case 3. user is the schema owner
  MetadataObject schemaObject =
      MetadataObjects.of(catalogName, schemaName, MetadataObject.Type.SCHEMA);
  metalake.setOwner(schemaObject, userName1, Owner.Type.USER);
  waitForUpdatingPolicies();

  // Succeed to create a table
  sparkSession.sql(SQL_CREATE_TABLE);

  // Succeed to check other table privileges (schema owner covers its tables)
  checkTableAllPrivilegesExceptForCreating();

  // Succeed to drop schema (both in Hive and in Gravitino's metadata)
  sparkSession.sql(SQL_DROP_SCHEMA);
  catalog.asSchemas().dropSchema(schemaName, true);
  waitForUpdatingPolicies();

  // Fail to create schema: schema ownership died with the schema
  Assertions.assertThrows(
      AccessControlException.class, () -> sparkSession.sql(SQL_CREATE_SCHEMA));

  // case 4. user is the catalog owner
  MetadataObject catalogObject =
      MetadataObjects.of(null, catalogName, MetadataObject.Type.CATALOG);
  metalake.setOwner(catalogObject, userName1, Owner.Type.USER);
  waitForUpdatingPolicies();

  // Succeed to create a schema
  sparkSession.sql(SQL_CREATE_SCHEMA);

  // Succeed to create a table
  sparkSession.sql(SQL_CREATE_TABLE);

  // Succeed to check other table privileges
  checkTableAllPrivilegesExceptForCreating();

  // Succeed to drop schema
  sparkSession.sql(SQL_DROP_SCHEMA);
  catalog.asSchemas().dropSchema(schemaName, true);
  waitForUpdatingPolicies();

  // Hand the catalog back to the anonymous user before testing metalake ownership
  metalake.setOwner(catalogObject, AuthConstants.ANONYMOUS_USER, Owner.Type.USER);
  // case 5. user is the metalake owner
  MetadataObject metalakeObject =
      MetadataObjects.of(null, metalakeName, MetadataObject.Type.METALAKE);
  metalake.setOwner(metalakeObject, userName1, Owner.Type.USER);
  waitForUpdatingPolicies();

  // Succeed to create a schema
  sparkSession.sql(SQL_CREATE_SCHEMA);

  // Succeed to create a table
  sparkSession.sql(SQL_CREATE_TABLE);

  // Succeed to check other table privileges
  checkTableAllPrivilegesExceptForCreating();

  // Succeed to drop schema
  sparkSession.sql(SQL_DROP_SCHEMA);

  // Clean up
  catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
  catalog.asSchemas().dropSchema(schemaName, true);
}

@Test
void testAllowUseSchemaPrivilege() throws InterruptedException {
// Create a role with CREATE_SCHEMA privilege
Expand Down Expand Up @@ -587,6 +798,67 @@ void testAllowUseSchemaPrivilege() throws InterruptedException {
metalake.deleteRole(roleName);
}

@Test
void testDenyPrivileges() throws InterruptedException {
  // Create the schema directly through the catalog so it exists before any role is granted.
  catalog.asSchemas().createSchema(schemaName, "test", Collections.emptyMap());

  // Case 1: the catalog allows CREATE_TABLE but the schema denies it — deny must win.
  String roleName = currentFunName();
  SecurableObject allowObject =
      SecurableObjects.parse(
          catalogName,
          MetadataObject.Type.CATALOG,
          Lists.newArrayList(Privileges.UseSchema.allow(), Privileges.CreateTable.allow()));
  SecurableObject denyObject =
      SecurableObjects.parse(
          String.format("%s.%s", catalogName, schemaName),
          MetadataObject.Type.SCHEMA,
          Lists.newArrayList(Privileges.CreateTable.deny()));
  metalake.createRole(
      roleName, Collections.emptyMap(), Lists.newArrayList(allowObject, denyObject));

  // Grant this role to the Spark execution user `HADOOP_USER_NAME`.
  String userName1 = System.getenv(HADOOP_USER_NAME);
  metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
  waitForUpdatingPolicies();

  // The schema-level deny overrides the catalog-level allow: creation must fail.
  sparkSession.sql(SQL_USE_SCHEMA);
  Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_CREATE_TABLE));

  // Delete the role before recreating it with the inverted privilege layout.
  metalake.deleteRole(roleName);

  // Case 2: the catalog denies CREATE_TABLE while the schema allows it — deny still wins.
  // Fix: the original assigned the deny privilege to `allowObject` and the allow
  // privilege to `denyObject`; the assignments below match the variables' names.
  denyObject =
      SecurableObjects.parse(
          catalogName,
          MetadataObject.Type.CATALOG,
          Lists.newArrayList(Privileges.CreateTable.deny()));
  allowObject =
      SecurableObjects.parse(
          String.format("%s.%s", catalogName, schemaName),
          MetadataObject.Type.SCHEMA,
          Lists.newArrayList(Privileges.CreateTable.allow()));
  // Keep the original securable-object order: catalog entry first, then schema entry.
  metalake.createRole(
      roleName, Collections.emptyMap(), Lists.newArrayList(denyObject, allowObject));

  // Grant the recreated role to the same Spark execution user.
  metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);

  waitForUpdatingPolicies();

  // The catalog-level deny overrides the schema-level allow: creation still fails.
  Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_CREATE_TABLE));

  // Clean up
  catalog.asSchemas().dropSchema(schemaName, true);
  metalake.deleteRole(roleName);
}

private void createMetalake() {
GravitinoMetalake[] gravitinoMetalakes = client.listMetalakes();
Assertions.assertEquals(0, gravitinoMetalakes.length);
Expand Down Expand Up @@ -623,6 +895,27 @@ private static void createCatalog() {
LOG.info("Catalog created: {}", catalog);
}

/**
 * Verifies that the current Spark user holds every table-level privilege except
 * table creation: insert, select, alter and drop must succeed, while update and
 * delete fail only because Hive doesn't support those operations (not for
 * authorization reasons). Note: the table is dropped as the final step, so
 * callers must recreate it afterwards if they still need it.
 */
private void checkTableAllPrivilegesExceptForCreating() {
  // - a. Succeed to insert data into the table
  sparkSession.sql(SQL_INSERT_TABLE);

  // - b. Succeed to select data from the table (collect to force execution)
  sparkSession.sql(SQL_SELECT_TABLE).collectAsList();

  // - c: Fail to update data in the table. Because Hive doesn't support UPDATE
  Assertions.assertThrows(
      SparkUnsupportedOperationException.class, () -> sparkSession.sql(SQL_UPDATE_TABLE));

  // - d: Fail to delete data from the table. Because Hive doesn't support DELETE
  Assertions.assertThrows(AnalysisException.class, () -> sparkSession.sql(SQL_DELETE_TABLE));

  // - e: Succeed to alter the table
  sparkSession.sql(SQL_ALTER_TABLE);

  // - f: Succeed to drop the table
  sparkSession.sql(SQL_DROP_TABLE);
}

private static void waitForUpdatingPolicies() throws InterruptedException {
// After Ranger authorization, Must wait a period of time for the Ranger Spark plugin to update
// the policy Sleep time must be greater than the policy update interval
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ Role createRole(
* Lists the role names associated the metadata object.
*
* @param metalake The Metalake of the Role.
* @param object The metadata object whose associated roles are listed.
* @return The role list.
* @throws NoSuchMetalakeException If the Metalake with the given name does not exist.
* @throws NoSuchMetadataObjectException If the Metadata object with the given name does not
Expand Down
Loading

0 comments on commit 1cc7a19

Please sign in to comment.