scala, transform a callback function to an iterator/list

For the sake of this question, I have the following example code which I cannot change:

  trait Receiver[T] {
    def receive(entry: T);
    def close();
  }

  def f1(r: Receiver[Int]) {
    new Thread() { override def run() { (0 to 9).foreach { i => r.receive(i); Thread.sleep(1000); }; r.close(); }}.start();
  }

that is, f1 takes a Receiver, calls receive(i) 10 times and then closes.

(that is an example, f1 could also call receive(i) infinite times)

so, one implementation of Receiver could be:

  class MyReceiver extends Receiver[Int] {
    def receive(entry: Int) {
      if (entry % 2 == 0) {
        println("Entry#" + entry);
      }
    }

    def close() {}
  }

and I would call it like this:

  f1(new MyReceiver());

now, the question is:

without changing the f1 and Receiver, how can I use an iterator or list approach, instead of filtering and mapping inside MyReceiver.receive function?

I've implemented the following, and it works. However I guess that there is a simpler way to achieve this.

  import java.util.concurrent.Semaphore

  class ReceiverToIterator[T] extends Receiver[T] with Iterator[T] {
    var lastEntry: T = _
    var waitingToReceive = new Semaphore(1)
    var waitingToBeConsumed = new Semaphore(1)
    var eof = false

    waitingToReceive.acquire()

    def receive(entry: T) {
      println("ReceiverToIterator.receive(" + entry + "). START.")
      waitingToBeConsumed.acquire()
      lastEntry = entry
      waitingToReceive.release()
      println("ReceiverToIterator.receive(" + entry + "). END.")
    }

    def close() {
      println("ReceiverToIterator.close().")
      eof = true
      waitingToReceive.release()
    }

    def hasNext = {
      println("ReceiverToIterator.hasNext().START.")
      waitingToReceive.acquire()
      waitingToReceive.release()
      println("ReceiverToIterator.hasNext().END.")
      !eof
    }

    def next = {
      println("ReceiverToIterator.next().START.")
      waitingToReceive.acquire()
      if (eof) { throw new NoSuchElementException }
      val entryToReturn = lastEntry
      waitingToBeConsumed.release()
      println("ReceiverToIterator.next().END.")
      entryToReturn
    }
  }

so, now I can use the filter, map and foreach implementations of the iterator:

  val receiverToIterator = new ReceiverToIterator[Int]();
  f2(receiverToIterator);

  receiverToIterator.filter(_ % 2 == 0).map("Entry#" + _).foreach(println);

So, the question is,

without changing f1 and Receiver,

how can I get a simpler and nicer implementation of ReceiverToIterator?

ps: note that f1 could also call receive(i) infinite times, so it is not an option to collect everything in a list. also, it shouldn't keep all the results in memory.




Update: Current solution

Current solution using the BlockingQueue[Option[T]] as suggested by nadavwr

As I find this callback to iterator pattern quite often, I've implemented to be reusable like this:

  trait OptionNextToIterator[T] extends Iterator[T] {
    def getOptionNext: Option[T];

    var answerReady: Boolean = false
    var eof: Boolean = false
    var element: T = _

    def hasNext = {
      prepareNextAnswerIfNecessary
      !eof
    }

    def next = {
      prepareNextAnswerIfNecessary
      if (eof) { throw new NoSuchElementException }
      val retVal = element
      answerReady = false
      retVal
    }

    def prepareNextAnswerIfNecessary {
      if (answerReady) {
        return
      }
      synchronized {
        getOptionNext match {
          case None => eof = true
          case Some(e) => element = e
        }
        answerReady = true
      }
    }
  }

  trait QueueToIterator[T] extends OptionNextToIterator[T] {
    val queueCapacity: Int
    val queue = new ArrayBlockingQueue[Option[T]](queueCapacity)
    var queueClosed = false

    def queuePut(entry: T) {
      if (queueClosed) { throw new IllegalStateException("The queue has already been closed."); }
      queue.put(Some(entry))
    }

    def queueClose() {
      queueClosed = true
      queue.put(None)
    }

    def getOptionNext = queue.take
  }

