diff --git a/README.rst b/README.rst index a28e9011c..501d23438 100644 --- a/README.rst +++ b/README.rst @@ -36,15 +36,17 @@ At the moment of the release v0.4.2 of *magic-cta-pipe*, some LST-1 data are pro Installation for users ---------------------- -*magic-cta-pipe* and its dependencies may be installed using the *Anaconda* or *Miniconda* package system. We recommend creating a conda virtual environment +The very first step to reduce MAGIC-LST data is to have remote access/credentials to the IT Container. If you do not have it, please write an email to request it to , and the admin will send you the instructions to connect to the IT container. + +*magic-cta-pipe* and its dependencies may be installed using the *Anaconda* or *Miniconda* package system (if you have mamba installed, we recommend you to use it instead of conda, so that the installation process will be much faster; if you don't have anaconda/miniconda/miniforge, please install one of them into your workspace directory). We recommend creating a conda virtual environment first, to isolate the installed version and dependencies from your master environment (this is optional). The following command will set up a conda virtual environment, add the necessary package channels, and install *magic-cta-pipe* and its dependencies:: git clone https://github.com/cta-observatory/magic-cta-pipe.git cd magic-cta-pipe - conda env create -n magic-lst1 -f environment.yml - conda activate magic-lst1 + conda env create -n magic-lst -f environment.yml + conda activate magic-lst pip install . In general, *magic-cta-pipe* is still in heavy development phase, so expect large changes between different releases. diff --git a/environment.yml b/environment.yml index 55a3fd163..6f6354f1f 100644 --- a/environment.yml +++ b/environment.yml @@ -38,7 +38,7 @@ dependencies: - pyyaml - scipy~=1.11.4 - scikit-learn=1.2 - - setuptools + - setuptools<=71 - sphinx - sphinx-automodapi - sphinx-design diff --git a/magicctapipe/conftest.py b/magicctapipe/conftest.py index 6158d760d..aba518d71 100644 --- a/magicctapipe/conftest.py +++ b/magicctapipe/conftest.py @@ -411,7 +411,7 @@ def config_monly(): @pytest.fixture(scope="session") def config_gen(): - config_path = resource_file("test_config_general.yaml") + config_path = resource_file("test_config_auto_MCP.yaml") with open(config_path, "rb") as f: config = yaml.safe_load(f) return config @@ -419,7 +419,7 @@ def config_gen(): @pytest.fixture(scope="session") def config_gen_4lst(): - config_path = resource_file("test_config_general_4LST.yaml") + config_path = resource_file("test_config_auto_MCP_4LST.yaml") with open(config_path, "rb") as f: config = yaml.safe_load(f) return config diff --git a/magicctapipe/scripts/lst1_magic/config.yaml b/magicctapipe/resources/config.yaml similarity index 100% rename from magicctapipe/scripts/lst1_magic/config.yaml rename to magicctapipe/resources/config.yaml diff --git a/magicctapipe/resources/database_config.yaml b/magicctapipe/resources/database_config.yaml new file mode 100644 index 000000000..193698835 --- /dev/null +++ b/magicctapipe/resources/database_config.yaml @@ -0,0 +1,12 @@ +database_paths: + MAGIC+LST1: "/fefs/aswg/workspace/federico.dipierro/MAGIC_LST1_simultaneous_runs_info/simultaneous_obs_summary.h5" + MAGIC+LST1_bis: "/home/alessio.berti/MAGIC-LST_common/runfile/simultaneous_obs_summary.h5" + MAGIC: '/fefs/aswg/workspace/joanna.wojtowicz/data/Common_MAGIC_LST1_data_MAGIC_runs_subruns.h5' + LST: "/fefs/aswg/workspace/elisa.visentin/auto_MCP_PR/observations_LST.h5" + 
+database_keys: + MAGIC+LST1: '/str/table' + MAGIC+LST1_bis: '/str' + MAGIC-I: "MAGIC1/runs_M1" + MAGIC-II: "MAGIC2/runs_M2" + LST: "joint_obs" diff --git a/magicctapipe/resources/test_config_general.yaml b/magicctapipe/resources/test_config_auto_MCP.yaml similarity index 100% rename from magicctapipe/resources/test_config_general.yaml rename to magicctapipe/resources/test_config_auto_MCP.yaml diff --git a/magicctapipe/resources/test_config_general_4LST.yaml b/magicctapipe/resources/test_config_auto_MCP_4LST.yaml similarity index 100% rename from magicctapipe/resources/test_config_general_4LST.yaml rename to magicctapipe/resources/test_config_auto_MCP_4LST.yaml diff --git a/magicctapipe/scripts/lst1_magic/README.md b/magicctapipe/scripts/lst1_magic/README.md new file mode 100644 index 000000000..c494d0d72 --- /dev/null +++ b/magicctapipe/scripts/lst1_magic/README.md @@ -0,0 +1,192 @@ +# Scripts for MAGIC+LST analysis + +This folder contains the scripts to perform MAGIC+LST analysis in a semi-automatic way. + +Each script can be called from the command line from anywhere in your system. Please run them with the `-h` option the first time to check the available options. + + +## Overview + + +MAGIC+LST analysis starts from MAGIC calibrated data (\_Y\_ files), LST data level 1 (DL1) data and SimTelArray DL0 data; the goal is to reach data level 3 (DL3). + +Behind the scenes, the semi-automatic scripts will run: +- `magic_calib_to_dl1` on real MAGIC data, to convert them into DL1 format. +- `merge_hdf_files` on MAGIC data to merge subruns and/or runs together. +- `lst1_magic_event_coincidence` to find coincident events between MAGIC and LST-1, starting from DL1 data. +- `lst1_magic_stereo_reco` to add stereo parameters to the DL1 data. +- `lst1_magic_train_rfs` to train the RFs (energy, direction, classification) on gamma MC and proton training samples. +- `lst1_magic_dl1_stereo_to_dl2` to apply the RFs to stereo DL1 data (real and test MCs) and produce DL2 data. +- `lst1_magic_create_irf` to create the IRF. +- `lst1_magic_dl2_to_dl3` to create DL3 files, and `create_dl3_index_files` to create DL3 HDU and index files. + +From DL3 on, the analysis is done with gammapy. + + +## Analysis + +During the analysis, some files (i.e., bash scripts, lists of sources and runs) are automatically produced by the scripts and saved in your working directory. These files are necessary for the subsequent steps in the analysis chain. It is therefore mandatory to always launch the scripts from the same working directory, so that the output files stored there can be correctly picked up as input files in the subsequent analysis steps. + +### DL0 to DL1 + +In this step, we will convert the MAGIC calibrated data to Data Level (DL) 1 (our goal is to reach DL3). + +In your working IT Container directory (i.e., `workspace_dir`), open your environment with the command `conda activate {env_name}` and update the file `config_auto_MCP.yaml` according to your analysis. If you need non-standard parameters (e.g., for the cleaning), note that the `resources/config.yaml` file is installed together with the pipeline, so you will have to copy it (e.g., into your workspace), modify the copy and put the path to this new file in `config_auto_MCP.yaml` (this way you don't need to reinstall the pipeline); see the example below.
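+
+For example, a minimal, illustrative way to prepare such a custom configuration is the following (the destination path is just a placeholder, adapt it to your own workspace):
+
+```
+# print the location of the default config.yaml installed with the pipeline
+python -c "from magicctapipe.io import resource_file; print(resource_file('config.yaml'))"
+# copy it to your workspace and edit the copy (e.g., the cleaning parameters)
+cp <printed_path> /your/workspace/my_MCP_config.yaml
+# then point config_auto_MCP.yaml to the copy:
+#   base_config_file: '/your/workspace/my_MCP_config.yaml'
+```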
+ +The file `config_auto_MCP.yaml` must contain parameters for data selection and some information on the night sky background (NSB) level and software versions: + +``` + +directories: + workspace_dir : "/fefs/aswg/workspace/elisa.visentin/auto_MCP_PR/" # Output directory where all the data products will be saved. + + +data_selection: + source_name_database: "CrabNebula" # MUST BE THE SAME AS IN THE DATABASE; Set to null to process all sources in the given time range. + source_name_output: 'Crabtest' # Name tag of your target. Used only if source_name_database != null. + time_range : True # Search for all runs in a LST time range (e.g., 2020_01_01 -> 2022_01_01). + min : "2023_11_17" + max : "2024_03_03" + date_list : ['2020_12_15','2021_03_11'] # LST list of days to be processed (only if time_range=False), format: YYYY_MM_DD. + skip_LST_runs: [3216,3217] # LST runs to ignore. + skip_MAGIC_runs: [5094658] # MAGIC runs to ignore. + +general: + base_config_file: '' # path + name to a custom MCP config file. If not provided, the default config.yaml file will be used + LST_version : "v0.10" # check the `processed_lstchain_file` version in the LST database! + LST_tailcut : "tailcut84" + simtel_nsb : "/fefs/aswg/data/mc/DL0/LSTProd2/TestDataset/sim_telarray/node_theta_14.984_az_355.158_/output_v1.4/simtel_corsika_theta_14.984_az_355.158_run10.simtel.gz" # simtel file (DL0) to evaluate NSB + lstchain_modified_config : true # use_flatfield_heuristic = True to evaluate NSB + nsb : [0.5, 1.0, 1.5, 2.0, 2.5, 3.0] + env_name : magic-lst # name of the conda environment to be used to process data. + cluster : "SLURM" # cluster management system on which data are processed. At the moment we have only SLURM available, in the future maybe also condor (PIC, CNAF). + + +``` + +WARNING: Only the runs for which the `LST_version` parameter matches the `processed_lstchain_file` version in the LST database (i.e., the version used to evaluate the NSB level; generally the last available and processable version of a run) will be processed. + +WARNING: `env_name` must be the same as the name of the environment in which you installed this version of the pipeline + +Now that the configuration file is ready, let's create a list with all the MAGIC+LST1 runs for the time window (or list of nights) defined on the `config_auto_MCP.yaml` file: + +> $ list_from_h5 -c config_auto_MCP.yaml + +The output in the terminal should look like this: +``` +Cleaning pre-existing *_LST_runs.txt and *_MAGIC_runs.txt files +Source: XXX +Finding LST runs... +Source: XXX +Finding MAGIC runs... +``` +And it will save the files `{TARGET}_LST_runs.txt`, `{TARGET}_MAGIC_runs.txt`, and `list_sources.dat` (i.e., the list of all the sources found in the database according to both custom and default settings) in your current working directory. In case no runs are found for MAGIC and/or LST (for a source and a given time range/list of dates), a warning will be printed and no output text file will be produced for the given source and telescope(s). 
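+
+For reference, each line of `{TARGET}_LST_runs.txt` stores a comma-separated `date,run` pair, with dates in the LST convention (the snippet below is purely illustrative, the run numbers are made up; the MAGIC run list is expected to follow an analogous structure with MAGIC dates and run numbers):
+
+```
+2020_12_15,3093
+2020_12_15,3094
+2021_03_11,4125
+```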
+ +At this point, we can convert the MAGIC data into DL1 format with the following command: +> $ dl1_production -c config_auto_MCP.yaml + +The output in the terminal will be something like this: +``` +*** Converting Calibrated into DL1 data *** +Process name: {source} +To check the jobs submitted to the cluster, type: squeue -n {source} +``` + +The command `dl1_production` does a series of things: + +- Creates a directory with the target name within the directory `yourprojectname/{MCP_version}` and several subdirectories inside it that are necessary for the rest of the data reduction. The main directories are: +``` +workspace_dir/VERSION/ +workspace_dir/VERSION/{source}/DL1 +workspace_dir/VERSION/{source}/DL1/[subdirectories] +``` +where [subdirectories] stands for several subdirectories containing the MAGIC subruns in the DL1 format. +- Generates a configuration file called `config_DL0_to_DL1.yaml` with telescope ID information and the adopted imaging/cleaning cuts, and puts it in the directory `[...]/yourprojectname/VERSION/{source}/` created in the previous step. +- Links the MAGIC data paths to their respective subdirectories defined in the previous steps. +- Runs the script `magic_calib_to_dl1.py` for each one of the linked data files. + + +You can check whether this process has finished with the following commands: + +> $ squeue -n {source} + +or + +> $ squeue -u your_user_name + +Once it is done, all of the subdirectories in `workspace_dir/VERSION/{source}/DL1` will be filled with files of the type `dl1_MX.RunXXXXXX.0XX.h5` for each MAGIC subrun. + +WARNING: some of these jobs could fail due to 'broken' input files: before moving to the next step, check for failed jobs (through `job_accounting` and/or the log files) and remove the output files produced by these failed jobs (such output files are generally very small, below a few kB, and cannot be read in the following steps). + +The next step of the conversion from calibrated to DL1 is to merge all the MAGIC data files so that, in the end, we have only one data file per night. To do so, we run the following command (always in the directory `yourprojectname`): + +> $ merging_runs (-c config_auto_MCP.yaml) + +The output in the terminal will be something like this: +``` +***** Generating merge_MAGIC bashscripts... +***** Running merge_hdf_files.py in the MAGIC data files... +Process name: merging_{source} +To check the jobs submitted to the cluster, type: squeue -n merging_{source} +``` + +This script will merge the MAGIC-I (and MAGIC-II) subruns into runs. + +### Coincident events and stereo parameters on DL1 + +To find coincident events between MAGIC and LST, starting from DL1 data, we run the following command in the working directory: + +> $ coincident_events (-c config_auto_MCP.yaml) + +This script creates the file `config_coincidence.yaml`, containing both the telescope IDs and the coincidence parameters listed in the general `config.yaml` file (the one in `magicctapipe/resources`). + +It then matches LST and MAGIC dates, links the LST data files to the output directory `[...]/DL1Coincident` and, finally, runs the script `lst1_magic_event_coincidence.py` on all of them. + +Once this is done, we add stereo parameters to the MAGIC+LST coincident DL1 files by running: + +> $ stereo_events (-c config_auto_MCP.yaml) + +This script creates the file `config_stereo.yaml`, containing both the telescope IDs and the stereo parameters listed in the general `config.yaml` file (the one in `magicctapipe/resources`); both auto-generated files are sketched below.
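+
+Both auto-generated files are small excerpts of the `magicctapipe/resources` `config.yaml`; a rough sketch of their structure is given below (the telescope IDs and section contents are only illustrative):
+
+```
+mc_tel_ids:          # telescope IDs, common to both files
+    LST-1: 1
+    MAGIC-I: 2
+    MAGIC-II: 3
+event_coincidence:   # coincidence parameters (in config_coincidence.yaml); config_stereo.yaml carries the stereo section instead
+    ...
+```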
+ +The `stereo_events` script then creates the output directories for the DL1 files with stereo parameters (`[...]/DL1Stereo`) and runs the script `lst1_magic_stereo_reco.py` on all of the coincident DL1 files. The stereo DL1 files are saved in these directories. + +Finally, to merge the DL1 stereo (LST) subruns into runs, we run the `merge_stereo.py` script, whose output will be saved in `[...]/DL1Stereo/Merged`: + +> $ merge_stereo (-c config_auto_MCP.yaml) + +### Random forest and DL1 to DL2 + +TBD. + +### Instrument response function and DL3 + +TBD. + +## High-level analysis + +Since the DL3 files may amount to only a few MB, it is typically convenient to download them to your own computer at this point. You will need astropy and gammapy (version >= 0.20) installed before proceeding. + +The folder [Notebooks](https://github.com/cta-observatory/magic-cta-pipe/tree/master/notebooks) contains Jupyter notebooks to perform checks on the IRF and to produce theta2 plots and SEDs. + + +## For maintainers (MAGIC and LST databases) + +To create and update the MAGIC and LST databases (from the one produced by AB and FDP) you should use the scripts in `database_production`: + +- `create_lst_table`: creates the LST database (1 row per LST run) by dropping some columns from the parent one (AB, FDP) and adding columns for the NSB value (default: NaN), the available lstchain versions, the most recent lstchain version, the processed file and the NSB error codes (default: -1). It can also be used to update an existing database, possibly selecting a given time range from the parent databases (with the -b and -e parameters, which stand for the begin and end dates of the range). Launched as `create_lst_table (-b YYYYMMDD -e YYYYMMDD)` + +- `lstchain_version`: this script loops over all the rows of the database, extracts the date and run number from the table and looks for the data stored on the IT cluster (i.e., which versions of lstchain have been used to process a run). It evaluates all the versions used to process a run and the most recent MCP-compatible one according to a hard-coded, ordered list. Launched as `lstchain_version` + +- `nsb_level`: evaluates, for the last (MCP-compatible) version of every LST run, the respective NSB value (i.e., the median over the NSB values estimated by lstchain on a subset of subruns per run). This script launches a set of jobs (one per run; each job calls the `LSTnsb.py` script) and each job produces an output txt file containing a string like `date,run,NSB`; in the name of these files, both the run number and the NSB range are indicated (0.5=(0,0.75), 1.0=(0.75,1.25), ..., 2.5=(2.25,2.75), 3.0=(2.75,3.25), `high`=(3.25,Infinity)). To limit the number of simultaneous jobs running on SLURM, the script requires that you provide a begin and an end date (-b and -e parameters) in the options. Launched as `nsb_level -c config_auto_MCP.yaml -b YYYY_MM_DD -e YYYY_MM_DD` + +- `LSTnsb`: called by `nsb_level`, it gathers all the subruns of a run, evaluates the NSB for a subset of them (using the lstchain `calculate_noise_parameters` function), evaluates the median over these values and the approximate NSB level according to the list provided in `config_auto_MCP.yaml` (e.g., 0.5, 1.0, 1.5, ..., 2.5, 3.0, `high`) and then creates one txt file per run. These files contain the value of the NSB (i.e., the median over subruns) and are needed to fill the `nsb` column of the LST database. 
Launched as `LSTnsb (-c MCP_config) -i run -d date -l lstchain_config (-s N_subruns)` + +- `nsb_to_h5`: this script reads the txt files created by `nsb_level` to retrieve the NSB value for each run. This value is used to fill the `nsb` column of the LST database at the location of the respective run number. It also updates the error codes (0: NSB lower than 3.0, 1: NSB could not be evaluated, 2: NSB higher than 3.0). Launched as `nsb_to_h5` + +- `update_magic_db`: this script updates (or creates, if it does not exist) the MAGIC database over a time range provided by the user (-m and -M parameters, which stand for minimum and maximum date). To avoid accidentally destroying the current database, the updated database is saved as a new file instead of overwriting the current one. Launched as `update_magic_db -m YYYYMMDD -M YYYYMMDD` + +- `job_accounting`: this script (in the `semi_automatic_scripts` directory) allows you to track the progress of the submitted jobs, in particular listing errors. If you don't use the `--no-accounting` option, it also provides basic resource statistics (CPU and memory) of the completed jobs. Finally, it can also be used to update the database files with the progress of the data processing. Launched as `job_accounting (-c config) (-d data_level) (-v MCP_version) (--no-accounting) (-r h5_database)` + +- `check_MAGIC_runs`: this script checks the MAGIC data stored on the IT cluster (i.e., missing and existing data) in a given time range (-m and -M parameters, which stand for minimum and maximum date). Launched as `check_MAGIC_runs -m YYYYMMDD -M YYYYMMDD` + + diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/__init__.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/__init__.py new file mode 100644 index 000000000..d153a2dd2 --- /dev/null +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/__init__.py @@ -0,0 +1,41 @@ +from .check_MAGIC_runs import ( + existing_files, + fix_lists_and_convert, + missing_files, + table_magic_runs, +) +from .clusters import rc_lines, slurm_lines +from .coincident_events import configfile_coincidence, linking_bash_lst +from .dl1_production import ( + config_file_gen, + directories_generator_real, + lists_and_bash_gen_MAGIC, +) +from .job_accounting import run_shell +from .list_from_h5 import clear_files, list_run, magic_date, split_lst_date +from .merge_stereo import MergeStereo +from .merging_runs import merge +from .stereo_events import bash_stereo, configfile_stereo + +__all__ = [ + "bash_stereo", + "clear_files", + "configfile_coincidence", + "configfile_stereo", + "config_file_gen", + "directories_generator_real", + "existing_files", + "fix_lists_and_convert", + "linking_bash_lst", + "lists_and_bash_gen_MAGIC", + "list_run", + "magic_date", + "merge", + "MergeStereo", + "missing_files", + "rc_lines", + "run_shell", + "slurm_lines", + "split_lst_date", + "table_magic_runs", +] diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/check_MAGIC_runs.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/check_MAGIC_runs.py new file mode 100644 index 000000000..aac5900a1 --- /dev/null +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/check_MAGIC_runs.py @@ -0,0 +1,244 @@ +""" +This script retrieves information about every MAGIC run ID (and its subruns) +in the files (within a given time interval) used for common data analysis (MAGIC1, MAGIC2, LST1). 
+ +The MAGIC files that can be used for analysis are located in the IT cluster +in the following directory: +/fefs/onsite/common/MAGIC/data/M{tel_id}/event/Calibrated/{YYYY}/{MM}/{DD} + +In this path, 'tel_id' refers to the telescope ID, which must be either 1 or 2. +'YYYY', 'MM', and 'DD' specify the date. +""" + +import argparse +import os +from datetime import datetime, timedelta + +import pandas as pd +import yaml + +from magicctapipe.io import resource_file + +__all__ = [ + "fix_lists_and_convert", + "table_magic_runs", + "existing_files", + "missing_files", +] + + +def fix_lists_and_convert(cell): + """ + An additional function necessary to organize lists in the function table_magic_runs. + The function removes brackets to avoid double lists and splits on "][". + + Parameters + ---------- + cell : str + A string representing lists of MAGIC runs from the date and the source. + + Returns + ------- + list + A list of unique integers representing the MAGIC runs. + """ + + parts = cell.replace("][", ",").strip("[]").split(",") + return list(dict.fromkeys(int(item) for item in parts)) + + +def table_magic_runs(df, date_min, date_max): + """ + Generate a table with data filtered by the specified date range. + + Parameters + ---------- + df : pandas.DataFrame + DataFrame with general information about MAGIC+LST1 observations. + date_min : str + Start of the time interval (in LST convention). + date_max : str + End of the time interval (in LST convention). + + Returns + ------- + pandas.DataFrame + A DataFrame filtered by the specified date range. + """ + + df_selected_data = df.iloc[:, [2, 1, 25]] + df_selected_data.columns = ["DATE", "source", "MAGIC_runs"] + grouped_data = df_selected_data.groupby(["DATE", "source"]) + result_table = [] + + for (date, source), group in grouped_data: + if date >= date_min and date <= date_max: + runs_combined = group["MAGIC_runs"].sum() + + result_table.append( + {"DATE": date, "source": source, "MAGIC runs": runs_combined} + ) + + result = pd.DataFrame(result_table) + result["MAGIC runs"] = result["MAGIC runs"].apply(fix_lists_and_convert) + return result + + +def existing_files(tel_id, date, source, magic_run): + """ + Checking existing files on the IT cluster. + + Parameters + ---------- + tel_id : int + The telescope ID, which must be either 1 or 2. + date : str + Date (in LST convention). + source : str + Name of the source. + magic_run : int + The MAGIC run from the date and the source. + """ + + magic_run = str(magic_run) + + date_obj = datetime.strptime(date, "%Y%m%d") + date_obj += timedelta(days=1) + new_date = datetime.strftime(date_obj, "%Y%m%d") + YYYY = new_date[:4] + MM = new_date[4:6] + DD = new_date[6:8] + Y = "_Y_" + + path = f"/fefs/onsite/common/MAGIC/data/M{tel_id}/event/Calibrated/{YYYY}/{MM}/{DD}" + + if os.path.exists(path): + files = os.listdir(path) + count_with_run_id = 0 + # Counter for files that include the run_id. + for filename in files: + if Y in filename: + if new_date in filename: + if source in filename: + if magic_run in filename: + count_with_run_id += 1 + if count_with_run_id != 0: + print(f"{date}\t{source}\t{magic_run}\t{count_with_run_id}") + + +def missing_files(tel_id, date, source, magic_runs): + """ + Checking missing files on the IT cluster. + + Parameters + ---------- + tel_id : int + The telescope ID, which must be either 1 or 2. + date : str + Date (in LST convention). + source : str + Name of the source. + magic_runs : list + List of MAGIC runs from the date and the source. 
+ """ + date_obj = datetime.strptime(date, "%Y%m%d") + date_obj += timedelta(days=1) + new_date = datetime.strftime(date_obj, "%Y%m%d") + YYYY = new_date[:4] + MM = new_date[4:6] + DD = new_date[6:8] + Y = "_Y_" + + path = f"/fefs/onsite/common/MAGIC/data/M{tel_id}/event/Calibrated/{YYYY}/{MM}/{DD}" + + if os.path.exists(path): + files = os.listdir(path) + count_with_source = 0 + count_with_run_id = 0 + # Counter for files that include the source. We want to check if any file with the source was found. + # Counter for files that include the run_id. We want to check if any file with the run_id was found. + for filename in files: + if Y in filename: + if new_date in filename: + if source in filename: + count_with_source += 1 + for run in magic_runs: + run = str(run) + if run in filename: + count_with_run_id += 1 + if count_with_source == 0: + if tel_id == 1: + # Between 2022/09/04 - 2022/12/14 MAGIC 1 had a failure. Therefore we have to skip the range when we want to get information about missing files. + if date <= "20220904" or date >= "20221214": + print(f"No files found containing the source '{source}' on {date}") + if tel_id == 2: + print(f"No files found containing the source '{source}' on {date}") + if count_with_source != 0 and count_with_run_id == 0: + if tel_id == 1 and (date < "20220904" or date > "20221214"): + print(f"No run id: {run} found in the {source} on {date}.") + if tel_id == 2: + print(f"No run id: {run} found in the {source} on {date}.") + else: + print(f"No such file or directory: {date}") + + +def main(): + + """Main function.""" + + parser = argparse.ArgumentParser() + + date_min_default = "20191101" + current_datetime = datetime.now() + date_max_default = current_datetime.strftime("%Y%m%d") + + parser.add_argument( + "--date-min", + "-m", + dest="date_min", + type=str, + default=date_min_default, + help="Start of the time interval (in LST convention, format YYYYMMDD).", + ) + + parser.add_argument( + "--date-max", + "-M", + dest="date_max", + type=str, + default=date_max_default, + help="End of the time interval (in LST convention, format YYYYMMDD).", + ) + + args = parser.parse_args() + + config = resource_file("database_config.yaml") + + with open(config, "rb") as bf: + config_dict = yaml.safe_load(bf) + df_path = config_dict["database_paths"]["MAGIC+LST1"] + df_key = config_dict["database_keys"]["MAGIC+LST1"] + df = pd.read_hdf( + df_path, + key=df_key, + ) + + tel_id = [1, 2] + + database = table_magic_runs(df, args.date_min, args.date_max) + database_exploded = database.explode("MAGIC runs") + database_exploded_reset = database_exploded.reset_index(drop=True) + + for tel in tel_id: + print(f"MAGIC {tel}") + print("DATE\tsource\tRun ID\t Subruns") + for index, row in database_exploded_reset.iterrows(): + existing_files(tel, row["DATE"], row["source"], row["MAGIC runs"]) + print() + for index, row in database.iterrows(): + missing_files(tel, row["DATE"], row["source"], row["MAGIC runs"]) + print() + + +if __name__ == "__main__": + main() diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/clusters.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/clusters.py new file mode 100644 index 000000000..54d149225 --- /dev/null +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/clusters.py @@ -0,0 +1,68 @@ +""" +Module for generating bash script lines for running analysis in different clusters +""" +__all__ = ["slurm_lines", "rc_lines"] + + +def slurm_lines(queue, job_name, array=None, mem=None, out_name=None): + """ + Function for 
creating the general lines that slurm scripts are starting with. + + Parameters + ---------- + queue : str + Name of the queue + job_name : str + Job name + array : None or int + If not none array of jobs from 0 to array will be made + mem : None or str + Requested memory. If None cluster default (5 GB) will be used + out_name : None or str + If the output should be written to a specific output file + + Returns + ------- + list + List of strings to submit a SLURM job. + """ + lines = [ + "#!/bin/sh\n\n", + f"#SBATCH -p {queue}\n", + f"#SBATCH -J {job_name}\n", + f"#SBATCH --array=0-{array}\n" if array is not None else "", + f"#SBATCH --mem {mem}\n" if mem is not None else "", + "#SBATCH -n 1\n\n", + f"#SBATCH --output={out_name}.out\n" if out_name is not None else "", + f"#SBATCH --error={out_name}.err\n\n" if out_name is not None else "", + "ulimit -l unlimited\n", + "ulimit -s unlimited\n", + "ulimit -a\n\n", + ] + return lines + + +def rc_lines(store, out): + """ + Function for creating the general lines for error tracking. + + Parameters + ---------- + store : str + String what to store in addition to $rc + out : str + Base name for the log files with return codes, all output will go into {out}_return.log, only errors to {out}_failed.log + + Returns + ------- + list + List of strings to attach to a shell script + """ + lines = [ + "rc=$?\n", + 'if [ "$rc" -ne "0" ]; then\n', + f" echo {store} $rc >> {out}_failed.log\n", + "fi\n", + f"echo {store} $rc >> {out}_return.log\n", + ] + return lines diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/coincident_events.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/coincident_events.py new file mode 100644 index 000000000..542c5ba67 --- /dev/null +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/coincident_events.py @@ -0,0 +1,256 @@ +""" +This scripts facilitates the usage of the script +"lst1_magic_event_coincidence.py". This script is +more like a "manager" that organizes the analysis +process by: +1) Creating the bash scripts for looking for +coincidence events between MAGIC and LST in each +night. +2) Creating the subdirectories for the coincident +event files. 
+ +Usage: +$ coincident_events (-c config) +""" +import argparse +import glob +import logging +import os +from datetime import date as dtdt +from datetime import timedelta +from pathlib import Path + +import joblib +import numpy as np +import yaml + +from magicctapipe import __version__ +from magicctapipe.io import resource_file +from magicctapipe.scripts.lst1_magic.semi_automatic_scripts.clusters import ( + rc_lines, + slurm_lines, +) + +__all__ = ["configfile_coincidence", "linking_bash_lst"] + +logger = logging.getLogger(__name__) +logger.addHandler(logging.StreamHandler()) +logger.setLevel(logging.INFO) + + +def configfile_coincidence(target_dir, source_name, config_file): + + """ + This function creates the configuration file needed for the event coincidence step + + Parameters + ---------- + target_dir : str + Path to the working directory + source_name : str + Name of the target source + config_file : str + Path to MCP configuration file (e.g., resources/config.yaml) + """ + + if config_file == "": + config_file = resource_file("config.yaml") + + with open( + config_file, "rb" + ) as fc: # "rb" mode opens the file in binary format for reading + config_dict = yaml.safe_load(fc) + + conf = { + "mc_tel_ids": config_dict["mc_tel_ids"], + "event_coincidence": config_dict["event_coincidence"], + } + + file_name = f"{target_dir}/v{__version__}/{source_name}/config_coincidence.yaml" + + with open(file_name, "w") as f: + + yaml.dump(conf, f, default_flow_style=False) + + +def linking_bash_lst(target_dir, LST_runs, source_name, LST_version, env_name, cluster): + + """ + This function links the LST data paths to the working directory and creates bash scripts. + + Parameters + ---------- + target_dir : str + Path to the working directory + LST_runs : matrix of strings + This matrix ([['date','run'],['date','run']...]) is imported from *_LST_runs.txt files and tells the function where to find the LST data and link them to our working directory + source_name : str + Target name + LST_version : str + The lstchain version used to process the LST data + env_name : str + Name of the conda environment + cluster : str + Cluster system + """ + + coincidence_DL1_dir = f"{target_dir}/v{__version__}/{source_name}" + + MAGIC_DL1_dir = f"{target_dir}/v{__version__}/{source_name}/DL1" + + dates = [os.path.basename(x) for x in glob.glob(f"{MAGIC_DL1_dir}/Merged/[0-9]*")] + if cluster != "SLURM": + logger.warning( + "Automatic processing not implemented for the cluster indicated in the config file" + ) + return + for d in dates: + Y_M, M_M, D_M = [int(x) for x in d.split("_")[:]] + + day_MAGIC = dtdt(Y_M, M_M, D_M) + + delta = timedelta(days=1) + for i in LST_runs: + Y_L, M_L, D_L = [int(x) for x in i[0].split("_")] + + day_LST = dtdt(int(Y_L), int(M_L), int(D_L)) + if day_MAGIC == day_LST + delta: + + lstObsDir = i[0].replace("_", "") + inputdir = ( + f"/fefs/aswg/data/real/DL1/{lstObsDir}/{LST_version}/tailcut84" + ) + + outputdir = f"{coincidence_DL1_dir}/DL1Coincident/{lstObsDir}" + os.makedirs(f"{outputdir}/logs", exist_ok=True) + + list_of_subruns = np.sort(glob.glob(f"{inputdir}/dl1*Run*{i[1]}*.*.h5")) + + with open(f"{outputdir}/logs/list_LST.txt", "a+") as LSTdataPathFile: + for subrun in list_of_subruns: + LSTdataPathFile.write(f"{subrun}\n") + + if not os.path.exists(f"{outputdir}/logs/list_LST.txt"): + continue + with open(f"{outputdir}/logs/list_LST.txt", "r") as f: + process_size = len(f.readlines()) - 1 + + if process_size < 0: + continue + slurm = slurm_lines( + queue="short", + 
job_name=f"{source_name}_coincidence", + array=process_size, + mem="6g", + out_name=f"{outputdir}/logs/slurm-%x.%A_%a", + ) + rc = rc_lines( + store="$SAMPLE ${SLURM_ARRAY_JOB_ID} ${SLURM_ARRAY_TASK_ID}", + out="$OUTPUTDIR/logs/list", + ) + + lines = ( + slurm + + [ + f"export INM={MAGIC_DL1_dir}/Merged/{d}\n", + f"export OUTPUTDIR={outputdir}\n", + "SAMPLE_LIST=($(<$OUTPUTDIR/logs/list_LST.txt))\n", + "SAMPLE=${SAMPLE_LIST[${SLURM_ARRAY_TASK_ID}]}\n", + "export LOG=$OUTPUTDIR/logs/coincidence_${SLURM_ARRAY_JOB_ID}_${SLURM_ARRAY_TASK_ID}.log\n", + f"conda run -n {env_name} lst1_magic_event_coincidence --input-file-lst $SAMPLE --input-dir-magic $INM --output-dir $OUTPUTDIR --config-file {target_dir}/v{__version__}/{source_name}/config_coincidence.yaml >$LOG 2>&1\n", + ] + + rc + ) + with open( + f"{source_name}_LST_coincident_{outputdir.split('/')[-1]}.sh", + "w", + ) as f: + f.writelines(lines) + + +def main(): + + """ + Main function + """ + + parser = argparse.ArgumentParser() + parser.add_argument( + "--config-file", + "-c", + dest="config_file", + type=str, + default="./config_auto_MCP.yaml", + help="Path to a configuration file", + ) + + args = parser.parse_args() + with open( + args.config_file, "rb" + ) as f: # "rb" mode opens the file in binary format for reading + config = yaml.safe_load(f) + target_dir = Path(config["directories"]["workspace_dir"]) + + env_name = config["general"]["env_name"] + LST_version = config["general"]["LST_version"] + config_file = config["general"]["base_config_file"] + + source_in = config["data_selection"]["source_name_database"] + source = config["data_selection"]["source_name_output"] + + cluster = config["general"]["cluster"] + + if source_in is None: + source_list = joblib.load("list_sources.dat") + + else: + if source is None: + source = source_in + source_list = [source] + for source_name in source_list: + + print("***** Generating file config_coincidence.yaml...") + configfile_coincidence(target_dir, source_name, config_file) + + LST_runs_and_dates = f"{source_name}_LST_runs.txt" + LST_runs = np.genfromtxt(LST_runs_and_dates, dtype=str, delimiter=",", ndmin=2) + + try: + + print("***** Linking the paths to LST data files...") + + print("***** Generating the bashscript...") + linking_bash_lst( + target_dir, + LST_runs, + source_name, + LST_version, + env_name, + cluster, + ) # linking the data paths to current working directory + + print("***** Submitting processess to the cluster...") + print(f"Process name: {source_name}_coincidence") + print( + f"To check the jobs submitted to the cluster, type: squeue -n {source_name}_coincidence" + ) + + # Below we run the bash scripts to find the coincident events + list_of_coincidence_scripts = np.sort( + glob.glob(f"{source_name}_LST_coincident*.sh") + ) + if len(list_of_coincidence_scripts) < 1: + logger.warning("No bash scripts") + continue + launch_jobs = "" + for n, run in enumerate(list_of_coincidence_scripts): + launch_jobs += (" && " if n > 0 else "") + f"sbatch {run}" + + os.system(launch_jobs) + + except OSError as exc: + logger.error(exc) + + +if __name__ == "__main__": + main() diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/config_auto_MCP.yaml b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/config_auto_MCP.yaml new file mode 100644 index 000000000..1ce9c418a --- /dev/null +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/config_auto_MCP.yaml @@ -0,0 +1,24 @@ +directories: + workspace_dir: "/fefs/aswg/workspace/elisa.visentin/auto_MCP_PR/" # Output 
directory where all the data products will be saved. + + +data_selection: + source_name_database: "CrabNebula" # MUST BE THE SAME AS IN THE DATABASE; Set to null to process all sources in the given time range. + source_name_output: 'Crabtest' # Name tag of your target. Used only if source_name_database != null. + time_range: True # Search for all runs in a LST time range (e.g., 2020_01_01 -> 2022_01_01). + min: "2023_11_17" + max: "2024_03_03" + date_list: ['2020_12_15','2021_03_11'] # LST list of days to be processed (only if time_range=False), format: YYYY_MM_DD. + skip_LST_runs: [3216,3217] # LST runs to ignore. + skip_MAGIC_runs: [5094658] # MAGIC runs to ignore. + +general: + base_config_file: '' # path + name to a custom MCP config file. If not provided, the default config.yaml file will be used + LST_version: "v0.10" # check the `processed_lstchain_file` version in the LST database! + LST_tailcut: "tailcut84" + simtel_nsb: "/fefs/aswg/data/mc/DL0/LSTProd2/TestDataset/sim_telarray/node_theta_14.984_az_355.158_/output_v1.4/simtel_corsika_theta_14.984_az_355.158_run10.simtel.gz" # simtel file (DL0) to evaluate NSB + lstchain_modified_config: true # use_flatfield_heuristic = True to evaluate NSB + nsb: [0.5, 1.0, 1.5, 2.0, 2.5, 3.0] + env_name: magic-lst # name of the conda environment to be used to process data. + cluster: "SLURM" # cluster management system on which data are processed. At the moment we have only SLURM available, in the future maybe also condor (PIC, CNAF). + diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/LSTnsb.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/LSTnsb.py new file mode 100644 index 000000000..955e00bdf --- /dev/null +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/LSTnsb.py @@ -0,0 +1,224 @@ +""" +Evaluates NSB level for a LST run (as a median over the NSB values for a subset of subruns) + +One txt file per run is created here: its content is a (date,run,NSB) n-tuple and its title contain an information about the NSB-bin to which the run belongs (according to the list of NSB values provided in the config file) + +Usage: +$ LSTnsb (-c MCP_config) -i run -d date -l lstchain_config (-s N_subruns) +""" +import argparse +import glob +import logging + +import numpy as np +import pandas as pd +import yaml +from lstchain.image.modifier import calculate_noise_parameters + +__all__ = ["nsb"] + +logger = logging.getLogger(__name__) +logger.addHandler(logging.StreamHandler()) +logger.setLevel(logging.INFO) + + +def update_mod(mod, n_sub, denominator, index, n_noise): + """ + Function to update the step used to extract the subruns for the NSB evaluation + + Parameters + ---------- + mod : int + Sampling step + n_sub : int + Number of subruns in the run + denominator : int + Number of subruns to be used to evaluate NSB for a run + index : int + Index of the currently used subrun + n_noise : int + Number of NSB values already computed + + Returns + ------- + int + Sampling step + """ + if (n_sub > denominator) and (denominator > n_noise): + mod = (n_sub - index) // (denominator - n_noise) + return mod + + +def nsb(run_list, simtel, lst_config, run_number, denominator): + + """ + Here we compute the NSB value for a run based on a subset of its subruns + + Parameters + ---------- + run_list : list + List of subruns in the run + simtel : str + Simtel (MC) file to be used to evaluate the extra noise in dim pixels + lst_config : str + LST configuration file (cf. 
lstchain) + run_number : int + LST run number + denominator : int + Number of subruns to be used to evaluate NSB for a run + + Returns + ------- + list + List of the sub-run wise NSB values + """ + + noise = [] + + if len(run_list) == 0: + logger.warning( + "There is no subrun matching the provided run number. Check the list of the LST runs (LST_runs.txt)" + ) + return + if len(run_list) <= denominator: + mod = 1 + else: + mod = len(run_list) // denominator + + logger.info("NSB levels (sub-runs): \n") + for ii in range(0, len(run_list)): + subrun = run_list[ii].split(".")[-2] + if mod == 0: + break + if ii % mod == 0: + try: + a, _, _ = calculate_noise_parameters(simtel, run_list[ii], lst_config) + if a is not None: + if a > 0.0: + noise.append(a) + logger.info(a) + else: + df_subrun = pd.read_hdf( + run_list[ii], + key="dl1/event/telescope/parameters/LST_LSTCam", + ) + n_ped = len(df_subrun[df_subrun["event_type"] == 2]) + if n_ped > 0: + noise.append(a) + logger.info(a) + else: + mod = update_mod( + mod, len(run_list), denominator, ii, len(noise) + ) + logger.warning( + f"NSB level could not be adequately evaluated for subrun {subrun} (missing pedestal events): skipping this subrun..." + ) + else: + mod = update_mod(mod, len(run_list), denominator, ii, len(noise)) + logger.warning( + f"NSB level is None for subrun {subrun} (missing interleaved FF): skipping this subrun..." + ) + + except IndexError: + + mod = update_mod(mod, len(run_list), denominator, ii, len(noise)) + logger.warning( + f"Subrun {subrun} caused an error in the NSB level evaluation for run {run_number}. Check reports before using it" + ) + return noise + + +def main(): + + """ + Main function + """ + + parser = argparse.ArgumentParser() + parser.add_argument( + "--config-file", + "-c", + dest="config_file", + type=str, + default="../config_auto_MCP.yaml", + help="Path to a configuration file", + ) + parser.add_argument( + "--input-run", + "-i", + dest="run", + type=str, + help="Run to be processed", + ) + parser.add_argument( + "--day", + "-d", + dest="day", + type=str, + help="Day of the run to be processed", + ) + parser.add_argument( + "--lstchain-config", + "-l", + dest="lst_conf", + type=str, + help="lstchain configuration file", + ) + parser.add_argument( + "--denominator", + "-s", + dest="denominator", + type=int, + default=25, + help="Number of subruns to be processed", + ) + args = parser.parse_args() + with open( + args.config_file, "rb" + ) as f: # "rb" mode opens the file in binary format for reading + config = yaml.safe_load(f) + run_number = args.run + date = args.day + denominator = args.denominator + lst_config = args.lst_conf + simtel = config["general"]["simtel_nsb"] + nsb_list = config["general"]["nsb"] + lst_version = config["general"]["LST_version"] + lst_tailcut = config["general"]["LST_tailcut"] + width = [a / 2 - b / 2 for a, b in zip(nsb_list[1:], nsb_list[:-1])] + width.append(0.25) + nsb_limit = [a + b for a, b in zip(nsb_list[:], width[:])] + nsb_limit.insert( + 0, -0.01 + ) # arbitrary small negative number so that 0.0 > nsb_limit[0] + + LST_files = np.sort(glob.glob(f"nsb_LST_*_{run_number}.txt")) + + if len(LST_files) == 1: + logger.info(f"Run {run_number} already processed") + return + + inputdir = f"/fefs/aswg/data/real/DL1/{date}/{lst_version}/{lst_tailcut}" + run_list = np.sort(glob.glob(f"{inputdir}/dl1*Run*{run_number}.*.h5")) + noise = nsb(run_list, simtel, lst_config, run_number, denominator) + if len(noise) == 0: + logger.warning( + "No NSB value could be evaluated: check the 
observation logs (observation problems, car flashes...)" + ) + return + median_NSB = np.median(noise) + logger.info("\n\n") + logger.info(f"Run n. {run_number}, NSB median {median_NSB}") + + for j in range(0, len(nsb_list)): + if (median_NSB <= nsb_limit[j + 1]) & (median_NSB > nsb_limit[j]): + with open(f"nsb_LST_{nsb_list[j]}_{run_number}.txt", "a+") as f: + f.write(f"{date},{run_number},{median_NSB}\n") + if median_NSB > nsb_limit[-1]: + with open(f"nsb_LST_high_{run_number}.txt", "a+") as f: + f.write(f"{date},{run_number},{median_NSB}\n") + break + + +if __name__ == "__main__": + main() diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/__init__.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/__init__.py new file mode 100644 index 000000000..9957c9dbb --- /dev/null +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/__init__.py @@ -0,0 +1,20 @@ +from .lstchain_version import lstchain_versions, version_lstchain +from .LSTnsb import nsb +from .nsb_level import bash_scripts +from .nsb_to_h5 import collect_nsb +from .update_MAGIC_database import ( + fix_lists_and_convert, + table_magic_runs, + update_tables, +) + +__all__ = [ + "bash_scripts", + "collect_nsb", + "fix_lists_and_convert", + "lstchain_versions", + "nsb", + "table_magic_runs", + "update_tables", + "version_lstchain", +] diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/create_LST_table.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/create_LST_table.py new file mode 100644 index 000000000..9fd1c0696 --- /dev/null +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/create_LST_table.py @@ -0,0 +1,117 @@ +""" +Create a new h5 table (or upgrades an existing database by adding data collected in the time range defined by the provided begin and end dates) from the one of joint observations. 
+ +Only the columns needed to produce the lists of LST runs to be processed are preserved, and columns are added to store NSB level (and related error code) and lstchain versions (available, last and processed) + +Usage: +$ create_LST_table (-b YYYYMMDD -e YYYYMMDD) +""" + +import argparse +import os + +import numpy as np +import pandas as pd +import yaml + +from magicctapipe.io import resource_file + + +def main(): + + """ + Main function + """ + + parser = argparse.ArgumentParser() + + parser.add_argument( + "--begin-date", + "-b", + dest="begin", + type=int, + default=0, + help="First date to update database (YYYYMMDD)", + ) + parser.add_argument( + "--end-date", + "-e", + dest="end", + type=int, + default=0, + help="End date to update database (YYYYMMDD)", + ) + + args = parser.parse_args() + config_file = resource_file("database_config.yaml") + + with open( + config_file, "rb" + ) as fc: # "rb" mode opens the file in binary format for reading + config_dict = yaml.safe_load(fc) + + out_h5 = config_dict["database_paths"]["LST"] + out_key = config_dict["database_keys"]["LST"] + + df = pd.read_hdf( + config_dict["database_paths"]["MAGIC+LST1"], + key=config_dict["database_keys"]["MAGIC+LST1"], + ) # TODO: put this file in a shared folder + df2 = pd.read_hdf( + config_dict["database_paths"]["MAGIC+LST1_bis"], + key=config_dict["database_keys"]["MAGIC+LST1_bis"], + ) # TODO: put this file in a shared folder + df = pd.concat([df, df2]).drop_duplicates(subset="LST1_run", keep="first") + if args.begin != 0: + df = df[df["DATE"].astype(int) >= args.begin] + if args.end != 0: + df = df[df["DATE"].astype(int) <= args.end] + + needed_cols = [ + "DATE", + "source", + "LST1_run", + "MAGIC_stereo", + "MAGIC_trigger", + "MAGIC_HV", + ] + df_cut = df[needed_cols] + + df_cut = df_cut.assign(nsb=np.nan) + df_cut = df_cut.assign(lstchain_versions="[]") + df_cut = df_cut.assign(last_lstchain_file="") + df_cut = df_cut.assign(processed_lstchain_file="") + df_cut = df_cut.assign(error_code_nsb=-1) + + if os.path.isfile(out_h5): + df_old = pd.read_hdf( + out_h5, + key=out_key, + ) + df_cut = pd.concat([df_old, df_cut]).drop_duplicates( + subset="LST1_run", keep="first" + ) + df_cut = df_cut.sort_values(by=["DATE", "source"]) + + df_cut = df_cut.reset_index(drop=True) + df_cols = df_cut.columns.tolist() + for col in df_cols: + if "_rc_all" in col: + df_cut[col] = df_cut[col].fillna(False) + elif "_rc" in col: + df_cut[col] = df_cut[col].fillna("{}") + + df_cut.to_hdf( + out_h5, + key=out_key, + mode="w", + min_itemsize={ + "lstchain_versions": 20, + "last_lstchain_file": 90, + "processed_lstchain_file": 90, + }, + ) + + +if __name__ == "__main__": + main() diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/lstchain_version.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/lstchain_version.py new file mode 100644 index 000000000..d3cb1935d --- /dev/null +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/lstchain_version.py @@ -0,0 +1,98 @@ +""" +Fills the lstchain_versions column of the LST database with a list of the versions of LST data which are stored on the IT cluster + +Moreover, it fills the last_lstchain_files column of the LST database with the path to the LST DL1 file processed with the last available lstchain version (the name and order of the versions to be considered is stored in the lstchain_versions variable here defined) + +Usage: +$ lstchain_version +""" + + +import glob +import os + +import pandas 
as pd +import yaml + +from magicctapipe.io import resource_file + +lstchain_versions = ["v0.9", "v0.10"] +__all__ = ["version_lstchain"] + + +def version_lstchain(df_LST): + """ + Evaluates (and store in the database) all the versions used to process a given file and the last version of a file + + Parameters + ---------- + df_LST : :class:`pandas.DataFrame` + Dataframe of the LST-1 observations. + """ + for i, row in df_LST.iterrows(): + + version = [] + run = row["LST1_run"] + run = format(int(run), "05d") + date = row["DATE"] + directories_version = [ + i.split("/")[-1] for i in glob.glob(f"/fefs/aswg/data/real/DL1/{date}/v*") + ] + + for vers in directories_version: + + if os.path.isfile( + f"/fefs/aswg/data/real/DL1/{date}/{vers}/tailcut84/dl1_LST-1.Run{run}.h5" + ): + if vers not in version: + version.append(vers) + + version = list(version) + df_LST.loc[i, "lstchain_versions"] = str(version) + max_version = None + + for j in range(len(lstchain_versions)): + + if lstchain_versions[j] in version: + + max_version = lstchain_versions[j] + + if max_version is None: + raise ValueError("issue with lstchain versions") + name = f"/fefs/aswg/data/real/DL1/{date}/{max_version}/tailcut84/dl1_LST-1.Run{run}.h5" + + df_LST.loc[i, "last_lstchain_file"] = name + + +def main(): + + """ + Main function + """ + config_file = resource_file("database_config.yaml") + + with open( + config_file, "rb" + ) as fc: # "rb" mode opens the file in binary format for reading + config_dict = yaml.safe_load(fc) + + LST_h5 = config_dict["database_paths"]["LST"] + LST_key = config_dict["database_keys"]["LST"] + df_LST = pd.read_hdf(LST_h5, key=LST_key) + + version_lstchain(df_LST) + + df_LST.to_hdf( + LST_h5, + key=LST_key, + mode="w", + min_itemsize={ + "lstchain_versions": 20, + "last_lstchain_file": 90, + "processed_lstchain_file": 90, + }, + ) + + +if __name__ == "__main__": + main() diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/nsb_level.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/nsb_level.py new file mode 100644 index 000000000..802b99640 --- /dev/null +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/nsb_level.py @@ -0,0 +1,210 @@ +""" +Creates bash scripts to run LSTnsb.py on all the LST runs, in the provided time range (-b, -e), by using parallel jobs. 
It sets error_code_nsb = NaN for these runs + +Moreover, it can modify the lstchain standard configuration file (used to evaluate NSB) by adding "use_flatfield_heuristic" = True + +Usage: +$ nsb_level (-c config.yaml -b YYYY_MM_DD -e YYYY_MM_DD) +""" + +import argparse +import glob +import json +import logging +import os +from datetime import datetime + +import numpy as np +import pandas as pd +import yaml + +from magicctapipe.io import resource_file +from magicctapipe.scripts.lst1_magic.semi_automatic_scripts.clusters import slurm_lines + +from .lstchain_version import lstchain_versions + +__all__ = ["bash_scripts"] + +logger = logging.getLogger(__name__) +logger.addHandler(logging.StreamHandler()) +logger.setLevel(logging.INFO) + + +def bash_scripts(run, date, config, env_name, cluster, lst_config): + + """Here we create the bash scripts (one per LST run) + + Parameters + ---------- + run : str + LST run number + date : str + LST date + config : str + Name of the configuration file + env_name : str + Name of the environment + cluster : str + Cluster system + lst_config : str + Configuration file lstchain + """ + if cluster != "SLURM": + logger.warning( + "Automatic processing not implemented for the cluster indicated in the config file" + ) + return + slurm = slurm_lines( + queue="long", + job_name="nsb", + out_name=f"slurm-nsb_{run}-%x.%j", + ) + lines = slurm + [ + f"conda run -n {env_name} LSTnsb -c {config} -i {run} -d {date} -l {lst_config} > nsblog_{date}_{run}_", + "${SLURM_JOB_ID}.log 2>&1 \n\n", + ] + + with open(f"nsb_{date}_run_{run}.sh", "w") as f: + f.writelines(lines) + + +def main(): + + """ + Main function + """ + + parser = argparse.ArgumentParser() + parser.add_argument( + "--config-file", + "-c", + dest="config_file", + type=str, + default="../config_auto_MCP.yaml", + help="Path to a configuration file", + ) + parser.add_argument( + "--begin-date", + "-b", + dest="begin_date", + type=str, + help="Begin date to start NSB evaluation from the database.", + ) + parser.add_argument( + "--end-date", + "-e", + dest="end_date", + type=str, + help="End date to start NSB evaluation from the database.", + ) + args = parser.parse_args() + with open( + args.config_file, "rb" + ) as f: # "rb" mode opens the file in binary format for reading + config = yaml.safe_load(f) + config_db = resource_file("database_config.yaml") + + with open( + config_db, "rb" + ) as fc: # "rb" mode opens the file in binary format for reading + config_dict = yaml.safe_load(fc) + + LST_h5 = config_dict["database_paths"]["LST"] + LST_key = config_dict["database_keys"]["LST"] + env_name = config["general"]["env_name"] + + cluster = config["general"]["cluster"] + + df_LST = pd.read_hdf( + LST_h5, + key=LST_key, + ) + lstchain_v = config["general"]["LST_version"] + lstchain_modified = config["general"]["lstchain_modified_config"] + conda_path = os.environ["CONDA_PREFIX"] + lst_config_orig = ( + str(conda_path) + + "/lib/python3.11/site-packages/lstchain/data/lstchain_standard_config.json" + ) + with open(lst_config_orig, "r") as f_lst: + lst_dict = json.load(f_lst) + if lstchain_modified: + lst_dict["source_config"]["LSTEventSource"]["use_flatfield_heuristic"] = True + with open("lstchain.json", "w+") as outfile: + json.dump(lst_dict, outfile) + lst_config = "lstchain.json" + + min = datetime.strptime(args.begin_date, "%Y_%m_%d") + max = datetime.strptime(args.end_date, "%Y_%m_%d") + lst = pd.to_datetime(df_LST["DATE"].str.replace("_", "-")) + df_LST["date"] = lst + df_LST = df_LST[df_LST["date"] >= min] + 
df_LST = df_LST[df_LST["date"] <= max] + + df_LST = df_LST.drop(columns="date") + + print("***** Generating bashscripts...") + for i, row in df_LST.iterrows(): + + list_v = [eval(i) for i in row["lstchain_versions"].strip("][").split(", ")] + + if str(lstchain_v) not in list_v: + continue + + common_v = [value for value in lstchain_versions if value in list_v] + + max_common = common_v[-1] + + if lstchain_v != str(max_common): + + continue + + run_number = row["LST1_run"] + date = row["DATE"] + + df_LST.loc[ + i, "processed_lstchain_file" + ] = f"/fefs/aswg/data/real/DL1/{date}/{max_common}/tailcut84/dl1_LST-1.Run{run_number}.h5" + df_LST.loc[i, "error_code_nsb"] = np.nan + + bash_scripts(run_number, date, args.config_file, env_name, cluster, lst_config) + + print("Process name: nsb") + print("To check the jobs submitted to the cluster, type: squeue -n nsb") + list_of_bash_scripts = np.sort(glob.glob("nsb_*_run_*.sh")) + + if len(list_of_bash_scripts) < 1: + logger.warning( + "No bash script has been produced to evaluate the NSB level for the provided LST runs. Please check the input dates" + ) + return + print("Update database and launch jobs") + df_old = pd.read_hdf( + LST_h5, + key=LST_key, + ) + df_LST = pd.concat([df_LST, df_old]).drop_duplicates( + subset="LST1_run", keep="first" + ) + df_LST = df_LST.sort_values(by=["DATE", "source", "LST1_run"]) + + launch_jobs = "" + for n, run in enumerate(list_of_bash_scripts): + launch_jobs += (" && " if n > 0 else "") + f"sbatch {run}" + + os.system(launch_jobs) + + df_LST.to_hdf( + LST_h5, + key=LST_key, + mode="w", + min_itemsize={ + "lstchain_versions": 20, + "last_lstchain_file": 90, + "processed_lstchain_file": 90, + }, + ) + + +if __name__ == "__main__": + main() diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/nsb_to_h5.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/nsb_to_h5.py new file mode 100644 index 000000000..27a9bc0cd --- /dev/null +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/nsb_to_h5.py @@ -0,0 +1,106 @@ +""" +Script to fill the 'nsb' column of the LST database by using the txt files produced by nsb_level + +It also fills the error_code_nsb column by 0 if the NSB could be evaluated and is < 3.0, by 2 if the NSB is > 3.0 and by 1 if the NSB could not be evaluated (NSB = NaN) + +Usage: +$ nsb_to_h5 +""" + +import glob +import logging + +import numpy as np +import pandas as pd +import yaml + +from magicctapipe.io import resource_file + +__all__ = ["collect_nsb"] + +logger = logging.getLogger(__name__) +logger.addHandler(logging.StreamHandler()) +logger.setLevel(logging.INFO) + + +def collect_nsb(df_LST): + """ + Here we collect NSB values from txt files and store them into the dataframe + + Parameters + ---------- + df_LST : :class:`pandas.DataFrame` + Dataframe collecting the LST1 runs (produced by the create_LST_table script) + + Returns + ------- + :class:`pandas.DataFrame` + Same dataframe as the input one, but with NSB values added in the 'nsb' column (for the runs processed by nsb_level.py) + """ + nsb_files = glob.glob("nsb_LST_*.txt") + df_LST = df_LST.set_index("LST1_run") + for file_nsb in nsb_files: + run = file_nsb.split("_")[3] + run = run.split(".")[0] + nsb = np.nan + with open(file_nsb) as ff: + line_str = ff.readline().rstrip("\n") + nsb = line_str.split(",")[2] + + df_LST.loc[run, "nsb"] = float(nsb) + df_LST = df_LST.reset_index() + df_LST = df_LST[ + [ + "DATE", + "source", + "LST1_run", + 
"MAGIC_stereo", + "MAGIC_trigger", + "MAGIC_HV", + "nsb", + "lstchain_versions", + "last_lstchain_file", + "processed_lstchain_file", + "error_code_nsb", + ] + ] + return df_LST + + +def main(): + + """ + Main function + """ + config_file = resource_file("database_config.yaml") + + with open( + config_file, "rb" + ) as fc: # "rb" mode opens the file in binary format for reading + config_dict = yaml.safe_load(fc) + + LST_h5 = config_dict["database_paths"]["LST"] + LST_key = config_dict["database_keys"]["LST"] + df_LST = pd.read_hdf( + LST_h5, + key=LST_key, + ) + + df_new = collect_nsb(df_LST) + + df_new = df_new.sort_values(by=["DATE", "source", "LST1_run"]) + + df_new.loc[df_new["error_code_nsb"].isna(), "error_code_nsb"] = "1" + + df_new.loc[df_new["nsb"].notna(), "error_code_nsb"] = "0" + df_new.loc[df_new["nsb"] > 3.0, "error_code_nsb"] = "2" + + df_new.to_hdf( + LST_h5, + key=LST_key, + mode="w", + ) + + +if __name__ == "__main__": + main() diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/update_MAGIC_database.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/update_MAGIC_database.py new file mode 100644 index 000000000..1c0e297ff --- /dev/null +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/database_production/update_MAGIC_database.py @@ -0,0 +1,329 @@ +""" +The script updates the common MAGIC database from a given time range. +At the moment, to avoid accidentally destroying the previous database, +we save the updated database as a new file (see in main function new_h5_file_path). +If the path to the previous database is not found, +the script creates a new one. The start of the time interval +is the date of the beginning of the common MAGIC+LST1 observations. +The end of the time interval is the current date. + +The MAGIC files that can be used for analysis are located in the IT cluster +in the following directory: +/fefs/onsite/common/MAGIC/data/M{tel_id}/event/Calibrated/{YYYY}/{MM}/{DD} + +In this path, 'tel_id' refers to the telescope ID, which must be either 1 or 2. +'YYYY', 'MM', and 'DD' specify the date. +""" + +import argparse +import os +from datetime import datetime, timedelta + +import numpy as np +import pandas as pd +import yaml + +from magicctapipe.io import resource_file + +__all__ = ["fix_lists_and_convert", "table_magic_runs", "update_tables"] + + +def fix_lists_and_convert(cell): + """ + An additional function necessary to organize lists in the function table_magic_runs. + The function remove brackets to avoid double lists and split on "][". + + Parameters + ----------- + cell : str + List of MAGIC runs from the date and the source. + + Returns + ------- + list + A list of unique integers representing the MAGIC runs. + """ + + parts = cell.replace("][", ",").strip("[]").split(",") + return list(dict.fromkeys(int(item) for item in parts)) + + +def table_magic_runs(df, date_min, date_max): + + """ + Generate a table with data filtered by the specified date range. + + Parameters + ---------- + df : pandas.DataFrame + DataFrame with general information about MAGIC+LST1 observations. + date_min : str + Start of the time interval (in LST convention, format YYYYMMDD). + date_max : str + End of the time interval (in LST convention, format YYYYMMDD). + + Returns + ------- + pandas.DataFrame + A DataFrame filtered by the specified date range. 
+ """ + + df_selected_data = df[["DATE", "source", "MAGIC_runs"]] + grouped_data = df_selected_data.groupby(["DATE", "source"]) + result_table = [] + + for (date, source), group in grouped_data: + if date >= date_min and date <= date_max: + runs_combined = group["MAGIC_runs"].sum() + + result_table.append( + {"DATE": date, "source": source, "Run ID": runs_combined} + ) + + result = pd.DataFrame(result_table) + result["Run ID"] = result["Run ID"].apply(fix_lists_and_convert) + result_exploded = result.explode("Run ID") + result_exploded_reset = result_exploded.reset_index(drop=True) + return result_exploded_reset + + +def update_tables(database, DF, tel_id): + """ + Updating the MAGIC database by comparison of data that are only in + common MAGIC+LST1 database and not in the MAGIC database. + Then, the function checks existing files and counts number of subruns. + Data are added chronologically. + + The updated table DF may include new rows that contain NaN values in some cells. + The function automatically filling NaN values with predefined default values + based on the column's data type. + + Parameters + ----------- + database : pandas.DataFrame + Table with informations about MAGIC runs from the date and the source from given time interval. + DF : pandas.DataFrame + The previous MAGIC database which we want to update. + tel_id : int + The telescope ID, which must be either 1 or 2. + + Returns + ------- + pandas.DataFrame + A DataFrame with updated MAGIC database. + """ + + database["DATE"] = database["DATE"].astype(str) + DF["DATE"] = DF["DATE"].astype(str) + columns_to_compare = ["DATE", "source", "Run ID"] + merged_df = pd.merge( + database, + DF[columns_to_compare], + on=columns_to_compare, + how="left", + indicator=True, + ) + non_matching_rows = merged_df[merged_df["_merge"] == "left_only"].drop( + columns=["_merge"] + ) + + if non_matching_rows.empty: + raise Exception("There is no un-updated data for a given time interval.") + else: + + non_matching_rows_reset = non_matching_rows.reset_index(drop=True) + new_rows = [] + + for _, (date, source, run_id) in non_matching_rows_reset[ + ["DATE", "source", "Run ID"] + ].iterrows(): + + run_id = str(run_id) + date_obj = datetime.strptime(date, "%Y%m%d") + date_obj += timedelta(days=1) + new_date = datetime.strftime(date_obj, "%Y%m%d") + YYYY = new_date[:4] + MM = new_date[4:6] + DD = new_date[6:8] + Y = "_Y_" + + path = f"/fefs/onsite/common/MAGIC/data/M{tel_id}/event/Calibrated/{YYYY}/{MM}/{DD}" + + if os.path.exists(path): + files = os.listdir(path) + count_with_run_id = 0 + # Counter for files that include the run_id. 
+ for filename in files: + if Y in filename: + if new_date in filename: + if source in filename: + if run_id in filename: + count_with_run_id += 1 + if count_with_run_id != 0: + new_rows.append( + { + "DATE": date, + "source": source, + "Run ID": run_id, + "number of subruns": count_with_run_id, + } + ) + + new_rows = pd.DataFrame(new_rows) + new_rows["DATE"] = pd.to_datetime(new_rows["DATE"]) + combined_df = pd.concat([DF, new_rows], ignore_index=True) + combined_df["DATE"] = pd.to_datetime(combined_df["DATE"], errors="coerce") + combined_df = combined_df.sort_values("DATE") + + combined_df["DATE"] = combined_df["DATE"].dt.strftime("%Y%m%d") + combined_df["DATE"] = combined_df["DATE"].astype(int) + combined_df["number of subruns"] = combined_df["number of subruns"].astype(int) + combined_df["Run ID"] = combined_df["Run ID"].astype(int) + combined_df.reset_index(drop=True, inplace=True) + + for column in combined_df.columns[4:]: + combined_df[column] = combined_df[column].replace( + r"^\s*$", np.nan, regex=True + ) + not_null_data = combined_df[column].dropna() + if not_null_data.empty: + continue # Skip if all values are NaN + + inferred_type = pd.api.types.infer_dtype(not_null_data, skipna=True) + + if inferred_type == "boolean": + default_value = False + elif inferred_type == "integer": + default_value = 0 + elif inferred_type == "floating": + default_value = 0.0 + elif inferred_type == "string": + default_value = "NaN" + else: + continue + + combined_df[column] = ( + combined_df[column] + .fillna(default_value) + .astype(type(not_null_data.iloc[0])) + ) + + combined_df = combined_df.infer_objects() + + return combined_df + + +def main(): + + """Main function.""" + + parser = argparse.ArgumentParser() + + date_min_default = "20191101" + current_datetime = datetime.now() + date_max_default = current_datetime.strftime("%Y%m%d") + + parser.add_argument( + "--date-min", + "-m", + dest="date_min", + type=str, + default=date_min_default, + help="Start of the time interval (in LST convention, format YYYYMMDD).", + ) + + parser.add_argument( + "--date-max", + "-M", + dest="date_max", + type=str, + default=date_max_default, + help="End of the time interval (in LST convention, format YYYYMMDD).", + ) + + args = parser.parse_args() + + config = resource_file("database_config.yaml") + + with open(config, "rb") as bf: + config_dict = yaml.safe_load(bf) + + df_path = config_dict["database_paths"]["MAGIC+LST1"] + df_key = config_dict["database_keys"]["MAGIC+LST1"] + df = pd.read_hdf(df_path, key=df_key) + + # Set "" to generate a new database. + previous_database_path = config_dict["database_paths"]["MAGIC"] + + tel_id = [1, 2] + + file_exists = os.path.exists(previous_database_path) + + new_h5_file_path = "/fefs/aswg/workspace/joanna.wojtowicz/data/Common_MAGIC_LST1_data_MAGIC_runs_subruns_UPDATED.h5" + + if file_exists: + + date_min = args.date_min + date_max = args.date_max + + print("Updating database...") + + database = table_magic_runs(df, date_min, date_max) + + for tel in tel_id: + + dat_key = "MAGIC-" + "I" * tel + key = config_dict["database_keys"][dat_key] + + DF = pd.read_hdf( + previous_database_path, + key=key, + ) + + updated_df = update_tables(database, DF, tel) + print(updated_df) + + try: + updated_df.to_hdf( + new_h5_file_path, + key=key, + mode=("w" if tel == 1 else "a"), + format="table", + ) + print(f"File saved successfully at {new_h5_file_path}") + + except Exception as e: + print(f"An error occurred: {e}") + + else: + print("Database does not exist. 
Creating a new database...") + + database_default = table_magic_runs(df, date_min_default, date_max_default) + + for tel in tel_id: + + dat_key = "MAGIC-" + "I" * tel + key = config_dict["database_keys"][dat_key] + + # an empty table filled by NaN + DF_empty = pd.DataFrame(columns=["DATE", "source", "Run ID"]) + + new_database = update_tables(database_default, DF_empty, tel) + print(new_database) + + try: + new_database.to_hdf( + new_h5_file_path, + key=key, + mode=("w" if tel == 1 else "a"), + format="table", + ) + + print(f"File saved successfully at {new_h5_file_path}") + + except Exception as e: + print(f"An error occurred: {e}") + + +if __name__ == "__main__": + main() diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/dl1_production.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/dl1_production.py new file mode 100644 index 000000000..bfd17707e --- /dev/null +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/dl1_production.py @@ -0,0 +1,275 @@ +""" +This script facilitates the usage of +"magic_calib_to_dl1.py". This script is more like a +"manager" that organizes the analysis process by: +1) Creating the necessary directories and subdirectories. +2) Generating all the bash script files that convert the +MAGIC files from Calibrated (`_Y_`) to DL1. +3) Launching these jobs in the IT container. + +Notice that in this stage we only use MAGIC data. +No LST data is used here. + +Standard usage: +$ dl1_production (-c config_file.yaml) +""" +import argparse +import glob +import logging +import os +from pathlib import Path + +import joblib +import numpy as np +import yaml + +from magicctapipe import __version__ +from magicctapipe.io import resource_file +from magicctapipe.scripts.lst1_magic.semi_automatic_scripts.clusters import ( + rc_lines, + slurm_lines, +) + +__all__ = [ + "config_file_gen", + "lists_and_bash_gen_MAGIC", + "directories_generator_real", +] + +logger = logging.getLogger(__name__) +logger.addHandler(logging.StreamHandler()) +logger.setLevel(logging.INFO) + + +def config_file_gen(target_dir, source_name, config_file): + + """ + Here we create the configuration file needed for transforming DL0 into DL1 + + Parameters + ---------- + target_dir : path + Directory to store the results + source_name : str + Name of the target source + config_file : str + Path to MCP configuration file (e.g., resources/config.yaml) + """ + + if config_file == "": + config_file = resource_file("config.yaml") + with open( + config_file, "rb" + ) as fc: # "rb" mode opens the file in binary format for reading + config_dict = yaml.safe_load(fc) + + conf = { + "mc_tel_ids": config_dict["mc_tel_ids"], + "LST": config_dict["LST"], + "MAGIC": config_dict["MAGIC"], + } + + file_name = f"{target_dir}/v{__version__}/{source_name}/config_DL0_to_DL1.yaml" + with open(file_name, "w") as f: + yaml.dump(conf, f, default_flow_style=False) + + +def lists_and_bash_gen_MAGIC( + target_dir, telescope_ids, MAGIC_runs, source, env_name, cluster +): + + """ + Below we create bash scripts that link the MAGIC data paths to each subdirectory and process them from Calibrated to Dl1 + + Parameters + ---------- + target_dir : str + Directory to store the results + telescope_ids : list + List of the telescope IDs (set by the user) + MAGIC_runs : array + MAGIC dates and runs to be processed + source : str + Name of the target + env_name : str + Name of the environment + cluster : str + Cluster system + """ + if cluster != "SLURM": + logger.warning( + "Automatic processing not implemented for 
the cluster indicated in the config file" + ) + return + process_name = source + lines = slurm_lines( + queue="short", + job_name=process_name, + out_name=f"{target_dir}/v{__version__}/{source}/DL1/slurm-linkMAGIC-%x.%j", + ) + + with open(f"{source}_linking_MAGIC_data_paths.sh", "w") as f: + f.writelines(lines) + for i in MAGIC_runs: + for magic in [1, 2]: + # if 1 then magic is second from last, if 2 then last + if telescope_ids[magic - 3] > 0: + lines = [ + f'export IN1=/fefs/onsite/common/MAGIC/data/M{magic}/event/Calibrated/{i[0].replace("_","/")}\n', + f"export OUT1={target_dir}/v{__version__}/{source}/DL1/M{magic}/{i[0]}/{i[1]}/logs \n", + f"ls $IN1/*{i[1][-2:]}.*_Y_*.root > $OUT1/list_cal.txt\n\n", + ] + f.writelines(lines) + + for magic in [1, 2]: + # if 1 then magic is second from last, if 2 then last + if telescope_ids[magic - 3] > 0: + for i in MAGIC_runs: + number_of_nodes = glob.glob( + f'/fefs/onsite/common/MAGIC/data/M{magic}/event/Calibrated/{i[0].replace("_","/")}/*{i[1]}.*_Y_*.root' + ) + number_of_nodes = len(number_of_nodes) - 1 + if number_of_nodes < 0: + continue + slurm = slurm_lines( + queue="short", + job_name=process_name, + array=number_of_nodes, + mem="2g", + out_name=f"{target_dir}/v{__version__}/{source}/DL1/M{magic}/{i[0]}/{i[1]}/logs/slurm-%x.%A_%a", + ) + rc = rc_lines( + store="$SAMPLE ${SLURM_ARRAY_JOB_ID} ${SLURM_ARRAY_TASK_ID}", + out="$OUTPUTDIR/logs/list", + ) + lines = ( + slurm + + [ + f"export OUTPUTDIR={target_dir}/v{__version__}/{source}/DL1/M{magic}/{i[0]}/{i[1]}\n", + "SAMPLE_LIST=($(<$OUTPUTDIR/logs/list_cal.txt))\n", + "SAMPLE=${SAMPLE_LIST[${SLURM_ARRAY_TASK_ID}]}\n\n", + "export LOG=$OUTPUTDIR/logs/real_0_1_task_${SLURM_ARRAY_JOB_ID}_${SLURM_ARRAY_TASK_ID}.log\n", + f"conda run -n {env_name} magic_calib_to_dl1 --input-file $SAMPLE --output-dir $OUTPUTDIR --config-file {target_dir}/v{__version__}/{source}/config_DL0_to_DL1.yaml >$LOG 2>&1\n", + ] + + rc + ) + with open( + f"{source}_MAGIC-" + "I" * magic + f"_cal_to_dl1_run_{i[1]}.sh", + "w", + ) as f: + f.writelines(lines) + + +def directories_generator_real(target_dir, telescope_ids, MAGIC_runs, source_name): + """ + Here we create all subdirectories for a given workspace and target name. 
+ + Parameters + ---------- + target_dir : str + Directory to store the results + telescope_ids : list + List of the telescope IDs (set by the user) + MAGIC_runs : array + MAGIC dates and runs to be processed + source_name : str + Name of the target source + """ + + dl1_dir = str(f"{target_dir}/v{__version__}/{source_name}/DL1") + os.makedirs(dl1_dir, exist_ok=True) + + ########################################### + # MAGIC + ########################################### + for i in MAGIC_runs: + for magic in [1, 2]: + if telescope_ids[magic - 3] > 0: + os.makedirs(f"{dl1_dir}/M{magic}/{i[0]}/{i[1]}/logs", exist_ok=True) + + +def main(): + + """ + Main function + """ + + # Here we are simply collecting the parameters from the command line, as input file, output directory, and configuration file + + parser = argparse.ArgumentParser() + + parser.add_argument( + "--config-file", + "-c", + dest="config_file", + type=str, + default="./config_auto_MCP.yaml", + help="Path to a configuration file", + ) + + args = parser.parse_args() + with open( + args.config_file, "rb" + ) as f: # "rb" mode opens the file in binary format for reading + config = yaml.safe_load(f) + + telescope_ids = list(config["mc_tel_ids"].values()) + env_name = config["general"]["env_name"] + config_file = config["general"]["base_config_file"] + source_in = config["data_selection"]["source_name_database"] + source = config["data_selection"]["source_name_output"] + cluster = config["general"]["cluster"] + target_dir = Path(config["directories"]["workspace_dir"]) + + if source_in is None: + source_list = joblib.load("list_sources.dat") + + else: + if source is None: + source = source_in + source_list = [source] + + for source_name in source_list: + MAGIC_runs_and_dates = f"{source_name}_MAGIC_runs.txt" + MAGIC_runs = np.genfromtxt( + MAGIC_runs_and_dates, dtype=str, delimiter=",", ndmin=2 + ) # READ LIST OF DATES AND RUNS: format table where each line is like "2020_11_19,5093174" + + print("*** Converting Calibrated into DL1 data ***") + print(f"Process name: {source_name}") + print( + f"To check the jobs submitted to the cluster, type: squeue -n {source_name}" + ) + + directories_generator_real( + str(target_dir), telescope_ids, MAGIC_runs, source_name + ) # Here we create all the necessary directories in the given workspace and collect the main directory of the target + config_file_gen(target_dir, source_name, config_file) + + # Below we run the analysis on the MAGIC data + + lists_and_bash_gen_MAGIC( + target_dir, + telescope_ids, + MAGIC_runs, + source_name, + env_name, + cluster, + ) # MAGIC real data + if (telescope_ids[-2] > 0) or (telescope_ids[-1] > 0): + list_of_MAGIC_runs = glob.glob(f"{source_name}_MAGIC-*.sh") + if len(list_of_MAGIC_runs) < 1: + logger.warning( + "No bash script has been produced. 
Please check the provided MAGIC_runs.txt and the MAGIC calibrated data" + ) + continue + + launch_jobs = f"linking=$(sbatch --parsable {source_name}_linking_MAGIC_data_paths.sh)" + for n, run in enumerate(list_of_MAGIC_runs): + launch_jobs = f"{launch_jobs} && RES{n}=$(sbatch --parsable --dependency=afterany:$linking {run})" + + os.system(launch_jobs) + + +if __name__ == "__main__": + main() diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/job_accounting.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/job_accounting.py new file mode 100644 index 000000000..70ecf2120 --- /dev/null +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/job_accounting.py @@ -0,0 +1,325 @@ +""" +This script does checks of status of jobs based on the log files generated during the execution. +It also does accounting of memory and CPU usage +It loads the config_auto_MCP file to figure out what files it should look for and processes source name and time range +For the moment it ignores date_list and skip_*_runs + +It can also update the h5 file with the list of runs to process +""" +import argparse +import glob +import json +import os +import re +from datetime import datetime, timedelta +from subprocess import PIPE, run + +import numpy as np +import pandas as pd +import yaml + +from magicctapipe import __version__ + +GREEN = "\033[32m" +YELLOW = "\033[33m" +RED = "\033[31m" +ENDC = "\033[0m" + +__all__ = ["run_shell"] + + +def run_shell(command): + """ + Simple function to extract the output of a command run in a shell + + Parameters + ---------- + command : str + Command to be executed + + Returns + ---------- + list + List of lines returned by the program + """ + result = run(command, stdout=PIPE, stderr=PIPE, shell=True, universal_newlines=True) + return result.stdout + + +def main(): + """ + Function counts the number of jobs that should have been submitted, and checks the output of the logs to see how many finished successfully, and how many failed. 
+ """ + parser = argparse.ArgumentParser() + + parser.add_argument( + "--config-file", + "-c", + dest="config_file", + type=str, + default="./config_auto_MCP.yaml", + help="Path to a configuration file config_auto_MCP.yaml", + ) + + parser.add_argument( + "--data-level", + "-d", + dest="data_level", + type=str, + default="DL1/M1", + help="Data level to be checked", + ) + + parser.add_argument( + "--version", + "-v", + dest="version", + type=str, + default=__version__, + help="MCP version (used for subdirectory name)", + ) + + parser.add_argument( + "--no-accounting", + action="store_true", + help="No CPU/Memory usage check (faster)", + ) + + parser.add_argument( + "--run-list-file", + "-r", + dest="run_list", + type=str, + default=None, + help="h5 file with run list", + ) + + args = parser.parse_args() + with open(args.config_file, "r") as f: + config = yaml.safe_load(f) + + if args.run_list is not None: + try: + h5key = "joint_obs" + run_key = "LST1_run" + ismagic = False + for magic in [1, 2]: + if args.data_level[-2:] == f"M{magic}": + h5key = f"MAGIC{magic}/runs_M{magic}" + run_key = "Run ID" + ismagic = True + + h5runs = pd.read_hdf(args.run_list, key=h5key) + except (FileNotFoundError, KeyError): + print(f"Cannot open {h5key} in {args.run_list}") + exit(1) + + rc_col = "DL1_rc" if ismagic else args.data_level + "_rc" + + if rc_col not in h5runs.keys(): + h5runs[rc_col] = "{}" + h5runs[rc_col + "_all"] = None + + rc_dicts = {} + for rrun, dct in np.array(h5runs[[run_key, rc_col]]): + rc_dicts[rrun] = json.loads(dct) + + # TODO: those variables will be needed when more features are implemented + source_out = config["data_selection"]["source_name_output"] + timerange = config["data_selection"]["time_range"] + + # skip_LST = config["data_selection"]["skip_LST_runs"] + # skip_MAGIC = config["data_selection"]["skip_MAGIC_runs"] + work_dir = config["directories"]["workspace_dir"] + + print(f"Checking progress of jobs stored in {work_dir}") + if source_out is None: + source_out = "*" + + indir = f"{work_dir}/v{args.version}/{source_out}/{args.data_level}" + + dirs = [ + x.replace("/logs", "") + for x in ( + sorted( + glob.glob(f"{indir}/[0-9]*/[0-9]*/logs") + + glob.glob(f"{indir}/[0-9]*/logs") + ) + ) + ] + + if dirs == []: + versions = [x.split("/v")[-1] for x in glob.glob(f"{work_dir}/v*")] + print("Error, no directories found") + print(f"for path {work_dir} found in {args.config_file} this is available") + print(f"Versions {versions}") + + print( + "Supported data types: DL1/M1, DL1/M2, DL1/Merged, DL1Coincident, DL1Stereo, DL1Stereo/Merged" + ) + exit(1) + + if timerange: + timemin = str(config["data_selection"]["min"]) + timemax = str(config["data_selection"]["max"]) + timemin = datetime.strptime(timemin, "%Y_%m_%d") + timemax = datetime.strptime(timemax, "%Y_%m_%d") + + all_todo = 0 + all_return = 0 + all_good = 0 + all_cpu = [] + all_mem = [] + total_time = 0 + all_jobs = [] + for dir in dirs: + this_date_str = re.sub(f".+/{args.data_level}/", "", dir) + this_date_str = re.sub(r"\D", "", this_date_str.split("/")[0]) + this_date = datetime.strptime(this_date_str, "%Y%m%d") + if timerange and (this_date < timemin or this_date > timemax): + continue + + print(dir) + list_dl0 = "" + ins = ["list_dl0.txt", "list_LST.txt", "list_coin.txt", "list_cal.txt"] + + for file in ins: + if os.path.exists(f"{dir}/logs/{file}"): + list_dl0 = f"{dir}/logs/{file}" + if list_dl0 != "": + with open(list_dl0, "r") as fp: + this_todo = len(fp.readlines()) + else: + print(f"{RED}No {ins} files {ENDC}") + 
this_todo = 0 + + list_return = f"{dir}/logs/list_return.log" + this_good = 0 + this_cpu = [] + this_mem = [] + try: + with open(list_return, "r") as fp: + returns = fp.readlines() + this_return = len(returns) + for line in returns: + line = line.split() + file_in = line[0] + slurm_id = f"{line[1]}_{line[2]}" if len(line) == 4 else line[1] + rc = line[-1] + + if args.run_list is not None: + if ismagic: + run_subrun = file_in.split("/")[-1].split("_")[2] + this_run = int(run_subrun.split(".")[0]) + this_subrun = int(run_subrun.split(".")[1]) + else: + filename = file_in.split("/")[-1] + this_run = filename.split(".")[1].replace("Run", "") + this_subrun = int(filename.split(".")[2]) + + rc_dicts[this_run][str(this_subrun)] = rc + + if rc == "0": + this_good += 1 + # now check accounting + if not args.no_accounting: + out = run_shell( + f'sacct --format="JobID,CPUTime,MaxRSS" --units=M -j {slurm_id}| tail -1' + ).split() + if len(out) == 3: + _, cpu, mem = out + elif ( + len(out) == 2 + ): # MaxRSS sometimes is missing in the output + cpu = out[1] + mem = None + else: + print("Unexpected sacct output: {out}") + if cpu is not None: + hh, mm, ss = (int(x) for x in str(cpu).split(":")) + delta = timedelta( + days=hh // 24, hours=hh % 24, minutes=mm, seconds=ss + ) + if slurm_id not in all_jobs: + total_time += delta.total_seconds() / 3600 + all_jobs += [slurm_id] + this_cpu.append(delta) + if mem is not None and mem.endswith("M"): + this_mem.append(float(mem[0:-1])) + else: + print("Memory usage information is missing") + else: + print(f"file {file_in} failed with error {rc}") + if len(this_cpu) > 0: + all_cpu += this_cpu + all_mem += this_mem + this_cpu = np.array(this_cpu) + this_mem = np.array(this_mem) + mem_info = ( + f"memory [M]: median={np.median(this_mem)}, max={this_mem.max()}" + if len(this_mem) + else "" + ) + print( + f"CPU: median={np.median(this_cpu)}, max={this_cpu.max()}; {mem_info}" + ) + + except IOError: + print(f"{RED}File {list_return} is missing{ENDC}") + this_return = 0 + + all_todo += this_todo + all_return += this_return + all_good += this_good + if this_good < this_return: + status = RED # there are errors in processing + elif this_return < this_todo: + status = YELLOW # waiting for jobs to finish (or lost jobs) + else: + status = GREEN # all done and ready + + if this_todo > 0: + print( + f"{status}to do: {this_todo}, finished: {this_return}, no errors: {this_good}{ENDC}" + ) + + print("\nSUMMARY") + if all_good < all_return: + status = RED # there are errors in processing + elif all_return < all_todo: + status = YELLOW # waiting for jobs to finish (or lost jobs) + else: + status = GREEN # all done and ready + + if all_todo > 0: + print( + f"{status}to do: {all_todo}, finished: {all_return}, no errors: {all_good}{ENDC}" + ) + + if len(all_cpu) > 0: + all_cpu = np.array(all_cpu) + all_mem = np.array(all_mem) + print( + f"CPU: median={np.median(all_cpu)}, max={all_cpu.max()}, total={total_time:.2f} CPU hrs; memory [M]: median={np.median(all_mem)}, max={all_mem.max()}" + ) + + if args.run_list is not None: + print("Updating the database") + for rrun in rc_dicts.keys(): + idx = h5runs[run_key] == rrun + h5runs.loc[idx, rc_col] = json.dumps(rc_dicts[rrun]) + if ismagic: + all_subruns = np.array(h5runs[idx]["number of subruns"])[0] + else: + all_subruns = len(rc_dicts[rrun]) + good_subruns = sum(np.array(list(rc_dicts[rrun].values())) == "0") + isgood = np.logical_and(good_subruns == all_subruns, good_subruns > 0) + h5runs.loc[idx, rc_col + "_all"] = isgood + + # fixme: for 
DL1/M[12] files since htere are two dataframes in the file, we need to append it + # and this causes increase in the file size every time the file is updated + h5runs.to_hdf(args.run_list, key=h5key, mode="r+") + + +if __name__ == "__main__": + main() diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/list_from_h5.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/list_from_h5.py new file mode 100644 index 000000000..f847fc714 --- /dev/null +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/list_from_h5.py @@ -0,0 +1,315 @@ +""" +This script creates the lists of MAGIC and LST runs (date and run number) from a dataframe in the .h5 format for a specific time range (or specific dates). +""" + +import argparse +import os +from datetime import datetime + +import joblib +import numpy as np +import pandas as pd +import yaml + +from magicctapipe.io import resource_file + +__all__ = ["split_lst_date", "magic_date", "clear_files", "list_run"] + + +def split_lst_date(df): + + """ + This function appends to the provided dataframe, which contains the LST date as YYYYMMDD in one of the columns, four new columns: the LST year, month and day and the date as YYYY-MM-DD + + Parameters + ---------- + df : :class:`pandas.DataFrame` + Dataframe of the joint MAGIC+LST-1 observations based on the .h5 table. + + Returns + ------- + :class:`pandas.DataFrame` + The input dataframe with four added columns. + """ + + date = df["DATE"] + df["YY_LST"] = date.str[:4] + df["MM_LST"] = date.str[4:6] + df["DD_LST"] = date.str[6:8] + df["date_LST"] = df["YY_LST"] + "-" + df["MM_LST"] + "-" + df["DD_LST"] + return df + + +def magic_date(df): + + """ + This function appends to the provided dataframe (which contains the LST date, year, month and day) a column with the MAGIC dates (in the YYYYMMDD format). + + Parameters + ---------- + df : :class:`pandas.DataFrame` + Dataframe of the joint MAGIC+LST-1 observations based on the .h5 table. + + Returns + ------- + :class:`pandas.DataFrame` + The input dataframe with an added column. + """ + + date_lst = pd.to_datetime(df["DATE"], format="%Y%m%d") + delta = pd.Timedelta("1 day") + date_magic = date_lst + delta + date_magic = date_magic.dt.strftime("%Y%m%d") + df["date_MAGIC"] = date_magic + return df + + +def clear_files(source_in, source_out, df_LST, df_MAGIC1, df_MAGIC2): + + """ + This function deletes any file named XXXX_LST_runs.txt and XXXX_MAGIC_runs.txt from the working directory. + + Parameters + ---------- + source_in : str + Target name in the database. If None, it stands for all the sources observed in a pre-set time interval. + source_out : str + Name tag for the target. Used only if source_in is not None. + df_LST : :class:`pandas.DataFrame` + LST-1 dataframe of the joint MAGIC+LST-1 observations. + df_MAGIC1 : :class:`pandas.DataFrame` + MAGIC-1 dataframe of the joint MAGIC+LST-1 observations. + df_MAGIC2 : :class:`pandas.DataFrame` + MAGIC-2 dataframe of the joint MAGIC+LST-1 observations. + """ + + source_list = [] + if source_in is None: + source_list = np.intersect1d( + np.intersect1d(np.unique(df_LST["source"]), np.unique(df_MAGIC1["source"])), + np.unique(df_MAGIC2["source"]), + ) + else: + source_list.append(source_out) + + joblib.dump(source_list, "list_sources.dat") + print("Cleaning pre-existing *_LST_runs.txt and *_MAGIC_runs.txt files") + for source_name in source_list: + file_list = [ + f"{source_name}_LST_runs.txt", + f"{source_name}_MAGIC_runs.txt", + ] # The order here must be LST before MAGIC! 
+ for j in file_list: + if os.path.isfile(j): + os.remove(j) + print(f"{j} deleted.") + + +def list_run(source_in, source_out, df, skip_LST, skip_MAGIC, is_LST, M1_run_list=None): + + """ + This function creates the *_MAGIC_runs.txt and *_LST_runs.txt files, which contain the list of runs (with corresponding dates) to be processed for a given source. + + Parameters + ---------- + source_in : str or None + Name of the source in the database of joint observations. If None, it will process all sources for the given time range. + source_out : str + Name of the source to be used in the output file name. Useful only if source_in != None. + df : :class:`pandas.DataFrame` + Dataframe of the joint MAGIC+LST-1 observations. + skip_LST : list + List of the LST runs to be ignored. + skip_MAGIC : list + List of the MAGIC runs to be ignored. + is_LST : bool + If you are looking for LST runs, set it to True. For MAGIC set False. + M1_run_list : list + If you are looking for MAGIC runs, pass the list of MAGIC-1 runs here, and the MAGIC-2 database as df. + Only the runs both in the list and in the data frame (i.e., stereo MAGIC observations) will be saved in the output txt files + """ + + source_list = [] + if source_in is None: + source_list = np.unique(df["source"]) + + else: + source_list.append(source_out) + + for source_name in source_list: + + file_list = [ + f"{source_name}_LST_runs.txt", + f"{source_name}_MAGIC_runs.txt", + ] # The order here must be LST before MAGIC! + + run_listed = [] + if source_in is None: + df_source = df[df["source"] == source_name] + print("Source: ", source_name) + else: + df_source = df[df["source"] == source_in] + print("Source: ", source_in) + + if is_LST: + print("Finding LST runs...") + if len(df_source) == 0: + print("NO LST run found. Exiting...") + continue + LST_run = df_source["LST1_run"].tolist() # List with runs as strings + LST_date = df_source["date_LST"].tolist() + for k in range(len(df_source)): + if np.isnan(LST_run[k]): + continue + + if (int(LST_run[k]) in skip_LST) or (int(LST_run[k]) in run_listed): + continue + + with open(file_list[0], "a+") as f: + f.write( + f"{LST_date[k].replace('-','_')},{str(LST_run[k]).lstrip('0')}\n" + ) + run_listed.append(int(LST_run[k])) + + if not is_LST: + print("Finding MAGIC runs...") + if len(df_source) == 0: + print("NO MAGIC run found. 
Exiting...") + continue + MAGIC_date = df_source["date_MAGIC"].tolist() + M2_run = df_source["Run ID"].tolist() + for k in range(len(df_source)): + if np.isnan(M2_run[k]): + continue + + if (int(M2_run[k]) in skip_MAGIC) or (int(M2_run[k]) in run_listed): + continue + if int(M2_run[k]) not in M1_run_list: + continue + + with open(file_list[1], "a+") as f: + f.write( + f"{MAGIC_date[k][0:4]}_{MAGIC_date[k][4:6]}_{MAGIC_date[k][6:8]},{int(M2_run[k])}\n" + ) + run_listed.append(int(M2_run[k])) + + +def main(): + + """ + Main function + """ + + parser = argparse.ArgumentParser() + + parser.add_argument( + "--config-file", + "-c", + dest="config_file", + type=str, + default="./config_auto_MCP.yaml", + help="Path to a configuration file config_auto_MCP.yaml", + ) + + args = parser.parse_args() + with open( + args.config_file, "rb" + ) as f: # "rb" mode opens the file in binary format for reading + config = yaml.safe_load(f) + config_db = resource_file("database_config.yaml") + + with open( + config_db, "rb" + ) as fc: # "rb" mode opens the file in binary format for reading + config_dict = yaml.safe_load(fc) + + LST_h5 = config_dict["database_paths"]["LST"] + LST_key = config_dict["database_keys"]["LST"] + MAGIC_h5 = config_dict["database_paths"]["MAGIC"] + MAGIC1_key = config_dict["database_keys"]["MAGIC-I"] + MAGIC2_key = config_dict["database_keys"]["MAGIC-II"] + source_in = config["data_selection"]["source_name_database"] + source_out = config["data_selection"]["source_name_output"] + if (source_out is None) and (source_in is not None): + source_out = source_in + range = config["data_selection"]["time_range"] + skip_LST = config["data_selection"]["skip_LST_runs"] + skip_MAGIC = config["data_selection"]["skip_MAGIC_runs"] + + df_LST = pd.read_hdf( + LST_h5, + key=LST_key, + ) # TODO: put this file in a shared folder + df_LST.dropna(subset=["LST1_run"], inplace=True) + df_LST = split_lst_date(df_LST) + df_LST = df_LST.astype( + {"YY_LST": int, "MM_LST": int, "DD_LST": int, "nsb": float, "LST1_run": int} + ) + + stereo = True + lstchain_version = config["general"]["LST_version"] + + processed_v = df_LST["processed_lstchain_file"].str.split("/").str[-3] + + mask = processed_v == lstchain_version + df_LST = df_LST[mask] + + if source_in is None: + df_LST.query( + f'MAGIC_trigger=="L3T" & MAGIC_HV=="Nominal" & MAGIC_stereo == {stereo} & error_code_nsb=="0"', + inplace=True, + ) + else: + df_LST.query( + f'source=="{source_in}"& MAGIC_trigger=="L3T" & MAGIC_HV=="Nominal" & MAGIC_stereo == {stereo} & error_code_nsb=="0"', + inplace=True, + ) + + if range: + min = str(config["data_selection"]["min"]) + max = str(config["data_selection"]["max"]) + min = datetime.strptime(min, "%Y_%m_%d") + max = datetime.strptime(max, "%Y_%m_%d") + lst = pd.to_datetime(df_LST["date_LST"].str.replace("_", "-")) + df_LST["date"] = lst + + df_LST = df_LST[df_LST["date"] >= min] + df_LST = df_LST[df_LST["date"] <= max] + + else: + dates = config["data_selection"]["date_list"] + df_LST = df_LST[df_LST["date_LST"].isin(dates)] + + df_LST = df_LST.reset_index() + df_LST = df_LST.drop("index", axis=1) + df_MAGIC1 = pd.read_hdf( + MAGIC_h5, + key=MAGIC1_key, + ) + df_MAGIC2 = pd.read_hdf( + MAGIC_h5, + key=MAGIC2_key, + ) + + list_date_LST = np.unique(df_LST["date_LST"]) + list_date_LST_low = [int(sub.replace("-", "")) for sub in list_date_LST] + + df_MAGIC1 = df_MAGIC1[df_MAGIC1["DATE"].isin(list_date_LST_low)] + df_MAGIC2 = df_MAGIC2[df_MAGIC2["DATE"].isin(list_date_LST_low)] + + clear_files(source_in, source_out, df_LST, 
df_MAGIC1, df_MAGIC2) + + list_run(source_in, source_out, df_LST, skip_LST, skip_MAGIC, True) + + df_MAGIC2 = magic_date(df_MAGIC2) + df_MAGIC1 = magic_date(df_MAGIC1) + + M1_runs = df_MAGIC1["Run ID"].tolist() + if (len(M1_runs) == 0) or (len(df_MAGIC2) == 0): + print("NO MAGIC stereo run found. Exiting...") + return + list_run(source_in, source_out, df_MAGIC2, skip_LST, skip_MAGIC, False, M1_runs) + + +if __name__ == "__main__": + main() diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merge_stereo.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merge_stereo.py new file mode 100644 index 000000000..1833498fc --- /dev/null +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merge_stereo.py @@ -0,0 +1,140 @@ +""" +This scripts merges LST DL1Stereo subruns into runs + +Usage: +$ merge_stereo (-c config_file.yaml) +""" +import argparse +import glob +import logging +import os +from pathlib import Path + +import joblib +import numpy as np +import yaml + +from magicctapipe import __version__ +from magicctapipe.scripts.lst1_magic.semi_automatic_scripts.clusters import ( + rc_lines, + slurm_lines, +) + +__all__ = ["MergeStereo"] + +logger = logging.getLogger(__name__) +logger.addHandler(logging.StreamHandler()) +logger.setLevel(logging.INFO) + + +def MergeStereo(target_dir, env_name, source, cluster): + """ + This function creates the bash scripts to run merge_hdf_files.py in all DL1Stereo subruns. + + Parameters + ---------- + target_dir : str + Path to the working directory + env_name : str + Name of the environment + source : str + Name of the target + cluster : str + Cluster system + """ + + process_name = source + stereo_DL1_dir = f"{target_dir}/v{__version__}/{source}" + listOfNightsLST = np.sort(glob.glob(f"{stereo_DL1_dir}/DL1Stereo/*")) + if cluster != "SLURM": + logger.warning( + "Automatic processing not implemented for the cluster indicated in the config file" + ) + return + for nightLST in listOfNightsLST: + night = nightLST.split("/")[-1] + stereoMergeDir = f"{stereo_DL1_dir}/DL1Stereo/Merged/{night}" + os.makedirs(f"{stereoMergeDir}/logs", exist_ok=True) + + if len(glob.glob(f"{nightLST}/dl1_stereo*.h5")) < 1: + continue + + slurm = slurm_lines( + queue="short", + job_name=f"{process_name}_stereo_merge", + mem="2g", + out_name=f"{stereoMergeDir}/logs/slurm-%x.%A_%a", + ) + rc = rc_lines( + store=f"{nightLST} ${{SLURM_JOB_ID}}", out=f"{stereoMergeDir}/logs/list" + ) + os.system(f"echo {nightLST} >> {stereoMergeDir}/logs/list_dl0.txt") + lines = ( + slurm + + [ + f"conda run -n {env_name} merge_hdf_files --input-dir {nightLST} --output-dir {stereoMergeDir} --run-wise >{stereoMergeDir}/logs/merge_{night}_${{SLURM_JOB_ID}}.log 2>&1\n" + ] + + rc + ) + + with open(f"{source}_StereoMerge_{night}.sh", "w") as f: + f.writelines(lines) + + +def main(): + """ + Main function + """ + + parser = argparse.ArgumentParser() + parser.add_argument( + "--config-file", + "-c", + dest="config_file", + type=str, + default="./config_auto_MCP.yaml", + help="Path to a configuration file", + ) + + args = parser.parse_args() + with open( + args.config_file, "rb" + ) as f: # "rb" mode opens the file in binary format for reading + config = yaml.safe_load(f) + + target_dir = Path(config["directories"]["workspace_dir"]) + + env_name = config["general"]["env_name"] + + source_in = config["data_selection"]["source_name_database"] + source = config["data_selection"]["source_name_output"] + cluster = config["general"]["cluster"] + + if source_in is None: + source_list = 
joblib.load("list_sources.dat") + + else: + if source is None: + source = source_in + source_list = [source] + + for source_name in source_list: + + print("***** Merging DL1Stereo files run-wise...") + MergeStereo(target_dir, env_name, source_name, cluster) + + list_of_merge = glob.glob(f"{source_name}_StereoMerge_*.sh") + if len(list_of_merge) < 1: + print("Warning: no bash script has been produced") + continue + + launch_jobs = "" + + for n, run in enumerate(list_of_merge): + launch_jobs += (" && " if n > 0 else "") + f"sbatch {run}" + + os.system(launch_jobs) + + +if __name__ == "__main__": + main() diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py new file mode 100644 index 000000000..ca80d7d56 --- /dev/null +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py @@ -0,0 +1,169 @@ +""" +This script generates the bash +scripts to merge real data files by calling the script "merge_hdf_files.py": + +MAGIC: + +Merge the subruns into runs for M1 and M2 individually. + +Usage: +$ merging_runs (-c config.yaml) +""" + +import argparse +import glob +import logging +import os +from pathlib import Path + +import joblib +import numpy as np +import yaml + +from magicctapipe import __version__ +from magicctapipe.scripts.lst1_magic.semi_automatic_scripts.clusters import ( + rc_lines, + slurm_lines, +) + +__all__ = ["merge"] + +logger = logging.getLogger(__name__) +logger.addHandler(logging.StreamHandler()) +logger.setLevel(logging.INFO) + + +def merge(target_dir, MAGIC_runs, env_name, source, cluster): + + """ + This function creates the bash scripts to run merge_hdf_files.py for real data + + Parameters + ---------- + target_dir : str + Path to the working directory + MAGIC_runs : matrix of strings + Matrix of [(date,run)] n-tuples + env_name : str + Name of the environment + source : str + Target name + cluster : str + Cluster system + """ + + process_name = f"merging_{source}" + + MAGIC_DL1_dir = f"{target_dir}/v{__version__}/{source}/DL1/" + + if cluster != "SLURM": + logger.warning( + "Automatic processing not implemented for the cluster indicated in the config file" + ) + return + lines = slurm_lines( + queue="short", + job_name=process_name, + mem="2g", + out_name=f"{MAGIC_DL1_dir}/Merged/logs/slurm-%x.%j", + ) + os.makedirs(f"{MAGIC_DL1_dir}/Merged/logs", exist_ok=True) + + with open(f"{source}_Merge_MAGIC.sh", "w") as f: + f.writelines(lines) + for magic in [1, 2]: + for i in MAGIC_runs: + # Here is a difference w.r.t. original code. 
If only one telescope data are available they will be merged now for this telescope + indir = f"{MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]}" + if os.path.exists(f"{indir}"): + outdir = f"{MAGIC_DL1_dir}/Merged/{i[0]}" + os.makedirs(f"{outdir}/logs", exist_ok=True) + + f.write( + f"conda run -n {env_name} merge_hdf_files --input-dir {indir} --output-dir {outdir} >{outdir}/logs/merge_M{magic}_{i[0]}_{i[1]}_${{SLURM_JOB_ID}}.log 2>&1\n" + ) + rc = rc_lines( + store=f"{indir} ${{SLURM_JOB_ID}}", + out=f"{outdir}/logs/list", + ) + f.writelines(rc) + os.system(f"echo {indir} >> {outdir}/logs/list_dl0.txt") + else: + logger.error(f"{indir} does not exist") + + +def main(): + + """ + Main function + """ + + parser = argparse.ArgumentParser() + parser.add_argument( + "--config-file", + "-c", + dest="config_file", + type=str, + default="./config_auto_MCP.yaml", + help="Path to a configuration file", + ) + + args = parser.parse_args() + with open( + args.config_file, "rb" + ) as f: # "rb" mode opens the file in binary format for reading + config = yaml.safe_load(f) + + target_dir = Path(config["directories"]["workspace_dir"]) + + env_name = config["general"]["env_name"] + source_in = config["data_selection"]["source_name_database"] + source = config["data_selection"]["source_name_output"] + cluster = config["general"]["cluster"] + + if source_in is None: + source_list = joblib.load("list_sources.dat") + + else: + if source is None: + source = source_in + source_list = [source] + + for source_name in source_list: + MAGIC_runs_and_dates = f"{source_name}_MAGIC_runs.txt" + MAGIC_runs = np.genfromtxt( + MAGIC_runs_and_dates, dtype=str, delimiter=",", ndmin=2 + ) + + # Below we run the analysis on the MAGIC data + + print("***** Generating merge_MAGIC bashscripts...") + merge( + target_dir, + MAGIC_runs, + env_name, + source_name, + cluster, + ) # generating the bash script to merge the subruns + + print("***** Running merge_hdf_files.py on the MAGIC data files...") + + # Below we run the bash scripts to merge the MAGIC files + list_of_merging_scripts = np.sort(glob.glob(f"{source_name}_Merge_MAGIC*.sh")) + if len(list_of_merging_scripts) < 1: + logger.warning("No bash scripts for real data") + continue + launch_jobs = "" + for n, run in enumerate(list_of_merging_scripts): + launch_jobs += (" && " if n > 0 else "") + f"sbatch {run}" + + os.system(launch_jobs) + + print(f"Process name: merging_{source_name}") + print( + f"To check the jobs submitted to the cluster, type: squeue -n merging_{source_name}" + ) + + +if __name__ == "__main__": + main() diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/stereo_events.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/stereo_events.py new file mode 100644 index 000000000..a2f8a4eea --- /dev/null +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/stereo_events.py @@ -0,0 +1,206 @@ +""" +This scripts generates and runs the bashscripts +to compute the stereo parameters of DL1 +Coincident MAGIC+LST data files. 
+ +Usage: +$ stereo_events (-c config.yaml) +""" + +import argparse +import glob +import logging +import os +from pathlib import Path + +import joblib +import numpy as np +import yaml + +from magicctapipe import __version__ +from magicctapipe.io import resource_file +from magicctapipe.scripts.lst1_magic.semi_automatic_scripts.clusters import ( + rc_lines, + slurm_lines, +) + +__all__ = ["configfile_stereo", "bash_stereo"] + +logger = logging.getLogger(__name__) +logger.addHandler(logging.StreamHandler()) +logger.setLevel(logging.INFO) + + +def configfile_stereo(target_dir, source_name, config_file): + + """ + This function creates the configuration file needed for the stereo reconstruction step + + Parameters + ---------- + target_dir : str + Path to the working directory + source_name : str + Name of the target source + config_file : str + Path to MCP configuration file (e.g., resources/config.yaml) + """ + + if config_file == "": + config_file = resource_file("config.yaml") + + with open( + config_file, "rb" + ) as fc: # "rb" mode opens the file in binary format for reading + config_dict = yaml.safe_load(fc) + conf = { + "mc_tel_ids": config_dict["mc_tel_ids"], + "stereo_reco": config_dict["stereo_reco"], + } + file_name = f"{target_dir}/v{__version__}/{source_name}/config_stereo.yaml" + with open(file_name, "w") as f: + + yaml.dump(conf, f, default_flow_style=False) + + +def bash_stereo(target_dir, source, env_name, cluster): + + """ + This function generates the bashscripts for running the stereo analysis. + + Parameters + ---------- + target_dir : str + Path to the working directory + source : str + Target name + env_name : str + Name of the environment + cluster : str + Cluster system + """ + + process_name = source + + coincidence_DL1_dir = f"{target_dir}/v{__version__}/{source}" + + listOfNightsLST = np.sort(glob.glob(f"{coincidence_DL1_dir}/DL1Coincident/*")) + if cluster != "SLURM": + logger.warning( + "Automatic processing not implemented for the cluster indicated in the config file" + ) + return + for nightLST in listOfNightsLST: + night = nightLST.split("/")[-1] + stereoDir = f"{coincidence_DL1_dir}/DL1Stereo/{night}" + os.makedirs(f"{stereoDir}/logs", exist_ok=True) + if not os.listdir(f"{nightLST}"): + continue + if len(os.listdir(nightLST)) < 2: + continue + + os.system( + f"ls {nightLST}/*LST*.h5 > {stereoDir}/logs/list_coin.txt" + ) # generating a list with the DL1 coincident data files. 
+ with open(f"{stereoDir}/logs/list_coin.txt", "r") as f: + process_size = len(f.readlines()) - 1 + + if process_size < 0: + continue + + slurm = slurm_lines( + queue="short", + job_name=f"{process_name}_stereo", + array=process_size, + mem="2g", + out_name=f"{stereoDir}/logs/slurm-%x.%A_%a", + ) + rc = rc_lines( + store="$SAMPLE ${SLURM_ARRAY_JOB_ID} ${SLURM_ARRAY_TASK_ID}", + out="$OUTPUTDIR/logs/list", + ) + lines = ( + slurm + + [ + f"export INPUTDIR={nightLST}\n", + f"export OUTPUTDIR={stereoDir}\n", + "SAMPLE_LIST=($(<$OUTPUTDIR/logs/list_coin.txt))\n", + "SAMPLE=${SAMPLE_LIST[${SLURM_ARRAY_TASK_ID}]}\n", + "export LOG=$OUTPUTDIR/logs/stereo_${SLURM_ARRAY_JOB_ID}_${SLURM_ARRAY_TASK_ID}.log\n", + f"conda run -n {env_name} lst1_magic_stereo_reco --input-file $SAMPLE --output-dir $OUTPUTDIR --config-file {target_dir}/v{__version__}/{source}/config_stereo.yaml >$LOG 2>&1\n", + ] + + rc + ) + with open(f"{source}_StereoEvents_{night}.sh", "w") as f: + f.writelines(lines) + + +def main(): + + """ + Main function + """ + + parser = argparse.ArgumentParser() + parser.add_argument( + "--config-file", + "-c", + dest="config_file", + type=str, + default="./config_auto_MCP.yaml", + help="Path to a configuration file", + ) + + args = parser.parse_args() + with open( + args.config_file, "rb" + ) as f: # "rb" mode opens the file in binary format for reading + config = yaml.safe_load(f) + + target_dir = Path(config["directories"]["workspace_dir"]) + + env_name = config["general"]["env_name"] + config_file = config["general"]["base_config_file"] + + source_in = config["data_selection"]["source_name_database"] + source = config["data_selection"]["source_name_output"] + + cluster = config["general"]["cluster"] + + if source_in is None: + source_list = joblib.load("list_sources.dat") + else: + if source is None: + source = source_in + source_list = [source] + + for source_name in source_list: + + print("***** Generating file config_stereo.yaml...") + configfile_stereo(target_dir, source_name, config_file) + + # Below we run the analysis on the real data + + print("***** Generating the bashscript...") + bash_stereo(target_dir, source_name, env_name, cluster) + + print("***** Submitting processess to the cluster...") + print(f"Process name: {source_name}_stereo") + print( + f"To check the jobs submitted to the cluster, type: squeue -n {source_name}_stereo" + ) + + # Below we run the bash scripts to find the stereo events + list_of_stereo_scripts = np.sort(glob.glob(f"{source_name}_StereoEvents*.sh")) + if len(list_of_stereo_scripts) < 1: + logger.warning("No bash scripts for real data") + continue + launch_jobs = "" + for n, run in enumerate(list_of_stereo_scripts): + launch_jobs += (" && " if n > 0 else "") + f"sbatch {run}" + + os.system(launch_jobs) + + +if __name__ == "__main__": + main() diff --git a/setup.cfg b/setup.cfg index d2affb046..ecd18a554 100644 --- a/setup.cfg +++ b/setup.cfg @@ -80,6 +80,9 @@ all = %(docs)s %(dev)s +[options.package_data] +* = resources/* + [options.entry_points] console_scripts = create_dl3_index_files = magicctapipe.scripts.lst1_magic.create_dl3_index_files:main @@ -93,6 +96,20 @@ console_scripts = magic_calib_to_dl1 = magicctapipe.scripts.lst1_magic.magic_calib_to_dl1:main merge_hdf_files = magicctapipe.scripts.lst1_magic.merge_hdf_files:main tune_magic_nsb = magicctapipe.scripts.lst1_magic.tune_magic_nsb:main + check_MAGIC_runs = magicctapipe.scripts.lst1_magic.semi_automatic_scripts.check_MAGIC_runs:main + coincident_events = 
magicctapipe.scripts.lst1_magic.semi_automatic_scripts.coincident_events:main
+    create_LST_table = magicctapipe.scripts.lst1_magic.semi_automatic_scripts.database_production.create_LST_table:main
+    dl1_production = magicctapipe.scripts.lst1_magic.semi_automatic_scripts.dl1_production:main
+    job_accounting = magicctapipe.scripts.lst1_magic.semi_automatic_scripts.job_accounting:main
+    list_from_h5 = magicctapipe.scripts.lst1_magic.semi_automatic_scripts.list_from_h5:main
+    lstchain_version = magicctapipe.scripts.lst1_magic.semi_automatic_scripts.database_production.lstchain_version:main
+    LSTnsb = magicctapipe.scripts.lst1_magic.semi_automatic_scripts.database_production.LSTnsb:main
+    merge_stereo = magicctapipe.scripts.lst1_magic.semi_automatic_scripts.merge_stereo:main
+    merging_runs = magicctapipe.scripts.lst1_magic.semi_automatic_scripts.merging_runs:main
+    nsb_level = magicctapipe.scripts.lst1_magic.semi_automatic_scripts.database_production.nsb_level:main
+    nsb_to_h5 = magicctapipe.scripts.lst1_magic.semi_automatic_scripts.database_production.nsb_to_h5:main
+    stereo_events = magicctapipe.scripts.lst1_magic.semi_automatic_scripts.stereo_events:main
+    update_MAGIC_database = magicctapipe.scripts.lst1_magic.semi_automatic_scripts.database_production.update_MAGIC_database:main

[tool:pytest]
minversion=3.0
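
For reference, a minimal sketch of how these entry points are typically chained for a real-data analysis, based on the Usage strings of the scripts added above (it assumes a config_auto_MCP.yaml in the working directory and the database files listed in database_config.yaml; the -c flag for coincident_events is assumed to mirror the other scripts, and the begin/end dates below are placeholders):

$ nsb_level -c config_auto_MCP.yaml -b 2023_01_01 -e 2023_01_31   # one NSB-evaluation job per LST run
$ nsb_to_h5                                                       # write the evaluated NSB values into the LST database
$ list_from_h5 -c config_auto_MCP.yaml                            # produce the *_LST_runs.txt and *_MAGIC_runs.txt lists
$ dl1_production -c config_auto_MCP.yaml                          # MAGIC Calibrated -> DL1
$ merging_runs -c config_auto_MCP.yaml                            # merge MAGIC DL1 subruns into runs
$ coincident_events -c config_auto_MCP.yaml                       # MAGIC+LST-1 coincidence search
$ stereo_events -c config_auto_MCP.yaml                           # stereo reconstruction of the coincident DL1 events
$ merge_stereo -c config_auto_MCP.yaml                            # merge LST DL1Stereo subruns into runs
$ job_accounting -c config_auto_MCP.yaml -d DL1/M1                # check job status and resource usage for a given data level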