rx java - How to wait for async Observable to complete
I'm trying to build a sample using RxJava. The sample should orchestrate a ReactiveWareService and a ReactiveReviewService, returning a WareAndReview composite.
ReactiveWareService:

    public Observable<Ware> findWares() {
        return Observable.from(wareService.findWares());
    }

ReactiveReviewService (reviewService.findReviewsByItem calls Thread.sleep to simulate latency!):

    public Observable<Review> findReviewsByItem(final String item) {
        return Observable.create((Observable.OnSubscribe<Review>) observer -> executor.execute(() -> {
            try {
                List<Review> reviews = reviewService.findReviewsByItem(item);
                reviews.forEach(observer::onNext);
                observer.onCompleted();
            } catch (Exception e) {
                observer.onError(e);
            }
        }));
    }

The method that combines the two:

    public List<WareAndReview> findWaresWithReviews() throws RuntimeException {
        final List<WareAndReview> wareAndReviews = new ArrayList<>();

        wareService.findWares()
            .map(WareAndReview::new)
            .subscribe(wr -> {
                wareAndReviews.add(wr);
                // Async!!!!
                reviewService.findReviewsByItem(wr.getWare().getItem())
                    .subscribe(wr::addReview,
                        throwable -> System.out.println("Error while trying to find reviews for " + wr)
                    );
            });

        // TODO: There should be a better way to wait for async reviewService.findReviewsByItem completion!
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {}

        return wareAndReviews;
    }
Given the fact that I don't want to return an Observable, how can I wait for the async Observable (findReviewsByItem) to complete?
You may use the methods of BlockingObservable; see https://github.com/netflix/rxjava/wiki/blocking-observable-operators. E.g.:
    BlockingObservable.from(reviewService.findReviewsByItem(..)).toIterable()
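To make the idea of "block until the async source completes" concrete without depending on a particular RxJava version, here is a minimal stdlib-only sketch. It is not the RxJava API: it swaps in java.util.concurrent.CountDownLatch as the completion signal, and every name in it (WaitForAsync, findReviewsAsync, findReviewsBlocking, the String reviews) is a hypothetical stand-in for the services in the question. The point is only that the caller waits for an explicit completion callback rather than sleeping for a fixed 3 seconds.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class WaitForAsync {

    // Hypothetical stand-in for the asynchronous findReviewsByItem:
    // emits two reviews on a worker thread, then signals completion.
    static void findReviewsAsync(ExecutorService executor, String item,
                                 Consumer<String> onNext, Runnable onCompleted) {
        executor.execute(() -> {
            try {
                Thread.sleep(100); // simulated latency
                onNext.accept("review 1 for " + item);
                onNext.accept("review 2 for " + item);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Always signal completion so the caller is released.
                onCompleted.run();
            }
        });
    }

    // Blocks the caller until every asynchronous lookup has signalled
    // completion, or fails after a timeout (an assumed 5 seconds).
    static List<String> findReviewsBlocking(List<String> items) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        List<String> reviews = new CopyOnWriteArrayList<>(); // thread-safe sink
        CountDownLatch done = new CountDownLatch(items.size());
        try {
            for (String item : items) {
                findReviewsAsync(executor, item, reviews::add, done::countDown);
            }
            // Wait for the completion signals instead of sleeping blindly.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for reviews");
            }
            return reviews;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> reviews = findReviewsBlocking(Arrays.asList("book", "lamp"));
        System.out.println(reviews.size() + " reviews collected");
    }
}
```

With BlockingObservable the same waiting is done by the library, so in the question's code the inner subscribe and the Thread.sleep could presumably be replaced by iterating over the result of toIterable() and calling wr::addReview for each element.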
rx-java