Using a reactive stream as a data source for Drools
A few months ago we started redesigning the Drools lowest level executable model and making it accessible to end user with a Java 8 API. To demonstrate the flexibility of this approach I tried to integrate it with a reactive stream and in particular to use this stream as a data source for Drools.
To show how this works I created a simple temperature server that provides a RxJava Observable emitting every second the temperature for a given town and terminates after 5 seconds. There is also a second factory method that allows to merge more of these Observables in order to have a single Observable that emits the temperature for more than one town at the same time.
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 | public class TempServer { public static Observable<TempInfo> getFeed(String town) { return Observable.create(subscriber -> Observable.interval( 1 , TimeUnit.SECONDS) .subscribe(i -> { if (i > 5 ) subscriber.onCompleted(); try { subscriber.onNext(TempInfo.fetch(town)); } catch (Exception e) { subscriber.onError(e); } })); } public static Observable<TempInfo> getFeeds(String... towns) { return Observable.merge( .map(TempServer::getFeed) .collect(toList())); } } |
where the TempInfo.fetch method just returns a random temperature between -20 and 50 degrees
1 2 3 4 5 6 7 8 | public TempInfo(String town, int temp) { this .town = town; this .temp = temp; } public static TempInfo fetch(String town) { return new TempInfo(town, random.nextInt( 70 ) - 20 ); } |
Using an improved version of the Java 8 DSL presented in the former article I defined the following 2 rules:
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 | Variable<TempInfo> temp = any( TempInfo. class ); Variable<Person> person = any( Person. class ); Rule r1 = rule( "low temp" ) .view( subscribe(temp, "tempFeed" ), expr(temp, t -> t.getTemp() < 0 ), input(person, "persons" ), expr(person, temp, (p, t) -> p.getTown().equals(t.getTown())) ) .then(on(person, temp) .execute((p, t) -> System.out.println(p.getName() + " is freezing in " + p.getTown() + " - temp is " + t.getTemp()))); Rule r2 = rule( "high temp" ) .view( subscribe(temp, "tempFeed" ), expr(temp, t -> t.getTemp() > 30 ), input(person, "persons" ), expr(person, temp, (p, t) -> p.getTown().equals(t.getTown())) ) .then(on(person, temp) .execute((p, t) -> System.out.println(p.getName() + " is sweating in " + p.getTown() + " - temp is " + t.getTemp()))); |
Here I’m using 2 different kinds of data sources: a passive one that can be considered a mere store of facts:
1 2 3 | DataStore persons = storeOf( new Person( "Mark" , 37 , "London" ), new Person( "Edson" , 35 , "Toronto" ), new Person( "Mario" , 40 , "Milano" )); |
that can be bound to a specific Drools KieSession with
1 | bindDataSource(ksession, "persons" , persons); |
and a reactive one taken from the TempServer implemented above
1 | Observable<TempInfo> tempFeed = TempServer.getFeeds( "Milano" , "London" , "Toronto" ); |
that can also be bound to the same KieSession in a similar way
1 | bindRxObservable( ksession, "tempFeed" , tempFeed ); |
Having done this you can fire those 2 rules and obtain an output like the following:
01 02 03 04 05 06 07 08 09 10 | Mark is freezing in London - temp is - 9 Edson is sweating in Toronto - temp is 42 Mario is sweating in Milano - temp is 42 Mario is sweating in Milano - temp is 49 Mark is freezing in London - temp is - 17 Edson is sweating in Toronto - temp is 40 Edson is sweating in Toronto - temp is 47 Mario is freezing in Milano - temp is - 14 Mark is freezing in London - temp is - 8 Mark is freezing in London - temp is - 17 |
- The complete test case to run this example is available here.
Reference: | Using a rective stream as a data source for Drools from our JCG partner Mario Fusco at the Drools & jBPM blog. |
The getFeed() can be implemented with standard operators and there is no need to fiddle with create() and forwarding.
return Observable.interval(1, TimeUnit.SECONDS)
.map(i -> TempInfo.fetch(town));