Version 21 (modified by 4 years ago) ( diff ) | ,
---|
How to use the Job scheduler package API
In this tutorial we will try to describe the main aspects of the programmatic API that other extensions can use in order to access and use Open Grid and Slurm clusters.
- The use case
- Enumerating clusters
- Creating a job script
- Submitting a job
- Getting notified when a job completes
- Aborting jobs
- Advanced usage
See also the Javadoc API documentation.
The use case
The use case in this tutorial is that we are going to implement an extension that wants to display a web page inside BASE for starting a job on a cluster. We focus on the Job scheduler part of this and care less about how the extension interacts with BASE. You may want to read more about this in the BASE documentation. We assume that you have already implemented an extension that can display a web page that allows the user to specify input parameters for the job. On this web page you also want the user to be able to select a cluster that the job should be submitted to. The list below is an overview of the main steps that are needed.
- Display a selection list with available clusters
- Submit the job parameters and other information to a servlet
- The servlet creates a job script and submit it to the cluster
- Get a notification once that job has been completed, so that we can import files and other data back into BASE
Note! The API has a lot of classes and methods that have OpenGrid
in the name. This doesn't mean that the API is only usable for Open Grid clusters. The same API can also be used for Slurm clusters. The naming is only for historical reasons and in order to maintain backwards compatibility.
Enumerating clusters
In this step we want to display a selection list on the web page that allows the user to select a pre-defined cluster. We recommend that the list is populated by JavaScript code that uses AJAX, a Servlet and JSON to retreive the information. You need to implement everything on the browser side and most of the servlet in your own extension package.
The OpenGridService
class is typically the starting point for a lot of actions. From this class it is possible to get information about and access all clusters that have been defined in the opengrid-config.xml
file. The service is a singleton, use the OpenGridService.getInstance()
method to get the instance. Note! It is important that the service is actually running inside BASE. Check the Administrate->Services page that this is the case, otherwise you will get an empty service.
To enumerate the available clusters use one of the OpenGridService.getClusters()
methods. It is possible to return all configured cluster or only a subset defined by a filter. All methods will return a collection of OpenGridCluster
instances. Most methods in this class are used for getting configuration information from the opengrid-config.xml
file. The OpenGridCluster.getId()
method returns the internal ID of the cluster definition. It is created by combining the username, address and port of the cluster (for example, griduser@grid.example.com:22
). The ID can then be used with OpenGridService.getClusterById()
to directly access the same cluster later on. Other useful information can be found in the objects returned by calling OpenGridCluster.getConnectionInfo()
and OpenGridCluster.getConfig()
. The OpenGridCluster.asJSONObject()
contains more or less the same information wrapped up as JSON data. This is useful for transferring information a web interface to allow a user to select a cluster to work with.
Java code in a servlet running on the BASE web server
DbControl dc = ... // We need an open DbControl from BASE // Options specifying which information that we want to return // Use JSONOptions.DEFAULT to only return the minimal information JSONOptions options = JSONOptions.DEFAULT; OpenGridService service = OpenGridService.getInstance(); JSONArray jsonHosts = new JSONArray(); // Enumerates all clusters that the current user has access to (no filtering) for (OpenGridCluster host : service.getClusters(dc, Include.ALL)) { jsonHosts.add(host.asJSONObject(options)); } return jsonHosts; // This is what we transfer to the web client via AJAX
JavaScript code running in the web browser the current user is using
// In the web client use the JSON data to populate a <select> list var list = document.getElementById('cluster-list'); list.length = 0; // The 'response' contains an array with cluster information var clusters = response; for (var i = 0; i < clusters.length; i++) { var cluster = clusters[i]; var option = new Option(cluster.connection.name + ' ('+cluster.config.type+')', cluster.id); option.cluster = cluster; list[list.length] = option; }
Note that there is no need to use the OpenGridCluster.connect()
method yet.
Some of the OpenGridService.getClusters()
methods can take a filter parameter. This makes it possible
to only list clusters with some specific properties. Predefined filter implementations can be found in the net.sf.basedb.opengrid.filter
package:
ClusterTypeFilter
: Filter on type of cluster (eg. Slurm or Open Grid)IsConnectedFilter
: Only return clusters that we can connect toUsernameFilter
: Only return clusters that we connect to with a given username
Creating a job script
In it's simplest form a job script is only a string with one or more (bash) commands to execute. For example, pwd; ls
is a valid job script that prints the current directory and then lists all files in it. To help you create longer and more complex scripts the ScriptBuilder
class can be used. The cmd()
, echo()
and comment()
methods are more or less self-describing. It is possible to start a command in the background with bkgr()
, but note that this must be paired with a waitForProcess()
otherwise the job script may finish before the commmand that is running in the background which may cause unpredictable results. The ScriptBuilder.progress()
method is a very useful method for jobs that are expected to take a long time to run. The method writes progress information to the ${WD}/progress
file. This information is picked up by the Job scheduler service and reported back to the BASE job that is acting as a proxy.
When creating a job script you may find the following variables useful:
${WD}
: A randomly generated subdirectory in the<job-folder>
directory. The directory contains the job script and other data for the current job. This is also the current working directory when the job is started and the directory that is used for communicating data to/from the BASE server. Data in this directory is preserved after the job has finished. After a job has finished, this folder can be found by callingOpenGridCluster.getWorkFolder()
. Files can be transferred to the BASE server withOpenGridSession.downloadFile()
,OpenGridSession.readFile()
orOpenGridSession.getJobFileAsString()
. The latter method is the simplest one to use for parsing out interesting data from text result files.${TMPDIR}
: A temporary working directory that is typically only available on the node the job is running on. Unless the job is started in debug mode, this directory is deleted soon after the job has finished.${NSLOTS}
: The number of slots that has been assigned to this job. If the job is starting a multi-threaded analysis program it is common practice to not use more threads than what this value specifies. Note that a single node may run more than one job at the same time and that one slot typically corresponds to one cpu core.
Note! NSLOTS
is a variable that is set by the Open Grid software. It also sets a lot of other variables that can be used by the job script. Slurm has a different set of variables. For backwards compatibility reasons when running on a Slurm cluster, a wrapper script will set NSLOTS=SLURM_JOB_CPUS_PER_NODE
.
In the code example below we assume that we have FASTQ files stored on a file server on the network. We want to align the FASTQ files with Tophat and we have a wrapper script that sets most of the parameters. We only need to provide the number of threads and the location of the FASTQ files. After Tophat we have a second post-alignment script that does some stuff and save the result in a subdirectory (${TMPDIR}/result
).
ScriptBuilder script = new ScriptBuilder(); // Copy all files we need to the local cluster node script.progress(5, "Copying data to temporary folder..."); script.cmd("cp /path/to/fastqfiles/*fastq.gz ${TMPDIR}"); // Wrapper script that calls tophat // We assume all other required parameters are set by the wrapper script.progress(10, "Analysing FASTQ files with Tophat..."); script.cmd("tophat-wrapper.sh -p ${NSLOTS} ${TMPDIR}"); // Another analysis script... script.progress(50, "Post-alignment analysis files..."); script.cmd("post-analysis.sh -p ${NSLOTS} ${TMPDIR}"); // Now we only need to copy the results back to our file server. // Remember that the ${TMPDIR} is cleaned automatically so we can // leave that as it is script.progress(90, "Copying analyzed data back to file server"); script.cmd("cp ${TMPDIR}/result/* /path/to/resultfiles/"); // Finally, we copy the logfile to the job directory so that // we can extract data from it to BASE script.cmd("cp ${TMPDIR}/logfile ${WD}/logfile");
Submitting a job
When the job script has been generated it is time to submit the job to the cluster. For this, you need a couple of more objects. The first object is a JobConfig
instance. Use this for setting various options that are related to the Open Grid qsub command or to the Slurm sbatch command. In most cases the default settings should work, but you can for example use the JobConfig.setPriority()
/JobConfig.setSlurmNice()
to change the priority of the job or JobConfig.setQsubOption()
/JobConfig.setSbatchOption()
to set almost any other option.
Note! Options for Open Grid are very different from options for Slurm. In most cases, it is the responsibility of the submitting code to detect and handle differences between the two system. There are two exceptions that are converted automatically:
- The priority of the job is automatically converted between the Open Grid and Slurm system.
- Number of slots/cpus to use. The 'pe' parameter in Open Grid is automatically converted to the 'cpus-per-task' Slurm parameter.
Read more about this in the javadoc for the JobConfig
class. Some options are set automatically by the job submission procedure and are ignored:
- In Open Grid:
-S, -terse, -N, -wd, -o, -e
- In Slurm:
parsable, job-name, J, chdir, D, outout, o, error, e
You also need a BASE Job item that is an OTHER
type job. It is recommended that the job is set up so that it can easily be identified later when notification about it's completion is sent out. Remember that during the time a job executes on a cluster almost anything can happen on the BASE server, including a restart. Do not rely on information that is stored in memory about jobs that has been submitted to the cluster since this information may not be there when the job completes. We recommend using one or more of Job.setName()
, Job.setPluginVersion()
and Job.setItemSubtype()
to be able to identify the job in a reliable manner. We will explain why this is important in the Getting notified when a job completes section below.
The last object you need is a JobDefinition
object. This is basically a compilation containing the job script, the job configuration and the BASE job item. The JobDefinition
is also used for uploading data files that are needed by the job. Read more about this in the Advanced usage section below.
The final step is to connect to the cluster and submit the job. If we assume that you know the ID of the cluster you can simply use the OpenGridService.getClusterById()
method and then OpenGridCluster.connect()
to create an OpenGridSession
instance that is connected to the cluster. Then, use the OpenGridSession.qsub()
method to submit the job. Note that this method need a List<JobDefinition>
input parameter. If you have multiple jobs to submit it will be a lot quicker to submit all of them in one go instead of doing multiple calls to the OpenGridSession.qsub()
method.
The OpenGridSession.qsub()
method will put together the final job script, upload it to the cluster, upload other files to the cluster and then schedule the job by calling the qsub
command (Open Grid) or the sbatch
command (Slurm). It will also update the BASE job item with some (important) information:
- The
Job.getServer()
property is set to the ID of the cluster. - The
Job.getExternalId()
property is set to the number assigned to the job on the cluster. - Signal handlers for progress reporting is set up.
- A callback action is set up on the current
DbControl
that aborts the job if the transaction is not committed. - Later on, the
Job.getNode()
property is set to a string that identifies the node the job is running on. Note that this is not the pure name of the node but also include some other information from the Open Grid Cluster.
The OpenGridSession.qsub()
method returns a CmdResult
object containing a list with JobStatus
instances. You should check that the CmdResult.getExitStatus()
returns 0. All other values indicate an error when submitting the jobs and your transaction should be aborted.
DbControl dc = .... // We need an open DbControl from BASE String clusterId = ... // The ID of the cluster selected by the user String jobScript = .... // See the previous example // Use default configuration but a lower priority JobConfig config = new JobConfig(); config.setPriority(-500); // Or config.setSlurmNice(500) // Create a new BASE job and set properties so that we can identify // it later. The 'null' parameters creates an 'OTHER' type job. Job job = Job.getNew(dc, null, null, null); job.setName("My analysis"); job.setPluginVersion("my-analysis-1.0"); // job.setItemSubtype(...); // This can also be useful dc.saveItem(job); // Important!!! // Create the job definition that links it all together JobDefinition jobDef = new JobDefinition("MyAnalysis", config, job); jobDef.setDebug(true); // Run in debug mode while developing jobDef.setCmd(jobScript); // Do not forget this! // Connect to the cluster OpenGridService service = OpenGridService.getInstance(); OpenGridCluster cluster = service.getClusterById(dc, clusterId); OpenGridSession session = cluster.connect(5); try { // Submit the job. Do not forget the error handling! CmdResult<List<JobStatus>> result = session.qsub(dc, Arrays.asList(jobDef)); result.throwExceptionIfNonZeroExitStatus(); // Do not forget to commit the transaction. // The job will be aborted otherwise. dc.commit(); } finally { // Finally, do not forget to close the DbControl and // the connection to the cluster OpenGrid.close(session); if (dc != null) dc.close(); }
Getting notified when a job completes
One important feature is that extensions can get notified when a job running on the cluster has ended. This is implemented in an asynchronous manner and it should not matter if the BASE server is updated or restarted or otherwise modified while a job is running. In the background there are three parts that work together to make this feature work.
- The BASE system for requesting job progress information about external jobs has been setup to send requests to the
OpenGridService
whenever it want new information about a job. This is the reason why it is important to create a BASE job item as a proxy for the all jobs submitted to a cluster. Without it, no progress information is requested and we never get to know when the job has ended. - The
OpenGridService
is polling each registered cluster at regular intervals. Typically once every minute but it may be more or less often depending on if there are any known jobs executing or not. TheOpenGridSession.qstat()
andOpenGridSession.qacct()
methods are used for this and will detect waiting, running and completed jobs. For running jobs, the service downloads theprogress
file (seeScriptBuilder.progress()
above) and update the progress information in the BASE database. - Once a job has been detected as completed the service will initiate the job completion sequence. This is implemented as a custom extension point (
net.sf.basedb.opengrid.job-complete
) that receive messages about completed jobs. Extensions that want to get notified should extend this extension point. Note that all registered extensions are notified about all jobs! It doesn't matter which extension that originally submitted the job to the cluster. Notifications are sent both for successful and failed jobs. Each extension is responsible for filtering and ignoring notifications about jobs that is of no interest to them. This is why it is important to set name, plug-in version, etc. on the job when submitting it. We recommend that this filtering step is done in theActionFactory
implementation that is registered for thenet.sf.basedb.opengrid.job-complete
extension point. Note that a single notification may handle more than one completed job. Thus, theprepareContext()
method is called once and without any information about the jobs while the thegetActions()
method is called once for every job.
public class MyAnalysisJobCompletionHandlerFactory implements ActionFactory<JobCompletionHandler> { public MyAnalysisJobCompletionHandlerFactory() {} @Override public boolean prepareContext(InvokationContext context) { // Always true since we do not know anything // about the job(s) that have been completed return true; } @Override public JobCompletionHandler[] getActions(InvokationContext context) { ClientContext cc = context.getClientContext(); Job job = (Job)cc.getCurrentItem(); String pluginVersion = job.getPluginVersion(); if (pluginVersion == null || !pluginVersion.startsWith("my-analysis")) { // This is not our job, ignore it return null; } // Note that Job item has not been updated yet so we // need to get the status information extracted from the cluster JobStatus status = (JobStatus)cc.getAttribute("job-status"); if (status.getStatus() != Job.Status.DONE) { // We don't do anything unless the job was successful. return null; } JobCompletionHandler action = null; String jobName = job.getName(); if (jobName.startsWith("My analysis")) { action = new MyAnalysisCompletionHandler(); } else { // In the future we may have more than one type of jobs... } return action == null ? null : new JobCompletionHandler[] { action }; } }
The ActionFactory.getActions()
implementation should not do anything except check if the job is of interest or not. It should return null
if it is not interested in the job, and an instance implementing the JobCompletionHandler
interface otherwise. This interface defines a single method: JobCompletionHandler.jobCompleted(SessionControl, OpenGridSession, Job, JobStaus)
. The Job
and JobStatus
objects are the same as in the ActionFactory
, but in this method you also get access to a SessionControl
instance and a connected OpenGridSession
to the cluster the job was running on. The OpenGridSession
can for example be used to download and parse result files. The SessionControl
can be used to access BASE and update items and/or annotations. The good thing about the SessionControl
is that it has been automatically configured so that the owner of the job is already logged in and a project (if any is specified on the job) is set as the active project (in the ActionFactory
the session control is a generic one with the root user logged in).
Do not update the Job
item since this may interfere with the updates to the job that are automatically made by the Job Scheduler extension. The method may return a string to set the status message of the job, or throw an exception to set the job status to ERROR.
public class MyAnalysisCompletionHandler implements JobCompletionHandler { public MyAnalysisCompletionHandler() {} @Override public String jobCompleted(SessionControl sc, OpenGridSession session, Job job, JobStatus status) { String jobName = status.getName(); String logfile = session.getJobFileAsString(jobName, "logfile", "UTF-8"); DbControl dc = sc.newDbControl(); try { String msg = parseLogfile(dc, logfile); dc.commit(); return msg; } finally { if (dc != null) dc.close(); } } }
Aborting jobs
This is automatically handled by the Job Scheduler extension by the same mechanism that is used for progress reporting. The abort is handled by calling the OpenGridSession.qdel()
method. After that the job is handled just as if any other error had occurred, eg. the job completion sequence is initiated. Extensions that are interested in manually aborted jobs should check for JobStatus.getStatus() == Job.Status.ERROR
and JobStatus.getExitCode() == 137
, which indicates that the job was aborted by the user.
Advanced usage
Uploading data files as part of a JobDefinition
The JobDefinition
that is used for submitting a job to a cluster has the ability to upload files that are needed for the job. This is done by calling the JobDefinition.addFile()
method with an UploadSource
parameter. The UploadSource
is an interface but we have provided several implementations that wraps, for example, a String
, a BASE File
item or an InputStream
.
Note that calling the JobDefinition.addFile()
method doesn't start the upload immediately. The upload happens in the OpenGridSession.qsub()
method. The file is placed in the subfolder to the <job-folder>
that has been created for the job (the ${WD}
folder).
JobDefinition jobDef = ... String data = ... // A string with 'data' that should be uploaded UploadSource src = new StringUploadSource("data.csv", data); jobDef.addFile(src); // Uploads the data to ${WD}/data.csv session.qsub( ... jobDef ...);
Connecting to non-cluster servers
Connections that are made to clusters are regular SSH connections. There is really nothing that is special about the connection itself. This means that it is possible to connect to more or less any server that supports SSH. It doesn't matter if the server is running an Open Grid or Slurm cluster or something else. Note that servers that are defined in the opengrid-config.xml
are expected to be either Open Grid or Slurm cluster servers and the OpenGridService
implementation will try to call cluster-specific commands on them.
However, it is possible to programmatically create a ConnectionInfo
instance and use it for creating a RemoteHost
object. With this you can connect to the server by calling the RemoteHost.connect()
method which returns a RemoteSession
object. It is very similar to what can be done with OpenGridCluster
/OpenGridSession
objects, except that the special methods for calling cluster commands are not available.
Tip! It is possible to create a ConnectionInfo
instance from a BASE FileServer
item (assuming that the file server contains all required information for connecting via SSH: host, fingerprint, username and password).
Tracking non-cluster jobs
Sometimes there are other things going on that are not Open Grid or Slurm jobs that would be interesting to track. One example is the sequencing progress of a sequencer machine. In this case we want to know when the sequencing has been completed and then start analysis jobs (as Open Grid Cluster jobs). A simple bash script has been implemented (http://baseplugins.thep.lu.se/browser/other/pipeline/trunk/nextseq_status.sh) that checks if all result files from the sequencing are present on the file server or not. We want to run this script at regular intervals. When all data is present, we run some checks to validate the sequence data and if all seems to be good, we start the analysis pipeline. There are three steps to consider:
- The sequencing process should be represented by a BASE job item as a proxy. Progress reporting need to be setup using the extension mechanism implemented in the BASE core. This need to be implemented completely by the other extension. It is not possible to re-use the setup the Job Scheduler package uses. In the code example below the
SequencingSignalHandler
class is assumed to take care of this.
String barcode = ... // Something that identifies the current sequencing String clusterId = ... // The cluster we use to check status // Create a new BASE job and set properties so that we can identify // it later. 'null' parameters to create an 'OTHER' type job Job job = Job.getNew(dc, null, null, null); job.setName("My sequencing"); job.setPluginVersion("my-sequencing-1.0"); job.setExternalId(barcode); // Instead of the OpenGrid/Slurm job ID // Setup signalling for progress reporting (see BASE documentation) String signalURI = SequencingSignalHandler.getSignalUri(barcode); job.setSignalTransporter(ExtensionSignalTransporter.class, signalURI); // We need to know which cluster to use job.setScheduled(clusterId, null); dc.saveItem(job); // Important!!!
- Once a request for a status update is received by the
SequencingSignalHandler
it should callOpenGridService.asyncJobStatusUpdate(JobIdentifier, JobStatusUpdater)
. The Job scheduler extension will then call theJobStatusUpdater.getJobStatus()
during the next asynchronous processing cycle. In the example above, theJobStatusUpdater
implementation should call the bash script to see how far the sequencing has come and then report that back in aJobStatus
object. The Job scheduler extension will take the responsibility of updating the Job item in BASE. It might be tempting to check the sequencing status directly from the signal handler, but this is not recommended since the signals may arrive quite often. The asynchronous approach is preferable and also gives you automatic updates of the Job item in BASE.
public class SequencingSignalHandler implements SignalHandler, JobStatusUpdater { // Not all SignalHandler methods are shown... @Override public void handleSignal(Signal signal) { Job job = ... // The BASE Job item String barcode = job.getExternalId(); String clusterId = job.getServer(); JobIdentifier jobId = new JobIdentifier(clusterId, flowCellId, job.getId()); // Register a callback that eventually will call the // getJobStatus() method below OpenGridService.getInstance().asyncJobStatusUpdate(jobId, this); } // Execute the bash script and parse out the result to a // JobStatus object. The code shown here has been simplified... @Override public JobStatus getJobStatus(OpenGridSession session, JobIdentifier jobId) { String barcode = jobId.getClusterJobId(); CmdResult<String> sequencingStatus = session.executeCmd("nextseq_status.sh " + barcode, 5); if (sequencingStatus .getExitStatus() != 0) { logger.error("nextseq_status.sh failed: " + sequencingStatus); return null; } JobStatus status = null; try { status = parseSequencingStatus(sequencingStatus.getStdout()); } catch (Exception ex) { logger.error("Could not parse nextseq_status.sh output", ex); } return status; } }
- When the sequencing has been completed (
status == Job.Status.DONE
) the normal job completion routines in the Job scheduler extension notifies all registeredJobCompletionHandler
implementations. The other extension simply need to extend theJobCompletionHandler
implementation to be able to detect the sequencing job and then do whatever needs to be done with that.
public class MySequencingJobCompletionHandlerFactory implements ActionFactory<JobCompletionHandler> { .... @Override public JobCompletionHandler[] getActions(InvokationContext context) { .... String pluginVersion = job.getPluginVersion(); if (pluginVersion != null && !pluginVersion.startsWith("my-sequencing")) { // This is our sequencing job action = new SequencingJobCompletionHandler(); } .... } }
Reacting to configuration changes
The opengrid-config.xml
is parsed and loaded into memory when the Job scheduler service extension is started. Changes to the configuration file are not applied until the service is re-started. For some extensions it may be critical to be able to detect when this happens. Luckily, everything that is needed is already built into the BASE core API. Extensions that need to know when the Job scheduler service is stoppped or started simply need to register an event handler with the manager in BASE. The event handler should listen to SERVICE_STOPPED
or SERVICE_STARTED
events for the net.sf.basedb.opengrid.service
extension.
// We need a filter that listens for SERVICE_STARTED event // related to the Job scheduler service EventFilter serviceStarted = new ExtensionEventFilter( "net.sf.basedb.opengrid.service", Services.SERVICE_STARTED); // We need to implement an event handler which, for example, // reloads our own configuration file EventHandler handler = new MyEventHandler(); // Register the event handler with BASE // The ClassLoader parameter is important for not leaking memory // in case this extension is updated or uninstalled Registry registry = Application.getExtensionsManager().getRegistry(); ClassLoader loader = this.getClass().getClassLoader(); registry.registerEventHandler(handler, serviceStarted , loader);
Now, every time the Job scheduler service is restarted, BASE calls the MyEventHandler.handleEvent()
method.