Processing huge files with Java

I recently had to process a set of files containing historical tick-by-tick FX market data and quickly realized that none of them could be read into memory using a traditional InputStream, because every file was over 4 gigabytes in size. Emacs couldn’t even open them.

In this particular case I could have written a simple bash script that divides the files into smaller pieces and reads them as usual. But I didn’t want to go that route, since a binary format would have invalidated the approach.

So the proper way to handle this problem is to process regions of the data incrementally using memory-mapped files. What’s nice about memory-mapped files is that the mapped regions live outside the Java heap and are backed by the file on disk, so they consume neither heap space nor swap; the operating system simply pages the data in and out as needed.
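Before getting to the design, here is a minimal sketch of the underlying mechanism (my own illustration, with a made-up file name): map a single 4096-byte window of the file and scan it, letting the OS page the bytes in on demand.

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class MapRegionExample {
  public static void main(String[] args) throws IOException {
    // hypothetical file path, just for illustration
    try (RandomAccessFile raf = new RandomAccessFile("EURUSD-2012-01.csv", "r");
         FileChannel channel = raf.getChannel()) {
      // map only a 4096-byte window of the file, never the whole thing
      long size = Math.min(4096, channel.size());
      MappedByteBuffer region = channel.map(FileChannel.MapMode.READ_ONLY, 0, size);
      int lines = 0;
      while (region.hasRemaining()) {
        if (region.get() == '\n') { // bytes are paged in lazily by the OS
          lines++;
        }
      }
      System.out.println("lines in first region: " + lines);
    }
  }
}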

Okay, let’s have a look at these files and extract some data. They seem to contain ASCII text rows with comma-delimited fields.

Format: [currency-pair],[timestamp],[bid-price],[ask-price]

Example: EUR/USD,20120102 00:01:30.420,1.29451,1.2949
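
Just to pin the format down, a throwaway sketch that picks apart the example row (the timestamp pattern is my guess, read off the example above):

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class RowFormatExample {
  public static void main(String[] args) throws ParseException {
    String row = "EUR/USD,20120102 00:01:30.420,1.29451,1.2949";
    String[] fields = row.split(",");
    String pair = fields[0];
    Date timestamp = new SimpleDateFormat("yyyyMMdd HH:mm:ss.SSS").parse(fields[1]);
    double bid = Double.parseDouble(fields[2]);
    double ask = Double.parseDouble(fields[3]);
    System.out.println(pair + " " + timestamp + " bid=" + bid + " ask=" + ask);
  }
}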

Fair enough, a program for that format would be easy to write. But reading and parsing files are orthogonal concerns, so let’s take a step back and think about a generic design that can be reused if I’m confronted with a similar problem in the future.

The problem boils down to incrementally decoding a set of entries encoded in an arbitrarily long byte array without exhausting memory. The fact that the example format is comma- and line-delimited text is irrelevant to the general solution, so it is clear that a decoder interface is needed in order to handle different formats.

Also, the entries cannot all be parsed and kept in memory until the whole file is processed, so we need a way to incrementally hand off chunks of entries that can be written elsewhere, to disk or to the network, before they are garbage collected. An iterator is a good abstraction for this requirement because it acts like a cursor, which is exactly the point: every iteration advances the file pointer and lets us do something with the data.

So, first the Decoder interface. The idea is to incrementally decode objects from a MappedByteBuffer, or return null if no objects remain in the buffer.

public interface Decoder<T> {
    public T decode(ByteBuffer buffer);
}
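
The contract is deliberately small: decode one complete object per call, return null when the remaining bytes do not form a complete entry, and in that case leave the buffer position at the start of the incomplete entry, since the reader (shown below) uses the final position to decide where the next chunk starts. To illustrate that the interface is not tied to text, here is a hedged sketch of a decoder for a made-up fixed-length binary format (imports omitted, as in the listings below):

class Tick {
  final long time;
  final double bid;
  final double ask;

  Tick(long time, double bid, double ask) {
    this.time = time;
    this.bid = bid;
    this.ask = ask;
  }
}

public class FixedLengthTickDecoder implements Decoder<Tick> {
  // made-up record layout: 8-byte timestamp, 8-byte bid, 8-byte ask
  private static final int RECORD_SIZE = 8 + 8 + 8;

  @Override
  public Tick decode(ByteBuffer buffer) {
    if (buffer.remaining() < RECORD_SIZE) {
      return null; // incomplete record; position untouched so the reader can resume here
    }
    return new Tick(buffer.getLong(), buffer.getDouble(), buffer.getDouble());
  }
}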

Then comes the FileReader, which implements Iterable. Each iteration processes the next 4096 bytes of data and decodes them into a list of objects using the Decoder. Notice that FileReader accepts a list of files, which is nice since it enables traversal through the data without worrying about aggregation across files. By the way, 4096-byte chunks are probably a bit small for bigger files.

public class FileReader<T> implements Iterable<List<T>> {
  private static final long CHUNK_SIZE = 4096;
  private final Decoder<T> decoder;
  private Iterator<File> files;
 
  private FileReader(Decoder<T> decoder, File... files) {
    this(decoder, Arrays.asList(files));
  }
  private FileReader(Decoder<T> decoder, List<File> files) {
    this.files = files.iterator();
    this.decoder = decoder;
  }
  public static <T> FileReader<T> create(Decoder<T> decoder, List<File> files) {
    return new FileReader<T>(decoder, files);
  }

  public static <T> FileReader<T> create(Decoder<T> decoder, File... files) {
    return new FileReader<T>(decoder, files);
  }
  @Override
  public Iterator<List<T>> iterator() {
    return new Iterator<List<T>>() {
      private List<T> entries;
      private long chunkPos = 0;
      private MappedByteBuffer buffer;
      private FileChannel channel;
      @Override
      public boolean hasNext() {
        if (buffer == null || !buffer.hasRemaining()) {
          buffer = nextBuffer(chunkPos);
          if (buffer == null) {
            return false;
          }
        }
        T result = null;
        while ((result = decoder.decode(buffer)) != null) {
          if (entries == null) {
            entries = new ArrayList<T>();
          }
          entries.add(result);
        }
        // set next MappedByteBuffer chunk
        chunkPos += buffer.position();
        buffer = null;
        if (entries != null) {
          return true;
        } else {
          Closeables.closeQuietly(channel);
          return false;
        }
      }
 
      private MappedByteBuffer nextBuffer(long position) {
        try {
          if (channel == null || channel.size() == position) {
            if (channel != null) {
              Closeables.closeQuietly(channel);
              channel = null;
            }
            if (files.hasNext()) {
              File file = files.next();
              channel = new RandomAccessFile(file, "r").getChannel();
              chunkPos = 0;
              position = 0;
            } else {
              return null;
            }
          }
          long chunkSize = CHUNK_SIZE;
          if (channel.size() - position < chunkSize) {
            chunkSize = channel.size() - position;
          }
          return channel.map(FileChannel.MapMode.READ_ONLY, position, chunkSize);
        } catch (IOException e) {
          Closeables.closeQuietly(channel);
          throw new RuntimeException(e);
        }
      }
 
      @Override
      public List<T> next() {
        List<T> res = entries;
        entries = null;
        return res;
      }
 
      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }
}

The next task is to write a Decoder, and I decided to implement a generic TextRowDecoder for any comma-delimited text format, accepting the number of fields per row and a field delimiter and returning an array of byte arrays. TextRowDecoder can then be reused by format-specific decoders that may, for instance, handle different character sets.

public class TextRowDecoder implements Decoder<byte[][]> {
  private static final byte LF = 10;
  private final int numFields;
  private final byte delimiter;
  public TextRowDecoder(int numFields, byte delimiter) {
    this.numFields = numFields;
    this.delimiter = delimiter;
  }
  @Override
  public byte[][] decode(ByteBuffer buffer) {
    int lineStartPos = buffer.position();
    int limit = buffer.limit();
    while (buffer.hasRemaining()) {
      byte b = buffer.get();
      if (b == LF) { // reached line feed so parse line
        int lineEndPos = buffer.position();
        // set positions for one row duplication
        if (buffer.limit() < lineEndPos + 1) {
          buffer.position(lineStartPos).limit(lineEndPos);
        } else {
          buffer.position(lineStartPos).limit(lineEndPos + 1);
        }
        byte[][] entry = parseRow(buffer.duplicate());
        if (entry != null) {
          // reset main buffer
          buffer.position(lineEndPos);
          buffer.limit(limit);
          // set start after LF
          lineStartPos = lineEndPos;
        }
        return entry;
      }
    }
    buffer.position(lineStartPos);
    return null;
  }
 
  public byte[][] parseRow(ByteBuffer buffer) {
    int fieldStartPos = buffer.position();
    int fieldEndPos = 0;
    int fieldNumber = 0;
    byte[][] fields = new byte[numFields][];
    while (buffer.hasRemaining()) {
      byte b = buffer.get();
      if (b == delimiter || b == LF) {
        fieldEndPos = buffer.position();
        // save limit
        int limit = buffer.limit();
        // set positions for one field duplication
        buffer.position(fieldStartPos).limit(fieldEndPos);
        fields[fieldNumber] = parseField(buffer.duplicate(), fieldNumber, fieldEndPos - fieldStartPos - 1);
        fieldNumber++;
        // reset main buffer
        buffer.position(fieldEndPos);
        buffer.limit(limit);
        // set start after the delimiter (or LF)
        fieldStartPos = fieldEndPos;
      }
      if (fieldNumber == numFields) {
        return fields;
      }
    }
    return null;
  }
 
  private byte[] parseField(ByteBuffer buffer, int pos, int length) {
    byte[] field = new byte[length];
    for (int i = 0; i < field.length; i++) {
      field[i] = buffer.get();
    }
    return field;
  }
}
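
Before wiring the decoder into FileReader, a quick standalone sanity check (my own example, not from the original post) shows the contract in action: the first call returns a complete row, the second returns null because only a partial row remains, leaving the position at its start for the next chunk.

byte[] data = "EUR/USD,20120102 00:01:30.420,1.29451,1.2949\nEUR/USD,20120102 00:0"
    .getBytes(StandardCharsets.US_ASCII);
ByteBuffer buffer = ByteBuffer.wrap(data);
TextRowDecoder decoder = new TextRowDecoder(4, (byte) ',');

byte[][] row = decoder.decode(buffer);
System.out.println(new String(row[0], StandardCharsets.US_ASCII)); // EUR/USD
System.out.println(decoder.decode(buffer));                        // null, partial row left in buffer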

And this is how files are processed. Each list contains elements decoded from a single buffer, and each element is an array of byte arrays, as specified by TextRowDecoder.

TextRowDecoder decoder = new TextRowDecoder(4, (byte) ',');
FileReader<byte[][]> reader = FileReader.create(decoder, file.listFiles());
for (List<byte[][]> chunk : reader) {
  // do something with each chunk
}

We could stop here, but there was one more requirement: every row contains a timestamp, and each batch must be grouped according to periods of time instead of buffers, say day by day or hour by hour. I still want to iterate through each batch, so the immediate reaction was to create an Iterable wrapper for FileReader that implements this behaviour. One additional detail is that each element must provide its timestamp to PeriodEntries by implementing the Timestamped interface (not shown in the original post).
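
A minimal sketch of what Timestamped might look like, inferred from how PeriodEntries uses it below (my assumption: the entry exposes its timestamp as epoch milliseconds):

public interface Timestamped {
  // epoch milliseconds of the entry, used by PeriodEntries to group by period
  long getTime();
}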

               
public class PeriodEntries<T extends Timestamped> implements Iterable<List<T>> {
  private final Iterator<List<T>> entriesIt;
  private final long interval;
  private PeriodEntries(Iterable<List<T>> entriesIt, long interval) {
    this.entriesIt = entriesIt.iterator();
    this.interval = interval;
  }

  public static <T extends Timestamped> PeriodEntries<T> create(Iterable<List<T>> entriesIt, long interval) {
   return new PeriodEntries<T>(entriesIt, interval);
  }
 
  @Override
  public Iterator<List<T>> iterator() {
    return new Iterator<List<T>>() {
      private Queue<List<T>> queue = new LinkedList<List<T>>();
      private long previous;
      private Iterator<T> entryIt;
 
      @Override
      public boolean hasNext() {
        if (!advanceEntries()) {
          return false;
        }
        T entry =  entryIt.next();
        long time = normalizeInterval(entry);
        if (previous == 0) {
          previous = time;
        }
        if (queue.peek() == null) {
          List<T> group = new ArrayList<T>();
          queue.add(group);
        }
        while (previous == time) {
          queue.peek().add(entry);
          if (!advanceEntries()) {
            break;
          }
          entry = entryIt.next();
          time = normalizeInterval(entry);
        }
        previous = time;
        List<T> result = queue.peek();
        if (result == null || result.isEmpty()) {
          return false;
        }
        return true;
      }
 
      private boolean advanceEntries() {
        // if there are no rows left
        if (entryIt == null || !entryIt.hasNext()) {
          // try get more rows if possible
          if (entriesIt.hasNext()) {
            entryIt = entriesIt.next().iterator();
            return true;
          } else {
            // no more rows
            return false;
          }
        }
        return true;
      }
 
      private long normalizeInterval(Timestamped entry) {
        long time = entry.getTime();
        int utcOffset = TimeZone.getDefault().getOffset(time);
        long utcTime = time + utcOffset;
        long elapsed = utcTime % interval;
        return time - elapsed;
      }
      @Override
      public List<T> next() {
        return queue.poll();
      }
      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
   };
  }
}

The final processing code did not change much with this functionality in place: just one clean, tight for-loop that does not have to care about grouping elements across files, buffers and periods. PeriodEntries is also flexible enough to manage intervals of any length.

TrueFxDecoder decoder = new TrueFxDecoder();
FileReader<TrueFxData> reader = FileReader.create(decoder, file.listFiles());
long periodLength = TimeUnit.DAYS.toMillis(1);
PeriodEntries<TrueFxData> periods = PeriodEntries.create(reader, periodLength);
 
for (List<TrueFxData> entries : periods) {
   // data for each day
   for (TrueFxData entry : entries) {
     // process each entry
   }
}
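
TrueFxDecoder and TrueFxData are not shown in the original post either; under the assumption that they simply build on TextRowDecoder and the four-field format above, a minimal sketch (imports omitted, as in the listings above) might look like this:

public class TrueFxData implements Timestamped {
  private final String pair;
  private final long time;
  private final double bid;
  private final double ask;

  public TrueFxData(String pair, long time, double bid, double ask) {
    this.pair = pair;
    this.time = time;
    this.bid = bid;
    this.ask = ask;
  }

  @Override
  public long getTime() {
    return time;
  }
  // getters for pair, bid and ask omitted
}

The decoder just converts each byte[][] row into a typed object:

public class TrueFxDecoder implements Decoder<TrueFxData> {
  private final TextRowDecoder rows = new TextRowDecoder(4, (byte) ',');
  // note: time zone handling is glossed over in this sketch
  private final SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd HH:mm:ss.SSS");

  @Override
  public TrueFxData decode(ByteBuffer buffer) {
    byte[][] row = rows.decode(buffer);
    if (row == null) {
      return null; // no complete row left in this chunk
    }
    try {
      return new TrueFxData(
          new String(row[0], StandardCharsets.US_ASCII),
          format.parse(new String(row[1], StandardCharsets.US_ASCII)).getTime(),
          Double.parseDouble(new String(row[2], StandardCharsets.US_ASCII)),
          Double.parseDouble(new String(row[3], StandardCharsets.US_ASCII)));
    } catch (ParseException e) {
      throw new RuntimeException(e);
    }
  }
}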

As you may realize, it would not have been possible to solve this problem with plain collections; choosing iterators was a crucial design decision that makes it possible to parse terabytes of data without consuming much heap space.
 

Reference: Processing huge files with Java from our JCG partner Kristoffer Sjogren at the deephacks blog.

5 Comments

selvaraj
11 years ago

Yeah, sounds good, even though I’m a newbie. But couldn’t we read each line from the file with the readLine() method that Java’s RandomAccessFile class gives us? Can we use that method?

Manoj
11 years ago

Hi,

I have got the task of reading a huge file. Can you please suggest the best possible way to read it?

Arnaud
9 years ago

I don’t really understand why you need to use a memory mapped file when you will read the file in buffers of 4096 bytes. You can do that with an InputStream.read(byte[] buffer). Also, from my experience it is much faster to use a FileReader to handle the char decoding than going through ByteBuffer – Charset – CharBuffer, even with CharBuffer caching. Most CSV parsers would parse and map to objects line by line and deal with the buffer boundary. Also, the series seems to be ordered, so why aggregate it by timestamp only to iterate over it anyway in the end…

Ramesh Reddy
9 years ago

The fastest way, if your data is ASCII and you don’t need charset conversion, is to use a BufferedInputStream and do all the parsing yourself: find the line terminators, parse the numbers. Do NOT use a Reader, or create Strings, or create any objects per line, or use parseInt. Just use byte arrays and look at the bytes, because String is inefficient in Java.

Raghuram
8 years ago

Hi,
I understood the approach you are describing, but how can I do the same for an Excel file of, say, 35 MB which has various sheets, some of which even contain charts? I am not able to read the file using the normal InputStream approach. Is there any way I can solve this problem?

Thank you
