Google AppEngine: Task Queues API

Task Queues

com.google.appengine.api.taskqueue

With Task Queues, an application can perform work outside of the user request that initiated it; they are a powerful tool for background work.
Furthermore, you can organize work into small, discrete units (tasks). The application inserts these tasks into one or more queues, and they are processed in FIFO order according to each queue's configuration. Here's a diagram I took from a Google IO presentation which illustrates, at a high level, task insertion into the queue:

Queue Configuration

1. Push Queues (default):

A push queue processes tasks at the rate configured in the queue definition (see below). App Engine automatically manages the lifetime of these queues (creation, deletion, etc.) and adjusts the processing capacity to match your configuration and processing volume. Push queues can only be used within App Engine (internal to your app).

2. Pull Queues:

Pull queues allow a task consumer to lease tasks at a specific time for a specific timeframe. They are accessible internally as well as externally through the Task Queue REST API. In this scenario, however, GAE does not manage the lifecycle and processing rate of queues automatically; it is up to the developer to do it. A backend also has access to these queues.
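
To make the lease idea concrete, here is a minimal in-memory sketch of how leasing behaves. It is not the App Engine API: the class LeaseQueueSketch and its methods are hypothetical names invented for illustration, and time is passed in explicitly so the behavior is easy to follow. The point it shows is that a leased task is hidden from other consumers until the lease expires, and that it comes back unless the consumer deletes it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of a pull queue; NOT the App Engine API. */
final class LeaseQueueSketch {
    // Task name -> time (ms) until which the task is leased; 0 means available.
    private final Map<String, Long> leaseExpiry = new LinkedHashMap<>();

    void add(String task) {
        leaseExpiry.put(task, 0L);
    }

    /** Leases up to maxTasks tasks whose previous lease (if any) has expired. */
    List<String> lease(long nowMillis, long leaseMillis, int maxTasks) {
        List<String> leased = new ArrayList<>();
        for (Map.Entry<String, Long> entry : leaseExpiry.entrySet()) {
            if (leased.size() == maxTasks) {
                break;
            }
            if (entry.getValue() <= nowMillis) {
                entry.setValue(nowMillis + leaseMillis); // hide until expiry
                leased.add(entry.getKey());
            }
        }
        return leased;
    }

    /** The consumer deletes a task once it has finished processing it. */
    void delete(String task) {
        leaseExpiry.remove(task);
    }

    public static void main(String[] args) {
        LeaseQueueSketch queue = new LeaseQueueSketch();
        queue.add("a");
        queue.add("b");
        queue.add("c");

        System.out.println(queue.lease(0, 1000, 2));    // [a, b]
        System.out.println(queue.lease(500, 1000, 2));  // [c]
        queue.delete("a");                              // "a" is done
        System.out.println(queue.lease(1200, 1000, 2)); // [b]
    }
}
```

In the last call, "b" is handed out again because its lease ran out without a delete, while "c" is still under lease. This is why the developer, not GAE, is responsible for the processing rate and cleanup of pull queue tasks.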

Tasks
 

Tasks represent a unit of work performed by the application. Task names are unique within a queue, and according to the Google documentation a task cannot be invoked more than once simultaneously (unless some weird internal error condition happens). Because duplicate execution is still possible in those rare cases, task handlers should be written to be idempotent.
Tasks are instances of the TaskOptions class. A task consists of a URL and a payload, which can be a simple string, a binary object (byte[]), or an instance of DeferredTask. A DeferredTask is basically a Runnable, which allows you to chain tasks together. Our team had to do this in order to simulate long-running tasks when GAE's max execution limit was 30 seconds. Presently, a task must finish executing and send an HTTP response value between 200 and 299 within 10 minutes of the original request. This deadline is separate from user requests, which have a 60-second deadline.
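
The chaining trick mentioned above can be sketched without the SDK. In this simplified model, an ArrayDeque stands in for the push queue and a plain Runnable stands in for a DeferredTask; ChainedTaskSketch, ChunkTask, and runAll are hypothetical names, not App Engine classes. Each task processes one chunk of work and, if anything is left, enqueues a follow-up task that picks up where it stopped, so no single task has to run for long.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch of task chaining; the queue here is a plain in-memory deque. */
final class ChainedTaskSketch {
    static final Queue<Runnable> QUEUE = new ArrayDeque<>(); // stand-in for a push queue
    static int processed = 0;                                // items handled so far

    /** Processes items [start, start + chunkSize) and chains a task for the rest. */
    static final class ChunkTask implements Runnable {
        private final int start;
        private final int end;
        private final int chunkSize;

        ChunkTask(int start, int end, int chunkSize) {
            this.start = start;
            this.end = end;
            this.chunkSize = chunkSize;
        }

        @Override
        public void run() {
            int stop = Math.min(start + chunkSize, end);
            for (int i = start; i < stop; i++) {
                processed++; // placeholder for the real per-item work
            }
            if (stop < end) {
                // Work remains: enqueue the next link in the chain.
                QUEUE.add(new ChunkTask(stop, end, chunkSize));
            }
        }
    }

    /** Runs the whole chain and returns how many tasks were executed. */
    static int runAll(int totalItems, int chunkSize) {
        processed = 0;
        QUEUE.clear();
        QUEUE.add(new ChunkTask(0, totalItems, chunkSize));
        int tasksRun = 0;
        Runnable task;
        while ((task = QUEUE.poll()) != null) {
            task.run();
            tasksRun++;
        }
        return tasksRun;
    }

    public static void main(String[] args) {
        System.out.println(runAll(10, 4)); // 3 tasks: items 0-3, 4-7, 8-9
        System.out.println(processed);     // 10
    }
}
```

On App Engine the same shape would apply, with each link being enqueued as a real task instead of being added to a local deque.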
Furthermore, task queues use token buckets to control the rate of task execution. Each time a task is invoked, a token is used. This leasing model (acquire a token) is typical of brokering or message-passing systems, and it allows users to control the rate of execution of these tasks (see below on configuring queues).
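
For readers who have not met token buckets before, the following is a small sketch of the general algorithm, not App Engine's internal implementation. TokenBucket and TokenBucketDemo are hypothetical names, and the values 40 (bucket size) and 20 tokens per second (rate) are only illustrative. The bucket starts full, every task takes one token, and tokens are refilled at a steady rate up to the bucket size.

```java
/** Hypothetical token bucket; a sketch of the general algorithm only. */
final class TokenBucket {
    private final int bucketSize;       // maximum tokens held, i.e. the burst size
    private final double ratePerSecond; // refill rate
    private double tokens;
    private long lastRefillNanos;

    TokenBucket(int bucketSize, double ratePerSecond, long nowNanos) {
        this.bucketSize = bucketSize;
        this.ratePerSecond = ratePerSecond;
        this.tokens = bucketSize;       // the bucket starts full
        this.lastRefillNanos = nowNanos;
    }

    /** Tries to take one token at time nowNanos; true means the task may run. */
    synchronized boolean tryAcquire(long nowNanos) {
        double elapsedSeconds = (nowNanos - lastRefillNanos) / 1_000_000_000.0;
        tokens = Math.min(bucketSize, tokens + elapsedSeconds * ratePerSecond);
        lastRefillNanos = nowNanos;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}

final class TokenBucketDemo {
    /** Counts how many of `attempts` tasks are allowed to run at the same instant. */
    static int burst(TokenBucket bucket, long nowNanos, int attempts) {
        int allowed = 0;
        for (int i = 0; i < attempts; i++) {
            if (bucket.tryAcquire(nowNanos)) {
                allowed++;
            }
        }
        return allowed;
    }

    public static void main(String[] args) {
        TokenBucket bucket = new TokenBucket(40, 20.0, 0L);
        // 100 tasks waiting at t=0: only a full bucket's worth may start.
        System.out.println(burst(bucket, 0L, 100));             // 40
        // One second later, 20 tokens have been refilled.
        System.out.println(burst(bucket, 1_000_000_000L, 100)); // 20
    }
}
```

The bucket size therefore bounds the initial burst, while the rate bounds sustained throughput once the bucket has been drained.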
Lastly, a very important feature of the Task Queue API is that it automatically retries failed tasks. You can configure this by passing a RetryOptions object when creating the TaskOptions object.

Task within a Transaction

Tasks can be enqueued as part of a datastore transaction. Insertion (not execution) is guaranteed if the transaction commits successfully. The only caveats are that transactional tasks cannot have user-defined names and that there is a maximum of 5 insertions into task queues in a single transaction.

Configuration
 

Queues are configured via queue.xml. If it is omitted, the default queue with its default configuration is used. Since Pull Queues are for more advanced needs, they must be specifically configured (there is no default pull queue).
An application's queue configuration applies to all versions of the app. You can override this behavior for push queues using the target parameter in queue.xml. This is useful when you want different versions of your app (different sites) to have different queue processing configurations.

Here are some of the things you are allowed to configure (the documentation is more extensive):

bucket-size: caps how fast the queue is processed when many tasks are in the queue and the rate is high (push only). (Warning: the development server ignores this value)

max-concurrent-requests: maximum number of tasks that can be executed at any given time in the specified queue (push only).

mode: whether it’s push or pull.

name: queue name

rate: How often tasks are processed on this queue (s=seconds, m=minutes, h=hours, d=days). If 0, queue is considered paused. (Warning: Development server ignores this value)

target: target a task to a specific backend or application version.

<queue-entries>
  <queue>
    <name>optimize-queue</name>
    <rate>20/s</rate>
    <bucket-size>40</bucket-size>
    <!--Set the number of max concurrent requests to 10-->
    <max-concurrent-requests>10</max-concurrent-requests>
  </queue>
</queue-entries>
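
Since pull queues have to be declared explicitly, a second fragment may help. This is a sketch only: the queue names example-pull-queue and example-push-queue are made up, and the retry limit of 3 is an arbitrary value. It uses the mode element described above to declare a pull queue, and a retry-parameters block to bound automatic retries on a push queue.

```xml
<queue-entries>
  <!-- Hypothetical pull queue: consumers lease tasks from it -->
  <queue>
    <name>example-pull-queue</name>
    <mode>pull</mode>
  </queue>
  <!-- Hypothetical push queue with a bounded number of retries -->
  <queue>
    <name>example-push-queue</name>
    <rate>1/s</rate>
    <retry-parameters>
      <task-retry-limit>3</task-retry-limit>
    </retry-parameters>
  </queue>
</queue-entries>
```

Check the queue.xml reference for the full set of retry settings before relying on any of these values.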

Sample Code
 

This is a very straightforward example. As noted earlier, a push task is basically a request to a URL handler. In this servlet, a GET request enqueues a task. The task then POSTs to this same servlet, and the doPost() method carries out the task. In this case, it's just a simple counter. Notice the counter is a volatile field; keep in mind that volatile only guarantees visibility, and TASK_COUNTER++ is not atomic, so concurrent tasks can occasionally lose an increment. Each GET request to this servlet enqueues another task, so if you refresh the page you will see the counter being incremented by both tasks.
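import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl;

import java.io.IOException;
import java.util.logging.Logger;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;

public class TaskQInfo extends HttpServlet {

   private static final Logger log = Logger.getLogger(TaskQInfo.class.getName());

   // volatile makes updates visible across threads, but ++ is not atomic
   private static volatile int TASK_COUNTER = 0;

   // Executed by user menu click
   @Override
   public void doGet(HttpServletRequest req, HttpServletResponse resp)
        throws IOException {

        // Build a task with the TaskOptions builder and add it to the default queue
        Queue queue = QueueFactory.getDefaultQueue();
        queue.add(withUrl("/taskq_demo").method(TaskOptions.Method.POST));

        resp.getWriter().println("Task has been added to default queue...");
        resp.getWriter().println("Refresh this page to add another count task");
   }

   // Executed by TaskQueue
   @Override
   protected void doPost(HttpServletRequest req, HttpServletResponse resp)
       throws ServletException, IOException {

       // This is the body of the task.
       // Note: 1000 one-second iterations run longer than the 10-minute task
       // deadline mentioned earlier, so lower the bound for a real deployment.
       for (int i = 0; i < 1000; i++) {
             log.info("Processing: " + req.getHeader("X-AppEngine-TaskName") + "-" +
                     TASK_COUNTER++);

             try {
                // Sleep for a second (if the rate is set to 1/s this will allow at
                // most 1 more task to be processed)
                Thread.sleep(1000);
             } catch (InterruptedException e) {
                // ignore
             }
       }
   }
}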

Task queues allow you to achieve some level of concurrency in your application by invoking background processes on demand. For very lengthy tasks, you might want to take a look at App Engine backends, which are basically special App Engine instances with no request time limit.

Reference: Google AppEngine: Task Queues API from our JCG partner Luis Atencio at the Reflective Thought blog.
