Monday, 15 June 2015

multithreading - Synchronizing on function parameter for multithreaded memoization




My core question is: how can I implement synchronization in a method on the combination of the object instance and a method parameter?

Here are the details of my situation. I'm using the following code to implement memoization, adapted from this answer:

    /**
     * Memoizes a unary function
     * @param f the function to memoize
     * @tparam T the argument type
     * @tparam R the result type
     */
    class Memoized[-T, +R](f: T => R) extends (T => R) {
      import scala.collection.mutable

      private[this] val cache = mutable.Map.empty[T, R]

      def apply(x: T): R = cache.getOrElse(x, {
        val y = f(x)
        cache += ((x, y))
        y
      })
    }

In my project, I'm memoizing Futures to deduplicate asynchronous API calls. This worked fine when I was using for...yield to map over the resulting futures, created with the standard ExecutionContext, but then I upgraded to Scala Async for nicer handling of these futures. However, I realized that the multithreading that library uses allowed multiple threads to enter apply at once, defeating memoization: the async blocks all executed in parallel, entering the "orElse" thunk before the cache could be updated with a new Future.

To work around this, I put the main apply function in a this.synchronized block:

    def apply(x: T): R = this.synchronized {
      cache.getOrElse(x, {
        val y = f(x)
        cache += ((x, y))
        y
      })
    }

This restored the memoized behavior. The drawback is that it blocks calls with different params, at least until the Future is created. I'm wondering if there is a way to set up finer grained synchronization on the combination of the Memoized instance and the value of the x parameter to apply. That way, only calls that would be deduplicated would be blocked.

As a side note, I'm not sure this is truly performance critical, because the synchronized block is released once the Future is created and returned (I think?). But if there are any concerns with this that I'm not thinking of, I would like to know.
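One possible way to get the finer grained locking asked about here is to let the map itself do the per-key coordination. The sketch below is not from the original post: the class name ConcurrentMemoized is hypothetical, and it assumes a JVM with Java 8 or later, where java.util.concurrent.ConcurrentHashMap.computeIfAbsent runs the mapping function at most once for an absent key. Callers with the same key wait for that computation; callers with other keys normally proceed, although the map may briefly block updates to keys that share a bin. It also assumes f never returns null and never calls back into the same memoizer.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

// Hypothetical name, for illustration only. Variance annotations are
// dropped to keep the sketch simple.
class ConcurrentMemoized[T, R](f: T => R) extends (T => R) {
  private[this] val cache = new ConcurrentHashMap[T, R]()

  // An explicit java.util.function.Function instance, so the sketch does
  // not depend on lambda-to-SAM conversion (Scala 2.12+).
  private[this] val compute = new JFunction[T, R] {
    override def apply(x: T): R = f(x)
  }

  // computeIfAbsent invokes `compute` at most once per absent key.
  // Assumption: f(x) is non-null and does not re-enter this cache.
  def apply(x: T): R = cache.computeIfAbsent(x, compute)
}
```

When R is a Future, f(x) only has to create the Future, so any waiting on the same key should be short.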

Akka actors combined with futures provide a powerful way to wrap mutable state without blocking. Here is a simple example of how to use an Actor for memoization:

    import akka.actor._
    import akka.util.Timeout
    import akka.pattern.ask
    import scala.concurrent._
    import scala.concurrent.duration._

    class Memoize(system: ActorSystem) {
      // The actor handles one message at a time, so the cache needs no locking.
      class CacheActor(f: Any => Future[Any]) extends Actor {
        private[this] val cache = scala.collection.mutable.Map.empty[Any, Future[Any]]

        def receive = {
          case x => sender ! cache.getOrElseUpdate(x, f(x))
        }
      }

      def apply[K, V](f: K => Future[V]): K => Future[V] = {
        val fCast = f.asInstanceOf[Any => Future[Any]]
        val actorRef = system.actorOf(Props(new CacheActor(fCast)))
        implicit val timeout: Timeout = Timeout(5.seconds)
        import system.dispatcher
        // The ask yields a Future of the cached Future; flatten it.
        x => actorRef.ask(x).asInstanceOf[Future[Future[V]]].flatMap(identity)
      }
    }

We can use it like:

    val system = ActorSystem()
    val memoize = new Memoize(system)

    val f = memoize { x: Int =>
      println("computing " + x)
      scala.concurrent.Future.successful {
        Thread.sleep(1000)
        x + 1
      }
    }

    import system.dispatcher
    f(5).foreach(println)
    f(5).foreach(println)

And "computing 5" will print only a single time, but "6" will print twice.

There are some scary looking asInstanceOf calls, but it is type-safe.
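If an actor is more machinery than the problem needs, a cast-free variant can probably be built on the standard library alone. The following is only a sketch under stated assumptions, not the approach from the answer above: the class name MemoizedFuture is hypothetical, and it relies on scala.collection.concurrent.TrieMap.putIfAbsent being atomic, so exactly one caller per key "wins" and starts the underlying call while every other caller receives the same Future. Note that a failed Future stays cached in this version.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Future, Promise}
import scala.util.control.NonFatal

// Hypothetical name, for illustration only.
class MemoizedFuture[K, V](f: K => Future[V]) extends (K => Future[V]) {
  private[this] val cache = TrieMap.empty[K, Promise[V]]

  def apply(x: K): Future[V] = {
    val fresh = Promise[V]()
    cache.putIfAbsent(x, fresh) match {
      // Another caller registered a promise first: share its future.
      case Some(existing) => existing.future
      // This caller won the race: start the call exactly once and
      // forward its outcome to the shared promise.
      case None =>
        try fresh.completeWith(f(x))
        catch { case NonFatal(e) => fresh.failure(e) }
        fresh.future
    }
  }
}
```

Nothing here blocks a thread: callers with the same key share one Future, and callers with different keys never wait on each other.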

multithreading scala asynchronous memoization thread-synchronization