now, I can implement the question of this post like this:

  class ReceiverToIterator[T](val queueCapacity: Int = 1) extends Receiver[T] with QueueToIterator[T] {
    def receive(entry: T) { queuePut(entry) }
    def close() { queueClose() }
    // other things if necessary
  }

  val receiverToIterator = new ReceiverToIterator[Int](queueCapacity = 3);
  f1(receiverToIterator);

  Thread.sleep(3000)  // test that f1 is not blocked before 3 calls at this point
  receiverToIterator.filter(_ % 2 == 0).map("Entry#" + _).foreach(println)

but I still think that there should be a simpler solution. I needed to split the implementation into two traits, one that implements getOptionNext (instead of next() and hasNext()), and OptionNextToIterator which transforms this to a real iterator. that's because BlockingQueue lacks a method that "Retrieves, but does not remove, the head of this queue, waiting if necessary until an element becomes available." (as the take() version of pool() but for peek()). also, I am using the synchronized block inside OptionNextToIterator, which I don't like very much.

So, is there a simpler and nicer version to implement this? (without using Iteratees, I will study this option later, and open a new stackoverflow question if necessary)

Answers


Updated: BlockingQueue of 1 entry

What you've implemented here is essentially Java's BlockingQueue, with a queue size of 1.

Main characteristic: uber-blocking. A slow consumer will kill your producer's performance.

Update: @gzm0 mentioned that BlockingQueue doesn't cover EOF. You'll have to use BlockingQueue[Option[T]] for that.

Update: Here's a code fragment. It can be made to fit with your Receiver. Some of it inspired by Iterator.buffered. Note that peek is a misleading name, as it may block -- and so will hasNext.

// fairness enabled -- you probably want to preserve order...
// alternatively, disable fairness and increase buffer to be 'big enough'
private val queue = new java.util.concurrent.ArrayBlockingQueue[Option[T]](1, true)

// the following block provides you with a potentially blocking peek operation
// it should `queue.take` when the previous peeked head has been invalidated
// specifically, it will `queue.take` and block when the queue is empty
private var head: Option[T] = _
private var headDefined: Boolean = false
private def invalidateHead() { headDefined = false }
private def peek: Option[T] = {
  if (!headDefined) {
    head = queue.take()
    headDefined = true
  }
  head
}

def iterator = new Iterator[T] {

  // potentially blocking; only false upon taking `None`
  def hasNext = peek.isDefined

  // peeks and invalidates head; throws NoSuchElementException as appropriate
  def next: T = {
    val opt = peek; invalidateHead()
    if (opt.isEmpty) throw new NoSuchElementException
    else opt.get
  }
}

Alternative: Iteratees

Iterator-based solutions will generally involve more blocking. Conceptually, you could use continuations on the thread doing the iteration to avoid blocking the thread, but continuations mess with Scala's for-comprehensions, so no joy down that road.

Alternatively, you could consider an iteratee-based solution. Iteratees are different than iterators in that the consumer isn't responsible for advancing the iteration -- the producer is. With iteratees, the consumer basically folds over the entries pushed by the producer over time. Folding each next entry as it becomes available can take place in a thread pool, since the thread is relinquished after each fold completes.

You won't get nice for-syntax for iteration, and the learning curve is a little challenging, but if you feel confident using a foldLeft you'll end up with a non-blocking solution that does look reasonable on the eye.

To read more about iteratees, I suggest taking a peek at PlayFramework 2.X's iteratee reference. The documentation describes their stand-alone iteratee library, which is 100% usable outside the context of Play. Scalaz 7 also has a comprehensive iteratee library.


Need Your Help

How to turn off php-indent warning in Emacs

php html emacs elisp emacs23

When editing a PHP file with PHP code and HTML markup in Emacs, I continue to get the warning:

About UNIX Resources Network

Original, collect and organize Developers related documents, information and materials, contains jQuery, Html, CSS, MySQL, .NET, ASP.NET, SQL, objective-c, iPhone, Ruby on Rails, C, SQL Server, Ruby, Arrays, Regex, ASP.NET MVC, WPF, XML, Ajax, DataBase, and so on.