Skip to content

Commit

Permalink
[#5146] fix(core): Support renaming and deleting metadata objects in the authorization plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
jerqi committed Oct 29, 2024
1 parent bb20a1e commit 699500c
Show file tree
Hide file tree
Showing 19 changed files with 742 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.auth.AuthConstants;
import org.apache.gravitino.auth.AuthenticatorType;
import org.apache.gravitino.authorization.Owner;
import org.apache.gravitino.authorization.Privileges;
import org.apache.gravitino.authorization.SecurableObject;
import org.apache.gravitino.authorization.SecurableObjects;
Expand Down Expand Up @@ -93,6 +94,8 @@ public class RangerHiveE2EIT extends BaseIT {

private static final String SQL_CREATE_SCHEMA = String.format("CREATE DATABASE %s", schemaName);

private static final String SQL_DROP_SCHEMA = String.format("DROP DATABASE %s", schemaName);

private static final String SQL_USE_SCHEMA = String.format("USE SCHEMA %s", schemaName);

private static final String SQL_CREATE_TABLE =
Expand All @@ -112,6 +115,12 @@ public class RangerHiveE2EIT extends BaseIT {
private static final String SQL_ALTER_TABLE =
String.format("ALTER TABLE %s ADD COLUMN d string", tableName);

private static final String SQL_RENAME_TABLE =
String.format("ALTER TABLE %s RENAME TO new_table", tableName);

private static final String SQL_RENAME_BACK_TABLE =
String.format("ALTER TABLE new_table RENAME TO %s", tableName);

private static final String SQL_DROP_TABLE = String.format("DROP TABLE %s", tableName);

private static String RANGER_ADMIN_URL = null;
Expand Down Expand Up @@ -471,10 +480,10 @@ void testCreateAllPrivilegesRole() throws InterruptedException {

waitForUpdatingPolicies();

// Test to create a schema
// Test to create the schema
sparkSession.sql(SQL_CREATE_SCHEMA);

// Test to creat a table
// Test to create a table
sparkSession.sql(SQL_USE_SCHEMA);
sparkSession.sql(SQL_CREATE_TABLE);

Expand Down Expand Up @@ -527,6 +536,208 @@ void testDeleteAndRecreateRole() throws InterruptedException {
metalake.deleteRole(roleName);
}

@Test
void testDeleteAndRecreateMetadataObject() throws InterruptedException {
  // Create a role granting CREATE_SCHEMA on the catalog.
  // Note: pass catalogName directly — String.format("%s", catalogName) was a no-op wrapper.
  String roleName = currentFunName();
  SecurableObject securableObject =
      SecurableObjects.parse(
          catalogName,
          MetadataObject.Type.CATALOG,
          Lists.newArrayList(Privileges.CreateSchema.allow()));
  metalake.createRole(roleName, Collections.emptyMap(), Lists.newArrayList(securableObject));

  // Grant this role to the Spark execution user `HADOOP_USER_NAME`.
  String userName1 = System.getenv(HADOOP_USER_NAME);
  metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
  waitForUpdatingPolicies();

  // Create the schema via Spark SQL.
  sparkSession.sql(SQL_CREATE_SCHEMA);

  // Make the user the schema owner so the DROP below is authorized.
  MetadataObject schemaObject =
      MetadataObjects.of(catalogName, schemaName, MetadataObject.Type.SCHEMA);
  metalake.setOwner(schemaObject, userName1, Owner.Type.USER);
  waitForUpdatingPolicies();

  // Drop the schema; the authorization plugin should clean up its metadata.
  sparkSession.sql(SQL_DROP_SCHEMA);

  // Recreating the schema must still succeed with the CREATE_SCHEMA role,
  // proving stale plugin state from the deleted object doesn't interfere.
  sparkSession.sql(SQL_CREATE_SCHEMA);

  // Clean up
  catalog.asSchemas().dropSchema(schemaName, true);
  metalake.deleteRole(roleName);
}

@Test
void testRenameMetadataObject() throws InterruptedException {
  // Create a role with USE_CATALOG, CREATE_SCHEMA, CREATE_TABLE and
  // MODIFY_TABLE privileges (the previous comment listed only two of them).
  // Note: pass catalogName directly — String.format("%s", catalogName) was a no-op wrapper.
  String roleName = currentFunName();
  SecurableObject securableObject =
      SecurableObjects.parse(
          catalogName,
          MetadataObject.Type.CATALOG,
          Lists.newArrayList(
              Privileges.UseCatalog.allow(),
              Privileges.CreateSchema.allow(),
              Privileges.CreateTable.allow(),
              Privileges.ModifyTable.allow()));
  metalake.createRole(roleName, Collections.emptyMap(), Lists.newArrayList(securableObject));

  // Grant this role to the Spark execution user `HADOOP_USER_NAME`.
  String userName1 = System.getenv(HADOOP_USER_NAME);
  metalake.grantRolesToUser(Lists.newArrayList(roleName), userName1);
  waitForUpdatingPolicies();

  // Create a schema and a table.
  sparkSession.sql(SQL_CREATE_SCHEMA);
  sparkSession.sql(SQL_USE_SCHEMA);
  sparkSession.sql(SQL_CREATE_TABLE);

  // Rename the table and rename it back; both must succeed, proving the
  // authorization plugin follows the rename.
  sparkSession.sql(SQL_RENAME_TABLE);
  sparkSession.sql(SQL_RENAME_BACK_TABLE);

  // Clean up
  catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
  catalog.asSchemas().dropSchema(schemaName, true);
  metalake.deleteRole(roleName);
}

@Test
void testChangeOwner() throws InterruptedException {
  // Set up a temporary helper role so the schema/table can be created; the
  // role is deleted afterwards, leaving the user with no privileges at all.
  String helperRole = currentFunName();
  SecurableObject securableObject =
      SecurableObjects.ofMetalake(
          metalakeName,
          Lists.newArrayList(
              Privileges.UseSchema.allow(),
              Privileges.CreateSchema.allow(),
              Privileges.CreateTable.allow(),
              Privileges.ModifyTable.allow()));
  String userName1 = System.getenv(HADOOP_USER_NAME);
  metalake.createRole(helperRole, Collections.emptyMap(), Lists.newArrayList(securableObject));
  metalake.grantRolesToUser(Lists.newArrayList(helperRole), userName1);
  waitForUpdatingPolicies();

  // Create a schema and a table, and insert some data
  sparkSession.sql(SQL_CREATE_SCHEMA);
  sparkSession.sql(SQL_USE_SCHEMA);
  sparkSession.sql(SQL_CREATE_TABLE);
  sparkSession.sql(SQL_INSERT_TABLE);

  // Revoke and delete the helper role so only ownership governs access below.
  metalake.revokeRolesFromUser(Lists.newArrayList(helperRole), userName1);
  metalake.deleteRole(helperRole);
  waitForUpdatingPolicies();

  // case 1. The user has none of the privileges of the table

  // - a. Fail to insert data into the table
  Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_INSERT_TABLE));

  // - b. Fail to select data from the table
  Assertions.assertThrows(
      AccessControlException.class, () -> sparkSession.sql(SQL_SELECT_TABLE).collectAsList());

  // - c: Fail to update data in the table (Hive doesn't support UPDATE, hence
  //   SparkUnsupportedOperationException rather than AccessControlException)
  Assertions.assertThrows(
      SparkUnsupportedOperationException.class, () -> sparkSession.sql(SQL_UPDATE_TABLE));

  // - d: Fail to delete data from the table (Hive doesn't support DELETE)
  Assertions.assertThrows(AnalysisException.class, () -> sparkSession.sql(SQL_DELETE_TABLE));

  // - e: Fail to alter the table
  Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_ALTER_TABLE));

  // - f: Fail to drop the table
  Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_DROP_TABLE));

  // case 2. The user is the table owner
  MetadataObject tableObject =
      MetadataObjects.of(
          Lists.newArrayList(catalogName, schemaName, tableName), MetadataObject.Type.TABLE);
  metalake.setOwner(tableObject, userName1, Owner.Type.USER);
  waitForUpdatingPolicies();

  // The table owner has all the table privileges except for creating tables
  checkTableAllPrivilegesExceptForCreating();

  // Drop the table in Gravitino; the plugin should revoke the owner's
  // table-level privileges with it.
  catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
  waitForUpdatingPolicies();

  // Fail to create the table — table ownership alone doesn't grant CREATE
  Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_CREATE_TABLE));

  // case 3. The user is the schema owner
  MetadataObject schemaObject =
      MetadataObjects.of(catalogName, schemaName, MetadataObject.Type.SCHEMA);
  metalake.setOwner(schemaObject, userName1, Owner.Type.USER);
  waitForUpdatingPolicies();

  // Succeed to create a table
  sparkSession.sql(SQL_CREATE_TABLE);

  // Succeed to check other table privileges
  checkTableAllPrivilegesExceptForCreating();

  // Succeed to drop the schema
  sparkSession.sql(SQL_DROP_SCHEMA);
  catalog.asSchemas().dropSchema(schemaName, true);
  waitForUpdatingPolicies();

  // Fail to create a schema — schema ownership died with the schema
  Assertions.assertThrows(
      AccessControlException.class, () -> sparkSession.sql(SQL_CREATE_SCHEMA));

  // case 4. The user is the catalog owner
  MetadataObject catalogObject =
      MetadataObjects.of(null, catalogName, MetadataObject.Type.CATALOG);
  metalake.setOwner(catalogObject, userName1, Owner.Type.USER);
  waitForUpdatingPolicies();

  // Succeed to create a schema
  sparkSession.sql(SQL_CREATE_SCHEMA);

  // Succeed to create a table
  sparkSession.sql(SQL_CREATE_TABLE);

  // Succeed to check other table privileges
  checkTableAllPrivilegesExceptForCreating();

  // Succeed to drop the schema
  sparkSession.sql(SQL_DROP_SCHEMA);
  catalog.asSchemas().dropSchema(schemaName, true);
  waitForUpdatingPolicies();

  // Hand catalog ownership back to the anonymous user before the next case.
  metalake.setOwner(catalogObject, AuthConstants.ANONYMOUS_USER, Owner.Type.USER);
  // case 5. The user is the metalake owner
  MetadataObject metalakeObject =
      MetadataObjects.of(null, metalakeName, MetadataObject.Type.METALAKE);
  metalake.setOwner(metalakeObject, userName1, Owner.Type.USER);
  waitForUpdatingPolicies();

  // Succeed to create a schema
  sparkSession.sql(SQL_CREATE_SCHEMA);

  // Succeed to create a table
  sparkSession.sql(SQL_CREATE_TABLE);

  // Succeed to check other table privileges
  checkTableAllPrivilegesExceptForCreating();

  // Succeed to drop the schema
  sparkSession.sql(SQL_DROP_SCHEMA);

  // Clean up
  catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
  catalog.asSchemas().dropSchema(schemaName, true);
}

