Parallel task execution in J2EE using the Work Manager specification
Pages: 1, 2, 3, 4

Modifying the servlet to use parallel execution

Below is the code for ServletParallel, which extends AbstractServlet. Note how the lookup for the WorkManager is done in the init() method. The general structure of the code is pretty similar to ServletSequential: The doTranslation() method contains the same loop over the input list, but instead of executing Translators directly, we create an instance of WorkTranslatorWrapper and call the schedule() method of WorkManager with the instance. Invocation of the schedule() method returns immediately, and we need to preserve the resulting WorkItem (by adding it to jobs list). After all jobs are scheduled, execution will block on the call to WorkManager.waitForAll(Collection, long), which takes a collection of WorkItems for the jobs we want to wait for. The second argument specifies the timeout in milliseconds, and there are two predefined constants we could use: WorkManager.IMMEDIATE to specify that the method should return immediately (it has the same effect as passing 0), and a second constant that we use, called WorkManager.INDEFINITE, which indicates that there is no timeout and to wait until all jobs are completed.

public class ServletParallel extends AbstractServlet {
    private WorkManager workManager;
    
    public void init(ServletConfig servletConfig) throws ServletException {
        try {
            InitialContext ctx = new InitialContext();
            this.workManager = (WorkManager)ctx.lookup("java:comp/env/wm/MyWorkManager");
        } 
        catch (Exception e) {
            throw new ServletException(e);
        }
    }

    protected List doTranslation(List input) throws Exception {
        List result = new ArrayList();
        List jobs = new ArrayList();
        // create translators and schedule execution
        for (Iterator iter = input.iterator(); iter.hasNext();) {
            String source = (String) iter.next();
            Translator translator = new DummyTranslator(source, 10 * 1000);

            // schedule
             
                        
Work work = new WorkTranslatorWrapper(translator);
             
                        
WorkItem workItem = this.workManager.schedule(work);
             
                        
jobs.add(workItem);
        }
                
        logger.info("All jobs scheduled");
        // wait for all jobs to complete
         
                        
this.workManager.waitForAll(jobs, WorkManager.INDEFINITE);
                
        // extract results
        for (Iterator iter = jobs.iterator(); iter.hasNext();) {
             
                        
WorkItem workItem = (WorkItem) iter.next();
             
                        
Translator translator = (Translator) workItem.getResult();
            result.add(translator.getTranslation());
        }
        return result;
        }
}
                      

Note how results are extracted when the jobs are completed: We loop over the WorkItems collection with calls to the getResult() method, which returns instances of the corresponding job. These can be cast to Translator in our case.

This servlet, if executed with the same input parameters as ServletSequential, should finish execution significantly faster (recall that it took 50 seconds in the sequential execution case). On my computer it takes approximately 25 to 30 seconds to execute but, of course, results may vary depending on the configuration of the particular WorkManager used, the server load, and some other factors. WebLogic Server 9 also optimizes the use of threads and shares them between Work Managers. In addition, it makes sure that requests are processed fairly.

The Work Lifecycle and Lifecycle Events

Let's now look more carefully at the functionality provided by the Work Manager specification. Every instance of Work has a well-defined lifecycle. The following states are defined:

  • ACCEPTED - constant defined as WorkEvent.WORK_ACCEPTED, indicates that Work has been accepted for dispatching
  • REJECTED - constant defined as WorkEvent.WORK_REJECTED, indicates that the already accepted Work cannot be started (most likely because of some problem with the WorkManager or application server itself)
  • STARTED - constant defined as WorkEvent.WORK_STARTED, indicates that Work has started executing
  • COMPLETED - constant defined as WorkEvent.WORK_COMPLETED, indicates that Work has completed execution

Figure 2
Figure 2. State diagram for Work

You can retrieve the current status of scheduled Work at any time using the WorkItem.getStatus() method call. This is especially useful when you don't want to wait for all jobs to complete. (You can use WorkManager.waitForAny(Collection, timeout) if you are interested in any job completion, or you can call Thread.sleep(long) in a loop and verify how many jobs are completed by iterating over the WorkItem collection and checking each individual job status.)

The specification also provides means for applications to be notified when Work instances change their lifecycle status. A WorkListener can be specified when work is being scheduled. The WorkManager will call back on the WorkListener instance for various work events (for example, accepted, rejected, started, completed). Note that the WorkListener instances are always executed in the same JVM as the thread that scheduled the Work with the WorkManager. A WorkListener class could be implemented as a standalone class or as part of your Work class. Here is a simple implementation of such a listener, which just logs a message when different events occur.

