Install and run Dask on a Kubernetes cluster in WEkEO cloud

Dask enables scaling computation tasks either as multiple processes on a single machine, or on Dask clusters that consist of multiple worker machines. Dask provides a scalable alternative to popular Python libraries e.g. Numpy, Pandas or SciKit Learn, but still using a compact and very similar API.

Dask scheduler, once presented with a computation task, splits it into smaller tasks that can be executed in parallel on the worker nodes/processes.

In this article you will install a Dask cluster on Kubernetes and run Dask worker nodes as Kubernetes pods. As part of the installation, you will get access to a Jupyter instance, where you can run the sample code.

What We Are Going To Cover

  • Install Dask on Kubernetes

  • Access Jupyter and Dask Scheduler dashboard

  • Run a sample computing task

  • Configure Dask cluster on Kubernetes from Python

  • Resolving errors

Prerequisites

No. 1 Hosting

You need a WEkEO hosting account with Horizon interface https://horizon.cloudferro.com.

No. 2 Kubernetes cluster on CloudFerro cloud

To create Kubernetes cluster on WAW3-2 cloud refer to this guide: How to Create a Kubernetes Cluster Using WEkEO OpenStack Magnum

No. 3 Access to kubectl command line

The instructions for activation of kubectl are provided in: How To Access Kubernetes Cluster Post Deployment Using Kubectl On WEkEO OpenStack Magnum

No. 4 Familiarity with Helm

For more information on using Helm and installing apps with Helm on Kubernetes, refer to Deploying Helm Charts on Magnum Kubernetes Clusters on WEkEO WAW3-2 Cloud

No. 5 Python3 available on your machine

Python3 preinstalled on the working machine.

No. 6 Basic familiarity with Jupyter and Python scientific libraries

We will use Pandas as an example.

Step 1 Install Dask on Kubernetes

To install Dask as a Helm chart, first download the Dask Helm repository:

helm repo add dask https://helm.dask.org/

Instead of installing the chart out of the box, let us customize the configuration for convenience. To view all possible configurations and their defaults run:

helm show dask/dask

Prepare file dask-values.yaml to override some of the defaults:

dask-values.yaml

scheduler:
  serviceType: LoadBalancer
jupyter:
  serviceType: LoadBalancer
worker:
  replicas: 4

This changes the default service type for Jupyter and Scheduler to LoadBalancer, so that they get exposed publicly. Also, the default number of Dask workers is 3 but is now changed to 4. Each Dask worker pod will get allocated 3GB RAM and 1CPU, we keep it at this default.

To deploy the chart, create the namespace dask and install to it:

helm install dask dask/dask -n dask --create-namespace -f dask-values.yaml

Step 2 Access Jupyter and Dask Scheduler dashboard

After the installation step, you can access Dask services:

kubectl get services -n dask

There are two services, for Jupyter and Dask Scheduler dashboard. Populating external IPs will take few minutes:

NAME             TYPE           CLUSTER-IP       EXTERNAL-IP      PORT(S)                       AGE
dask-jupyter     LoadBalancer   10.254.230.230   64.225.128.91    80:32437/TCP                  6m49s
dask-scheduler   LoadBalancer   10.254.41.250    64.225.128.236   8786:31707/TCP,80:31668/TCP   6m49s

We can paste the external IPs to the browser to view the services. To access Jupyter, you will first need to pass the login screen, the default password is dask. Then you can view the Jupyter instance:

../_images/image2023-8-8_14-2-4.png

Similarly, with the Scheduler Dashboard, paste the floating IP to the browser to view it. If you then click on the “Workers” tab above, you can see that 4 workers are running on our Dask cluster:

../_images/image2023-8-8_14-4-40.png

Step 3 Run a sample computing task

The installed Jupyter instance already contains Dask and other useful Python libraries installed. To run a sample job, first activate the notebook by clicking on icon named NoteBookPython3(ipykernel) on the right hand side of the Jupyter instance browser screen.

The sample job performs calculation on table (dataframe) of 100k rows, and just one column. Each record will be filled with a random integer from 1 to 100,000 and the task is to calculate the sum of all records.

The code will run the same example for Pandas (single process) and Dask (parallelized on our cluster) and we will be able to inspect the results.

Copy the following code and paste to the cell in Jupyter notebook:

import dask.dataframe as dd
import pandas as pd
import numpy as np
import time

data = {'A': np.random.randint(1, 100_000_000, 100_000_000)}
df_pandas = pd.DataFrame(data)
df_dask = dd.from_pandas(df_pandas, npartitions=4)

# Pandas
start_time_pandas = time.time()
result_pandas = df_pandas['A'].sum()
end_time_pandas = time.time()
print(f"Result Pandas: {result_pandas}")
print(f"Computation time Pandas: {end_time_pandas - start_time_pandas:.2f} seconds.")

# Dask
start_time_dask = time.time()
result_dask = df_dask['A'].sum().compute()
end_time_dask = time.time()
print(f"Result Dask: {result_dask}")
print(f"Computation time Dask: {end_time_dask - start_time_dask:.2f} seconds.")

Hit play or use option Run from the main menu to execute the code. After a few seconds, the result will appear below the cell with code.

Some of the results we could observe for this example:

Result Pandas: 4999822570722943
Computation time Pandas: 0.15 seconds.
Result Dask: 4999822570722943
Computation time Dask: 0.07 seconds.

Note these results are not deterministic and simple Pandas could also perform better case by case. The overhead to distribute and collect results from Dask workers needs to be also taken into account. Further tuning the performance of Dask is beyond the scope of this article.

Step 4 Configure Dask cluster on Kubernetes from Python

For managing the Dask cluster on Kubernetes we can use a dedicated Python library dask-kubernetes. Using this library, we can reconfigure certain parameters of our Dask cluster.

One way to run dask-kubernetes would be from the Jupyter instance but then we would have to provide reference to kubeconfig of our cluster. Instead, we install dask-kubernetes in our local environment, with the following command:

pip install dask-kubernetes

Once this is done, we can manage the Dask cluster from Python. As an example, let us upscale it to 5 Dask nodes. Use nano to create file scale-cluster.py:

nano scale-cluster.py

then insert the following commands:

scale-cluster.py

from dask_kubernetes import HelmCluster

cluster = HelmCluster(release_name="dask", namespace="dask")
cluster.scale(5)

Apply with:

python3 scale-cluster.py

Using the command

kubectl get pods -n dask

you can see that the number of workers now is 5:

../_images/kubectl_show_5_workers.png

Or, you can see the current number of worker nodes in the Dask Scheduler dashboard (refresh the screen):

../_images/dask_dashboard_5_workers.png

Note that the functionalities of dask-kubernetes should be possible to achieve using just Kubernetes API directly, the choice will depend on your personal preference.

Resolving errors

When running command

python3 scale-cluster.py

on WSL version 1, error messages such as these may appear:

../_images/wsl_v1_error_message.png

The code will work properly, that is, it will increase the number of workers to 5, as required. The error should not appear on WSL version 2 and other Ubuntu distros.