scala - Flink: Synchronize/connect two streams


I have two streams. The first uses the CEP library to detect on/off patterns. In the second, the values have to be added up. I'm looking for a way to connect the two streams so that, when an off frame is received, the output is the device's status change together with the latest value of the sum of the second type of frames:

device 311 state change:ignition off  value:35.91 

My code:

    // ignition pattern
    val ig = timedvalue.filter(_._2 == "ignition").keyBy(_._4)

    val patternig_on: Pattern[(Long, String, String, Long), _] =
      Pattern.begin[(Long, String, String, Long)]("start").where(_._3 == "off")
        .next("end").where(_._3 == "on")
    val patternstream: PatternStream[(Long, String, String, Long)] = CEP.pattern(ig, patternig_on)

    val patternig_off: Pattern[(Long, String, String, Long), _] =
      Pattern.begin[(Long, String, String, Long)]("start").where(_._3 == "on")
        .next("end").where(_._3 == "off")
    val patternstreamig_off: PatternStream[(Long, String, String, Long)] = CEP.pattern(timedvalue, patternig_off)

    def selectfn(pattern: mutable.Map[String, (Long, String, String, Long)]): String = {
      val startevent = pattern.get("start").get
      val endevent = pattern.get("end").get
      "device " + startevent._4 + " state change: " + startevent._2 + " on"
    }

    def selectfnoff(pattern: mutable.Map[String, (Long, String, String, Long)]): String = {
      val startevent = pattern.get("start").get
      val endevent = pattern.get("end").get
      "device " + startevent._4 + " state change: " + startevent._2 + " off "
    }

    val patternstreamselected = patternstream.select(selectfn(_)).print()

    // running sum per device
    val suma: org.apache.flink.streaming.api.scala.DataStream[(Long, Double)] =
      odo.map(s => (s._4, s._3)).keyBy(_._1).mapWithState(
        (in, state: Option[(Int, Double)]) => state match {
          case Some((count, sum)) => ((in._1, (sum + in._2).toDouble), Some((count + 1, sum + in._2)))
          case None => ((in._1, in._2), Some(1, in._2))
        })

    val patternstreamselectedoff = patternstreamig_off.select(selectfnoff(_))
      .connect(suma).keyBy(_._1, _._1)
      .map(l => l + " value:", r => r._2)
      .print()

Update, the result:

24.2
23.5
44.010000000000005
67.05000000000001
device 311 state change3:ignition on
(311,device 311 state change: ignition off ) value:

The first number (24.2) for the device should not appear...

Any idea what the best approach is? Thank you.
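To make the intended behaviour concrete, here is a minimal sketch in plain Scala, with no Flink dependency, of the logic I am after: remember the latest sum per device and emit a line only when an off event arrives. The names `Event`, `Odo`, `IgnitionOff` and `JoinSketch` are hypothetical and exist only for this illustration, and the input values are made up. In Flink I assume the same idea would live in something like a `RichCoFlatMapFunction` on the connected, keyed streams with the sum held in keyed state, but I have not verified that this is the recommended approach.

```scala
// Plain-Scala simulation of "remember the latest sum, emit only on off".
// All names here are hypothetical; nothing below is Flink API.
sealed trait Event { def device: Long }
final case class Odo(device: Long, value: Double) extends Event // second stream: values to add up
final case class IgnitionOff(device: Long) extends Event        // first stream: an "on -> off" match

object JoinSketch {
  /** Processes the merged events in arrival order and returns the emitted lines. */
  def run(events: Seq[Event]): Vector[String] = {
    val start = (Map.empty[Long, Double], Vector.empty[String])
    val (_, out) = events.foldLeft(start) {
      case ((sums, emitted), Odo(dev, v)) =>
        // Update the per-device running sum and emit nothing.
        (sums.updated(dev, sums.getOrElse(dev, 0.0) + v), emitted)
      case ((sums, emitted), IgnitionOff(dev)) =>
        // Emit one line carrying the latest sum seen so far for this device.
        val total = sums.getOrElse(dev, 0.0)
        (sums, emitted :+ s"device $dev state change: ignition off value: $total")
    }
    out
  }
}
```

For example, `JoinSketch.run(Seq(Odo(311L, 10.5), Odo(311L, 25.0), IgnitionOff(311L)))` yields a single line for device 311, and the intermediate sums are never emitted, which is the difference from the two-function `map` on the connected streams in my code.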

