Automate All the Boring Kubernetes Operations with Python

Kubernetes has become a de-facto standard in recent years, and many of us - DevOps engineers and developers alike - use it on a daily basis. Many of the tasks we perform, however, are the same - boring and easy to automate. Oftentimes it's simple enough to whip up a quick shell script with a bunch of kubectl commands, but for more complicated automation tasks bash just isn't good enough, and you need the power of a proper language, such as Python.

So, in this article we will look at how you can leverage the Kubernetes Python client library to automate whatever annoying Kubernetes tasks you might be dealing with!

Playground

Before we start playing with the Kubernetes client, we first need to create a playground cluster where we can safely test things out. We will use KinD (Kubernetes in Docker), which you can install from here.

We will use the following cluster configuration:


# kind.yaml
# https://kind.sigs.k8s.io/docs/user/configuration/
apiVersion: kind.x-k8s.io/v1alpha4
kind: Cluster
name: api-playground
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker

To create a cluster from the above configuration, run:


kind create cluster --image kindest/node:v1.23.5 --config=kind.yaml

kubectl cluster-info --context kind-api-playground
# Kubernetes control plane is running at https://127.0.0.1:36599
# CoreDNS is running at https://127.0.0.1:36599/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy

kubectl get nodes
# NAME                           STATUS     ROLES                  AGE   VERSION
# api-playground-control-plane   Ready      control-plane,master   58s   v1.23.5
# api-playground-worker          Ready      <none>                 27s   v1.23.5
# api-playground-worker2         NotReady   <none>                 27s   v1.23.5
# api-playground-worker3         NotReady   <none>                 27s   v1.23.5

With the cluster up and running, we also need to install the client library (optionally, inside a virtual environment):


python3 -m venv venv
source venv/bin/activate
pip install kubernetes

Authentication

To perform any action inside our Kubernetes cluster we first need to authenticate.

We will use long-lived tokens so that we don't need to go through the authentication flow repeatedly. A long-lived token can be obtained by creating a ServiceAccount:


kubectl create sa playground
kubectl describe sa playground

Name:                playground
Namespace:           default
Labels:              <none>
Annotations:         <none>
Image pull secrets:  <none>
Mountable secrets:   playground-token-v8bq7
Tokens:              playground-token-v8bq7
Events:              <none>

export KIND_TOKEN=$(kubectl get secret playground-token-v8bq7 -o json | jq -r .data.token | base64 --decode)

Using a service account also has the benefit that it isn't tied to any single person, which is always preferable for automation purposes.

The token from the output above can then be used in requests:


curl -k -X GET -H "Authorization: Bearer $KIND_TOKEN" https://127.0.0.1:36599/apis

We're now authenticated, but not authorized to do much of anything. Therefore, next we need to create a Role and bind it to the ServiceAccount so that we can perform actions on resources:


kubectl create clusterrole manage-pods \
    --verb=get --verb=list --verb=watch --verb=create --verb=update --verb=patch --verb=delete \
    --resource=pods

kubectl -n default create rolebinding sa-manage-pods \
    --clusterrole=manage-pods \
    --serviceaccount=default:playground

The above gives our service account permission to perform any action on pods, limited to the default namespace.
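
For reference, a sketch of the declarative equivalents of the two commands above - the objects that kubectl generates for us:


apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: manage-pods
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: sa-manage-pods
  namespace: default
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: manage-pods
subjects:
- kind: ServiceAccount
  name: playground
  namespace: default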

You should always keep your roles very narrow and specific, but for playing around in KinD, it's reasonable to apply the cluster-wide admin role:


kubectl create clusterrolebinding sa-cluster-admin \
  --clusterrole=cluster-admin \
  --serviceaccount=default:playground

Raw Requests

To get a better understanding of what kubectl - and also the client - is doing under the hood, we will start with raw HTTP requests using curl.

The easiest way to find out what requests are being made under the hood is to run the desired kubectl command with -v 10, which will output complete curl commands:


kubectl get pods -v 10
# <snip>
curl -k -v -XGET  -H "Accept: application/json;as=Table;v=v1;g=meta.k8s.io,application/json..." \
    'https://127.0.0.1:36599/api/v1/namespaces/default/pods?limit=500'
# <snip>

The output with log level 10 will be very verbose, but somewhere in there you will find the above curl command.

Add an Authorization header with your long-lived Bearer token to the above curl command and you should be able to perform the same actions as kubectl, such as:


curl -s -k -XGET -H "Authorization: Bearer $KIND_TOKEN" -H "Accept: application/json, */*" -H "Content-Type: application/json" \
    'https://127.0.0.1:36599/api/v1/namespaces/default/pods/example' | jq .status.phase
# "Running"

In case a request body is needed, look up which fields need to be included in the request. For example, when creating a Pod, we can use the API described here, which results in the following request:


curl -k -XPOST -H "Authorization: Bearer $KIND_TOKEN" -H "Accept: application/json, */*" -H "Content-Type: application/json" \
    https://127.0.0.1:36599/api/v1/namespaces/default/pods -d@pod.json

# To confirm
kubectl get pods
# NAME      READY   STATUS    RESTARTS   AGE
# example   0/1     Running   0          7s
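
For reference, a minimal pod.json matching the output above might look like this (a sketch - the image choice is an assumption based on the rest of the article):


{
  "apiVersion": "v1",
  "kind": "Pod",
  "metadata": {"name": "example", "namespace": "default"},
  "spec": {
    "containers": [
      {"name": "example", "image": "nginx:1.21.6", "ports": [{"containerPort": 80}]}
    ]
  }
}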

Refer to the Kubernetes API reference for object attributes. Additionally, you can also view the OpenAPI definition with:


curl -k -X GET -H "Authorization: Bearer $KIND_TOKEN" https://127.0.0.1:36599/openapi/v2

Interacting with Kubernetes directly through the REST API might be a bit clunky, but there are situations where it makes sense. That includes interacting with APIs that have no equivalent kubectl command, or for example in case you're using a different distribution of Kubernetes - such as OpenShift - which exposes additional APIs not covered by either kubectl or the client SDK.

Python Client

Moving on to the Python client itself now. We need to go through the same steps as with kubectl or curl, the first being authentication:


import os

from kubernetes import client

configuration = client.Configuration()
configuration.api_key_prefix["authorization"] = "Bearer"
configuration.host = "https://127.0.0.1:36599"
configuration.api_key["authorization"] = os.getenv("KIND_TOKEN", None)
configuration.verify_ssl = False  # Only for testing with KinD!
api_client = client.ApiClient(configuration)
v1 = client.CoreV1Api(api_client)

ret = v1.list_namespaced_pod(namespace="default", watch=False)
for pod in ret.items:
    print(f"Name: {pod.metadata.name}, Namespace: {pod.metadata.namespace} IP: {pod.status.pod_ip}")
    # Name: example, Namespace: default IP: 10.244.2.2

First, we define a configuration object which tells the client that we will authenticate using a Bearer token. Considering that our KinD cluster doesn't use a certificate our system trusts, we disable SSL verification; in a real cluster, however, you should never do that.

To test out the configuration, we use the list_namespaced_pod method of the API client to get all pods in the default namespace, and we print out their name, namespace and IP.

Now, for a more realistic task, let's create a Deployment:


deployment_name = "my-deploy"
deployment_manifest = {
    "apiVersion": "apps/v1",
    "kind": "Deployment",
    "metadata": {"name": deployment_name, "namespace": "default"},
    "spec": {
        "replicas": 3,
        "selector": {"matchLabels": {"app": "nginx"}},
        "template": {
            "metadata": {"labels": {"app": "nginx"}},
            "spec": {
                "containers": [
                    {"name": "nginx", "image": "nginx:1.21.6", "ports": [{"containerPort": 80}]}
                ]
            }
        },
    }
}

import time
from kubernetes.client.rest import ApiException

v1 = client.AppsV1Api(api_client)

response = v1.create_namespaced_deployment(body=deployment_manifest, namespace="default")
while True:
    try:
        response = v1.read_namespaced_deployment_status(name=deployment_name, namespace="default")
        if response.status.available_replicas != 3:
            print("Waiting for Deployment to become ready...")
            time.sleep(5)
        else:
            break
    except ApiException as e:
        print(f"Exception when calling AppsV1Api -> read_namespaced_deployment_status: {e}\n")

In addition to creating the Deployment, we also wait for its pods to become available. We do that by querying the Deployment status and checking the number of available replicas.

Also, notice the pattern in function names, such as create_namespaced_deployment. To make it more obvious, let's look at a couple more:

  • replace_namespaced_cron_job
  • patch_namespaced_stateful_set
  • list_namespaced_horizontal_pod_autoscaler
  • read_namespaced_daemon_set
  • read_custom_resource_definition

All of these are in the format operation_namespaced_resource, or just operation_resource for global resources. They can additionally be suffixed with _status or _scale for methods that operate on a resource's status, such as read_namespaced_deployment_status, or on its scale, such as patch_namespaced_stateful_set_scale.

Another thing to highlight is that in the above example we performed the actions using client.AppsV1Api which allows us to work with all the resources that belong to apiVersion: apps/v1. If we - for example - wanted to use CronJob we would instead choose BatchV1Api (which is apiVersion: batch/v1 in YAML format) or for PVCs we would choose CoreV1Api because of apiVersion: v1 - you get the gist.
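
As a quick illustration, here's a sketch of how those classes map to apiVersion values (assuming the api_client from earlier):


from kubernetes import client

core_v1 = client.CoreV1Api(api_client)    # apiVersion: v1 - Pods, Services, PVCs, ConfigMaps
apps_v1 = client.AppsV1Api(api_client)    # apiVersion: apps/v1 - Deployments, StatefulSets, DaemonSets
batch_v1 = client.BatchV1Api(api_client)  # apiVersion: batch/v1 - Jobs, CronJobs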

As you can imagine, that's a lot of functions to choose from. Luckily, all of them are listed in the docs, and you can click on any one of them to get an example of its usage.

Beyond basic CRUD operations, it's also possible to continuously watch objects for changes. An obvious choice is to watch Events:


from functools import partial

from kubernetes import client, watch

v1 = client.CoreV1Api(api_client)
count = 10
w = watch.Watch()
for event in w.stream(partial(v1.list_namespaced_event, namespace="default"), timeout_seconds=10):
    print(f"Event - Message: {event['object']['message']} at {event['object']['metadata']['creationTimestamp']}")
    count -= 1
    if not count:
        w.stop()
print("Finished namespace stream.")

# Event - Message: Successfully assigned default/my-deploy-cb69f686c-2dspd to api-playground-worker2 at 2022-04-19T11:18:25Z
# Event - Message: Container image "nginx:1.21.6" already present on machine at 2022-04-19T11:18:26Z
# Event - Message: Created container nginx at 2022-04-19T11:18:26Z
# Event - Message: Started container nginx at 2022-04-19T11:18:26Z

Here we chose to watch events in the default namespace. We take the first 10 events and then close the stream. If we wanted to continuously monitor the resources, we would just remove the timeout_seconds, along with the counter and the w.stop() call, as sketched below.
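
A minimal sketch of such an endless watch - here monitoring Pods rather than Events:


from functools import partial

from kubernetes import client, watch

v1 = client.CoreV1Api(api_client)
w = watch.Watch()
# Without timeout_seconds the stream keeps running (reconnect/error handling omitted in this sketch)
for event in w.stream(partial(v1.list_namespaced_pod, namespace="default")):
    print(f"{event['type']} Pod: {event['object']['metadata']['name']}")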

In the first example, you saw that we used a plain Python dict to define the Deployment object which we passed to the client. Alternatively, we can use a more OOP style by using the API models (classes) provided by the library:


v1 = client.AppsV1Api(api_client)

deployment_manifest = client.V1Deployment(
    api_version="apps/v1",
    kind="Deployment",
    metadata=client.V1ObjectMeta(name=deployment_name),
    spec=client.V1DeploymentSpec(
        replicas=3,
        selector=client.V1LabelSelector(match_labels={"app": "nginx"}),
        template=client.V1PodTemplateSpec(
            metadata=client.V1ObjectMeta(labels={"app": "nginx"}),
            spec=client.V1PodSpec(
                containers=[
                    client.V1Container(
                        name="nginx",
                        image="nginx:1.21.6",
                        ports=[client.V1ContainerPort(container_port=80)],
                    )
                ]
            ),
        ),
    ),
)

response = v1.create_namespaced_deployment(body=deployment_manifest, namespace="default")

Trying to figure out which model you should use for each argument is a losing battle, though. When creating resources as shown above, you should always use the documentation for models and traverse the links as you create the individual sub-objects, to figure out what values/types are expected in each field.

Handy Examples

You should now have a basic idea about how the client works, so let's take a look at some handy examples and snippets that might help you automate daily Kubernetes operations.

A very common thing you might want to perform is a Deployment rollout - usually done with kubectl rollout restart. There is, however, no API to do this. The way kubectl does it is by updating Deployment annotations - more specifically, setting kubectl.kubernetes.io/restartedAt to the current time. This works because any change made to the Pod template spec causes a rollout.

If we want to perform a restart using the Python client, we need to do the same:


from kubernetes import dynamic
from kubernetes.client import api_client  # Careful - different import - not the same as previous client!
import datetime

client = dynamic.DynamicClient(api_client.ApiClient(configuration=configuration))

api = client.resources.get(api_version="apps/v1", kind="Deployment")

# Use the dict version of the manifest from the first example here - the client's
# generated model objects don't support item assignment:
deployment_manifest["spec"]["template"]["metadata"]["annotations"] = {
    "kubectl.kubernetes.io/restartedAt": datetime.datetime.utcnow().isoformat()
}

deployment_patched = api.patch(body=deployment_manifest, name=deployment_name, namespace="default")

Another common operation is scaling a Deployment. This one, fortunately, does have a dedicated API function:


from kubernetes import client

api_client = client.ApiClient(configuration)
apps_v1 = client.AppsV1Api(api_client)

# The body can be of different patch types - https://github.com/kubernetes-client/python/issues/1206#issuecomment-668118057
api_response = apps_v1.patch_namespaced_deployment_scale(deployment_name, "default", {"spec": {"replicas": 5}})

For troubleshooting purposes, it often makes sense to exec into a Pod and take a look around, possibly grabbing environment variables to verify correct configuration:


from kubernetes import client
from kubernetes.stream import stream

def pod_exec(name, namespace, command, api_instance):
    exec_command = ["/bin/sh", "-c", command]

    resp = stream(api_instance.connect_get_namespaced_pod_exec,
                  name,
                  namespace,
                  command=exec_command,
                  stderr=True, stdin=False,
                  stdout=True, tty=False,
                  _preload_content=False)

    while resp.is_open():
        resp.update(timeout=1)
        if resp.peek_stdout():
            print(f"STDOUT: \n{resp.read_stdout()}")
        if resp.peek_stderr():
            print(f"STDERR: \n{resp.read_stderr()}")

    resp.close()

    if resp.returncode != 0:
        raise Exception("Script failed")

pod = "example"
api_client = client.ApiClient(configuration)
v1 = client.CoreV1Api(api_client)

pod_exec(pod, "default", "env", v1)

# STDOUT:
# KUBERNETES_SERVICE_PORT=443
# KUBERNETES_PORT=tcp://10.96.0.1:443
# HOSTNAME=example
# HOME=/root
# ...

The snippet above also allows you to run whole shell scripts if need be.
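
For example, passing a whole (hypothetical) script as a single string:


script = """
echo "Running on $(hostname)"
head -2 /etc/os-release
"""
pod_exec(pod, "default", script, v1)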

Moving on to more cluster-administration-oriented tasks - let's say you want to apply a Taint onto a node that has some issue. Well, once again there's no direct API for Node Taints, but we can find a way:


from kubernetes import client

api_client = client.ApiClient(configuration)
v1 = client.CoreV1Api(api_client)

# kubectl taint nodes api-playground-worker some-taint=1:NoSchedule
v1.patch_node("api-playground-worker", {"spec": {"taints": [{"effect": "NoSchedule", "key": "some-taint", "value": "1"}]}})

# kubectl get nodes -o custom-columns=NAME:.metadata.name,TAINTS:.spec.taints --no-headers
# api-playground-control-plane   [map[effect:NoSchedule key:node-role.kubernetes.io/master]]
# api-playground-worker          [map[effect:NoSchedule key:some-taint value:1]]
# api-playground-worker2         <none>
# api-playground-worker3         <none>

You might also want to monitor cluster resource utilization, possibly to automate cluster scaling. Usually you'd use kubectl top to get the information interactively, but with the client library you can do:


# https://github.com/kubernetes-sigs/kind/issues/398
# kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/download/v0.5.0/components.yaml
# kubectl patch -n kube-system deployment metrics-server --type=json \
#   -p '[{"op":"add","path":"/spec/template/spec/containers/0/args/-","value":"--kubelet-insecure-tls"}]'

from kubernetes import client

api_client = client.ApiClient(configuration)
custom_api = client.CustomObjectsApi(api_client)

response = custom_api.list_cluster_custom_object("metrics.k8s.io", "v1beta1", "nodes")  # also works with "pods" instead of "nodes"

for node in response["items"]:
    print(f"{node['metadata']['name']: <30} CPU: {node['usage']['cpu']: <10} Memory: {node['usage']['memory']}")

# api-playground-control-plane   CPU: 148318488n Memory: 2363504Ki
# api-playground-worker          CPU: 91635913n  Memory: 1858680Ki
# api-playground-worker2         CPU: 75473747n  Memory: 1880860Ki
# api-playground-worker3         CPU: 105692650n Memory: 1881560Ki

The above example assumes that you have metrics-server installed in your cluster. You can run kubectl top to verify that. If you're working with KinD, the commands in the snippet's comments will install it for you.

Last but not least - you might already have a bunch of YAML or JSON files that you want to use to deploy or modify objects in your cluster, or you might want to export and back up what you've created with the client. Here's how you can convert YAML/JSON files to Kubernetes objects and back to files again:


# pip install kopf  # (Python 3.7+)
import kopf
from kubernetes import client

api_client = client.ApiClient(configuration)
v1 = client.CoreV1Api(api_client)

pods = []

# https://stackoverflow.com/questions/59977058/clone-kubernetes-objects-programmatically-using-the-python-api/59977059#59977059
ret = v1.list_namespaced_pod(namespace="default")
for pod in ret.items:
    # Simple conversion to Dict/JSON
    print(api_client.sanitize_for_serialization(pod))

    # Conversion with fields clean-up
    pods.append(kopf.AnnotationsDiffBaseStorage()
                .build(body=kopf.Body(api_client.sanitize_for_serialization(pod))))


# Conversion from Dict back to Client object
import json


class FakeKubeResponse:
    def __init__(self, obj):
        self.data = json.dumps(obj)


for pod in pods:
    pod_manifest = api_client.deserialize(FakeKubeResponse(pod), "V1Pod")
    ...

The first way to convert an existing object into a Python dictionary (JSON) is to use sanitize_for_serialization, which produces raw output including all the generated/default fields. A better option is to use the utility methods of the kopf library, which strip out all the unnecessary fields. From there, it's simple enough to dump the dictionary into a proper YAML or JSON file.
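
For example, a sketch of dumping each pod into a YAML file, assuming PyYAML (pip install pyyaml) is available:


import yaml

ret = v1.list_namespaced_pod(namespace="default")
for pod in ret.items:
    # sanitize_for_serialization returns a plain dict that safe_dump can handle
    with open(f"{pod.metadata.name}.yaml", "w") as f:
        yaml.safe_dump(api_client.sanitize_for_serialization(pod), f)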

For the reverse - that is, if we want to go from a dictionary to a client object model - we can use the deserialize method of the API client. This method, however, expects its argument to have a data attribute, so we pass it a wrapper class instance with such an attribute.

If you already have YAML files which you'd like to use with the Python client, then you can use the utility function kubernetes.utils.create_from_yaml.
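
A minimal sketch of that, assuming a deployment.yaml manifest on disk:


from kubernetes import client, utils

api_client = client.ApiClient(configuration)
# "deployment.yaml" is a hypothetical path to an existing manifest file
utils.create_from_yaml(api_client, "deployment.yaml", namespace="default")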

To get a complete overview of all the features of the library, I recommend taking a look at the examples directory in the repository.

I'd also encourage you to look through the issues in the library's repository, as they contain a lot of great examples of client usage, such as processing events in parallel or watching ConfigMaps for updates.

Conclusion

The Python client library contains literally hundreds of functions, so it's difficult to cover every little feature or use case there is. Most of them, however, follow a common pattern, which should make the library's usage feel pretty natural after a couple of minutes.

If you're looking for more examples beyond what was shown and referenced above, I recommend exploring other popular tools that make use of the Python Kubernetes client, such as kopf - the library for creating Kubernetes operators. I also find it very useful to take a look at the tests of the library itself, as they showcase its intended usage - such as this client test suite.
