
Introducing ReactiveInflux: a non-blocking InfluxDB driver for Scala and Java with Apache Spark support

I am excited to announce the very first release of ReactiveInflux, developed at Pygmalios. Until now, InfluxDB lacked a non-blocking driver for both Scala and Java. Immutability, testability and extensibility are key features of ReactiveInflux. It also comes with support for Apache Spark, which makes it the weapon of choice for Spark jobs that write to InfluxDB.

Internally it uses the Play Framework WS API, a rich asynchronous HTTP client built on top of Async Http Client.
 

Features

  • asynchronous (non-blocking) interface for Scala
  • synchronous (blocking) interface for Scala and Java
  • supports both Spark and Spark Streaming
  • immutability
  • testability
  • extensibility

Compatibility

  • InfluxDB 0.11, 0.10 and 0.9 (possibly older versions too)
  • Scala 2.11 and 2.10
  • Java 7 and above
  • Apache Spark 1.4 and above

Scala asynchronous (non-blocking) example

val result = withInfluxDb(new URI("http://localhost:8086/"), "example1") { db =>
  db.create().flatMap { _ =>
    val point = Point(
      time        = DateTime.now(),
      measurement = "measurement1",
      tags        = Map("t1" -> "A", "t2" -> "B"),
      fields      = Map(
        "f1" -> 10.3,
        "f2" -> "x",
        "f3" -> -1,
        "f4" -> true)
    )
    db.write(point).flatMap { _ =>
      db.query("SELECT * FROM measurement1").flatMap { queryResult =>
        println(queryResult.row.mkString)
        db.drop()
      }
    }
  }
}

Scala synchronous (blocking) example

implicit val awaitAtMost = 10.seconds
syncInfluxDb(new URI("http://localhost:8086/"), "example1") { db =>
  db.create()
 
  val point = Point(
    time        = DateTime.now(),
    measurement = "measurement1",
    tags        = Map("t1" -> "A", "t2" -> "B"),
    fields      = Map(
      "f1" -> 10.3,
      "f2" -> "x",
      "f3" -> -1,
      "f4" -> true)
  )
  db.write(point)
 
  val queryResult = db.query("SELECT * FROM measurement1")
  println(queryResult.row.mkString)
 
  db.drop()
}

Java synchronous (blocking) example

// Use Influx at the provided URL
ReactiveInfluxConfig config = new JavaReactiveInfluxConfig(
  new URI("http://localhost:8086/"));
long awaitAtMostMillis = 30000;
try (SyncReactiveInflux reactiveInflux = new JavaSyncReactiveInflux(
  config, awaitAtMostMillis)) {
    SyncReactiveInfluxDb db = reactiveInflux.database("example1");
    db.create();
 
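    // Tag values are strings; field values may be numbers, strings or booleans
    Map<String, String> tags = new HashMap<>();
    tags.put("t1", "A");
    tags.put("t2", "B");
 
    Map<String, Object> fields = new HashMap<>();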
    fields.put("f1", 10.3);
    fields.put("f2", "x");
    fields.put("f3", -1);
    fields.put("f4", true);
 
    Point point = new JavaPoint(
        DateTime.now(),
        "measurement1",
        tags,
        fields
    );
    db.write(point);
 
    QueryResult queryResult = db.query("SELECT * FROM measurement1");
    System.out.println(queryResult.getRow().mkString());
 
    db.drop();
}
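
The `awaitAtMostMillis` argument above bounds how long each blocking call may wait. The following is a minimal sketch of that general idea (block on an asynchronous result, give up after a timeout), not the implementation used by ReactiveInflux. It assumes Java 8 for `CompletableFuture`; `BlockingFacadeSketch`, `queryAsync` and `await` are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingFacadeSketch {
    // Hypothetical stand-in for an asynchronous driver call; not part of ReactiveInflux.
    static CompletableFuture<String> queryAsync(String query) {
        return CompletableFuture.supplyAsync(() -> "result of: " + query);
    }

    // Blocks the calling thread until the future completes or the timeout elapses.
    static <T> T await(CompletableFuture<T> future, long awaitAtMostMillis) {
        try {
            return future.get(awaitAtMostMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("No response within " + awaitAtMostMillis + " ms", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Asynchronous call failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        long awaitAtMostMillis = 30000;
        System.out.println(await(queryAsync("SELECT * FROM measurement1"), awaitAtMostMillis));
    }
}
```

A wrapper of this kind keeps calling code simple, at the cost of parking a thread for every call. That trade-off is the reason a non-blocking interface is worth having when many requests are in flight.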

Apache Spark Scala example

val point1 = Point(
  time        = DateTime.now(),
  measurement = "measurement1",
  tags        = Map(
    "tagKey1" -> "tagValue1",
    "tagKey2" -> "tagValue2"),
  fields      = Map(
    "fieldKey1" -> "fieldValue1",
    "fieldKey2" -> 10.7)
)
sc.parallelize(Seq(point1)).saveToInflux()

Apache Spark streaming Scala example

val point1 = Point(
  time        = DateTime.now(),
  measurement = "measurement1",
  tags        = Map(
    "tagKey1" -> "tagValue1",
    "tagKey2" -> "tagValue2"),
  fields      = Map(
    "fieldKey1" -> "fieldValue1",
    "fieldKey2" -> 10.7)
)
val queue = new mutable.Queue[RDD[Point]]
queue.enqueue(ssc.sparkContext.parallelize(Seq(point1)))
ssc.queueStream(queue).saveToInflux()

Apache Spark Java example

...
SparkInflux sparkInflux = new SparkInflux("example", 1000);
sparkInflux.saveToInflux(sc.parallelize(Collections.singletonList(point)));

Apache Spark streaming Java example

...
SparkInflux sparkInflux = new SparkInflux("example", 1000);
Queue<JavaRDD<Point>> queue = new LinkedList<>();
queue.add(ssc.sparkContext().parallelize(Collections.singletonList(point)));
sparkInflux.saveToInflux(ssc.queueStream(queue));
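
Every write example above hands the driver a point made of a measurement, tags, fields and a timestamp. For readers curious how such a point is represented when sent to InfluxDB over HTTP, here is a minimal sketch of InfluxDB's line protocol text format. It is not code from ReactiveInflux: `LineProtocolSketch`, `toLine` and `formatValue` are hypothetical names, the timestamp is assumed to be in nanoseconds, and escaping of special characters (spaces, commas, equals signs, quotes) is deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

final class LineProtocolSketch {
    // Formats one point as a single line: measurement,tags fields timestamp.
    // Assumption: no key or value contains characters that would need escaping.
    static String toLine(String measurement, Map<String, String> tags,
                         Map<String, Object> fields, long timestampNanos) {
        StringBuilder line = new StringBuilder(measurement);
        // Tags follow the measurement, comma-separated, here sorted by key
        for (Map.Entry<String, String> tag : new TreeMap<>(tags).entrySet()) {
            line.append(',').append(tag.getKey()).append('=').append(tag.getValue());
        }
        StringJoiner fieldSet = new StringJoiner(",");
        for (Map.Entry<String, Object> field : new TreeMap<>(fields).entrySet()) {
            fieldSet.add(field.getKey() + "=" + formatValue(field.getValue()));
        }
        return line.append(' ').append(fieldSet.toString())
                   .append(' ').append(timestampNanos).toString();
    }

    // Strings are double-quoted, integers carry an "i" suffix,
    // floating point numbers and booleans are written as they are.
    static String formatValue(Object value) {
        if (value instanceof String) {
            return "\"" + value + "\"";
        }
        if (value instanceof Integer || value instanceof Long) {
            return value + "i";
        }
        return String.valueOf(value);
    }

    public static void main(String[] args) {
        Map<String, String> tags = new HashMap<>();
        tags.put("t1", "A");
        tags.put("t2", "B");
        Map<String, Object> fields = new HashMap<>();
        fields.put("f1", 10.3);
        fields.put("f2", "x");
        fields.put("f3", -1);
        fields.put("f4", true);
        System.out.println(toLine("measurement1", tags, fields, 1460000000000000000L));
    }
}
```

For the sample point used throughout this post, the sketch produces `measurement1,t1=A,t2=B f1=10.3,f2="x",f3=-1i,f4=true 1460000000000000000`.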

Credit to Pygmalios

Pygmalios is a top-tech startup based in Bratislava, Slovakia, that invests in cutting-edge technologies to ensure rapid growth in the domain of real-time predictive retail analytics.
