Move Guide topic: Fine Parallel Processing using a Work Queue (#2870)

* Move Guide topic: Fine Parallel Processing using a Work Queue

* add migration notice

* fixed capitalization
reviewable/pr2871/r1
Andrew Chen 2017-03-16 12:46:09 -07:00 committed by GitHub
parent ad8f009317
commit 7cc984d66b
12 changed files with 419 additions and 213 deletions

View File

@ -35,6 +35,7 @@ toc:
section:
- docs/tasks/job/parallel-processing-expansion.md
- docs/tasks/job/work-queue-1/index.md
- docs/tasks/job/fine-parallel-processing-work-queue/index.md
- title: Accessing Applications in a Cluster
section:

View File

@ -263,7 +263,7 @@ The pattern names are also links to examples and more detailed descriptions.
| -------------------------------------------------------------------- |:-----------------:|:---------------------------:|:-------------------:|:-------------------:|
| [Job Template Expansion](/docs/user-guide/jobs/expansions) | | | ✓ | ✓ |
| [Queue with Pod Per Work Item](/docs/tasks/job/work-queue-1/) | ✓ | | sometimes | ✓ |
| [Queue with Variable Pod Count](/docs/user-guide/jobs/work-queue-2/) | ✓ | ✓ | | ✓ |
| [Queue with Variable Pod Count](/docs/tasks/job/fine-parallel-processing-work-queue/) | ✓ | ✓ | | ✓ |
| Single Job with Static Work Assignment | ✓ | | ✓ | |
When you specify completions with `.spec.completions`, each Pod created by the Job controller
@ -279,7 +279,7 @@ Here, `W` is the number of work items.
| -------------------------------------------------------------------- |:-------------------:|:--------------------:|
| [Job Template Expansion](/docs/tasks/job/parallel-processing-expansion/) | 1 | should be 1 |
| [Queue with Pod Per Work Item](/docs/tasks/job/work-queue-1/) | W | any |
| [Queue with Variable Pod Count](/docs/user-guide/jobs/work-queue-2/) | 1 | any |
| [Queue with Variable Pod Count](/docs/tasks/job/fine-parallel-processing-work-queue/) | 1 | any |
| Single Job with Static Work Assignment | W | any |

View File

@ -0,0 +1,6 @@
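# Worker image: a Python base plus the redis client library, the worker
# program, and the work queue client library from this directory.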
FROM python
RUN pip install redis
COPY ./worker.py /worker.py
COPY ./rediswq.py /rediswq.py
CMD python worker.py

View File

@ -0,0 +1,213 @@
---
title: Fine Parallel Processing Using a Work Queue
---
* TOC
{:toc}
# Example: Job with Work Queue with Multiple Work Items Per Pod
In this example, we will run a Kubernetes Job with multiple parallel
worker processes. You may want to be familiar with the basic,
non-parallel, use of [Job](/docs/concepts/jobs/run-to-completion-finite-workloads/) first.
In this example, as each pod is created, it picks up one unit of work
from a task queue, processes it, and repeats until the queue is empty, then exits.
Here is an overview of the steps in this example:
1. **Start a storage service to hold the work queue.** In this example, we use Redis to store
our work items. In the previous example, we used RabbitMQ. In this example, we use Redis and
a custom work-queue client library because AMQP does not provide a good way for clients to
detect when a finite-length work queue is empty. In practice you would set up a store such
as Redis once and reuse it for the work queues of many jobs, and other things.
1. **Create a queue, and fill it with messages.** Each message represents one task to be done. In
this example, a message is just a string naming a work item, on which the worker simulates a lengthy computation.
1. **Start a Job that works on tasks from the queue**. The Job starts several pods. Each pod takes
one task from the message queue, processes it, and repeats until the end of the queue is reached.
## Starting Redis
For this example, for simplicity, we will start a single instance of Redis.
See the [Redis Example](https://github.com/kubernetes/kubernetes/tree/master/examples/guestbook) for an example
of deploying Redis scalably and redundantly.
Start a temporary Pod running Redis and a service so we can find it.
```shell
$ kubectl create -f docs/tasks/job/fine-parallel-processing-work-queue/redis-pod.yaml
pod "redis-master" created
$ kubectl create -f docs/tasks/job/fine-parallel-processing-work-queue/redis-service.yaml
service "redis" created
```
If you're not working from the source tree, you could also download [`redis-pod.yaml`](redis-pod.yaml?raw=true) and [`redis-service.yaml`](redis-service.yaml?raw=true) directly.
## Filling the Queue with Tasks
Now let's fill the queue with some "tasks". In our example, our tasks are just strings to be
printed.
Start a temporary interactive pod for running the Redis CLI:
```shell
$ kubectl run -i --tty temp --image redis --command "/bin/sh"
Waiting for pod default/redis2-c7h78 to be running, status is Pending, pod ready: false
Hit enter for command prompt
```
Now hit enter, start the Redis CLI, and create a list with some work items in it.
```
# redis-cli -h redis
redis:6379> rpush job2 "apple"
(integer) 1
redis:6379> rpush job2 "banana"
(integer) 2
redis:6379> rpush job2 "cherry"
(integer) 3
redis:6379> rpush job2 "date"
(integer) 4
redis:6379> rpush job2 "fig"
(integer) 5
redis:6379> rpush job2 "grape"
(integer) 6
redis:6379> rpush job2 "lemon"
(integer) 7
redis:6379> rpush job2 "melon"
(integer) 8
redis:6379> rpush job2 "orange"
(integer) 9
redis:6379> lrange job2 0 -1
1) "apple"
2) "banana"
3) "cherry"
4) "date"
5) "fig"
6) "grape"
7) "lemon"
8) "melon"
9) "orange"
```
So, the list with key `job2` will be our work queue.
Note: if you do not have Kube DNS set up correctly, you may need to change
the first step of the above block to `redis-cli -h $REDIS_SERVICE_HOST`.
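If you would rather seed the queue from a script than from the interactive CLI, here is a minimal sketch using the `redis-py` client (an illustration, not one of this example's files; it assumes the `redis` Service name resolves via Kube DNS and uses the same `job2` key as above):

```python
#!/usr/bin/env python
# Hypothetical seeding script: push the example work items onto the queue.
import redis

items = ["apple", "banana", "cherry", "date", "fig",
         "grape", "lemon", "melon", "orange"]

# Connect to the "redis" Service; inside the cluster, Kube DNS resolves the name.
r = redis.StrictRedis(host="redis", port=6379, db=0)
r.rpush("job2", *items)           # RPUSH appends all items to the list
print(r.lrange("job2", 0, -1))    # sanity check: show the queued work items
```

As with `redis-cli`, if Kube DNS is not set up, replace `host="redis"` with the value of `$REDIS_SERVICE_HOST`.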
## Create an Image
Now we are ready to create an image that we will run.
We will use a Python worker program with a Redis client to read
the messages from the message queue.
A simple Redis work queue client library is provided,
called `rediswq.py` ([Download](rediswq.py?raw=true)).
The "worker" program in each Pod of the Job uses the work queue
client library to get work. Here it is:
{% include code.html language="python" file="worker.py" ghlink="/docs/tasks/job/fine-parallel-processing-work-queue/worker.py" %}
If you are working from the source tree,
change directory to the `docs/tasks/job/fine-parallel-processing-work-queue/` directory.
Otherwise, download [`worker.py`](worker.py?raw=true), [`rediswq.py`](rediswq.py?raw=true), and [`Dockerfile`](Dockerfile?raw=true)
using the links above. Then build the image:
```shell
docker build -t job-wq-2 .
```
### Push the Image
For the [Docker Hub](https://hub.docker.com/), tag your app image with
your username and push to the Hub with the commands below. Replace
`<username>` with your Hub username.
```shell
docker tag job-wq-2 <username>/job-wq-2
docker push <username>/job-wq-2
```
You need to push to a public repository or [configure your cluster to be able to access
your private repository](/docs/user-guide/images).
If you are using [Google Container
Registry](https://cloud.google.com/tools/container-registry/), tag
your app image with your project ID, and push to GCR. Replace
`<project>` with your project ID.
```shell
docker tag job-wq-2 gcr.io/<project>/job-wq-2
gcloud docker push gcr.io/<project>/job-wq-2
```
## Defining a Job
Here is the job definition:
{% include code.html language="yaml" file="job.yaml" ghlink="/docs/tasks/job/fine-parallel-processing-work-queue/job.yaml" %}
Be sure to edit the job template to
change `gcr.io/myproject` to your own path.
In this example, each pod works on several items from the queue and then exits when there are no more items.
Since the workers themselves detect when the work queue is empty, and the Job controller does not
know about the work queue, the controller relies on the workers to signal when they are done working.
The workers signal that the queue is empty by exiting with success. So, as soon as any worker
exits with success, the controller knows the work is done, and the Pods will exit soon.
That is why we leave the Job's completion count (`.spec.completions`) unset: the Job is complete
as soon as the first worker exits with success. The Job controller will wait for the other pods to complete too.
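To observe this behavior programmatically, you could poll the Job's status until the first Pod succeeds. Below is a minimal, hypothetical sketch using the official Kubernetes Python client (an assumption: this example does not otherwise use that client; it must be installed, and a kubeconfig must be available):

```python
#!/usr/bin/env python
# Hypothetical monitor: wait until the job-wq-2 Job records its first success.
import time
from kubernetes import client, config

config.load_kube_config()  # use local kubeconfig credentials
batch = client.BatchV1Api()

while True:
    job = batch.read_namespaced_job(name="job-wq-2", namespace="default")
    # With .spec.completions unset, one succeeded Pod completes the Job.
    if (job.status.succeeded or 0) >= 1:
        print("Job complete; succeeded pods: %d" % job.status.succeeded)
        break
    time.sleep(5)
```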
## Running the Job
So, now run the Job:
```shell
kubectl create -f ./job.yaml
```
Now wait a bit, then check on the job.
```shell
$ kubectl describe jobs/job-wq-2
Name: job-wq-2
Namespace: default
Image(s): gcr.io/exampleproject/job-wq-2
Selector: app in (job-wq-2)
Parallelism: 2
Completions: Unset
Start Time: Mon, 11 Jan 2016 17:07:59 -0800
Labels: app=job-wq-2
Pods Statuses: 1 Running / 0 Succeeded / 0 Failed
No volumes.
Events:
FirstSeen LastSeen Count From SubobjectPath Type Reason Message
--------- -------- ----- ---- ------------- -------- ------ -------
33s 33s 1 {job-controller } Normal SuccessfulCreate Created pod: job-wq-2-lglf8
$ kubectl logs pods/job-wq-2-7r7b2
Worker with sessionID: bbd72d0a-9e5c-4dd6-abf6-416cc267991f
Initial queue state: empty=False
Working on banana
Working on date
Working on lemon
```
As you can see, one of our pods worked on several work units.
## Alternatives
If running a queue service or modifying your containers to use a work queue is inconvenient, you may
want to consider one of the other [job patterns](/docs/concepts/jobs/run-to-completion-finite-workloads/#job-patterns).
If you have a continuous stream of background processing work to run, then
consider running your background workers with a `replicationController` instead,
and consider using a background processing library such as
[resque](https://github.com/resque/resque).

View File

@ -0,0 +1,14 @@
apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-2
spec:
  parallelism: 2
  template:
    metadata:
      name: job-wq-2
    spec:
      containers:
      - name: c
        image: gcr.io/myproject/job-wq-2
      restartPolicy: OnFailure

View File

@ -0,0 +1,15 @@
apiVersion: v1
kind: Pod
metadata:
  name: redis-master
  labels:
    app: redis
spec:
  containers:
  - name: master
    image: redis
    env:
    - name: MASTER
      value: "true"
    ports:
    - containerPort: 6379

View File

@ -0,0 +1,10 @@
apiVersion: v1
kind: Service
metadata:
  name: redis
spec:
  ports:
  - port: 6379
    targetPort: 6379
  selector:
    app: redis

View File

@ -0,0 +1,130 @@
#!/usr/bin/env python

# Based on http://peter-hoffmann.com/2012/python-simple-queue-redis-queue.html
# and the suggestion in the redis documentation for RPOPLPUSH, at
# http://redis.io/commands/rpoplpush, which suggests how to implement a work-queue.

import redis
import uuid
import hashlib

class RedisWQ(object):
    """Simple Finite Work Queue with Redis Backend

    This work queue is finite: as long as no more work is added
    after workers start, the workers can detect when the queue
    is completely empty.

    The items in the work queue are assumed to have unique values.

    This object is not intended to be used by multiple threads
    concurrently.
    """
    def __init__(self, name, **redis_kwargs):
        """The default connection parameters are: host='localhost', port=6379, db=0

        The work queue is identified by "name". The library may create other
        keys with "name" as a prefix.
        """
        self._db = redis.StrictRedis(**redis_kwargs)
        # The session ID will uniquely identify this "worker".
        self._session = str(uuid.uuid4())
        # Work queue is implemented as two queues: main, and processing.
        # Work is initially in main, and moved to processing when a client picks it up.
        self._main_q_key = name
        self._processing_q_key = name + ":processing"
        # A per-item lease key records which session is working on the item;
        # it expires after the lease timeout.
        self._lease_key_prefix = name + ":leased_by_session:"

    def sessionID(self):
        """Return the ID for this session."""
        return self._session

    def _main_qsize(self):
        """Return the size of the main queue."""
        return self._db.llen(self._main_q_key)

    def _processing_qsize(self):
        """Return the size of the processing queue."""
        return self._db.llen(self._processing_q_key)

    def empty(self):
        """Return True if the queue is empty, including work being done, False otherwise.

        False does not necessarily mean that there is work available to work on right now,
        since items in the processing queue may still be leased to other workers.
        """
        return self._main_qsize() == 0 and self._processing_qsize() == 0

# TODO: implement this
#    def check_expired_leases(self):
#        """Return items with expired leases to the main work queue."""
#        # Processing list should not be _too_ long since it is approximately as long
#        # as the number of active and recently active workers.
#        processing = self._db.lrange(self._processing_q_key, 0, -1)
#        for item in processing:
#            # If the lease key is not present for an item (it expired or was
#            # never created because the client crashed before creating it)
#            # then move the item back to the main queue so others can work on it.
#            if not self._lease_exists(item):
#                # TODO: transactionally move the key from the processing queue to
#                # the main queue, while detecting if a new lease is created
#                # or if either queue is modified.

    def _itemkey(self, item):
        """Returns a string that uniquely identifies an item (bytes)."""
        return hashlib.sha224(item).hexdigest()

    def _lease_exists(self, item):
        """True if a lease on 'item' exists."""
        return self._db.exists(self._lease_key_prefix + self._itemkey(item))

    def lease(self, lease_secs=60, block=True, timeout=None):
        """Begin working on an item from the work queue.

        Lease the item for lease_secs. After that time, other
        workers may consider this client to have crashed or stalled
        and pick up the item instead.

        If optional args block is true and timeout is None (the default), block
        if necessary until an item is available."""
        if block:
            item = self._db.brpoplpush(self._main_q_key, self._processing_q_key, timeout=timeout)
        else:
            item = self._db.rpoplpush(self._main_q_key, self._processing_q_key)
        if item:
            # Record that we (this session id) are working on a key. Expire that
            # note after the lease timeout.
            # Note: if we crash at this line of the program, then GC will see no lease
            # for this item and will later return it to the main queue.
            itemkey = self._itemkey(item)
            self._db.setex(self._lease_key_prefix + itemkey, lease_secs, self._session)
        return item

    def complete(self, value):
        """Complete working on the item with 'value'.

        If the lease expired, the item may not have completed, and some
        other worker may have picked it up. There is no indication
        of what happened.
        """
        self._db.lrem(self._processing_q_key, 0, value)
        # If we crash here, then the GC code will try to move the value, but it will
        # not be here, which is fine. So this does not need to be a transaction.
        itemkey = self._itemkey(value)
        self._db.delete(self._lease_key_prefix + itemkey)

# TODO: add functions to clean up all keys associated with "name" when
# processing is complete.

# TODO: add a function to add an item to the queue. Atomically
# check if the queue is empty and if so fail to add the item
# since other workers might think work is done and be in the process
# of exiting.

# TODO(etune): move to my own github for hosting, e.g. github.com/erictune/rediswq-py and
# make it so it can be pip installed by anyone (see
# http://stackoverflow.com/questions/8247605/configuring-so-that-pip-install-can-work-from-github)

# TODO(etune): finish code to GC expired leases, and call periodically
# e.g. each time lease times out.

View File

@ -0,0 +1,23 @@
#!/usr/bin/env python

import time
import rediswq

host = "redis"
# Uncomment next two lines if you do not have Kube-DNS working.
# import os
# host = os.getenv("REDIS_SERVICE_HOST")

q = rediswq.RedisWQ(name="job2", host=host)
print("Worker with sessionID: " + q.sessionID())
print("Initial queue state: empty=" + str(q.empty()))
# Lease items until both the main queue and the processing queue are empty.
# lease_secs bounds how long this worker may hold an item before another
# worker may reclaim it; timeout bounds how long lease() blocks waiting.
while not q.empty():
  item = q.lease(lease_secs=10, block=True, timeout=2)
  if item is not None:
    itemstr = item.decode("utf-8")
    print("Working on " + itemstr)
    time.sleep(10) # Put your actual work here instead of sleep.
    q.complete(item)
  else:
    print("Waiting for work")
print("Queue empty, exiting")

View File

@ -259,12 +259,12 @@ want to consider one of the other [job patterns](/docs/concepts/jobs/run-to-comp
This approach creates a pod for every work item. If your work items only take a few seconds,
though, creating a Pod for every work item may add a lot of overhead. Consider another
[example](/docs/user-guide/jobs/work-queue-2/), that executes multiple work items per Pod.
[example](/docs/tasks/job/fine-parallel-processing-work-queue/), that executes multiple work items per Pod.
In this example, we use the `amqp-consume` utility to read the message
from the queue and run our actual program. This has the advantage that you
do not need to modify your program to be aware of the queue.
A [different example](/docs/user-guide/jobs/work-queue-2/), shows how to
A [different example](/docs/tasks/job/fine-parallel-processing-work-queue/) shows how to
communicate with the work queue using a client library.
## Caveats

View File

@ -2,212 +2,6 @@
title: Fine Parallel Processing using a Work Queue
---
* TOC
{:toc}
{% include user-guide-content-moved.md %}
(The removed page body is omitted here: it is identical to the new topic above, except that file paths referenced `docs/user-guide/jobs/work-queue-2/` instead of `docs/tasks/job/fine-parallel-processing-work-queue/`.)
[Fine Parallel Processing Using a Work Queue](/docs/tasks/job/fine-parallel-processing-work-queue/)

View File

@ -250,7 +250,7 @@ func TestExampleObjectSchemas(t *testing.T) {
"../docs/tasks/job/work-queue-1": {
"job": {&batch.Job{}},
},
"../docs/user-guide/jobs/work-queue-2": {
"../docs/tasks/job/fine-parallel-processing-work-queue": {
"job": {&batch.Job{}},
"redis-pod": {&api.Pod{}},
"redis-service": {&api.Service{}},