scala - Flink: Syncronize/connect two streams -
i have 2 streams, first, cep library, it´s used detect patterns on-off. in second stream values have add. i'm looking way connect 2 streams, when receive off frame, return device status change, ultimate value of sum of second type of frames:
device 311 state change:ignition off value:35.91
my code:
// ignition pattern val ig = timedvalue.filter(_._2 == "ignition").keyby(_._4) val patternig_on: pattern[(long,string,string,long), _] = pattern.begin[(long,string,string,long)]("start").where(_._3 == "off").next("end").where(_._3 == "on") val patternstream: patternstream[(long,string,string,long)] = cep.pattern(ig, patternig_on) val patternig_off: pattern[(long,string,string,long), _] = pattern.begin[(long,string,string,long)]("start").where(_._3 == "on").next("end").where(_._3 == "off") val patternstreamig_off: patternstream[(long,string,string,long)] = cep.pattern(timedvalue, patternig_off) def selectfn(pattern : mutable.map[string,(long,string,string,long)]): string = { val startevent = pattern.get("start").get val endevent = pattern.get("end").get "device "+startevent._4 +" state change: "+startevent._2+" on" } def selectfnoff(pattern : mutable.map[string,(long,string,string,long)]): string = { val startevent = pattern.get("start").get val endevent = pattern.get("end").get "device "+startevent._4 +" state change: "+startevent._2+" off " } val patternstreamselected = patternstream.select(selectfn(_)).print() val suma: org.apache.flink.streaming.api.scala.datastream[(long, double)] = odo.map(s => (s._4,s._3)).keyby(_._1).mapwithstate( (in, state: option[(int, double)]) => state match { case some((count, sum)) => ((in._1,(sum + in._2).todouble), some((count + 1, sum + in._2))) case none => ((in._1, in._2), some(1, in._2)) }) val patternstreamselectedoff = patternstreamig_off.select(selectfnoff(_)).connect(suma).keyby(_._1, _._1).map(l => l + " value:", r => r._2).print()
update: result:
24.2 23.5 44.010000000000005 67.05000000000001 device 311 state change3:ignition on (311,device 311 state change: ignition off ) value:
the first number (24.2) device, should not appear...
any idea best approach?? thank you
Comments
Post a Comment