Skip to content

Commit

Permalink
feat (VOMS2IAM): add options to sync from IAM
Browse files Browse the repository at this point in the history
  • Loading branch information
chaen committed Jun 3, 2024
1 parent 5917f15 commit a50ec49
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 11 deletions.
25 changes: 25 additions & 0 deletions src/DIRAC/ConfigurationSystem/Agent/VOMS2CSAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
corresponding options defined in the ``/Registry/VO/<VO_name>`` configuration section.
"""


from DIRAC import S_OK, gConfig, S_ERROR
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOOption, getUserOption
from DIRAC.ConfigurationSystem.Client.VOMS2CSSynchronizer import VOMS2CSSynchronizer
from DIRAC.FrameworkSystem.Client.NotificationClient import NotificationClient
from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager
from DIRAC.Resources.Catalog.FileCatalog import FileCatalog


Expand All @@ -50,6 +53,8 @@ def __init__(self, *args, **kwargs):
self.autoLiftSuspendedStatus = True
self.mailFrom = "[email protected]"
self.syncPluginName = None
self.compareWithIAM = False
self.useIAM = False

def initialize(self):
"""Initialize the default parameters"""
Expand All @@ -63,6 +68,8 @@ def initialize(self):
self.autoLiftSuspendedStatus = self.am_getOption("AutoLiftSuspendedStatus", self.autoLiftSuspendedStatus)
self.makeFCEntry = self.am_getOption("MakeHomeDirectory", self.makeFCEntry)
self.syncPluginName = self.am_getOption("SyncPluginName", self.syncPluginName)
self.compareWithIAM = self.am_getOption("CompareWithIAM", self.compareWithIAM)
self.useIAM = self.am_getOption("UseIAM", self.useIAM)

self.detailedReport = self.am_getOption("DetailedReport", self.detailedReport)
self.mailFrom = self.am_getOption("MailFrom", self.mailFrom)
Expand Down Expand Up @@ -95,13 +102,31 @@ def execute(self):
autoLiftSuspendedStatus = getVOOption(vo, "AutoLiftSuspendedStatus", self.autoLiftSuspendedStatus)
syncPluginName = getVOOption(vo, "SyncPluginName", self.syncPluginName)

compareWithIAM = getVOOption(vo, "CompareWithIAM", self.compareWithIAM)
useIAM = getVOOption(vo, "UseIAM", self.useIAM)

accessToken = None
if compareWithIAM or useIAM:
res = gTokenManager.getToken(
userGroup=voAdminGroup,
requiredTimeLeft=3600,
scope=["scim:read"],
)
if not res["OK"]:
return res

accessToken = res["Value"]["access_token"]

vomsSync = VOMS2CSSynchronizer(
vo,
autoAddUsers=autoAddUsers,
autoModifyUsers=autoModifyUsers,
autoDeleteUsers=autoDeleteUsers,
autoLiftSuspendedStatus=autoLiftSuspendedStatus,
syncPluginName=syncPluginName,
compareWithIAM=compareWithIAM,
useIAM=useIAM,
accessToken=accessToken,
)

result = self.__syncCSWithVOMS( # pylint: disable=unexpected-keyword-arg
Expand Down
71 changes: 65 additions & 6 deletions src/DIRAC/ConfigurationSystem/Client/VOMS2CSSynchronizer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
""" VOMS2CSSyncronizer is a helper class containing the logic for synchronization
of the VOMS user data with the DIRAC Registry
"""

from collections import defaultdict

from DIRAC import S_OK, S_ERROR, gLogger, gConfig

from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise, convertToReturnValue
from DIRAC.Core.Security.IAMService import IAMService
from DIRAC.Core.Security.VOMSService import VOMSService
from DIRAC.Core.Utilities.List import fromChar
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
Expand Down Expand Up @@ -127,6 +129,9 @@ def __init__(
autoDeleteUsers=False,
autoLiftSuspendedStatus=False,
syncPluginName=None,
compareWithIAM=False,
useIAM=False,
accessToken=None,
):
"""VOMS2CSSynchronizer class constructor
Expand All @@ -136,6 +141,9 @@ def __init__(
:param autoDeleteUsers: flag to automatically delete users from CS if no more in VOMS
:param autoLiftSuspendedStatus: flag to automatically remove Suspended status in CS
:param syncPluginName: name of the plugin to validate or extend users' info
:param compareWithIAM: if true, also dump the list of users from IAM and compare
:param useIAM: if True, use Iam instead of VOMS
:param accessToken: if talking to IAM, needs a token with scim:read property
:return: None
"""
Expand All @@ -154,6 +162,9 @@ def __init__(
self.autoLiftSuspendedStatus = autoLiftSuspendedStatus
self.voChanged = False
self.syncPlugin = None
self.compareWithIAM = compareWithIAM
self.useIAM = useIAM
self.accessToken = accessToken

if syncPluginName:
objLoader = ObjectLoader()
Expand All @@ -166,6 +177,57 @@ def __init__(

self.syncPlugin = _class["Value"]()

def compare_entry(self, iam_entry, voms_entry, is_robot):
"""Compare a VOMS and IAM entry"""

if not iam_entry.get("mail") == voms_entry.get("mail"):
self.log.info(f"{iam_entry['nickname']} - mail : {iam_entry.get('mail')} vs {voms_entry.get('mail')}")
if is_robot:
self.log.info("\t this is expected for robots !")

for field in ("CA", "certSuspended", "suspended", "mail", "nickname"):
if not iam_entry.get(field) == voms_entry.get(field):
self.log.info(f"{iam_entry['nickname']} - {field} : {iam_entry.get(field)} vs {voms_entry.get(field)}")

if not sorted(iam_entry["Roles"]) == sorted(voms_entry["Roles"]):
self.log.info(f"{iam_entry['nickname']} - Roles : {iam_entry['Roles']} vs {voms_entry['Roles']}")

def compareUsers(self, voms_users, iam_users):
missing_in_iam = set(voms_users) - set(iam_users)
if missing_in_iam:
self.log.info("Missing entries in IAM:", missing_in_iam)
else:
self.log.info("No entry missing in IAM, GOOD !")
# suspended_in_voms = {dn for dn in voms_users if voms_users[dn]["suspended"]}
missing_in_voms = set(iam_users) - set(voms_users)

if missing_in_voms:
self.log.info("Entries in IAM that are not in VOMS:", missing_in_voms)
else:
self.log.info("No extra entry entries in IAM, GOOD !")

# We are waiting for IAM to synchronize also suspended people
# https://github.com/indigo-iam/voms-importer/pull/22
# assert missing_in_iam == suspended_in_voms

for dn in set(iam_users) & set(voms_users):
is_robot = "CN=Robot:" in dn
self.compare_entry(iam_users[dn], voms_users[dn], is_robot=is_robot)

@convertToReturnValue
def _getUsers(self):
if self.compareWithIAM or self.useIAM:
iamSrv = IAMService(self.accessToken, vo=self.vo)
iam_users = returnValueOrRaise(iamSrv.getUsers())
if self.useIAM:
return iam_users

vomsSrv = VOMSService(self.vo)
voms_users = returnValueOrRaise(vomsSrv.getUsers())
if self.compareWithIAM:
self.compareUsers(voms_users, iam_users)
return voms_users

def syncCSWithVOMS(self):
"""Performs the synchronization of the DIRAC registry with the VOMS data. The resulting
CSAPI object containing modifications is returned as part of the output dictionary.
Expand All @@ -186,12 +248,9 @@ def syncCSWithVOMS(self):
noVOMSGroups = result["Value"]["NoVOMS"]
noSyncVOMSGroups = result["Value"]["NoSyncVOMS"]

vomsSrv = VOMSService(self.vo)

# Get VOMS users
result = vomsSrv.getUsers()
result = self._getUsers()
if not result["OK"]:
self.log.error("Could not retrieve user information from VOMS", result["Message"])
self.log.error("Could not retrieve user information", result["Message"])
return result

self.vomsUserDict = result["Value"]
Expand Down
5 changes: 5 additions & 0 deletions src/DIRAC/ConfigurationSystem/ConfigTemplate.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ Agents
DryRun = True
# Name of the plugin to validate or expand user's info. See :py:mod:`DIRAC.ConfigurationSystem.Client.SyncPlugins.DummySyncPlugin`
SyncPluginName =
# If set to true, will query the VO IAM server for the list of user, and print
# a comparison of what is with VOMS
CompareWithIAM = False
# If set to true, will only query IAM and return the list of users from there
UseIAM = False
}
##END
##BEGIN GOCDB2CSAgent
Expand Down
34 changes: 31 additions & 3 deletions src/DIRAC/ConfigurationSystem/scripts/dirac_admin_voms_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
from DIRAC.ConfigurationSystem.Client.VOMS2CSSynchronizer import VOMS2CSSynchronizer
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOOption

