Exploring the Spline Data Tracker and Visualization tool for Apache Spark (Part 2)
In part 1 we learned how to test data lineage info collection with Spline from a Spark shell. The same can be done in any Scala or Java Spark application. The same dependencies used for the Spark shell need to be registered in your build tool of choice (Maven, Gradle or sbt):
- groupId: za.co.absa.spline, artifactId: spline-core, version: 0.3.5
- groupId: za.co.absa.spline, artifactId: spline-persistence-mongo, version: 0.3.5
- groupId: za.co.absa.spline, artifactId: spline-core-spark-adapter-2.3, version: 0.3.5
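For sbt, the three dependencies listed above could be declared roughly as follows. This is a minimal sketch of a build.sbt fragment: it assumes the artifacts are published without a Scala version suffix (as the coordinates above suggest), so a single % is used, and it leaves out the Spark dependencies your project already declares.

```scala
// build.sbt fragment (sketch): Spline 0.3.5 dependencies for Spark 2.3.x
libraryDependencies ++= Seq(
  "za.co.absa.spline" % "spline-core"                   % "0.3.5",
  "za.co.absa.spline" % "spline-persistence-mongo"      % "0.3.5",
  "za.co.absa.spline" % "spline-core-spark-adapter-2.3" % "0.3.5"
)
```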
With reference to Scala and Spark 2.3.x, a Spark job like this:
package org.googlielmo.splinetest

import org.apache.spark.sql.SparkSession

object SplineExample {
  def main(args: Array[String]): Unit = {
    // Create the Spark session
    val sparkSession = SparkSession
      .builder()
      .appName("Spline Tester")
      .getOrCreate()

    // Init Spline: args(0) is the MongoDB connection string, args(1) the database name
    System.setProperty("spline.persistence.factory", "za.co.absa.spline.persistence.mongo.MongoPersistenceFactory")
    System.setProperty("spline.mongodb.url", args(0))
    System.setProperty("spline.mongodb.name", args(1))
    import za.co.absa.spline.core.SparkLineageInitializer._
    sparkSession.enableLineageTracking()

    // Do something with DataFrames
    import sparkSession.sqlContext.implicits._
    val df1 = sparkSession.sparkContext.parallelize(1 to 10000, 42).toDF("FirstValue")
    val df2 = sparkSession.sparkContext.parallelize(1.to(100000, 17), 42).toDF("SecondValue")
    val output = df1.crossJoin(df2).where('FirstValue % 42 === 'SecondValue % 42)

    // Write results to file system
    output.write.format("parquet").save("splinetester.parquet")

    // Stop the Spark session
    sparkSession.stop()
  }
}
can be submitted to a Spark cluster this way:
$SPARK_HOME/bin/spark-submit \
  --class org.googlielmo.splinetest.SplineExample \
  --master <url> \
  --packages "za.co.absa.spline:spline-core:0.3.5,za.co.absa.spline:spline-persistence-mongo:0.3.5,za.co.absa.spline:spline-core-spark-adapter-2.3:0.3.5" \
  splinetest-1.0.jar \
  mongodb://<username>:<password>@<hostname>:<port> <dbname>
The Spline configuration properties can also be stored in a properties file on the application classpath. Here’s the full list of the available Spline properties:
- spline.mode: 3 possible values, BEST_EFFORT (default), DISABLED, REQUIRED. If BEST_EFFORT, Spline tries to initialize itself, but if it fails it switches to DISABLED mode so that the Spark application can proceed normally with no lineage tracking. If DISABLED, no lineage tracking happens at all. If REQUIRED and Spline fails to initialize itself for any reason, the Spark application aborts with an error.
- spline.persistence.factory: could be za.co.absa.spline.persistence.mongo.MongoPersistenceFactory (for persistence to MongoDB) or za.co.absa.spline.persistence.hdfs.HdfsPersistenceFactory (for persistence to HDFS).
- spline.mongodb.url: the MongoDB connection string (for MongoDB persistence only).
- spline.mongodb.name: the MongoDB database name (for MongoDB persistence only).
- spline.persistence.composition.factories: a comma-separated list of factories to delegate to (for composition factories only).
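As an illustration of how these properties fit together, here is a minimal sketch that collects the MongoDB-related settings in one place and registers them as JVM system properties, as the job above does with System.setProperty. The SplineSettings object and its method names are hypothetical helpers written for this example and are not part of Spline. Only the property keys and the mode values come from the list above.

```scala
// Hypothetical helper (not part of Spline): gathers the properties listed
// above and registers them as JVM system properties.
object SplineSettings {
  // The three values documented for spline.mode
  val ValidModes: Set[String] = Set("BEST_EFFORT", "DISABLED", "REQUIRED")

  // Builds the settings needed for MongoDB persistence
  def forMongo(mongoUrl: String,
               dbName: String,
               mode: String = "BEST_EFFORT"): Map[String, String] = {
    require(ValidModes.contains(mode), s"Unsupported spline.mode: $mode")
    Map(
      "spline.mode" -> mode,
      "spline.persistence.factory" ->
        "za.co.absa.spline.persistence.mongo.MongoPersistenceFactory",
      "spline.mongodb.url" -> mongoUrl,
      "spline.mongodb.name" -> dbName
    )
  }

  // Registers every entry as a system property
  def register(settings: Map[String, String]): Unit =
    settings.foreach { case (key, value) => System.setProperty(key, value) }
}
```

It would be called before enableLineageTracking(), for example SplineSettings.register(SplineSettings.forMongo(args(0), args(1), mode = "REQUIRED")), so that a misconfigured MongoDB connection stops the job instead of silently disabling lineage tracking.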
The first time Spline is enabled from a Spark job, it creates 6 collections in the destination MongoDB database:
- attributes_v4: info about the attributes of the involved Spark Datasets.
- dataTypes_v4: info about the data types for each data lineage.
- datasets_v4: info about the Datasets.
- lineages_v4: the data lineage graphs for Spark Datasets.
- operations_v4: the operations on Datasets across lineages.
- transformations_v4: the transformations on Datasets across lineages.
The documents in those 6 collections are used by the Spline web application to generate the visual representation of the lineages in the UI.
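A quick sanity check after the first run is to compare the collection names found in the database with the six names listed above. The sketch below covers only the comparison. The SplineCollections object is a hypothetical helper, and retrieving the existing names (from the mongo shell or a MongoDB driver, for instance) is left out on purpose.

```scala
// Hypothetical helper (not part of Spline): reports which of the expected
// Spline collections are absent from a given set of collection names.
object SplineCollections {
  // The six collections listed above
  val Expected: Set[String] = Set(
    "attributes_v4",
    "dataTypes_v4",
    "datasets_v4",
    "lineages_v4",
    "operations_v4",
    "transformations_v4"
  )

  // Returns the expected collections that are not in `existing`
  def missing(existing: Set[String]): Set[String] = Expected.diff(existing)
}
```

An empty result from SplineCollections.missing(names) suggests that Spline initialized its persistence layer. A non-empty one points to a job that ran with lineage tracking disabled or against a different database.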
In the third and last part of this series, I am going to share the outcome after the first weeks of adoption of this project in Spark pre-production environments.
Published on Java Code Geeks with permission by Guglielmo Iozzia, partner at our JCG program. See the original article here: Exploring the Spline Data Tracker and Visualization tool for Apache Spark (Part 2). Opinions expressed by Java Code Geeks contributors are their own.
Getting an Exception in thread “main” java.lang.NoClassDefFoundError: za/co/absa/spline/core/SparkLineageInitializer$ from the above code
Getting an Exception in thread “main” java.lang.NoSuchMethodError: za.co.absa.spline.model.op.Write.<init>(Lza/co/absa/spline/model/op/OperationProps;Ljava/lang/String;Ljava/lang/String;Z)V