Java 8 stream and incoming data (Kafka)
I have a queue (it happens to be Kafka, but I'm not sure that matters) that I'm reading messages from. I want to create a Stream to represent this data.
My pseudocode to consume the (Kafka) queue looks like this:
    List<Message> messages = new ArrayList<>();
    while (true) {
        ConsumerRecords<String, Message> records = kafkaConsumer.poll(100);
        // recordsToMessages is assumed to return a collection of messages, hence addAll
        messages.addAll(recordsToMessages(records));
        if (x) {
            break;
        }
    }
    return messages.stream();
With this pseudocode, the Stream is not returned until the while loop is broken, i.e. until all of the queue has been read.
I want to be able to return the Stream straight away, i.e. new messages can be added to the Stream after it has been returned.
I feel like I need to use Stream.generate, but I'm not sure how. Or maybe I need a Spliterator?
I will also want to close the Stream at a later point in the code.
Thanks!
Here is a commented example of how it could be done:
    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.Optional;
    import java.util.Spliterator;
    import java.util.Spliterators;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Function;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    // The enclosing class name is arbitrary; the original snippet did not show one.
    public class QueueStreamExample {

        public static void main(String[] args) {
            LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

            // Data producer
            Runnable job = () -> {
                // Send some data to the stream (it could come from a Kafka queue, for example)
                ThreadLocalRandom random = ThreadLocalRandom.current();
                for (int i = 0; i < 10; i++) {
                    queue.offer(random.nextInt(100));
                    delay(random.nextInt(2) + 1);
                }
                // Send the magic signal to stop the stream
                queue.offer(-1);
            };
            Thread thread = new Thread(job);
            thread.start();

            // Define the condition by which the stream knows there is no data left to consume.
            // The function returns the next element wrapped in an Optional,
            // or an empty Optional to tell that there is no more data to read.
            // In this example, the number -1 is the magic signal.
            Function<BlockingQueue<Integer>, Optional<Integer>> endingCondition = q -> {
                try {
                    Integer element = q.take();
                    return element == -1 ? Optional.empty() : Optional.of(element);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag set
                    return Optional.empty();
                }
            };
            QueueConsumingIterator<Integer> iterator = new QueueConsumingIterator<>(queue, endingCondition);

            // Construct a Stream on top of our custom queue-consuming Iterator
            Spliterator<Integer> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
            Stream<Integer> stream = StreamSupport.stream(spliterator, false);

            // Use the Stream as usual :)
            stream.map(String::valueOf).forEach(System.out::println);
        }

        // A custom Iterator that takes its data from a BlockingQueue.
        // Detecting the end of the data stream is use-case-dependent, so it is extracted
        // as a user-provided Function<Queue, Optional>.
        // For example, you may want to wait for a particular item in the queue,
        // or consider the queue "dead" after a certain timeout...
        public static class QueueConsumingIterator<E> implements Iterator<E> {
            private final BlockingQueue<E> queue;
            private final Function<BlockingQueue<E>, Optional<E>> queueReader;
            private Optional<E> element;
            private boolean elementRead = false;

            public QueueConsumingIterator(BlockingQueue<E> queue,
                                          Function<BlockingQueue<E>, Optional<E>> queueReader) {
                this.queue = queue;
                this.queueReader = queueReader;
            }

            @Override
            public boolean hasNext() {
                // Read (and possibly block for) the next element only once per next() call
                if (!this.elementRead) {
                    this.element = this.queueReader.apply(this.queue);
                    this.elementRead = true;
                }
                return this.element.isPresent();
            }

            @Override
            public E next() {
                if (hasNext()) {
                    this.elementRead = false;
                    return this.element.get();
                }
                throw new NoSuchElementException();
            }
        }

        private static void delay(int timeout) {
            try {
                TimeUnit.SECONDS.sleep(timeout);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag set
            }
        }
    }
The idea behind this code is that you can feed a Stream through a custom Iterator, which itself extracts data from an external source. The data is transferred from the external source to the Iterator through a Queue. And because only you know what your data looks like, and how to detect that there isn't any left to read, the process that determines whether the Stream should continue to be fed is extracted into a user-provided function.
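For instance, if there is no natural end-of-data marker, the user-provided function could treat the queue as finished once nothing arrives within a timeout. Here is a minimal sketch of that variant, assuming Java 8. The names `TimeoutQueueStream`, `untilIdle` and `streamOf` are made up for this illustration, and to stay self-contained it uses a small `Spliterators.AbstractSpliterator` instead of the custom iterator above:

```java
import java.util.List;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class TimeoutQueueStream {

    // Hypothetical helper: reports "no more data" when nothing arrives within the timeout.
    public static <E> Function<BlockingQueue<E>, Optional<E>> untilIdle(long timeout, TimeUnit unit) {
        return q -> {
            try {
                // poll returns null on timeout, which becomes an empty Optional
                return Optional.ofNullable(q.poll(timeout, unit));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Optional.empty();
            }
        };
    }

    // Hypothetical helper: lazily pulls elements until the reader returns an empty Optional.
    public static <E> Stream<E> streamOf(BlockingQueue<E> queue,
                                         Function<BlockingQueue<E>, Optional<E>> reader) {
        Spliterator<E> spliterator =
                new Spliterators.AbstractSpliterator<E>(Long.MAX_VALUE, Spliterator.ORDERED) {
                    @Override
                    public boolean tryAdvance(Consumer<? super E> action) {
                        Optional<E> next = reader.apply(queue);
                        next.ifPresent(action);
                        return next.isPresent(); // false ends the stream
                    }
                };
        return StreamSupport.stream(spliterator, false);
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        queue.offer(1);
        queue.offer(2);
        queue.offer(3);

        // The stream ends once the queue has been idle for 200 ms
        List<String> result = streamOf(queue, untilIdle(200, TimeUnit.MILLISECONDS))
                .map(String::valueOf)
                .collect(Collectors.toList());
        System.out.println(result); // prints [1, 2, 3]
    }
}
```

The trade-off with a timeout is that a slow producer looks the same as a finished one, so the value has to be chosen with the real message rate in mind.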
Hope that helps!