from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager

dryRun = False
voName = None
compareWithIAM = False
useIAM = False


def setDryRun(value):
Expand All @@ -29,10 +31,25 @@ def setVO(value):
return S_OK()


def setCompareWithIAM(value):
global compareWithIAM
compareWithIAM = True
return S_OK()


def setUseIAM(value):
global useIAM
useIAM = True
return S_OK()


@Script()
def main():
Script.registerSwitch("V:", "vo=", "VO name", setVO)
Script.registerSwitch("D", "dryRun", "Dry run", setDryRun)
Script.registerSwitch("C", "compareWithIAM", "Compare user list with IAM", setCompareWithIAM)
Script.registerSwitch("I", "useIAM", "Use IAM as authoritative source", setUseIAM)

Script.parseCommandLine(ignoreErrors=True)

@executeWithUserProxy
Expand All @@ -41,8 +58,19 @@ def syncCSWithVOMS(vomsSync):

voAdminUser = getVOOption(voName, "VOAdmin")
voAdminGroup = getVOOption(voName, "VOAdminGroup", getVOOption(voName, "DefaultGroup"))

vomsSync = VOMS2CSSynchronizer(voName)
accessToken = None
if compareWithIAM or useIAM:
res = gTokenManager.getToken(
userGroup=voAdminGroup,
requiredTimeLeft=3600,
scope=["scim:read"],
)
if not res["OK"]:
return res

