Core Java
Apache Arrow on the JVM: Streaming Reads
In the previous post we wrote Arrow data to a stream. Now we will read that data back from a stream.
Just like in the previous post, we will implement the Closeable interface. This is needed to close the RootAllocator and free up memory.
We will pass a ReadableByteChannel to the reader and read the stream's contents back into objects.
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 | package com.gkatzioura.arrow; import java.io.Closeable; import java.io.IOException; import java.nio.channels.ReadableByteChannel; import java.util.ArrayList; import java.util.List; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.ipc.ArrowStreamReader; public class DefaultEntriesReader implements Closeable { private final RootAllocator rootAllocator; public DefaultEntriesReader() { rootAllocator = new RootAllocator(Integer.MAX_VALUE); } public List<DefaultArrowEntry> readBytes(ReadableByteChannel readableByteChannel) throws IOException { List<DefaultArrowEntry> defaultArrowEntries = new ArrayList<>(); try (ArrowStreamReader arrowStreamReader = new ArrowStreamReader(readableByteChannel, rootAllocator)) { var root = arrowStreamReader.getVectorSchemaRoot(); var childVector1 = (VarCharVector)root.getVector( 0 ); var childVector2 = (IntVector)root.getVector( 1 ); while (arrowStreamReader.loadNextBatch()) { int batchSize = root.getRowCount(); for ( int i = 0 ; i < batchSize; i++) { var strData = new String(childVector1.get(i)); var intData = childVector2.get(i); DefaultArrowEntry defaultArrowEntry = DefaultArrowEntry.builder().col1(strData).col2(intData).build(); defaultArrowEntries.add(defaultArrowEntry); } } return defaultArrowEntries; } } @Override public void close() throws IOException { rootAllocator.close(); } } |
Let’s wrap it up by writing some entries and then reading them back.
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | package com.gkatzioura.arrow; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.channels.Channels; import java.util.stream.Collectors; import java.util.stream.IntStream; public class ArrowMain { public static void main(String[] args) throws IOException { var originalEntries = IntStream.rangeClosed( 0 , 11 ) .boxed() .map(i -> new DefaultArrowEntry( "data-" +i, i)).collect(Collectors.toList()); var outputStream = new ByteArrayOutputStream(); try (var arrowWriter = new DefaultEntriesWriter()) { arrowWriter.write(originalEntries, 10 , Channels.newChannel(outputStream)); } byte [] introBytes = outputStream.toByteArray(); var inputStream = new ByteArrayInputStream(introBytes); try (var arrowReader = new DefaultEntriesReader()) { var entries =arrowReader.readBytes(Channels.newChannel(inputStream)); for (DefaultArrowEntry entry : entries) { System.out.println( "Read " +entry.getCol1()+ " " +entry.getCol2()); } } } } |
That’s it. To summarise, we created Arrow schemas, wrote data to a stream, and read data back from a stream!
Published on Java Code Geeks with permission by Emmanouil Gkatziouras, partner at our JCG program. See the original article here: Apache Arrow on the JVM: Streaming Reads. Opinions expressed by Java Code Geeks contributors are their own.