Java EE 7: An Overview of Batch Processing

Batch processing is used in many industries for tasks ranging from payroll processing; statement generation; end-of-day jobs such as interest calculation and ETL (extract, load, and transform) in a data warehouse; and many more. Typically, batch processing is bulk-oriented, non-interactive, and long running—and might be data- or computation-intensive. Batch jobs can be run on schedule or initiated on demand. Also, since batch jobs are typically long-running jobs, check-pointing and restarting are common features found in batch jobs.
JSR 352 (Batch Processing for Java Platform), part of the recently introduced Java EE 7 platform, defines the programming model for batch applications plus a runtime to run and manage batch jobs. This article covers some of the key concepts including feature highlights, an overview of selected APIs, the structure of Job Specification Language, and a sample batch application. The article also describes how you can run batch applications using GlassFish Server Open Source Edition 4.0.
Batch Processing Architecture
This section and Figure 1 describe the basic components of the batch processing architecture.
  • job encapsulates the entire batch process. A job contains one or more steps. A job is put together using a Job Specification Language (JSL) that specifies the sequence in which the steps must be executed. In JSR 352, JSL is specified in an XML file called the job XML file. In short, a job (with JSR 352) is basically a container for steps.
  • step is a domain object that encapsulates an independent, sequential phase of the job. A step contains all the necessary logic and data to perform the actual processing. The batch specification deliberately leaves the definition of a step vague because the content of a step is purely application-specific and can be as complex or simple as the developer desires. There are two kinds of steps: chunk and batchlet.
    • chunk-style step contains exactly one ItemReader, one ItemProcessor, and one ItemWriter. In this pattern, ItemReaderreads one item at a time, ItemProcessor processes the item based upon the business logic (such as "calculate account balance"), and hands it to the batch runtime for aggregation. Once the "chunk-size" number of items are read and processed, they are given to an ItemWriter, which writes the data (for example, to a database table or a flat file). The transaction is then committed.
    • JSR 352 also defines a roll-your-own kind of a step called a batchlet. A batchlet is free to use anything to accomplish the step, such as sending an e-mail.
  • JobOperator provides an interface to manage all aspects of job processing, including operational commands, such as start, restart, and stop, as well as job repository commands, such as retrieval of job and step executions. See section 10.4 of the JSR 352 specification for more details about JobOperator.
  • JobRepository holds information about jobs currently running and jobs that ran in the past. JobOperator provides APIs to access this repository. A JobRepository could be implemented using, say, a database or a file system.

Developing a simple Payroll processing application

This article demonstrates some of the key features of JSR 352 using a simple payroll processing application. The application has been intentionally kept quite simple in order to focus on the key concepts of JSR 352.
The SimplePayrollJob batch job involves reading input data for payroll processing from a comma-separated values (CSV) file. Each line in the file contains an employee ID and the base salary (per month) for one employee. The batch job then calculates the tax to be withheld, the bonus, and the net salary. The job finally needs to write out the processed payroll records into a database table.
We use a CSV file in this example just to demonstrate that JSR 352 allows batch applications to read and write from any arbitrary source.

Job specification language for the Payroll processing application

We discussed that a step is a domain object that encapsulates an independent, sequential phase of the job, and a job is basically a container for one or more steps.
In JSR 352, a JSL basically specifies the order in which steps must be executed to accomplish the job. The JSL is powerful enough to allow conditional execution of steps, and it also allows each step to have its own properties, listeners, and so on.
A batch application can have as many JSLs as it wants, thus allowing it to start as many batch jobs as required. For example, an application can have two JSLs, one for payroll processing and another for report generation. Each JSL must be named uniquely and must be placed in the META-INF/batch-jobs directory. Subdirectories under META-INF/batch-jobs are ignored.
Our JSL for payroll processing is placed in a file called SimplePayrollJob.xml and looks like Listing 1:
<job id="SimplePayrollJob" xmlns=http://xmlns.jcp.org/xml/ns/javaee version="1.0">
    <step id="process">
        <chunk item-count="2">
            <reader ref="simpleItemReader/>
            <processor ref="simpleItemProcessor/>
            <writer ref="simpleItemWriter/>
        </chunk>
    </step>
</job>

