
How to use the Job scheduler package API

In this tutorial we describe the main aspects of the programmatic API that other extensions can use to access and work with Open Grid and Slurm clusters.

  1. The use case
  2. Enumerating clusters
  3. Creating a job script
  4. Submitting a job
  5. Getting notified when a job completes
  6. Aborting jobs
  7. Advanced usage
    1. Uploading data files as part of a JobDefinition
    2. Connecting to non-cluster servers
    3. Tracking non-cluster jobs
    4. Reacting to configuration changes

See also the Javadoc API documentation.

The use case

The use case in this tutorial is that we are going to implement an extension that wants to display a web page inside BASE for starting a job on a cluster. We focus on the Job scheduler part of this and care less about how the extension interacts with BASE. You may want to read more about this in the BASE documentation. We assume that you have already implemented an extension that can display a web page that allows the user to specify input parameters for the job. On this web page you also want the user to be able to select a cluster that the job should be submitted to. The list below is an overview of the main steps that are needed.

  1. Display a selection list with available clusters
  2. Submit the job parameters and other information to a servlet
  3. The servlet creates a job script and submits it to the cluster
  4. Get a notification once the job has been completed, so that we can import files and other data back into BASE

Note! The API has a lot of classes and methods that have OpenGrid in the name. This doesn't mean that the API is only usable for Open Grid clusters. The same API can also be used for Slurm clusters. The naming is only for historical reasons and in order to maintain backwards compatibility.

Enumerating clusters

In this step we want to display a selection list on the web page that allows the user to select a pre-defined cluster. We recommend that the list is populated by JavaScript code that uses AJAX, a servlet and JSON to retrieve the information. You need to implement everything on the browser side and most of the servlet in your own extension package.

The OpenGridService class is typically the starting point for a lot of actions. From this class it is possible to get information about and access all clusters that have been defined in the opengrid-config.xml file. The service is a singleton; use the OpenGridService.getInstance() method to get the instance. Note! It is important that the service is actually running inside BASE. Check on the Administrate->Services page that this is the case; otherwise you will get an empty service.

To enumerate the available clusters use one of the OpenGridService.getClusters() methods. It is possible to return all configured clusters or only a subset defined by a filter. All methods will return a collection of OpenGridCluster instances. Most methods in this class are used for getting configuration information from the opengrid-config.xml file. The OpenGridCluster.getId() method returns the internal ID of the cluster definition. It is created by combining the username, address and port of the cluster (for example, griduser@grid.example.com:22). The ID can then be used with OpenGridService.getClusterById() to directly access the same cluster later on. Other useful information can be found in the objects returned by calling OpenGridCluster.getConnectionInfo() and OpenGridCluster.getConfig(). The OpenGridCluster.asJSONObject() method returns more or less the same information wrapped up as JSON data. This is useful for transferring information to a web interface to allow a user to select a cluster to work with.

Java code in a servlet running on the BASE web server

DbControl dc = ... // We need an open DbControl from BASE

// Options specifying which information that we want to return
// Use JSONOptions.DEFAULT to only return the minimal information
JSONOptions options = JSONOptions.DEFAULT;

OpenGridService service = OpenGridService.getInstance();
JSONArray jsonHosts = new JSONArray();

// Enumerates all clusters that the current user has access to (no filtering)
for (OpenGridCluster host : service.getClusters(dc, Include.ALL))
{
   jsonHosts.add(host.asJSONObject(options));
}

return jsonHosts; // This is what we transfer to the web client via AJAX

JavaScript code running in the current user's web browser

// In the web client use the JSON data to populate a <select> list
var list = document.getElementById('cluster-list');
list.length = 0;

// The 'response' contains an array with cluster information
var clusters = response; 
for (var i = 0; i < clusters.length; i++)
{
   var cluster = clusters[i];
   var option = new Option(cluster.connection.name + ' ('+cluster.config.type+')', cluster.id);
   option.cluster = cluster;
   list[list.length] = option;
}

Note that there is no need to use the OpenGridCluster.connect() method yet.

Some of the OpenGridService.getClusters() methods can take a filter parameter. This makes it possible to only list clusters with some specific properties. Predefined filter implementations can be found in the net.sf.basedb.opengrid.filter package:

  • ClusterTypeFilter: Filter on type of cluster (e.g. Slurm or Open Grid)
  • IsConnectedFilter: Only return clusters that we can connect to
  • UsernameFilter: Only return clusters that we connect to with a given username
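
For example, a filtered enumeration could look like the sketch below. The exact getClusters() overload and the ClusterTypeFilter constructor arguments are assumptions made for illustration; check the Javadoc for the real signatures.

DbControl dc = ... // We need an open DbControl from BASE
OpenGridService service = OpenGridService.getInstance();

// Hypothetical filter that only accepts Slurm clusters
// (see the Javadoc for the actual constructor arguments)
ClusterTypeFilter slurmOnly = new ClusterTypeFilter(...);

// Assumes a getClusters() overload that accepts a filter
for (OpenGridCluster cluster : service.getClusters(dc, slurmOnly))
{
   // Add the cluster to the selection list as in the example above
}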

Creating a job script

In its simplest form a job script is only a string with one or more (bash) commands to execute. For example, pwd; ls is a valid job script that prints the current directory and then lists all files in it. To help you create longer and more complex scripts the ScriptBuilder class can be used. The cmd(), echo() and comment() methods are more or less self-describing. It is possible to start a command in the background with bkgr(), but note that this must be paired with a waitForProcess() call; otherwise the job script may finish before the command running in the background, which may cause unpredictable results. The ScriptBuilder.progress() method is very useful for jobs that are expected to take a long time to run. It writes progress information to the ${WD}/progress file. This information is picked up by the Job scheduler service and reported back to the BASE job that is acting as a proxy.
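
A minimal sketch of the bkgr()/waitForProcess() pairing described above; the exact method signatures are assumptions here, so check the ScriptBuilder Javadoc before using them.

ScriptBuilder script = new ScriptBuilder();
// Start a long-running command in the background (hypothetical script)
script.bkgr("preprocess.sh ${TMPDIR}");
// Do other work while the background command is running
script.cmd("prepare-other-data.sh ${TMPDIR}");
// Wait for the background command so that the job script
// does not finish before it is done
script.waitForProcess();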

When creating a job script you may find the following variables useful:

  • ${WD}: A randomly generated subdirectory in the <job-folder> directory. The directory contains the job script and other data for the current job. This is also the current working directory when the job is started and the directory that is used for communicating data to/from the BASE server. Data in this directory is preserved after the job has finished. After a job has finished, this folder can be found by calling OpenGridCluster.getWorkFolder(). Files can be transferred to the BASE server with OpenGridSession.downloadFile(), OpenGridSession.readFile() or OpenGridSession.getJobFileAsString(). The latter method is the simplest one to use for parsing out interesting data from text result files.
  • ${TMPDIR}: A temporary working directory that is typically only available on the node the job is running on. Unless the job is started in debug mode, this directory is deleted soon after the job has finished.
  • ${NSLOTS}: The number of slots that have been assigned to this job. If the job is starting a multi-threaded analysis program it is common practice to not use more threads than what this value specifies. Note that a single node may run more than one job at the same time and that one slot typically corresponds to one CPU core.

Note! NSLOTS is a variable that is set by the Open Grid software. It also sets a lot of other variables that can be used by the job script. Slurm has a different set of variables. For backwards compatibility reasons when running on a Slurm cluster, a wrapper script will set NSLOTS=SLURM_JOB_CPUS_PER_NODE.

In the code example below we assume that we have FASTQ files stored on a file server on the network. We want to align the FASTQ files with Tophat and we have a wrapper script that sets most of the parameters. We only need to provide the number of threads and the location of the FASTQ files. After Tophat we have a second post-alignment script that does some additional processing and saves the result in a subdirectory (${TMPDIR}/result).

ScriptBuilder script = new ScriptBuilder();
// Copy all files we need to the local cluster node
script.progress(5, "Copying data to temporary folder...");
script.cmd("cp /path/to/fastqfiles/*fastq.gz ${TMPDIR}");

// Wrapper script that calls tophat
// We assume all other required parameters are set by the wrapper
script.progress(10, "Analysing FASTQ files with Tophat...");
script.cmd("tophat-wrapper.sh -p ${NSLOTS} ${TMPDIR}"); 

// Another analysis script...
script.progress(50, "Post-alignment analysis files...");
script.cmd("post-analysis.sh -p ${NSLOTS} ${TMPDIR}");

// Now we only need to copy the results back to our file server. 
// Remember that the ${TMPDIR} is cleaned automatically so we can 
// leave that as it is
script.progress(90, "Copying analyzed data back to file server");
script.cmd("cp ${TMPDIR}/result/* /path/to/resultfiles/");

// Finally, we copy the logfile to the job directory so that 
// we can extract data from it to BASE
script.cmd("cp ${TMPDIR}/logfile ${WD}/logfile");

Submitting a job

When the job script has been generated it is time to submit the job to the cluster. For this, you need a couple more objects. The first object is a JobConfig instance. Use this for setting various options that are related to the Open Grid qsub command or to the Slurm sbatch command. In most cases the default settings should work, but you can for example use JobConfig.setPriority()/JobConfig.setSlurmNice() to change the priority of the job or JobConfig.setQsubOption()/JobConfig.setSbatchOption() to set almost any other option.

Note! Options for Open Grid are very different from options for Slurm. In most cases, it is the responsibility of the submitting code to detect and handle differences between the two systems. There are two exceptions that are converted automatically:

  • The priority of the job is automatically converted between the Open Grid and Slurm systems.
  • Number of slots/cpus to use. The 'pe' parameter in Open Grid is automatically converted to the 'cpus-per-task' Slurm parameter.

Read more about this in the javadoc for the JobConfig class. Some options are set automatically by the job submission procedure and are ignored:

  • In Open Grid: -S, -terse, -N, -wd, -o, -e
  • In Slurm: parsable, job-name, J, chdir, D, output, o, error, e
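
As a rough sketch, setting extra options on a JobConfig could look like the example below. The option names and the exact setQsubOption()/setSbatchOption() parameter format are assumptions made for illustration; see the JobConfig Javadoc for the real syntax.

JobConfig config = new JobConfig();
config.setPriority(-500); // Automatically converted when running on Slurm

// Hypothetical extra options; whether the option names should include
// dashes and how values are passed is an assumption here
config.setQsubOption("-l", "h_rt=24:00:00");  // Open Grid: max runtime
config.setSbatchOption("time", "24:00:00");   // Slurm: max runtime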

You also need a BASE Job item that is an OTHER type job. It is recommended that the job is set up so that it can easily be identified later when notification about its completion is sent out. Remember that during the time a job executes on a cluster almost anything can happen on the BASE server, including a restart. Do not rely on information that is stored in memory about jobs that have been submitted to the cluster since this information may not be there when the job completes. We recommend using one or more of Job.setName(), Job.setPluginVersion() and Job.setItemSubtype() to be able to identify the job in a reliable manner. We will explain why this is important in the Getting notified when a job completes section below.

The last object you need is a JobDefinition object. This is basically a compilation containing the job script, the job configuration and the BASE job item. The JobDefinition is also used for uploading data files that are needed by the job. Read more about this in the Advanced usage section below.

The final step is to connect to the cluster and submit the job. If we assume that you know the ID of the cluster you can simply use the OpenGridService.getClusterById() method and then OpenGridCluster.connect() to create an OpenGridSession instance that is connected to the cluster. Then, use the OpenGridSession.qsub() method to submit the job. Note that this method needs a List<JobDefinition> input parameter. If you have multiple jobs to submit it will be a lot quicker to submit all of them in one go instead of making multiple calls to the OpenGridSession.qsub() method.

The OpenGridSession.qsub() method will put together the final job script, upload it to the cluster, upload other files to the cluster and then schedule the job by calling the qsub command (Open Grid) or the sbatch command (Slurm). It will also update the BASE job item with some (important) information:

  • The Job.getServer() property is set to the ID of the cluster.
  • The Job.getExternalId() property is set to the number assigned to the job on the cluster.
  • Signal handlers for progress reporting are set up.
  • A callback action is set up on the current DbControl that aborts the job if the transaction is not committed.
  • Later on, the Job.getNode() property is set to a string that identifies the node the job is running on. Note that this is not the pure name of the node but also includes some other information from the Open Grid cluster.

The OpenGridSession.qsub() method returns a CmdResult object containing a list with JobStatus instances. You should check that the CmdResult.getExitStatus() returns 0. All other values indicate an error when submitting the jobs and your transaction should be aborted.

DbControl dc = ....     // We need an open DbControl from BASE
String clusterId = ...  // The ID of the cluster selected by the user
String jobScript = .... // See the previous example

// Use default configuration but a lower priority
JobConfig config = new JobConfig();
config.setPriority(-500); // Or config.setSlurmNice(500)

// Create a new BASE job and set properties so that we can identify 
// it later. The 'null' parameters creates an 'OTHER' type job.
Job job = Job.getNew(dc, null, null, null);
job.setName("My analysis");
job.setPluginVersion("my-analysis-1.0");
// job.setItemSubtype(...); // This can also be useful
dc.saveItem(job); // Important!!!

// Create the job definition that links it all together
JobDefinition jobDef = new JobDefinition("MyAnalysis", config, job);
jobDef.setDebug(true);    // Run in debug mode while developing
jobDef.setCmd(jobScript); // Do not forget this!

// Connect to the cluster
OpenGridService service = OpenGridService.getInstance();
OpenGridCluster cluster = service.getClusterById(dc, clusterId);
OpenGridSession session = cluster.connect(5);
try
{
   // Submit the job. Do not forget the error handling!
   CmdResult<List<JobStatus>> result = 
      session.qsub(dc, Arrays.asList(jobDef));
   result.throwExceptionIfNonZeroExitStatus();

   // Do not forget to commit the transaction. 
   // The job will be aborted otherwise.
   dc.commit();
}
finally
{
   // Finally, do not forget to close the DbControl and
   // the connection to the cluster
   OpenGrid.close(session);
   if (dc != null) dc.close();
}

Getting notified when a job completes

One important feature is that extensions can get notified when a job running on the cluster has ended. This is implemented in an asynchronous manner and it should not matter if the BASE server is updated or restarted or otherwise modified while a job is running. In the background there are three parts that work together to make this feature work.

  • The BASE system for requesting job progress information about external jobs has been set up to send requests to the OpenGridService whenever it wants new information about a job. This is the reason why it is important to create a BASE job item as a proxy for all jobs submitted to a cluster. Without it, no progress information is requested and we never get to know when the job has ended.
  • The OpenGridService is polling each registered cluster at regular intervals. Typically this happens once every minute, but it may be more or less often depending on whether there are any known jobs executing or not. The OpenGridSession.qstat() and OpenGridSession.qacct() methods are used for this and will detect waiting, running and completed jobs. For running jobs, the service downloads the progress file (see ScriptBuilder.progress() above) and updates the progress information in the BASE database.
  • Once a job has been detected as completed the service will initiate the job completion sequence. This is implemented as a custom extension point (net.sf.basedb.opengrid.job-complete) that receives messages about completed jobs. Extensions that want to get notified should extend this extension point. Note that all registered extensions are notified about all jobs! It doesn't matter which extension originally submitted the job to the cluster. Notifications are sent both for successful and failed jobs. Each extension is responsible for filtering and ignoring notifications about jobs that are of no interest to them. This is why it is important to set name, plug-in version, etc. on the job when submitting it. We recommend that this filtering step is done in the ActionFactory implementation that is registered for the net.sf.basedb.opengrid.job-complete extension point. Note that a single notification may handle more than one completed job. Thus, the prepareContext() method is called once and without any information about the jobs, while the getActions() method is called once for every job.
public class MyAnalysisJobCompletionHandlerFactory 
   implements ActionFactory<JobCompletionHandler>
{
  
   public MyAnalysisJobCompletionHandlerFactory()
   {}

   @Override
   public boolean prepareContext(InvokationContext context) 
   {
      // Always true since we do not know anything 
      // about the job(s) that have been completed
      return true;
   }

   @Override
   public JobCompletionHandler[] getActions(InvokationContext context) 
   {
      ClientContext cc = context.getClientContext();
      Job job  = (Job)cc.getCurrentItem();
    
      String pluginVersion = job.getPluginVersion();
      if (pluginVersion == null || !pluginVersion.startsWith("my-analysis"))
      {
         // This is not our job, ignore it
         return null;
      }
    
      // Note that Job item has not been updated yet so we
      // need to get the status information extracted from the cluster
      JobStatus status = (JobStatus)cc.getAttribute("job-status");
      if (status.getStatus() != Job.Status.DONE)
      {
         // We don't do anything unless the job was successful.
         return null;
      }

      JobCompletionHandler action = null;
      String jobName = job.getName();
      if (jobName.startsWith("My analysis"))
      {
         action = new MyAnalysisCompletionHandler();
      }
      else
      {
          // In the future we may have more than one type of jobs...
      }

      return action == null ? null : new JobCompletionHandler[] { action };
   }
}

The ActionFactory.getActions() implementation should not do anything except check if the job is of interest or not. It should return null if it is not interested in the job, and an instance implementing the JobCompletionHandler interface otherwise. This interface defines a single method: JobCompletionHandler.jobCompleted(SessionControl, OpenGridSession, Job, JobStatus). The Job and JobStatus objects are the same as in the ActionFactory, but in this method you also get access to a SessionControl instance and a connected OpenGridSession to the cluster the job was running on. The OpenGridSession can for example be used to download and parse result files. The SessionControl can be used to access BASE and update items and/or annotations. The good thing about the SessionControl is that it has been automatically configured so that the owner of the job is already logged in and a project (if any is specified on the job) is set as the active project (in the ActionFactory the session control is a generic one with the root user logged in).

Do not update the Job item since this may interfere with the updates to the job that are automatically made by the Job Scheduler extension. The method may return a string to set the status message of the job, or throw an exception to set the job status to ERROR.

public class MyAnalysisCompletionHandler
   implements JobCompletionHandler
{
    
   public MyAnalysisCompletionHandler()
   {}
  
   @Override
   public String jobCompleted(SessionControl sc, OpenGridSession session, 
      Job job, JobStatus status)
   {
      String jobName = status.getName();
      String logfile = session.getJobFileAsString(jobName, "logfile", "UTF-8");
 
      DbControl dc = sc.newDbControl();
      try
      {
         String msg = parseLogfile(dc, logfile);
         dc.commit();
         return msg;
      }
      finally
      {
         if (dc != null) dc.close();
      }
   }
}

Aborting jobs

This is automatically handled by the Job Scheduler extension by the same mechanism that is used for progress reporting. The abort is handled by calling the OpenGridSession.qdel() method. After that the job is handled just as if any other error had occurred, i.e. the job completion sequence is initiated. Extensions that are interested in manually aborted jobs should check for JobStatus.getStatus() == Job.Status.ERROR and JobStatus.getExitCode() == 137, which indicates that the job was aborted by the user.
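
Inside the getActions() method of a completion handler factory (see the example in the previous section) the check could look like this:

// 'cc' is the ClientContext from the ActionFactory example above
JobStatus status = (JobStatus)cc.getAttribute("job-status");
if (status.getStatus() == Job.Status.ERROR && status.getExitCode() == 137)
{
   // The job was manually aborted by the user; ignore it
   return null;
}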

Advanced usage

Uploading data files as part of a JobDefinition

The JobDefinition that is used for submitting a job to a cluster has the ability to upload files that are needed for the job. This is done by calling the JobDefinition.addFile() method with an UploadSource parameter. The UploadSource is an interface but we have provided several implementations that wrap, for example, a String, a BASE File item or an InputStream.

Note that calling the JobDefinition.addFile() method doesn't start the upload immediately. The upload happens in the OpenGridSession.qsub() method. The file is placed in the subfolder of the <job-folder> that has been created for the job (the ${WD} folder).

JobDefinition jobDef = ...
String data = ... // A string with 'data' that should be uploaded

UploadSource src = new StringUploadSource("data.csv", data);
jobDef.addFile(src);

// Uploads the data to ${WD}/data.csv
session.qsub( ... jobDef ...);

Connecting to non-cluster servers

Connections that are made to clusters are regular SSH connections. There is really nothing that is special about the connection itself. This means that it is possible to connect to more or less any server that supports SSH. It doesn't matter if the server is running an Open Grid or Slurm cluster or something else. Note that servers that are defined in the opengrid-config.xml are expected to be either Open Grid or Slurm cluster servers and the OpenGridService implementation will try to call cluster-specific commands on them.

However, it is possible to programmatically create a ConnectionInfo instance and use it for creating a RemoteHost object. With this you can connect to the server by calling the RemoteHost.connect() method which returns a RemoteSession object. It is very similar to what can be done with OpenGridCluster/OpenGridSession objects, except that the special methods for calling cluster commands are not available.

Tip! It is possible to create a ConnectionInfo instance from a BASE FileServer item (assuming that the file server contains all required information for connecting via SSH: host, fingerprint, username and password).
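
A hedged sketch of what this could look like is shown below. The ConnectionInfo constructor and setters, as well as the RemoteHost and RemoteSession method signatures used here, are assumptions made for illustration; check the Javadoc for the real API.

// Hypothetical connection details for a non-cluster server
ConnectionInfo info = new ConnectionInfo("my-file-server");
info.setAddress("files.example.com");
info.setPort(22);
info.setUsername("baseuser");
info.setPassword(...);    // Or key-based authentication, depending on the setup
info.setFingerprint(...);

// Connect and execute a command on the remote server
RemoteHost host = new RemoteHost(info);
RemoteSession session = host.connect(5);
try
{
   CmdResult<String> result = session.executeCmd("ls /data", 5);
   result.throwExceptionIfNonZeroExitStatus();
}
finally
{
   OpenGrid.close(session);
}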

Tracking non-cluster jobs

Sometimes there are other things going on that are not Open Grid or Slurm jobs that would be interesting to track. One example is the sequencing progress of a sequencer machine. In this case we want to know when the sequencing has been completed and then start analysis jobs (as Open Grid Cluster jobs). A simple bash script has been implemented (http://baseplugins.thep.lu.se/browser/other/pipeline/trunk/nextseq_status.sh) that checks if all result files from the sequencing are present on the file server or not. We want to run this script at regular intervals. When all data is present, we run some checks to validate the sequence data and if all seems to be good, we start the analysis pipeline. There are three steps to consider:

  • The sequencing process should be represented by a BASE job item as a proxy. Progress reporting needs to be set up using the extension mechanism implemented in the BASE core. This needs to be implemented completely by the other extension. It is not possible to re-use the setup that the Job Scheduler package uses. In the code example below the SequencingSignalHandler class is assumed to take care of this.
String barcode = ... // Something that identifies the current sequencing 
String clusterId = ... // The cluster we use to check status 

// Create a new BASE job and set properties so that we can identify 
// it later. 'null' parameters to create an 'OTHER' type job
Job job = Job.getNew(dc, null, null, null);
job.setName("My sequencing");
job.setPluginVersion("my-sequencing-1.0");

job.setExternalId(barcode);  // Instead of the OpenGrid/Slurm job ID
// Setup signalling for progress reporting (see BASE documentation)
String signalURI = SequencingSignalHandler.getSignalUri(barcode);
job.setSignalTransporter(ExtensionSignalTransporter.class, signalURI);

// We need to know which cluster to use
job.setScheduled(clusterId, null);
dc.saveItem(job); // Important!!!
  • Once a request for a status update is received by the SequencingSignalHandler it should call OpenGridService.asyncJobStatusUpdate(JobIdentifier, JobStatusUpdater). The Job scheduler extension will then call the JobStatusUpdater.getJobStatus() during the next asynchronous processing cycle. In the example above, the JobStatusUpdater implementation should call the bash script to see how far the sequencing has come and then report that back in a JobStatus object. The Job scheduler extension will take the responsibility of updating the Job item in BASE. It might be tempting to check the sequencing status directly from the signal handler, but this is not recommended since the signals may arrive quite often. The asynchronous approach is preferable and also gives you automatic updates of the Job item in BASE.
public class SequencingSignalHandler
   implements SignalHandler, JobStatusUpdater
{

   // Not all SignalHandler methods are shown and the 'logger'
   // field declaration has been omitted...

   @Override
   public void handleSignal(Signal signal) 
   {
      Job job = ... // The BASE Job item
      String barcode = job.getExternalId();
      String clusterId = job.getServer();
    
      JobIdentifier jobId = new JobIdentifier(clusterId, barcode, job.getId());
      // Register a callback that eventually will call the
      // getJobStatus() method below
      OpenGridService.getInstance().asyncJobStatusUpdate(jobId, this);
   }

   // Execute the bash script and parse out the result to a
   // JobStatus object. The code shown here has been simplified...
   @Override
   public JobStatus getJobStatus(OpenGridSession session,
     JobIdentifier jobId)
   {
      String barcode = jobId.getClusterJobId();
    
      CmdResult<String> sequencingStatus = 
         session.executeCmd("nextseq_status.sh " + barcode, 5);
      if (sequencingStatus.getExitStatus() != 0)
      {
         logger.error("nextseq_status.sh failed: " + sequencingStatus);
         return null;
      }

      JobStatus status = null;
      try
      {
         status = parseSequencingStatus(sequencingStatus.getStdout());
      }
      catch (Exception ex)
      {
         logger.error("Could not parse nextseq_status.sh output", ex);
      }
      return status;
   }
}
  • When the sequencing has been completed (status == Job.Status.DONE) the normal job completion routines in the Job scheduler extension notify all registered JobCompletionHandler implementations. The other extension simply needs to extend the JobCompletionHandler implementation so that it can detect the sequencing job and then do whatever needs to be done with it.
public class MySequencingJobCompletionHandlerFactory 
   implements ActionFactory<JobCompletionHandler>
{
  
   ....

   @Override
   public JobCompletionHandler[] getActions(InvokationContext context) 
   {
      ....
    
      String pluginVersion = job.getPluginVersion();
      if (pluginVersion != null && pluginVersion.startsWith("my-sequencing"))
      {
         // This is our sequencing job
         action = new SequencingJobCompletionHandler();
      }
    
      ....
   }
}

Reacting to configuration changes

The opengrid-config.xml is parsed and loaded into memory when the Job scheduler service extension is started. Changes to the configuration file are not applied until the service is re-started. For some extensions it may be critical to be able to detect when this happens. Luckily, everything that is needed is already built into the BASE core API. Extensions that need to know when the Job scheduler service is stopped or started simply need to register an event handler with the extensions manager in BASE. The event handler should listen to SERVICE_STOPPED or SERVICE_STARTED events for the net.sf.basedb.opengrid.service extension.

// We need a filter that listens for SERVICE_STARTED event 
// related to the Job scheduler service
EventFilter serviceStarted = new ExtensionEventFilter(
   "net.sf.basedb.opengrid.service", Services.SERVICE_STARTED);

// We need to implement an event handler which, for example, 
// reloads our own configuration file
EventHandler handler = new MyEventHandler();

// Register the event handler with BASE
// The ClassLoader parameter is important for not leaking memory
// in case this extension is updated or uninstalled
Registry registry = Application.getExtensionsManager().getRegistry();
ClassLoader loader = this.getClass().getClassLoader();
registry.registerEventHandler(handler, serviceStarted, loader);

Now, every time the Job scheduler service is restarted, BASE calls the MyEventHandler.handleEvent() method.
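
A minimal sketch of such an event handler is shown below. The exact handleEvent() parameter list is an assumption here; check the BASE core API documentation for the EventHandler interface.

public class MyEventHandler
   implements EventHandler
{
   @Override
   public void handleEvent(EventType eventType, Object data)
   {
      // The Job scheduler service was just started or re-started.
      // Reload our own configuration so that it matches the new
      // cluster definitions (hypothetical helper method)
      reloadMyConfiguration();
   }
}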
