org.galagosearch.tupleflow.execution
Class JobExecutor

java.lang.Object
  extended by org.galagosearch.tupleflow.execution.JobExecutor

public class JobExecutor
extends java.lang.Object

This class is responsible for executing TupleFlow jobs.

A job is specified using the TupleFlow Job class. The Job class has an XML form which can be parsed by JobConstructor, but you can create one programmatically as well.

Before the job is executed, it is verified. JobExecutor checks that all the classes referenced by the Job object actually exist and that the connections point to sensible places. Once the job has been verified, JobExecutor builds an execution plan that runs the job with as much parallelism as possible without violating any ordering constraints imposed by stage connections. After the plan is generated, the job is sent to a StageExecutor, which handles the low-level details of execution.
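One way to picture "as much parallelism as possible without violating ordering constraints" is a layered topological sort: each stage waits for every stage whose output it reads, and stages in the same layer (wave) can run side by side. The sketch below is a hypothetical illustration of that idea only. The class ExecutionPlanSketch and its map-of-connections input are assumptions made for this example and are not JobExecutor's actual planner or data model.

```java
import java.util.*;

/**
 * Hypothetical sketch (not TupleFlow's planner): groups stages into waves so that
 * a stage only runs after every stage it reads from has finished. Stages in the
 * same wave have no ordering constraint between them and could run in parallel.
 */
final class ExecutionPlanSketch {
    /** edges maps each stage name to the names of the stages that consume its output. */
    static List<List<String>> waves(Map<String, List<String>> edges) {
        // Count how many producers each stage is still waiting on.
        Map<String, Integer> inDegree = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : edges.entrySet()) {
            inDegree.putIfAbsent(e.getKey(), 0);
            for (String consumer : e.getValue()) {
                inDegree.merge(consumer, 1, Integer::sum);
            }
        }
        // The first wave holds every stage with no inputs (TreeMap keeps it sorted).
        List<String> ready = new ArrayList<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) ready.add(e.getKey());
        }
        List<List<String>> result = new ArrayList<>();
        int scheduled = 0;
        while (!ready.isEmpty()) {
            result.add(ready);
            scheduled += ready.size();
            List<String> next = new ArrayList<>();
            for (String stage : ready) {
                for (String consumer : edges.getOrDefault(stage, List.of())) {
                    // A consumer becomes ready once its last producer has been scheduled.
                    if (inDegree.merge(consumer, -1, Integer::sum) == 0) {
                        next.add(consumer);
                    }
                }
            }
            Collections.sort(next);
            ready = next;
        }
        if (scheduled != inDegree.size()) {
            throw new IllegalArgumentException("stage connections contain a cycle");
        }
        return result;
    }
}
```

For a pipeline split -> parse -> {index, stats} -> merge, the sketch yields four waves, and index and stats share a wave because neither reads from the other. Rejecting cycles mirrors the verification step: a cyclic set of connections has no valid execution order.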

TupleFlow has many different kinds of StageExecutors you can use. To get started and to debug your code, use the LocalStageExecutor or ThreadedExecutor. To harness more parallelism, use the SSHStageExecutor or the DRMAAStageExecutor.
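Because execution is delegated, the same plan can be run on the calling thread while debugging or spread across workers for throughput. The sketch below shows that design with a hypothetical StageRunner interface and two implementations, one sequential and one backed by a thread pool. These names are assumptions for illustration only; they are not the StageExecutor API, and the real executors such as SSHStageExecutor or DRMAAStageExecutor distribute work across machines rather than threads.

```java
import java.util.*;
import java.util.concurrent.*;

/** Hypothetical stand-in for a pluggable executor; not TupleFlow's StageExecutor API. */
interface StageRunner {
    /** Runs every copy of one stage and returns the results in task order. */
    List<String> runAll(List<Callable<String>> tasks) throws Exception;
}

/** Runs tasks one at a time on the calling thread, which keeps stack traces simple. */
final class SequentialRunner implements StageRunner {
    public List<String> runAll(List<Callable<String>> tasks) throws Exception {
        List<String> results = new ArrayList<>();
        for (Callable<String> task : tasks) {
            results.add(task.call());
        }
        return results;
    }
}

/** Runs tasks concurrently on a fixed-size thread pool. */
final class PooledRunner implements StageRunner {
    private final int threads;

    PooledRunner(int threads) { this.threads = threads; }

    public List<String> runAll(List<Callable<String>> tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<String> results = new ArrayList<>();
            // invokeAll waits for all tasks and returns futures in submission order.
            for (Future<String> f : pool.invokeAll(tasks)) {
                results.add(f.get()); // a task failure surfaces as ExecutionException
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

Callers depend only on the interface, so switching from the sequential runner to the pooled one does not change how the plan is built.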

Author:
trevor

Nested Class Summary
static class JobExecutor.JobExecutionStatus
           
 
Constructor Summary
JobExecutor(Job job, java.lang.String temporaryStorage, ErrorStore store)
           
 
Method Summary
 void addMergeStage(Job job, java.lang.String stageName, java.lang.String pointName)
          Add a merge stage to this job that merges the output point named pointName of the stage stageName.
 void addMergeStages(Job job)
          Find stages that need to open many files for reading when running, and add intermediate merge stages to reduce the number of files each stage must open.
 void clear()
           
 void findDanglingEndpoints(Job job)
          In the parameter file, each stage has a connections section that describes a set of connection endpoints for the stage (inputs and outputs).
static void main(java.lang.String[] args)
           
 boolean needsMergeStages()
          Checks to see if any stage has too many inputs.
static Job optimize(Job job)
          This method tries to combine stages together to reduce overhead.
static Job parseFile(java.lang.String filename, ErrorStore store)
          Parses the XML text in the file specified by the filename parameter into a Job.
static Job parseText(java.lang.String text, ErrorStore store)
          Parses the XML text in the text parameter into a Job.
 void prepare()
           
static void renameConnections(Job job, Stage source, Stage destination)
           
 void run(StageExecutor executor)
           
static boolean runLocally(Job job, ErrorStore store)
           
 void runWithServer(StageExecutor executor, org.mortbay.jetty.Server server)
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

JobExecutor

public JobExecutor(Job job,
                   java.lang.String temporaryStorage,
                   ErrorStore store)
Method Detail

parseFile

public static Job parseFile(java.lang.String filename,
                            ErrorStore store)
                     throws org.xml.sax.SAXException,
                            javax.xml.parsers.ParserConfigurationException,
                            java.io.IOException
Parses the XML text in the file specified by the filename parameter into a Job.

Parameters:
filename - A path to the XML file to parse.
store - An ErrorStore object where all error information should be sent.
Returns:
A job instance based on the XML description in the file named by the filename parameter.
Throws:
org.xml.sax.SAXException
javax.xml.parsers.ParserConfigurationException
java.io.IOException

parseText

public static Job parseText(java.lang.String text,
                            ErrorStore store)
                     throws org.xml.sax.SAXException,
                            javax.xml.parsers.ParserConfigurationException,
                            java.io.IOException
Parses the XML text in the text parameter into a Job.

Parameters:
text - XML text to parse into a Job.
store - An ErrorStore object where all error information should be sent.
Returns:
A job instance based on the XML description in the text parameter.
Throws:
org.xml.sax.SAXException
javax.xml.parsers.ParserConfigurationException
java.io.IOException

optimize

public static Job optimize(Job job)
This method tries to combine stages together to reduce overhead. In particular, this method looks for two stages, A and B, where each copy of B takes input from only one copy of stage A. In this case, all the steps from B are moved into stage A, saving a lot of file overhead in transferring tuples from A to B. This method is particularly useful for jobs that are created automatically.

Parameters:
job - The job instance to optimize.
Returns:
A new job instance, perhaps with fewer stages.
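The fusion rule above can be sketched on a toy model. Here a stage is just a named list of steps, and a hypothetical Connection carries a oneToOne flag meaning "each copy of the consumer reads from exactly one copy of the producer". When a consumer B has a single incoming connection and that connection is one-to-one, B's steps are appended to its producer A and B's outgoing connections are rewired to leave A. FusionSketch, Connection, and oneToOne are assumptions for illustration; this is not the code inside optimize.

```java
import java.util.*;

/** Hypothetical model of stage fusion; not the actual optimize() implementation. */
final class FusionSketch {
    static final class Connection {
        final String from, to;
        final boolean oneToOne; // each copy of `to` reads from exactly one copy of `from`

        Connection(String from, String to, boolean oneToOne) {
            this.from = from; this.to = to; this.oneToOne = oneToOne;
        }
    }

    /** Returns a new stage -> steps map; the arguments are not modified. */
    static Map<String, List<String>> fuse(Map<String, List<String>> stages,
                                          List<Connection> connections) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        stages.forEach((name, steps) -> out.put(name, new ArrayList<>(steps)));
        List<Connection> conns = new ArrayList<>(connections);
        boolean changed = true;
        while (changed) {
            changed = false;
            for (Connection c : conns) {
                long inputsToB = conns.stream().filter(other -> other.to.equals(c.to)).count();
                if (c.oneToOne && inputsToB == 1 && !c.from.equals(c.to)) {
                    out.get(c.from).addAll(out.remove(c.to)); // B's steps now run inside A
                    List<Connection> rewired = new ArrayList<>();
                    for (Connection x : conns) {
                        if (x == c) continue; // the A -> B link disappears
                        String from = x.from.equals(c.to) ? c.from : x.from; // B's outputs leave A
                        rewired.add(new Connection(from, x.to, x.oneToOne));
                    }
                    conns = rewired;
                    changed = true;
                    break; // rescan the rewired connections
                }
            }
        }
        return out;
    }
}
```

With split -> parse marked one-to-one and parse -> count not, parse is folded into split and count then reads from split, so the job drops from three stages to two.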

renameConnections

public static void renameConnections(Job job,
                                     Stage source,
                                     Stage destination)

prepare

public void prepare()

clear

public void clear()

needsMergeStages

public boolean needsMergeStages()
Checks to see if any stage has too many inputs.

Returns:
true if any stage has more than maximumFileInputs inputs.
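A minimal sketch of this check, under the assumption (made only for this example) that each consumer opens one file per copy of every stage it reads from. MergeCheckSketch and its map arguments are hypothetical; only the name maximumFileInputs comes from the documentation above.

```java
import java.util.*;

/** Hypothetical model: a consumer opens one file per copy of each producer it reads from. */
final class MergeCheckSketch {
    /**
     * copies:   stage name -> number of copies (instances) of that stage
     * inputsOf: stage name -> names of the stages it reads from
     */
    static boolean needsMergeStages(Map<String, Integer> copies,
                                    Map<String, List<String>> inputsOf,
                                    int maximumFileInputs) {
        for (Map.Entry<String, List<String>> e : inputsOf.entrySet()) {
            int files = 0;
            for (String producer : e.getValue()) {
                files += copies.get(producer);
            }
            if (files > maximumFileInputs) {
                return true; // this stage would have to open too many files at once
            }
        }
        return false;
    }
}
```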

addMergeStage

public void addMergeStage(Job job,
                          java.lang.String stageName,
                          java.lang.String pointName)
Add a merge stage to this job that merges the output point named pointName of the stage stageName.

Parameters:
job - The job that should get the new merge stage.
stageName - The stage that contains the output that needs merging.
pointName - The output point that needs merging in the stage stageName.
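A merge stage works by reading the producer's output files in groups, so that no single merge copy opens more than a fixed number of them. The sketch below shows only that grouping step. MergeGroupsSketch and maxInputs are hypothetical names, and how addMergeStage actually assigns files to merge copies is not specified here.

```java
import java.util.*;

/** Hypothetical sketch: assigns producer output files to merge copies in fixed-size groups. */
final class MergeGroupsSketch {
    /** Each returned group is the input list of one merge copy. */
    static List<List<String>> group(List<String> files, int maxInputs) {
        if (maxInputs < 1) {
            throw new IllegalArgumentException("maxInputs must be positive");
        }
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < files.size(); i += maxInputs) {
            int end = Math.min(i + maxInputs, files.size());
            groups.add(new ArrayList<>(files.subList(i, end)));
        }
        return groups;
    }
}
```

Ten output files with maxInputs = 4 give three merge copies reading 4, 4, and 2 files, and the downstream stage then opens 3 files instead of 10.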

addMergeStages

public void addMergeStages(Job job)
Find stages that need to open many files for reading when running, and add intermediate merge stages to reduce the number of files each stage must open.

Parameters:
job - The job to which intermediate merge stages should be added.
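When a single layer of merging still leaves too many files, merges can be stacked. The sketch below computes how many merge copies each successive layer would need until the final consumer reads at most maximumFileInputs files. MergeLayersSketch is a hypothetical helper that models the arithmetic under the assumption that each layer divides the file count by maximumFileInputs (rounding up); it is not the code of addMergeStages.

```java
import java.util.*;

/** Hypothetical sketch: merge copies per layer until fan-in fits under the limit. */
final class MergeLayersSketch {
    static List<Integer> mergeLayers(int producerCopies, int maximumFileInputs) {
        if (maximumFileInputs < 2) {
            // With a limit of 1, merging never reduces the file count.
            throw new IllegalArgumentException("maximumFileInputs must be at least 2");
        }
        List<Integer> layers = new ArrayList<>();
        int files = producerCopies;
        while (files > maximumFileInputs) {
            // ceil(files / maximumFileInputs) merge copies, one output file each
            files = (files + maximumFileInputs - 1) / maximumFileInputs;
            layers.add(files);
        }
        return layers;
    }
}
```

For example, 50000 producer copies with a limit of 100 need two layers (500 merge copies, then 5), while 10000 copies need a single layer of 100.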

findDanglingEndpoints

public void findDanglingEndpoints(Job job)
In the parameter file, each stage has a connections section that describes a set of connection endpoints for the stage (inputs and outputs). This method verifies that each of those endpoints is attached to a valid connection defined under the connections tag in the job. If the method finds a dangling (unconnected) endpoint, an error message is added to the ErrorStore.
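The check amounts to a set-membership test: every endpoint a stage declares must appear in some job-level connection. In the sketch below, endpoints are keyed as "stage:endpoint" strings and errors are collected in a plain list standing in for the ErrorStore. DanglingEndpointSketch, the key format, and the message text are all assumptions for illustration.

```java
import java.util.*;

/** Hypothetical sketch of dangling-endpoint detection; a List stands in for ErrorStore. */
final class DanglingEndpointSketch {
    /**
     * endpoints: stage name -> endpoint names declared in that stage's connections section
     * connected: "stage:endpoint" keys referenced by the job-level connections
     */
    static List<String> findDangling(Map<String, Set<String>> endpoints, Set<String> connected) {
        List<String> errors = new ArrayList<>();
        // Sorted iteration keeps the error order deterministic.
        for (Map.Entry<String, Set<String>> e : new TreeMap<>(endpoints).entrySet()) {
            for (String point : new TreeSet<>(e.getValue())) {
                if (!connected.contains(e.getKey() + ":" + point)) {
                    errors.add("Stage " + e.getKey() + ": endpoint '" + point + "' is not connected");
                }
            }
        }
        return errors;
    }
}
```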


run

public void run(StageExecutor executor)
         throws java.lang.InterruptedException,
                java.util.concurrent.ExecutionException,
                java.net.UnknownHostException,
                java.io.IOException
Throws:
java.lang.InterruptedException
java.util.concurrent.ExecutionException
java.net.UnknownHostException
java.io.IOException

runWithServer

public void runWithServer(StageExecutor executor,
                          org.mortbay.jetty.Server server)
                   throws java.util.concurrent.ExecutionException,
                          java.lang.InterruptedException,
                          java.net.UnknownHostException
Throws:
java.util.concurrent.ExecutionException
java.lang.InterruptedException
java.net.UnknownHostException

runLocally

public static boolean runLocally(Job job,
                                 ErrorStore store)
                          throws java.io.IOException,
                                 java.lang.InterruptedException,
                                 java.util.concurrent.ExecutionException,
                                 java.lang.Exception
Throws:
java.io.IOException
java.lang.InterruptedException
java.util.concurrent.ExecutionException
java.lang.Exception

main

public static void main(java.lang.String[] args)
                 throws javax.xml.parsers.ParserConfigurationException,
                        org.xml.sax.SAXException,
                        java.io.IOException,
                        java.lang.InterruptedException,
                        java.util.concurrent.ExecutionException
Throws:
javax.xml.parsers.ParserConfigurationException
org.xml.sax.SAXException
java.io.IOException
java.lang.InterruptedException
java.util.concurrent.ExecutionException


Copyright © 2009. All Rights Reserved.