Merge pull request #9 from bcgov/test

pull latest changes from test to prod
tpluzhni authored Sep 12, 2024
2 parents 6ef9eb0 + ffbdb32 commit 2296c8a
Showing 8 changed files with 376 additions and 25 deletions.
35 changes: 34 additions & 1 deletion README.md
@@ -1,5 +1,38 @@
# medis-scheduler
MoH MEDIS project scheduler
## MoH MEDIS project ETL scheduler


Apache Airflow is an open-source workflow management platform for data engineering pipelines; it was chosen as our ETL process scheduler.
Airflow was deployed to OpenShift using a Helm chart downloaded to a local directory (a Windows laptop). It can also be installed directly from the Helm chart repository ([https://airflow.apache.org/docs/helm-chart/stable/index.html]).
### Airflow installation
(The installation requires the OpenShift CLI (oc) and Helm to be installed on your OS.)
1. Connect to the OpenShift platform.
Get an API token from [https://oauth-openshift.apps.silver.devops.gov.bc.ca/oauth/token/display].
Copy the oc login command with the token and run it from your laptop. (The laptop must have access to the government's OpenShift.)
An example of the command:
oc login --token=sha256~yxiCAMwFD_XXX --server=https://api.silver.devops.gov.bc.ca:6443
Logged into [https://api.silver.devops.gov.bc.ca:6443] as "username@github" using the token provided.
2. Choose the project you are going to deploy or upgrade Airflow in.
3. Run the helm command to install/upgrade Airflow:
helm.exe upgrade --install airflow C:\path-to-helm-chart\helm-v3.14.3-windows-amd64\windows-amd64\airflow-1.13.0\airflow-1.13.0\airflow --namespace c2da03-test -f C:\Users\tatiana.pluzhnikova\Downloads\helm-v3.14.3-windows-amd64\windows-amd64\airflow-1.13.0\airflow-1.13.0\airflow\override-values-c2da03-test.yaml

This command installs from the downloaded Helm chart; the default settings are overridden by the override-values-c2da03-test.yaml file, which can be found in GitHub [https://github.com/bcgov/medis-scheduler/blob/6ef9eb0d6c5d61751b796123ae13484d29ca9de8/airflow/override-values-c2da03-test.yaml]


### Airflow configuration
1. Airflow can be accessed at [https://airflow-webserver-c2da03-test.apps.silver.devops.gov.bc.ca/] (test) and [https://airflow-webserver-c2da03-prod.apps.silver.devops.gov.bc.ca/] (prod).
2. The DAGs are stored in GitHub (medis-scheduler/dags at main · bcgov/medis-scheduler (github.com)) and are synced to Airflow every 10 seconds: the test branch to the test Airflow and the main branch to the prod Airflow.
(We make changes in the test branch first, then open a pull request to the main branch.)
3. We use Airflow Variables for more flexibility, so some parameters can be modified without touching the DAGs. The variables can be exported to and imported from a JSON file, and we keep them in GitHub [https://github.com/bcgov/medis-scheduler/tree/6ef9eb0d6c5d61751b796123ae13484d29ca9de8/airflow]
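Since the variables are kept in the repo as JSON, the round trip performed by the `airflow variables export <file>` and `airflow variables import <file>` CLI commands can be sketched with plain `json`. This is a minimal sketch: the variable names below appear in the DAGs in this commit, but the values are placeholder paths, not the project's real configuration.

```python
# Sketch: round-tripping Airflow Variables through a JSON file, mirroring
# `airflow variables export` / `airflow variables import`.
# Variable names come from the DAGs; values are placeholders.
import json

variables = {
    "medis_job": "/opt/airflow/dags/airflow.app.job.yaml",                   # placeholder value
    "medis_emtydir_job": "/opt/airflow/dags/airflow.app.job.emptydir.yaml",  # placeholder value
}

def export_variables(vars_dict: dict, path: str) -> None:
    """Write variables as the flat name -> value JSON mapping Airflow uses."""
    with open(path, "w") as f:
        json.dump(vars_dict, f, indent=2)

def import_variables(path: str) -> dict:
    """Read a variables JSON file back into a dict."""
    with open(path) as f:
        return json.load(f)
```

Keeping the exported file in Git means a variable change is reviewed like any other change before it is imported into the test or prod Airflow.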

### Airflow PCD-ETL and MEDIS-ETL DAGs description
The Airflow scheduler performs the same ETL steps as described in https://proactionca.ent.cgi.com/confluence/pages/viewpage.action?spaceKey=BCMOHAD&title=ETL+process+design and [https://proactionca.ent.cgi.com/confluence/display/BCMOHAD/Manual+trigger+for+ETL+process], plus extra steps we added for error handling, email notifications, etc.
1. The extract phase is done by calling the ETL service endpoint in OpenShift with the Airflow HttpOperator. The URL for each form being extracted is defined as an Airflow Variable; the payload is hardcoded in the DAG.
2. The files uploader is implemented as a job on OpenShift and triggered by the Airflow KubernetesJobOperator (task PCD_file_upload or MEDIS_file_upload).
The job spins up a pod that executes upload.sh, which creates the combined medis_ltc.flag file, uploads all encrypted files, uploads the newly created flag file, moves all files into an archive directory, and deletes files from the archive directory that are older than the specified retention period. ([https://proactionca.ent.cgi.com/confluence/display/BCMOHAD/Manual+trigger+for+ETL+process])
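The extract step above pairs a per-form endpoint (held in an Airflow Variable) with a hardcoded payload. A stdlib-only sketch of assembling such a request follows; the endpoint URLs, form names, and payload fields here are illustrative assumptions, not the project's actual values.

```python
# Sketch: building the extract-phase HTTP request the way the DAG's
# HttpOperator would -- endpoint from a per-form variable, payload hardcoded.
# All URLs, form names, and payload fields are hypothetical.
import json

# Stand-in for Airflow Variables mapping form name -> ETL service endpoint.
FORM_ENDPOINTS = {
    "ltc_facility": "http://etl-service:8080/extract/ltc_facility",  # hypothetical
    "ltc_ytd": "http://etl-service:8080/extract/ltc_ytd",            # hypothetical
}

def build_extract_request(form: str) -> tuple[str, dict, bytes]:
    """Return (url, headers, body) for one form's extract call."""
    url = FORM_ENDPOINTS[form]
    headers = {"Content-Type": "application/json"}  # matches the DAGs
    payload = {"form": form}                        # hardcoded payload, as in the DAGs
    return url, headers, json.dumps(payload).encode()
```

In the real DAGs one HttpOperator task is created per form (facility and YTD extracts for each health authority), all fanning out from a start task and joining before the upload job.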
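The archive-and-retention behaviour that upload.sh performs can be sketched in a few lines: move processed files into an archive directory, then delete archived files older than the retention period. The directory layout and the retention value below are assumptions for illustration.

```python
# Sketch of upload.sh's archive/retention step: move processed files to an
# archive directory, then prune archived files past the retention period.
# Directory names and RETENTION_DAYS are assumptions, not the real config.
import time
from pathlib import Path

RETENTION_DAYS = 30  # hypothetical; the real period is configured elsewhere

def archive_and_prune(data_dir: Path, archive_dir: Path,
                      retention_days: int = RETENTION_DAYS) -> int:
    """Move files from data_dir into archive_dir, delete archived files
    older than retention_days, and return the number deleted."""
    archive_dir.mkdir(parents=True, exist_ok=True)
    for f in data_dir.iterdir():
        if f.is_file():
            f.rename(archive_dir / f.name)  # move into the archive
    cutoff = time.time() - retention_days * 86400
    pruned = 0
    for f in archive_dir.iterdir():
        if f.is_file() and f.stat().st_mtime < cutoff:
            f.unlink()  # past retention, delete
            pruned += 1
    return pruned
```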




## Running Locally

33 changes: 17 additions & 16 deletions dags/MEDIS-ETL.py
@@ -15,7 +15,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of the BashOperator."""


from __future__ import annotations

import datetime
@@ -32,6 +33,7 @@
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.operators.python_operator import PythonOperator
from airflow.exceptions import AirflowSkipException
from airflow.utils.email import send_email
@@ -101,6 +103,7 @@ def get_failed_ids_send_email(ds=None, **kwargs):
etl_job_task = KubernetesJobOperator(
task_id='MEDIS_file_upload',
job_template_file='{{var.value.medis_job}}',
wait_until_job_complete=True,
)

failed_tasks_notification = PythonOperator(
@@ -190,25 +193,30 @@ def get_failed_ids_send_email(ds=None, **kwargs):
headers={"Content-Type": "application/json"},
)

# http_local_post_500_1 = BashOperator(
# task_id='http_local_post_500_1',
# bash_command='echo "Failed Task"; exit 1;',
# dag=dag,
# )

check_ltc_folder_task = KubernetesJobOperator(
task_id='Check_LTC_Shared_Folder',
job_template_file='{{var.value.medis_emtydir_job}}',
wait_until_job_complete=True,
)

start_facility_extract = EmptyOperator(
task_id="Start_LTC_Facility_Extract",
)

check_ltc_sftp_folder_task = KubernetesJobOperator(
task_id='Check_LTC_SFTP_Folder',
job_template_file='{{var.value.medis_emtysftp_job}}',
wait_until_job_complete=True,
)

check_ltc_sftp_folder_task >> check_ltc_folder_task >> start_facility_extract

start_facility_extract >> facility_fha_task >> start_ytd_extract
start_facility_extract >> facility_iha_task >> start_ytd_extract
start_facility_extract >> facility_viha_task >> start_ytd_extract
start_facility_extract >> facility_nha_task >> start_ytd_extract
start_facility_extract >> facility_vch_task >> start_ytd_extract
# start_facility_extract >> http_local_post_500_1 >> start_ytd_extract


start_ytd_extract >> ytd_fha_task >> etl_job_task
start_ytd_extract >> ytd_iha_task >> etl_job_task
start_ytd_extract >> ytd_viha_task >> etl_job_task
@@ -218,13 +226,6 @@ def get_failed_ids_send_email(ds=None, **kwargs):
etl_job_task >> failed_tasks_notification


#delay_5s_task = BashOperator(
# task_id="Delay",
# bash_command="sleep 5s",
#)

#delay_5s_task >> facility_viha_task


if __name__ == "__main__":
dag.test()
19 changes: 11 additions & 8 deletions dags/PCD-ETL.py
@@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of the BashOperator."""

from __future__ import annotations

import datetime
@@ -204,13 +204,18 @@ def get_failed_ids_send_email(ds=None, **kwargs):
headers={"Content-Type": "application/json"},
)

# http_local_post_500_1 = BashOperator(
# task_id='http_local_post_500_1',
# bash_command='echo "Failed Task"; exit 1;',
# dag=dag,
# )
check_pcd_folder_task = KubernetesJobOperator(
task_id='Check_PCD_Shared_Folder',
job_template_file='{{var.value.pcd_emtydir_job}}',
)

check_pcd_sftp_folder_task = KubernetesJobOperator(
task_id='Check_PCD_SFTP_Folder',
job_template_file='{{var.value.pcd_emtysftp_job}}',
wait_until_job_complete=True,
)

check_pcd_sftp_folder_task >> check_pcd_folder_task >> start_pcd_extract_1

start_pcd_extract_1 >> status_tracker_task >> start_pcd_extract_2
start_pcd_extract_1 >> financial_expense_task >> start_pcd_extract_2
@@ -225,8 +230,6 @@
start_pcd_extract_2 >> pcn_budget_task >> etl_job_task


# start_facility_extract >> http_local_post_500_1 >> start_ytd_extract

etl_job_task >> failed_tasks_notification


54 changes: 54 additions & 0 deletions dags/airflow.app.job.emptydir.yaml
@@ -0,0 +1,54 @@
---

# A minimal OpenShift template
# to create a BusyBox container with
# a volume claim that can be used to
# launch a Job and inspect a PVC.
#
# Create with:
# oc process -p PVC_CLAIM_NAME=my-claim -f busy-job.yaml | oc create -f -
# Destroy with:
# oc delete job --selector template=busy-job

apiVersion: batch/v1
kind: Job
metadata:
name: medis-empty-job-airflow-once
spec:
parallelism: 1
completions: 1
activeDeadlineSeconds: 600
backoffLimit: 1
selector:
name: medis-empty-job
template:
metadata:
labels:
name: empty-job
spec:
containers:
- image: image-registry.openshift-image-registry.svc:5000/c2da03-tools/medis-sftp-image:latest
name: empty
resources:
limits:
cpu: 100m
memory: 128Mi
command:
- "sh"
- "-c"
- "/app/bin/empty.sh"
volumeMounts:
- name: medis-data-vol
mountPath: /data
- name: sftp-bin
mountPath: /app/bin
readOnly: true
volumes:
- name: medis-data-vol
persistentVolumeClaim:
claimName: medis-etl-pvc
- name: sftp-bin
configMap:
name: medis-sftp-bin
defaultMode: 0777
restartPolicy: Never
70 changes: 70 additions & 0 deletions dags/airflow.app.job.emptysftp.yaml
@@ -0,0 +1,70 @@
---

# A minimal OpenShift template
# to create a BusyBox container with
# a volume claim that can be used to
# launch a Job and inspect a PVC.
#
# Create with:
# oc process -p PVC_CLAIM_NAME=my-claim -f busy-job.yaml | oc create -f -
# Destroy with:
# oc delete job --selector template=busy-job

apiVersion: batch/v1
kind: Job
metadata:
name: medis-emptysftp-job-airflow-once
spec:
parallelism: 1
completions: 1
activeDeadlineSeconds: 600
backoffLimit: 1
selector:
name: medis-emptysftp-job
template:
metadata:
labels:
name: emptysftp-job
spec:
containers:
- image: image-registry.openshift-image-registry.svc:5000/c2da03-tools/medis-sftp-image:latest
name: emptysftp
resources:
limits:
cpu: 100m
memory: 128Mi
command:
- "sh"
- "-c"
- "/app/bin/emptysftp.sh"
envFrom:
- configMapRef:
name: medis-sftp-config-env
volumeMounts:
- name: medis-data-vol
mountPath: /data
- name: ssh-known-hosts
mountPath: /ssh-config
readOnly: true
- name: ssh-key
mountPath: /ssh-hi
readOnly: true
- name: sftp-bin
mountPath: /app/bin
readOnly: true
volumes:
- name: medis-data-vol
persistentVolumeClaim:
claimName: medis-etl-pvc
- name: ssh-known-hosts
configMap:
name: medis-sftp-config
- name: sftp-bin
configMap:
name: medis-sftp-bin
defaultMode: 0777
- name: ssh-key
secret:
secretName: medis-sftp-secret
defaultMode: 400
restartPolicy: Never
54 changes: 54 additions & 0 deletions dags/pcd.airflow.app.job.emptydir.yaml
@@ -0,0 +1,54 @@
---

# A minimal OpenShift template
# to create a BusyBox container with
# a volume claim that can be used to
# launch a Job and inspect a PVC.
#
# Create with:
# oc process -p PVC_CLAIM_NAME=my-claim -f busy-job.yaml | oc create -f -
# Destroy with:
# oc delete job --selector template=busy-job

apiVersion: batch/v1
kind: Job
metadata:
name: pcd-empty-job-airflow-once
spec:
parallelism: 1
completions: 1
activeDeadlineSeconds: 600
backoffLimit: 1
selector:
name: pcd-empty-job
template:
metadata:
labels:
name: empty-job
spec:
containers:
- image: image-registry.openshift-image-registry.svc:5000/c2da03-tools/medis-sftp-image:latest
name: empty
resources:
limits:
cpu: 100m
memory: 128Mi
command:
- "sh"
- "-c"
- "/app/bin/empty.sh"
volumeMounts:
- name: pcd-data-vol
mountPath: /data
- name: sftp-bin
mountPath: /app/bin
readOnly: true
volumes:
- name: pcd-data-vol
persistentVolumeClaim:
claimName: medis-etl-pvc
- name: sftp-bin
configMap:
name: pcd-sftp-bin
defaultMode: 0777
restartPolicy: Never
