
Create spark job to get user file acc times using wmarchive lfnarray belonging crab jobs #113

Open · mrceyhun wants to merge 3 commits into master from f-wma-file-access

Conversation

@mrceyhun (Contributor) commented Sep 7, 2022

This PR adds the calculation of first/last access times of datasets accessed by user jobs, using WMArchive data filtered to the CRAB* job type.

Most of the lines deal with extracting additional DBS information, but the main logic that extracts LFN files and their metadata from WMArchive data can be found in the udf_lfn_extract function; a sketch of that step is shown below.
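For reference, here is a minimal sketch of what that extraction step can look like. It is not the PR's exact code: the input path, file format, and filtering rules are assumptions for illustration only; the `LFNArray`, `data.meta_data.ts`, and `jobtype` field names follow the WMArchive schema discussed in this PR.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, LongType, StringType, StructField, StructType

spark = SparkSession.builder.appName("wma-crab-file-access-sketch").getOrCreate()

lfn_schema = ArrayType(StructType([
    StructField("file", StringType()),
    StructField("access_ts", LongType()),
]))

@F.udf(returnType=lfn_schema)
def udf_lfn_extract(lfn_array, ts):
    """Return (file, access_ts) pairs for LFNs that look like real dataset files."""
    if not lfn_array or not ts:
        return None
    # keep only /store/... files and skip unmerged areas (this filter rule is an assumption)
    return [{"file": lfn, "access_ts": ts}
            for lfn in lfn_array
            if lfn.startswith("/store/") and not lfn.startswith("/store/unmerged/")]

# usage: read FWJR records, keep CRAB jobs, explode the extracted LFNs
fwjr = spark.read.json("hdfs:///path/to/wmarchive/fwjr/")  # placeholder path and format
crab_files = (fwjr
              .where(F.col("data.meta_data.jobtype").startswith("CRAB"))
              .withColumn("lfns", udf_lfn_extract(F.col("data.LFNArray"),
                                                  F.col("data.meta_data.ts")))
              .select(F.explode("lfns").alias("f"))
              .select("f.file", "f.access_ts"))
```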

@drkovalskyi WMArchive HDFS data goes back 18 months, i.e. to 2021-03. This means that if a file was accessed before 2021-03-01, we will not have that information. I'm running the Spark job on the full data and sending the results to ES; I'll let you know when they are visible in Kibana.

@vkuznet you're the WMArchive expert and the creator of its producer. If you have time, a review would be great.

The bash script is long, but it follows the general pattern we use for our other cron jobs, so it is straightforward on our side.

@mrceyhun self-assigned this on Sep 7, 2022
@mrceyhun force-pushed the f-wma-file-access branch 3 times, most recently from a6de041 to e684264 on September 7, 2022 19:43
@mrceyhun (Contributor, Author) commented Sep 7, 2022

Changed access time from data.wmats to data.meta_data.ts

@mrceyhun force-pushed the f-wma-file-access branch 4 times, most recently from a8e9b52 to fb01a2e on September 7, 2022 22:15
@brij01 left a comment

Thanks @mrceyhun.
The changes look good to me, and the description in the script file is very clear as well.
Awaiting review by WMArchive expert @vkuznet.

# ------------------------------------------------------------------------------------------------------- RUN SPARK JOB
# Required for Spark job in K8s
util4logi "spark job starts"
export PYTHONPATH=$script_dir/../src/python:$PYTHONPATH
Collaborator review comment:

This is a very dangerous construct: it hides the fact that this script depends on a specific directory layout of the Python sources. I would rather put this at the top of the script and check that a Python module the script requires can be imported. If the import fails, you can throw an error asking the user to set up the proper PYTHONPATH environment.
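One way to make that dependency explicit is to fail early when the required module cannot be imported. The sketch below illustrates the idea on the Python side; the module name `CMSMonitoring` is an assumption, and an equivalent check could also be done from the bash wrapper before launching spark-submit.

```python
import sys

try:
    # assumed dependency shipped under src/python in this repository
    import CMSMonitoring  # noqa: F401
except ImportError:
    sys.exit("CMSMonitoring not importable; please add <repo>/src/python to PYTHONPATH")
```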

# Define logs path for Spark imports which produce lots of info logs
LOG_DIR="$WDIR"/logs/$(date +%Y%m%d)
mkdir -p "$LOG_DIR"

Collaborator review comment:

The script should print all environment variables it uses, e.g.

echo "LOG_DIR=$LOG_DIR"
...

This will help you later in the debugging process.

@@ -0,0 +1,122 @@
#!/bin/bash
set -e
Collaborator review comment:

Please add an author section to the script.

HDFS_DBS_PHYSICS_GROUPS = f'/tmp/cmsmonit/rucio_daily_stats-{TODAY}/PHYSICS_GROUPS/part*.avro'
HDFS_DBS_ACQUISITION_ERAS = f'/tmp/cmsmonit/rucio_daily_stats-{TODAY}/ACQUISITION_ERAS/part*.avro'
HDFS_DBS_DATASET_ACCESS_TYPES = f'/tmp/cmsmonit/rucio_daily_stats-{TODAY}/DATASET_ACCESS_TYPES/part*.avro'

Collaborator review comment:

I suggest adding a dump of all global variables to stdout to help with the debugging process.
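As an illustration of that suggestion (a sketch, not part of this PR), the upper-case configuration globals such as the HDFS paths above could be printed at startup:

```python
def dump_globals():
    """Print all upper-case module-level globals (HDFS paths, dates, ...) to stdout."""
    for name, value in sorted(globals().items()):
        if name.isupper():  # convention assumed here: configuration globals are upper-case
            print(f"{name}={value}")

# call dump_globals() once at the beginning of main() so every run logs its configuration
```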

# Send data with STOMP AMQ
# =====================================================================================================================
def credentials(f_name):
if os.path.exists(f_name):
Collaborator review comment:

Please add a docstring describing the format of the input file.
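A sketch of what such a docstring could look like is below. The exact file format is an assumption here (a JSON file with AMQ broker credentials); the real format is defined by the CMS monitoring AMQ setup, not by this sketch.

```python
import json
import os

def credentials(f_name):
    """Read STOMP AMQ credentials from `f_name`.

    The file is expected to be JSON, for example (keys are illustrative assumptions):
        {"username": "...", "password": "...", "producer": "...",
         "topic": "...", "host": "...", "port": 61313}
    Returns the parsed dictionary, or an empty dict if the file does not exist.
    """
    if not os.path.exists(f_name):
        return {}
    with open(f_name) as f:
        return json.load(f)
```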

@mrceyhun force-pushed the f-wma-file-access branch 3 times, most recently from d834c44 to 3324217 on September 11, 2022 21:15
@mrceyhun (Contributor, Author) commented:

Thanks @vkuznet, I applied all the changes.

@drkovalskyi The Spark job is ready to test. Since it takes a long time to run, I could not fully test it yet; I'll do that tomorrow. There were also some problems with SWAN which slowed me down a bit.

In general:

  • Dataset creation time will come from max(CREATION_DATE) of its files.
  • Instead of filtering only CRAB3 WMArchive jobs, I included all job types in case we need them.
  • I joined them with the previous access times that come from Rucio REPLICAS. There will be separate columns for dataset last access times, such as RucioLastAccess and WmaLastAccess; a sketch of this join is shown after this list.
  • Only datasets on DISK and prod RSEs (not test/temp) are used.
  • There are many logical operations and joins across 10 tables; I hope I'm not missing anything.
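The following is a minimal sketch of how the WMArchive-derived access times could be combined with the Rucio replica access times, keeping only replicas on production DISK RSEs. The table and column names (wma_file_access, rucio_replicas, rse_type, rse_kind, ACCESSED_AT, ...) are assumptions for illustration, not the PR's actual schema.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("access-times-join-sketch").getOrCreate()

wma_df = spark.table("wma_file_access")    # assumed intermediate table: (dataset, access_ts) from CRAB jobs
rucio_df = spark.table("rucio_replicas")   # assumed Rucio REPLICAS dump: (dataset, rse_id, ACCESSED_AT)
rses_df = spark.table("rucio_rses")        # assumed Rucio RSES dump: (rse_id, rse_type, rse_kind)

# per-dataset access statistics derived from WMArchive CRAB jobs
wma_times = (wma_df.groupBy("dataset")
             .agg(F.max("access_ts").alias("WmaLastAccess"),
                  F.min("access_ts").alias("WmaFirstAccess"),
                  F.count("*").alias("WmaAccessCnt")))

# keep only replicas on production DISK RSEs (column names are assumptions)
prod_disk = (rucio_df.join(rses_df, "rse_id")
             .where((F.col("rse_type") == "DISK") & (F.col("rse_kind") == "prod")))

rucio_times = (prod_disk.groupBy("dataset")
               .agg(F.max("ACCESSED_AT").alias("RucioLastAccess")))

# one row per dataset, with both Rucio and WMArchive access times plus a combined LastAccess
result = (rucio_times.join(wma_times, "dataset", "full_outer")
          .withColumn("LastAccess", F.greatest("RucioLastAccess", "WmaLastAccess")))
```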

@mrceyhun (Contributor, Author) commented Oct 4, 2022

This Spark job will need proper documentation once it is completely done. Let me explain the latest changes here:

As requested by Dima, we need the last access times and access counts of user jobs for each dataset. WMArchive provides user job information under jobtype:CRAB3. Without applying any jobstatus filter, WmaAccessCnt is calculated for each dataset. Without going into too much detail, this information is extracted through many joins and aggregations between the LFNArray files in WMArchive, the Rucio tables, and the DBS tables.
To calculate the access count properly, we also need to count accesses to child datasets. The DBS dataset_parents table provides the parent/child relationship between datasets. Because a dataset can have multiple parents, I used custom logic to compute each parent dataset's count including the counts of its child datasets. This gist gives a hint of how it is calculated: https://gist.github.com/mrceyhun/b2b13dab8bc401d7f6b0e6035866d154 ; you can check the related functions there. In short, WmaTotalAccessCnt provides the total USER accesses to a dataset including its children, while WmaAccessCnt provides the USER accesses to the dataset itself only. Calling the depth of the parent/child relationship the hierarchy level, I stopped the calculation after 5 levels to keep things efficient, though we could go deeper. For example, NANOAOD->MINIAOD->AOD->RAW is 3 levels: child->parent->grandparent->great-grandparent. A small standalone sketch of this propagation idea follows below.
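The sketch below illustrates the propagation idea only; it is not the PR's implementation (see the gist for that). It assumes `access_cnt` maps each dataset to its own access count and `parents` maps a child dataset to its (possibly multiple) parent datasets.

```python
from collections import defaultdict

def total_access_counts(access_cnt, parents, max_level=5):
    """Return dataset -> own count plus the counts of all descendants within max_level."""
    total = dict(access_cnt)   # start from each dataset's own count
    current = access_cnt       # counts attributed at the current hierarchy level
    for _ in range(max_level):
        lifted = defaultdict(int)
        for child, cnt in current.items():
            for parent in parents.get(child, []):  # a dataset can have multiple parents
                lifted[parent] += cnt
        if not lifted:
            break
        for ds, cnt in lifted.items():
            total[ds] = total.get(ds, 0) + cnt
        current = lifted
    return total

# toy usage: RAW is the grandparent of NANOAOD via MINIAOD
counts = {"NANOAOD": 3, "MINIAOD": 2, "RAW": 1}
tree = {"NANOAOD": ["MINIAOD"], "MINIAOD": ["RAW"]}
print(total_access_counts(counts, tree))  # RAW ends up with 1 + 2 + 3 = 6
```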

Coming to the access times: LastAccess provides the latest access to a dataset and includes the Rucio REPLICAS values, whereas WmaLastAccess and WmaFirstAccess come from WMArchive USER jobs. As the naming suggests, any field starting with Wma is derived only from the CRAB3 job records of WMArchive.

Last but not least, there are 2 critical filters: RseType:DISK and RseKind:prod. This means no test/temp RSE files (replicas) are included in the calculations that require the Rucio REPLICAS table, such as LastAccess.

fyi @drkovalskyi
