Concurrent Data Processing in Elixir: Using a mapper operation after reduce/3 is deprecated (page 114)

@svilen

warning: Using a mapper operation, such as map/filter/reject, after reduce/3 is deprecated. Use Flow.on_trigger/2 instead
(flow 1.2.0) lib/flow.ex:1916: Flow.add_mapper/3
(airports 0.1.0) lib/airports.ex:25: Airports.open_airports/0
(stdlib 4.0.1) erl_eval.erl:744: :erl_eval.do_apply/7
(elixir 1.13.4) src/elixir.erl:296: :elixir.recur_eval/3
(elixir 1.13.4) src/elixir.erl:274: :elixir.eval_forms/3
(iex 1.13.4) lib/iex/evaluator.ex:310: IEx.Evaluator.handle_eval/3

How would we rewrite this as an on_trigger function?

This should work, but it is longer than the Flow.map version from the book:

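...
    |> Flow.reject(&(&1.type == "closed"))
    |> Flow.partition(key: {:key, :country})
    |> Flow.group_by(& &1.country)
    # on_trigger/2 receives the accumulated map (country => list of airports)
    # and must return {events_to_emit, new_accumulator}.
    |> Flow.on_trigger(fn map ->
      country_data = Enum.map(map, fn {country, data} -> {country, Enum.count(data)} end)
      {country_data, map}
    end)
    # Deprecated form: a mapper directly after group_by/2 (a reduce) triggers the warning.
    # |> Flow.map(fn {country, data} -> {country, Enum.count(data)} end)
    |> Enum.to_list()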
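For anyone who wants to try the on_trigger version outside the book project, here is a minimal self-contained sketch. It assumes Elixir 1.12+ (for Mix.install/1) and network access to Hex; the in-memory @airports list and the AirportsSketch module name are hypothetical stand-ins for the book's airports data, since the start of the pipeline is elided above.

```elixir
# Assumption: Elixir >= 1.12 and Hex access, so Mix.install/1 can fetch Flow.
Mix.install([{:flow, "~> 1.2"}])

defmodule AirportsSketch do
  # Hypothetical sample data standing in for the book's airports CSV.
  @airports [
    %{type: "small_airport", country: "US"},
    %{type: "closed", country: "US"},
    %{type: "large_airport", country: "US"},
    %{type: "heliport", country: "GB"},
    %{type: "closed", country: "GB"},
    %{type: "medium_airport", country: "FR"}
  ]

  def open_airports_by_country do
    @airports
    |> Flow.from_enumerable()
    |> Flow.reject(&(&1.type == "closed"))
    # Route every airport of a given country to the same partition stage.
    |> Flow.partition(key: {:key, :country})
    # group_by/2 is a reduce: each stage builds a map country => [airports].
    |> Flow.group_by(& &1.country)
    # Called in each partition stage when its (global) window triggers;
    # emits {country, count} events and keeps the map as the new state.
    |> Flow.on_trigger(fn groups ->
      counts = Enum.map(groups, fn {country, airports} -> {country, Enum.count(airports)} end)
      {counts, groups}
    end)
    # Flow is Enumerable, so collecting and sorting runs the whole flow.
    |> Enum.sort()
  end
end

IO.inspect(AirportsSketch.open_airports_by_country())
# => [{"FR", 1}, {"GB", 1}, {"US", 2}]
```

Sorting at the end is only there to make the output deterministic, because partitions emit their results in no particular order.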

@svilen

I ran into this problem as well. While @andybab’s solution produces the correct answer, I believe the use of Enum.map means that this part of the computation will not run concurrently, whereas (I think?) the version in the book, which uses Flow.map, runs in parallel. I could be wrong; I’m totally new at this.

It sure would be nice if the author of the book could provide a version that is both concurrent and not deprecated.

Edit: looks like my thinking was wrong. If you throw an IO.inspect(self()) into andybab’s on_trigger callback, you can see that it does get split in parallel across multiple stages: the callback runs once in each partition stage, and each call only sees the countries routed to that partition. For a noob, it’s confusing why/how this works and why you can’t use Flow.map instead; hopefully the author can update this section with an explanation.
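A minimal sketch of the self() experiment described above, assuming Elixir 1.12+ with Hex access for Mix.install/1. The TriggerPids module, the generated data (26 one-letter "countries" with 10 rows each) and the choice of 4 partition stages are all hypothetical. Each emitted event is tagged with the pid of the process running the on_trigger callback, so counting distinct pids shows how many stages did the Enum.map work.

```elixir
# Assumption: Elixir >= 1.12 and Hex access, so Mix.install/1 can fetch Flow.
Mix.install([{:flow, "~> 1.2"}])

defmodule TriggerPids do
  # Hypothetical data: countries "A".."Z", 10 rows each (260 rows total).
  def run(stages) do
    data = for c <- ?A..?Z, i <- 1..10, do: %{country: <<c>>, id: i}

    data
    |> Flow.from_enumerable()
    # Hash each country into one of `stages` partition stages.
    |> Flow.partition(key: {:key, :country}, stages: stages)
    |> Flow.group_by(& &1.country)
    |> Flow.on_trigger(fn groups ->
      # self() is the partition stage process running this callback.
      pid = self()
      events = Enum.map(groups, fn {country, rows} -> {pid, country, Enum.count(rows)} end)
      {events, groups}
    end)
    |> Enum.to_list()
  end
end

results = TriggerPids.run(4)
pids = results |> Enum.map(&elem(&1, 0)) |> Enum.uniq()
IO.puts("on_trigger ran in #{length(pids)} different processes")
```

The Enum.map inside the callback is sequential within one stage, but several stages run their callbacks concurrently, each on its own slice of the grouped data.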