Parallel Map for Dart

18 Feb 2014

Dart programs run in a single isolate by default. Although Dart provides several convenient asynchronous programming techniques such as Futures and Streams, it does not use the multithreading features of modern processors by default.

This is a reasonable language design decision, mainly because concurrency issues such as thread safety are hard to keep under control.

Let us assume we want to calculate the sum of the Fibonacci numbers from 40 to 45 by applying the following function fib (the Fibonacci function is, of course, only a placeholder for any computationally intensive function in the real world):

int fib(int n) {
  if (n == 0) return 0;
  if (n == 1) return 1;
  return fib(n-1) + fib(n-2);
}

We could do this in Dart like this (a very simple example of map/reduce):

final vs = [40, 41, 42, 43, 44, 45];
print(
  "fib(40) + fib(41) + ... + fib(45) = "
  "${vs.map((n) => fib(n)).reduce((a, b) => a + b)}"
);

This prints the following result on the console:

fib(40) + fib(41) + ... + fib(45) = 2805634932

Parallelize map

The most computationally intensive part of this operation is applying the Fibonacci function fib several times using map. Although map is perfectly parallelizable in this case, Dart does not execute it in parallel, because every Dart program runs in a single isolate (even if the program runs on a processor capable of processing more than one thread in parallel).

Nevertheless, Dart is capable of running several computations in parallel using isolates. Isolates in Dart are isolated threads with no shared access to memory (so they cannot share data). Isolates can only pass messages to each other. This rigid concept avoids the usual thread safety problems, and that is why you need no synchronized statement in Dart. However, spawning isolates that can communicate with each other is unnecessarily tricky in Dart. Diego Rocha has written a blog post about this complexity and states:

You have to spawn a new isolate, send it a SendPort of your current isolate, listen to the ReceivePort of your current isolate, then make the spawned isolate listen to its ReceivePort, send messages requesting something and then responding those messages. If you try to run multiple isolates at the same time, it is even more troublesome as you don’t know from which isolate a message is coming from.

Diego Rocha
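
To make the steps in the quote concrete, here is a minimal sketch using only dart:isolate. It assumes a current Dart SDK (the API has changed since this post was written), and the names fibEntry and fibInIsolate are made up for this illustration. It covers only the simplest case, one request and one reply; a longer conversation would also need the spawned isolate to send back a SendPort of its own.

```dart
import 'dart:isolate';

int fib(int n) => n < 2 ? n : fib(n - 1) + fib(n - 2);

// Entry point of the spawned isolate (hypothetical name). It receives
// the caller's SendPort together with the argument, computes the result
// and sends it back as a message.
void fibEntry(List<Object> message) {
  final replyTo = message[0] as SendPort;
  final n = message[1] as int;
  replyTo.send(fib(n));
}

// Spawns one isolate, waits for its single reply and returns it
// (hypothetical helper, not part of any package).
Future<int> fibInIsolate(int n) async {
  final port = ReceivePort();
  await Isolate.spawn(fibEntry, [port.sendPort, n]);
  // Taking the first message also closes the port.
  return await port.first as int;
}

Future<void> main() async {
  print(await fibInIsolate(20)); // 6765
}
```

Even this reduced version needs a port, an entry point and manual packing of the message, which is the boilerplate the quote complains about.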

Using the worker package to parallelize map

Diego Rocha proposes using his Worker package to overcome the inherent complexity of spawning isolates in Dart. I recommend reading his blog post to get the most out of the following paragraphs.

Using Rocha's Worker package, the calculation of our Fibonacci sum can be parallelized like this:

import "package:worker/worker.dart";
import "dart:async";

void main() {
  // We create a worker. A worker is used to assign computational tasks to
  // executing isolates without worrying about Send- and ReceivePorts and
  // all the nitty-gritty details.
  final worker = new Worker();

  // Our problem to compute.
  final vs = [40, 41, 42, 43, 44, 45];

  // We assign our problem to solve in parallel by using the handle method
  // of the worker.
  final futures = vs.map((i) => worker.handle(new FibTask(i)));

  // Then we wait until all isolates have calculated their
  // results in parallel; the values are provided in results.
  Future.wait(futures).then((results) {
    print(
      "fib(40) + fib(41) + ... + fib(45) = "
      "${results.reduce((a, b) => a + b)}"
    );
    worker.close();
  });
}

// We have to define a computational task class to create
// computational objects which can be transferred by worker
// to isolates. A computational task encapsulates our
// computational function (in our case fib) in its #execute method.
class FibTask extends Task {
  int n;
  FibTask(this.n);
  execute() => fib(n);
}

This gives the same result (but now computed in parallel):

fib(40) + fib(41) + ... + fib(45) = 2805634932

Discussion of the worker solution

Let us have a look at the solution above. It runs in parallel, solves the problem, and hides most of the complexity of spawning isolates, thanks to the great work in Rocha's Worker package.

Nevertheless, this is not how pragmatic programmers would prefer to parallelize the given problem. We had to define another class just to execute a function in parallel. OK, Java programmers are used to creating a class for everything, but most programmers do not see the need for such things. Neither do I (although it is currently not possible to do it any other way; we will come to this later).

Let us take our initial problem.

final vs = [40, 41, 42, 43, 44, 45];
print(
  "fib(40) + fib(41) + ... + fib(45) = "
  "${vs.map((n) => fib(n)).reduce((a, b) => a + b)}"
);

In fact, we want to tell Dart to execute vs.map((n) => fib(n)) in parallel. It would be cool if we could write something like vs.pmap((n) => fib(n)) and the method pmap did all the parallelization for us, because that is what most programmers want to express:

Lovely program, please process all elements of vs in parallel (and not in sequential order, which is what map does).

This is called a parallel map, and it works well on iterables. A parallel map pmap might be typed like this:

Future<Iterable> pmap(dynamic f(dynamic));

Because pmap returns a Future of an Iterable (it runs asynchronously because it runs in parallel), we have to rewrite our initial problem a little:

final vs = [40, 41, 42, 43, 44, 45];
vs.pmap((n) => fib(n)).then((results) {
  print(
    "fib(40) + fib(41) + ... + fib(45) = "
    "${results.reduce((a, b) => a + b)}"
  );
});

From my point of view, this would be the perfect solution for Dart: a very pragmatic form of parallelization.

Why closures are not always cool

But this is not possible in Dart. Because a closure can reference (and change) variables outside its own scope, it is not allowed to be passed between isolates. If it were, we would have all the problems of thread safety again. So closures are not an adequate solution for our problem, but what about wannabe functions?

Gilad Bracha has written an interesting blog post about emulating functions in Dart. According to that article, we can implement our Fibonacci function as a wannabe function like this:

class FibFunc {
  int call(int n) => fib(n);
}

Objects of this class can be instantiated and called like "normal" functions:

final fibFunc = new FibFunc();
final fib40 = fibFunc(40);

These objects are not closures, but they behave like functions and can be transferred between isolates. Our solution would change into

final vs = [40, 41, 42, 43, 44, 45];
final func = new FibFunc();
vs.pmap(func).then((results) {
  print(
    "fib(40) + fib(41) + ... + fib(45) = "
    "${results.reduce((a, b) => a + b)}"
  );
});

which is very close to our intent.
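
The idea of shipping a callable object to other isolates can be sketched with dart:isolate alone. This is only an illustration under stated assumptions: it targets a current Dart SDK, pmap is written as a hypothetical free function instead of a method on Iterable, the helper applyInIsolate is made up for this example, and one isolate is spawned per element without any pooling. It is not the implementation of the parallel package.

```dart
import 'dart:isolate';

// Callable class ("wannabe function"); fib is inlined here so that the
// sketch is self-contained.
class FibFunc {
  int call(int n) => n < 2 ? n : call(n - 1) + call(n - 2);
}

// Entry point run inside each spawned isolate (hypothetical helper).
// The message is [reply port, callable object, argument].
void applyInIsolate(List<Object?> message) {
  final replyTo = message[0] as SendPort;
  final dynamic f = message[1];
  replyTo.send(f(message[2]));
}

// Hypothetical pmap: applies the callable object f to every element of
// xs in its own isolate and completes with the results in input order.
Future<List<dynamic>> pmap(Iterable<dynamic> xs, Object f) {
  return Future.wait(xs.map((x) async {
    final port = ReceivePort();
    await Isolate.spawn(applyInIsolate, [port.sendPort, f, x]);
    return port.first; // first message is the result; closes the port
  }));
}

Future<void> main() async {
  final results = await pmap([20, 21, 22], FibFunc());
  print(results.reduce((a, b) => a + b)); // 6765 + 10946 + 17711 = 35422
}
```

Spawning one isolate per element is fine for a handful of expensive calls, but for long collections a fixed pool of isolates would be the more sensible design.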

The parallel package

Is it possible to build something like that in Dart? Yes. Take a look at the parallel package I propose. The parallel package is only a small wrapper around the worker package. It defines a parallel map method and delegates all other method calls to Iterable, so you can use it almost like a normal iterable while gaining the option to launch a parallel map.

To this end, the parallel package defines a function parallel that transforms any Iterable collection into a PIterable collection.

PIterable parallel(Iterable collection) => new PIterable(
    new Future.value(collection)
);

Furthermore, it defines a library-private class _MessageableFunction by extending the Task concept of the Worker package (this library-private class is in fact the bridge to the worker package).

class _MessageableFunction extends Task {
  final f;
  final x;
  _MessageableFunction(this.f, this.x);
  dynamic execute() => f(x);
}

Finally the PIterable class is implemented like this:

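// Requires "dart:async" (Future, Completer), "dart:mirrors" (reflect)
// and "package:worker/worker.dart" (Worker, Task).
class PIterable implements Iterable {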

  // Wrapped future of an iterable
  final Future<Iterable> _futureIterable;

  // Constructor
  PIterable(this._futureIterable);

  // Parallel map. [f] has to be a wannabe function.
  PIterable map(Function f) {
    final worker = new Worker();
    final c = new Completer<Iterable>();
    final computations = [];
    this._futureIterable.then((iterable) {
      for (var entry in iterable) {
        final t = new _MessageableFunction(f, entry);
        computations.add(worker.handle(t));
      }
      Future.wait(computations).then((results) {
        worker.close();
        c.complete(results);
      });
    });
    return new PIterable(c.future);
  }

  // Delegates all methods except #map to [Iterable].
  dynamic noSuchMethod(Invocation msg) {
    final c = new Completer();
    this._futureIterable.then((completed) {
      final result = reflect(completed).delegate(msg);
      c.complete(result);
    });
    return c.future;
  }
}

Now we have a function parallel that transforms any Iterable into a parallelizable PIterable, and a map method defined on PIterable that is executed in parallel.

Because PIterable delegates all unknown method calls via noSuchMethod to Iterable, you can combine the parallel map with all methods known from Iterable (for example reduce). Note that delegated calls return a Future.

class FibFunc {
  int call(int n) {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return call(n-1) + call(n-2);
  }
}

main() {
  final vs = [40, 41, 42, 43, 44, 45];
  final sum = parallel(vs).map(new FibFunc())      // parallel
                          .reduce((a, b) => a + b) // delegated to Iterable
                          .then((result) {
                            print(result);
                          });
}

Some closing remarks:

  • Be aware: the parallel package is at version 0.0.3, and the main reason for releasing it is to foster discussion about pragmatic forms of parallelization in Dart (it is far from complete or perfect).
  • The parallel library is provided via http://pub.dartlang.org/
  • The parallel library is also provided via Github
  • This post is to foster discussions. So discussions, comments, remarks, opinions, etc. are very welcome.

Special thanks go to Diego Rocha and his awesome Worker library.


My last wish is about syntactic sugar

This one goes out to the Dart language specialists at Google. If you are thinking about how to improve the programmability of isolates, you could consider some syntactic sugar.

It would be great if Dart had a syntax for creating anonymous wannabe functions (not closures), so that

  final f = <n> => fib(n);  // the closure form would be (n) => fib(n)

would be syntactic sugar for:

  class FibFunc {
    int call(int n) => fib(n);
  }
  final f = new FibFunc();

If Dart provided that form of syntactic sugar, it would be possible to come very close to a pragmatic way of expressing parallelization in Dart:

  final vs = [40, 41, 42, 43, 44, 45];
  final sum = parallel(vs).map(<n> => fib(n))       // parallel
                          .reduce((a, b) => a + b)  // delegated to Iterable
                          .then((result) {
                              print(result);
                          });