Concurrent Data Processing in Elixir:147

On page 147. the book says “Using Task.async_stream/3 is a great way to process large collections of data, and provide back-pressure at the same time.”

but while using Task.async_stream/3, I found some unexpected memory usages and ran some tests. and my test results show Task.async_stream is not handling back-pressure well. It works fine with fewer concurrency, but it fails to manage the back-pressure when I increased the max_concurrency option to 100.

This is my test code.

defmodule Playground do
  def stream do
    Stream.resource(
      fn -> Enum.to_list(0..100) end,
      fn list ->
        case Enum.split(list, 10) do
          {[], _} ->
            {:halt, []}
          {head, tail} ->
            IO.inspect("emit")
            {head, tail}
        end
      end,
      fn _ -> :foo end
    )
  end

  def run_async do
    stream()
    |> Task.async_stream(__MODULE__, :do_it, [], max_concurrency: 100, ordered: false)
    |> Stream.run()
  end

  def do_it(i) do
    Process.sleep(:rand.uniform(1000))
    IO.inspect(i)
  end
end

and the printed out result

iex(5)> Playground.run_async
"emit"
"emit"
"emit"
"emit"
"emit"
"emit"
"emit"
"emit"
"emit"
"emit"
"emit"
0
51
5
35
52
68
65
21
98
85
87
10
27
29
47
.
.
.