From 72a3fb3c3719f15b056d5949ac08f6e4699faecf Mon Sep 17 00:00:00 2001 From: Eric Tune Date: Mon, 20 Jun 2016 07:59:19 -0700 Subject: [PATCH] Moved jobs examples to docs repo These are not considered "examples", per https://github.com/kubernetes/kubernetes/pull/17345#issuecomment-197153744 --- _data/guides.yml | 11 + docs/user-guide/jobs.md | 24 +- docs/user-guide/jobs/expansions/index.md | 192 ++++++++++++ docs/user-guide/jobs/expansions/job.yaml.txt | 18 ++ docs/user-guide/jobs/work-queue-1/Dockerfile | 10 + docs/user-guide/jobs/work-queue-1/index.md | 284 ++++++++++++++++++ docs/user-guide/jobs/work-queue-1/job.yaml | 15 + docs/user-guide/jobs/work-queue-1/worker.py | 7 + docs/user-guide/jobs/work-queue-2/Dockerfile | 6 + docs/user-guide/jobs/work-queue-2/index.md | 210 +++++++++++++ docs/user-guide/jobs/work-queue-2/job.yaml | 14 + .../jobs/work-queue-2/redis-pod.yaml | 15 + .../jobs/work-queue-2/redis-service.yaml | 10 + docs/user-guide/jobs/work-queue-2/rediswq.py | 130 ++++++++ docs/user-guide/jobs/work-queue-2/worker.py | 23 ++ 15 files changed, 957 insertions(+), 12 deletions(-) create mode 100644 docs/user-guide/jobs/expansions/index.md create mode 100644 docs/user-guide/jobs/expansions/job.yaml.txt create mode 100644 docs/user-guide/jobs/work-queue-1/Dockerfile create mode 100644 docs/user-guide/jobs/work-queue-1/index.md create mode 100644 docs/user-guide/jobs/work-queue-1/job.yaml create mode 100755 docs/user-guide/jobs/work-queue-1/worker.py create mode 100644 docs/user-guide/jobs/work-queue-2/Dockerfile create mode 100644 docs/user-guide/jobs/work-queue-2/index.md create mode 100644 docs/user-guide/jobs/work-queue-2/job.yaml create mode 100644 docs/user-guide/jobs/work-queue-2/redis-pod.yaml create mode 100644 docs/user-guide/jobs/work-queue-2/redis-service.yaml create mode 100644 docs/user-guide/jobs/work-queue-2/rediswq.py create mode 100755 docs/user-guide/jobs/work-queue-2/worker.py diff --git a/_data/guides.yml b/_data/guides.yml index f5654a6efd..8f4d76580f 100644 --- a/_data/guides.yml +++ b/_data/guides.yml @@ -59,6 +59,17 @@ toc: - title: Using kubectl to Manage Resources path: /docs/user-guide/working-with-resources/ +- title: Batch Jobs + section: + - title: Jobs + path: /docs/user-guide/jobs/ + - title: Parallel Processing using Expansions + path: /docs/user-guide/jobs/expansions/ + - title: Coarse Parallel Processing using a Work Queue + path: /docs/user-guide/jobs/work-queue-1/ + - title: Fine Parallel Processing using a Work Queue + path: /docs/user-guide/jobs/work-queue-2/ + - title: Service Discovery and Load Balancing section: - title: Connecting Applications with Services diff --git a/docs/user-guide/jobs.md b/docs/user-guide/jobs.md index 9acde65d0a..0d60220873 100644 --- a/docs/user-guide/jobs.md +++ b/docs/user-guide/jobs.md @@ -249,12 +249,12 @@ The tradeoffs are: The tradeoffs are summarized here, with columns 2 to 4 corresponding to the above tradeoffs. The pattern names are also links to examples and more detailed description. -| Pattern | Single Job object | Fewer pods than work items? | Use app unmodified? | Works in Kube 1.1? 
|
-| -------------------------------------------------------------------------- |:-----------------:|:---------------------------:|:-------------------:|:-------------------:|
-| [Job Template Expansion](https://github.com/kubernetes/kubernetes/tree/{{page.githubbranch}}/examples/job/expansions/README.md) | | | ✓ | ✓ |
-| [Queue with Pod Per Work Item](https://github.com/kubernetes/kubernetes/tree/{{page.githubbranch}}/examples/job/work-queue-1/README.md) | ✓ | | sometimes | ✓ |
-| [Queue with Variable Pod Count](https://github.com/kubernetes/kubernetes/tree/{{page.githubbranch}}/examples/job/work-queue-2/README.md) | ✓ | ✓ | | ✓ |
-| Single Job with Static Work Assignment | ✓ | | ✓ | |
+| Pattern | Single Job object | Fewer pods than work items? | Use app unmodified? | Works in Kube 1.1? |
+| -------------------------------------------------------------------- |:-----------------:|:---------------------------:|:-------------------:|:-------------------:|
+| [Job Template Expansion](/docs/user-guide/jobs/expansions/) | | | ✓ | ✓ |
+| [Queue with Pod Per Work Item](/docs/user-guide/jobs/work-queue-1/) | ✓ | | sometimes | ✓ |
+| [Queue with Variable Pod Count](/docs/user-guide/jobs/work-queue-2/) | ✓ | ✓ | | ✓ |
+| Single Job with Static Work Assignment | ✓ | | ✓ | |

When you specify completions with `.spec.completions`, each Pod created by the Job controller
has an identical [`spec`](https://github.com/kubernetes/kubernetes/tree/{{page.githubbranch}}/docs/devel/api-conventions.md#spec-and-status). This means that
@@ -265,12 +265,12 @@
are different ways to arrange for pods to work on different things.

This table shows the required settings for `.spec.parallelism` and `.spec.completions` for each
of the patterns. Here, `W` is the number of work items.

-| Pattern | `.spec.completions` | `.spec.parallelism` |
-| -------------------------------------------------------------------------- |:-------------------:|:--------------------:|
-| [Job Template Expansion](https://github.com/kubernetes/kubernetes/tree/{{page.githubbranch}}/examples/job/expansions/README.md) | 1 | should be 1 |
-| [Queue with Pod Per Work Item](https://github.com/kubernetes/kubernetes/tree/{{page.githubbranch}}/examples/job/work-queue-1/README.md) | W | any |
-| [Queue with Variable Pod Count](https://github.com/kubernetes/kubernetes/tree/{{page.githubbranch}}/examples/job/work-queue-2/README.md) | 1 | any |
-| Single Job with Static Work Assignment | W | any |
+| Pattern | `.spec.completions` | `.spec.parallelism` |
+| -------------------------------------------------------------------- |:-------------------:|:--------------------:|
+| [Job Template Expansion](/docs/user-guide/jobs/expansions/) | 1 | should be 1 |
+| [Queue with Pod Per Work Item](/docs/user-guide/jobs/work-queue-1/) | W | any |
+| [Queue with Variable Pod Count](/docs/user-guide/jobs/work-queue-2/) | 1 | any |
+| Single Job with Static Work Assignment | W | any |

## Advanced Usage

diff --git a/docs/user-guide/jobs/expansions/index.md b/docs/user-guide/jobs/expansions/index.md
new file mode 100644
index 0000000000..892dff0df3
--- /dev/null
+++ b/docs/user-guide/jobs/expansions/index.md
@@ -0,0 +1,192 @@
---
---

* TOC
{:toc}

# Example: Multiple Job Objects from Template Expansion

In this example, we will run multiple Kubernetes Jobs created from
a common template. You may want to be familiar with the basic,
non-parallel, use of [Jobs](/docs/user-guide/jobs) first.
## Basic Template Expansion

First, download the following template of a job to a file called `job.yaml.txt`:

{% include code.html language="yaml" file="job.yaml.txt" ghlink="/docs/user-guide/jobs/expansions/job.yaml.txt" %}

Unlike a *pod template*, our *job template* is not a Kubernetes API type. It is just
a yaml representation of a Job object that has some placeholders that need to be filled
in before it can be used. The `$ITEM` syntax is not meaningful to Kubernetes.

In this example, the only processing the container does is to `echo` a string and sleep for a bit.
In a real use case, the processing would be some substantial computation, such as rendering a frame
of a movie, or processing a range of rows in a database. The "$ITEM" parameter would specify, for
example, the frame number or the row range.

This Job and its Pod template carry the label `jobgroup=jobexample`. The label key `jobgroup`
is not special to Kubernetes; you can pick your own label scheme. The label makes it convenient
to operate on all the jobs in this group at once, and putting the same label on the pod template
lets us check on all Pods of these Jobs with a single command.
After the job is created, the system will add more labels that distinguish one Job's pods
from another Job's pods.

Next, expand the template into multiple files, one for each item to be processed.

```shell
# Expand files into a temporary directory
mkdir ./jobs
for i in apple banana cherry
do
  cat job.yaml.txt | sed "s/\$ITEM/$i/" > ./jobs/job-$i.yaml
done
```

Check if it worked:

```shell
$ ls jobs/
job-apple.yaml
job-banana.yaml
job-cherry.yaml
```

Here, we used `sed` to replace the string `$ITEM` with the loop variable.
You could use any type of template language (jinja2, erb) or write a program
to generate the Job objects.

Next, create all the jobs with one kubectl command:

```shell
$ kubectl create -f ./jobs
job "process-item-apple" created
job "process-item-banana" created
job "process-item-cherry" created
```

Now, check on the jobs:

```shell
$ kubectl get jobs -l jobgroup=jobexample
JOB                   CONTAINER(S)   IMAGE(S)   SELECTOR                   SUCCESSFUL
process-item-apple    c              busybox    jobgroup in (jobexample)   1
process-item-banana   c              busybox    jobgroup in (jobexample)   1
process-item-cherry   c              busybox    jobgroup in (jobexample)   1
```

Here we use the `-l` option to select all jobs that are part of this
group of jobs. (There might be other unrelated jobs in the system that we
do not care to see.)

We can check on the pods as well using the same label selector:

```shell
$ kubectl get pods -l jobgroup=jobexample
NAME                        READY     STATUS      RESTARTS   AGE
process-item-apple-kixwv    0/1       Completed   0          4m
process-item-banana-wrsf7   0/1       Completed   0          4m
process-item-cherry-dnfu9   0/1       Completed   0          4m
```

There is not a single command to check on the output of all jobs at once,
but looping over all the pods is pretty easy:

```shell
$ for p in $(kubectl get pods -l jobgroup=jobexample -o name)
do
  kubectl logs $p
done
Processing item apple
Processing item banana
Processing item cherry
```

## Multiple Template Parameters

In the first example, each instance of the template had one parameter, and that parameter was also
used as a label. However, label keys are limited in [what characters they can
contain](/docs/user-guide/labels/#syntax-and-character-set).
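The expansion step itself does not require a template engine. As a minimal sketch, the same
expansion can be done with Python's standard-library `string.Template`, which happens to use
the same `$ITEM` placeholder syntax as the template above (this script is an illustration,
not one of the example files):

```python
#!/usr/bin/env python
# Sketch: expand a $-placeholder job template without installing anything.
from string import Template

params = [
    {"ITEM": "apple"},
    {"ITEM": "banana"},
    {"ITEM": "cherry"},
]

with open("job.yaml.txt") as f:
    template = Template(f.read())

# Join the rendered manifests with the YAML document separator so the
# output can be piped straight to: kubectl create -f -
print("\n---\n".join(template.substitute(p) for p in params))
```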
This slightly more complex example uses the jinja2 template language to generate our objects.
We will use a one-line python program to render the template.

First, copy and paste the following template of a Job object into a file called `job.yaml.jinja2`:


```liquid{% raw %}
{%- set params = [{ "name": "apple", "url": "http://www.orangepippin.com/apples", },
                  { "name": "banana", "url": "https://en.wikipedia.org/wiki/Banana", },
                  { "name": "raspberry", "url": "https://www.raspberrypi.org/" }]
%}
{%- for p in params %}
{%- set name = p["name"] %}
{%- set url = p["url"] %}
apiVersion: batch/v1
kind: Job
metadata:
  name: jobexample-{{ name }}
  labels:
    jobgroup: jobexample
spec:
  template:
    metadata:
      name: jobexample
      labels:
        jobgroup: jobexample
    spec:
      containers:
      - name: c
        image: busybox
        command: ["sh", "-c", "echo Processing URL {{ url }} && sleep 5"]
      restartPolicy: Never
---
{%- endfor %}
{% endraw %}
```

The above template defines parameters for each job object using a list of
python dicts (lines 1-4). Then a for loop emits one job yaml object
for each set of parameters (remaining lines).
We take advantage of the fact that multiple yaml documents can be concatenated
with the `---` separator (the `---` line near the end of the template), so we can
pipe the output directly to kubectl to create the objects.

You will need the jinja2 package if you do not already have it: `pip install --user jinja2`.
Now, use this one-line python program to expand the template:

```shell
alias render_template='python -c "from jinja2 import Template; import sys; print(Template(sys.stdin.read()).render());"'
```

The output can be saved to a file, like this:

```shell
cat job.yaml.jinja2 | render_template > jobs.yaml
```

or sent directly to kubectl, like this:

```shell
cat job.yaml.jinja2 | render_template | kubectl create -f -
```

## Alternatives

If you have a large number of job objects, you may find that:

- Even using labels, managing so many Job objects is cumbersome.
- You exceed resource quota when creating all the Jobs at once,
  and do not want to wait to create them incrementally.
- You need a way to easily scale the number of pods running
  concurrently. One reason would be to avoid using too many
  compute resources. Another would be to limit the number of
  concurrent requests to a shared resource, such as a database,
  used by all the pods in the job.
- Very large numbers of jobs created at once overload the
  Kubernetes apiserver, controller, or scheduler.

In this case, you can consider one of the
other [job patterns](/docs/user-guide/jobs/#job-patterns).
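As a final note on the expansion approach: if you stay with this pattern despite a large
number of Jobs, creation can at least be paced. Here is a rough Python sketch of that idea,
assuming the `job.yaml.jinja2` file from above; the batch size and sleep interval are
arbitrary illustrations, and splitting on `---` assumes that separator never appears inside
a manifest:

```python
#!/usr/bin/env python
# Sketch: render the jinja2 template and create the Jobs in small batches,
# to avoid exhausting quota or overloading the apiserver all at once.
import subprocess
import time
from jinja2 import Template

with open("job.yaml.jinja2") as f:
    rendered = Template(f.read()).render()

# Each yaml document in the rendered stream is one Job manifest.
docs = [d for d in rendered.split("---") if d.strip()]

batch_size = 10
for i in range(0, len(docs), batch_size):
    batch = "\n---\n".join(docs[i:i + batch_size])
    # Feed one batch of manifests to kubectl on stdin.
    p = subprocess.Popen(["kubectl", "create", "-f", "-"], stdin=subprocess.PIPE)
    p.communicate(batch.encode("utf-8"))
    time.sleep(5)  # crude pacing between batches
```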
diff --git a/docs/user-guide/jobs/expansions/job.yaml.txt b/docs/user-guide/jobs/expansions/job.yaml.txt
new file mode 100644
index 0000000000..790025b38b
--- /dev/null
+++ b/docs/user-guide/jobs/expansions/job.yaml.txt
@@ -0,0 +1,18 @@
apiVersion: batch/v1
kind: Job
metadata:
  name: process-item-$ITEM
  labels:
    jobgroup: jobexample
spec:
  template:
    metadata:
      name: jobexample
      labels:
        jobgroup: jobexample
    spec:
      containers:
      - name: c
        image: busybox
        command: ["sh", "-c", "echo Processing item $ITEM && sleep 5"]
      restartPolicy: Never
diff --git a/docs/user-guide/jobs/work-queue-1/Dockerfile b/docs/user-guide/jobs/work-queue-1/Dockerfile
new file mode 100644
index 0000000000..cbd36bb620
--- /dev/null
+++ b/docs/user-guide/jobs/work-queue-1/Dockerfile
@@ -0,0 +1,10 @@
# Specify BROKER_URL and QUEUE when running
FROM ubuntu:14.04

RUN apt-get update && \
    apt-get install -y curl ca-certificates amqp-tools python \
       --no-install-recommends \
    && rm -rf /var/lib/apt/lists/*
COPY ./worker.py /worker.py

CMD /usr/bin/amqp-consume --url=$BROKER_URL -q $QUEUE -c 1 /worker.py
diff --git a/docs/user-guide/jobs/work-queue-1/index.md b/docs/user-guide/jobs/work-queue-1/index.md
new file mode 100644
index 0000000000..7f2ba0b6ac
--- /dev/null
+++ b/docs/user-guide/jobs/work-queue-1/index.md
@@ -0,0 +1,284 @@
---
---

* TOC
{:toc}

# Example: Job with Work Queue with Pod Per Work Item

In this example, we will run a Kubernetes Job with multiple parallel
worker processes. You may want to be familiar with the basic,
non-parallel, use of [Job](/docs/user-guide/jobs) first.

In this example, as each pod is created, it picks up one unit of work
from a task queue, completes it, deletes it from the queue, and exits.

Here is an overview of the steps in this example:

1. **Start a message queue service.** In this example, we use RabbitMQ, but you could use another
   one. In practice you would set up a message queue service once and reuse it for many jobs.
1. **Create a queue, and fill it with messages.** Each message represents one task to be done. In
   this example, a message is a string naming one work item to be processed.
1. **Start a Job that works on tasks from the queue.** The Job starts several pods. Each pod takes
   one task from the message queue, processes it, and exits.

## Starting a message queue service

This example uses RabbitMQ, but it should be easy to adapt to another AMQP-type message service.

In practice you could set up a message queue service once in a
cluster and reuse it for many jobs, as well as for long-running services.

Start RabbitMQ as follows:

```shell
$ kubectl create -f examples/celery-rabbitmq/rabbitmq-service.yaml
service "rabbitmq-service" created
$ kubectl create -f examples/celery-rabbitmq/rabbitmq-controller.yaml
replicationcontroller "rabbitmq-controller" created
```

We will only use the rabbitmq part from the celery-rabbitmq example.

## Testing the message queue service

Now, we can experiment with accessing the message queue. We will
create a temporary interactive pod, install some tools on it,
and experiment with queues.

First create a temporary interactive Pod.

```shell
# Create a temporary interactive container
$ kubectl run -i --tty temp --image ubuntu:14.04
Waiting for pod default/temp-loe07 to be running, status is Pending, pod ready: false
... [ previous line repeats several times; hit return when it stops ] ...
+``` + +Note that your pod name and command prompt will be different. + +Next install the `amqp-tools` so we can work with message queues. + +```shell +# Install some tools +root@temp-loe07:/# apt-get update +.... [ lots of output ] .... +root@temp-loe07:/# apt-get install -y curl ca-certificates amqp-tools python dnsutils +.... [ lots of output ] .... +``` + +Later, we will make a docker image that includes these packages. + +Next, we will check that we can discover the rabbitmq service: + +``` +# Note the rabitmq-service has a DNS name, provided by Kubernetes: + +root@temp-loe07:/# nslookup rabbitmq-service +Server: 10.0.0.10 +Address: 10.0.0.10#53 + +Name: rabbitmq-service.default.svc.cluster.local +Address: 10.0.147.152 + +# Your address will vary. +``` + +If Kube-DNS is not setup correctly, the previous step may not work for you. +You can also find the service IP in an env var: + +``` +# env | grep RABBIT | grep HOST +RABBITMQ_SERVICE_SERVICE_HOST=10.0.147.152 +# Your address will vary. +``` + +Next we will verify we can create a queue, and publish and consume messages. + +```shell +# In the next line, rabbitmq-service is the hostname where the rabbitmq-service +# can be reached. 5672 is the standard port for rabbitmq. + +root@temp-loe07:/# BROKER_URL=amqp://guest:guest@rabbitmq-service:5672 +# If you could not resolve "rabbitmq-service" in the previous step, +# then use this command instead: +# root@temp-loe07:/# BROKER_URL=amqp://guest:guest@$RABBITMQ_SERVICE_SERVICE_HOST:5672 + +# Now create a queue: + +root@temp-loe07:/# /usr/bin/amqp-declare-queue --url=$BROKER_URL -q foo -d +foo + +# Publish one message to it: + +root@temp-loe07:/# /usr/bin/amqp-publish --url=$BROKER_URL -r foo -p -b Hello + +# And get it back. + +root@temp-loe07:/# /usr/bin/amqp-consume --url=$BROKER_URL -q foo -c 1 cat && echo +Hello +root@temp-loe07:/# +``` + +In the last command, the `amqp-consume` tool takes one message (`-c 1`) +from the queue, and passes that message to the standard input of an +an arbitrary command. In this case, the program `cat` is just printing +out what it gets on the standard input, and the echo is just to add a carriage +return so the example is readable. + +## Filling the Queue with tasks + +Now lets fill the queue with some "tasks". In our example, our tasks are just strings to be +printed. + +In a practice, the content of the messages might be: + +- names of files to that need to be processed +- extra flags to the program +- ranges of keys in a database table +- configuration parameters to a simulation +- frame numbers of a scene to be rendered + +In practice, if there is large data that is needed in a read-only mode by all pods +of the Job, you will typically put that in a shared file system like NFS and mount +that readonly on all the pods, or the program in the pod will natively read data from +a cluster file system like HDFS. + +For our example, we will create the queue and fill it using the amqp command line tools. +In practice, you might write a program to fill the queue using an amqp client library. + +```shell +$ /usr/bin/amqp-declare-queue --url=$BROKER_URL -q job1 -d +job1 +$ for f in apple banana cherry date fig grape lemon melon +do + /usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f +done +``` + +So, we filled the queue with 8 messages. + +## Create an Image + +Now we are ready to create an image that we will run as a job. + +We will use the `amqp-consume` utility to read the message +from the queue and run our actual program. 
Here is a very simple example program:

{% include code.html language="python" file="worker.py" ghlink="/docs/user-guide/jobs/work-queue-1/worker.py" %}

Now, build an image. If you are working in the source
tree, then change directory to `docs/user-guide/jobs/work-queue-1`.
Otherwise, make a temporary directory, change to it,
download the [Dockerfile](Dockerfile?raw=true),
and [worker.py](worker.py?raw=true). In either case,
build the image with this command:

```shell
$ docker build -t job-wq-1 .
```

For the [Docker Hub](https://hub.docker.com/), tag your app image with
your username and push to the Hub with the below commands. Replace
`<username>` with your Hub username.

```shell
docker tag job-wq-1 <username>/job-wq-1
docker push <username>/job-wq-1
```

If you are using [Google Container
Registry](https://cloud.google.com/tools/container-registry/), tag
your app image with your project ID, and push to GCR. Replace
`<project>` with your project ID.

```shell
docker tag job-wq-1 gcr.io/<project>/job-wq-1
gcloud docker push gcr.io/<project>/job-wq-1
```

## Defining a Job

Here is a job definition. You'll need to make a copy of the Job, edit the
image to match the name you used, and save it as `./job.yaml`.

{% include code.html language="yaml" file="job.yaml" ghlink="/docs/user-guide/jobs/work-queue-1/job.yaml" %}

In this example, each pod works on one item from the queue and then exits.
So, the completion count of the Job corresponds to the number of work items
done, and we set `.spec.completions: 8` for the example, since we put 8 items in the queue.

## Running the Job

So, now run the Job:

```shell
kubectl create -f ./job.yaml
```

Now wait a bit, then check on the job.

```shell
$ kubectl describe jobs/job-wq-1
Name:           job-wq-1
Namespace:      default
Image(s):       gcr.io/causal-jigsaw-637/job-wq-1
Selector:       app in (job-wq-1)
Parallelism:    4
Completions:    8
Labels:         app=job-wq-1
Pods Statuses:  0 Running / 8 Succeeded / 0 Failed
No volumes.
Events:
  FirstSeen  LastSeen  Count  From  SubobjectPath  Reason            Message
  ─────────  ────────  ─────  ────  ─────────────  ──────            ───────
  27s        27s       1      {job }                SuccessfulCreate  Created pod: job-wq-1-hcobb
  27s        27s       1      {job }                SuccessfulCreate  Created pod: job-wq-1-weytj
  27s        27s       1      {job }                SuccessfulCreate  Created pod: job-wq-1-qaam5
  27s        27s       1      {job }                SuccessfulCreate  Created pod: job-wq-1-b67sr
  26s        26s       1      {job }                SuccessfulCreate  Created pod: job-wq-1-xe5hj
  15s        15s       1      {job }                SuccessfulCreate  Created pod: job-wq-1-w2zqe
  14s        14s       1      {job }                SuccessfulCreate  Created pod: job-wq-1-d6ppa
  14s        14s       1      {job }                SuccessfulCreate  Created pod: job-wq-1-p17e0
```

All our pods succeeded. Yay.

## Alternatives

This approach has the advantage that you
do not need to modify your "worker" program to be aware that there is a work queue.

It does require that you run a message queue service.
If running a queue service is inconvenient, you may
want to consider one of the other [job patterns](/docs/user-guide/jobs/#job-patterns).

This approach creates a pod for every work item. If your work items only take a few seconds,
though, creating a Pod for every work item may add a lot of overhead. Consider another
[example](/docs/user-guide/jobs/work-queue-2/) that executes multiple work items per Pod.

In this example, we used the `amqp-consume` utility to read the message
from the queue and run our actual program, so the program itself never
needed to know about the queue.
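If you would rather not wrap your program with `amqp-consume`, the worker can talk to
RabbitMQ itself through a client library. The following is a minimal sketch using the
`pika` library, which this example's image does not install; the broker credentials,
host name, and the queue name `job1` match the ones used above:

```python
#!/usr/bin/env python
# Sketch: consume one message directly, mirroring "amqp-consume -c 1".
import pika

params = pika.ConnectionParameters(
    host="rabbitmq-service",
    credentials=pika.PlainCredentials("guest", "guest"))
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Fetch a single message; method is None if the queue is empty.
method, properties, body = channel.basic_get(queue="job1")
if method:
    print("Processing " + body.decode("utf-8"))
    channel.basic_ack(delivery_tag=method.delivery_tag)
connection.close()
```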
A [different example](/docs/user-guide/jobs/work-queue-2/) shows how to
communicate with the work queue using a client library.

## Caveats

If the number of completions is set to less than the number of items in the queue, then
not all items will be processed.

If the number of completions is set to more than the number of items in the queue,
then the Job will not appear to be completed, even though all items in the queue
have been processed. It will start additional pods which will block waiting
for a message.

There is an unlikely race with this pattern. If the container is killed in between the time
that the message is acknowledged by the amqp-consume command and the time that the container
exits with success, or if the node crashes before the kubelet is able to post the success of the pod
back to the api-server, then the Job will not appear to be complete, even though all items
in the queue have been processed.
diff --git a/docs/user-guide/jobs/work-queue-1/job.yaml b/docs/user-guide/jobs/work-queue-1/job.yaml
new file mode 100644
index 0000000000..d2696ed022
--- /dev/null
+++ b/docs/user-guide/jobs/work-queue-1/job.yaml
@@ -0,0 +1,15 @@
apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-1
spec:
  completions: 8
  parallelism: 2
  template:
    metadata:
      name: job-wq-1
    spec:
      containers:
      - name: c
        image: gcr.io/<project>/job-wq-1
      restartPolicy: OnFailure
diff --git a/docs/user-guide/jobs/work-queue-1/worker.py b/docs/user-guide/jobs/work-queue-1/worker.py
new file mode 100755
index 0000000000..a20884515d
--- /dev/null
+++ b/docs/user-guide/jobs/work-queue-1/worker.py
@@ -0,0 +1,7 @@
#!/usr/bin/env python

# Prints the message read from standard input, then sleeps for 10 seconds.
import sys
import time
print("Processing " + sys.stdin.read())
time.sleep(10)
diff --git a/docs/user-guide/jobs/work-queue-2/Dockerfile b/docs/user-guide/jobs/work-queue-2/Dockerfile
new file mode 100644
index 0000000000..2de23b3c98
--- /dev/null
+++ b/docs/user-guide/jobs/work-queue-2/Dockerfile
@@ -0,0 +1,6 @@
FROM python
RUN pip install redis
COPY ./worker.py /worker.py
COPY ./rediswq.py /rediswq.py

CMD python worker.py
diff --git a/docs/user-guide/jobs/work-queue-2/index.md b/docs/user-guide/jobs/work-queue-2/index.md
new file mode 100644
index 0000000000..434859093b
--- /dev/null
+++ b/docs/user-guide/jobs/work-queue-2/index.md
@@ -0,0 +1,210 @@
---
---

* TOC
{:toc}

# Example: Job with Work Queue with Multiple Work Items Per Pod

In this example, we will run a Kubernetes Job with multiple parallel
worker processes. You may want to be familiar with the basic,
non-parallel, use of [Job](/docs/user-guide/jobs) first.

In this example, as each pod is created, it picks up one unit of work
at a time from a task queue, completes it, deletes it from the queue,
and repeats until the queue is empty.

Here is an overview of the steps in this example:

1. **Start a storage service to hold the work queue.** In this example, we use Redis to store
   our work items. In the previous example, we used RabbitMQ. In this example, we use Redis and
   a custom work-queue client library because AMQP does not provide a good way for clients to
   detect when a finite-length work queue is empty. (A sketch of the underlying Redis pattern
   follows this list.) In practice you would set up a store such
   as Redis once and reuse it for the work queues of many jobs, and other things.
1. **Create a queue, and fill it with messages.** Each message represents one task to be done. In
   this example, a message is a string naming one work item to be processed.
1. **Start a Job that works on tasks from the queue.** The Job starts several pods. Each pod takes
   one task from the message queue, processes it, and repeats until the end of the queue is reached.
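The Redis pattern underlying the client library is `RPOPLPUSH`: atomically move an item from
the main list to a "processing" list, and remove it only when the work is done, so a crashed
worker's item can be recovered. A tiny sketch of just that idea, assuming the `redis` Python
package and the `job2` list used below:

```python
# Sketch of the RPOPLPUSH reliable-queue idea used by rediswq.py.
import redis

r = redis.StrictRedis(host="redis")
# Atomically move one item from the main list to a processing list,
# so the item is not lost if this worker crashes mid-task.
item = r.rpoplpush("job2", "job2:processing")
if item:
    print("working on " + item.decode("utf-8"))
    # Work is done; remove the item from the processing list.
    r.lrem("job2:processing", 0, item)
```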
## Starting Redis

For this example, for simplicity, we will start a single instance of Redis.
See the [Redis Example](https://github.com/kubernetes/kubernetes/tree/{{page.githubbranch}}/examples/redis/README.md) for an example
of deploying Redis scalably and redundantly.

Start a temporary Pod running Redis and a service so we can find it.

```shell
$ kubectl create -f docs/user-guide/jobs/work-queue-2/redis-pod.yaml
pod "redis-master" created
$ kubectl create -f docs/user-guide/jobs/work-queue-2/redis-service.yaml
service "redis" created
```

## Filling the Queue with tasks

Now let's fill the queue with some "tasks". In our example, our tasks are just strings to be
printed.

Start a temporary interactive pod for running the Redis CLI.

```shell
$ kubectl run -i --tty temp --image redis --command "/bin/sh"
Waiting for pod default/temp-c7h78 to be running, status is Pending, pod ready: false
Hit enter for command prompt
```

Now hit enter, start the redis CLI, and create a list with some work items in it.

```
# redis-cli -h redis
redis:6379> rpush job2 "apple"
(integer) 1
redis:6379> rpush job2 "banana"
(integer) 2
redis:6379> rpush job2 "cherry"
(integer) 3
redis:6379> rpush job2 "date"
(integer) 4
redis:6379> rpush job2 "fig"
(integer) 5
redis:6379> rpush job2 "grape"
(integer) 6
redis:6379> rpush job2 "lemon"
(integer) 7
redis:6379> rpush job2 "melon"
(integer) 8
redis:6379> rpush job2 "orange"
(integer) 9
redis:6379> lrange job2 0 -1
1) "apple"
2) "banana"
3) "cherry"
4) "date"
5) "fig"
6) "grape"
7) "lemon"
8) "melon"
9) "orange"
```

So, the list with key `job2` will be our work queue.

Note: if you do not have Kube-DNS set up correctly, you may need to change
the first step of the above block to `redis-cli -h $REDIS_SERVICE_HOST`.

## Create an Image

Now we are ready to create an image that we will run.

We will use a python worker program with a redis client to read
the messages from the message queue.

A simple Redis work queue client library is provided,
called rediswq.py ([Download](rediswq.py?raw=true)).

The "worker" program in each Pod of the Job uses the work queue
client library to get work. Here it is:

{% include code.html language="python" file="worker.py" ghlink="/docs/user-guide/jobs/work-queue-2/worker.py" %}

If you are working from the source tree,
change directory to the `docs/user-guide/jobs/work-queue-2` directory.
Otherwise, download [`worker.py`](worker.py?raw=true), [`rediswq.py`](rediswq.py?raw=true), and [`Dockerfile`](Dockerfile?raw=true)
using the above links. Then build the image:

```shell
docker build -t job-wq-2 .
```

### Push the image

For the [Docker Hub](https://hub.docker.com/), tag your app image with
your username and push to the Hub with the below commands. Replace
`<username>` with your Hub username.

```shell
docker tag job-wq-2 <username>/job-wq-2
docker push <username>/job-wq-2
```

You need to push to a public repository or [configure your cluster to be able to access
your private repository](/docs/user-guide/images).

If you are using [Google Container
Registry](https://cloud.google.com/tools/container-registry/), tag
your app image with your project ID, and push to GCR. Replace
`<project>` with your project ID.
```shell
docker tag job-wq-2 gcr.io/<project>/job-wq-2
gcloud docker push gcr.io/<project>/job-wq-2
```

## Defining a Job

Here is the job definition:

{% include code.html language="yaml" file="job.yaml" ghlink="/docs/user-guide/jobs/work-queue-2/job.yaml" %}

Be sure to edit the job template to
change `gcr.io/myproject` to your own path.

In this example, each pod works on several items from the queue and then exits when there are no more items.
Since the workers themselves detect when the workqueue is empty, and the Job controller does not
know about the workqueue, it relies on the workers to signal when they are done working.
The workers signal that the queue is empty by exiting with success. So, as soon as any worker
exits with success, the controller knows the work is done, and the Pods will exit soon.
So, we leave the completion count of the Job unset; the Job controller will wait for the
other pods to complete too.

## Running the Job

So, now run the Job:

```shell
kubectl create -f ./job.yaml
```

Now wait a bit, then check on the job.

```shell
$ kubectl describe jobs/job-wq-2
Name:             job-wq-2
Namespace:        default
Image(s):         gcr.io/exampleproject/job-wq-2
Selector:         app in (job-wq-2)
Parallelism:      2
Completions:      Unset
Start Time:       Mon, 11 Jan 2016 17:07:59 -0800
Labels:           app=job-wq-2
Pods Statuses:    1 Running / 0 Succeeded / 0 Failed
No volumes.
Events:
  FirstSeen  LastSeen  Count  From              SubobjectPath  Type    Reason            Message
  ---------  --------  -----  ----              -------------  ------  ------            -------
  33s        33s       1      {job-controller }                Normal  SuccessfulCreate  Created pod: job-wq-2-lglf8

$ kubectl logs pods/job-wq-2-7r7b2
Worker with sessionID: bbd72d0a-9e5c-4dd6-abf6-416cc267991f
Initial queue state: empty=False
Working on banana
Working on date
Working on lemon
```

As you can see, one of our pods worked on several work units.

## Alternatives

If running a queue service or modifying your containers to use a work queue is inconvenient, you may
want to consider one of the other [job patterns](/docs/user-guide/jobs/#job-patterns).

If you have a continuous stream of background processing work to run, then
consider running your background workers with a `replicationController` instead,
and consider running a background processing library such as
https://github.com/resque/resque.
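As an aside, the queue-filling step above can also be scripted instead of typed into
`redis-cli`. Here is a minimal sketch using the same `redis` Python package that the worker
image installs; it assumes it runs inside the cluster, where the `redis` service name
resolves:

```python
#!/usr/bin/env python
# Sketch: fill the work queue from a program instead of redis-cli.
import redis

r = redis.StrictRedis(host="redis")
items = ["apple", "banana", "cherry", "date", "fig",
         "grape", "lemon", "melon", "orange"]
r.rpush("job2", *items)
print("queue length: %d" % r.llen("job2"))
```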
diff --git a/docs/user-guide/jobs/work-queue-2/job.yaml b/docs/user-guide/jobs/work-queue-2/job.yaml
new file mode 100644
index 0000000000..ee7a06c732
--- /dev/null
+++ b/docs/user-guide/jobs/work-queue-2/job.yaml
@@ -0,0 +1,14 @@
apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-2
spec:
  parallelism: 2
  template:
    metadata:
      name: job-wq-2
    spec:
      containers:
      - name: c
        image: gcr.io/myproject/job-wq-2
      restartPolicy: OnFailure
diff --git a/docs/user-guide/jobs/work-queue-2/redis-pod.yaml b/docs/user-guide/jobs/work-queue-2/redis-pod.yaml
new file mode 100644
index 0000000000..ae0c43a793
--- /dev/null
+++ b/docs/user-guide/jobs/work-queue-2/redis-pod.yaml
@@ -0,0 +1,15 @@
apiVersion: v1
kind: Pod
metadata:
  name: redis-master
  labels:
    app: redis
spec:
  containers:
  - name: master
    image: redis
    env:
    - name: MASTER
      value: "true"
    ports:
    - containerPort: 6379
diff --git a/docs/user-guide/jobs/work-queue-2/redis-service.yaml b/docs/user-guide/jobs/work-queue-2/redis-service.yaml
new file mode 100644
index 0000000000..85f2ca2271
--- /dev/null
+++ b/docs/user-guide/jobs/work-queue-2/redis-service.yaml
@@ -0,0 +1,10 @@
apiVersion: v1
kind: Service
metadata:
  name: redis
spec:
  ports:
  - port: 6379
    targetPort: 6379
  selector:
    app: redis
diff --git a/docs/user-guide/jobs/work-queue-2/rediswq.py b/docs/user-guide/jobs/work-queue-2/rediswq.py
new file mode 100644
index 0000000000..ebefa64311
--- /dev/null
+++ b/docs/user-guide/jobs/work-queue-2/rediswq.py
@@ -0,0 +1,130 @@
#!/usr/bin/env python

# Based on http://peter-hoffmann.com/2012/python-simple-queue-redis-queue.html
# and the suggestion in the redis documentation for RPOPLPUSH, at
# http://redis.io/commands/rpoplpush, which suggests how to implement a work-queue.


import redis
import uuid
import hashlib

class RedisWQ(object):
    """Simple Finite Work Queue with Redis Backend

    This work queue is finite: as long as no more work is added
    after workers start, the workers can detect when the queue
    is completely empty.

    The items in the work queue are assumed to have unique values.

    This object is not intended to be used by multiple threads
    concurrently.
    """
    def __init__(self, name, **redis_kwargs):
        """The default connection parameters are: host='localhost', port=6379, db=0

        The work queue is identified by "name".  The library may create other
        keys with "name" as a prefix.
        """
        self._db = redis.StrictRedis(**redis_kwargs)
        # The session ID will uniquely identify this "worker".
        self._session = str(uuid.uuid4())
        # Work queue is implemented as two queues: main, and processing.
        # Work is initially in main, and moved to processing when a client picks it up.
        self._main_q_key = name
        self._processing_q_key = name + ":processing"
        self._lease_key_prefix = name + ":leased_by_session:"

    def sessionID(self):
        """Return the ID for this session."""
        return self._session

    def _main_qsize(self):
        """Return the size of the main queue."""
        return self._db.llen(self._main_q_key)

    def _processing_qsize(self):
        """Return the size of the processing queue."""
        return self._db.llen(self._processing_q_key)

    def empty(self):
        """Return True if the queue is empty, including work being done, False otherwise.
        False does not necessarily mean that there is work available to work on right now;
        an item may currently be leased and in progress.
        """
        return self._main_qsize() == 0 and self._processing_qsize() == 0

# TODO: implement this
# def check_expired_leases(self):
#     """Return any items with expired leases to the main work queue."""
#     # Processing list should not be _too_ long since it is approximately as long
#     # as the number of active and recently active workers.
#     processing = self._db.lrange(self._processing_q_key, 0, -1)
#     for item in processing:
#         # If the lease key is not present for an item (it expired or was
#         # never created because the client crashed before creating it)
#         # then move the item back to the main queue so others can work on it.
#         if not self._lease_exists(item):
#             TODO: transactionally move the key from processing queue to
#             to main queue, while detecting if a new lease is created
#             or if either queue is modified.

    def _itemkey(self, item):
        """Returns a string that uniquely identifies an item (bytes)."""
        return hashlib.sha224(item).hexdigest()

    def _lease_exists(self, item):
        """True if a lease on 'item' exists."""
        return self._db.exists(self._lease_key_prefix + self._itemkey(item))

    def lease(self, lease_secs=60, block=True, timeout=None):
        """Begin working on an item from the work queue.

        Lease the item for lease_secs.  After that time, other
        workers may consider this client to have crashed or stalled
        and pick up the item instead.

        If optional args block is true and timeout is None (the default), block
        if necessary until an item is available."""
        if block:
            item = self._db.brpoplpush(self._main_q_key, self._processing_q_key, timeout=timeout)
        else:
            item = self._db.rpoplpush(self._main_q_key, self._processing_q_key)
        if item:
            # Record that we (this session id) are working on a key.  Expire that
            # note after the lease timeout.
            # Note: if we crash at this line of the program, then GC will see no lease
            # for this item and later return it to the main queue.
            itemkey = self._itemkey(item)
            self._db.setex(self._lease_key_prefix + itemkey, lease_secs, self._session)
        return item

    def complete(self, value):
        """Complete working on the item with 'value'.

        If the lease expired, the item may not have completed, and some
        other worker may have picked it up.  There is no indication
        of what happened.
        """
        self._db.lrem(self._processing_q_key, 0, value)
        # If we crash here, then the GC code will try to move the value, but it will
        # not be here, which is fine.  So this does not need to be a transaction.
        itemkey = self._itemkey(value)
        self._db.delete(self._lease_key_prefix + itemkey)

# TODO: add functions to clean up all keys associated with "name" when
# processing is complete.

# TODO: add a function to add an item to the queue.  Atomically
# check if the queue is empty and if so fail to add the item
# since other workers might think work is done and be in the process
# of exiting.

# TODO(etune): move to my own github for hosting, e.g. github.com/erictune/rediswq-py and
# make it so it can be pip installed by anyone (see
# http://stackoverflow.com/questions/8247605/configuring-so-that-pip-install-can-work-from-github)

# TODO(etune): finish code to GC expired leases, and call periodically
# e.g. each time lease times out.
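
# Illustrative usage sketch (assumes a local Redis server on localhost:6379
# with an item already queued, e.g. via: redis-cli rpush example_queue task1).
if __name__ == "__main__":
    wq = RedisWQ(name="example_queue", host="localhost")
    print("queue empty: " + str(wq.empty()))
    item = wq.lease(lease_secs=30, block=False)
    if item is not None:
        print("leased: " + item.decode("utf-8"))
        wq.complete(item)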
diff --git a/docs/user-guide/jobs/work-queue-2/worker.py b/docs/user-guide/jobs/work-queue-2/worker.py
new file mode 100755
index 0000000000..49e5dae798
--- /dev/null
+++ b/docs/user-guide/jobs/work-queue-2/worker.py
@@ -0,0 +1,23 @@
#!/usr/bin/env python

import time
import rediswq

host = "redis"
# Uncomment next two lines if you do not have Kube-DNS working.
# import os
# host = os.getenv("REDIS_SERVICE_HOST")

q = rediswq.RedisWQ(name="job2", host=host)
print("Worker with sessionID: " + q.sessionID())
print("Initial queue state: empty=" + str(q.empty()))
while not q.empty():
    item = q.lease(lease_secs=10, block=True, timeout=2)
    if item is not None:
        itemstr = item.decode("utf-8")
        print("Working on " + itemstr)
        time.sleep(10)  # Put your actual work here instead of sleep.
        q.complete(item)
    else:
        print("Waiting for work")
print("Queue empty, exiting")
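
# To try this worker outside the cluster (an assumption, not part of the
# example), one option is to forward the Redis port to your machine:
#   kubectl port-forward redis-master 6379:6379
# and then set host = "localhost" above before running this script.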