
Version 4 (modified by nicklas, 2 years ago)


How to use the Open Grid Scheduler package API

In this document we describe the main aspects of the programmatic API that other extensions can use to access and use Open Grid Clusters.

Enumerating Open Grid Clusters

The OpenGridService class is typically the starting point for many actions. From this class it is possible to get information about, and access, all clusters that have been defined in the opengrid-config.xml file. The service is a singleton; use the OpenGridService.getInstance() method to get the object. Note! It is important that the service is actually running inside BASE. Check on the Administrate->Services page that this is the case.

To enumerate the available Open Grid Clusters, use one of the OpenGridService.getClusters() methods. This returns a collection of OpenGridCluster instances. Most methods in this class are used for getting configuration information from the opengrid-config.xml file. The OpenGridCluster.getId() method returns the internal ID of the cluster definition. It is created by combining the username, address and port of the cluster (for example, griduser@grid.example.com:22). The ID can then be used with OpenGridService.getClusterById() to directly access the same cluster later on. Other useful information can be found in the objects returned by OpenGridCluster.getConnectionInfo() and OpenGridCluster.getConfig(). OpenGridCluster.asJSONObject() returns more or less the same information wrapped up as JSON data. This is useful for transferring information to a web interface, allowing a user to select a cluster to work with.
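As an illustration of the ID format described above, the class below composes and splits IDs of the form username@address:port. ClusterIds is a hypothetical helper, not part of the Open Grid package (real IDs come from OpenGridCluster.getId()). It could, for example, be used to sanity-check an ID received from the web client before passing it to OpenGridService.getClusterById(). It assumes that the username contains no @ and that the address is not an IPv6 literal.

```java
import java.util.Objects;

/** Hypothetical helper: composes and parses IDs of the form user@address:port. */
final class ClusterIds {
    private ClusterIds() {}

    /** Builds an ID such as "griduser@grid.example.com:22". */
    static String format(String user, String address, int port) {
        Objects.requireNonNull(user, "user");
        Objects.requireNonNull(address, "address");
        return user + "@" + address + ":" + port;
    }

    /** Splits an ID into {user, address, port}; throws IllegalArgumentException on malformed input. */
    static String[] parse(String id) {
        int at = id.indexOf('@');
        int colon = id.lastIndexOf(':');
        if (at <= 0 || colon <= at + 1 || colon == id.length() - 1) {
            throw new IllegalArgumentException("Not a cluster ID: " + id);
        }
        String port = id.substring(colon + 1);
        Integer.parseInt(port); // NumberFormatException (an IllegalArgumentException) if not numeric
        return new String[] { id.substring(0, at), id.substring(at + 1, colon), port };
    }
}
```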

Java code in a servlet running on the BASE web server

DbControl dc = ... // We need an open DbControl from BASE

// Options specifying which (extra) information that we want to return
// Use JSONOptions.DEFAULT to only return the minimal information
JSONOptions options = new JSONOptions();
options.enable(JSONOption.CLUSTER_INFO);
options.enable(JSONOption.NODE_INFO);

OpenGridService service = OpenGridService.getInstance();
JSONArray jsonHosts = new JSONArray();

// Enumerates all clusters that the current user has access to
for (OpenGridCluster host : service.getClusters(dc, Include.ALL))
{
   jsonHosts.add(host.asJSONObject(options));
}

return jsonHosts; // This is what we transfer to the web client via AJAX

JavaScript code running in the current user's web browser

// In the web client use the JSON data to populate a <select> list
var list = document.getElementById('cluster-list');
list.length = 0;

var clusters = response; // Response contains an array with cluster information
for (var i = 0; i < clusters.length; i++)
{
   var cluster = clusters[i];
   var option = new Option(cluster.connection.name, cluster.id);
   option.cluster = cluster;
   list[list.length] = option;
}

Note that there is no need to use the OpenGridCluster.connect() method yet.

Creating a job script

In its simplest form, a job script is only a string with one or more (bash) commands to execute. For example, pwd; ls is a valid job script that prints the current directory and then lists all files in it. To help with creating longer and more complex scripts, the ScriptBuilder class can be used. The cmd(), echo() and comment() methods are more or less self-describing. It is possible to start a command in the background with bkgr(), but note that this must be paired with a waitForProcess(); otherwise the job script may finish before the command running in the background, which may cause unpredictable results. The progress() method is very useful for jobs that are expected to take a long time to run. It writes progress information to the ${WD}/progress file, which is picked up by the Open Grid Service and reported back to the BASE job that is acting as a proxy.
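To make the background/wait pairing and the progress file more concrete, here is a small stand-in for ScriptBuilder. MiniScriptBuilder is hypothetical and does not reproduce the bash that the real ScriptBuilder generates; in particular, the "percent message" line format written to ${WD}/progress is an assumption. It only shows the kind of text such a builder produces: bkgr() appends &, and waitForProcess() emits the bash wait builtin so that the script does not exit while background commands are still running.

```java
/**
 * Hypothetical, simplified stand-in for ScriptBuilder. It is NOT the real class;
 * it only shows the kind of bash text such a builder might produce.
 */
final class MiniScriptBuilder {
    private final StringBuilder script = new StringBuilder();

    MiniScriptBuilder cmd(String command) {
        script.append(command).append('\n');
        return this;
    }

    MiniScriptBuilder comment(String text) {
        script.append("# ").append(text).append('\n');
        return this;
    }

    MiniScriptBuilder echo(String text) {
        script.append("echo ").append(quote(text)).append('\n');
        return this;
    }

    /** Starts a command in the background; must be followed by waitForProcess(). */
    MiniScriptBuilder bkgr(String command) {
        script.append(command).append(" &\n");
        return this;
    }

    /** Blocks until all background commands have finished (the bash 'wait' builtin). */
    MiniScriptBuilder waitForProcess() {
        script.append("wait\n");
        return this;
    }

    /** Overwrites ${WD}/progress with "percent message" (assumed format). */
    MiniScriptBuilder progress(int percent, String message) {
        script.append("echo ").append(quote(percent + " " + message))
              .append(" > ${WD}/progress\n");
        return this;
    }

    /** Single-quotes a string for bash, escaping embedded single quotes. */
    private static String quote(String s) {
        return "'" + s.replace("'", "'\\''") + "'";
    }

    @Override
    public String toString() {
        return script.toString();
    }
}
```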

When creating a job script, a few useful variables have been set up:

  • ${WD}: A randomly generated subdirectory in the <job-folder> directory. This directory contains the job script, is the current working directory when the job is started, and is used for communicating data to and from the BASE server. Data in this directory is preserved after the job has finished. When running post-job code, this folder can be found by calling OpenGridCluster.getWorkFolder(). Files can be downloaded to the BASE server with OpenGridSession.downloadFile(), OpenGridSession.readFile() or OpenGridSession.getJobFileAsString(). The latter method is the simplest one to use for parsing out interesting data from text result files.
  • ${TMPDIR}: A temporary working directory that is typically only available on the node the job is running on. Unless the job is started in debug mode, this directory is deleted soon after the job has completed.
  • ${NSLOTS}: The number of slots that have been assigned to this job. If the job starts a multi-threaded analysis program, it is common practice not to use more threads than this value specifies. Note that a single node may run more than one job at the same time, so using nproc to determine the number of threads may cause resource issues.
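A Java program started from a job script can follow the same NSLOTS advice. The helper below is a hypothetical sketch: it reads NSLOTS from a map of environment variables (pass System.getenv() in real use) and falls back to a single thread when the variable is missing or invalid. That fallback is a conservative choice made for this sketch, not something the Open Grid package prescribes.

```java
import java.util.Map;

/** Hypothetical helper for a Java program started from a job script. */
final class SlotThreads {
    private SlotThreads() {}

    /**
     * Returns the number of worker threads to use: the value of NSLOTS if it is a
     * positive integer, otherwise 1. The number of cores on the node is deliberately
     * NOT used, since other jobs may be running on the same node.
     */
    static int threadCount(Map<String, String> env) {
        String value = env.get("NSLOTS");
        if (value == null) {
            return 1;
        }
        try {
            int slots = Integer.parseInt(value.trim());
            return slots > 0 ? slots : 1;
        } catch (NumberFormatException e) {
            return 1;
        }
    }
}
```

For example, Executors.newFixedThreadPool(SlotThreads.threadCount(System.getenv())) gives a pool sized to the assigned slots rather than to all cores on the node.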

In the example code below, we assume that we have FASTQ files stored on a file server on the network. The FASTQ files are going to be aligned with Tophat, and we have a wrapper script that sets all parameters except the number of threads and the location of the FASTQ files. After Tophat, a second post-alignment script does some further processing and saves the result in a subdirectory.

ScriptBuilder script = new ScriptBuilder();
// We do not want to hog the network, so we copy all files we need to the local cluster node
script.progress(5, "Copying data to temporary folder...");
script.cmd("cp /path/to/fastqfiles/*fastq.gz ${TMPDIR}");

// Wrapper script that calls Tophat; we assume all other required parameters are set by the wrapper
script.progress(10, "Analysing FASTQ files with Tophat...");
script.cmd("tophat-wrapper.sh -p ${NSLOTS} ${TMPDIR}");

// Another analysis script...
script.progress(50, "Running post-alignment analysis...");
script.cmd("post-analysis.sh -p ${NSLOTS} ${TMPDIR}");

// Now we only need to copy the results back to our file server.
// Remember that ${TMPDIR} is cleaned up automatically (except in debug mode), so we don't have to deal with that
script.progress(90, "Copying analyzed data back to file server...");
script.cmd("cp ${TMPDIR}/result/* /path/to/resultfiles/");

// Finally, we copy the logfile to the job directory so that we can extract data from it to BASE
script.cmd("cp ${TMPDIR}/logfile ${WD}/logfile");

Submitting a job

When the job script has been generated, it is time to submit the job to the cluster. For this, you'll need a couple more objects. The first object is a JobConfig instance. Use it for setting various options related to the Open Grid qsub command. In most cases the default settings should work, but you can, for example, use JobConfig.setPriority() to change the priority (-p) or JobConfig.setQsubOption() to set almost any other option. Some options are set automatically by the job submission procedure and are ignored if specified (-S, -terse, -N, -wd, -o, -e).
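The handling of reserved options can be sketched as follows. QsubOptions is hypothetical and is not the real JobConfig: it keeps options in insertion order, maps setPriority() to -p, and drops the options listed above when producing command-line arguments. The actual JobConfig may store and validate options differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of turning user-supplied qsub options into command-line
 * arguments while dropping the options that the submission procedure sets itself.
 */
final class QsubOptions {
    /** Options set automatically by the submission procedure (as listed in the text). */
    static final Set<String> RESERVED = Set.of("-S", "-terse", "-N", "-wd", "-o", "-e");

    private final Map<String, String> options = new LinkedHashMap<>();

    /** Sets an option; a null value means a flag without an argument. */
    QsubOptions set(String option, String value) {
        options.put(option, value);
        return this;
    }

    /** Convenience for the -p (priority) option. */
    QsubOptions setPriority(int priority) {
        return set("-p", Integer.toString(priority));
    }

    /** Returns the arguments in insertion order, skipping reserved options. */
    List<String> toArgs() {
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, String> e : options.entrySet()) {
            if (RESERVED.contains(e.getKey())) {
                continue; // ignored, since the submission procedure sets it
            }
            args.add(e.getKey());
            if (e.getValue() != null) {
                args.add(e.getValue());
            }
        }
        return args;
    }
}
```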

You also need a BASE Job item of the OTHER type. It is recommended that the job is set up so that it can be identified later, when a notification about its completion is sent out. Remember that while a job executes on the Open Grid Cluster almost anything can happen on the BASE server, including a restart. Do not rely on in-memory information about jobs that have been submitted to the cluster, since this information may be gone when the job completes. We recommend using one or more of Job.setName(), Job.setPluginVersion() and Job.setItemSubtype() to be able to identify the job in a reliable manner. The reason this is important is explained in the "Getting notified when a job completes" section below.

Now it is time to create a JobDefinition object. This basically bundles the job script, the job configuration and the BASE job item. The JobDefinition is also used for uploading data files that are needed by the job. Read more about this in the "Advanced usage" section below.

The final step is to connect to the Open Grid Cluster and submit the job. Assuming that you know the ID of the cluster, you can simply use the OpenGridService.getClusterById() method and then OpenGridCluster.connect() to create an OpenGridSession instance that is connected to the cluster. Then, use the OpenGridSession.qsub() method to submit the job. Note that this method needs a List<JobDefinition> input parameter. If you have multiple jobs to submit, it is a lot quicker to submit all of them in one go than to make multiple calls to OpenGridSession.qsub().

The OpenGridSession.qsub() method will put together the final job script, upload it to the cluster, upload other files to the cluster and then schedule the job by calling the qsub command. It will also update the BASE job item with some (important) information:

  • The Job.getServer() property is set to the ID of the Open Grid Cluster.
  • The Job.getExternalId() property is set to the number assigned to the job on the cluster.
  • Signal handlers for progress reporting are set up.
  • A callback action is set up on the current DbControl that aborts the job if the transaction is not committed.
  • Later on, the Job.getNode() property is set to a string that identifies the node the job is running on. Note that this is not just the name of the node; it also includes some other information from the Open Grid Cluster.
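Because the job number ends up in Job.getExternalId(), it can help to know what qsub reports. With the -terse option, Grid Engine's qsub prints only the job ID, for example 12345, or 12345.1-10:1 for an array job. The TerseOutput class below is a hypothetical sketch of extracting the numeric part; it does not claim to be how OpenGridSession.qsub() is implemented.

```java
/** Hypothetical helper: extracts the numeric job ID from 'qsub -terse' output. */
final class TerseOutput {
    private TerseOutput() {}

    /**
     * Accepts plain IDs ("12345") and array-job IDs ("12345.1-10:1") and returns
     * the job number. Surrounding whitespace (e.g. a trailing newline) is ignored.
     */
    static long jobNumber(String stdout) {
        String s = stdout.trim();
        int dot = s.indexOf('.');
        String number = dot >= 0 ? s.substring(0, dot) : s;
        try {
            return Long.parseLong(number);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Unexpected qsub output: " + stdout, e);
        }
    }
}
```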

The OpenGridSession.qsub() method returns a CmdResult object containing a list of JobStatus instances. Always check that CmdResult.getExitStatus() returns 0. Any other value indicates an error when submitting the jobs, and your transaction should be aborted.

DbControl dc = ....     // We need an open DbControl from BASE
String clusterId = ...  // The ID of the cluster that the user selected in the web client
String jobScript = .... // See the previous example

// Use default configuration but a lower priority
JobConfig config = new JobConfig();
config.setPriority(-500);

// Create a new BASE job and set properties so that we can identify it later
Job job = Job.getNew(dc, null, null, null); // All null to create an 'OTHER' type job
job.setName("My analysis");
job.setPluginVersion("my-analysis-1.0");
// job.setItemSubtype(...); // This can also be useful
dc.saveItem(job); // Important!!!

// Create the job definition that links it all together
JobDefinition jobDef = new JobDefinition("MyAnalysis", config, job);
jobDef.setDebug(true);    // Run in debug mode while developing
jobDef.setCmd(jobScript); // Do not forget this!

// Connect to the Open Grid Cluster
OpenGridCluster cluster = OpenGridService.getInstance().getClusterById(dc, clusterId);
OpenGridSession session = cluster.connect(5);
try
{
   // Submit the job and do not forget the error handling
   CmdResult<List<JobStatus>> result = session.qsub(dc, Arrays.asList(jobDef));
   if (result.getExitStatus() != 0)
   {
       // Error handling, for example
       throw new RuntimeException(result.getStderr());
   }

   // Do not forget to commit the transaction. The job will be aborted otherwise.
   dc.commit();
}
finally
{
   // Finally, do not forget to close the connection to the Open Grid Cluster
   OpenGrid.close(session);
}

Getting notified when a job completes

One important feature is that other extensions can get notified when a job running on the cluster has ended. This is implemented in an asynchronous manner, so it should not matter if the BASE server is updated, restarted or otherwise modified while a job is running. Behind the scenes, two parts work together to make this feature work:

  • The BASE system for requesting progress information about external jobs has been set up to send requests to the OpenGridService whenever it wants new information about a job. This is why it is important to create a BASE job item as a proxy for the Open Grid Cluster job. Without it, no progress information is requested and we never get to know when the job has ended.
  • The OpenGridService polls each registered cluster at regular intervals, typically once every minute, but it may be more or less often depending on whether there are any known jobs executing. The OpenGridSession.qstat() and OpenGridSession.qacct() methods are used for this and detect waiting, running and completed jobs. For running jobs, the service downloads the progress file (see ScriptBuilder.progress() above) and updates the information in the BASE database.
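The distinction between waiting, running and completed jobs can be sketched from the roles of the two commands: qstat lists jobs that are still pending or running, while qacct reports accounting records for jobs that have finished. PollClassifier is a hypothetical simplification, not the service's actual logic. It treats qstat state codes containing r or t as running, everything else still in qstat as waiting, and reports UNKNOWN for a job that has left qstat but has no accounting record yet.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of deriving a job state from qstat and qacct results. */
final class PollClassifier {
    enum State { WAITING, RUNNING, COMPLETED, UNKNOWN }

    private PollClassifier() {}

    /**
     * @param qstatStates job number -> qstat state code (e.g. "qw", "r") for jobs still listed by qstat
     * @param qacctJobs   job numbers for which qacct has an accounting record
     */
    static State classify(long job, Map<Long, String> qstatStates, Set<Long> qacctJobs) {
        String code = qstatStates.get(job);
        if (code != null) {
            // Simplification: 'r' (running) or 't' (transferring) counts as running
            return code.contains("r") || code.contains("t") ? State.RUNNING : State.WAITING;
        }
        // A job that has left qstat is considered finished once qacct knows about it
        return qacctJobs.contains(job) ? State.COMPLETED : State.UNKNOWN;
    }
}
```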

Once a job has been detected as completed, the service invokes the job completion sequence. This is implemented as a custom extension point (net.sf.basedb.opengrid.job-complete) that receives messages about completed jobs. Extensions that want to get notified should extend this extension point. Note that all registered extensions are notified about all jobs, regardless of which extension originally submitted the job to the cluster. Notifications are sent for both successful and failed jobs. Thus, each extension is responsible for filtering out and ignoring notifications about jobs that are of no interest to it. This is why it is important to set the name, plug-in version, etc. on the job when submitting it. We recommend implementing this filtering step in the ActionFactory that is registered for the net.sf.basedb.opengrid.job-complete extension point. Note that a single notification may handle more than one job. Thus, the prepareContext() method is called once, without any information about the jobs, while the getActions() method is called once for every job.

public class MyAnalysisJobCompletionHandlerFactory 
   implements ActionFactory<JobCompletionHandler>
{
	
   public MyAnalysisJobCompletionHandlerFactory()
   {}

   @Override
   public boolean prepareContext(InvokationContext context) 
   {
      // Always true since we do not know anything about the job(s) that have been completed
      return true;
   }

   @Override
   public JobCompletionHandler[] getActions(InvokationContext context) 
   {
      ClientContext cc = context.getClientContext();
      Job job  = (Job)cc.getCurrentItem();
		
      String pluginVersion = job.getPluginVersion();
      if (pluginVersion == null || !pluginVersion.startsWith("my-analysis"))
      {
         // This is not our job, ignore it
         return null;
      }
		
      // Note that job.getStatus() has not been updated yet so we
      // need to get the status information extracted from the cluster
      JobStatus status = (JobStatus)cc.getAttribute("job-status");
      if (status.getStatus() != Job.Status.DONE)
      {
         // We don't do anything unless the job was successful.
         return null;
      }

      JobCompletionHandler action = null;
      String jobName = job.getName();
      if (jobName.startsWith("My analysis"))
      {
         action = new MyAnalysisCompletionHandler();
      }
      else
      {
         // In the future we may have more than one type of job...
      }

      return action == null ? null : new JobCompletionHandler[] { new JobCompletionWrapper(action) };
   }
}
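The consequence of "every extension sees every job" can be modelled in plain Java. CompletionDispatch and its nested types are hypothetical and only mimic the call pattern described above (one preparation per factory, one filter decision per job); the real extension system uses ActionFactory, InvokationContext and JobCompletionHandler instead. The MY_ANALYSIS filter mirrors the example factory: a plug-in version starting with my-analysis and a successful job.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the notification flow: every registered factory sees every
 * completed job and must filter out the ones it did not submit.
 */
final class CompletionDispatch {
    /** Minimal stand-in for the information available about a completed job. */
    static final class CompletedJob {
        final String name;
        final String pluginVersion;
        final boolean successful;

        CompletedJob(String name, String pluginVersion, boolean successful) {
            this.name = name;
            this.pluginVersion = pluginVersion;
            this.successful = successful;
        }
    }

    interface Factory {
        /** Returns true if this factory wants to handle the job (the getActions() filter). */
        boolean accepts(CompletedJob job);
    }

    /** Same idea as the example factory: our plug-in version prefix and a successful job. */
    static final Factory MY_ANALYSIS = job ->
        job.pluginVersion != null && job.pluginVersion.startsWith("my-analysis") && job.successful;

    private CompletionDispatch() {}

    /** Returns "factoryIndex:jobName" for each accepted (factory, job) pair. */
    static List<String> dispatch(List<Factory> factories, List<CompletedJob> jobs) {
        List<String> handled = new ArrayList<>();
        for (int f = 0; f < factories.size(); f++) {
            // prepareContext() corresponds to this point: once per factory, no job information yet
            for (CompletedJob job : jobs) {
                // getActions() corresponds to this point: once for every job
                if (factories.get(f).accepts(job)) {
                    handled.add(f + ":" + job.name);
                }
            }
        }
        return handled;
    }
}
```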

Aborting jobs

Advanced usage