Our SimplePayrollJob batch job has just one step (called "process"). It is a chunk-style step and has (as required for a chunk-style step), an ItemReader, an ItemProcessor, and an ItemWriter. The implementations for ItemReaderItemProcessor, andItemWriter for this step are specified using the ref attribute in the <reader><processor>, and <writer> elements.
When the job is submitted (we will see later how to submit batch jobs), the batch runtime starts with the first step in the JSL and walks its way through until the entire job is completed or one of the steps fails. The JSL is powerful enough to allow both conditional steps and parallel execution of steps, but we will not cover those details in this article.
The item-count attribute, which is defined as 2 in Listing 1, defines the chunk size of the chunk.
Here is a high-level overview of how chunk-style steps are executed. Please see section "Regular Chunk Processing" of the JSR 352 specification for more details.
  1. Start a transaction.
  2. Invoke the ItemReader and pass the item read by the ItemReader to the ItemProcessorItemProcessor processes the item and returns the processed item to the batch runtime.
  3. The batch runtime repeats Step 2 item-count times and maintains a list of processed items.
  4. The batch runtime invokes the ItemWriter that writes item-count number of processed items.
  5. If exceptions are thrown from ItemReaderItemProcessor, or ItemWriter, the transaction fails and the step is marked as "FAILED." Please refer to Section 5.2.1.2.1 ("Skipping Exceptions") in the JSR 352 specification.
  6. If there are no exceptions, the batch runtime obtains checkpoint data from ItemReader and ItemWriter (see section 2.5 in the JSR 352 specification for more details). The batch runtime commits the transaction.
  7. Steps 1 through 6 are repeated if the ItemReader has more data to read.
This means that in our example, the batch runtime will read and process two records and the ItemWriter will write out two records per transaction.

Writing the ItemReader, ItemProcessor and ItemWriter

Writing the ItemReader

Our payroll processing batch JSL defines a single chunk style step and specifies that the step uses an ItemReader namedsimpleItemReader. Our application contains an implementation of ItemReader to read input CSV data. Listing 2 shows a snippet of our ItemReader:

@Named
public class SimpleItemReader
       extends AbstractItemReader {

       @Inject
       private JobContext jobContext; 
       ...
}

Note that the class is annotated with the @Named annotation. Because the @Named annotation uses the default value, the Contexts and Dependency Injection (CDI) name for this bean is simpleItemReader. The JSL specifies the CDI name of the ItemReader in the<reader> element. This allows the batch runtime to instantiate (through CDI) our ItemReader when the step is executed.
Our ItemReader also injects a JobContextJobContext allows the batch artifact (ItemReader, in this case) to read values that were passed during job submission.
Our payroll SimpleItemReader overrides the open() method to open the input from which payroll input data is read. As we shall see later, the parameter prevCheckpointInfo will not be null if the job is being restarted.
In our example, the open() method, which is shown in Listing 3, opens the payroll input file (which has been packaged along with the application).
public void open(Serializable prevCheckpointInfo) throws Exception {
        JobOperator jobOperator = BatchRuntime.getJobOperator();
        Properties jobParameters = jobOperator.getParameters(jobContext.getExecutionId());
        String resourceName = (String) jobParameters.get("payrollInputDataFileName");
        inputStream = new FileInputStream(resourceName);        
        br = new BufferedReader(new InputStreamReader(inputStream));

        if (prevCheckpointInfo != null)
            recordNumber = (Integer) prevCheckpointInfo;
        for (int i=1; i<recordNumber; i++) {   //Skip upto recordNumber
            br.readLine();
        } 
       System.out.println("[SimpleItemReader] Opened Payroll file for reading from record number: "              + recordNumber);
    }

The readItem() method basically reads one line of data from the input file and determines whether the line contains two integers (one for employee ID and one for base salary). If there are two integers, it creates and returns a new instance of PayrollInputRecord and returns to the batch runtime (which is then passed to ItemWriter).
Listing 4
public Object readItem() throws Exception {       
       Object record = null;
       if (line != null) {
            String[] fields = line.split("[, \t\r\n]+");
            PayrollInputRecord payrollInputRecord = new PayrollInputRecord();
            payrollInputRecord.setId(Integer.parseInt(fields[0]));
            payrollInputRecord.setBaseSalary(Integer.parseInt(fields[1]));
            record = payrollInputRecord;
            //Now that we could successfully read, Increment the record number
           recordNumber++;
        }
        return record;
}

The method checkpointInfo() is called by the batch runtime at the end of every successful chunk transaction. This allows the Reader to check point the last successful read position.
In our example, the checkpointInfo() returns the recordNumber indicating the number of records that have been read successfully, as shown in Listing 5.
@Override
public Serializable checkpointInfo() throws Exception {
        return recordNumber;
}

