On page 147, the book says: “Using Task.async_stream/3 is a great way to process large collections of data, and provide back-pressure at the same time.”
However, while using Task.async_stream/3 I noticed unexpected memory usage and ran some tests. My results suggest that Task.async_stream does not handle back-pressure well: it works fine with lower concurrency, but it fails to manage back-pressure once I increase the max_concurrency option to 100.
This is my test code.
defmodule Playground do
  # Emits the numbers 0..100 in chunks of 10 and prints "emit"
  # every time a chunk is pulled from the source.
  def stream do
    Stream.resource(
      fn -> Enum.to_list(0..100) end,
      fn list ->
        case Enum.split(list, 10) do
          {[], _} ->
            {:halt, []}

          {head, tail} ->
            IO.inspect("emit")
            {head, tail}
        end
      end,
      fn _ -> :foo end
    )
  end

  def run_async do
    stream()
    |> Task.async_stream(__MODULE__, :do_it, [], max_concurrency: 100, ordered: false)
    |> Stream.run()
  end

  # Simulates work of random duration (up to 1 second), then prints the item.
  def do_it(i) do
    Process.sleep(:rand.uniform(1000))
    IO.inspect(i)
  end
end
And this is the printed output:
iex(5)> Playground.run_async
"emit"
"emit"
"emit"
"emit"
"emit"
"emit"
"emit"
"emit"
"emit"
"emit"
"emit"
0
51
5
35
52
68
65
21
98
85
87
10
27
29
47
.
.
.
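To compare runs at different max_concurrency values, something like the following might help. It is only a rough sketch: BackpressureProbe and peak_in_flight/1 are names I made up (they are not from the book or from Elixir), it uses a plain 0..100 range instead of the Stream.resource above, and it only measures how many tasks are running at the same time, not how far ahead the source is consumed.

```elixir
defmodule BackpressureProbe do
  # Hypothetical helper (not from the book): returns the peak number of
  # tasks observed running at once for a given max_concurrency.
  def peak_in_flight(max_concurrency) do
    # Slot 1 holds the current in-flight count, slot 2 the observed peak.
    ref = :atomics.new(2, signed: true)

    0..100
    |> Task.async_stream(
      fn _i ->
        now = :atomics.add_get(ref, 1, 1)
        bump_peak(ref, now)
        # Stand-in for real work.
        Process.sleep(20)
        :atomics.sub(ref, 1, 1)
      end,
      max_concurrency: max_concurrency,
      ordered: false
    )
    |> Stream.run()

    :atomics.get(ref, 2)
  end

  # Raises the stored peak to `value` if it is larger, retrying when
  # another task updated the peak in the meantime.
  defp bump_peak(ref, value) do
    current = :atomics.get(ref, 2)

    cond do
      value <= current -> :ok
      :atomics.compare_exchange(ref, 2, current, value) == :ok -> :ok
      true -> bump_peak(ref, value)
    end
  end
end
```

Calling BackpressureProbe.peak_in_flight(10) and BackpressureProbe.peak_in_flight(100) in iex should give a peak no higher than the max_concurrency passed in, which makes it easier to see what limit is actually being applied in each case.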