How to use the Open Grid Scheduler package API
This document describes the main aspects of the programmatic API that other extensions can use to access and use Open Grid Clusters.
Enumerating Open Grid Clusters
The OpenGridService class is typically the starting point for most actions. From this class it is possible to get information about, and access to, all clusters that have been defined in the opengrid-config.xml file. The service is a singleton instance. Use the OpenGridService.getInstance() method to get the object. Note! It is important that the service is actually running inside BASE. Check on the Administrate->Services page that this is the case.
To enumerate the available Open Grid Clusters use one of the OpenGridService.getClusters() methods. This returns a collection of OpenGridCluster instances. Most methods in this class are used for getting configuration information from the opengrid-config.xml file. The OpenGridCluster.getId() method returns the internal ID of the cluster definition. It is created by combining the username, address and port of the cluster (for example, griduser@grid.example.com:22). The ID can then be used with OpenGridService.getClusterById() to directly access the same cluster later on. Other useful information can be found in the objects returned by calling OpenGridCluster.getConnectionInfo() and OpenGridCluster.getConfig(). The OpenGridCluster.asJSONObject() method returns more or less the same information wrapped up as JSON data. This is useful for transferring information to a web interface to allow a user to select a cluster to work with.
Java code in a servlet running on the BASE web server
    DbControl dc = ... // We need an open DbControl from BASE

    // Options specifying which (extra) information that we want to return
    // Use JSONOptions.DEFAULT to only return the minimal information
    JSONOptions options = new JSONOptions();
    options.enable(JSONOption.CLUSTER_INFO);
    options.enable(JSONOption.NODE_INFO);

    OpenGridService service = OpenGridService.getInstance();
    JSONArray jsonHosts = new JSONArray();
    // Enumerates all clusters that the current user has access to
    for (OpenGridCluster host : service.getClusters(dc, Include.ALL)) {
        jsonHosts.add(host.asJSONObject(options));
    }
    return jsonHosts; // This is what we transfer to the web client via AJAX
JavaScript code running in the web browser the current user is using
    // In the web client use the JSON data to populate a <select> list
    var list = document.getElementById('cluster-list');
    list.length = 0;
    var clusters = response; // Response contains an array with cluster information
    for (var i = 0; i < clusters.length; i++) {
        var cluster = clusters[i];
        var option = new Option(cluster.connection.name, cluster.id);
        option.cluster = cluster;
        list[list.length] = option;
    }
Note that there is no need to use the OpenGridCluster.connect() method yet.
Creating a job script
In its simplest form a job script is only a string with one or more (bash) commands to execute. For example, pwd; ls is a valid job script that will print the current directory and then list all files in it. To help with creating longer and more complex scripts the ScriptBuilder class can be used. The cmd(), echo() and comment() methods are more or less self-describing. It is possible to start a command in the background with bkgr(), but note that this must be paired with a waitForProcess(). Otherwise the job script may finish before the command that is running in the background, which may cause unpredictable results. The progress() method is very useful for jobs that are expected to take a long time to run. The method will write progress information to the {$WD}/progress file, which will be picked up by the Open Grid Service and reported back to the BASE job that is acting as a proxy.
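To make the pairing of a background command with a wait concrete, here is a minimal sketch of the plain bash that such a pair amounts to. It deliberately does not use ScriptBuilder, since the exact signatures of bkgr() and waitForProcess() are not shown in this document. The BackgroundScriptSketch class, its method names and the two script names are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in; this is NOT the ScriptBuilder class from the Open Grid package.
final class BackgroundScriptSketch {
    private final List<String> lines = new ArrayList<String>();

    // Add a command that runs in the foreground
    BackgroundScriptSketch cmd(String command) {
        lines.add(command);
        return this;
    }

    // Start a command in the background and store its process id in a shell variable
    BackgroundScriptSketch background(String command, String pidVariable) {
        lines.add(command + " &");
        lines.add(pidVariable + "=$!");
        return this;
    }

    // Block until the background process stored in the shell variable has finished
    BackgroundScriptSketch waitFor(String pidVariable) {
        lines.add("wait $" + pidVariable);
        return this;
    }

    // Join all lines into a bash script
    String build() {
        return String.join("\n", lines) + "\n";
    }

    public static void main(String[] args) {
        String script = new BackgroundScriptSketch()
            .background("long-running-step.sh", "BG_PID") // made-up script name
            .cmd("other-step.sh")                         // made-up script name
            .waitFor("BG_PID")
            .build();
        System.out.print(script);
    }
}
```

Without the final wait line the script would end as soon as other-step.sh returns, even if the background step is still running.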
When creating a job script there are a few useful variables that have been set up:
- {$WD}: A randomly generated subdirectory in the <job-folder> directory. The directory contains the job script. It is also the current working directory when the job is started and the directory that is used for communicating data to/from the BASE server. Data in this directory is preserved after the job has finished. When running post-job code this folder can be found by calling OpenGridCluster.getWorkFolder(). Files can be downloaded to the BASE server with OpenGridSession.downloadFile(), OpenGridSession.readFile() or OpenGridSession.getJobFileAsString(). The latter method is the simplest one to use for parsing out interesting data from text result files.
- {$TMPDIR}: A temporary working directory that is typically only available on the node the job is running on. Unless the job is started in debug mode, this directory is deleted soon after the job has been completed.
- {$NSLOTS}: The number of slots that have been assigned to this job. If the job is starting a multi-threaded analysis program it is common practice to not use more threads than this value specifies. Note that a single node may run more than one job at the same time, so using nproc to determine the number of threads may cause resource issues.
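For an analysis program that is itself written in Java, the advice about the slot count can be sketched as follows. This is an illustration only: it assumes that the scheduler exports the slot count to the job as the NSLOTS environment variable, and the SlotCount class name and the fallback to a single thread are choices made for this example.

```java
// Hypothetical helper for a program that is started from a job script.
final class SlotCount {
    // Parse the slot count; fall back to one thread if the value is missing or invalid
    static int threads(String nslots) {
        if (nslots == null) {
            return 1;
        }
        try {
            int n = Integer.parseInt(nslots.trim());
            return n > 0 ? n : 1;
        } catch (NumberFormatException ex) {
            return 1;
        }
    }

    public static void main(String[] args) {
        // Deliberately not Runtime.getRuntime().availableProcessors(): like nproc,
        // it typically reflects the node and not the slots assigned to this job
        int n = threads(System.getenv("NSLOTS"));
        System.out.println("Using " + n + " thread(s)");
    }
}
```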
In the example code below we assume that we have FASTQ files stored on a file server on the network. The FASTQ files are going to be aligned with Tophat and we have a wrapper script that sets all parameters except the number of threads and the location of the FASTQ files. After Tophat we have a second, post-alignment script that does further processing and saves the result in a subdirectory.
    ScriptBuilder script = new ScriptBuilder();
    // We do not want to hog the network so we copy all files we need to the local cluster node
    script.progress(5, "Copying data to temporary folder...");
    script.cmd("cp /path/to/fastqfiles/*fastq.gz {$TMPDIR}");
    // Wrapper script that calls tophat; we assume all other required parameters are set by the wrapper
    script.progress(10, "Analysing FASTQ files with Tophat...");
    script.cmd("tophat-wrapper.sh -p {$NSLOTS} {$TMPDIR}");
    // Another analysis script...
    script.progress(50, "Post-alignment analysis files...");
    script.cmd("post-analysis.sh -p {$NSLOTS} {$TMPDIR}");
    // Now we only need to copy the results back to our file server.
    // Remember that the {$TMPDIR} is cleaned automatically so we don't have to mess with that
    script.progress(90, "Copying analyzed data back to file server");
    script.cmd("cp {$TMPDIR}/result/* /path/to/resultfiles/");
    // Finally, we copy the logfile to the job directory so that we can extract data from it to BASE
    script.cmd("cp {$TMPDIR}/logfile {$WD}/logfile");
Submitting a job
When the job script has been generated it is time to submit the job to the cluster. For this, you'll need a couple more objects. The first object is a JobConfig instance. Use this for setting various options that are related to the Open Grid qsub command. In most cases the default settings should work, but you can for example use JobConfig.setPriority() to change the priority (-p) or JobConfig.setQsubOption() to set almost any other option. Some options are set automatically by the job submission procedure and are ignored (-S, -terse, -N, -wd, -o, -e).
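Because the options listed above are ignored, an extension that accepts qsub options from its users may want to flag them before submission. The sketch below is a hypothetical pre-check. The QsubOptionCheck class is not part of the Open Grid package, and only the list of reserved options is taken from the text above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper; not part of the Open Grid package API.
final class QsubOptionCheck {
    // Options that the job submission procedure sets by itself (from the list above)
    private static final Set<String> RESERVED = new HashSet<String>(
        Arrays.asList("-S", "-terse", "-N", "-wd", "-o", "-e"));

    // True if the option would be ignored; the comparison is case-sensitive
    static boolean isReserved(String option) {
        return option != null && RESERVED.contains(option.trim());
    }

    public static void main(String[] args) {
        for (String option : new String[] { "-N", "-pe", "-wd" }) {
            System.out.println(option + (isReserved(option) ? " is set automatically" : " can be set"));
        }
    }
}
```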
You also need a BASE Job item that is an OTHER type job. It is recommended that the job is set up so that it can be identified later when the notification about its completion is sent out. Remember that while a job executes on the Open Grid Cluster almost anything can happen on the BASE server, including a restart. Do not rely on information that is stored in memory about jobs that have been submitted to the cluster, since this information may not be there when the job completes. We recommend using one or more of Job.setName(), Job.setPluginVersion() and Job.setItemSubtype() to be able to identify the job in a reliable manner. We will explain why this is important in the Getting notified when a job completes section below.
Now it is time to create a JobDefinition object. This is basically a compilation containing the job script, the job configuration and the BASE job item. The JobDefinition is also used for uploading data files that are needed by the job. Read more about this in the Advanced usage section below.
The final step is to connect to the Open Grid Cluster and submit the job. If we assume that you know the ID of the cluster you can simply use the OpenGridService.getClusterById() method and then OpenGridCluster.connect() to create an OpenGridSession instance that is connected to the cluster. Then, use the OpenGridSession.qsub() method to submit the job. Note that this method needs a List<JobDefinition> input parameter. If you have multiple jobs to submit it will be a lot quicker to submit all of them in one go instead of doing multiple calls to the OpenGridSession.qsub() method.
The OpenGridSession.qsub() method will put together the final job script, upload it to the cluster, upload other files to the cluster and then schedule the job by calling the qsub command. It will also update the BASE job item with some (important) information:
- The Job.getServer() property is set to the ID of the Open Grid Cluster.
- The Job.getExternalId() property is set to the number assigned to the job on the cluster.
- Signal handlers for progress reporting are set up.
- A callback action is set up on the current DbControl that aborts the job if the transaction is not committed.
- Later on, the Job.getNode() property is set to a string that identifies the node the job is running on. Note that this is not the pure name of the node but also includes some other information from the Open Grid Cluster.
The OpenGridSession.qsub() method returns a CmdResult object containing a list with JobStatus instances. You should check that CmdResult.getExitStatus() returns 0. All other values indicate an error when submitting the jobs and your transaction should be aborted.
    DbControl dc = .... // We need an open DbControl from BASE
    String clusterId = ... // The ID of the cluster that the user selected in the web client
    String jobScript = .... // See the previous example

    // Use default configuration but a lower priority
    JobConfig config = new JobConfig();
    config.setPriority(-500);

    // Create a new BASE job and set properties so that we can identify it later
    Job job = Job.getNew(dc, null, null, null); // All null to create an 'OTHER' type job
    job.setName("My analysis");
    job.setPluginVersion("my-analysis-1.0");
    // job.setItemSubtype(...); // This can also be useful
    dc.saveItem(job); // Important!!!

    // Create the job definition that links it all together
    JobDefinition jobDef = new JobDefinition("MyAnalysis", config, job);
    jobDef.setDebug(true); // Run in debug mode while developing
    jobDef.setCmd(jobScript); // Do not forget this!

    // Connect to the Open Grid Cluster
    OpenGridCluster cluster = OpenGridService.getInstance().getClusterById(dc, clusterId);
    OpenGridSession session = cluster.connect(5);
    try {
        // Submit the job and do not forget the error handling
        CmdResult<List<JobStatus>> result = session.qsub(dc, Arrays.asList(jobDef));
        if (result.getExitStatus() != 0) {
            // Error handling, for example
            throw new RuntimeException(result.getStderr());
        }
        // Do not forget to commit the transaction. The job will be aborted otherwise.
        dc.commit();
    } finally {
        // Finally, do not forget to close the connection to the Open Grid Cluster
        OpenGrid.close(session);
    }
Getting notified when a job completes
One important feature is that other extensions can get notified when a job running on the cluster has ended. This is implemented in an asynchronous manner and it should not matter if the BASE server is updated or restarted or otherwise modified while a job is running. In the background there are three parts that work together to make this feature work.
- The BASE system for requesting job progress information about external jobs has been set up to send requests to the OpenGridService whenever it wants new information about a job. This is the reason why it is important to create a BASE job item as a proxy for the Open Grid Cluster jobs. Without it, no progress information is requested and we never get to know when the job has ended.
- The OpenGridService polls each registered cluster at regular intervals, typically once every minute, but it may be more or less often depending on whether there are any known jobs executing. The OpenGridSession.qstat() and OpenGridSession.qacct() methods are used for this and will detect waiting, running and completed jobs. For running jobs, the service will download the progress file (see ScriptBuilder.progress() above) and update the information in the BASE database.
- Once a job has been detected as completed the service will invoke the job completion sequence. This is implemented as a custom extension point (net.sf.basedb.opengrid.job-complete) that will receive messages about completed jobs. Extensions that want to get notified should extend the extension point. Note that all registered extensions are notified about all jobs. It doesn't matter which extension originally submitted the job to the cluster. Notifications are sent both for successful and failed jobs. Thus, each extension is responsible for filtering and ignoring notifications about jobs that are of no interest to it. This is why it is important to set name, plug-in version, etc. on the job when submitting it. We recommend that this filtering step is implemented in the ActionFactory that is registered for the net.sf.basedb.opengrid.job-complete extension point. Note that a single notification may handle more than one job. Thus, the prepareContext() method is called once and without any information about the jobs, while the getActions() method is called once for every job.
    public class MyAnalysisJobCompletionHandlerFactory
        implements ActionFactory<JobCompletionHandler> {

        public MyAnalysisJobCompletionHandlerFactory() {}

        @Override
        public boolean prepareContext(InvokationContext context) {
            // Always true since we do not know anything about the job(s) that have been completed
            return true;
        }

        @Override
        public JobCompletionHandler[] getActions(InvokationContext context) {
            ClientContext cc = context.getClientContext();
            Job job = (Job)cc.getCurrentItem();
            String pluginVersion = job.getPluginVersion();
            if (pluginVersion == null || !pluginVersion.startsWith("my-analysis")) {
                // This is not our job, ignore it
                return null;
            }

            // Note that Job item has not been updated yet so we
            // need to get the status information extracted from the cluster
            JobStatus status = (JobStatus)cc.getAttribute("job-status");
            if (status.getStatus() != Job.Status.DONE) {
                // We don't do anything unless the job was successful.
                return null;
            }

            JobCompletionHandler action = null;
            String jobName = job.getName();
            if (jobName.startsWith("My analysis")) {
                action = new MyAnalysisCompletionHandler();
            } else {
                // In the future we may have more than one type of jobs...
            }
            return action == null ? null : new JobCompletionHandler[] { action };
        }
    }
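The decision of whether a notification concerns one of our own jobs can also be isolated in a small predicate that is easy to test without a running BASE server. The sketch below is a stand-alone illustration that works on plain strings instead of the BASE Job item. The JobFilterSketch class is hypothetical, and the two prefixes are the ones used in the submission example.

```java
// Hypothetical helper; works on plain strings so that it can be tested in isolation.
final class JobFilterSketch {
    static final String VERSION_PREFIX = "my-analysis";
    static final String NAME_PREFIX = "My analysis";

    // True only if both the plug-in version and the name identify the job as ours;
    // null values are treated as "not our job"
    static boolean isOurJob(String pluginVersion, String jobName) {
        return pluginVersion != null && pluginVersion.startsWith(VERSION_PREFIX)
            && jobName != null && jobName.startsWith(NAME_PREFIX);
    }

    public static void main(String[] args) {
        System.out.println(isOurJob("my-analysis-1.0", "My analysis"));
        System.out.println(isOurJob("other-plugin-2.3", "My analysis"));
        System.out.println(isOurJob(null, null));
    }
}
```

Keeping the check free of side effects makes it cheap to call once for every job in a notification.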
The ActionFactory.getActions() implementation should not do anything except checking if the job should be handled or not. It should return null if it is not interested in the job, and an implementation of the JobCompletionHandler interface otherwise. This interface defines a single method: JobCompletionHandler.jobCompleted(SessionControl, OpenGridSession, Job, JobStatus). The Job and JobStatus objects are the same as in the ActionFactory, but in this method you also get access to a SessionControl instance and an OpenGridSession that is connected to the cluster the job was running on. The OpenGridSession can for example be used to download and parse result files. The SessionControl can be used to access BASE and update items and/or annotations. The good thing about the SessionControl is that it has been automatically configured so that the owner of the job is already logged in and a project (if any is specified on the job) is set as the active project (in the ActionFactory the session control is a generic one with the root user logged in).
Do not update the Job item since this may interfere with the updates to the job made by the Open Grid extension. The method may return a string to set the status message of the job, or throw an exception to set the job status to ERROR.
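As an illustration of the kind of work a completion handler might do, the sketch below extracts a number from the text of a log file and turns it into a status message, or throws an exception if the expected line is missing. It is a stand-alone example: the LogSummarySketch class and the "Aligned reads:" line format are assumptions made for this illustration. In a real handler the text would come from a call such as OpenGridSession.getJobFileAsString(), whose parameters are not shown in this document.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; the log line format is made up for this example.
final class LogSummarySketch {
    // Matches a whole line such as "Aligned reads: 12345"
    private static final Pattern ALIGNED =
        Pattern.compile("^Aligned reads:\\s*(\\d+)\\s*$", Pattern.MULTILINE);

    // Build a status message from the log text, or fail if the line is missing
    static String statusMessage(String logText) {
        Matcher m = ALIGNED.matcher(logText);
        if (!m.find()) {
            throw new IllegalStateException("No 'Aligned reads' line found in log file");
        }
        return "Aligned " + Long.parseLong(m.group(1)) + " reads";
    }

    public static void main(String[] args) {
        String log = "Starting\nAligned reads: 12345\nDone\n";
        System.out.println(statusMessage(log));
    }
}
```

Returning the string corresponds to setting the status message of the job, and the exception corresponds to the ERROR outcome described above.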