Using a reactive stream as a data source for Drools
A few months ago we started redesigning the lowest-level executable model of Drools and making it accessible to end users through a Java 8 API. To demonstrate the flexibility of this approach I tried to integrate it with a reactive stream, and in particular to use this stream as a data source for Drools.
To show how this works I created a simple temperature server that provides an RxJava Observable which emits the temperature for a given town every second and terminates after 5 seconds. A second factory method merges several of these Observables into a single Observable that emits the temperature for more than one town at the same time.
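import static java.util.stream.Collectors.toList;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import rx.Observable;

public class TempServer {

    // Emits one reading per second for the given town, then completes.
    public static Observable<TempInfo> getFeed(String town) {
        return Observable.create(subscriber ->
            Observable.interval(1, TimeUnit.SECONDS)
                .subscribe(i -> {
                    if (i >= 5) {
                        // Five readings have been emitted: complete and stop here,
                        // so that nothing is sent after onCompleted().
                        subscriber.onCompleted();
                        return;
                    }
                    try {
                        subscriber.onNext(TempInfo.fetch(town));
                    } catch (Exception e) {
                        subscriber.onError(e);
                    }
                }));
    }

    // Merges the feeds of all the given towns into a single Observable.
    public static Observable<TempInfo> getFeeds(String... towns) {
        return Observable.merge(Arrays.stream(towns)
                                      .map(TempServer::getFeed)
                                      .collect(toList()));
    }
}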
where the TempInfo.fetch method just returns a random temperature between -20 and 49 degrees:
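import java.util.Random;

public class TempInfo {

    private static final Random random = new Random();

    private final String town;
    private final int temp;

    public TempInfo(String town, int temp) {
        this.town = town;
        this.temp = temp;
    }

    // nextInt(70) returns a value in 0..69, so the result lies in -20..49.
    public static TempInfo fetch(String town) {
        return new TempInfo(town, random.nextInt(70) - 20);
    }

    public String getTown() { return town; }

    public int getTemp() { return temp; }
}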
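As a quick sanity check of that range, the following is a minimal sketch that repeats the same arithmetic many times and records the smallest and largest values seen. The class name TempRangeSketch and its helper methods are made up for this illustration and are not part of the original example.

```java
import java.util.Random;

// Illustrative check only; not part of the original TempInfo class.
class TempRangeSketch {

    // Same arithmetic as TempInfo.fetch: nextInt(70) yields 0..69,
    // so the result always lies between -20 and 49 inclusive.
    static int randomTemp(Random random) {
        return random.nextInt(70) - 20;
    }

    // Returns {min, max} observed over the given number of draws.
    static int[] observedRange(Random random, int samples) {
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (int i = 0; i < samples; i++) {
            int t = randomTemp(random);
            min = Math.min(min, t);
            max = Math.max(max, t);
        }
        return new int[] { min, max };
    }

    public static void main(String[] args) {
        int[] range = observedRange(new Random(), 100_000);
        System.out.println("min=" + range[0] + " max=" + range[1]);
    }
}
```

Whatever the seed, the observed minimum can never fall below -20 and the maximum can never exceed 49, because the upper bound passed to nextInt is exclusive.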
Using an improved version of the Java 8 DSL presented in the previous article I defined the following two rules:
Variable<TempInfo> temp = any( TempInfo.class );
Variable<Person> person = any( Person.class );

Rule r1 = rule("low temp")
    .view(
        subscribe(temp, "tempFeed"),
        expr(temp, t -> t.getTemp() < 0),
        input(person, "persons"),
        expr(person, temp, (p, t) -> p.getTown().equals(t.getTown()))
    )
    .then(on(person, temp)
        .execute((p, t) -> System.out.println(p.getName() + " is freezing in " + p.getTown() + " - temp is " + t.getTemp())));

Rule r2 = rule("high temp")
    .view(
        subscribe(temp, "tempFeed"),
        expr(temp, t -> t.getTemp() > 30),
        input(person, "persons"),
        expr(person, temp, (p, t) -> p.getTown().equals(t.getTown()))
    )
    .then(on(person, temp)
        .execute((p, t) -> System.out.println(p.getName() + " is sweating in " + p.getTown() + " - temp is " + t.getTemp())));
Here I’m using 2 different kinds of data sources: a passive one that can be considered a mere store of facts:
DataStore persons = storeOf(new Person("Mark", 37, "London"),
                            new Person("Edson", 35, "Toronto"),
                            new Person("Mario", 40, "Milano"));
that can be bound to a specific Drools KieSession with
bindDataSource(ksession, "persons", persons);
and a reactive one taken from the TempServer implemented above
Observable<TempInfo> tempFeed = TempServer.getFeeds( "Milano", "London", "Toronto" );
that can also be bound to the same KieSession in a similar way
bindRxObservable( ksession, "tempFeed", tempFeed );
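The difference between the two kinds of source can be sketched in plain Java. This is only an illustration of pull versus push, under the assumption that binding a reactive source amounts to inserting each item as it is emitted. The Store, Feed and bind names below are hypothetical and are not Drools or RxJava classes, and the sketch says nothing about how bindDataSource or bindRxObservable are actually implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical types for illustration only; not Drools or RxJava classes.
class SourceKindsSketch {

    // Passive source: holds facts and hands them over only when asked.
    static final class Store<T> {
        private final List<T> facts = new ArrayList<>();

        void add(T fact) { facts.add(fact); }

        List<T> snapshot() { return new ArrayList<>(facts); }
    }

    // Reactive source: pushes every new item to whoever has subscribed.
    static final class Feed<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<T> subscriber) { subscribers.add(subscriber); }

        void emit(T item) { subscribers.forEach(s -> s.accept(item)); }
    }

    // Assumed meaning of "binding" a feed: every emitted item is added
    // to a session-like sink at the moment it arrives.
    static <T> List<T> bind(Feed<T> feed) {
        List<T> inserted = new ArrayList<>();
        feed.subscribe(inserted::add);
        return inserted;
    }

    public static void main(String[] args) {
        Store<String> persons = new Store<>();
        persons.add("Mark");
        System.out.println("pulled on demand: " + persons.snapshot());

        Feed<Integer> temps = new Feed<>();
        List<Integer> session = bind(temps);
        temps.emit(-9);
        temps.emit(42);
        System.out.println("pushed as emitted: " + session);
    }
}
```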
Having done this you can fire those 2 rules and obtain an output like the following:
Mark is freezing in London - temp is -9
Edson is sweating in Toronto - temp is 42
Mario is sweating in Milano - temp is 42
Mario is sweating in Milano - temp is 49
Mark is freezing in London - temp is -17
Edson is sweating in Toronto - temp is 40
Edson is sweating in Toronto - temp is 47
Mario is freezing in Milano - temp is -14
Mark is freezing in London - temp is -8
Mark is freezing in London - temp is -17
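To make the matching behind this output concrete, here is a minimal plain-Java sketch of what the two rules express: a join between a temperature reading and the persons living in the same town, followed by a threshold test. It does not use Drools and is not how the engine evaluates rules. The RuleSketch class, its nested stand-ins for Person and TempInfo, and the react method are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of what the two rules match; it does not use Drools.
class RuleSketch {

    // Minimal stand-ins for the article's Person and TempInfo classes.
    static final class Person {
        final String name;
        final String town;
        Person(String name, String town) { this.name = name; this.town = town; }
    }

    static final class TempInfo {
        final String town;
        final int temp;
        TempInfo(String town, int temp) { this.town = town; this.temp = temp; }
    }

    // For one reading, returns the messages that the "low temp" and
    // "high temp" rules would print for the given persons.
    static List<String> react(List<Person> persons, TempInfo t) {
        List<String> out = new ArrayList<>();
        for (Person p : persons) {
            if (!p.town.equals(t.town)) {
                continue; // join condition: same town
            }
            if (t.temp < 0) {
                out.add(p.name + " is freezing in " + p.town + " - temp is " + t.temp);
            } else if (t.temp > 30) {
                out.add(p.name + " is sweating in " + p.town + " - temp is " + t.temp);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Person> persons = Arrays.asList(
                new Person("Mark", "London"),
                new Person("Edson", "Toronto"),
                new Person("Mario", "Milano"));
        // Hand-picked readings standing in for the random feed.
        List<TempInfo> readings = Arrays.asList(
                new TempInfo("London", -9),
                new TempInfo("Milano", 12),
                new TempInfo("Toronto", 42));
        for (TempInfo t : readings) {
            react(persons, t).forEach(System.out::println);
        }
    }
}
```

Readings from 0 to 30 degrees inclusive match neither rule, which is why not every emitted temperature produces a line of output.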
The complete test case to run this example is available here.
Reference: Using a reactive stream as a data source for Drools, from our JCG partner Mario Fusco at the Drools & jBPM blog.
getFeed() can be implemented with standard operators; there is no need to fiddle with create() and manual forwarding:
return Observable.interval(1, TimeUnit.SECONDS)
.take(5)
.map(i -> TempInfo.fetch(town));