Introduction to Apache Iceberg

In the ever-evolving world of big data, managing large-scale datasets efficiently has become a significant challenge for data engineers and analysts. Traditional table formats, such as Hive, often struggle with performance bottlenecks, lack of schema flexibility, and the inability to handle ACID transactions seamlessly. This is where Apache Iceberg emerges as a game-changing solution. This article introduces Apache Iceberg and explores its origins, architecture, and the key features that make it a powerful foundation for modern data lakes.

1. Introduction

Apache Iceberg is an open-source table format for huge analytic datasets. It provides capabilities like schema evolution, ACID transactions, and improved read/write performance, enabling reliable data storage and management. Iceberg is designed to work efficiently with big data engines like Apache Spark, Trino, Hive, Presto, and others. By introducing a well-defined table format and metadata management system, Iceberg solves many challenges in managing large-scale data lakes and ensures consistent data across distributed systems.

1.1 Origin of Apache Iceberg

Apache Iceberg was originally developed at Netflix to address limitations of Hive tables for big data processing. Netflix open-sourced the project in 2018 and donated it to the Apache Software Foundation, where it later graduated to a top-level project. Hive, while widely used, faced issues with table performance and flexibility as data volumes grew. Iceberg was designed to overcome these challenges and improve the scalability and reliability of data lakes. The key motivations for creating Iceberg included:

  • Scalability issues with Hive Metastore: Managing metadata for tables with billions of partitions and files became inefficient and caused performance bottlenecks.
  • Inflexible schema evolution: Hive’s inability to handle schema changes (e.g., renaming columns) led to significant downtime and data inconsistency.
  • Lack of ACID compliance: Hive did not natively support updates and deletes, making data management complex for streaming and transactional data.

Iceberg addressed these problems by providing a flexible table format, efficient metadata management, and full support for ACID transactions, which were previously challenging in Hive-based data lakes.

1.2 Architecture of Apache Iceberg

Apache Iceberg introduces a table format that works independently of the underlying file formats like Parquet, ORC, or Avro. Its architecture is built to scale efficiently and optimize query planning and execution. The architecture consists of three main components:

  • Metadata Layer: This layer maintains information about table versions, snapshots, and manifests. Iceberg uses atomic commits to ensure consistency, enabling features like schema evolution and time travel.
  • Data Layer: The actual data is stored in widely used file formats such as Parquet, ORC, or Avro. Iceberg does not dictate the choice of file formats, ensuring flexibility and interoperability with existing data processing tools.
  • Manifests: These are metadata files that index the data files, providing details about their locations, partitions, and statistics. Manifests allow Iceberg to optimize query planning by only scanning relevant files.

A critical part of Iceberg’s design is its versioned table approach, where every operation (e.g., insert, update, delete) creates a new snapshot of the table. Snapshots are immutable and ensure the reliability and consistency of data, making Iceberg ideal for time travel and concurrent operations.
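
To see the snapshot and metadata model in action, the short sketch below lists the snapshots of an existing table through the Iceberg Java API. It is a minimal illustration rather than a complete program: the warehouse path, namespace, and table name are placeholders, and it assumes a table already registered in a Hadoop catalog.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class SnapshotInspector {
    public static void main(String[] args) {
        // Placeholder warehouse path and table name; adjust for your environment.
        HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs://namenode:8020/warehouse/iceberg");
        Table table = catalog.loadTable(TableIdentifier.of("default", "sample_table"));

        // Every committed operation (append, overwrite, delete) left one of these snapshots behind.
        for (Snapshot snapshot : table.snapshots()) {
            System.out.println(snapshot.snapshotId() + " " + snapshot.operation()
                    + " at " + snapshot.timestampMillis());
        }

        // The current snapshot points to the manifests describing the live data files.
        System.out.println("Current snapshot: " + table.currentSnapshot());
    }
}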

1.3 Important Features of Apache Iceberg

Apache Iceberg provides a range of features that make it a robust choice for modern data lake management:

  • ACID Transactions: Iceberg supports atomic commits, ensuring that operations like updates, deletes, and inserts are reliable and consistent. This allows for transactional operations on data lakes.
  • Schema Evolution: Iceberg allows schema modifications without breaking existing queries. Columns can be added, renamed, or dropped seamlessly, enabling flexible and future-proof data schemas.
  • Time Travel: Iceberg supports historical queries by leveraging table snapshots. Users can query data as it existed at a specific point in time, making debugging and auditing simpler.
  • Partition Evolution: Iceberg enables partitioning strategies to evolve over time without requiring expensive data rewrites. This improves performance and flexibility for queries.
  • Performance Optimization: Iceberg improves query performance through manifest files and metadata pruning, reducing the amount of data scanned during queries.
  • Compatibility: Iceberg integrates seamlessly with big data tools and engines like Apache Spark, Trino, Hive, Presto, and Flink, making it easy to adopt in existing data platforms.
  • Immutable Snapshots: Each operation on a table creates an immutable snapshot. This ensures data integrity, supports concurrent reads and writes, and simplifies rollback operations.
  • Open Standards: Iceberg is built on open standards, making it vendor-neutral and accessible to a wide range of users and tools.

With these powerful features, Apache Iceberg addresses many challenges in managing large-scale data lakes. It ensures high performance, reliability, and consistency while providing advanced capabilities like time travel and ACID compliance.
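
To make two of these features concrete, the sketch below uses the Iceberg Java API to evolve a table's schema and to read the table as of an earlier snapshot. It is a minimal illustration, assuming a Table handle obtained from a catalog (as in the example in the next section) and the iceberg-data module on the classpath; the email column name and the snapshot id parameter are illustrative.

import java.io.IOException;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;

public class EvolutionAndTimeTravel {

    // Schema evolution: add an optional column without rewriting existing data files.
    static void addEmailColumn(Table table) {
        table.updateSchema()
             .addColumn("email", Types.StringType.get())
             .commit();
    }

    // Time travel: read the table as it existed at a given snapshot.
    static void readAsOfSnapshot(Table table, long snapshotId) throws IOException {
        try (CloseableIterable<Record> records =
                 IcebergGenerics.read(table).useSnapshot(snapshotId).build()) {
            records.forEach(System.out::println);
        }
    }
}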

2. Code Example

To demonstrate Apache Iceberg, we’ll use a simple Java example that creates a table and then writes, reads, updates, and deletes data using the Iceberg table format.

2.1 Setup and Dependencies

Add the following Maven dependencies for Iceberg and Spark. The standalone Java example in this article also relies on the iceberg-data and iceberg-parquet modules for Iceberg's generic records and the Parquet data writer:

<dependencies>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-core</artifactId>
        <version>your__jar__version</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-data</artifactId>
        <version>your__jar__version</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-parquet</artifactId>
        <version>your__jar__version</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-spark3-runtime</artifactId>
        <version>your__jar__version</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>your__jar__version</version>
    </dependency>
</dependencies>

Ensure that you have a functional Hadoop environment with HDFS configured. Additionally, make sure the Hadoop core-site.xml and hdfs-site.xml files are included in your project to enable proper connectivity.
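
If those XML files are not on the classpath, the Hadoop Configuration can also be pointed at them, or at the NameNode, programmatically. The snippet below is a minimal sketch with placeholder file locations and address; adjust them to match your cluster.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class HadoopConfigSetup {
    static Configuration buildConfiguration() {
        Configuration conf = new Configuration();
        // Placeholder locations; point these at your cluster's client configuration files.
        conf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
        conf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));
        // Alternatively, set the default filesystem directly.
        conf.set("fs.defaultFS", "hdfs://namenode:8020");
        return conf;
    }
}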

2.2 Creating and Writing an Iceberg Table

package com.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.*;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;

import java.io.IOException;
import java.util.UUID;

public class IcebergExample {
    public static void main(String[] args) throws IOException {
        // Step 1: Set up Hadoop configuration and Iceberg catalog
        Configuration conf = new Configuration();
        String warehousePath = "hdfs://namenode:8020/warehouse/iceberg";
        HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);

        // Step 2: Define table identifier
        TableIdentifier tableIdentifier = TableIdentifier.of("default", "sample_table");

        // Step 3: Create or load the table
        Schema schema = new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "name", Types.StringType.get()));

        Table table;
        if (!catalog.tableExists(tableIdentifier)) {
            table = catalog.createTable(tableIdentifier, schema, PartitionSpec.unpartitioned());
            System.out.println("Table created: " + table.name());
        } else {
            table = catalog.loadTable(tableIdentifier);
            System.out.println("Table loaded: " + table.name());
        }

        // Step 4: Perform CRUD operations
        performCRUDOperations(table);
    }

    private static void performCRUDOperations(Table table) throws IOException {
        // Step 4.1: Create - Insert records into the table
        insertRecords(table);

        // Step 4.2: Read - Fetch records from the table
        readRecords(table);

        // Step 4.3: Update - Modify records (update example)
        updateRecords(table);

        // Step 4.4: Delete - Remove records based on criteria
        deleteRecords(table);
    }

    private static void insertRecords(Table table) throws IOException {
        System.out.println("Inserting records...");
        GenericRecord record = GenericRecord.create(table.schema());
        record.setField("id", 1);
        record.setField("name", "John Doe");

        // Write the record to a new Parquet data file inside the table location
        OutputFile outputFile = table.io()
            .newOutputFile(table.location() + "/data/" + UUID.randomUUID() + ".parquet");
        DataWriter<GenericRecord> writer = Parquet.writeData(outputFile)
            .schema(table.schema())
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .withSpec(PartitionSpec.unpartitioned())
            .overwrite()
            .build();
        try {
            writer.write(record);
        } finally {
            writer.close();
        }

        // Commit the new data file to the table; this creates a new snapshot
        table.newAppend().appendFile(writer.toDataFile()).commit();
        System.out.println("Record inserted.");
    }

    private static void readRecords(Table table) throws IOException {
        System.out.println("Reading records...");
        try (CloseableIterable<Record> records = IcebergGenerics.read(table).build()) {
            for (Record record : records) {
                System.out.println(record);
            }
        }
    }

    private static void updateRecords(Table table) throws IOException {
        System.out.println("Updating records...");
        // Updating records in Iceberg generally involves rewriting the affected data files.
        // For simplicity, we read the current data and modify the "name" field for the
        // record with id=1 in memory; writing the modified records to new data files and
        // committing them as a rewrite is not shown here.
        try (CloseableIterable<Record> records = IcebergGenerics.read(table).build()) {
            for (Record record : records) {
                if (Integer.valueOf(1).equals(record.getField("id"))) {
                    record.setField("name", "Jane Doe");
                }
            }
        }
        System.out.println("Records updated.");
    }

    private static void deleteRecords(Table table) {
        System.out.println("Deleting records...");
        // Removes data files whose rows all match the predicate and commits a new snapshot.
        table.newDelete()
             .deleteFromRowFilter(Expressions.equal("id", 1))
             .commit();
        System.out.println("Record deleted.");
    }
}

2.2.1 Code Explanation

The given code demonstrates how to use Apache Iceberg with a Hadoop catalog to create a table programmatically and perform CRUD (Create, Read, Update, Delete) operations. It involves the following steps:

  • Hadoop Configuration: A Configuration object is created to set up the Hadoop environment. This configuration is essential for specifying paths and connecting to HDFS.
  • Defining Warehouse Path: The variable warehousePath is initialized with the path to the Iceberg warehouse on HDFS: hdfs://namenode:8020/warehouse/iceberg. This path acts as the storage location for Iceberg tables.
  • Initializing the Hadoop Catalog: The HadoopCatalog object is initialized using the Hadoop configuration and the warehouse path. The catalog manages tables, snapshots, and metadata for Iceberg, enabling programmatic table operations.
  • Table Creation or Loading: A TableIdentifier is created to specify the namespace (default) and table name (sample_table). The program checks if the table exists in the catalog:
    • If the table does not exist, it is created using catalog.createTable(). The schema is defined programmatically with two fields: id (type int, required) and name (type string, optional), and the table is unpartitioned.
    • If the table already exists, it is loaded using catalog.loadTable().
  • Performing CRUD Operations:
    • Create: A GenericRecord is populated according to the table schema, written to a new Parquet data file with Iceberg's generic data writer, and the resulting data file is committed to the table with an append, which produces a new snapshot.
    • Read: The table is scanned with IcebergGenerics.read(table), and the returned records are printed to the console.
    • Update: Updating records in Iceberg involves rewriting data files. The program reads the current records and modifies a specific one in memory (e.g., changing the name field for the record with a given id); writing the modified records back to new data files is omitted for brevity.
    • Delete: Records are removed using predicates. For example, table.newDelete() applies a filter like Expressions.equal("id", 1) and commits the change as a new snapshot.

2.2.2 Code Output

When executed, the code produces output similar to the following:

Table created: default.sample_table

-- Insertion
Inserting records...
Record inserted.

-- Reading records
Reading records...
{"id": 1, "name": "John Doe"}
{"id": 2, "name": "Jane Doe"}

-- Updating records
Updating records...
Records updated.

-- Deleting records
Deleting records...
Record deleted.
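
Because each commit in the example (the append and the delete) creates a new immutable snapshot, the table can also be rolled back to an earlier state. The sketch below is a minimal illustration, assuming the snapshot id recorded before the delete is still available; it is not part of the example above.

import org.apache.iceberg.Table;

public class SnapshotRollback {

    // Restore the table's state to a previously committed snapshot.
    // snapshotIdBeforeDelete is assumed to have been captured from table.currentSnapshot()
    // before the delete was committed.
    static void rollback(Table table, long snapshotIdBeforeDelete) {
        table.manageSnapshots()
             .rollbackTo(snapshotIdBeforeDelete)
             .commit();
    }
}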

3. Conclusion

Apache Iceberg provides a robust and scalable solution for managing large datasets with advanced capabilities like ACID transactions, time travel, and schema evolution. Its design overcomes many limitations of traditional big data table formats, making it a preferred choice for modern data lakes.

Using Iceberg with Java and other big data engines offers flexibility, performance, and reliability in handling data-intensive applications.

Yatin Batra

An experienced full-stack engineer well versed in Core Java, Spring/Spring Boot, MVC, Security, AOP, frontend (Angular & React), and cloud technologies (such as AWS, GCP, Jenkins, Docker, and K8s).