1   Introduction

In a previous post (http://davekuhlman.org/elixir-python-erlport-export.html) we learned how to call Python functions from Elixir and how to save state (data) across multiple calls into Python.

In some use cases, however, (1) the work performed by the Python code might be heavyweight and slow, and (2) we might want to perform that work many times. In those cases, it would be nice to have a pool of Python processes and to distribute our requests for that work across them.

This article describes how to do that.

2   Preliminaries

We'll use Poolboy to manage a pool of processes, in our case a pool of Python processes that we reach through Export, which is a wrapper around Erlport.

I found the sample Poolboy applications helpful, and you may want to refer to them. My sample code builds on the sample at Elixir School:

Information about Export and Erlport:

We'll implement our Elixir workers as supervised Elixir processes using the Elixir GenServer, Application, and Supervisor behaviors and modules.

For more about GenServer, see https://elixir-lang.org/getting-started/mix-otp/genserver.html and https://hexdocs.pm/elixir/GenServer.html.

For information about Application and Supervisor, see https://elixir-lang.org/getting-started/mix-otp/supervisor-and-application.html and https://hexdocs.pm/elixir/Supervisor.html#content and https://hexdocs.pm/elixir/Application.html#content.

3   How it's done

You can find the source code for this sample application in poolboy_python_example.zip

I'll be showing and explaining some of that code in what follows.

First, create an Elixir app. You can do something like this:

$ mix new my_python_app

Then add the Export and Poolboy dependencies to the mix.exs file in the resulting directory. In this example, we will also want support for JSON so that we can pass results back from Python:

# mix.exs

defp deps do
  [
    {:export, "~> 0.1.0"},
    {:poolboy, "~> 1.5"},
    {:jason, "~> 1.2"}
  ]
end

Now, fetch and compile those dependencies:

$ cd my_python_app
$ mix deps.get
$ mix deps.compile

Next we'll look at some sample code.

We'll have Elixir modules implemented following the Application and GenServer behaviors.

Here is our "application" module that sets up the supervision of our worker module:

# lib/datasci/application.ex

defmodule Datasci.Application do
  @moduledoc false

  use Application

  # gdk note: I changed max_overflow from 2 to 0 in order to prevent creation
  # of more and more worker processes.

  defp poolboy_config do
    [
      name: {:local, :worker},
      worker_module: Datasci.Worker,
      size: 5,
      max_overflow: 0
    ]
  end

  def start(_type, _args) do
    children = [
      :poolboy.child_spec(:worker, poolboy_config())
    ]

    opts = [strategy: :one_for_one, name: Datasci.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Notes:

  • In poolboy_config/0, I set max_overflow to 0 (zero). A value greater than zero allows Poolboy to create up to that many additional, temporary workers when all of the regular workers are checked out. That might be acceptable if these processes were plain Erlang/Elixir processes. However, in our case each worker is an Erlang process that owns a Python OS process. They are heavyweight in comparison with plain Erlang/Elixir processes, and we do not want Poolboy to create more of them.
  • In start/2, the call to Supervisor.start_link/2 starts the Poolboy pool, which in turn calls the start_link/1 function in our worker module (Datasci.Worker) once for each worker. This gives us a chance to initialize each of our worker processes.

Let's look at the implementation of each worker:

# lib/datasci_tasks.ex

defmodule Datasci.Worker do
  use GenServer
  use Export.Python

  @spec start_link(any()) :: GenServer.on_start()
  def start_link(_) do
    IO.puts("----")
    {:ok, pid} = Python.start(python_path: Path.expand("lib/python"))
    GenServer.start_link(__MODULE__, pid)
  end

  @spec init(any()) :: {:ok, keyword()}
  def init(pid) do
    {:ok, [pid: pid]}
  end

  def handle_call({:calc_mean, path, column}, _from, state) do
    pid = Keyword.get(state, :pid)
    {:ok, result} = calc_mean(pid, path, column)
    #Process.sleep(2000)
    {:reply, {:ok, result}, state}
  end

  def terminate(_reason, state) do
    pid = Keyword.get(state, :pid)
    Python.stop(pid)
    :ok
  end

  @doc """
  Calculate the mean of a column in a Pandas data-set.

  `pid` is the Python process started in `start_link/1`.

  ## Examples

      iex> {:ok, ["ok", "petal_length", _mean]} = Datasci.Worker.calc_mean(pid, "Data/iris.csv", "petal_length")

  """
  @spec calc_mean(pid(), iodata(), String.t()) :: {:ok, list()}
  def calc_mean(pid, path, column) do
    path1 = Path.absname(path)
    json_result = Python.call(pid, calc_mean(path1, column), from_file: "datasci_tasks")
    {:ok, result} = Jason.decode(json_result)
    {:ok, result}
  end

end

Notes:

  • Datasci.Worker.start_link/1 is called as a result of the call to Supervisor.start_link/2 in our function Datasci.Application.start/2. This is what gives us a chance to initialize each worker process. Notice that we call GenServer.start_link/2 and pass it the process ID of the Erlport/Export Python process. That results in a call to Datasci.Worker.init/1, which receives the Python process ID.
  • Datasci.Worker.init/1 returns a tuple containing the atom :ok and a keyword list containing the Python process ID. That keyword list becomes the GenServer state that is passed in each call to Datasci.Worker.handle_call/3.
  • Datasci.Worker.handle_call/3 retrieves the Python process ID from the GenServer state and uses it to call into our Python code. Datasci.Worker.handle_call/3 is a GenServer callback that is invoked when we call GenServer.call/3. See sample code lib/datasci_test.ex.
  • And, Datasci.Worker.calc_mean/3 is where we actually call a Python function. That Python function returns a JSON string, which we convert into an Elixir data structure and return as our result.

So, let's look at the Python code:

# lib/python/datasci_tasks.py

import os
import pandas as pd
import json
from erlport.erlterms import Atom

DataSets = {}

def load_data_set(data_set_path):
    if data_set_path in DataSets:
        data_set = DataSets.get(data_set_path)
        return data_set
    else:
        if os.path.exists(data_set_path):
            data_set = pd.read_csv(data_set_path)
            DataSets[data_set_path] = data_set
            print(f'*** loading {data_set_path}')
            return data_set
        else:
            return None

def calc_mean(data_set_path, column_name):
    data_set_path = data_set_path.decode()
    column_name = column_name.decode()
    data_set = load_data_set(data_set_path)
    if data_set is not None:
        if column_name in data_set.columns:
            mean = data_set[column_name].mean()
            result = ('ok', column_name, mean)
            json_result = json.dumps(result)
            return json_result
        else:
            msg = f'no column "{column_name}" in data-set "{data_set_path}"'
            result = ('error', msg)
            json_result = json.dumps(result)
            return json_result
    else:
        msg = f'cannot locate data-set "{data_set_path}"'
        result = ('error', msg)
        json_result = json.dumps(result)
        return json_result

Notes:

  • Our calc_mean Python function calls load_data_set to load a dataset from a file into a Pandas data frame. load_data_set also caches any dataset that it loads in a Python dictionary (DataSets) using the file path and name as a key.
  • If the dataset has been loaded successfully, calc_mean uses Pandas to calculate the mean of the requested column in that loaded Pandas data frame. Then, calc_mean packages the result (or an error message if there was a problem) as a JSON string and returns it.
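The two conversions that calc_mean performs at its boundaries can be tried on their own, without Pandas or Erlport. What follows is only a small sketch: the mean value and the error message are made-up illustrations, not results from a real data set.

```python
import json

# Erlport hands Elixir strings (binaries) to Python 3 as bytes,
# which is why calc_mean decodes its arguments before using them.
column_name = b"petal_length".decode()

# Illustrative value only; not computed from a real data set.
mean = 3.5

# JSON has no tuple type, so a Python tuple is serialized as a JSON array.
ok_json = json.dumps(("ok", column_name, mean))
error_json = json.dumps(("error", 'no column "x" in data-set "y"'))

# Decoding yields lists. On the Elixir side, Jason.decode/1 likewise
# produces ["ok", column, value] or ["error", msg].
ok_decoded = json.loads(ok_json)
error_decoded = json.loads(error_json)

print(ok_json)
print(error_decoded)
```

Because the status arrives in Elixir as the string "ok" or "error" inside a list, and not as an atom in a tuple, a caller that pattern matches only on the "ok" shape will raise a MatchError when Python reports an error.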

And, finally, let's look at a bit of code that we can use to perform a test of the capability that we've implemented:

# lib/datasci_test.ex

defmodule Datasci.Test do
  @timeout 60000

  def start do
    [
      {"Data/iris.csv", "petal_length"},
      {"Data/iris.csv", "petal_width"},
      {"Data/iris.csv", "sepal_length"},
      {"Data/iris.csv", "sepal_width"},
      {"Data/penguins.csv", "bill_length_mm"},
      {"Data/penguins.csv", "bill_depth_mm"},
      {"Data/penguins.csv", "flipper_length_mm"},
      {"Data/penguins.csv", "body_mass_g"}
      #
      # More tests if desired.
      #
    ]
    |> Enum.map(fn {data_set_path, column_name} ->
      async_call_calc_mean(data_set_path, column_name) end)
    |> Enum.with_index()
    |> Enum.each(fn {task, idx} -> await_and_inspect(task, idx) end)
  end

  def stop do
    {:ready, numworkers, _, _} = :poolboy.status(:worker)
    1 .. numworkers
    |> Enum.each(fn num -> stop_worker(num) end)
  end

  defp async_call_calc_mean(data_set_path, column_name) do
    Task.async(fn ->
      :poolboy.transaction(
        :worker,
        fn pid -> GenServer.call(pid, {:calc_mean, data_set_path, column_name}) end,
        @timeout
      )
    end)
  end

  #defp await_and_inspect(task), do: task |> Task.await(@timeout) |> IO.inspect(label: "(async_call_calc_mean) result")
  defp await_and_inspect(task, idx) do
    response = Task.await(task, @timeout)
    # IO.inspect(response, label: "(async_call_calc_mean) result")
    {:ok, ["ok", column, value]} = response
    IO.puts("#{idx + 1}. column: #{column}  value: #{value}")
  end

  def stop_worker(num) do
    IO.puts("(stop_worker) num: #{num}")
    Task.async(fn ->
      :poolboy.transaction(
        :worker,
        fn pid -> GenServer.stop(pid, :normal) end,
        @timeout
      )
    end)
  end

end

Notes:

  • Task.async/1 and Task.await/2 -- In our sample application, we want to process multiple requests. And, we do not want to block on any one, single request. So, asynchronous requests make sense. A synchronous request in which we wait for the Python process to respond to our request is even simpler. If you want an example, see my previous post: "Elixir access to Python using Erlport/Export" (http://davekuhlman.org/elixir-python-erlport-export.html).

  • :poolboy.transaction/3 -- We need to check out (acquire) a worker process from the pool, call some function on that process, and check the process back into the pool. And, we need the check-in to happen even if our function raises an error, so that the worker is never lost from the pool. :poolboy.transaction/3 is a convenience wrapper that does that. Here is the Erlang source code (from deps/poolboy/src/poolboy.erl in my project):

    -spec transaction(Pool :: pool(), Fun :: fun((Worker :: pid()) -> any()))
        -> any().
    transaction(Pool, Fun) ->
        transaction(Pool, Fun, ?TIMEOUT).
    
    -spec transaction(Pool :: pool(), Fun :: fun((Worker :: pid()) -> any()),
        Timeout :: timeout()) -> any().
    transaction(Pool, Fun, Timeout) ->
        Worker = poolboy:checkout(Pool, true, Timeout),
        try
            Fun(Worker)
        after
            ok = poolboy:checkin(Pool, Worker)
        end.
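
For readers who are more at home in Python, here is a rough analogy of the two ideas above: fan the requests out concurrently, and wrap each one in a check-out / call / check-in step that always returns the worker. This is only a sketch and it is not Poolboy's API. The queue of worker ids, the transaction helper, the job function, and the "bad" column are all hypothetical stand-ins.

```python
import queue
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager

POOL_SIZE = 5  # compare size: 5, max_overflow: 0 in poolboy_config

# Hypothetical stand-in for the pool: a fixed set of worker ids.
pool = queue.Queue()
for worker_id in range(POOL_SIZE):
    pool.put(worker_id)

@contextmanager
def transaction(pool, timeout=60.0):
    """Check a worker out, lend it to the caller, and always check it back in."""
    worker = pool.get(timeout=timeout)  # blocks while every worker is busy
    try:
        yield worker
    finally:
        pool.put(worker)                # runs on success and on error alike

def job(column):
    """Pretend to do some work on a pooled worker."""
    with transaction(pool):
        if column == "bad":
            raise ValueError(f"no column {column!r}")
        return f"{column} handled"

columns = ["petal_length", "petal_width", "bad", "sepal_length",
           "sepal_width", "bill_length_mm", "bill_depth_mm", "body_mass_g"]

results = []
with ThreadPoolExecutor(max_workers=len(columns)) as executor:
    # Start every request first (compare Task.async/1) ...
    futures = [executor.submit(job, column) for column in columns]
    # ... then collect the answers in submission order (compare Task.await/2).
    for future in futures:
        try:
            results.append(future.result(timeout=60))
        except ValueError:
            results.append("failed")

# Every worker is back in the pool, including the one whose job raised.
workers_available = pool.qsize()
print(results)
print(workers_available)
```

With eight requests and five workers, three requests simply wait in the check-out step until a worker is returned, which is the behavior we asked for by setting max_overflow to 0.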
    

4   Discussion and comments

Sharing data across Python processes -- There seems to be no way for our multiple Python processes to share the same data structure. In a sense, that is not a deficiency; it is by design and is part of the Erlang/Elixir Actor model (see https://en.wikipedia.org/wiki/Actor_model). Processes under this model of computation do not share memory and can only "share" data by sending messages. This is actually a benefit: we do not need to protect ourselves against concurrently executing Python processes modifying each other's data in unexpected ways and at unexpected times. It also explains why each of the Python processes in our pool must keep its own cache of the data sets that it has loaded.
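
The per-process cache can be seen in a small experiment. This is a sketch under stated assumptions: WORKER_SOURCE is a hypothetical stand-in for lib/python/datasci_tasks.py, with a counter in place of the real pd.read_csv call, and two plain Python subprocesses play the role of two pool members.

```python
import subprocess
import sys

# Hypothetical stand-in for lib/python/datasci_tasks.py: a module-level
# cache plus a counter of how often the "expensive" load really ran.
WORKER_SOURCE = """
cache = {}
loads = 0

def load(path):
    global loads
    if path not in cache:
        loads += 1                      # stands in for pd.read_csv
        cache[path] = 'data for ' + path
    return cache[path]

load('Data/iris.csv')
load('Data/iris.csv')                   # served from this process's cache
print(loads)
"""

def run_worker():
    """Start a fresh Python OS process and return how many loads it performed."""
    completed = subprocess.run(
        [sys.executable, "-c", WORKER_SOURCE],
        capture_output=True, text=True, check=True,
    )
    return int(completed.stdout.strip())

# Two pool members, each a separate OS process with its own memory.
loads_per_worker = [run_worker() for _ in range(2)]
total_loads = sum(loads_per_worker)
print(loads_per_worker, total_loads)
```

Within one process the second request hits the cache, but the second process has to load the same file again. With a pool of five workers, the same data set may therefore be read from disk up to five times, once per worker that is asked for it.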

