Java 7: Closing NIO.2 file channels without losing data
Another issue when closing asynchronous channels is mentioned in the javadoc of AsynchronousFileChannel: “Shutting down the executor service while the channel is open results in unspecified behavior.” This is because the close() operation on AsynchronousFileChannel issues tasks to the associated executor service that simulate the failure of pending I/O operations (in that same thread pool) with an AsynchronousCloseException. Hence, you’ll get a RejectedExecutionException if you call close() on an asynchronous file channel whose associated executor service you have already shut down.
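The executor side of this can be reproduced without any file at all: a ThreadPoolExecutor with the default AbortPolicy rejects every task submitted after shutdown(). The sketch below assumes that this is what the channel's close() runs into; `ShutdownPoolDemo` is a hypothetical name, and the clean-up tasks that close() would submit are represented only by an empty Runnable.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class ShutdownPoolDemo {
    // Returns true if the shut-down pool rejected the task. In the channel
    // scenario, the rejected task would be close() simulating the failure
    // of a pending I/O operation.
    static boolean submitAfterShutdownIsRejected() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.shutdown(); // wrong order: pool first, channel later
        try {
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    // stands in for a clean-up task issued by close()
                }
            });
            return false;
        } catch (RejectedExecutionException e) {
            return true; // the default AbortPolicy throws after shutdown()
        }
    }
}
```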
With all that said, the proposed way to safely configure the file channel and shut it down goes like this:
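import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SimpleChannelClose_AsynchronousCloseException {

    private static final String FILE_NAME = "E:/temp/afile.out";
    private static AsynchronousFileChannel outputfile;
    private static AtomicInteger fileindex = new AtomicInteger(0);

    // custom single-threaded executor service used by the file channel
    private static ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());

    public static void main(String[] args) throws InterruptedException, IOException, ExecutionException {
        // open the file channel on top of the custom executor service
        outputfile = AsynchronousFileChannel.open(
                Paths.get(FILE_NAME),
                new HashSet<StandardOpenOption>(Arrays.asList(StandardOpenOption.WRITE,
                        StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE)),
                pool);

        // submit asynchronous write tasks, 5 bytes each
        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            futures.add(outputfile.write(ByteBuffer.wrap("Hello".getBytes()), fileindex.getAndIncrement() * 5));
        }

        // orderly shutdown: channel first, then the executor service
        outputfile.close();
        pool.shutdown();
        pool.awaitTermination(60, TimeUnit.SECONDS);

        // check which write tasks were actually executed
        for (Future<Integer> future : futures) {
            try {
                future.get();
            } catch (ExecutionException e) {
                System.out.println("Task wasn't executed!");
            }
        }
    }
}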
The custom thread pool executor service is defined in the pool field, and the file channel is opened with that pool at the start of main(). After the write loop, the asynchronous channel is closed in an orderly manner: first the channel itself is closed, then the executor service is shut down, and last but not least the thread awaits termination of the thread pool executor.
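If a program closes channels in more than one place, this order can be captured in a small helper. The following is a minimal sketch under the assumption that the executor service is used by this one channel only; `OrderlyClose`, `closeInOrder()` and the temp-file demo are hypothetical names, not part of any library.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.EnumSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class OrderlyClose {
    // Channel first, then the pool, then wait for the pool's threads.
    // Assumption: the pool serves this channel only.
    static boolean closeInOrder(AsynchronousFileChannel channel, ExecutorService pool,
                                long timeout, TimeUnit unit) throws IOException, InterruptedException {
        try {
            channel.close();  // may still hand clean-up tasks to the pool
        } finally {
            pool.shutdown();  // accept no new tasks from here on
        }
        return pool.awaitTermination(timeout, unit); // true if the pool terminated in time
    }

    // Demo: write 5 bytes to a temp file with a custom pool, then close in order.
    static boolean demo() throws Exception {
        Path file = Files.createTempFile("orderly", ".out");
        ExecutorService pool = Executors.newFixedThreadPool(1);
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(
                file, EnumSet.of(StandardOpenOption.WRITE), pool);
        channel.write(ByteBuffer.wrap("Hello".getBytes()), 0).get(); // wait for this single write
        boolean terminated = closeInOrder(channel, pool, 10, TimeUnit.SECONDS);
        long size = Files.size(file);
        Files.delete(file);
        return terminated && size == 5;
    }
}
```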
Although this is a safe way to close a channel with a custom executor service, it introduces a new issue. The clients submitted asynchronous write tasks (the write() calls in the loop) and may want to be sure that, once a task has been submitted successfully, it will definitely be executed. Always waiting for Future.get() to return (as in the final loop) isn’t an option, because in many cases this would reduce *asynchronous* file channels ad absurdum. The snippet above will print lots of “Task wasn’t executed!” messages, because the channel is closed immediately after the write operations were submitted to it. To avoid such ‘data loss’ you can implement your own CompletionHandler and pass it to the requested write operation.
public class SimpleChannelClose_CompletionHandler {
    // ...

    public static void main(String[] args) throws InterruptedException, IOException, ExecutionException {
        // ...
        outputfile.write(ByteBuffer.wrap("Hello".getBytes()), fileindex.getAndIncrement() * 5, "",
                defaultCompletionHandler);
        // ...
    }

    // invoked by the channel once a write operation has completed or failed
    private static CompletionHandler<Integer, String> defaultCompletionHandler =
            new CompletionHandler<Integer, String>() {
        @Override
        public void completed(Integer result, String attachment) {
            // NOP
        }

        @Override
        public void failed(Throwable exc, String attachment) {
            System.out.println("Do something to avoid data loss ...");
        }
    };
}
The CompletionHandler.failed() method is invoked when a write operation fails, for example because the channel was closed before the task ran. You can implement any compensation code there to avoid data loss. When you work with mission-critical data, it may be a good idea to use CompletionHandlers. But *still* there’s another issue. The clients can submit tasks, but they don’t know whether the pool will successfully process them. Successful in this context means that the submitted bytes actually reach their destination (the file on the hard disk). If you want to be sure that all submitted tasks are actually processed before closing, it gets a little trickier. You need a ‘graceful’ closing mechanism that waits until the work queue is empty *before* it actually closes the channel and the associated executor service (this isn’t possible with the standard lifecycle methods).
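To make the compensation idea concrete, here is a minimal sketch of a handler whose failed() keeps the data instead of dropping it. The attachment type `PendingWrite`, the `retryQueue` and the retry strategy itself are hypothetical; a real application might log, re-submit or write to a fallback file instead. The demo assumes that a write on an already closed channel is reported to failed() (as a ClosedChannelException) rather than thrown, which matches the OpenJDK implementations.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CompensatingWrites {
    // Hypothetical attachment: what was written where, so failed() can compensate.
    static final class PendingWrite {
        final ByteBuffer data;
        final long position;

        PendingWrite(ByteBuffer data, long position) {
            this.data = data;
            this.position = position;
        }
    }

    final Queue<PendingWrite> retryQueue = new ConcurrentLinkedQueue<>();
    final AtomicInteger completedWrites = new AtomicInteger();
    final Semaphore finished = new Semaphore(0); // one permit per finished write

    final CompletionHandler<Integer, PendingWrite> handler = new CompletionHandler<Integer, PendingWrite>() {
        @Override
        public void completed(Integer bytesWritten, PendingWrite write) {
            completedWrites.incrementAndGet();
            finished.release();
        }

        @Override
        public void failed(Throwable exc, PendingWrite write) {
            // Compensation (assumption): keep the data for a later retry
            // instead of losing it silently.
            retryQueue.add(write);
            finished.release();
        }
    };

    void write(AsynchronousFileChannel channel, String text, long position) {
        PendingWrite pending = new PendingWrite(ByteBuffer.wrap(text.getBytes()), position);
        channel.write(pending.data, pending.position, pending, handler);
    }

    // Demo: one write while the channel is open, two after it has been closed.
    static CompensatingWrites demo() throws Exception {
        Path file = Files.createTempFile("compensate", ".out");
        CompensatingWrites writes = new CompensatingWrites();
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(file, StandardOpenOption.WRITE);
        writes.write(channel, "Hello", 0);
        if (!writes.finished.tryAcquire(1, 10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("first write did not finish");
        }
        channel.close();
        writes.write(channel, "Hello", 5);  // reported to failed(), not thrown
        writes.write(channel, "Hello", 10);
        if (!writes.finished.tryAcquire(2, 10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("failed writes were not reported");
        }
        Files.delete(file);
        return writes;
    }
}
```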
Introducing GracefulAsynchronousFileChannel
My last snippets introduce the GracefulAsynchronousFileChannel. You can get the complete code here in my Git repository. The behaviour of that channel is as follows: it guarantees to process all successfully submitted write operations, and it throws a NonWritableChannelException once the channel is preparing to shut down. It takes two things to implement that behaviour. Firstly, you need to implement afterExecute() in an extension of ThreadPoolExecutor that sends a signal when the queue is empty. This is what DefensiveThreadPoolExecutor does.
// Inner class of GracefulAsynchronousFileChannel: state, PREPARE, SHUTDOWN, closeLock
// and isEmpty are not declared here, they belong to the enclosing channel class.
// Because state is read before closeLock is taken, it needs to be volatile.
private class DefensiveThreadPoolExecutor extends ThreadPoolExecutor {

    public DefensiveThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            LinkedBlockingQueue<Runnable> workQueue, ThreadFactory factory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, factory, handler);
    }

    /**
     * "Last" task issues a signal that queue is empty after task processing was completed.
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        if (state == PREPARE) {
            closeLock.lock(); // only one thread will pass when closer thread is awaiting signal
            try {
                if (getQueue().isEmpty() && state < SHUTDOWN) {
                    System.out.println("Issueing signal that queue is empty ...");
                    isEmpty.signal();
                    state = SHUTDOWN; // -> no other thread can issue empty-signal
                }
            } finally {
                closeLock.unlock();
            }
        }
        super.afterExecute(r, t);
    }
}
The afterExecute() method is executed after each processed task, by the thread that processed that task. The implementation sends the isEmpty signal once it finds the queue empty while the channel prepares shutdown. The second part you need to gracefully close a channel is a custom implementation of the close() method of AsynchronousFileChannel.
/**
 * Method that closes this file channel gracefully without losing any data.
 */
@Override
public void close() throws IOException {
    AsynchronousFileChannel writeableChannel = innerChannel;
    System.out.println("Starting graceful shutdown ...");
    closeLock.lock();
    try {
        state = PREPARE;
        // swap in a read-only channel: new write requests now fail with NonWritableChannelException
        innerChannel = AsynchronousFileChannel.open(Paths.get(uri),
                new HashSet<StandardOpenOption>(Arrays.asList(StandardOpenOption.READ)), pool);
        System.out.println("Channel blocked for write access ...");
        if (!pool.getQueue().isEmpty()) {
            System.out.println("Waiting for signal that queue is empty ...");
            while (state < SHUTDOWN) { // guard against spurious wake-ups
                isEmpty.await();
            }
            System.out.println("Received signal that queue is empty ... closing");
        } else {
            System.out.println("Don't have to wait, queue is empty ...");
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt status
        throw new RuntimeException("Interrupted on awaiting Empty-Signal!", e);
    } catch (Exception e) {
        throw new RuntimeException("Unexpected error", e);
    } finally {
        closeLock.unlock();
        writeableChannel.force(false);
        writeableChannel.close(); // close the writable channel
        innerChannel.close();     // close the read-only channel
        System.out.println("File closed ...");
        pool.shutdown(); // allow clean up tasks from previous close() operation to finish safely
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Could not terminate thread pool!", e);
        }
        System.out.println("Pool closed ...");
    }
}
Study that code for a while. The interesting bit is where the innerChannel gets replaced by a read-only channel. That causes any subsequent asynchronous write request to fail with a NonWritableChannelException. The close() method then waits for the isEmpty signal. When this signal is sent after the last write task, the close() method continues with an orderly shutdown procedure in its finally block. Basically, the code adds a shared lifecycle state across the file channel and the associated thread pool. That way both objects can communicate during the shutdown procedure and avoid data loss.
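The shared-lifecycle idea can also be tried out without a file channel. Below is a minimal, self-contained sketch: a closer flips a flag under a lock and waits on a Condition, and afterExecute() signals that Condition. One deliberate deviation from DefensiveThreadPoolExecutor: instead of checking getQueue().isEmpty(), this sketch counts unfinished tasks, so a task that is still running while the queue is already empty is waited for as well. `DrainingExecutor` and `drainAndShutdown()` are hypothetical names.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class DrainingExecutor extends ThreadPoolExecutor {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition drained = lock.newCondition();
    private boolean closing;  // shared lifecycle state, guarded by lock
    private int unfinished;   // accepted but not yet finished tasks, guarded by lock

    DrainingExecutor() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    public void execute(Runnable task) {
        lock.lock();
        try {
            if (closing) {
                throw new RejectedExecutionException("executor is draining");
            }
            unfinished++;
        } finally {
            lock.unlock();
        }
        try {
            super.execute(task);
        } catch (RuntimeException e) {
            taskFinished(); // the task will never run, so afterExecute() won't count it
            throw e;
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        taskFinished();
    }

    private void taskFinished() {
        lock.lock();
        try {
            if (--unfinished == 0) {
                drained.signalAll(); // wake up a closer waiting in drainAndShutdown()
            }
        } finally {
            lock.unlock();
        }
    }

    // Stops accepting tasks, waits until all accepted tasks have finished,
    // then shuts the pool down. Returns false if the timeout elapsed first.
    boolean drainAndShutdown(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            closing = true;
            while (unfinished > 0) { // the loop also covers spurious wake-ups
                if (nanos <= 0L) {
                    return false;
                }
                nanos = drained.awaitNanos(nanos);
            }
        } finally {
            lock.unlock();
        }
        shutdown();
        return awaitTermination(timeout, unit);
    }

    static int demo() throws InterruptedException {
        final AtomicInteger done = new AtomicInteger();
        DrainingExecutor pool = new DrainingExecutor();
        for (int i = 0; i < 100; i++) {
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    done.incrementAndGet();
                }
            });
        }
        if (!pool.drainAndShutdown(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("pool did not drain in time");
        }
        return done.get();
    }
}
```

Counting tasks costs one lock acquisition per submit and per completion; for a single-threaded pool, as in the article, the queue check and the counter behave the same.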
Here is a logging client that uses the GracefulAsynchronousFileChannel.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.NonWritableChannelException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

// GracefulAsynchronousFileChannel comes from the complete code in the Git repository
public class MyLoggingClient {
    private static AtomicInteger fileindex = new AtomicInteger(0);
    private static final String FILE_URI = "file:/E:/temp/afile.out";

    public static void main(String[] args) throws IOException {
        new Thread(new Runnable() { // arbitrary thread that writes stuff into an asynchronous I/O data sink
            @Override
            public void run() {
                try {
                    for (;;) {
                        GracefulAsynchronousFileChannel.get(FILE_URI).write(ByteBuffer.wrap("Hello".getBytes()),
                                fileindex.getAndIncrement() * 5);
                    }
                } catch (NonWritableChannelException e) {
                    System.out.println("Deal with the fact that the channel was closed asynchronously ... "
                            + e.toString());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        final Timer timer = new Timer(); // asynchronous channel closer
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    GracefulAsynchronousFileChannel.get(FILE_URI).close();
                    long size = Files.size(Paths.get("E:/temp/afile.out"));
                    // the last index was taken by the write that was refused
                    System.out.println("Expected file size (bytes): " + (fileindex.get() - 1) * 5);
                    System.out.println("Actual file size (bytes): " + size);
                    if (size == (fileindex.get() - 1) * 5)
                        System.out.println("No write operation was lost!");
                    Files.delete(Paths.get("E:/temp/afile.out"));
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    timer.cancel(); // stop the non-daemon timer thread so the JVM can exit
                }
            }
        }, 1000);
    }
}
The client starts two threads: one issues write operations in an infinite loop, the other (a Timer task) closes the file channel asynchronously after one second of processing. If you run that client, the following output is produced:
Starting graceful shutdown ...
Deal with the fact that the channel was closed asynchronously ... java.nio.channels.NonWritableChannelException
Channel blocked for write access ...
Waiting for signal that queue is empty ...
Issueing signal that queue is empty ...
Received signal that queue is empty ... closing
File closed ...
Pool closed ...
Expected file size (bytes): 400020
Actual file size (bytes): 400020
No write operation was lost!
The output shows the orderly shutdown procedure of the participating threads. The logging thread needs to deal with the fact that the channel was closed asynchronously. After the queued tasks are processed, the channel resources are closed. No data was lost: everything the client issued was really written to the file. There are no AsynchronousCloseExceptions or RejectedExecutionExceptions in such a graceful closing procedure.
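The one exception the writer thread does see surfaces synchronously from write() itself: an AsynchronousFileChannel opened without WRITE refuses the call with NonWritableChannelException before any task reaches the executor. That is why swapping innerChannel for a read-only channel works as a gate. A minimal check, with a hypothetical class name and a temporary file standing in for E:/temp/afile.out:

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.NonWritableChannelException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ReadOnlySwapDemo {
    // Returns true if a write on a read-only channel is refused up front,
    // i.e. no write task is ever handed to a thread pool.
    static boolean writeOnReadOnlyChannelIsRefused() throws Exception {
        Path file = Files.createTempFile("readonly", ".out");
        try (AsynchronousFileChannel readOnly =
                     AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
            readOnly.write(ByteBuffer.wrap("Hello".getBytes()), 0);
            return false;
        } catch (NonWritableChannelException e) {
            return true; // thrown by write() itself, not delivered via a Future
        } finally {
            Files.delete(file); // the channel is already closed at this point
        }
    }
}
```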
That’s all in terms of safely closing asynchronous file channels. The complete code is here in my Git repository. I hope you’ve enjoyed it a little. Looking forward to your comments.