Commit 09eadd9

Merge pull request #20 from pyper-dev/bug-mp
Add better error capture and error message for invalid multiprocess targets
2 parents e513626 + aa4e707 commit 09eadd9

7 files changed: +83 -57 lines changed


docs/src/docs/ApiReference/task.md

Lines changed: 9 additions & 10 deletions
````diff
@@ -52,11 +52,6 @@ The function or callable object defining the logic of the task. This is a positi
 ```python
 from pyper import task
 
-@task
-def add_one(x: int):
-    return x + 1
-
-# OR
 def add_one(x: int):
     return x + 1
 
@@ -123,19 +118,20 @@ When `join` is `False`, a producer-consumer takes each individual output from th
 from typing import Iterable
 from pyper import task
 
-@task(branch=True)
 def create_data(x: int):
     return [x + 1, x + 2, x + 3]
 
-@task(branch=True, join=True)
 def running_total(data: Iterable[int]):
     total = 0
     for item in data:
         total += item
     yield total
 
 if __name__ == "__main__":
-    pipeline = create_data | running_total
+    pipeline = (
+        task(create_data, branch=True)
+        | task(running_total, branch=True, join=True)
+    )
     for output in pipeline(0):
         print(output)
         #> 1
@@ -190,15 +186,18 @@ The parameter `throttle` determines the maximum size of a task's output queue. T
 import time
 from pyper import task
 
-@task(branch=True, throttle=5000)
 def fast_producer():
     for i in range(1_000_000):
         yield i
 
-@task
 def slow_consumer(data: int):
     time.sleep(10)
     return data
+
+pipeline = (
+    task(fast_producer, branch=True, throttle=5000)
+    | task(slow_consumer)
+)
 ```
 
 In the example above, workers on `fast_producer` are paused after `5000` values have been generated, until workers for `slow_consumer` are ready to start processing again.
````
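To see the throttled pipeline in action end to end, here is a hedged, self-contained sketch; the reduced range, the `throttle=5` value, and `workers=2` are illustrative assumptions, not part of this commit:

```python
import time

from pyper import task

def fast_producer():
    # Yields values much faster than the consumer can process them
    for i in range(100):
        yield i

def slow_consumer(data: int):
    time.sleep(0.01)  # simulate slow downstream work
    return data

if __name__ == "__main__":
    # Illustrative settings (assumptions): throttle=5 pauses fast_producer
    # once 5 outputs are queued, bounding memory instead of buffering everything
    pipeline = (
        task(fast_producer, branch=True, throttle=5)
        | task(slow_consumer, workers=2)
    )
    for output in pipeline():
        print(output)
```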

docs/src/docs/UserGuide/AdvancedConcepts.md

Lines changed: 13 additions & 14 deletions
````diff
@@ -72,19 +72,16 @@ Executing CPU-bound tasks concurrently does not improve performance, as CPU-boun
 The correct way to optimize the performance of CPU-bound tasks is through parallel execution, using multiprocessing.
 
 ```python
-# Okay
-@task(workers=10, multiprocess=True)
 def long_computation(data: int):
     for i in range(1, 1_000_000):
         data *= i
     return data
 
+# Okay
+pipeline = task(long_computation, workers=10, multiprocess=True)
+
 # Bad -- cannot benefit from concurrency
-@task(workers=10)
-def long_computation(data: int):
-    for i in range(1, 1_000_000):
-        data *= i
-    return data
+pipeline = task(long_computation, workers=10)
 ```
 
 Note, however, that processes incur a very high overhead cost (performance cost in creation and memory cost in inter-process communication). Specific cases should be benchmarked to fine-tune the task parameters for your program / your machine.
````
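Since the paragraph above recommends benchmarking specific cases, here is a hedged sketch of how such a comparison might be run with the standard library only; the `generate` helper, worker counts, and workload sizes are illustrative assumptions:

```python
import time

from pyper import task

def generate(n: int):
    # Branching producer: feeds n inputs to the computation workers
    yield from range(n)

def long_computation(data: int):
    for i in range(1, 100_000):
        data *= i
    return data

def benchmark(pipeline, n: int) -> float:
    # Time a full pipeline run over n inputs
    start = time.perf_counter()
    for _ in pipeline(n):
        pass
    return time.perf_counter() - start

if __name__ == "__main__":
    threaded = task(generate, branch=True) | task(long_computation, workers=4)
    multiproc = task(generate, branch=True) | task(long_computation, workers=4, multiprocess=True)
    print("threads:  ", benchmark(threaded, 8))
    print("processes:", benchmark(multiproc, 8))
```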
````diff
@@ -115,7 +112,6 @@ In Pyper, it is especially important to separate out different types of work int
 
 ```python
 # Bad -- functions not separated
-@task(branch=True, workers=20)
 def get_data(endpoint: str):
     # IO-bound work
     r = requests.get(endpoint)
@@ -124,23 +120,28 @@ def get_data(endpoint: str):
     # CPU-bound work
     for item in data["results"]:
         yield process_data(item)
+
+pipeline = task(get_data, branch=True, workers=20)
 ```
 
-Whilst it makes sense to handle the network request concurrently, the call to `process_data` within the same task is blocking and will harm concurrency.
+Whilst it makes sense to handle the network request concurrently, the call to `process_data` within the same task requires holding onto the GIL and will harm concurrency.
 Instead, `process_data` should be implemented as a separate function:
 
 ```python
-@task(branch=True, workers=20)
 def get_data(endpoint: str):
     # IO-bound work
     r = requests.get(endpoint)
     data = r.json()
     return data["results"]
 
-@task(workers=10, multiprocess=True)
 def process_data(data):
     # CPU-bound work
     return ...
+
+pipeline = (
+    task(get_data, branch=True, workers=20)
+    | task(process_data, workers=10, multiprocess=True)
+)
 ```
 
 ### Resource Management
````
````diff
@@ -225,14 +226,12 @@ import typing
 from pyper import task
 
 # Okay
-@task(branch=True)
 def generate_values_lazily() -> typing.Iterable[dict]:
     for i in range(10_000_000):
         yield {"data": i}
 
 # Bad -- this creates 10 million values in memory
-# Subsequent tasks also cannot start executing until the entire list is created
-@task(branch=True)
+# Within a pipeline, subsequent tasks also cannot start executing until the entire list is created
 def create_values_in_list() -> typing.List[dict]:
     return [{"data": i} for i in range(10_000_000)]
 ```
````
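To show why the lazy version matters inside a pipeline, here is a hedged sketch; the `enrich` task and the `throttle` value are illustrative assumptions. Downstream work starts as soon as the first value is yielded, and the bounded queue keeps memory flat:

```python
import typing

from pyper import task

def generate_values_lazily() -> typing.Iterable[dict]:
    for i in range(10_000_000):
        yield {"data": i}

def enrich(record: dict) -> dict:
    # Starts receiving records immediately, not after 10 million are built
    return record | {"processed": True}

if __name__ == "__main__":
    # throttle=1000 is an illustrative bound on the output queue (assumption)
    pipeline = (
        task(generate_values_lazily, branch=True, throttle=1000)
        | task(enrich)
    )
    for record in pipeline():
        pass  # records stream through one at a time
```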

docs/src/docs/UserGuide/BasicConcepts.md

Lines changed: 10 additions & 8 deletions
````diff
@@ -19,23 +19,24 @@ Pyper follows the [functional paradigm](https://docs.python.org/3/howto/function
 * Python functions are the building blocks used to create `Pipeline` objects
 * `Pipeline` objects can themselves be thought of as functions
 
-For example, to create a simple pipeline, we can wrap a function in the `task` decorator:
+For example, to create a simple pipeline, we can wrap a function in the `task` class:
 
 ```python
 from pyper import task
 
-@task
 def len_strings(x: str, y: str) -> int:
     return len(x) + len(y)
+
+pipeline = task(len_strings)
 ```
 
-This defines `len_strings` as a pipeline consisting of a single task. It takes the parameters `(x: str, y: str)` and generates `int` outputs from an output queue:
+This defines `pipeline` as a pipeline consisting of a single task. It takes the parameters `(x: str, y: str)` and generates `int` outputs from an output queue:
 
 <img src="../../assets/img/diagram1.png" alt="Diagram" style="height: 250px; width: auto;">
 
 **Key Concepts**
 
-* A <b style="color:#3399FF;">pipeline</b> is a functional reprentation of data-flow _(Pyper API)_
+* A <b style="color:#3399FF;">Pipeline</b> is a representation of data-flow _(Pyper API)_
 * A **task** represents a single functional operation within a pipeline _(user defined)_
 * Under the hood, tasks pass data along via <b style="color:#FF8000;">workers</b> and <b style="color:#FF8000;">queues</b> _(Pyper internal)_
 
@@ -45,21 +46,22 @@ Pipelines are composable components; to create a pipeline which runs multiple ta
 import time
 from pyper import task
 
-@task
 def len_strings(x: str, y: str) -> int:
     return len(x) + len(y)
 
-@task(workers=3)
 def sleep(data: int) -> int:
     time.sleep(data)
     return data
 
-@task(workers=2)
 def calculate(data: int) -> bool:
     time.sleep(data)
     return data % 2 == 0
 
-pipeline = len_strings | sleep | calculate
+pipeline = (
+    task(len_strings)
+    | task(sleep, workers=3)
+    | task(calculate, workers=2)
+)
 ```
 
 This defines `pipeline` as a series of tasks, taking the parameters `(x: str, y: str)` and generating `bool` outputs:
````
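As a hedged illustration of the composed pipeline in use (the input strings are illustrative assumptions), consuming it looks like calling any other function:

```python
if __name__ == "__main__":
    # Each output is the bool produced by the final `calculate` task
    for result in pipeline("ab", "c"):
        print(result)
        #> False  (len("ab") + len("c") == 3, which is odd)
```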

docs/src/docs/UserGuide/ComposingPipelines.md

Lines changed: 16 additions & 8 deletions
````diff
@@ -53,12 +53,10 @@ from typing import Dict, Iterable
 
 from pyper import task
 
-@task(branch=True)
 def step1(limit: int):
     for i in range(limit):
         yield {"data": i}
 
-@task
 def step2(data: Dict):
     return data | {"hello": "world"}
 
@@ -72,18 +70,26 @@ class JsonFileWriter:
         json.dump(data_list, f, indent=4)
 
 if __name__ == "__main__":
-    pipeline = step1 | step2  # The pipeline
+    pipeline = task(step1, branch=True) | task(step2)  # The pipeline
     writer = JsonFileWriter("data.json")  # A consumer
     writer(pipeline(limit=10))  # Run
 ```
 
 The `>` operator (again inspired by UNIX syntax) is used to pipe a `Pipeline` into a consumer function (any callable that takes an `Iterable` of inputs) returning simply a function that handles the 'run' operation. This is syntactic sugar for the `Pipeline.consume` method.
 ```python
 if __name__ == "__main__":
-    run = step1 | step2 > JsonFileWriter("data.json")
+    run = (
+        task(step1, branch=True)
+        | task(step2)
+        > JsonFileWriter("data.json")
+    )
     run(limit=10)
     # OR
-    run = step1.pipe(step2).consume(JsonFileWriter("data.json"))
+    run = (
+        task(step1, branch=True).pipe(
+            task(step2)).consume(
+            JsonFileWriter("data.json"))
+    )
     run(limit=10)
 ```
 
@@ -163,12 +169,10 @@ from typing import AsyncIterable, Dict
 
 from pyper import task
 
-@task(branch=True)
 async def step1(limit: int):
     for i in range(limit):
         yield {"data": i}
 
-@task
 def step2(data: Dict):
     return data | {"hello": "world"}
 
@@ -182,7 +186,11 @@ class AsyncJsonFileWriter:
         json.dump(data_list, f, indent=4)
 
 async def main():
-    run = step1 | step2 > AsyncJsonFileWriter("data.json")
+    run = (
+        task(step1, branch=True)
+        | task(step2)
+        > AsyncJsonFileWriter("data.json")
+    )
     await run(limit=10)
 
 if __name__ == "__main__":
````
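The hunk ends at the `if __name__ == "__main__":` guard, so the body is not shown in this diff; presumably it drives the coroutine. A hedged sketch of how an async pipeline run is typically started (the `asyncio.run` call is an assumption, not part of the diff):

```python
import asyncio

if __name__ == "__main__":
    # An async pipeline's consumer returns a coroutine, so it must be awaited
    asyncio.run(main())
```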

docs/src/docs/UserGuide/CreatingPipelines.md

Lines changed: 3 additions & 13 deletions
````diff
@@ -19,26 +19,16 @@ Pyper's `task` decorator is the means by which we instantiate pipelines and cont
 ```python
 from pyper import task, Pipeline
 
-@task
 def func(x: int):
     return x + 1
 
-assert isinstance(func, Pipeline)
+pipeline = task(func)
+
+assert isinstance(pipeline, Pipeline)
 ```
 
 This creates a `Pipeline` object consisting of one 'task' (one step of data transformation).
 
-The `task` decorator can also be used more dynamically, which is preferable in most cases as this separates execution logic from the functional definitions themselves:
-
-```python
-from pyper import task
-
-def func(x: int):
-    return x + 1
-
-pipeline = task(func)
-```
-
 In addition to functions, anything `callable` in Python can be wrapped in `task` in the same way:
 
 ```python
````
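The final context line above opens an example that the hunk truncates. As a hedged sketch of what wrapping a callable object can look like (the `Doubler` class is an illustrative assumption, not the example from the docs):

```python
from pyper import task

class Doubler:
    # Any object implementing __call__ can serve as a task
    def __call__(self, x: int) -> int:
        return x * 2

pipeline = task(Doubler())
for output in pipeline(10):
    print(output)
    #> 20
```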

src/pyper/_core/task.py

Lines changed: 13 additions & 3 deletions
````diff
@@ -48,9 +48,19 @@ def __init__(
             or inspect.iscoroutinefunction(func.__call__) \
             or inspect.isasyncgenfunction(func.__call__)
 
-        if self.is_async and multiprocess:
-            raise ValueError("multiprocess cannot be True for an async task")
-
+        if multiprocess:
+            # Asynchronous functions cannot be multiprocessed
+            if self.is_async:
+                raise ValueError("multiprocess cannot be True for an async task")
+
+            # The function needs to be globally accessible to be multiprocessed
+            # This excludes objects like lambdas and closures
+            # We capture these cases to throw a clear error message
+            module = inspect.getmodule(func)
+            if module is None or getattr(module, func.__name__, None) is not func:
+                raise RuntimeError(f"{func} cannot be multiprocessed because it is not globally accessible"
+                                   f" -- it must be a globally defined object accessible by the name {func.__name__}")
+
         self.func = func if bind is None else functools.partial(func, *bind[0], **bind[1])
         self.branch = branch
         self.join = join
````
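To illustrate the behavior the new check introduces, here is a hedged sketch of the cases it distinguishes; the `square` function and `make_closure` helper are illustrative assumptions:

```python
from pyper import task

def square(x: int) -> int:
    # Defined at module level, so child processes can import it by name
    return x * x

# Okay: a module-global function is a valid multiprocess target
pipeline = task(square, multiprocess=True)

def make_closure():
    def inner(x):
        return x
    return inner

# Lambdas and closures are not module-level attributes, so the new check
# rejects them upfront with a clear RuntimeError instead of a later failure
for bad_target in (lambda x: x * x, make_closure()):
    try:
        task(bad_target, multiprocess=True)
    except RuntimeError as e:
        print(e)
```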

tests/test_task.py

Lines changed: 19 additions & 1 deletion
````diff
@@ -74,14 +74,32 @@ def test_raise_for_invalid_func():
     else:
         raise AssertionError
 
-def test_raise_for_invalid_multiprocess():
+def test_raise_for_async_multiprocess():
     try:
         task(afunc, multiprocess=True)
     except Exception as e:
         assert isinstance(e, ValueError)
     else:
         raise AssertionError
 
+def test_raise_for_lambda_multiprocess():
+    try:
+        task(lambda x: x, multiprocess=True)
+    except Exception as e:
+        assert isinstance(e, RuntimeError)
+    else:
+        raise AssertionError
+
+def test_raise_for_non_global_multiprocess():
+    try:
+        @task(multiprocess=True)
+        def f(x):
+            return x
+    except Exception as e:
+        assert isinstance(e, RuntimeError)
+    else:
+        raise AssertionError
+
 def test_async_task():
     p = task(afunc)
     assert isinstance(p, AsyncPipeline)
````
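A note on the final test: because `f` is defined inside the test function's body, it never becomes a module-level attribute, so the check added in `task.py` (`getattr(module, func.__name__, None) is not func`) fails and raises `RuntimeError`, which is exactly what the test asserts.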
