Iterate with hasNext() and next() over an asynchronously generated stream of elements

I have to implement an Iterator interface (as defined by a Java API), with hasNext() and next() methods, that should return result elements which originate from an asynchronously processed HTTP response (processed with Akka actors).

Following requirements have to be satisfied:

  • do not block and wait for the async operation to finish as the generation of a large result set may take a while (the iterator should return result elements as soon as they become available)
  • Iterator.next() should block until the next element is available (or throw an exception if there are no more elements to come)
  • Iterator.hasNext() should return true as long as there are more elements to come (even if the next one is not available yet)
  • the overall number of results is unknown in advance. The result producing actor will send a specific "end message" when it is finished.
  • try to avoid the use of an InterruptedException, e.g. when the iterator is waiting on an empty queue but not more elements will be generated.

I have not looked into Java 8 streams or Akka streams yet. But since I basically have to iterate over a queue (a finite stream) I doubt that there are any suitable solution yet.

Currently, my Scala implementation stub uses java.util.concurrent.BlockingQueue and looks like this:

class ResultStreamIterator extends Iterator[Result] {
    val resultQueue = new ArrayBlockingQueue[Option[Result]](100)

    def hasNext(): Boolean = ???  // return true if not done yet
    def next(): Result = ???      // take() next element if not done yet

    case class Result(value: Any) // sent by result producing actor
    case object Done              // sent by result producing actor when finished

    class ResultCollector extends Actor {
        def receive = {
           case Result(value) => resultQueue.put(Some(value))
           case Done          => resultQueue.put(None)
        }
    }
}

I use an Option[Result] to indicate the end of the result stream with None. I have experimented with peeking at the next element and using a 'done' flag but I hope that there is an easier solution.

Bonus questions:

  • How can the sync/async implementation be covered with Unit Tests, especially testing delayed result generation?
  • How can the iterator be made thread-safe?

Answers


Following code would safisfy requirement. Actor's fields are able to be modified safely in Actor's receiver. So resultQueue should not be in Iterator's field, but be in Actor's field.

// ResultCollector should be initialized.
// Initilize code is like...
// resultCollector ! Initialize(100)
class ResultStreamIterator(resultCollector: ActorRef) extends Iterator[Result] {

  implicit val timeout: Timeout = ???

  override def hasNext(): Boolean = Await.result(resultCollector ? HasNext, Duration.Inf) match {
    case ResponseHasNext(hasNext) => hasNext
  }

  @scala.annotation.tailrec
  final override def next(): Result = Await.result(resultCollector ? RequestResult, Duration.Inf) match {
    case ResponseResult(result) => result
    case Finished => throw new NoSuchElementException("There is not result.")
    case WaitingResult => next()// should be wait for a moment.
  }

}

case object RequestResult
case object HasNext

case class ResponseResult(result: Result)
case class ResponseHasNext(hasNext: Boolean)
case object Finished
case object WaitingResult

case class Initialize(expects: Int)

// This code may be more ellegant if using Actor FSM
// Acotr's State is (beforeInitialized)->(collecting)->(allCollected)
class ResultCollector extends Actor with Stash {

  val results = scala.collection.mutable.Queue.empty[Result]

  var expects = 0

  var counts = 0

  var isAllCollected = false

  def beforeInitialized: Actor.Receive = {
    case Initialize(n) =>
      expects = n
      if (expects != 0) context become collecting
      else context become allCollected
      unstashAll
    case _ => stash()
  }

  def collecting: Actor.Receive = {
    case RequestResult =>
      if (results.isEmpty) sender ! WaitingResult
      else sender ! ResponseResult(results.dequeue())
    case HasNext => ResponseHasNext(true)
    case result: Result =>
      results += result
      counts += 1
      isAllCollected = counts >= expects
      if (isAllCollected) context become allCollected
  }

  def allCollected: Actor.Receive = {
    case RequestResult =>
      if (results.isEmpty) sender ! Finished
      else sender ! ResponseResult(results.dequeue())
    case HasNext => ResponseHasNext(!results.isEmpty)
  }

  def receive = beforeInitialized
}

You could store the next element using a variable and just wait for it at beginning of both methods:

private var nextNext: Option[Result] = null

def hasNext(): Boolean = {
  if (nextNext == null) nextNext = resultQueue.take()
  return !nextNext.isEmpty
}

def next(): Result = {
  if (nextNext == null) nextNext = resultQueue.take()
  if (nextNext.isEmpty) throw new NoSuchElementException()
  val result = nextNext.get
  nextNext = null
  return result
}

I followed the suggestions of jiro and did some adaptations as needed. In general, I like the approach of having getNext() and next() implemented as ask messages sent to the actor. This ensures that there is only one thread at any time which modifies the queue.

However, I'm not sure about the performance of this implementation as ask and Await.result will create two threads for each call of hasNext() and next().

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.language.postfixOps

import akka.actor.{ActorRef, ActorSystem, Props, Stash}
import akka.pattern.ask
import akka.util.Timeout

case object HasNext
case object GetNext

case class Result(value: Any)
case object Done

class ResultCollector extends Actor with Stash {

  val queue = scala.collection.mutable.Queue.empty[Result]

  def collecting: Actor.Receive = {
    case HasNext       => if (queue.isEmpty) stash else sender ! true
    case GetNext       => if (queue.isEmpty) stash else sender ! queue.dequeue
    case value: Result => unstashAll; queue += value
    case Done          => unstashAll; context become serving
  }

  def serving: Actor.Receive = {
    case HasNext => sender ! queue.nonEmpty
    case GetNext => sender ! { if (queue.nonEmpty) queue.dequeue else new NoSuchElementException }
  }

  def receive = collecting
}

class ResultStreamIteration(resultCollector: ActorRef) extends Iterator {

  implicit val timeout: Timeout = Timeout(30 seconds)

  override def hasNext(): Boolean = Await.result(resultCollector ? HasNext, Duration.Inf) match {
    case b: Boolean => b
  }

  override def next(): Any = Await.result(resultCollector ? GetNext, Duration.Inf) match {
    case Result(value: Any) => value
    case e: Throwable       => throw e
  }
}

object Test extends App {
  implicit val exec = scala.concurrent.ExecutionContext.global
  val system = ActorSystem.create("Test")
  val actorRef = system.actorOf(Props[ResultCollector])
  Future {
    for (i <- 1 to 10000) actorRef ! Result(s"Result $i"); actorRef ! Done
  }
  val iterator = new ResultStreamIteration(actorRef)
  while (iterator.hasNext()) println(iterator.next)
  system.shutdown()
}

Need Your Help

How can I remove the browser-added blurred border on focused form elements?

forms html5 google-chrome input border

I’ve built an HTML form for an email signup. When you click in the email input's textarea, a blurred gray border surrounds it in Chrome. How can I turn this off?

How to create a virtual line which is perpendicular of my current position

android math gps geometry 2d

Here I am not an expert in math, but I would draw a line starting or at least put 2 points on both sides of my current position, taking into account the orientation of the phone.