public class TranslatorWorkListener implements WorkListener {
    public void workAccepted(WorkEvent workEvent) {
        logger.info("work accepted: "+workEvent.getWorkItem());
    }

    public void workRejected(WorkEvent workEvent) {
        logger.info("work rejected: "+workEvent.getWorkItem());
    }

    public void workStarted(WorkEvent workEvent) {
        logger.info("work started: "+workEvent.getWorkItem());
    }

    public void workCompleted(WorkEvent workEvent) {
        logger.info("work completed: "+workEvent.getWorkItem());
    }
}

Then doTranslation() implementation in the ServletParallel code above can be changed to pass a TranslatorListener to the WorkManager:

protected List doTranslation(List input) throws Exception {
    ...
     
                        
TranslatorWorkListener listener = new TranslatorWorkListener();
    for (Iterator iter = input.iterator(); iter.hasNext();) {
        String source = (String) iter.next();
        Translator translator = new DummyTranslator(source, 10 * 1000);

        // schedule
        Work work = new WorkTranslatorWrapper(translator);
        WorkItem workItem = this.workManager.schedule(work,  
                        
listener);
        jobs.add(workItem);
    }
                
    logger.info("All jobs scheduled");
    ...
}
                      

If you run the modified code you should see something similar to the following in your log files (or console depending on WebLogic and your logger configuration):

22:42:12 - begin
22:42:12 - work  
                        
accepted: executing: source=one, translation=null
22:42:12 - work  
                        
accepted: executing: source=two, translation=null
22:42:12 - work  
                        
accepted: executing: source=three, translation=null
22:42:12 - work  
                        
accepted: executing: source=four, translation=null
22:42:12 - work  
                        
accepted: executing: source=five, translation=null
22:42:12 - All jobs scheduled
22:42:20 - work  
                        
started: executing: source=one, translation=null
22:42:24 - work  
                        
started: executing: source=two, translation=null
22:42:28 - work  
                        
started: executing: source=three, translation=null
22:42:30 - work  
                        
started: executing: source=four, translation=null
22:42:30 - work  
                        
completed: executing: source=one, translation=one_tr
22:42:30 - work  
                        
started: executing: source=five, translation=null
22:42:34 - work  
                        
completed: executing: source=two, translation=two_tr
22:42:38 - work  
                        
completed: executing: source=three, translation=three_tr
22:42:40 - work  
                        
completed: executing: source=four, translation=four_tr
22:42:40 - work  
                        
completed: executing: source=five, translation=five_tr
22:42:40 - done in 27641
                      

There is a little trick in the TranslatorWorkListener defined above; it just prints WorkItem to the log, and because we have the toString() method overwritten in AbstractTranslator, you see all those 'source=one, translation=null' lines, which identify particular jobs. In reality, if you need to correlate WorkItem to Work objects in WorkListener, you have to pass each WorkItem to the listener and preserve it for matching with the WorkItem obtained from WorkEvent. (The getResult() call on workEvent.getWorkItem() would return null until the job reaches the COMPLETED status.) The following code illustrates this technique:

public class TranslatorWorkListener implements WorkListener {
    protected Map workMap = 
        Collections.synchronizedMap(new HashMap());
    
    public void workAccepted(WorkEvent workEvent) {
        logger.info("work accepted: "
            +getTranslator(we.getWorkItem()).getSource());
    }

    public void workRejected(WorkEvent workEvent) {
        logger.info("work rejected: "
            +removeTranslator(we.getWorkItem()).getSource());
    }

    public void workStarted(WorkEvent workEvent) {
        logger.info("work started: "
            +getTranslator(we.getWorkItem()).getSource());
    }

    public void workCompleted(WorkEvent workEvent) {
        logger.info("work completed: "
            +removeTranslator(we.WorkItem()).getSource());
    }
    
    public void  
                        
addTranslator(WorkItem wi, Translator t) {
        workMap.put(wi, t);
    }
    
    public Work  
                        
getTransaltor(WorkItem wi) {
        return (Translator)workMap.get(wi);
    }
    
    public Work  
                        
removeTranslator(WorkItem wi) {
        return (Translator)workMap.remove(wi);
    }
}
                      

To use this, the servlet code should be modified to call TranslatorWorkListener.addTranslator(WorkItem, Translator) right after the call to schedule().

Pages: 1, 2, 3, 4

Next Page ยป