What data structure in Scala corresponds to Python's nested dictionary or a CSV?

I am using Scala in the Spark shell. I have reduced my data to an RDD, byHour: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[47] at reduceByKey at <console>:16, or, once collected, an array, byHour: Array[(String, Int)], that looks like:

Array((6497+2006-03-19 20:00,13), (7511+2006-03-17 02:00,1), (13508+2006-03-26 10:00,4), (217+2006-05-16 16:00,1), (12404+2006-03-27 15:00,1), (9777+2006-05-14 09:00,1), (10291+2006-03-03 17:00,2), (4781+2006-05-10 14:00,2), (10291+2006-04-26 17:00,1), (15198+2006-04-26 12:00,1))

I would like to store this in something like Python's nested dictionary, or in a CSV file.

In Python I would create

{"6497": {"2006-03-19 20:00": 13, "2006-03-19 22:00": 1}, "7511": {"2006-03-17 02:00": 1}...}

In the end I want

userid, 2006-03-17 01:00, 2006-03-17 02:00, ... , 2006-03-19 20:00, 2006-03-19 21:00, 2006-03-19 22:00
6497,0,0, ..., 13,0,1
7511,0,1, ..., 0,0,0

I am not sure how to get there in Scala. I think I need a list or set of hash maps, or a HashMap[String, HashMap[String, Int]].


Update: byHour is an RDD[(String, Int)]

val byUserHour = byHour.map(x => (x._1.split("\\+")(0),(x._1.split("\\+")(1),x._2)))
val byUser = byUserHour.groupByKey
val times = byHour.map(x => x._1.split("\\+")(1)).distinct.collect.sortWith(_ < _)
val broadcastTimes = sc.broadcast(times)
val userMaps = byUser.mapValues { 
  x => x.map{
    case(time,cnt) => time -> cnt
  }.toMap
}
val result = userMaps.map {
  case(u,ut) => (u +: broadcastTimes.value.map(ut.getOrElse(_,0).toString))}
val lines = result.map(_.mkString(","))
val header = List("userid") ::: times.toList

Answers


First you would split off the user ID, so you get a data: Seq[(String, String, Int)]. Then, group by user ID:

val byUser: Map[String, Seq[(String, String, Int)]] = data.groupBy(_._1)
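
The first step, splitting off the user ID, could look roughly like the sketch below. It assumes every key has the form userid+timestamp with a '+' separator, as in the sample data. The names SplitKeys, splitKey and toTriples are made up for this illustration.

```scala
object SplitKeys {
  // Split "6497+2006-03-19 20:00" into ("6497", "2006-03-19 20:00").
  // Assumes the key contains at least one '+'; the limit of 2 keeps
  // any later '+' inside the timestamp part.
  def splitKey(key: String): (String, String) = {
    val parts = key.split("\\+", 2)
    (parts(0), parts(1))
  }

  // Turn (key, count) pairs into (user, time, count) triples.
  def toTriples(byHour: Seq[(String, Int)]): Seq[(String, String, Int)] =
    byHour.map { case (key, n) =>
      val (user, time) = splitKey(key)
      (user, time, n)
    }
}
```

With the collected array from the question, SplitKeys.toTriples(byHour.toSeq) would give the data value used here.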

Now we can create a map per user:

val userMaps: Map[String, Map[String, Int]] = byUser.mapValues {
  s => s.map {
    case (user, time, n) => time -> n
  }.toMap
}

For the final formatting you first need to get the distinct timestamps, sorted so that the columns come out in chronological order, then look those up in the per-user maps:

val times: Seq[String] = data.map(_._2).distinct.sorted
val result: Seq[Seq[String]] = userMaps.toSeq.map {
  case (u, ut) => (u +: times.map(ut.getOrElse(_, 0).toString))
}
val lines: Seq[String] = result.map(_.mkString(","))
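
Putting the local steps together, including the header row the question asks for, might look like the following sketch. WideTable and toCsvLines are hypothetical names, and sorting the rows by user ID is an extra assumption made only so the output order is deterministic.

```scala
object WideTable {
  // Build CSV lines (header first) from (user, time, count) triples.
  def toCsvLines(data: Seq[(String, String, Int)]): Seq[String] = {
    // One Map(time -> count) per user: the "nested dictionary".
    val userMaps: Map[String, Map[String, Int]] =
      data.groupBy(_._1).map { case (user, rows) =>
        user -> rows.map { case (_, time, n) => time -> n }.toMap
      }
    // Sorted so the columns are chronological; the "yyyy-MM-dd HH:mm"
    // format sorts correctly as plain text.
    val times: Seq[String] = data.map(_._2).distinct.sorted
    val header = ("userid" +: times).mkString(",")
    // Rows are sorted by user ID as a string (assumption, for stable output).
    val rows = userMaps.toSeq.sortBy(_._1).map { case (user, counts) =>
      (user +: times.map(t => counts.getOrElse(t, 0).toString)).mkString(",")
    }
    header +: rows
  }
}
```

For the three triples ("6497", "2006-03-19 20:00", 13), ("6497", "2006-03-19 22:00", 1) and ("7511", "2006-03-17 02:00", 1), this yields the header userid,2006-03-17 02:00,2006-03-19 20:00,2006-03-19 22:00 followed by 6497,0,13,1 and 7511,1,0,0.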

Hope this is enough to get you started. You can read more about Scala collections at http://twitter.github.io/scala_school/collections.html (and lots of other places).


All the above would be a local computation — not distributed at all. To do the same in a distributed way, you would read your data into an RDD at the start (sc.textFile()), and perform roughly the same sequence of operations.

A minor difference is that instead of groupBy you have groupByKey, which behaves a bit differently. From an RDD[(A, B)] you get an RDD[(A, Iterable[B])], not a Map[A, Seq[(A, B)]].
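
The difference in shape can be illustrated without a cluster. The sketch below uses plain Scala collections only: grouped is what groupBy returns, while byKeyLike is a local emulation of the shape groupByKey produces on a pair RDD (it is not Spark code, and the names are invented for the example).

```scala
object GroupShapes {
  val pairs: Seq[(String, Int)] = Seq(("a", 1), ("a", 2), ("b", 3))

  // Collection groupBy keeps the whole element, key included, in each group.
  val grouped: Map[String, Seq[(String, Int)]] = pairs.groupBy(_._1)

  // groupByKey on a pair RDD keeps only the values. This local emulation
  // drops the key from each group to show the same shape.
  val byKeyLike: Map[String, Seq[Int]] =
    grouped.map { case (k, kvs) => k -> kvs.map(_._2) }
}
```

This is why the pattern match in the per-user map step takes two fields (time, count) in the RDD version rather than three.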

A major difference is that you need to collect times from the cluster to the application, and then broadcast it to all nodes:

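import org.apache.spark.rdd.RDD

// Collect the distinct timestamps to the driver and sort them so the columns are in order.
val times: Seq[String] = data.map(_._2).distinct.collect.sorted
val broadcast = sc.broadcast(times)
val result: RDD[Seq[String]] = userMaps.map {
  case (u, ut) =>
    // Read the broadcast value on the executor, inside the case body.
    val times = broadcast.value
    u +: times.map(ut.getOrElse(_, 0).toString)
}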
