Implementing the State Machine Pattern as a Stream Processor
The scenario, for once, isn’t contrived or far-fetched; it’s something that I had to do the other day. I have an app that has just downloaded a bunch of timeline tweets for an authenticated Twitter user. Having parsed the XML (or JSON) and got hold of the tweets, I needed to format them for display. The problem was that they’re in plain text and I needed to convert them into HTML, adding anchor tags along the way to produce something similar to what Twitter does when it formats the same tweets on your Twitter home page.
Just for reference, a user’s tweets can be retrieved using the Twitter API via the following URL:
https://api.twitter.com/1/statuses/user_timeline.xml?include_entities=true&include_rts=true&screen_name=BentleyMotors&count=2
…where the username in this case is “BentleyMotors”. If you specify XML formatting in the URL, then a tweet is returned in the text tag and looks something like this:
Deputy PM Nick Clegg visits #Bentley today to tour Manufacturing facilities. #RegionalGrowthFund http://t.co/kX81aZmY http://t.co/Eet31cCA
…and this needed converting into something like this:
Deputy PM Nick Clegg visits <a href="https://twitter.com/#!/search/%23Bentley">#Bentley</a> today to tour Manufacturing facilities. <a href="https://twitter.com/#!/search/%23RegionalGrowthFund">#RegionalGrowthFund</a> <a href="http://t.co/kX81aZmY">t.co/kX81aZmY</a> <a href="http://t.co/Eet31cCA">t.co/Eet31cCA</a>
The big idea in solving this problem [1] is to use a State Machine that reads an input stream a byte at a time to find the hashtags, user names and URLs and convert them into HTML anchor tags. For example, from the complete tweet above #Bentley
becomes
<a href="https://twitter.com/#!/search/%23Bentley">#Bentley</a>
and http://t.co/Eet31cCA
becomes
<a href="http://t.co/Eet31cCA">t.co/Eet31cCA</a>.
This means that the code has to find every word that begins with either ‘#’ or ‘@’, and every URL that begins with ‘http://’.
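As a rough illustration of that rule, the three cases can be told apart by looking at how a word starts. The class and enum names below are hypothetical and are not part of the project; the state machine makes the same decision one byte at a time rather than on whole words.

```java
/** Hypothetical helper for illustration only; not part of the original project. */
final class TokenKind {

    enum Kind { HASHTAG, USERNAME, URL, PLAIN }

    /** Classifies a single whitespace-delimited word from a tweet. */
    static Kind classify(String word) {
        if (word.startsWith("#")) {
            return Kind.HASHTAG;
        }
        if (word.startsWith("@")) {
            return Kind.USERNAME;
        }
        if (word.startsWith("http://")) {
            return Kind.URL;
        }
        return Kind.PLAIN;
    }
}
```

For example, `TokenKind.classify("#Bentley")` yields `HASHTAG`, while a word such as `today` falls through to `PLAIN`.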
The UML diagram for this State Machine looks something like this:
This implementation does differ from the GOF diagram below in that, for this application, I’ve separated the state from the event/action. This improves decoupling and means that a single action can be associated with several states.
Gathering Up Your States
The first thing to do when building any state machine is to gather together your states. In the original GOF pattern states were abstract classes; however, I prefer to use more modern enums for simplicity. The states for this state machine are:
public enum TweetState {

    OFF("Off - not yet running"),
    RUNNING("Running - happily processing any old bytes"),
    READY("Ready - found a space, so there's maybe something to do, but that depends upon the next byte"),
    HASHTAG("#HashTag has been found - process it"),
    NAMETAG("@Name has been found - process it"),
    HTTPCHECK("Checking for a URL starting with http://"),
    URL("http:// has been found so capture the rest of the URL");

    private final String description;

    TweetState(String description) {
        this.description = description;
    }

    @Override
    public String toString() {
        return "TweetState: " + description;
    }
}
Reading the Bytes
The next thing that’s needed is a class that reads an input stream a byte at a time, gets hold of the action class that’s associated with the machine’s current state and then processes the byte using that action. This is done by the StateMachine class shown below:
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class StateMachine<T extends Enum<?>> {

    private final byte[] inputBuffer = new byte[32768];
    private T currentState;
    private final Map<T, AbstractAction<T>> stateActionMap = new HashMap<T, AbstractAction<T>>();

    public StateMachine(T startState) {
        this.currentState = startState;
    }

    /**
     * Main method that loops around and processes the input stream
     */
    public void processStream(InputStream in) {
        try {
            processBuffers(in);
            terminate();
        } catch (Exception e) {
            throw new StateMachineException("Error processing input stream: " + e.getMessage(), e);
        }
    }

    private void processBuffers(InputStream in) throws Exception {
        // Outer loop - continually refill the buffer until there's nothing left to read
        for (int len = in.read(inputBuffer); len != -1; len = in.read(inputBuffer)) {
            // Inner loop - process the contents of the buffer
            for (int i = 0; i < len; i++) {
                processByte(inputBuffer[i]);
            }
        }
    }

    /**
     * Deal with each individual byte in the buffer
     */
    private void processByte(byte b) throws Exception {
        // Get the action associated with the current state
        AbstractAction<T> action = stateActionMap.get(currentState);
        // Do the action, get the next state
        currentState = action.processByte(b, currentState);
    }

    /**
     * The input is exhausted. Make sure that we tidy up
     */
    private void terminate() throws Exception {
        AbstractAction<T> action = stateActionMap.get(currentState);
        action.terminate(currentState);
    }

    /**
     * Associate an action with a state. Each state maps to one action, but the
     * same action may be registered against more than one state
     */
    public void addAction(T state, AbstractAction<T> action) {
        stateActionMap.put(state, action);
    }

    /**
     * Remove an action from the state machine
     */
    public void removeAction(AbstractAction<T> action) {
        // The map is keyed by state, so remove the action from the values view
        stateActionMap.values().removeAll(Collections.singleton(action));
    }
}
The key method here is processByte(...):
/**
 * Deal with each individual byte in the buffer
 */
private void processByte(byte b) throws Exception {
    // Get the action associated with the current state
    AbstractAction<T> action = stateActionMap.get(currentState);
    // Do the action, get the next state
    currentState = action.processByte(b, currentState);
}
For every byte, this method gets hold of the action that’s associated with the current state from the stateActionMap. The action is then called, and the state it returns becomes the current state, ready for the next byte.
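The look-up, run, store-next-state cycle can be seen in isolation with a toy machine. The sketch below is not the project's code: the `Mode` enum, the `Action` interface and the asterisk rule are invented for illustration, and it assumes Java 8 or later for the lambdas.

```java
import java.nio.charset.StandardCharsets;
import java.util.EnumMap;
import java.util.Map;

/** Toy two-state machine: text between '*' characters is upper-cased. */
final class DispatchSketch {

    enum Mode { PLAIN, SHOUTING }

    /** Hypothetical stand-in for the action class: consume a byte, return the next state. */
    interface Action {
        Mode processByte(byte b, Mode current, StringBuilder out);
    }

    static String run(String input) {
        Map<Mode, Action> actions = new EnumMap<>(Mode.class);
        // PLAIN: copy bytes through; '*' switches to SHOUTING
        actions.put(Mode.PLAIN, (b, current, out) -> {
            if (b == '*') {
                return Mode.SHOUTING;
            }
            out.append((char) b);
            return current;
        });
        // SHOUTING: upper-case bytes; '*' switches back to PLAIN
        actions.put(Mode.SHOUTING, (b, current, out) -> {
            if (b == '*') {
                return Mode.PLAIN;
            }
            out.append(Character.toUpperCase((char) b));
            return current;
        });

        StringBuilder out = new StringBuilder();
        Mode state = Mode.PLAIN;
        for (byte b : input.getBytes(StandardCharsets.US_ASCII)) {
            Action action = actions.get(state);          // look up the action for the current state
            state = action.processByte(b, state, out);   // run it; its result is the next state
        }
        return out.toString();
    }
}
```

Calling `DispatchSketch.run("say *hello* now")` returns `say HELLO now`. The loop body never mentions a particular state, which is what keeps the driver reusable.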
Having sorted out the states and the state machine the next step is to write the actions. At this point I follow the GOF pattern more closely by creating an AbstractAction class that processes each event with…
public abstract T processByte(byte b, T currentState) throws Exception;
This method, given the current state, processes a byte of information and uses that byte to return the next state. The full implementation of the AbstractAction is:
import java.io.IOException;
import java.io.OutputStream;

public abstract class AbstractAction<T extends Enum<?>> {

    /**
     * This is the next action to take - See the Chain of Responsibility Pattern
     */
    protected final AbstractAction<T> nextAction;

    /** Output Stream we're using */
    protected final OutputStream os;

    /** The output buffer */
    protected final byte[] buff = new byte[1];

    public AbstractAction(OutputStream os) {
        this(null, os);
    }

    public AbstractAction(AbstractAction<T> nextAction, OutputStream os) {
        this.os = os;
        this.nextAction = nextAction;
    }

    /**
     * Call the next action in the chain of responsibility
     *
     * @param b
     *            The byte to process
     * @param state
     *            The current state of the machine.
     */
    protected void callNext(byte b, T state) throws Exception {
        if (nextAction != null) {
            nextAction.processByte(b, state);
        }
    }

    /**
     * Process a byte using this action
     *
     * @param b
     *            The byte to process
     * @param currentState
     *            The current state of the state machine
     *
     * @return The next state
     */
    public abstract T processByte(byte b, T currentState) throws Exception;

    /**
     * Override this to ensure an action tidies up after itself and returns to a
     * default state. This may involve processing any data that's been captured
     *
     * This method is called when the input stream terminates
     */
    public void terminate(T currentState) throws Exception {
        // blank
    }

    protected void writeByte(byte b) throws IOException {
        buff[0] = b;
        // Write the data to the output stream
        os.write(buff);
    }

    protected void writeByte(char b) throws IOException {
        writeByte((byte) b);
    }
}
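None of the tweet actions shown in this article use the nextAction chain, so here is a minimal sketch of how callNext(...) could be used. The `AbstractAction` below is a trimmed copy so that the example compiles on its own, and `EchoAction`, `CountingAction` and the `Toy` enum are hypothetical names invented for the illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

final class ChainSketch {

    enum Toy { ONLY }

    /** Trimmed copy of AbstractAction, just enough to compile standalone. */
    abstract static class AbstractAction<T extends Enum<?>> {
        protected final AbstractAction<T> nextAction;
        protected final OutputStream os;

        AbstractAction(AbstractAction<T> nextAction, OutputStream os) {
            this.nextAction = nextAction;
            this.os = os;
        }

        protected void callNext(byte b, T state) throws Exception {
            if (nextAction != null) {
                nextAction.processByte(b, state);
            }
        }

        public abstract T processByte(byte b, T currentState) throws Exception;

        protected void writeByte(byte b) throws IOException {
            os.write(b);
        }
    }

    /** Hypothetical action: copies every byte to the output unchanged. */
    static class EchoAction extends AbstractAction<Toy> {
        EchoAction(OutputStream os) {
            super(null, os);
        }

        @Override
        public Toy processByte(byte b, Toy state) throws Exception {
            writeByte(b);
            return state;
        }
    }

    /** Hypothetical action: counts bytes, then hands each one down the chain. */
    static class CountingAction extends AbstractAction<Toy> {
        int count;

        CountingAction(AbstractAction<Toy> next, OutputStream os) {
            super(next, os);
        }

        @Override
        public Toy processByte(byte b, Toy state) throws Exception {
            count++;
            callNext(b, state);
            return state;
        }
    }

    /** Runs the two-link chain over the input and reports "count:output". */
    static String demo(String input) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        CountingAction head = new CountingAction(new EchoAction(bos), bos);
        try {
            for (byte b : input.getBytes(StandardCharsets.US_ASCII)) {
                head.processByte(b, Toy.ONLY);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return head.count + ":" + new String(bos.toByteArray(), StandardCharsets.US_ASCII);
    }
}
```

`ChainSketch.demo("abc")` returns `3:abc`. Note that callNext(...) discards the state returned by the next action, so in this design only the head of a chain decides the machine's next state.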
Building the State Machine
So far all the code that I’ve written has been generic and can be reused time and time again [2], which means that the next step is to write some domain-specific code. From the UML diagram above, you can see that the domain-specific actions are: DefaultAction, ReadyAction and CaptureTag. Before I go on to describe what they do, you may have guessed that somehow I need to inject the actions into the StateMachine and associate them with a TweetState. The JUnit code below shows how this is done…
StateMachine<TweetState> machine = new StateMachine<TweetState>(TweetState.OFF);

// Add some actions to the state machine
// Add the default action
machine.addAction(TweetState.OFF, new DefaultAction(bos));
machine.addAction(TweetState.RUNNING, new DefaultAction(bos));
machine.addAction(TweetState.READY, new ReadyAction(bos));
machine.addAction(TweetState.HASHTAG, new CaptureTag(bos, new HashTagStrategy()));
machine.addAction(TweetState.NAMETAG, new CaptureTag(bos, new UserNameStrategy()));
machine.addAction(TweetState.HTTPCHECK, new CheckHttpAction(bos));
machine.addAction(TweetState.URL, new CaptureTag(bos, new UrlStrategy()));
From the code above you can see that DefaultAction is linked to the OFF and RUNNING states, the ReadyAction is linked to the READY state, the CaptureTag action is linked to the HASHTAG, NAMETAG and URL states and the CheckHttpAction is linked to the HTTPCHECK state.
You may have noticed that the CaptureTag action is linked to more than one state. This is fine because the CaptureTag employs the Strategy pattern to change its behaviour on the fly; hence I have one action with some common code that, after injecting a strategy object, can do three things.
Writing Actions
Getting back to writing actions, the first action to write is usually the DefaultAction, which is the action that’s called when nothing interesting is happening. This action happily takes input characters and puts them into the output stream, whilst looking out for certain characters or character/state combinations. The heart of the DefaultAction is the if/else chain in the processByte(...) method.
public class DefaultAction extends AbstractAction<TweetState> {

    public DefaultAction(OutputStream os) {
        super(os);
    }

    /**
     * Process a byte using this action
     *
     * @param b
     *            The byte to process
     * @param currentState
     *            The current state of the state machine
     */
    @Override
    public TweetState processByte(byte b, TweetState currentState) throws Exception {

        TweetState retVal = TweetState.RUNNING;

        // Switch state if a ' ' char
        if (isSpace(b)) {
            retVal = TweetState.READY;
            writeByte(b);
        } else if (isHashAtStart(b, currentState)) {
            retVal = TweetState.HASHTAG;
        } else if (isNameAtStart(b, currentState)) {
            retVal = TweetState.NAMETAG;
        } else if (isUrlAtStart(b, currentState)) {
            retVal = TweetState.HTTPCHECK;
        } else {
            writeByte(b);
        }

        return retVal;
    }

    private boolean isSpace(byte b) {
        return b == ' ';
    }

    private boolean isHashAtStart(byte b, TweetState currentState) {
        return (currentState == TweetState.OFF) && (b == '#');
    }

    private boolean isNameAtStart(byte b, TweetState currentState) {
        return (currentState == TweetState.OFF) && (b == '@');
    }

    private boolean isUrlAtStart(byte b, TweetState currentState) {
        return (currentState == TweetState.OFF) && (b == 'h');
    }
}
From the code above you can see that the central if/else chain checks each byte. If the byte is a space, then the next byte may be a special character: ‘#’ for the start of a hashtag, ‘@’ for the start of a name tag and ‘h’ for the start of a URL; hence, if a space is found then the DefaultAction returns the READY state as there may be more work to do. If a space isn’t found then it returns a RUNNING state, which tells StateMachine to call the DefaultAction again when the next byte is read.
The DefaultAction also checks for special characters while the machine is still in the OFF state, as the first character of a tweet may be a ‘#’, ‘@’ or ‘h’.
Control has now been passed back to the StateMachine object, which reads the next byte from the input stream. As the state is now READY, the next call to processByte(...) retrieves the ReadyAction.
@Override
public TweetState processByte(byte b, TweetState currentState) throws Exception {

    TweetState retVal = TweetState.RUNNING;

    switch (b) {
    case '#':
        retVal = TweetState.HASHTAG;
        break;
    case '@':
        retVal = TweetState.NAMETAG;
        break;
    case 'h':
        retVal = TweetState.HTTPCHECK;
        break;
    default:
        super.writeByte(b);
        break;
    }

    return retVal;
}
From ReadyAction’s switch statement you can see that its responsibility is to confirm that the code has found a hashtag, name or URL by checking for a ‘#’, ‘@’ and ‘h’ respectively. If it finds one then it returns one of the following states: HASHTAG, NAMETAG or HTTPCHECK to the StateMachine.
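Taken together, the DefaultAction and ReadyAction decisions amount to a small transition table. The sketch below restates them as a pure function so that the sequence of states can be traced for a sample input. The class and method names are hypothetical, the output side is left out, and the capture states are simply held, since they belong to other actions.

```java
final class TransitionSketch {

    enum S { OFF, RUNNING, READY, HASHTAG, NAMETAG, HTTPCHECK }

    /** Next state for one input character, following DefaultAction and ReadyAction. */
    static S next(S current, char c) {
        switch (current) {
        case OFF:
            // DefaultAction at the very start of the tweet
            return c == ' ' ? S.READY : special(c);
        case RUNNING:
            // DefaultAction in the middle of a word: only a space is interesting
            return c == ' ' ? S.READY : S.RUNNING;
        case READY:
            // ReadyAction: the byte straight after a space
            return special(c);
        default:
            // Capture states are handled by other actions; held unchanged here
            return current;
        }
    }

    private static S special(char c) {
        switch (c) {
        case '#':
            return S.HASHTAG;
        case '@':
            return S.NAMETAG;
        case 'h':
            return S.HTTPCHECK;
        default:
            return S.RUNNING;
        }
    }

    /** Lists every state the machine passes through, starting from OFF. */
    static String trace(String input) {
        StringBuilder sb = new StringBuilder("OFF");
        S state = S.OFF;
        for (char c : input.toCharArray()) {
            state = next(state, c);
            sb.append(" -> ").append(state);
        }
        return sb.toString();
    }
}
```

`TransitionSketch.trace("a #")` gives `OFF -> RUNNING -> READY -> HASHTAG`, whereas `trace("oh")` gives `OFF -> RUNNING -> RUNNING`: an ‘h’ in the middle of a word is not mistaken for the start of a URL.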
Assuming that the ReadyAction found a ‘#’ character and returned a HASHTAG state, then StateMachine, when it reads the next byte, will pull the CaptureTag class with the injected HashTagStrategy class from the stateActionMap:
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;

public class CaptureTag extends AbstractAction<TweetState> {

    private final ByteArrayOutputStream tagStream;
    private final byte[] buf;
    private final OutputStrategy output;
    private boolean terminating;

    public CaptureTag(OutputStream os, OutputStrategy output) {
        super(os);
        tagStream = new ByteArrayOutputStream();
        buf = new byte[1];
        this.output = output;
    }

    /**
     * Process a byte using this action
     *
     * @param b
     *            The byte to process
     * @param currentState
     *            The current state of the state machine
     */
    @Override
    public TweetState processByte(byte b, TweetState currentState) throws Exception {

        TweetState retVal = currentState;

        if (b == ' ') {
            // A space ends the tag: write the link, then the space itself
            retVal = TweetState.READY;
            output.build(tagStream.toString(), os);
            if (!terminating) {
                super.writeByte(' ');
            }
            reset();
        } else {
            buf[0] = b;
            tagStream.write(buf);
        }

        return retVal;
    }

    /**
     * Reset the object ready for processing
     */
    public void reset() {
        terminating = false;
        tagStream.reset();
    }

    @Override
    public void terminate(TweetState state) throws Exception {
        // End of input: flush the captured tag without writing a trailing space
        terminating = true;
        processByte((byte) ' ', state);
    }
}
The idea behind the CaptureTag code is that it captures characters, adding them to a ByteArrayOutputStream, until it detects a space or the input stream ends. When a space is detected, the CaptureTag calls its OutputStrategy interface, which in this case is implemented by HashTagStrategy.
public class HashTagStrategy implements OutputStrategy {

    /**
     * @see state_machine.tweettohtml.OutputStrategy#build(java.lang.String,
     *      java.io.OutputStream)
     */
    @Override
    public void build(String tag, OutputStream os) throws IOException {
        String url = "<a href=\"https://twitter.com/#!/search/%23" + tag + "\">#" + tag + "</a>";
        os.write(url.getBytes());
    }
}
The HashTagStrategy builds a hashtag search URL and writes it to the output stream. Once the URL has been written to the stream, the CaptureTag returns a state of READY, as a space has been detected, and returns control to the StateMachine.
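The other two strategies are not listed in this article, so the following is only a guess at their shape. The `OutputStrategy` interface is inferred from the build(...) override above. The `UrlStrategy` assumes that the captured text is the URL with its leading ‘http://’ already consumed, which matches the sample output at the top of the article. The profile link format in `UserNameStrategy` is purely an assumption.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class StrategySketch {

    /** Shape inferred from HashTagStrategy's build(String, OutputStream) override. */
    interface OutputStrategy {
        void build(String tag, OutputStream os) throws IOException;
    }

    /** Assumes the tag is the URL minus its "http://" prefix. */
    static class UrlStrategy implements OutputStrategy {
        @Override
        public void build(String tag, OutputStream os) throws IOException {
            String html = "<a href=\"http://" + tag + "\">" + tag + "</a>";
            os.write(html.getBytes(StandardCharsets.UTF_8));
        }
    }

    /** The profile URL format is an assumption; the article does not show it. */
    static class UserNameStrategy implements OutputStrategy {
        @Override
        public void build(String tag, OutputStream os) throws IOException {
            String html = "<a href=\"https://twitter.com/" + tag + "\">@" + tag + "</a>";
            os.write(html.getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Convenience wrapper that renders a tag to a String through a strategy. */
    static String render(OutputStrategy strategy, String tag) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try {
            strategy.build(tag, bos);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(bos.toByteArray(), StandardCharsets.UTF_8);
    }

    static String renderUrl(String tag) {
        return render(new UrlStrategy(), tag);
    }

    static String renderUser(String tag) {
        return render(new UserNameStrategy(), tag);
    }
}
```

`StrategySketch.renderUrl("t.co/Eet31cCA")` produces the same anchor as the URL example at the start of the article. Because CaptureTag only ever sees the interface, adding another kind of tag means writing one more small class like these.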
The StateMachine reads the next byte and so the process continues.
Processing a hashtag is only one of several possible scenarios that this code can handle, and in walking through it I’ve tried to demonstrate how a state machine can be used to process an input stream a byte at a time in order to realize some predefined solution. If you’re interested in how the other scenarios are handled, take a look at the source code on GitHub [3].
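One of those other scenarios is the HTTPCHECK state, whose CheckHttpAction is not listed here. As a hedged sketch of the kind of logic it needs (not the project's actual code), the matcher below is fed the bytes that follow the initial ‘h’ and reports whether they still spell out ‘http://’. All names in it are hypothetical.

```java
final class HttpPrefixSketch {

    enum Result { KEEP_CHECKING, IS_URL, NOT_URL }

    private static final String PREFIX = "http://";

    // DefaultAction/ReadyAction swallowed the 'h' without writing it, so start from there
    private final StringBuilder seen = new StringBuilder("h");

    /** Feed the next character; says whether the prefix is complete, broken or still open. */
    Result accept(char c) {
        seen.append(c);
        if (!PREFIX.startsWith(seen.toString())) {
            return Result.NOT_URL;
        }
        return seen.length() == PREFIX.length() ? Result.IS_URL : Result.KEEP_CHECKING;
    }

    /** Characters swallowed so far; on NOT_URL the caller has to write these out as plain text. */
    String pending() {
        return seen.toString();
    }

    /** Runs a word that starts with 'h' through a fresh matcher. */
    static Result check(String word) {
        HttpPrefixSketch matcher = new HttpPrefixSketch();
        Result result = Result.KEEP_CHECKING;
        for (int i = 1; i < word.length() && result == Result.KEEP_CHECKING; i++) {
            result = matcher.accept(word.charAt(i));
        }
        return result;
    }
}
```

In state machine terms, KEEP_CHECKING would presumably map to staying in HTTPCHECK, IS_URL to moving to the URL state, and NOT_URL to flushing pending() to the output and dropping back to RUNNING. A real action would also need to decide what to do when the mismatching byte is a space.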
In Summary
In summary, this isn’t a technique that you’d want to use on a regular basis; it’s complex, pretty hard to implement and prone to error, plus there’s usually a simpler way of parsing incoming data. However, there are those odd few times when it is useful, when, despite its complexity, it is a good solution, so I’d recommend keeping it in your metaphorical toolbox and saving it for a rainy day.
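To give one idea of what a simpler way might look like, here is a regular expression sketch that handles the hashtag and URL cases from the sample tweet. It is an alternative technique, not the state machine described above. It works on a whole String rather than a stream, and it leaves user names out because their link format isn't shown in this article. The class name is hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class RegexSketch {

    // A token only counts at the start of the text or straight after whitespace
    private static final Pattern TOKEN =
            Pattern.compile("(?<!\\S)(?:#(\\S+)|http://(\\S+))");

    static String toHtml(String tweet) {
        Matcher m = TOKEN.matcher(tweet);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            String link;
            if (m.group(1) != null) {
                // Hashtag: same search link that HashTagStrategy builds
                link = "<a href=\"https://twitter.com/#!/search/%23" + m.group(1) + "\">#"
                        + m.group(1) + "</a>";
            } else {
                // URL: group 2 is the address without its http:// prefix
                link = "<a href=\"http://" + m.group(2) + "\">" + m.group(2) + "</a>";
            }
            // quoteReplacement stops '$' and '\' in the link being treated as group references
            m.appendReplacement(sb, Matcher.quoteReplacement(link));
        }
        m.appendTail(sb);
        return sb.toString();
    }
}
```

The trade-off is that the whole tweet has to be in memory as a String, which is fine for tweets but is exactly what the stream processor avoids for larger inputs.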
[1] There are several ways of solving this puzzle, some of which may be simpler than a State Machine.
[2] This version of StateMachine was written in 2006 to process XML. In that scenario the code had to unzip some Base64 XML fields, and as the pattern was reusable I just dug it out of my toolbox of code samples for the Tweet to HTML case.
[3] The complete project is available on GitHub …
Reference: Implementing the State Machine Pattern as a Stream Processor from our JCG partner Roger Hughes at the Captain Debug’s Blog blog.
Nice article. I found it a bit strange the way you approached the StateMachine pattern implementation. As most of the patterns, you have to know when is appropriate to use a complex pattern to solve a problem. From my experience I would say that the StateMachine concept is quite powerful if implemented correctly as it forces the division of the flow into isolated stationary states in which you have limited inputs (events) you should care about. We use StateMachine -s for critical asynchronous components (mostly mediators) and found them quite useful and bulletproof (if written correctly). The biggest challenge in writing a…
Wait. So essentially what you need is a parser for tweets, right? While nobody would disagree that parsers indeed use state machines under the hood, the very approach of building a parser by manually implementing its state machine is questionable. I mean, there’s nothing wrong with it if you’re doing it for your CS class toy project but for production code there are far more advanced tools available. For example, take antlr, define your tweets grammar (I bet it won’t be longer than half a page) and get a parser to turn those tweets into Abstract Syntax Tree; then plug…