Java 8 stream and incoming data (Kafka)


I have a queue (it happens to be Kafka, but I'm not sure that matters) that I'm reading messages from. I want to create a Stream to represent this data.

My pseudocode to consume the (Kafka) queue looks like this:

List<Message> messages = new ArrayList<>();

while (true) {
    ConsumerRecords<String, Message> records = kafkaConsumer.poll(100);

    // recordsToMessages is assumed to return a collection of Message
    messages.addAll(recordsToMessages(records));

    if (x) {
        break;
    }
}

return messages.stream();

With this pseudocode, the Stream is not returned until the while loop is broken, i.e. until all of the queue has been read.

I want to be able to return the Stream straight away, i.e. new messages can be added to the Stream after it has been returned.

I feel like I need to use Stream.generate, but I'm not sure how. Or maybe I need a Spliterator?

I also want to close the Stream at a later point in the code.

Thanks!

Here is a commented example of how it could be done:

// Imports used by this snippet
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public static void main(String[] args) {

    LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

    // Data producer
    Runnable job = () -> {
        // Send data to the stream (it could come from your Kafka queue)
        ThreadLocalRandom random = ThreadLocalRandom.current();
        for (int i = 0; i < 10; i++) {
            queue.offer(random.nextInt(100));
            delay(random.nextInt(2) + 1);
        }
        // Send the magic signal to stop the stream
        queue.offer(-1);
    };
    Thread thread = new Thread(job);
    thread.start();

    // Define the condition by which the stream knows there is no data left to consume.
    // The function returns the next element wrapped in an Optional, or an empty Optional
    // to tell that there is no more data to read.
    // In this example, the number -1 is the magic signal.
    Function<BlockingQueue<Integer>, Optional<Integer>> endingCondition = q -> {
        try {
            Integer element = q.take();
            return element == -1 ? Optional.empty() : Optional.of(element);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and treat interruption as end of data
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    };
    QueueConsumingIterator<Integer> iterator = new QueueConsumingIterator<>(queue, endingCondition);

    // Construct a Stream on top of our custom queue-consuming Iterator
    Spliterator<Integer> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
    Stream<Integer> stream = StreamSupport.stream(spliterator, false);

    // Use the Stream as usual :)
    stream.map(String::valueOf).forEach(System.out::println);

}


// Imports used by this snippet
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// This is a custom Iterator that takes data from a BlockingQueue.
// Detection of the end of the data stream is use-case-dependent, so it is extracted
// as a user-provided Function<BlockingQueue, Optional>.
// For example, you may want to wait for a particular item in the queue,
// or consider the queue "dead" after a timeout...
public static class QueueConsumingIterator<E> implements Iterator<E> {

    private final BlockingQueue<E> queue;
    private final Function<BlockingQueue<E>, Optional<E>> queueReader;
    private Optional<E> element;
    private boolean elementRead = false;

    public QueueConsumingIterator(BlockingQueue<E> queue, Function<BlockingQueue<E>, Optional<E>> queueReader) {
        this.queue = queue;
        this.queueReader = queueReader;
    }

    @Override
    public boolean hasNext() {
        // Read at most one element ahead, and only if the previous one was consumed
        if (!this.elementRead) {
            this.element = this.queueReader.apply(this.queue);
            this.elementRead = true;
        }
        return this.element.isPresent();
    }

    @Override
    public E next() {
        if (hasNext()) {
            this.elementRead = false;
            return this.element.get();
        }
        throw new NoSuchElementException();
    }

}

// Sleeps for the given number of seconds
private static void delay(int timeout) {
    try {
        TimeUnit.SECONDS.sleep(timeout);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

The idea behind this code is that you can feed a Stream through a custom Iterator, which itself extracts data from an external source.
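Since the question also mentions a Spliterator, the same idea can be sketched by implementing the Spliterator directly instead of going through an Iterator. This is only a minimal sketch under assumptions: the class name QueueSpliteratorSketch and the helper streamUntil are made up for illustration, and the end of the data is assumed to be signalled by a marker element, as in the -1 example.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class QueueSpliteratorSketch {

    // Hypothetical helper: streams elements taken from the queue until the
    // given end marker (a "poison pill") is seen.
    public static <E> Stream<E> streamUntil(BlockingQueue<E> queue, E endMarker) {
        Spliterator<E> spliterator =
            new Spliterators.AbstractSpliterator<E>(Long.MAX_VALUE, Spliterator.ORDERED) {
                @Override
                public boolean tryAdvance(Consumer<? super E> action) {
                    try {
                        E element = queue.take(); // blocks until data arrives
                        if (element.equals(endMarker)) {
                            return false; // end of the stream
                        }
                        action.accept(element);
                        return true;
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and end the stream
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
            };
        // false = sequential stream; elements are pulled lazily, one at a time
        return StreamSupport.stream(spliterator, false);
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        queue.offer(1);
        queue.offer(2);
        queue.offer(3);
        queue.offer(-1); // end marker

        List<Integer> seen = streamUntil(queue, -1).collect(Collectors.toList());
        System.out.println(seen); // prints [1, 2, 3]
    }
}
```

The Stream is returned immediately; nothing is read from the queue until a terminal operation starts pulling elements.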

The data is transferred from the external source to the Iterator through a queue. And because only you know what your data looks like and how to detect that there isn't anything left to read, the process that determines whether the Stream should continue to be fed is extracted into a user-provided function.
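As an illustration of swapping in a different user-provided function, here is a rough sketch of the "dead after a timeout" variant: the stream ends when nothing arrives within a given delay. The names TimeoutEndingConditionSketch, endAfterSilence and streamOf are hypothetical, and the inline iterator is a compacted stand-in for the queue-consuming iterator, not a replacement for it.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class TimeoutEndingConditionSketch {

    // Hypothetical factory: the returned function reports "no more data"
    // when nothing arrives within the given timeout.
    public static <E> Function<BlockingQueue<E>, Optional<E>> endAfterSilence(long timeout, TimeUnit unit) {
        return q -> {
            try {
                // poll returns null on timeout, which maps to Optional.empty()
                return Optional.ofNullable(q.poll(timeout, unit));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Optional.empty();
            }
        };
    }

    // Builds a sequential Stream that asks the reader function for each element.
    public static <E> Stream<E> streamOf(BlockingQueue<E> queue,
                                         Function<BlockingQueue<E>, Optional<E>> reader) {
        Iterator<E> iterator = new Iterator<E>() {
            private Optional<E> next; // null means "not fetched yet"

            @Override
            public boolean hasNext() {
                if (next == null) {
                    next = reader.apply(queue);
                }
                return next.isPresent();
            }

            @Override
            public E next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                E value = next.get();
                next = null;
                return value;
            }
        };
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("a");
        queue.offer("b");

        Function<BlockingQueue<String>, Optional<String>> reader =
            endAfterSilence(200, TimeUnit.MILLISECONDS);
        List<String> seen = streamOf(queue, reader).collect(Collectors.toList());
        System.out.println(seen); // prints [a, b]
    }
}
```

A timeout avoids reserving a magic value such as -1, at the cost of ending the stream early if the producer is merely slow.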

Hope that helps.