@Test
void testAllowUseSchemaPrivilege() throws InterruptedException {
// Create a role with CREATE_SCHEMA privilege
Expand Down Expand Up @@ -623,6 +834,27 @@ private static void createCatalog() {
LOG.info("Catalog created: {}", catalog);
}

// Verifies the current user can exercise every table operation except table
// creation: insert, select, alter and drop must succeed, while update and
// delete fail only because Hive itself doesn't support those statements.
private void checkTableAllPrivilegesExceptForCreating() {
  // - a. Succeed to insert data into the table
  sparkSession.sql(SQL_INSERT_TABLE);

  // - b. Succeed to select data from the table
  sparkSession.sql(SQL_SELECT_TABLE).collectAsList();

  // - c: Fail to update data in the table, because Hive doesn't support UPDATE
  Assertions.assertThrows(
      SparkUnsupportedOperationException.class, () -> sparkSession.sql(SQL_UPDATE_TABLE));

  // - d: Fail to delete data from the table, because Hive doesn't support DELETE
  Assertions.assertThrows(AnalysisException.class, () -> sparkSession.sql(SQL_DELETE_TABLE));

  // - e: Succeed to alter the table
  sparkSession.sql(SQL_ALTER_TABLE);

  // - f: Succeed to drop the table
  sparkSession.sql(SQL_DROP_TABLE);
}

private static void waitForUpdatingPolicies() throws InterruptedException {
// After Ranger authorization, Must wait a period of time for the Ranger Spark plugin to update
// the policy Sleep time must be greater than the policy update interval
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ Role createRole(
* Lists the role names associated with the metadata object.
*
* @param metalake The Metalake of the Role.
* @param object The metadata object whose associated roles are listed.
* @return The role list.
* @throws NoSuchMetalakeException If the Metalake with the given name does not exist.
* @throws NoSuchMetadataObjectException If the Metadata object with the given name does not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,17 +186,6 @@ public static void callAuthorizationPluginForMetadataObject(
}
}

private static void callAuthorizationPluginImpl(
Consumer<AuthorizationPlugin> consumer, Catalog catalog) {

if (catalog instanceof BaseCatalog) {
BaseCatalog baseCatalog = (BaseCatalog) catalog;
if (baseCatalog.getAuthorizationPlugin() != null) {
consumer.accept(baseCatalog.getAuthorizationPlugin());
}
}
}

public static boolean needApplyAuthorizationPluginAllCatalogs(SecurableObject securableObject) {
if (securableObject.type() == MetadataObject.Type.METALAKE) {
List<Privilege> privileges = securableObject.privileges();
Expand Down Expand Up @@ -271,4 +260,50 @@ private static boolean needApplyAuthorizationPluginAllCatalogs(MetadataObject.Ty
private static boolean needApplyAuthorization(MetadataObject.Type type) {
return type != MetadataObject.Type.ROLE && type != MetadataObject.Type.METALAKE;
}

// Applies the given action to the catalog's authorization plugin, if any.
// Only BaseCatalog implementations expose a plugin, and the plugin itself is
// optional, so both checks are required before invoking the consumer.
private static void callAuthorizationPluginImpl(
    Consumer<AuthorizationPlugin> consumer, Catalog catalog) {

  if (catalog instanceof BaseCatalog) {
    BaseCatalog baseCatalog = (BaseCatalog) catalog;
    if (baseCatalog.getAuthorizationPlugin() != null) {
      consumer.accept(baseCatalog.getAuthorizationPlugin());
    }
  }
}

// Propagates an entity removal to the authorization plugin so that the
// privileges recorded for that entity are cleaned up. No-op when access
// control is disabled (no dispatcher configured).
public static void removeAuthorizationPluginPrivileges(
    NameIdentifier ident, Entity.EntityType type) {
  if (GravitinoEnv.getInstance().accessControlDispatcher() == null) {
    return;
  }
  MetadataObject removedObject = NameIdentifierUtil.toMetadataObject(ident, type);
  MetadataObjectChange removal = MetadataObjectChange.remove(removedObject);
  callAuthorizationPluginForMetadataObject(
      ident.namespace().level(0),
      removedObject,
      plugin -> plugin.onMetadataUpdated(removal));
}

// Propagates an entity rename to the authorization plugin so that the
// privileges recorded under the old name follow the entity to its new name.
// No-op when access control is disabled (no dispatcher configured).
public static void renameAuthorizationPluginPrivileges(
    NameIdentifier ident, Entity.EntityType type, String newName) {
  if (GravitinoEnv.getInstance().accessControlDispatcher() == null) {
    return;
  }
  MetadataObject before = NameIdentifierUtil.toMetadataObject(ident, type);
  // The renamed identifier keeps the original namespace; only the leaf name changes.
  MetadataObject after =
      NameIdentifierUtil.toMetadataObject(NameIdentifier.of(ident.namespace(), newName), type);
  MetadataObjectChange rename = MetadataObjectChange.rename(before, after);
  callAuthorizationPluginForMetadataObject(
      ident.namespace().level(0),
      before,
      plugin -> plugin.onMetadataUpdated(rename));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,24 @@ public Catalog createCatalog(
@Override
public Catalog alterCatalog(NameIdentifier ident, CatalogChange... changes)
    throws NoSuchCatalogException, IllegalArgumentException {
  // Privilege entries held by the underlying authorization plugins don't
  // embed catalog information, so a catalog rename doesn't require any
  // plugin-side update — simply delegate to the wrapped dispatcher.
  return dispatcher.alterCatalog(ident, changes);
}

@Override
public boolean dropCatalog(NameIdentifier ident) {
  // For catalogs we intentionally don't clear the privileges held by the
  // catalog's authorization plugin; we only remove the catalog itself.
  return dispatcher.dropCatalog(ident);
}

@Override
public boolean dropCatalog(NameIdentifier ident, boolean force)
    throws NonEmptyEntityException, CatalogInUseException {
  // For catalogs we intentionally don't clear the privileges held by the
  // catalog's authorization plugin; we only remove the catalog itself.
  return dispatcher.dropCatalog(ident, force);
}

Expand Down
Loading

0 comments on commit 699500c

Please sign in to comment.