-
Notifications
You must be signed in to change notification settings - Fork 2
/
file test notebook.py
90 lines (79 loc) · 2.96 KB
/
file test notebook.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# Notebook cell: the leading "!" is an IPython/Jupyter shell escape, so this
# line runs `pip3 install kfp` in a shell to install the package imported
# below. It is not valid syntax for a plain Python interpreter.
!pip3 install kfp
########################################################################################################################
import kfp
import kubernetes
########################################################################################################################
def _gcp_secret_mounts():
    """Return a fresh secret-mount list for the ``user-gcp-sa`` secret.

    A new list is built on every call so that the driver and executor
    sections of the manifest never share one mutable object.
    """
    return [
        {
            "name": "user-gcp-sa",
            "path": "/mnt/secrets",
            "secretType": "GCPServiceAccount",
        },
    ]


# SparkApplication custom resource (sparkoperator.k8s.io/v1beta2) handed to the
# pipeline step as its Kubernetes manifest.
container_manifest = {
    "apiVersion": "sparkoperator.k8s.io/v1beta2",
    "kind": "SparkApplication",
    "metadata": {
        "name": "spark-app",
        "namespace": "kubeflow",
    },
    "spec": {
        "type": "Scala",
        "mode": "cluster",
        "image": "docker.io/rawkintrevo/spark-file-test:0.0.17",
        "imagePullPolicy": "Always",
        # Hadoop settings for the gs:// filesystem. The key file path lives
        # under /mnt/secrets, the same path the secret is mounted at below.
        "hadoopConf": {
            "fs.gs.project.id": "kubeflow-hacky-hacky",
            "fs.gs.system.bucket": "covid-dicoms",
            "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
            "google.cloud.auth.service.account.enable": "true",
            "google.cloud.auth.service.account.json.keyfile": "/mnt/secrets/user-gcp-sa.json",
        },
        "mainClass": "org.rawkintrevo.covid.App",
        # Jar path inside the image -- see the Dockerfile.
        "mainApplicationFile": "local:///covid-0.1-jar-with-dependencies.jar",
        "arguments": ["245", "15", "1"],
        "sparkVersion": "2.4.5",
        "restartPolicy": {
            "type": "Never",
        },
        "driver": {
            "cores": 1,
            "secrets": _gcp_secret_mounts(),
            "coreLimit": "1200m",
            "memory": "512m",
            "labels": {
                "version": "2.4.5",
            },
            # NOTE(review): the original comment here said "also try" an
            # alternative service account but named this exact same value;
            # confirm which alternative was actually meant.
            "serviceAccount": "spark-operatoroperator-sa",
        },
        "executor": {
            "cores": 1,
            "secrets": _gcp_secret_mounts(),
            "instances": 2,
            "memory": "512m",
        },
        # NOTE(review): these labels sit directly under "spec" (the driver
        # carries its own copy above) -- confirm this placement is intended.
        "labels": {
            "version": "2.4.5",
        },
    },
}
########################################################################################################################
@kfp.dsl.pipeline(
    name="Covid DICOM Pipe v2",
    description="Create Basis Vectors for Lung Images"
)
def covid_dicom_pipeline():
    """Define a one-step pipeline that creates the SparkApplication resource.

    The single step creates the resource described by ``container_manifest``
    and then resolves based on the resource's
    ``status.applicationState.state`` field: ``COMPLETED`` marks the step as
    succeeded and ``FAILED`` marks it as failed.
    """
    # The op is not bound to a local name: nothing else in this function
    # refers to it, so constructing it is all that is needed here.
    kfp.dsl.ResourceOp(
        name="calculate-basis-vectors",
        k8s_resource=container_manifest,
        action="create",
        success_condition="status.applicationState.state == COMPLETED",
        # Without a failure condition a Spark application that ends in FAILED
        # never satisfies the success condition, so the step would keep
        # waiting instead of reporting the failure.
        failure_condition="status.applicationState.state == FAILED",
    )
# Compile the pipeline function into a package file on local disk.
kfp.compiler.Compiler().compile(
    pipeline_func=covid_dicom_pipeline,
    package_path="dicom-pipeline-2.zip",
)
client = kfp.Client()
########################################################################################################################
# Create the experiment, then start a run of the compiled package under it.
my_experiment = client.create_experiment(name="my-experiments")
my_run = client.run_pipeline(
    experiment_id=my_experiment.id,
    job_name="my-run1",
    pipeline_package_path="dicom-pipeline-2.zip",
)