Python k8s API
Kubernetes offers API clients that allow you to build integrations under-the-hood of applications. The following is an example Python client for Nautilus that can submit it’s own jobs and all the other functions of kubectl
The config mounts the Ceph Shared FileSystem as well as the node’s local scratch space. Be sure to configure resources appropriately to your application.
import osfrom kubernetes import client, config, utilsfrom kubernetes.client.rest import ApiExceptionfrom skimage.transform import resizefrom uuid import uuid4import timefrom pathlib import Path
def test(): try: config.load_kube_config() v1 = client.CoreV1Api() ret = v1.list_namespaced_pod('ncmir-mm') except: ValueError('Could not connect to kube cluster')
class Constants(object): NAMESPACE = YOUR_NAMESPACE
class KubernetesApiClient(object): def __init__(self): # load print("\n Loading Nautilus Client... \n") def create_batch_api_client(self): return client.BatchV1Api(client.ApiClient())
def create_job_object(self, job_name, container_image, args=[],cmd = ['/bin/bash'], min_cpu=1, min_ram = 4, max_cpu=2, max_ram=12): res = client.V1ResourceRequirements( requests={"cpu":"1","memory":"8Gi","ephemeral-storage": "4Gi"}, limits = {"cpu":"4","memory":"24Gi","ephemeral-storage": "16Gi"}) volume_mount_2 = client.V1VolumeMount( mount_path='/ceph', name='ceph' ) volume_mount_1 = client.V1VolumeMount( mount_path='/mnt/data', name='data' ) #env = client.V1EnvVar(name='GOOGLE_APPLICATION_CREDENTIALS',value=google_app_credentials_path) container = client.V1Container( name=job_name, command = cmd, image=container_image, args=args, volume_mounts=[volume_mount_1,volume_mount_2], env=[], image_pull_policy="Always", resources = res) volume_1 = client.V1Volume( name='data' )
flex_2 = client.V1FlexVolumeSource( driver='ceph.rook.io/rook', fs_type='ceph', options = {'fsName': 'nautilusfs', 'clusterNamespace': 'rook', 'path': 'YOUR_CEPHFS_MOUNT', 'mountUser': 'YOUR_NAMESPACE', 'mountSecret': 'YOUR_CEPHFS_SECRET'} ) volume_2=client.V1Volume( name = 'ceph', flex_volume=flex_2 )
template = client.V1PodTemplateSpec( metadata=client.V1ObjectMeta(labels={"app": "sample"}), spec=client.V1PodSpec(restart_policy="Never", containers=[container], volumes=[volume_1,volume_2]) ) spec = client.V1JobSpec( template=template, backoff_limit=6, ttl_seconds_after_finished=60) job = client.V1Job( api_version="batch/v1", kind="Job", metadata=client.V1ObjectMeta(name=job_name), spec=spec) return job
def submit_job(jobname, image = 'YOUR_IMAGE',args = []): api_client = KubernetesApiClient() job_api_client = api_client.create_batch_api_client() job = api_client.create_job_object(jobname, image, args) try: api_response = job_api_client.create_namespaced_job( namespace=Constants.NAMESPACE, body=job) print(str(api_response.status)) except ApiException as e: print(e) # Handle the exception. return job_api_client