Writing the ItemProcessor

Our SimpleItemProcessor follows a pattern similar to the pattern for SimpleItemReader.

The processItem() method receives (from the batch runtime) the PayrollInputRecord. It then calculates the tax and net and returns a PayrollRecord as output. Notice in Listing 6 that the type of object returned by an ItemProcessor can be very different from the type of object it received from ItemReader.

@Named
public class SimpleItemProcessor
    implements ItemProcessor {

    @Inject
    private JobContext jobContext;

    public Object processItem(Object obj) 
                        throws Exception {
        PayrollInputRecord inputRecord =
                (PayrollInputRecord) obj;
        PayrollRecord payrollRecord = 
                new PayrollRecord();

        int base = inputRecord.getBaseSalary();
        float tax = base * 27 / 100.0f;
        float bonus = base * 15 / 100.0f;

        payrollRecord.setEmpID(inputRecord.getId());
        payrollRecord.setBase(base);
        payrollRecord.setTax(tax);
        payrollRecord.setBonus(bonus);
        payrollRecord.setNet(base + bonus - tax);   
        return payrollRecord;
    } 
}

Writing the ItemWriter 

By now, SimpleItemWriter must be following predictable lines for you.
The only difference is that it injects an EntityManager so that it can persist the PayrollRecord instances (which are JPA entities) into a database, as shown in Listing 7.
@Named
public class SimpleItemWriter
    extends AbstractItemWriter {

    @PersistenceContext
    EntityManager em;

    public void writeItems(List list) throws Exception {
        for (Object obj : list) {
            System.out.println("PayrollRecord: " + obj);
            em.persist(obj);
        }
    }
    
}
The writeItems() method persists all the PayrollRecord instances into a database table using JPA. There will be at most item-count entries (the chunk size) in the list.
Now that we have our JSL, ItemReaderItemProcessor, and ItemWriter ready, let's see how a batch job can be submitted.

Starting a batch job from a servlet

Note that the mere presence of a job XML file or other batch artifacts (such as ItemReader) doesn't mean that a batch job is automatically started when the application is deployed. A batch job must be initiated explicitly, say, from a servlet or from an Enterprise JavaBeans (EJB) timer or an EJB business method.
In our payroll application, we use a servlet (named PayrollJobSubmitterServlet) to submit a batch job. The servlet displays an HTML page that presents to the user a form containing two buttons. When the first button, labeled Calculate Payroll, is clicked, the servlet invokes the startNewBatchJob method, shown in Listing 8, which starts a new batch job.

private long startNewBatchJob()
throws Exception {
        JobOperator jobOperator = BatchRuntime.getJobOperator();
        Properties props = new Properties();
        props.setProperty("payrollInputDataFileName", payrollInputDataFileName);
        return jobOperator.start(JOB_NAME, props);
}

The first step is to obtain an instance of JobOperator. This can be done by calling the following:
JobOperator jobOperator = BatchRuntime.getJobOperator(); 

The servlet then creates a Properties object and stores the input file name in it. Finally, a new batch job is started by calling the following:
jobOperator.start(jobName, properties) 

The jobname is nothing but the job JSL XML file name (minus the .xml extension). The properties parameter serves to pass any input data to the job. The Properties object (containing the name of the payroll input file) is made available to other batch artifacts (such as ItemReaderItemProcessor, and so on) through the JobContext interface.
The batch runtime assigns a unique ID, called the execution ID, to identify each execution of a job whether it is a freshly submitted job or a restarted job. Many of the JobOperator methods take the execution ID as parameter. Using the execution ID, a program can obtain the current (and past) execution status and other statistics about the job. The JobOperator.start() method returns the execution ID of the job that was started.

Retrieving details about batch jobs

When a batch job is submitted, the batch runtime creates an instance of JobExecution to track it. JobExecution has methods to obtain various details such as the job start time, job completion time, job exit status, and so on. To obtain the JobExecution for an execution ID, you can use the JobOperator.getJobExecution(executionId) method. Listing 9 shows the definition of JobExecution:

package javax.batch.runtime;
public interface JobExecution {
    long getExecutionId();
    java.lang.String getJobName();
    javax.batch.runtime.BatchStatus getBatchStatus();
    java.util.Date getStartTime();
    java.util.Date getEndTime();
    java.lang.String getExitStatus();
    java.util.Date getCreateTime();
    java.util.Date getLastUpdatedTime();
    java.util.Properties getJobParameters();
}

0 comments:

Post a Comment