Kannon is a wrapper for the gokart library that makes it easy to run gokart tasks in a distributed and parallel manner across multiple Kubernetes jobs.
Kannon can be installed via pip.
pip install kannon
An easy and self-contained tutorial can be found here!
Users need to prepare the following two scripts and copy them into a Docker container:
- A script to start the task pipeline on the master job.
- A script for child jobs to run their assigned tasks.
An easy and self-contained quick starter will be available soon!
If you have a service account my-batch within a namespace mynamespace, the following permissions on jobs and jobs/status should be added.
# role.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: mynamespace
  name: job-admin
rules:
- apiGroups: ["batch"]
  resources: ["jobs"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: ["batch"]
  resources: ["jobs/status"]
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: job-admin-binding
  namespace: mynamespace
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: job-admin
subjects:
- namespace: mynamespace
  kind: ServiceAccount
  name: my-batch
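As a rough sanity check on the rules above, the sketch below mirrors the Role's rules as plain Python data and tests whether a given verb on a given resource is covered. The helper is_allowed is hypothetical and only handles exact matches plus the * wildcard; it is a simplification, not a full model of how Kubernetes evaluates RBAC.

```python
# Rules copied from role.yaml, expressed as plain Python data.
RULES = [
    {
        "apiGroups": ["batch"],
        "resources": ["jobs"],
        "verbs": ["get", "list", "watch", "create", "update", "patch", "delete"],
    },
    {
        "apiGroups": ["batch"],
        "resources": ["jobs/status"],
        "verbs": ["get", "list", "watch"],
    },
]


def _matches(allowed, value):
    """True if the value is listed explicitly or covered by the '*' wildcard."""
    return "*" in allowed or value in allowed


def is_allowed(rules, api_group, resource, verb):
    """Hypothetical helper: does any single rule grant this verb on this resource?"""
    return any(
        _matches(rule["apiGroups"], api_group)
        and _matches(rule["resources"], resource)
        and _matches(rule["verbs"], verb)
        for rule in rules
    )


if __name__ == "__main__":
    print(is_allowed(RULES, "batch", "jobs", "create"))         # jobs can be created
    print(is_allowed(RULES, "batch", "jobs/status", "delete"))  # status is read-only
```

Against a real cluster, kubectl auth can-i with the --as flag is the more reliable way to confirm what the service account may do.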
Tasks are defined in almost the same way as in gokart.
All you have to do is change the parent class of each task you want to distribute from gokart.TaskOnKart to kannon.TaskOnBullet.
For a detailed explanation of how to define a gokart.TaskOnKart, please refer to the gokart docs.
Here is an example that defines the following task pipeline with gokart and kannon.
The two branches (Task B0 + Task C0) and (Task B1 + Task C1) can run in a distributed and parallel manner.
import gokart
import luigi
import kannon


class TaskA(gokart.TaskOnKart):
    param = luigi.Parameter()

    def run(self):
        self.dump("A")


# TODO: Change gokart.TaskOnKart -> kannon.TaskOnBullet
class TaskB(kannon.TaskOnBullet):
    param = luigi.Parameter()
    parent = gokart.TaskInstanceParameter()

    def requires(self):
        return {"parent": self.parent}

    def run(self):
        self.dump("B")


# TODO: Change gokart.TaskOnKart -> kannon.TaskOnBullet
class TaskC(kannon.TaskOnBullet):
    param = luigi.Parameter()
    parent = gokart.TaskInstanceParameter()

    def requires(self):
        return {"parent": self.parent}

    def run(self):
        self.dump("C")


class TaskD(gokart.TaskOnKart):
    param = luigi.Parameter()
    parent_0 = gokart.TaskInstanceParameter()
    parent_1 = gokart.TaskInstanceParameter()
    parent_2 = gokart.TaskInstanceParameter()

    def requires(self):
        return {"parent_0": self.parent_0, "parent_1": self.parent_1, "parent_2": self.parent_2}

    def run(self):
        self.dump("D")


task_a = TaskA(param="a")
# b0 and b1 are executed in parallel
task_b0 = TaskB(param="b0", parent=task_a)
task_b1 = TaskB(param="b1", parent=task_a)
# c0 and c1 are executed in parallel
task_c0 = TaskC(param="c0", parent=task_b0)
task_c1 = TaskC(param="c1", parent=task_b1)
task_d = TaskD(param="d", parent_0=task_c0, parent_1=task_c1, parent_2=task_a)
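To see which tasks in this pipeline are independent of each other, the dependency graph can be written out by hand and grouped into waves with the standard-library graphlib module. This is only an illustration of the dependency structure: the string names stand in for the task instances above, and the sketch makes no claim about how Kannon itself schedules jobs.

```python
from graphlib import TopologicalSorter

# Each key depends on the tasks in its value set, mirroring requires() above.
DEPENDENCIES = {
    "task_a": set(),
    "task_b0": {"task_a"},
    "task_b1": {"task_a"},
    "task_c0": {"task_b0"},
    "task_c1": {"task_b1"},
    "task_d": {"task_c0", "task_c1", "task_a"},
}


def execution_waves(dependencies):
    """Group tasks into waves; tasks within one wave do not depend on each other."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything whose parents are done
        waves.append(ready)
        sorter.done(*ready)
    return waves


if __name__ == "__main__":
    for wave in execution_waves(DEPENDENCIES):
        print(wave)
```

The waves come out as task_a, then task_b0 and task_b1 together, then task_c0 and task_c1 together, then task_d, which matches the comments in the example.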
Steps:
- Import module where gokart.TaskOnKart and kannon.TaskOnBullet classes are defined.
- Load luigi and k8s configs.
- Create a task instance.
- Create a template job for child jobs.
- Run Kannon.build.
""" This script runs on master job. """
import logging

import fire
import gokart
import luigi
from kubernetes import config, client

# TODO: Import task definition here!
import example_tasks
from kannon import Kannon

logging.basicConfig(level=logging.INFO)


def main(
    container_name: str,
    image_name: str,
):
    # TODO: Load luigi config here!
    luigi.configuration.LuigiConfigParser.add_config_path("./conf/base.ini")
    # TODO: Load kube config here!
    config.load_incluster_config()
    v1 = client.BatchV1Api()

    # TODO: Create task instance here!
    task_root = [CREATE TASK INSTANCE HERE]

    # TODO: create a template for child job from yaml file or using k8s API.
    # NOTE: `utils` and PATH_TO_CHILD_JOB_YAML are not defined in this script; supply them yourself.
    template_job = utils.create_from_yaml(v1, PATH_TO_CHILD_JOB_YAML)

    # TODO: Run Kannon.build!
    Kannon(
        api_instance=v1,
        template_job=template_job,
        job_prefix="template-child-job",
        path_child_script="./run_child.py",
    ).build(task_root)


if __name__ == "__main__":
    fire.Fire(main)
Note that job.spec.template.spec.containers[i].command and job.metadata.name are replaced within Kannon.build.
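The idea of reusing one template for many child jobs can be sketched with a plain dictionary that follows the Kubernetes Job manifest layout (metadata.name and spec.template.spec.containers). This is not Kannon's implementation: the helper make_child_job, the job name, the image name, and the command format are all hypothetical, and giving every container the same command is an assumption made for brevity.

```python
import copy


def make_child_job(template_job, job_name, command):
    """Return a copy of the template with its name and container commands replaced."""
    job = copy.deepcopy(template_job)  # leave the shared template untouched
    job["metadata"]["name"] = job_name
    for container in job["spec"]["template"]["spec"]["containers"]:
        container["command"] = list(command)
    return job


# Hypothetical template; only the fields relevant to the sketch are filled in.
TEMPLATE = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"name": "template-child-job"},
    "spec": {
        "template": {
            "spec": {
                "containers": [
                    {"name": "job", "image": "example-image", "command": ["echo", "placeholder"]}
                ],
                "restartPolicy": "Never",
            }
        }
    },
}

if __name__ == "__main__":
    child = make_child_job(
        TEMPLATE,
        job_name="template-child-job-0001",  # hypothetical naming scheme
        command=["python", "./run_child.py", "<serialized task>"],  # hypothetical command
    )
    print(child["metadata"]["name"])
    print(child["spec"]["template"]["spec"]["containers"][0]["command"])
```

Because the name and command are overwritten for each child job, whatever values the template carries in those two fields are effectively placeholders.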
For now, users need to prepare the following script. A future release will remove this requirement.
Steps:
- Import module where gokart.TaskOnKart and kannon.TaskOnBullet classes are defined.
- Load luigi config.
- Parse a serialized task instance.
- Run gokart.build.
"""This script must be defined by the user."""
import logging

import fire
import gokart
import luigi

# TODO: Import task definitions here!
import example_tasks

logging.basicConfig(level=logging.INFO)


def main(serialized_task: str):
    # TODO: Load luigi config here!
    luigi.configuration.LuigiConfigParser.add_config_path("./conf/base.ini")
    # TODO: Parse a serialized gokart.TaskOnKart here!
    task: gokart.TaskOnKart = gokart.TaskInstanceParameter().parse(serialized_task)
    # TODO: Run gokart.build!
    gokart.build(task)


if __name__ == "__main__":
    fire.Fire(main)
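The child script receives its task as a single string and rebuilds the task object from it. The toy sketch below shows that round trip using only the standard library. ToyTask, REGISTRY, serialize, and parse are hypothetical stand-ins, and the JSON layout is an assumption made for illustration; it is not gokart's actual serialization format.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class ToyTask:
    """Hypothetical stand-in for a task class with one parameter."""
    param: str


# The receiving side must know the class, which is why the child script
# imports the module where the task classes are defined.
REGISTRY = {"ToyTask": ToyTask}


def serialize(task):
    """Encode the class name and parameters as a single string."""
    return json.dumps({"type": type(task).__name__, "params": asdict(task)}, sort_keys=True)


def parse(serialized):
    """Rebuild the task object from the string produced by serialize()."""
    payload = json.loads(serialized)
    return REGISTRY[payload["type"]](**payload["params"])


if __name__ == "__main__":
    original = ToyTask(param="b0")
    text = serialize(original)   # safe to pass as a command-line argument
    restored = parse(text)
    print(restored == original)
```

The sketch also hints at why the first step in the list matters: parsing can only succeed if the class named in the string has already been imported on the child side.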
Kannon is a wrapper for gokart. Thanks to gokart and dependent projects!