accessToken = res["Value"]["access_token"]

vomsSync = VOMS2CSSynchronizer(voName, compareWithIAM=compareWithIAM, useIAM=useIAM, accessToken=accessToken)
result = syncCSWithVOMS( # pylint: disable=unexpected-keyword-arg
vomsSync, proxyUserName=voAdminUser, proxyUserGroup=voAdminGroup
)
Expand Down
123 changes: 123 additions & 0 deletions src/DIRAC/Core/Security/IAMService.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
""" IAMService class encapsulates connection to the IAM service for a given VO
"""

import requests

from DIRAC import gConfig, gLogger, S_OK, S_ERROR
from DIRAC.Core.Utilities import DErrno
from DIRAC.Core.Security.Locations import getProxyLocation, getCAsLocation
from DIRAC.Core.Utilities.Decorators import deprecated
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOOption
from DIRAC.ConfigurationSystem.Client.Helpers.CSGlobals import getVO


def convert_dn(inStr):
"""Convert a string separated DN into the slash one, like
CN=Christophe Haen,CN=705305,CN=chaen,OU=Users,OU=Organic Units,DC=cern,DC=ch
/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=chaen/CN=705305/CN=Christophe Haen
"""
return "/" + "/".join(inStr.split(",")[::-1])


class IAMService:
def __init__(self, access_token, vo=None):
"""c'tor
:param str vo: name of the virtual organization (community)
:param str access_token: the token used to talk to IAM, with the scim:read property
"""

if not access_token:
raise ValueError("access_token not set")

if vo is None:
vo = getVO()
if not vo:
raise Exception("No VO name given")

self.vo = vo

self.iam_url = None

id_provider = gConfig.getValue(f"/Registry/VO/{self.vo}/IdProvider")
if not id_provider:
raise ValueError(f"/Registry/VO/{self.vo}/IdProvider not found")
result = gConfig.getOptionsDict(f"/Resources/IdProviders/{id_provider}")
if result["OK"]:
self.iam_url = result["Value"]["issuer"]
gLogger.verbose("Using IAM server", self.iam_url)
else:
raise ValueError(f"/Resources/IdProviders/{id_provider}")

self.userDict = None
self.access_token = access_token

def _getIamUserDump(self):
"""List the users from IAM"""

headers = {"Authorization": f"Bearer {self.access_token}"}
iam_list_url = f"{self.iam_url}/scim/Users"
iam_users = []
startIndex = 1
totalResults = 1000 # total number of users
itemsPerPage = 10
while startIndex <= totalResults:
resp = requests.get(iam_list_url, headers=headers, params={"startIndex": startIndex})
resp.raise_for_status()
data = resp.json()
# These 2 should never change, but just to be sure...
totalResults = data["totalResults"]
itemsPerPage = data["itemsPerPage"]

startIndex += itemsPerPage
iam_users.extend(data["Resources"])
return iam_users

@staticmethod
def convert_iam_to_voms(iam_output):
"""Convert an IAM entry into the voms style, i.e. DN based"""
converted_output = {}

for cert in iam_output["urn:indigo-dc:scim:schemas:IndigoUser"]["certificates"]:
cert_dict = {}
dn = convert_dn(cert["subjectDn"])
ca = convert_dn(cert["issuerDn"])

cert_dict["CA"] = ca
cert_dict["nickname"] = iam_output["userName"]
# This is not correct, we take the overall status instead of the certificate one
# however there are no known case of cert suspended while the user isn't
cert_dict["certSuspended"] = not iam_output["active"]
# There are still bugs in IAM regarding the active status vs voms suspended

cert_dict["suspended"] = not iam_output["active"]
# The mail may be different, in particular for robot accounts
cert_dict["mail"] = iam_output["emails"][0]["value"].lower()

# https://github.com/indigo-iam/voms-importer/blob/main/vomsimporter.py
roles = []

for role in iam_output["groups"]:
role_name = role["display"]
if "/" in role_name:
role_name = role_name.replace("/", "/Role=")
roles.append(f"/{role_name}")

cert_dict["Roles"] = roles
converted_output[dn] = cert_dict
return converted_output

def getUsers(self):
self.iam_users_raw = self._getIamUserDump()
users = {}
errors = 0
for user in self.iam_users_raw:
try:
users.update(self.convert_iam_to_voms(user))
except Exception as e:
errors += 1
print(f"Could not convert {user['name']} {e!r} ")
print(f"There were in total {errors} errors")
self.userDict = dict(users)
return S_OK(users)
Loading

0 comments on commit a50ec49

Please sign in to comment.