我有下面这段 Scalding 代码。
import com.twitter.algebird.Semigroup
import com.twitter.scalding._
/**
 * A single parsed input record.
 *
 * @param key1   first key component (String)
 * @param key2   second key component (Long)
 * @param value1 textual payload
 * @param value2 numeric payload
 */
case class Event(
  key1: String,
  key2: Long,
  value1: String,
  value2: Long
)
/**
 * Semigroup that merges two [[Event]]s into one.
 *
 * The result keeps `key1` and `key2` from the left operand `x` (the keys of `y`
 * are discarded), joins the two `value1` strings with a single space, and adds
 * the two `value2` numbers.
 */
val eventsemigroup1: Semigroup[Event] = new Semigroup[Event] {
  override def plus(x: Event, y: Event): Event =
    Event(x.key1, x.key2, x.value1 + " " + y.value1, x.value2 + y.value2)
}
/**
 * Semigroup that merges two [[Event]]s into one, joining `value1` with a comma.
 *
 * The result keeps `key1` and `key2` from the left operand `x`, joins the two
 * `value1` strings with "," and adds the two `value2` numbers.
 *
 * NOTE(review): the original `plus` signature was truncated; the `y` parameter,
 * the return type and the key fields were reconstructed to parallel
 * `eventsemigroup1` -- confirm that keeping `x.key2` is the intended behavior.
 */
val eventsemigroup2: Semigroup[Event] = new Semigroup[Event] {
  override def plus(x: Event, y: Event): Event =
    Event(x.key1, x.key2, x.value1 + "," + y.value1, x.value2 + y.value2)
}
/**
 * Scalding job that reads text lines from the `input` argument, parses each
 * line into an [[Event]], and aggregates the events in two grouped stages.
 *
 * Stage 1 groups by `(key1, key2)` and combines each group with
 * `eventsemigroup1`. Stage 2 regroups the combined events by `key1`, sorts each
 * group by `key2` with the order reversed, and combines with `eventsemigroup2`.
 *
 * NOTE(review): no sink/write is attached to the resulting pipe in the visible
 * code -- confirm whether an output step is missing.
 *
 * @param args job arguments; `input` names the text source to read
 */
class WordCountJob(args: Args) extends Job(args) {
  TypedPipe
    .from(TextLine(args("input")))
    .map { line =>
      // Whitespace-separated columns, in order: key1, key2, value1, value2.
      // Indexing and toLong are unchecked, so a short or non-numeric line throws.
      val fields = line.split("\\s+")
      Event(fields(0), fields(1).toLong, fields(2), fields(3).toLong)
    }
    // Stage 1: one combined event per (key1, key2) pair.
    .groupBy(event => (event.key1, event.key2))
    .sumLeft(eventsemigroup1)
    .values
    // Stage 2: one combined event per key1, folded in reversed key2 order.
    .groupBy(event => event.key1)
    .sortBy(event => event.key2)
    .reverse
    .sumLeft(eventsemigroup2)
    .values
}
在上面的代码中,我有多个groupBy。如何将它们结合在一起?
我找不到任何以 List[Event] 作为输入的函数。