website/content/en/docs/tasks/job/coarse-parallel-processing-...

336 lines
11 KiB
Markdown
Raw Normal View History

---
title: Coarse Parallel Processing Using a Work Queue
min-kubernetes-server-version: v1.8
2020-05-30 19:10:23 +00:00
content_type: task
weight: 30
---
2020-05-30 19:10:23 +00:00
<!-- overview -->
In this example, we will run a Kubernetes Job with multiple parallel
2018-10-25 18:02:31 +00:00
worker processes.
In this example, as each pod is created, it picks up one unit of work
from a task queue, completes it, deletes it from the queue, and exits.
Here is an overview of the steps in this example:
1. **Start a message queue service.** In this example, we use RabbitMQ, but you could use another
one. In practice you would set up a message queue service once and reuse it for many jobs.
1. **Create a queue, and fill it with messages.** Each message represents one task to be done. In
this example, a message is just an integer that we will do a lengthy computation on.
1. **Start a Job that works on tasks from the queue**. The Job starts several pods. Each pod takes
one task from the message queue, processes it, and repeats until the end of the queue is reached.
2020-05-30 19:10:23 +00:00
## {{% heading "prerequisites" %}}
Be familiar with the basic,
non-parallel, use of [Job](/docs/concepts/jobs/run-to-completion-finite-workloads/).
{{< include "task-tutorial-prereqs.md" >}}
2020-05-30 19:10:23 +00:00
<!-- steps -->
## Starting a message queue service
This example uses RabbitMQ, but it should be easy to adapt to another AMQP-type message service.
In practice you could set up a message queue service once in a
cluster and reuse it for many jobs, as well as for long-running services.
Start RabbitMQ as follows:
```shell
kubectl create -f https://raw.githubusercontent.com/kubernetes/kubernetes/release-1.3/examples/celery-rabbitmq/rabbitmq-service.yaml
```
```
service "rabbitmq-service" created
```
```shell
kubectl create -f https://raw.githubusercontent.com/kubernetes/kubernetes/release-1.3/examples/celery-rabbitmq/rabbitmq-controller.yaml
```
```
2017-10-10 20:15:21 +00:00
replicationcontroller "rabbitmq-controller" created
```
We will only use the rabbitmq part from the [celery-rabbitmq example](https://github.com/kubernetes/kubernetes/tree/release-1.3/examples/celery-rabbitmq).
## Testing the message queue service
Now, we can experiment with accessing the message queue. We will
create a temporary interactive pod, install some tools on it,
and experiment with queues.
First create a temporary interactive Pod.
```shell
# Create a temporary interactive container
kubectl run -i --tty temp --image ubuntu:18.04
```
```
Waiting for pod default/temp-loe07 to be running, status is Pending, pod ready: false
... [ previous line repeats several times .. hit return when it stops ] ...
```
Note that your pod name and command prompt will be different.
Next install the `amqp-tools` so we can work with message queues.
```shell
# Install some tools
root@temp-loe07:/# apt-get update
.... [ lots of output ] ....
root@temp-loe07:/# apt-get install -y curl ca-certificates amqp-tools python dnsutils
.... [ lots of output ] ....
```
Later, we will make a docker image that includes these packages.
Next, we will check that we can discover the rabbitmq service:
```
# Note the rabbitmq-service has a DNS name, provided by Kubernetes:
root@temp-loe07:/# nslookup rabbitmq-service
Server: 10.0.0.10
Address: 10.0.0.10#53
Name: rabbitmq-service.default.svc.cluster.local
Address: 10.0.147.152
# Your address will vary.
```
If Kube-DNS is not setup correctly, the previous step may not work for you.
You can also find the service IP in an env var:
```
# env | grep RABBIT | grep HOST
RABBITMQ_SERVICE_SERVICE_HOST=10.0.147.152
# Your address will vary.
```
Next we will verify we can create a queue, and publish and consume messages.
```shell
# In the next line, rabbitmq-service is the hostname where the rabbitmq-service
# can be reached. 5672 is the standard port for rabbitmq.
root@temp-loe07:/# export BROKER_URL=amqp://guest:guest@rabbitmq-service:5672
# If you could not resolve "rabbitmq-service" in the previous step,
# then use this command instead:
# root@temp-loe07:/# BROKER_URL=amqp://guest:guest@$RABBITMQ_SERVICE_SERVICE_HOST:5672
# Now create a queue:
root@temp-loe07:/# /usr/bin/amqp-declare-queue --url=$BROKER_URL -q foo -d
foo
# Publish one message to it:
root@temp-loe07:/# /usr/bin/amqp-publish --url=$BROKER_URL -r foo -p -b Hello
# And get it back.
root@temp-loe07:/# /usr/bin/amqp-consume --url=$BROKER_URL -q foo -c 1 cat && echo
Hello
root@temp-loe07:/#
```
In the last command, the `amqp-consume` tool takes one message (`-c 1`)
from the queue, and passes that message to the standard input of an arbitrary command. In this case, the program `cat` is just printing
out what it gets on the standard input, and the echo is just to add a carriage
return so the example is readable.
## Filling the Queue with tasks
2019-01-13 08:51:11 +00:00
Now let's fill the queue with some "tasks". In our example, our tasks are just strings to be
printed.
In a practice, the content of the messages might be:
- names of files to that need to be processed
- extra flags to the program
- ranges of keys in a database table
- configuration parameters to a simulation
- frame numbers of a scene to be rendered
In practice, if there is large data that is needed in a read-only mode by all pods
of the Job, you will typically put that in a shared file system like NFS and mount
that readonly on all the pods, or the program in the pod will natively read data from
a cluster file system like HDFS.
For our example, we will create the queue and fill it using the amqp command line tools.
In practice, you might write a program to fill the queue using an amqp client library.
```shell
/usr/bin/amqp-declare-queue --url=$BROKER_URL -q job1 -d
job1
```
```shell
for f in apple banana cherry date fig grape lemon melon
do
/usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f
done
```
So, we filled the queue with 8 messages.
## Create an Image
Now we are ready to create an image that we will run as a job.
We will use the `amqp-consume` utility to read the message
from the queue and run our actual program. Here is a very simple
example program:
{{< codenew language="python" file="application/job/rabbitmq/worker.py" >}}
Give the script execution permission:
```shell
chmod +x worker.py
```
Now, build an image. If you are working in the source
tree, then change directory to `examples/job/work-queue-1`.
Otherwise, make a temporary directory, change to it,
download the [Dockerfile](/examples/application/job/rabbitmq/Dockerfile),
and [worker.py](/examples/application/job/rabbitmq/worker.py). In either case,
build the image with this command:
```shell
docker build -t job-wq-1 .
```
For the [Docker Hub](https://hub.docker.com/), tag your app image with
your username and push to the Hub with the below commands. Replace
`<username>` with your Hub username.
```shell
docker tag job-wq-1 <username>/job-wq-1
docker push <username>/job-wq-1
```
If you are using [Google Container
Registry](https://cloud.google.com/tools/container-registry/), tag
your app image with your project ID, and push to GCR. Replace
`<project>` with your project ID.
```shell
docker tag job-wq-1 gcr.io/<project>/job-wq-1
gcloud docker -- push gcr.io/<project>/job-wq-1
```
## Defining a Job
Here is a job definition. You'll need to make a copy of the Job and edit the
image to match the name you used, and call it `./job.yaml`.
{{< codenew file="application/job/rabbitmq/job.yaml" >}}
In this example, each pod works on one item from the queue and then exits.
So, the completion count of the Job corresponds to the number of work items
done. So we set, `.spec.completions: 8` for the example, since we put 8 items in the queue.
## Running the Job
So, now run the Job:
```shell
Official 1.14 Release Docs (#13174) * Official documentation on Poseidon/Firmament, a new multi-scheduler support for K8S. (#11752) * Added documentation about Poseidon-Firmament scheduler * Fixed some style issues. * Udpated the document as per the review comments. * Fixed some typos and updated the document * Updated the document as per the review comments. * Document timeout attribute for kms-plugin. (#12158) See 72540. * Official documentation on Poseidon/Firmament, a new multi-scheduler (#12343) * Removed the old version of the Poseidon documentation. Incorrect location. * Official documentation on Poseidon/Firmament, a new multi-scheduler support for K8S (#12069) * Official documentation on Poseidon/Firmament, a new multi-scheduler support for K8S. (#11752) * Added documentation about Poseidon-Firmament scheduler * Fixed some style issues. * Udpated the document as per the review comments. * Fixed some typos and updated the document * Updated the document as per the review comments. * Updated the document as per review comments. Added config details. * Updated the document as per the latest review comments. Fixed nits * Made changes as per latest suggestions. * Some more changes added. * Updated as per suggestions. * Changed the release process section. * SIG Docs edits Small edits to match style guidelines. * add plus to feature state * capitalization * revert feature state shortcode since this is a Kubernetes extension, not a direct feature, it shouldn't use the regular feature state tagging. (cherry picked from commit 7730c1540b637be74b9b21d4128a145994eb19cc) * Remove initializers from doc. It will be removed in 1.14 (#12331) * kubeadm: Document CRI auto detection functionality (#12462) Signed-off-by: Rostislav M. Georgiev <rostislavg@vmware.com> * Minor doc change for GAing Pod DNS Config (#12514) * Graduate ExpandInUsePersistentVolumes feature to beta (#10574) * Rename 2018-11-07-grpc-load-balancing-with-linkerd.md.md file (#12594) * Add dynamic percentage of node scoring to user docs (#12235) * Add dynamic percentage of node scoring to user docs * addressed review comments * delete special symbol (#12445) * Update documentation for VolumeSubpathEnvExpansion (#11843) * Update documentation for VolumeSubpathEnvExpansion * Address comments - improve descriptions * Graduate Pod Priority and Preemption to GA (#12428) * Added Instana links to the documentation (#12977) * Added link to the Instana Kubernetes integration * Added Instana link for services section Added Instana and a link to the Kubernetes integration to the analytics services section and broadened the scope to APM, monitoring and analytics. * Oxford comma /flex * More Oxford commas, because they matter * Update kubectl plugins to stable (#12847) * documentation for CSI topology beta (#12889) * Document changes to default RBAC discovery ClusterRole(Binding)s (#12888) * Document changes to default RBAC discovery ClusterRole(Binding)s Documentation for https://github.com/kubernetes/enhancements/issues/789 and https://github.com/kubernetes/kubernetes/pull/73807 * documentation review feedback * CSI raw block to beta (#12931) * Change incorrect string raw to block (#12926) Fixes #12925 * Update documentation on node OS/arch labels (#12976) These labels have been promoted to GA: https://github.com/kubernetes/enhancements/issues/793 * local pv GA doc updates (#12915) * Publish CRD OpenAPI Documentation (#12910) * add documentation for CustomResourcePublishOpenAPI * address comments fix links, ordered lists, style and typo * kubeadm: add document for upgrading from 1.13 to 1.14 (single CP and HA) (#13189) * kubeadm: add document for upgrading from 1.13 to 1.14 - remove doc for upgrading 1.10 -> 1.11 * kubeadm: apply amends to upgrade-1.14 doc * kubeadm: apply amends to upgrade-1.14 doc (part2) * kubeadm: apply amends to upgrade-1.14 doc (part3) * kubeadm: add note about "upgrade node experimental-control-plane" + add comment about `upgrade plan` * kubeadm: add missing "You should see output similar to this" * fix bullet indentation (#13214) * mark PodReadinessGate GA (#12800) * Update RuntimeClass documentation for beta (#13043) * Update RuntimeClass documentation for beta * Update feature gate & add upgrade section * formatting fixes * Highlight upgrade action required * Address feedback * CSI ephemeral volume alpha documentation (#10934) * update kubectl documentation (#12867) * update kubectl documentation * add document for Secret/ConfigMap generators * replace `kubectl create -f` by `kubectl apply -f` * Add page for kustomization support in kubectl * fix spelling errors and address comments * Documentation for Windows GMSA feature (#12936) * Documentation for Windows GMSA feature Signed-off-by: Deep Debroy <ddebroy@docker.com> * Enhancements to GMSA docs Signed-off-by: Deep Debroy <ddebroy@docker.com> * Fix links Signed-off-by: Deep Debroy <ddebroy@docker.com> * Fix GMSA link Signed-off-by: Deep Debroy <ddebroy@docker.com> * Add GMSA feature flag in feature flag list Signed-off-by: Deep Debroy <ddebroy@docker.com> * Relocate GMSA to container configuration Signed-off-by: Deep Debroy <ddebroy@docker.com> * Add example for container spec Signed-off-by: Deep Debroy <ddebroy@docker.com> * Remove changes in Windows index Signed-off-by: Deep Debroy <ddebroy@docker.com> * Update configure-gmsa.md * Update configure-gmsa.md * Update configure-gmsa.md * Update configure-gmsa.md * Rearrange the steps into two sections and other edits Signed-off-by: Deep Debroy <ddebroy@docker.com> * Fix links Signed-off-by: Deep Debroy <ddebroy@docker.com> * Add reference to script to generate GMSA YAMLs Signed-off-by: Deep Debroy <ddebroy@docker.com> * Some more clarifications for GMSA Signed-off-by: Deep Debroy <ddebroy@docker.com> * HugePages graduated to GA (#13004) * HugePages graduated to GA * fixing nit for build * Docs for node PID limiting (https://github.com/kubernetes/kubernetes/pull/73651) (#12932) * kubeadm: update the reference documentation for 1.14 (#12911) * kubeadm: update list of generated files for 1.14 NOTE: PLACEHOLDERS! these files are generated by SIG Docs each release, but we need them to pass the k/website PR CI. - add join_phase* (new sub phases of join) - add init_phase_upload-certs.md (new upload certs phase for init) - remove alpha-preflight (now both init and join have this) * kubeadm: update reference docs includes for 1.14 - remove includes from alpha.md - add upload-certs to init-phase.md - add join-phase.md and it's phases * kubeadm: update the editorial content of join and init - cleanup master->control-plane node - add some notes about phases and join - remove table about pre-pulling images - remove outdated info about self-hosting * kubeadm: update target release for v1alpha3 removal 1.14 -> 1.15 * kubeadm: copy edits for 1.14 reference docs (part1) * kubeadm: use "shell" for code blocks * kubeadm: update the 1.14 HA guide (#13191) * kubeadm: update the 1.14 HA guide * kubeadm: try to fix note/caution indent in HA page * kubeadm: fix missing sudo and minor amends in HA doc * kubeadm: apply latest amends to the HA doc for 1.14 * fixed a few missed merge conflicts * Admission Webhook new features doc (#12938) - kubernetes/kubernetes#74998 - kubernetes/kubernetes#74477 - kubernetes/kubernetes#74562 * Clarifications and fixes in GMSA doc (#13226) * Clarifications and fixes in GMSA doc Signed-off-by: Deep Debroy <ddebroy@docker.com> * Update configure-gmsa.md * Reformat to align headings and pre-reqs better Signed-off-by: Deep Debroy <ddebroy@docker.com> * Reformat to align headings and pre-reqs better Signed-off-by: Deep Debroy <ddebroy@docker.com> * Reformat to fix bullets Signed-off-by: Deep Debroy <ddebroy@docker.com> * Reword application of sample gmsa Signed-off-by: Deep Debroy <ddebroy@docker.com> * Update configure-gmsa.md * Address feedback to use active voice Signed-off-by: Deep Debroy <ddebroy@docker.com> * Address feedback to use active voice Signed-off-by: Deep Debroy <ddebroy@docker.com> * RunAsGroup documentation for Progressing this to Beta (#12297) * start serverside-apply documentation (#13077) * start serverside-apply documentation * add more concept info on server side apply * Update api concepts * Update api-concepts.md * fix style issues * Document CSI update (#12928) * Document CSI update * Finish CSI documentation Also fix mistake with ExpandInUsePersistentVolumes documented as beta * Overall docs for CSI Migration feature (#12935) * Placeholder docs for CSI Migration feature Signed-off-by: Deep Debroy <ddebroy@docker.com> * Address CR comments and update feature gates Signed-off-by: Deep Debroy <ddebroy@docker.com> * Add mappings for CSI plugins Signed-off-by: Deep Debroy <ddebroy@docker.com> * Add sections for AWS and GCE PD migration Signed-off-by: Deep Debroy <ddebroy@docker.com> * Add docs for Cinder and CSI Migration info Signed-off-by: Deep Debroy <ddebroy@docker.com> * Clarify scope to volumes with file system Signed-off-by: Deep Debroy <ddebroy@docker.com> * Change the format of EBS and Cinder CSI Migration sections to follow the GCE template Signed-off-by: Deep Debroy <ddebroy@docker.com> * Windows documentation updates for 1.14 (#12929) * Updated the note to indicate doc work for 1.14 * first attempt at md export from gdoc * simplifyig * big attempt * moving DRAFT windows content to PR for review * moving content to PR in markdown for review * updated note tags * Delete windows-contributing.md deleting this file as it is already ported to the github contributor guide * fixed formatting in intro and cluster setup guide * updating formatting for running containers guide * rejiggered end of troubleshooting * fixed minor typos * Clarified the windows binary download step * Update _index.md making updates based on feedback * Update _index.md updating ovn-kubernetes docs * Update _index.md * Update _index.md * updating relative docs links updating all the links to be relative links to /docs * Update _index.md * Update _index.md updates for windows services and ovn-kubernetes * formatted for correct step numbering * fix typos * Update _index.md updates for flannel PR in troubleshooting * Update _index.md * Update _index.md updating a few sections like roadmap, services, troubleshooting/filing tickets * Update _index.md * Update _index.md * Update _index.md * Fixed a few whitespace issues * Update _index.md * Update _index.md * Update _index.md * add section on upgrading CoreDNS (#12909) * documentation for kubelet resource metrics endpoint (#12934) * windows docs updates for 1.14 (#13279) * Delete sample-l2bridge-wincni-config.json this file is not used anywhere * Update _index.md * Update _index.md * Update _index.md * Update _index.md * Update _index.md * Rename content/en/docs/getting-started-guides/windows/_index.md to content/en/docs/setup/windows/_index.md moving to new location * Delete flannel-master-kubectl-get-ds.png * Delete flannel-master-kubeclt-get-pods.png * Delete windows-docker-error.png * Add files via upload * Rename _index.md to add-windows-nodes.md * Create _index.md * Update _index.md * Update add-windows-nodes.md * Update add-windows-nodes.md * Create user-guide-windows-nodes.md * Create user-guide-windows-containers.md * Update and rename add-windows-nodes.md to intro-windows-nodes.md * Update user-guide-windows-containers.md * Rename intro-windows-nodes.md to intro-windows-in-kubernetes.md * Update user-guide-windows-nodes.md * Update user-guide-windows-containers.md * Update user-guide-windows-containers.md * Update user-guide-windows-nodes.md * Update user-guide-windows-containers.md * Update _index.md * Update intro-windows-in-kubernetes.md * Update intro-windows-in-kubernetes.md fixing the pause image * Update intro-windows-in-kubernetes.md changing tables from html to MD * Update user-guide-windows-nodes.md converting tables from HTML to MD * Update intro-windows-in-kubernetes.md * Update user-guide-windows-nodes.md * Update user-guide-windows-nodes.md * Update user-guide-windows-nodes.md updating the numbering , even though it messes up the notes a little bit. Jim will file a ticket to follow up * Update user-guide-windows-nodes.md * update to windows docs for 1.14 (#13322) * Update intro-windows-in-kubernetes.md * Update intro-windows-in-kubernetes.md * Update intro-windows-in-kubernetes.md * Update intro-windows-in-kubernetes.md * Update intro-windows-in-kubernetes.md * Update user-guide-windows-containers.md * Update user-guide-windows-nodes.md * Update intro-windows-in-kubernetes.md (#13344) * server side apply followup (#13321) * change some parts of serverside apply docs in response to comments * fix typos and wording * Update config.toml (#13365)
2019-03-25 22:06:16 +00:00
kubectl apply -f ./job.yaml
```
Now wait a bit, then check on the job.
```shell
kubectl describe jobs/job-wq-1
```
```
Name: job-wq-1
Namespace: default
Selector: controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
Labels: controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
job-name=job-wq-1
Annotations: <none>
Parallelism: 2
Completions: 8
Start Time: Wed, 06 Sep 2017 16:42:02 +0800
Pods Statuses: 0 Running / 8 Succeeded / 0 Failed
Pod Template:
Labels: controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
job-name=job-wq-1
Containers:
c:
Image: gcr.io/causal-jigsaw-637/job-wq-1
Port:
Environment:
BROKER_URL: amqp://guest:guest@rabbitmq-service:5672
QUEUE: job1
Mounts: <none>
Volumes: <none>
Events:
FirstSeen LastSeen Count From SubobjectPath Type Reason Message
───────── ──────── ───── ──── ───────────── ────── ────── ───────
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-hcobb
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-weytj
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-qaam5
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-b67sr
26s 26s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-xe5hj
15s 15s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-w2zqe
14s 14s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-d6ppa
14s 14s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-p17e0
```
All our pods succeeded. Yay.
2020-05-30 19:10:23 +00:00
<!-- discussion -->
## Alternatives
This approach has the advantage that you
do not need to modify your "worker" program to be aware that there is a work queue.
It does require that you run a message queue service.
If running a queue service is inconvenient, you may
want to consider one of the other [job patterns](/docs/concepts/jobs/run-to-completion-finite-workloads/#job-patterns).
This approach creates a pod for every work item. If your work items only take a few seconds,
though, creating a Pod for every work item may add a lot of overhead. Consider another
[example](/docs/tasks/job/fine-parallel-processing-work-queue/), that executes multiple work items per Pod.
In this example, we use the `amqp-consume` utility to read the message
from the queue and run our actual program. This has the advantage that you
do not need to modify your program to be aware of the queue.
A [different example](/docs/tasks/job/fine-parallel-processing-work-queue/), shows how to
communicate with the work queue using a client library.
## Caveats
If the number of completions is set to less than the number of items in the queue, then
not all items will be processed.
If the number of completions is set to more than the number of items in the queue,
then the Job will not appear to be completed, even though all items in the queue
have been processed. It will start additional pods which will block waiting
for a message.
There is an unlikely race with this pattern. If the container is killed in between the time
that the message is acknowledged by the amqp-consume command and the time that the container
exits with success, or if the node crashes before the kubelet is able to post the success of the pod
back to the api-server, then the Job will not appear to be complete, even though all items
in the queue have been processed.
2020-05-30 19:10:23 +00:00