Merge pull request #252 from erictune/master

Moved jobs examples to docs repo
pull/622/head
johndmulhausen 2016-06-24 00:59:12 -07:00 committed by GitHub
commit 07cb8b573c
15 changed files with 957 additions and 12 deletions

View File

@@ -59,6 +59,17 @@ toc:
  - title: Using kubectl to Manage Resources
    path: /docs/user-guide/working-with-resources/
  - title: Batch Jobs
    section:
    - title: Jobs
      path: /docs/user-guide/jobs/
    - title: Parallel Processing using Expansions
      path: /docs/user-guide/jobs/expansions/
    - title: Coarse Parallel Processing using a Work Queue
      path: /docs/user-guide/jobs/work-queue-1/
    - title: Fine Parallel Processing using a Work Queue
      path: /docs/user-guide/jobs/work-queue-2/
  - title: Service Discovery and Load Balancing
    section:
    - title: Connecting Applications with Services

View File

@@ -249,12 +249,12 @@ The tradeoffs are:
The tradeoffs are summarized here, with columns 2 to 4 corresponding to the above tradeoffs.
The pattern names are also links to examples and more detailed description.
| Pattern | Single Job object | Fewer pods than work items? | Use app unmodified? | Works in Kube 1.1? |
| -------------------------------------------------------------------------- |:-----------------:|:---------------------------:|:-------------------:|:-------------------:|
| [Job Template Expansion](https://github.com/kubernetes/kubernetes/tree/{{page.githubbranch}}/examples/job/expansions/README.md) | | | ✓ | ✓ |
| [Queue with Pod Per Work Item](https://github.com/kubernetes/kubernetes/tree/{{page.githubbranch}}/examples/job/work-queue-1/README.md) | ✓ | | sometimes | ✓ |
| [Queue with Variable Pod Count](https://github.com/kubernetes/kubernetes/tree/{{page.githubbranch}}/examples/job/work-queue-2/README.md) | ✓ | ✓ | | ✓ |
| Single Job with Static Work Assignment | ✓ | | ✓ | |
| Pattern | Single Job object | Fewer pods than work items? | Use app unmodified? | Works in Kube 1.1? |
| -------------------------------------------------------------------- |:-----------------:|:---------------------------:|:-------------------:|:-------------------:|
| [Job Template Expansion](/docs/user-guide/job/expansions) | | | ✓ | ✓ |
| [Queue with Pod Per Work Item](/docs/user-guide/job/work-queue-1/) | ✓ | | sometimes | ✓ |
| [Queue with Variable Pod Count](/docs/user-guide/job/work-queue-2/) | ✓ | ✓ | | ✓ |
| Single Job with Static Work Assignment | ✓ | | ✓ | |
When you specify completions with `.spec.completions`, each Pod created by the Job controller
has an identical [`spec`](https://github.com/kubernetes/kubernetes/tree/{{page.githubbranch}}/docs/devel/api-conventions.md#spec-and-status). This means that
@@ -265,12 +265,12 @@ are different ways to arrange for pods to work on different things.
This table shows the required settings for `.spec.parallelism` and `.spec.completions` for each of the patterns.
Here, `W` is the number of work items.
| Pattern | `.spec.completions` | `.spec.parallelism` |
| -------------------------------------------------------------------------- |:-------------------:|:--------------------:|
| [Job Template Expansion](https://github.com/kubernetes/kubernetes/tree/{{page.githubbranch}}/examples/job/expansions/README.md) | 1 | should be 1 |
| [Queue with Pod Per Work Item](https://github.com/kubernetes/kubernetes/tree/{{page.githubbranch}}/examples/job/work-queue-1/README.md) | W | any |
| [Queue with Variable Pod Count](https://github.com/kubernetes/kubernetes/tree/{{page.githubbranch}}/examples/job/work-queue-2/README.md) | 1 | any |
| Single Job with Static Work Assignment | W | any |
| Pattern | `.spec.completions` | `.spec.parallelism` |
| -------------------------------------------------------------------- |:-------------------:|:--------------------:|
| [Job Template Expansion](/docs/user-guide/job/expansions/) | 1 | should be 1 |
| [Queue with Pod Per Work Item](/docs/user-guide/job/work-queue-1/) | W | any |
| [Queue with Variable Pod Count](/docs/user-guide/job/work-queue-2/) | 1 | any |
| Single Job with Static Work Assignment | W | any |
## Advanced Usage

View File

@@ -0,0 +1,192 @@
---
---
* TOC
{:toc}
# Example: Multiple Job Objects from Template Expansion
In this example, we will run multiple Kubernetes Jobs created from
a common template. You may want to be familiar with the basic,
non-parallel, use of [Jobs](/docs/user-guide/jobs) first.
## Basic Template Expansion
First, download the following template of a job to a file called `job.yaml.txt`
{% include code.html language="yaml" file="job.yaml.txt" ghlink="/docs/user-guide/job/expansions/job.yaml.txt" %}
Unlike a *pod template*, our *job template* is not a Kubernetes API type. It is just
a yaml representation of a Job object that has some placeholders that need to be filled
in before it can be used. The `$ITEM` syntax is not meaningful to Kubernetes.
In this example, the only processing the container does is to `echo` a string and sleep for a bit.
In a real use case, the processing would be some substantial computation, such as rendering a frame
of a movie, or processing a range of rows in a database. The `$ITEM` parameter would specify, for
example, the frame number or the row range.
This Job and its Pod template have a label: `jobgroup=jobexample`. There is nothing special
to the system about this label. This label
makes it convenient to operate on all the jobs in this group at once.
We also put the same label on the pod template so that we can check on all Pods of these Jobs
with a single command.
After the job is created, the system will add more labels that distinguish one Job's pods
from another Job's pods.
Note that the label key `jobgroup` is not special to Kubernetes. You can pick your own label scheme.
Next, expand the template into multiple files, one for each item to be processed.
```shell
# Expand files into a temporary directory
mkdir ./jobs
for i in apple banana cherry
do
  cat job.yaml.txt | sed "s/\$ITEM/$i/" > ./jobs/job-$i.yaml
done
```
Check if it worked:
```shell
$ ls jobs/
job-apple.yaml
job-banana.yaml
job-cherry.yaml
```
Here, we used `sed` to replace the string `$ITEM` with the loop variable.
You could use any type of template language (jinja2, erb) or write a program
to generate the Job objects.
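For instance, here is a minimal Python sketch of the same expansion using the standard library's `string.Template` (this script is illustrative, not part of the example; it assumes Python 3 and the `job.yaml.txt` file from above):

```python
#!/usr/bin/env python
# Illustrative sketch: expand job.yaml.txt into one Job manifest per item,
# equivalent to the sed loop above. Assumes Python 3.
import os
from string import Template

with open("job.yaml.txt") as f:
    # job.yaml.txt uses $ITEM placeholders, which string.Template understands.
    template = Template(f.read())

os.makedirs("jobs", exist_ok=True)
for item in ["apple", "banana", "cherry"]:
    with open(os.path.join("jobs", "job-%s.yaml" % item), "w") as out:
        out.write(template.substitute(ITEM=item))
```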
Next, create all the jobs with one kubectl command:
```shell
$ kubectl create -f ./jobs
job "process-item-apple" created
job "process-item-banana" created
job "process-item-cherry" created
```
Now, check on the jobs:
```shell
$ kubectl get jobs -l jobgroup=jobexample
JOB                   CONTAINER(S)   IMAGE(S)   SELECTOR                   SUCCESSFUL
process-item-apple    c              busybox    jobgroup in (jobexample)   1
process-item-banana   c              busybox    jobgroup in (jobexample)   1
process-item-cherry   c              busybox    jobgroup in (jobexample)   1
```
Here we use the `-l` option to select all jobs that are part of this
group of jobs. (There might be other unrelated jobs in the system that we
do not care to see.)
We can check on the pods as well using the same label selector:
```shell
$ kubectl get pods -l jobgroup=jobexample
NAME                        READY     STATUS      RESTARTS   AGE
process-item-apple-kixwv    0/1       Completed   0          4m
process-item-banana-wrsf7   0/1       Completed   0          4m
process-item-cherry-dnfu9   0/1       Completed   0          4m
```
There is not a single command to check on the output of all jobs at once,
but looping over all the pods is pretty easy:
```shell
$ for p in $(kubectl get pods -l jobgroup=jobexample -o name)
do
  kubectl logs $p
done
Processing item apple
Processing item banana
Processing item cherry
```
## Multiple Template Parameters
In the first example, each instance of the template had one parameter, and that parameter was also
used as a label. However, label keys are limited in [what characters they can
contain](/docs/user-guide/labels/#syntax-and-character-set).
This slightly more complex example uses the jinja2 template language to generate our objects.
We will use a one-line python script to convert the template to a file.
First, copy and paste the following template of a Job object, into a file called `job.yaml.jinja2`:
```liquid{% raw %}
{%- set params = [{ "name": "apple", "url": "http://www.orangepippin.com/apples", },
                  { "name": "banana", "url": "https://en.wikipedia.org/wiki/Banana", },
                  { "name": "raspberry", "url": "https://www.raspberrypi.org/" }]
%}
{%- for p in params %}
{%- set name = p["name"] %}
{%- set url = p["url"] %}
apiVersion: batch/v1
kind: Job
metadata:
  name: jobexample-{{ name }}
  labels:
    jobgroup: jobexample
spec:
  template:
    metadata:
      name: jobexample
      labels:
        jobgroup: jobexample
    spec:
      containers:
      - name: c
        image: busybox
        command: ["sh", "-c", "echo Processing URL {{ url }} && sleep 5"]
      restartPolicy: Never
---
{%- endfor %}
{% endraw %}
```
The above template defines parameters for each job object using a list of
python dicts (lines 1-4). Then a for loop emits one job yaml object
for each set of parameters (remaining lines).
We take advantage of the fact that multiple yaml documents can be concatenated
with the `---` separator (second to last line). We can pipe the output directly to kubectl to
create the objects.
You will need the jinja2 package if you do not already have it: `pip install --user jinja2`.
Now, use this one-line python program to expand the template:
```shell
alias render_template='python -c "from jinja2 import Template; import sys; print(Template(sys.stdin.read()).render());"'
```
The output can be saved to a file, like this:
```shell
cat job.yaml.jinja2 | render_template > jobs.yaml
```
or sent directly to kubectl, like this:
```shell
cat job.yaml.jinja2 | render_template | kubectl create -f -
```
## Alternatives
If you have a large number of job objects, you may find that:
- Even using labels, managing so many Job objects is cumbersome.
- You exceed resource quota when creating all the Jobs at once,
  and do not want to wait to create them incrementally.
- You need a way to easily scale the number of pods running
  concurrently. One reason would be to avoid using too many
  compute resources. Another would be to limit the number of
  concurrent requests to a shared resource, such as a database,
  used by all the pods in the job.
- Very large numbers of jobs created at once overload the
  Kubernetes apiserver, controller, or scheduler.
In these cases, you can consider one of the
other [job patterns](/docs/user-guide/jobs/#job-patterns).

View File

@@ -0,0 +1,18 @@
apiVersion: batch/v1
kind: Job
metadata:
  name: process-item-$ITEM
  labels:
    jobgroup: jobexample
spec:
  template:
    metadata:
      name: jobexample
      labels:
        jobgroup: jobexample
    spec:
      containers:
      - name: c
        image: busybox
        command: ["sh", "-c", "echo Processing item $ITEM && sleep 5"]
      restartPolicy: Never

View File

@@ -0,0 +1,10 @@
# Specify BROKER_URL and QUEUE when running
FROM ubuntu:14.04
RUN apt-get update && \
apt-get install -y curl ca-certificates amqp-tools python \
--no-install-recommends \
&& rm -rf /var/lib/apt/lists/*
COPY ./worker.py /worker.py
CMD /usr/bin/amqp-consume --url=$BROKER_URL -q $QUEUE -c 1 /worker.py

View File

@@ -0,0 +1,284 @@
---
---
* TOC
{:toc}
# Example: Job with Work Queue with Pod Per Work Item
In this example, we will run a Kubernetes Job with multiple parallel
worker processes. You may want to be familiar with the basic,
non-parallel, use of [Job](/docs/user-guide/jobs) first.
In this example, as each pod is created, it picks up one unit of work
from a task queue, completes it, deletes it from the queue, and exits.
Here is an overview of the steps in this example:
1. **Start a message queue service.** In this example, we use RabbitMQ, but you could use another
one. In practice you would set up a message queue service once and reuse it for many jobs.
1. **Create a queue, and fill it with messages.** Each message represents one task to be done. In
this example, a message is just an integer that we will do a lengthy computation on.
1. **Start a Job that works on tasks from the queue**. The Job starts several pods. Each pod takes
one task from the message queue, processes it, and exits.
## Starting a message queue service
This example uses RabbitMQ, but it should be easy to adapt to another AMQP-type message service.
In practice you could set up a message queue service once in a
cluster and reuse it for many jobs, as well as for long-running services.
Start RabbitMQ as follows:
```shell
$ kubectl create -f examples/celery-rabbitmq/rabbitmq-service.yaml
service "rabbitmq-service" created
$ kubectl create -f examples/celery-rabbitmq/rabbitmq-controller.yaml
replicationcontroller "rabbitmq-controller" created
```
We will only use the rabbitmq part from the celery-rabbitmq example.
## Testing the message queue service
Now, we can experiment with accessing the message queue. We will
create a temporary interactive pod, install some tools on it,
and experiment with queues.
First create a temporary interactive Pod.
```shell
# Create a temporary interactive container
$ kubectl run -i --tty temp --image ubuntu:14.04
Waiting for pod default/temp-loe07 to be running, status is Pending, pod ready: false
... [ previous line repeats several times .. hit return when it stops ] ...
```
Note that your pod name and command prompt will be different.
Next, install the `amqp-tools` so we can work with message queues.
```shell
# Install some tools
root@temp-loe07:/# apt-get update
.... [ lots of output ] ....
root@temp-loe07:/# apt-get install -y curl ca-certificates amqp-tools python dnsutils
.... [ lots of output ] ....
```
Later, we will make a docker image that includes these packages.
Next, we will check that we can discover the rabbitmq service:
```
# Note the rabbitmq-service has a DNS name, provided by Kubernetes:
root@temp-loe07:/# nslookup rabbitmq-service
Server: 10.0.0.10
Address: 10.0.0.10#53
Name: rabbitmq-service.default.svc.cluster.local
Address: 10.0.147.152
# Your address will vary.
```
If Kube-DNS is not set up correctly, the previous step may not work for you.
You can also find the service IP in an env var:
```
# env | grep RABBIT | grep HOST
RABBITMQ_SERVICE_SERVICE_HOST=10.0.147.152
# Your address will vary.
```
Next we will verify we can create a queue, and publish and consume messages.
```shell
# In the next line, rabbitmq-service is the hostname where the rabbitmq-service
# can be reached. 5672 is the standard port for rabbitmq.
root@temp-loe07:/# BROKER_URL=amqp://guest:guest@rabbitmq-service:5672
# If you could not resolve "rabbitmq-service" in the previous step,
# then use this command instead:
# root@temp-loe07:/# BROKER_URL=amqp://guest:guest@$RABBITMQ_SERVICE_SERVICE_HOST:5672
# Now create a queue:
root@temp-loe07:/# /usr/bin/amqp-declare-queue --url=$BROKER_URL -q foo -d
foo
# Publish one message to it:
root@temp-loe07:/# /usr/bin/amqp-publish --url=$BROKER_URL -r foo -p -b Hello
# And get it back.
root@temp-loe07:/# /usr/bin/amqp-consume --url=$BROKER_URL -q foo -c 1 cat && echo
Hello
root@temp-loe07:/#
```
In the last command, the `amqp-consume` tool takes one message (`-c 1`)
from the queue, and passes that message to the standard input of
an arbitrary command. In this case, the program `cat` just prints
out what it gets on standard input, and the echo just adds a newline
so the example is readable.
## Filling the Queue with tasks
Now let's fill the queue with some "tasks". In our example, our tasks are just strings to be
printed.
In practice, the contents of the messages might be:
- names of files that need to be processed
- extra flags to the program
- ranges of keys in a database table
- configuration parameters to a simulation
- frame numbers of a scene to be rendered
In practice, if there is a large amount of data that is needed in a read-only mode by all pods
of the Job, you will typically put that in a shared file system like NFS and mount
it read-only on all the pods, or the program in the pod will natively read data from
a cluster file system like HDFS.
For our example, we will create the queue and fill it using the amqp command line tools.
In practice, you might write a program to fill the queue using an amqp client library.
```shell
$ /usr/bin/amqp-declare-queue --url=$BROKER_URL -q job1 -d
job1
$ for f in apple banana cherry date fig grape lemon melon
do
  /usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f
done
```
So, we filled the queue with 8 messages.
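If you would rather fill the queue from a program, here is a short sketch using the `pika` AMQP client library (the choice of library is an assumption; the example itself only needs the command-line tools shown above):

```python
#!/usr/bin/env python
# Illustrative sketch: declare the "job1" queue and publish the same 8 work
# items with the pika AMQP client library, instead of amqp-declare-queue
# and amqp-publish. Assumes the rabbitmq-service DNS name resolves.
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="rabbitmq-service"))
channel = connection.channel()
channel.queue_declare(queue="job1", durable=True)
for item in ["apple", "banana", "cherry", "date",
             "fig", "grape", "lemon", "melon"]:
    channel.basic_publish(exchange="", routing_key="job1", body=item)
connection.close()
```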
## Create an Image
Now we are ready to create an image that we will run as a job.
We will use the `amqp-consume` utility to read the message
from the queue and run our actual program. Here is a very simple
example program:
{% include code.html language="python" file="worker.py" ghlink="/docs/user-guide/job/work-queue-1/worker.py" %}
Now, build an image. If you are working in the source
tree, then change directory to `examples/job/work-queue-1`.
Otherwise, make a temporary directory, change to it,
and download the [Dockerfile](Dockerfile?raw=true)
and [worker.py](worker.py?raw=true). In either case,
build the image with this command:
```shell
$ docker build -t job-wq-1 .
```
For the [Docker Hub](https://hub.docker.com/), tag your app image with
your username and push to the Hub with the below commands. Replace
`<username>` with your Hub username.
```shell
docker tag job-wq-1 <username>/job-wq-1
docker push <username>/job-wq-1
```
If you are using [Google Container
Registry](https://cloud.google.com/tools/container-registry/), tag
your app image with your project ID, and push to GCR. Replace
`<project>` with your project ID.
```shell
docker tag job-wq-1 gcr.io/<project>/job-wq-1
gcloud docker push gcr.io/<project>/job-wq-1
```
## Defining a Job
Here is a job definition. You'll need to make a copy of the Job and edit the
image to match the name you used, and call it `./job.yaml`.
{% include code.html language="yaml" file="job.yaml" ghlink="/docs/user-guide/job/work-queue-1/job.yaml" %}
In this example, each pod works on one item from the queue and then exits.
So, the completion count of the Job corresponds to the number of work items
done. Since we put 8 items in the queue, we set `.spec.completions: 8` for the example.
## Running the Job
So, now run the Job:
```shell
kubectl create -f ./job.yaml
```
Now wait a bit, then check on the job.
```shell
$ kubectl describe jobs/job-wq-1
Name: job-wq-1
Namespace: default
Image(s): gcr.io/causal-jigsaw-637/job-wq-1
Selector: app in (job-wq-1)
Parallelism: 4
Completions: 8
Labels: app=job-wq-1
Pods Statuses: 0 Running / 8 Succeeded / 0 Failed
No volumes.
Events:
FirstSeen LastSeen Count From SubobjectPath Reason Message
───────── ──────── ───── ──── ───────────── ────── ───────
27s 27s 1 {job } SuccessfulCreate Created pod: job-wq-1-hcobb
27s 27s 1 {job } SuccessfulCreate Created pod: job-wq-1-weytj
27s 27s 1 {job } SuccessfulCreate Created pod: job-wq-1-qaam5
27s 27s 1 {job } SuccessfulCreate Created pod: job-wq-1-b67sr
26s 26s 1 {job } SuccessfulCreate Created pod: job-wq-1-xe5hj
15s 15s 1 {job } SuccessfulCreate Created pod: job-wq-1-w2zqe
14s 14s 1 {job } SuccessfulCreate Created pod: job-wq-1-d6ppa
14s 14s 1 {job } SuccessfulCreate Created pod: job-wq-1-p17e0
```
All our pods succeeded. Yay.
## Alternatives
This approach has the advantage that you
do not need to modify your "worker" program to be aware that there is a work queue.
It does require that you run a message queue service.
If running a queue service is inconvenient, you may
want to consider one of the other [job patterns](/docs/user-guide/jobs/#job-patterns).
This approach creates a pod for every work item. If your work items only take a few seconds,
though, creating a Pod for every work item may add a lot of overhead. Consider another
[example](/docs/user-guide/job/work-queue-2) that executes multiple work items per Pod.
In this example, we used the `amqp-consume` utility to read the message
from the queue and run our actual program. This has the advantage that you
do not need to modify your program to be aware of the queue.
A [different example](/docs/user-guide/job/work-queue-2) shows how to
communicate with the work queue using a client library.
## Caveats
If the number of completions is set to less than the number of items in the queue, then
not all items will be processed.
If the number of completions is set to more than the number of items in the queue,
then the Job will not appear to be completed, even though all items in the queue
have been processed. It will start additional pods which will block waiting
for a message.
There is an unlikely race with this pattern. If the container is killed in between the time
that the message is acknowledged by the amqp-consume command and the time that the container
exits with success, or if the node crashes before the kubelet is able to post the success of the pod
back to the api-server, then the Job will not appear to be complete, even though all items
in the queue have been processed.

View File

@@ -0,0 +1,15 @@
apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-1
spec:
  completions: 8
  parallelism: 2
  template:
    metadata:
      name: job-wq-1
    spec:
      containers:
      - name: c
        image: gcr.io/<project>/job-wq-1
      restartPolicy: OnFailure

View File

@@ -0,0 +1,7 @@
#!/usr/bin/env python

# Just prints what it reads from standard input, then sleeps for 10 seconds.
import sys
import time
print("Processing " + sys.stdin.read())
time.sleep(10)

View File

@@ -0,0 +1,6 @@
FROM python
RUN pip install redis
COPY ./worker.py /worker.py
COPY ./rediswq.py /rediswq.py
CMD python worker.py

View File

@@ -0,0 +1,210 @@
---
---
* TOC
{:toc}
# Example: Job with Work Queue with Multiple Work Items Per Pod
In this example, we will run a Kubernetes Job with multiple parallel
worker processes. You may want to be familiar with the basic,
non-parallel, use of [Job](/docs/user-guide/jobs) first.
In this example, as each pod is created, it picks up one unit of work
from a task queue, processes it, and repeats until the end of the queue is reached.
Here is an overview of the steps in this example:
1. **Start a storage service to hold the work queue.** In this example, we use Redis to store
our work items. In the previous example, we used RabbitMQ. In this example, we use Redis and
a custom work-queue client library because AMQP does not provide a good way for clients to
detect when a finite-length work queue is empty. In practice you would set up a store such
as Redis once and reuse it for the work queues of many jobs, and other things.
1. **Create a queue, and fill it with messages.** Each message represents one task to be done. In
this example, a message is just an integer that we will do a lengthy computation on.
1. **Start a Job that works on tasks from the queue**. The Job starts several pods. Each pod takes
one task from the message queue, processes it, and repeats until the end of the queue is reached.
## Starting Redis
For this example, for simplicity, we will start a single instance of Redis.
See the [Redis Example](https://github.com/kubernetes/kubernetes/tree/{{page.githubbranch}}/examples/redis/README.md) for an example
of deploying Redis scalably and redundantly.
Start a temporary Pod running Redis and a service so we can find it.
```shell
$ kubectl create -f examples/job/work-queue-2/redis-pod.yaml
pod "redis-master" created
$ kubectl create -f examples/job/work-queue-2/redis-service.yaml
service "redis" created
```
## Filling the Queue with tasks
Now let's fill the queue with some "tasks". In our example, our tasks are just strings to be
printed.
Start a temporary interactive pod for running the Redis CLI.
```shell
$ kubectl run -i --tty temp --image redis --command "/bin/sh"
Waiting for pod default/redis2-c7h78 to be running, status is Pending, pod ready: false
Hit enter for command prompt
```
Now hit enter, start the redis CLI, and create a list with some work items in it.
```
# redis-cli -h redis
redis:6379> rpush job2 "apple"
(integer) 1
redis:6379> rpush job2 "banana"
(integer) 2
redis:6379> rpush job2 "cherry"
(integer) 3
redis:6379> rpush job2 "date"
(integer) 4
redis:6379> rpush job2 "fig"
(integer) 5
redis:6379> rpush job2 "grape"
(integer) 6
redis:6379> rpush job2 "lemon"
(integer) 7
redis:6379> rpush job2 "melon"
(integer) 8
redis:6379> rpush job2 "orange"
(integer) 9
redis:6379> lrange job2 0 -1
1) "apple"
2) "banana"
3) "cherry"
4) "date"
5) "fig"
6) "grape"
7) "lemon"
8) "melon"
9) "orange"
```
So, the list with key `job2` will be our work queue.
Note: if you do not have Kube DNS set up correctly, you may need to change
the first step of the above block to `redis-cli -h $REDIS_SERVICE_HOST`.
## Create an Image
Now we are ready to create an image that we will run.
We will use a python worker program with a redis client to read
the messages from the message queue.
A simple Redis work queue client library is provided,
called rediswq.py ([Download](rediswq.py?raw=true)).
The "worker" program in each Pod of the Job uses the work queue
client library to get work. Here it is:
{% include code.html language="python" file="worker.py" ghlink="/docs/user-guide/job/work-queue-2/worker.py" %}
If you are working from the source tree,
change directory to the `examples/job/work-queue-2` directory.
Otherwise, download [`worker.py`](worker.py?raw=true), [`rediswq.py`](rediswq.py?raw=true), and [`Dockerfile`](Dockerfile?raw=true)
using above links. Then build the image:
```shell
docker build -t job-wq-2 .
```
### Push the image
For the [Docker Hub](https://hub.docker.com/), tag your app image with
your username and push to the Hub with the below commands. Replace
`<username>` with your Hub username.
```shell
docker tag job-wq-2 <username>/job-wq-2
docker push <username>/job-wq-2
```
You need to push to a public repository or [configure your cluster to be able to access
your private repository](/docs/user-guide/images).
If you are using [Google Container
Registry](https://cloud.google.com/tools/container-registry/), tag
your app image with your project ID, and push to GCR. Replace
`<project>` with your project ID.
```shell
docker tag job-wq-2 gcr.io/<project>/job-wq-2
gcloud docker push gcr.io/<project>/job-wq-2
```
## Defining a Job
Here is the job definition:
{% include code.html language="yaml" file="job.yaml" ghlink="/docs/user-guide/job/work-queue-2/job.yaml" %}
Be sure to edit the job template to
change `gcr.io/myproject` to your own path.
In this example, each pod works on several items from the queue and then exits when there are no more items.
Since the workers themselves detect when the workqueue is empty, and the Job controller does not
know about the workqueue, it relies on the workers to signal when they are done working.
The workers signal that the queue is empty by exiting with success. So, as soon as any worker
exits with success, the controller knows the work is done, and the Pods will exit soon.
So, we leave the completion count of the Job unset. The job controller will wait for the other pods to complete
too.
## Running the Job
So, now run the Job:
```shell
kubectl create -f ./job.yaml
```
Now wait a bit, then check on the job.
```shell
$ kubectl describe jobs/job-wq-2
Name: job-wq-2
Namespace: default
Image(s): gcr.io/exampleproject/job-wq-2
Selector: app in (job-wq-2)
Parallelism: 2
Completions: Unset
Start Time: Mon, 11 Jan 2016 17:07:59 -0800
Labels: app=job-wq-2
Pods Statuses: 1 Running / 0 Succeeded / 0 Failed
No volumes.
Events:
FirstSeen LastSeen Count From SubobjectPath Type Reason Message
--------- -------- ----- ---- ------------- -------- ------ -------
33s 33s 1 {job-controller } Normal SuccessfulCreate Created pod: job-wq-2-lglf8
$ kubectl logs pods/job-wq-2-lglf8
Worker with sessionID: bbd72d0a-9e5c-4dd6-abf6-416cc267991f
Initial queue state: empty=False
Working on banana
Working on date
Working on lemon
```
As you can see, one of our pods worked on several work units.
## Alternatives
If running a queue service or modifying your containers to use a work queue is inconvenient, you may
want to consider one of the other [job patterns](/docs/user-guide/jobs/#job-patterns).
If you have a continuous stream of background processing work to run, then
consider running your background workers with a `ReplicationController` instead,
and consider running a background processing library such as
[Resque](https://github.com/resque/resque).

View File

@@ -0,0 +1,14 @@
apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-2
spec:
  parallelism: 2
  template:
    metadata:
      name: job-wq-2
    spec:
      containers:
      - name: c
        image: gcr.io/myproject/job-wq-2
      restartPolicy: OnFailure

View File

@@ -0,0 +1,15 @@
apiVersion: v1
kind: Pod
metadata:
  name: redis-master
  labels:
    app: redis
spec:
  containers:
  - name: master
    image: redis
    env:
    - name: MASTER
      value: "true"
    ports:
    - containerPort: 6379

View File

@@ -0,0 +1,10 @@
apiVersion: v1
kind: Service
metadata:
  name: redis
spec:
  ports:
  - port: 6379
    targetPort: 6379
  selector:
    app: redis

View File

@@ -0,0 +1,130 @@
#!/usr/bin/env python

# Based on http://peter-hoffmann.com/2012/python-simple-queue-redis-queue.html
# and the suggestion in the redis documentation for RPOPLPUSH, at
# http://redis.io/commands/rpoplpush, which suggests how to implement a work-queue.

import redis
import uuid
import hashlib

class RedisWQ(object):
    """Simple Finite Work Queue with Redis Backend

    This work queue is finite: as long as no more work is added
    after workers start, the workers can detect when the queue
    is completely empty.

    The items in the work queue are assumed to have unique values.

    This object is not intended to be used by multiple threads
    concurrently.
    """
    def __init__(self, name, **redis_kwargs):
        """The default connection parameters are: host='localhost', port=6379, db=0

        The work queue is identified by "name".  The library may create other
        keys with "name" as a prefix.
        """
        self._db = redis.StrictRedis(**redis_kwargs)
        # The session ID will uniquely identify this "worker".
        self._session = str(uuid.uuid4())
        # Work queue is implemented as two queues: main, and processing.
        # Work is initially in main, and moved to processing when a client picks it up.
        self._main_q_key = name
        self._processing_q_key = name + ":processing"
        self._lease_key_prefix = name + ":leased_by_session:"

    def sessionID(self):
        """Return the ID for this session."""
        return self._session

    def _main_qsize(self):
        """Return the size of the main queue."""
        return self._db.llen(self._main_q_key)

    def _processing_qsize(self):
        """Return the size of the processing queue."""
        return self._db.llen(self._processing_q_key)

    def empty(self):
        """Return True if the queue is empty, including work being done, False otherwise.

        False does not necessarily mean that there is work available to work on right now.
        """
        return self._main_qsize() == 0 and self._processing_qsize() == 0

    # TODO: implement this
    # def check_expired_leases(self):
    #     """Return any expired leased items to the main work queue."""
    #     # Processing list should not be _too_ long since it is approximately as long
    #     # as the number of active and recently active workers.
    #     processing = self._db.lrange(self._processing_q_key, 0, -1)
    #     for item in processing:
    #         # If the lease key is not present for an item (it expired or was
    #         # never created because the client crashed before creating it)
    #         # then move the item back to the main queue so others can work on it.
    #         if not self._lease_exists(item):
    #             # TODO: transactionally move the key from processing queue to
    #             # to main queue, while detecting if a new lease is created
    #             # or if either queue is modified.
    #             pass

    def _itemkey(self, item):
        """Returns a string that uniquely identifies an item (bytes)."""
        return hashlib.sha224(item).hexdigest()

    def _lease_exists(self, item):
        """True if a lease on 'item' exists."""
        return self._db.exists(self._lease_key_prefix + self._itemkey(item))

    def lease(self, lease_secs=60, block=True, timeout=None):
        """Begin working on an item from the work queue.

        Lease the item for lease_secs.  After that time, other
        workers may consider this client to have crashed or stalled
        and pick up the item instead.

        If optional args block is true and timeout is None (the default), block
        if necessary until an item is available."""
        if block:
            item = self._db.brpoplpush(self._main_q_key, self._processing_q_key, timeout=timeout)
        else:
            item = self._db.rpoplpush(self._main_q_key, self._processing_q_key)
        if item:
            # Record that we (this session id) are working on a key.  Expire that
            # note after the lease timeout.
            # Note: if we crash at this line of the program, then GC will see no lease
            # for this item and later return it to the main queue.
            itemkey = self._itemkey(item)
            self._db.setex(self._lease_key_prefix + itemkey, lease_secs, self._session)
        return item

    def complete(self, value):
        """Complete working on the item with 'value'.

        If the lease expired, the item may not have completed, and some
        other worker may have picked it up.  There is no indication
        of what happened.
        """
        self._db.lrem(self._processing_q_key, 0, value)
        # If we crash here, then the GC code will try to move the value, but it will
        # not be here, which is fine.  So this does not need to be a transaction.
        itemkey = self._itemkey(value)
        self._db.delete(self._lease_key_prefix + itemkey)

# TODO: add functions to clean up all keys associated with "name" when
# processing is complete.

# TODO: add a function to add an item to the queue.  Atomically
# check if the queue is empty and if so fail to add the item
# since other workers might think work is done and be in the process
# of exiting.

# TODO(etune): move to my own github for hosting, e.g. github.com/erictune/rediswq-py and
# make it so it can be pip installed by anyone (see
# http://stackoverflow.com/questions/8247605/configuring-so-that-pip-install-can-work-from-github)

# TODO(etune): finish code to GC expired leases, and call periodically
# e.g. each time lease times out.

View File

@@ -0,0 +1,23 @@
#!/usr/bin/env python

import time
import rediswq

host = "redis"
# Uncomment next two lines if you do not have Kube-DNS working.
# import os
# host = os.getenv("REDIS_SERVICE_HOST")

q = rediswq.RedisWQ(name="job2", host=host)
print("Worker with sessionID: " + q.sessionID())
print("Initial queue state: empty=" + str(q.empty()))
while not q.empty():
  item = q.lease(lease_secs=10, block=True, timeout=2)
  if item is not None:
    itemstr = item.decode("utf-8")
    print("Working on " + itemstr)
    time.sleep(10)  # Put your actual work here instead of sleep.
    q.complete(item)
  else:
    print("Waiting for work")
print("Queue empty, exiting")