Java EE 7 Batch Processing and World of Warcraft – Part 1
This was one of my sessions at the last JavaOne. This post expands on the subject and looks at a real application built with the Batch JSR-352 API. The application integrates with the MMORPG World of Warcraft.
Since JSR-352 is a new specification in the Java EE world, I think many people don’t know how to use it properly. It can also be a challenge to identify the use cases this specification applies to. Hopefully this example helps you understand them better.
Abstract
World of Warcraft is a game played by more than 8 million players worldwide. The service is offered by region: United States (US), Europe (EU), China and Korea. Each region has a set of servers, called Realms, that you connect to in order to play the game. For this example, we are only looking at the US and EU regions.
One of the most interesting features of the game is that it allows you to buy and sell in-game goods, called Items, using an Auction House. Each Realm has two Auction Houses. On average, each Realm trades around 70,000 Items. Let’s crunch some numbers:
- 512 Realms (US and EU)
- 70 K Items per Realm
- More than 35 M Items overall
The Data
Another cool thing about World of Warcraft is that the developers provide a REST API to access most of the in-game information, including the Auction House data. Check the complete API here.
The Auction House data is obtained in two steps. First, we query the corresponding Auction House Realm REST endpoint to get a reference to a JSON file. Next, we access this URL and download the file with all the Auction House Item information. Here is an example of the first endpoint:
http://eu.battle.net/api/wow/auction/data/aggra-portugues
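To make the two-step flow concrete, here is a minimal sketch in plain Java. It assumes the first response is a small JSON document with a "url" field pointing at the auction file (as the sample's Files/AuctionFile classes suggest), and it extracts that field with a naive regular expression instead of a JSON parser. The class AuctionDataFetcher and its injected httpGet function are hypothetical; passing the HTTP call in as a function keeps the sketch runnable without network access.

```java
import java.util.Locale;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Two-step Auction House download with the HTTP GET injected as a function (hypothetical). */
class AuctionDataFetcher {
    // Naive extraction of the first "url" field; assumes the index JSON contains one.
    private static final Pattern URL_FIELD = Pattern.compile("\"url\"\\s*:\\s*\"([^\"]+)\"");

    private final Function<String, String> httpGet; // hypothetical: URL in, response body out

    AuctionDataFetcher(Function<String, String> httpGet) {
        this.httpGet = httpGet;
    }

    /** Step 1 endpoint, following the pattern of the example URL for a region and Realm slug. */
    static String indexUrl(String region, String realmSlug) {
        return "http://" + region.toLowerCase(Locale.ROOT) + ".battle.net/api/wow/auction/data/" + realmSlug;
    }

    /** Returns the auction file contents, or empty if the index holds no file reference. */
    Optional<String> fetchAuctionFile(String region, String realmSlug) {
        String index = httpGet.apply(indexUrl(region, realmSlug)); // step 1: small JSON with a file reference
        Matcher matcher = URL_FIELD.matcher(index);
        if (!matcher.find()) {
            return Optional.empty();
        }
        return Optional.of(httpGet.apply(matcher.group(1)));       // step 2: download the full auction file
    }
}
```

In the sample application the two steps are split between jobs: the Files Job performs the first request and the Process Job downloads the file.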
The Application
Our objective here is to build an application that downloads the Auction House data, processes it and extracts metrics. These metrics are going to build a history of Item price evolution over time. Who knows? Maybe with this information we can predict price fluctuations and buy or sell Items at the best times.
The Setup
For the setup, we’re going to use a few things in addition to Java EE 7:
Jobs
The main work is going to be performed by Batch JSR-352 Jobs. A Job is an entity that encapsulates an entire batch process. A Job is wired together using the Job Specification Language (JSL). With JSR-352, a Job is simply a container for steps: it combines multiple steps that logically belong together in a flow.
We’re going to split the business logic into three jobs:
- Prepare – Creates all the supporting data needed: lists Realms and creates folders to copy files into.
- Files – Queries Realms to check for new files to process.
- Process – Downloads the file, processes the data and extracts metrics.
The Code
Back-end – Java EE 7 with Java 8
Most of the code is going to be in the back-end. We need Batch JSR-352, but we are also going to use several other Java EE technologies: JPA, JAX-RS, CDI and JSON-P.
Since the Prepare Job only initializes application resources for the processing, I’m going to skip it and dive into the most interesting parts.
Files Job
The Files Job is an implementation of AbstractBatchlet. A Batchlet is the simplest processing style available in the Batch specification. It’s a task-oriented step where the task is invoked once, executes, and returns an exit status. This type is most useful for performing a variety of tasks that are not item-oriented, such as executing a command or doing a file transfer. In this case, our Batchlet iterates over every Realm, makes a REST request to each one and retrieves a URL to the file containing the data that we want to process. Here is the code:
LoadAuctionFilesBatchlet
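import static java.util.logging.Logger.getLogger;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import javax.batch.api.AbstractBatchlet;
import javax.batch.api.BatchProperty;
import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;

// WoWBusiness, Realm, Files, AuctionFile and FileStatus are classes from the sample project.
@Named
public class LoadAuctionFilesBatchlet extends AbstractBatchlet {
    @Inject
    private WoWBusiness woWBusiness;

    // Late-bound from the partition plan in files-job.xml
    @Inject
    @BatchProperty(name = "region")
    private String region;

    @Inject
    @BatchProperty(name = "target")
    private String target;

    @Override
    public String process() throws Exception {
        List<Realm> realmsByRegion = woWBusiness.findRealmsByRegion(Realm.Region.valueOf(region));
        // Query all Realms of this region concurrently
        realmsByRegion.parallelStream().forEach(this::getRealmAuctionFileInformation);
        return "COMPLETED"; // exit status of the step
    }

    void getRealmAuctionFileInformation(Realm realm) {
        Client client = ClientBuilder.newClient();
        try {
            // Asynchronous GET, waiting at most 2 seconds for the response
            Files files = client.target(target + realm.getSlug())
                                .request(MediaType.TEXT_PLAIN).async()
                                .get(Files.class)
                                .get(2, TimeUnit.SECONDS);
            files.getFiles().forEach(auctionFile -> createAuctionFile(realm, auctionFile));
        } catch (Exception e) {
            // A failed Realm is only logged and skipped
            getLogger(this.getClass().getName()).log(Level.INFO, "Could not get files for " + realm.getRealmDetail());
        } finally {
            client.close(); // release the client's resources
        }
    }

    void createAuctionFile(Realm realm, AuctionFile auctionFile) {
        auctionFile.setRealm(realm);
        auctionFile.setFileName("auctions." + auctionFile.getLastModified() + ".json");
        auctionFile.setFileStatus(FileStatus.LOADED);

        // Only register files we have not seen before
        if (!woWBusiness.checkIfAuctionFileExists(auctionFile)) {
            woWBusiness.createAuctionFile(auctionFile);
        }
    }
}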
A cool thing about this is the use of Java 8. With parallelStream(), invoking multiple REST requests at once is easy as pie! You can really notice the difference. If you want to try it out, just run the sample, replace parallelStream() with stream() and compare. On my machine, using parallelStream() makes the task execute around 5 or 6 times faster.
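The effect of parallelStream() on slow, I/O-bound calls can be reproduced without any REST endpoint. The sketch below is a hypothetical stand-in: slowLookup() just sleeps for 50 ms to mimic network latency, and the two methods map the same list of Realm names with stream() and parallelStream().

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Maps the same Realm names sequentially and in parallel over a simulated slow call. */
class ParallelStreamDemo {
    // Hypothetical stand-in for a slow REST request: sleeps to mimic network latency.
    static String slowLookup(String realm) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return "files-for-" + realm;
    }

    static List<String> realmNames(int count) {
        return IntStream.range(0, count).mapToObj(i -> "realm-" + i).collect(Collectors.toList());
    }

    static List<String> sequential(List<String> realms) {
        return realms.stream().map(ParallelStreamDemo::slowLookup).collect(Collectors.toList());
    }

    static List<String> parallel(List<String> realms) {
        // Lookups run concurrently on the common ForkJoinPool; collect() keeps the original order.
        return realms.parallelStream().map(ParallelStreamDemo::slowLookup).collect(Collectors.toList());
    }

    static long timeMillis(Runnable task) {
        long start = System.nanoTime();
        task.run();
        return (System.nanoTime() - start) / 1_000_000;
    }
}
```

Timing both with timeMillis(() -> sequential(names)) and timeMillis(() -> parallel(names)) should show the parallel version finishing several times faster on a multi-core machine. Keep in mind that parallelStream() runs on the shared common ForkJoinPool, which is sized to the number of CPU cores, so the speed-up for blocking calls is bounded by the core count rather than by the number of Realms.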
Update
Usually, I would not use this approach. I did it here because part of the logic involves invoking slow REST requests, and parallel streams really shine there. Doing this with batch partitions is possible, but hard to implement. We also need to poll the servers for new data every time, so it’s not terrible if we skip a file or two. Keep in mind that if you don’t want to miss a single record, a Chunk processing style is more suitable. Thank you to Simon Martinelli for bringing this to my attention.
Since the US and EU Realms require different REST endpoints, they are a perfect fit for partitioning. Partitioning means that the step runs on multiple threads, one thread per partition. In this case we have two partitions.
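A rough way to picture partitioning outside a batch runtime: each partition gets its own set of properties and runs on its own thread, and the step finishes only when all partitions have. The sketch below models that with a plain ExecutorService; PartitionDemo and its methods are hypothetical names, and a real JSR-352 runtime manages the threads, contexts and transactions itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Toy model of a partitioned step: one thread per partition, each with its own properties. */
class PartitionDemo {
    static Properties partition(String region, String target) {
        Properties props = new Properties();
        props.setProperty("region", region);
        props.setProperty("target", target);
        return props;
    }

    // Hypothetical stand-in for the batchlet's process(): reports which thread handled which region.
    static String process(Properties props) {
        return props.getProperty("region") + "@" + Thread.currentThread().getName();
    }

    static List<String> runPartitions(List<Properties> plan) {
        ExecutorService pool = Executors.newFixedThreadPool(plan.size()); // one thread per partition
        try {
            List<Future<String>> running = new ArrayList<>();
            for (Properties props : plan) {
                running.add(pool.submit(() -> process(props)));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : running) {
                results.add(future.get()); // the step is done only when every partition is done
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for partitions", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a partition failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```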
To complete the job definition, we need to provide a Job XML file. It needs to be placed in the META-INF/batch-jobs directory. Here is the files-job.xml for this job:
files-job.xml
<job id="loadRealmAuctionFileJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <step id="loadRealmAuctionFileStep">
        <batchlet ref="loadAuctionFilesBatchlet">
            <properties>
                <property name="region" value="#{partitionPlan['region']}"/>
                <property name="target" value="#{partitionPlan['target']}"/>
            </properties>
        </batchlet>
        <partition>
            <plan partitions="2">
                <properties partition="0">
                    <property name="region" value="US"/>
                    <property name="target" value="http://us.battle.net/api/wow/auction/data/"/>
                </properties>
                <properties partition="1">
                    <property name="region" value="EU"/>
                    <property name="target" value="http://eu.battle.net/api/wow/auction/data/"/>
                </properties>
            </plan>
        </partition>
    </step>
</job>
In files-job.xml we define our Batchlet in the batchlet element. For the partitions, we define the partition element and assign a different properties element to each partition in the plan. These properties can then be late bound into the LoadAuctionFilesBatchlet with the expressions #{partitionPlan['region']} and #{partitionPlan['target']}. This is a very simple expression binding mechanism and only works for simple properties and Strings.
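The late binding of #{partitionPlan['region']} can be illustrated with a toy resolver that replaces each expression with the matching value from the partition's properties. This is only a model of the idea, not how any batch runtime implements it; PartitionPlanResolver is a hypothetical name, and treating an unknown property as an empty string is an assumption of this sketch.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Toy resolver for #{partitionPlan['name']} expressions over plain String properties. */
class PartitionPlanResolver {
    private static final Pattern EXPR = Pattern.compile("#\\{partitionPlan\\['([^']+)'\\]\\}");

    static String resolve(String value, Properties partitionProps) {
        Matcher matcher = EXPR.matcher(value);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            // Unknown names resolve to an empty string in this sketch.
            String replacement = partitionProps.getProperty(matcher.group(1), "");
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }
}
```

Applied to partition 1 of the plan, "#{partitionPlan['region']}" becomes "EU", which is the String the batchlet then receives through @BatchProperty.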
Process Job
Now we want to process the Realm Auction Data file. Using the information from the previous job, we can now download the file and do something with the data. The JSON file has the following structure:
item-auctions-sample.json
{
    "realm": {
        "name": "Grim Batol",
        "slug": "grim-batol"
    },
    "alliance": {
        "auctions": [
            {
                "auc": 279573567,          // Auction Id
                "item": 22792,             // Item for sale Id
                "owner": "Miljanko",       // Seller Name
                "ownerRealm": "GrimBatol", // Realm
                "bid": 3800000,            // Bid Value
                "buyout": 4000000,         // Buyout Value
                "quantity": 20,            // Number of items in the Auction
                "timeLeft": "LONG",        // Time left for the Auction
                "rand": 0,
                "seed": 1069994368
            },
            {
                "auc": 278907544,
                "item": 40195,
                "owner": "Mongobank",
                "ownerRealm": "GrimBatol",
                "bid": 38000,
                "buyout": 40000,
                "quantity": 1,
                "timeLeft": "VERY_LONG",
                "rand": 0,
                "seed": 1978036736
            }
        ]
    },
    "horde": {
        "auctions": [
            {
                "auc": 278268046,
                "item": 4306,
                "owner": "Thuglifer",
                "ownerRealm": "GrimBatol",
                "bid": 570000,
                "buyout": 600000,
                "quantity": 20,
                "timeLeft": "VERY_LONG",
                "rand": 0,
                "seed": 1757531904
            },
            {
                "auc": 278698948,
                "item": 4340,
                "owner": "Celticpala",
                "ownerRealm": "Aggra(Português)",
                "bid": 1000000,
                "buyout": 1000000,
                "quantity": 10,
                "timeLeft": "LONG",
                "rand": 0,
                "seed": 0
            }
        ]
    }
}
The file has a list of the Auctions from the Realm it was downloaded from. In each record we can check the item for sale, prices, seller and time left until the end of the auction. Auctions are also grouped by Auction House type: Alliance and Horde.
For the process-job we want to read the JSON file, transform the data and save it to a database. This can be achieved with Chunk Processing. A Chunk is an ETL (Extract – Transform – Load) style of processing which is suitable for handling large amounts of data. A Chunk reads the data one item at a time and creates chunks that are written out within a transaction. One item is read in from an ItemReader, handed to an ItemProcessor, and aggregated. Once the number of items read equals the commit interval, the entire chunk is written out via the ItemWriter, and then the transaction is committed.
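The read-process-write cycle described above can be modeled in a few lines of plain Java. In this simplified sketch (ChunkLoop is a hypothetical name), the reader is a Supplier that returns null when it runs out of items, mirroring readItem(); a processor result of null drops the item, as in JSR-352; and "committing" only increments a counter, whereas a real runtime wraps each chunk in a transaction and stores a checkpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/** Simplified chunk loop: read, process, buffer, and write every itemCount items read. */
class ChunkLoop<I, O> {
    private final int itemCount;   // the commit interval, like item-count in the Job XML
    private int commits;

    ChunkLoop(int itemCount) {
        this.itemCount = itemCount;
    }

    int commits() {
        return commits;
    }

    void run(Supplier<I> reader, Function<I, O> processor, Consumer<List<O>> writer) {
        List<O> chunk = new ArrayList<>();
        int readInChunk = 0;
        // Like readItem(), the reader signals the end of the data by returning null.
        for (I item = reader.get(); item != null; item = reader.get()) {
            readInChunk++;
            O processed = processor.apply(item);
            if (processed != null) {          // a null result filters the item out
                chunk.add(processed);
            }
            if (readInChunk == itemCount) {   // commit interval reached
                writeAndCommit(chunk, writer);
                readInChunk = 0;
            }
        }
        if (readInChunk > 0) {                // last, partial chunk
            writeAndCommit(chunk, writer);
        }
    }

    private void writeAndCommit(List<O> chunk, Consumer<List<O>> writer) {
        writer.accept(new ArrayList<>(chunk)); // hand the writer its own copy of the chunk
        chunk.clear();
        commits++;                             // stands in for committing the transaction
    }
}
```

With itemCount set to 100 and 250 input items, the writer receives chunks of 100, 100 and 50 items, and the loop commits three times.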
ItemReader
The real files are so big that loading them entirely into memory could make you run out of it. Instead, we use the JSON-P API to parse the data in a streaming way.
AuctionDataItemReader
import java.io.Serializable;
import javax.batch.api.chunk.ItemReader;
import javax.batch.runtime.context.JobContext;
import javax.inject.Inject;
import javax.inject.Named;
import javax.json.Json;
import javax.json.stream.JsonParser;

// AbstractAuctionFileProcess, AuctionHouse, Auction, AuctionFile, FolderType, FileStatus
// and WoWBusiness are classes from the sample project.
@Named
public class AuctionDataItemReader extends AbstractAuctionFileProcess implements ItemReader {
    private JsonParser parser;
    private AuctionHouse auctionHouse;

    @Inject
    private JobContext jobContext;

    @Inject
    private WoWBusiness woWBusiness;

    @Override
    public void open(Serializable checkpoint) throws Exception {
        // Streaming parser over the downloaded file; the file is never fully loaded in memory
        setParser(Json.createParser(openInputStream(getContext().getFileToProcess(FolderType.FI_TMP))));

        AuctionFile fileToProcess = getContext().getFileToProcess();
        fileToProcess.setFileStatus(FileStatus.PROCESSING);
        woWBusiness.updateAuctionFile(fileToProcess);
    }

    @Override
    public void close() throws Exception {
        if (parser != null) {
            parser.close(); // also closes the underlying input stream
        }
        AuctionFile fileToProcess = getContext().getFileToProcess();
        fileToProcess.setFileStatus(FileStatus.PROCESSED);
        woWBusiness.updateAuctionFile(fileToProcess);
    }

    @Override
    public Object readItem() throws Exception {
        while (parser.hasNext()) {
            JsonParser.Event event = parser.next();
            Auction auction = new Auction();
            switch (event) {
                case KEY_NAME:
                    updateAuctionHouseIfNeeded(auction);
                    if (readAuctionItem(auction)) {
                        return auction;
                    }
                    break;
            }
        }
        return null; // no more items: ends the chunk step
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return null; // checkpoints are not implemented in this sample
    }

    protected void updateAuctionHouseIfNeeded(Auction auction) {
        if (parser.getString().equalsIgnoreCase(AuctionHouse.ALLIANCE.toString())) {
            auctionHouse = AuctionHouse.ALLIANCE;
        } else if (parser.getString().equalsIgnoreCase(AuctionHouse.HORDE.toString())) {
            auctionHouse = AuctionHouse.HORDE;
        } else if (parser.getString().equalsIgnoreCase(AuctionHouse.NEUTRAL.toString())) {
            auctionHouse = AuctionHouse.NEUTRAL;
        }
        auction.setAuctionHouse(auctionHouse);
    }

    // Relies on the fixed field order: auc, item, owner, ownerRealm, bid, buyout, quantity
    protected boolean readAuctionItem(Auction auction) {
        if (parser.getString().equalsIgnoreCase("auc")) {
            parser.next();
            auction.setAuctionId(parser.getLong());
            parser.next(); // "item" key
            parser.next();
            auction.setItemId(parser.getInt());
            parser.next(); // "owner" key
            parser.next(); // owner value (not stored)
            parser.next(); // "ownerRealm" key
            parser.next();
            auction.setOwnerRealm(parser.getString());
            parser.next(); // "bid" key
            parser.next();
            auction.setBid(parser.getInt());
            parser.next(); // "buyout" key
            parser.next();
            auction.setBuyout(parser.getInt());
            parser.next(); // "quantity" key
            parser.next();
            auction.setQuantity(parser.getInt());
            return true;
        }
        return false;
    }

    public void setParser(JsonParser parser) {
        this.parser = parser;
    }
}
To open a JSON parser stream we call Json.createParser and pass it a reference to an InputStream. To read elements we just need to call the hasNext() and next() methods. next() returns a JsonParser.Event that tells us the position of the parser in the stream. Elements are read and returned in the readItem() method of the Batch API ItemReader. When no more elements are available to read, readItem() returns null to finish the processing. Note that we also implement the open and close methods from ItemReader. These are used to initialize and clean up resources, and each runs only once per step execution.
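The readAuctionItem() method in the reader depends on the exact field order of each auction record: if the API added or reordered a field, the counted parser.next() calls would read the wrong values. A more tolerant alternative, sketched here as an assumption rather than the sample's approach, is to look up each KEY_NAME in a table of setters and apply it to the value that follows. To keep the sketch runnable with the JDK alone it leaves the parser out and takes key/value strings directly; AuctionFieldMapper and its Auction class are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Key-driven mapping of auction fields, independent of the order in which they appear. */
class AuctionFieldMapper {
    // Hypothetical stand-in for the sample's Auction entity; prices use long as a precaution.
    static class Auction {
        long auctionId;
        int itemId;
        String ownerRealm;
        long bid;
        long buyout;
        int quantity;
    }

    private static final Map<String, BiConsumer<Auction, String>> SETTERS = new HashMap<>();

    static {
        SETTERS.put("auc", (auction, value) -> auction.auctionId = Long.parseLong(value));
        SETTERS.put("item", (auction, value) -> auction.itemId = Integer.parseInt(value));
        SETTERS.put("ownerRealm", (auction, value) -> auction.ownerRealm = value);
        SETTERS.put("bid", (auction, value) -> auction.bid = Long.parseLong(value));
        SETTERS.put("buyout", (auction, value) -> auction.buyout = Long.parseLong(value));
        SETTERS.put("quantity", (auction, value) -> auction.quantity = Integer.parseInt(value));
    }

    /** Applies one key/value pair; keys without a setter (owner, timeLeft, rand, seed) are ignored. */
    static void apply(Auction auction, String key, String value) {
        BiConsumer<Auction, String> setter = SETTERS.get(key);
        if (setter != null) {
            setter.accept(auction, value);
        }
    }
}
```

Inside a JSON-P loop, the KEY_NAME branch would remember parser.getString(), advance with parser.next(), and, only when that event is VALUE_STRING or VALUE_NUMBER, pass parser.getString() to apply(). Detecting where one auction ends (its END_OBJECT event) is left out of this sketch.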
ItemProcessor
The ItemProcessor is optional. It’s used to transform the data that was read. In this case, we need to add additional information to each Auction: its Realm and the file it came from.
AuctionDataItemProcessor
import javax.batch.api.chunk.ItemProcessor;
import javax.inject.Named;

@Named
public class AuctionDataItemProcessor extends AbstractAuctionFileProcess implements ItemProcessor {
    @Override
    public Object processItem(Object item) throws Exception {
        Auction auction = (Auction) item;

        // Enrich the Auction with its Realm and the file it was read from
        auction.setRealm(getContext().getRealm());
        auction.setAuctionFile(getContext().getFileToProcess());

        return auction;
    }
}
ItemWriter
Finally we just need to write the data down to a database:
AuctionDataItemWriter
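import java.util.List;
import javax.batch.api.chunk.AbstractItemWriter;
import javax.inject.Named;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

@Named
public class AuctionDataItemWriter extends AbstractItemWriter {
    @PersistenceContext
    protected EntityManager em;

    @Override
    public void writeItems(List<Object> items) throws Exception {
        // Called once per chunk, inside the chunk transaction
        items.forEach(em::persist);
    }
}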
The entire process, for a file of 70 k records, takes around 20 seconds on my machine. I did notice something very interesting. Before this code, I was using an injected EJB that called a method with the persist operation. This was taking 30 seconds in total, so injecting the EntityManager and performing the persist directly saved me a third of the processing time. I can only speculate that the delay is due to a deeper call stack, with EJB interceptors in the middle. This was happening on WildFly. I will investigate this further.
To define the chunk we need to add it to a process-job.xml file:
process-job.xml
<step id="processFile" next="moveFileToProcessed">
    <chunk item-count="100">
        <reader ref="auctionDataItemReader"/>
        <processor ref="auctionDataItemProcessor"/>
        <writer ref="auctionDataItemWriter"/>
    </chunk>
</step>
In the item-count attribute we define how many elements fit into each chunk of processing. This means that the transaction is committed every 100 items. This is useful to keep the transaction size low and to checkpoint the data: if we need to stop and then restart the operation, we can do it without having to process every item again. We have to code that logic ourselves, by returning the reader’s position from checkpointInfo() and honoring it in open(). This is not included in the sample, but I will do it in the future.
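One way to add that restart logic, sketched here under assumptions: the reader keeps track of how many items it has read, returns that count from checkpointInfo(), and skips forward to it in open(). RestartableListReader is a hypothetical class that mirrors the ItemReader callbacks over an in-memory list so it runs without a batch runtime; for the streaming JSON reader, skipping forward would mean re-parsing the file and discarding the first N auctions.

```java
import java.io.Serializable;
import java.util.List;

/** Restartable reader sketch: mirrors open/readItem/checkpointInfo over an in-memory list. */
class RestartableListReader {
    private final List<String> source;   // stands in for the auction file
    private int position;                // index of the next item to read

    RestartableListReader(List<String> source) {
        this.source = source;
    }

    void open(Serializable checkpoint) {
        // null means a fresh start; otherwise resume right after the last checkpointed item.
        position = (checkpoint == null) ? 0 : (Integer) checkpoint;
    }

    String readItem() {
        return position < source.size() ? source.get(position++) : null; // null ends the step
    }

    Serializable checkpointInfo() {
        return position;                 // autoboxed Integer, which is Serializable
    }
}
```

The batch runtime saves the value returned by checkpointInfo() with each chunk commit and passes the last saved value to open() when a stopped or failed execution is restarted.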
Running
To run a job we need a reference to a JobOperator. The JobOperator provides an interface to manage all aspects of job processing, including operational commands, such as start, restart, and stop, as well as job repository related commands, such as retrieval of job and step executions.
To run the previous files-job.xml Job we execute:
Execute Job
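import java.util.Properties;
import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;

JobOperator jobOperator = BatchRuntime.getJobOperator();
// start() returns the id of the new job execution
long executionId = jobOperator.start("files-job", new Properties());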
Note that we pass the name of the Job XML file, without the .xml extension, to the JobOperator.
Next Steps
We still need to aggregate the data to extract metrics and display them on a web page. This post is already long, so I will describe the following steps in a future post. Anyway, the code for that part is already in the GitHub repo. Check the Resources section.
Resources
You can clone a full working copy from my GitHub repository and deploy it to WildFly. The repository includes instructions to deploy it.
Reference: Java EE 7 Batch Processing and World of Warcraft – Part 1 from our JCG partner Roberto Cortez at the Roberto Cortez Java Blog.