Enterprise Java

Spring Batch Tutorial with Spring Boot and Java Configuration

I’ve been working on migrating some batch jobs for Podcastpedia.org to Spring Batch. Before, these jobs were developed in my own kind of way, and I thought it was high time to use a more “standardized” approach. Because I had never used Spring with Java configuration before, I thought this was a good opportunity to learn about it by configuring the Spring Batch jobs in Java. And since I am all into trying new things with Spring, why not also throw Spring Boot into the boat…

Note:
Before you begin with this tutorial I recommend you first read Spring’s Getting started – Creating a Batch Service, because the structure and the code presented here build on that original.

1. What I’ll build

So, as mentioned, in this post I will present Spring Batch in the context of configuring it and developing some batch jobs with it for Podcastpedia.org. Here’s a short description of the two jobs that are currently part of the Podcastpedia-batch project:

  1. addNewPodcastJob
    1. reads podcast metadata (feed url, identifier, categories etc.) from a flat file
    2. transforms the data (parses the feed and prepares the episodes to be inserted, using the Apache Http Client)
    3. and in the last step, inserts it into the Podcastpedia database and informs the submitter via email about it
  2. notifyEmailSubscribersJob – people can subscribe to their favorite podcasts on Podcastpedia.org via email. For those who did, it is checked on a regular basis (DAILY, WEEKLY, MONTHLY) whether new episodes are available, and if there are, the subscribers are informed about them via email; the job reads from the database, expands the read data via JPA, re-groups it and notifies the subscribers via email

Source code:
The source code for this tutorial is available on GitHub – Podcastpedia-batch.

Note: Before you start I also highly recommend you read the Domain Language of Batch, so that terms like “Jobs”, “Steps” or “ItemReaders” don’t sound strange to you.

2. What you’ll need

  • JDK 1.7 or later (the project targets Java 1.7)
  • Maven 3.0 or later, to build and run the project
  • MySQL, since the jobs read from and write to a MySQL database

3. Set up the project

The project is built with Maven. It uses Spring Boot, which makes it easy to create stand-alone, Spring-based applications that you can “just run”. You can learn more about Spring Boot by visiting the project’s website.

3.1. Maven build file

Because it uses Spring Boot, the project has spring-boot-starter-parent as its parent, and declares a couple of other spring-boot-starters that pull in the libraries required in the project:

pom.xml of the podcastpedia-batch project

<?xml version="1.0" encoding="UTF-8"?>
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>org.podcastpedia.batch</groupId>
    <artifactId>podcastpedia-batch</artifactId>
    <version>0.1.0</version>
     
    <properties>
        <spring.boot.version>1.1.6.RELEASE</spring.boot.version>
        <java.version>1.7</java.version>
    </properties>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.1.6.RELEASE</version>
    </parent>
     
    <dependencies>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-data-jpa</artifactId>       
        </dependency>       
         
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.5</version>
        </dependency>    
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.3.2</version>
        </dependency>
        <!-- velocity -->
        <dependency>
            <groupId>org.apache.velocity</groupId>
            <artifactId>velocity</artifactId>
            <version>1.7</version>     
        </dependency>
        <dependency>
            <groupId>org.apache.velocity</groupId>
            <artifactId>velocity-tools</artifactId>
            <version>2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.struts</groupId>
                    <artifactId>struts-core</artifactId>
                </exclusion>
            </exclusions>            
        </dependency>
                         
        <!-- Project rome rss, atom -->
        <dependency>
            <groupId>rome</groupId>
            <artifactId>rome</artifactId>
            <version>1.0</version>
        </dependency>
        <!-- optional: the rome-fetcher module -->
        <dependency>
            <groupId>rome</groupId>
            <artifactId>rome-fetcher</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.jdom</groupId>
            <artifactId>jdom</artifactId>
            <version>1.1</version>
        </dependency>    
        <!-- PID 1 -->
        <dependency>
            <groupId>xerces</groupId>
            <artifactId>xercesImpl</artifactId>
            <version>2.9.1</version>
        </dependency>
                         
        <!-- MySQL JDBC connector -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.31</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-freemarker</artifactId>  
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-remote-shell</artifactId>  
            <exclusions>
                <exclusion>
                    <groupId>javax.mail</groupId>
                    <artifactId>mail</artifactId>
                </exclusion>
            </exclusions>            
        </dependency>
        <dependency>
            <groupId>javax.mail</groupId>
            <artifactId>mail</artifactId>
            <version>1.4.7</version>
        </dependency>    
        <dependency>
            <groupId>javax.inject</groupId>
            <artifactId>javax.inject</artifactId>
            <version>1</version>
        </dependency>    
        <dependency>
            <groupId>org.twitter4j</groupId>
            <artifactId>twitter4j-core</artifactId>
            <version>[4.0,)</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Note:
One big advantage of using the spring-boot-starter-parent as the project’s parent is that you only have to upgrade the version of the parent and it will pull in the “latest” libraries for you. When I started the project Spring Boot was at version 1.1.3.RELEASE, and by the time I finished writing this post it was already at 1.1.6.RELEASE.

3.2. Project directory structure

I structured the project in the following way:

Project directory structure

└── src
    └── main
        └── java
            └── org
                └── podcastpedia
                    └── batch
                        ├── common
                        └── jobs
                            ├── addpodcast
                            └── notifysubscribers

Note:

  • the org.podcastpedia.batch.jobs package contains sub-packages with classes specific to particular jobs.
  • the org.podcastpedia.batch.common package contains classes used by all the jobs, like for example the JPA entities that both of the current jobs require.

4. Create a batch Job configuration

I will start by presenting the Java configuration class for the first batch job:

Batch Job configuration

package org.podcastpedia.batch.jobs.addpodcast;
 
import org.podcastpedia.batch.common.configuration.DatabaseAccessConfiguration;
import org.podcastpedia.batch.common.listeners.LogProcessListener;
import org.podcastpedia.batch.common.listeners.ProtocolListener;
import org.podcastpedia.batch.jobs.addpodcast.model.SuggestedPodcast;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.io.ClassPathResource;
 
import com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException;
 
@Configuration
@EnableBatchProcessing
@Import({DatabaseAccessConfiguration.class, ServicesConfiguration.class})
public class AddPodcastJobConfiguration {
 
    @Autowired
    private JobBuilderFactory jobs;
  
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
     
    // tag::jobstep[]
    @Bean
    public Job addNewPodcastJob(){
        return jobs.get("addNewPodcastJob")
                .listener(protocolListener())
                .start(step())
                .build();
    }  
     
    @Bean
    public Step step(){
        return stepBuilderFactory.get("step")
                .<SuggestedPodcast,SuggestedPodcast>chunk(1) //important to be one in this case to commit after every line read
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .listener(logProcessListener())
                .faultTolerant()
                .skipLimit(10) //default is set to 0
                .skip(MySQLIntegrityConstraintViolationException.class)
                .build();
    }  
    // end::jobstep[]
     
    // tag::readerwriterprocessor[]
    @Bean
    public ItemReader<SuggestedPodcast> reader(){
        FlatFileItemReader<SuggestedPodcast> reader = new FlatFileItemReader<SuggestedPodcast>();
        reader.setLinesToSkip(1);//first line is title definition
        reader.setResource(new ClassPathResource("suggested-podcasts.in"));
        reader.setLineMapper(lineMapper());
        return reader;
    }
 
    @Bean
    public LineMapper<SuggestedPodcast> lineMapper() {
        DefaultLineMapper<SuggestedPodcast> lineMapper = new DefaultLineMapper<SuggestedPodcast>();
         
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setDelimiter(";");
        lineTokenizer.setStrict(false);
        lineTokenizer.setNames(new String[]{"FEED_URL", "IDENTIFIER_ON_PODCASTPEDIA", "CATEGORIES", "LANGUAGE", "MEDIA_TYPE", "UPDATE_FREQUENCY", "KEYWORDS", "FB_PAGE", "TWITTER_PAGE", "GPLUS_PAGE", "NAME_SUBMITTER", "EMAIL_SUBMITTER"});
         
        // note: this BeanWrapperFieldSetMapper could map columns to same-named bean
        // properties, but the custom FieldSetMapper below is wired in instead
        BeanWrapperFieldSetMapper<SuggestedPodcast> fieldSetMapper = new BeanWrapperFieldSetMapper<SuggestedPodcast>();
        fieldSetMapper.setTargetType(SuggestedPodcast.class);
         
        lineMapper.setLineTokenizer(lineTokenizer);
        lineMapper.setFieldSetMapper(suggestedPodcastFieldSetMapper());
         
        return lineMapper;
    }
 
    @Bean
    public SuggestedPodcastFieldSetMapper suggestedPodcastFieldSetMapper() {
        return new SuggestedPodcastFieldSetMapper();
    }
 
    /** configure the processor related stuff */
    @Bean
    public ItemProcessor<SuggestedPodcast, SuggestedPodcast> processor() {
        return new SuggestedPodcastItemProcessor();
    }
     
    @Bean
    public ItemWriter<SuggestedPodcast> writer() {
        return new Writer();
    }
    // end::readerwriterprocessor[]
     
    @Bean
    public ProtocolListener protocolListener(){
        return new ProtocolListener();
    }
  
    @Bean
    public LogProcessListener logProcessListener(){
        return new LogProcessListener();
    }   
 
}

The @EnableBatchProcessing annotation adds many critical beans that support jobs and saves us a lot of configuration work. For example, you will also be able to @Autowired some useful beans into your context (a short sketch follows the list below):

  • a JobRepository (bean name “jobRepository”)
  • a JobLauncher (bean name “jobLauncher”)
  • a JobRegistry (bean name “jobRegistry”)
  • a PlatformTransactionManager (bean name “transactionManager”)
  • a JobBuilderFactory (bean name “jobBuilders”) as a convenience to prevent you from having to inject the job repository into every job, as in the examples above
  • a StepBuilderFactory (bean name “stepBuilders”) as a convenience to prevent you from having to inject the job repository and transaction manager into every step
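
For illustration, here is a minimal sketch of injecting two of these beans into a component of your own (the JobRunner class is hypothetical, not part of the project):

@Component
public class JobRunner {

    // both beans are registered automatically by @EnableBatchProcessing
    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobRepository jobRepository;
}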

The first part focuses on the actual job configuration:

Batch Job and Step configuration

@Bean
public Job addNewPodcastJob(){
    return jobs.get("addNewPodcastJob")
            .listener(protocolListener())
            .start(step())
            .build();
}  
 
@Bean
public Step step(){
    return stepBuilderFactory.get("step")
            .<SuggestedPodcast,SuggestedPodcast>chunk(1) //important to be one in this case to commit after every line read
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .listener(logProcessListener())
            .faultTolerant()
            .skipLimit(10) //default is set to 0
            .skip(MySQLIntegrityConstraintViolationException.class)
            .build();
}

The first method defines the job and the second one defines a single step. As you’ve read in The Domain Language of Batch, jobs are built from steps, where each step can involve a reader, a processor, and a writer.

In the step definition, you define how much data to write at a time (in our case, one record at a time), and then you specify the reader, processor and writer. The faultTolerant(), skipLimit(10) and skip(...) calls make the step tolerate up to ten MySQLIntegrityConstraintViolationExceptions (for example, attempts to insert a duplicate podcast) before the job fails.
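
Conceptually, for chunk(1), the framework’s read-process-write loop boils down to something like this (a simplified sketch of the semantics, not Spring Batch’s actual code; exception handling and the skip logic are omitted):

SuggestedPodcast item = reader.read();                     // null signals the end of the input
while (item != null) {
    SuggestedPodcast processed = processor.process(item);  // returning null would filter the item out
    if (processed != null) {
        writer.write(Collections.singletonList(processed));
    }
    // with chunk(1) the transaction commits here, before the next line is read
    item = reader.read();
}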

5. Spring Batch processing units

Most batch processing can be described as reading data, doing some transformation on it and then writing the result out. This somewhat mirrors the Extract, Transform, Load (ETL) process, if you are familiar with it. Spring Batch provides three key interfaces to help perform bulk reading and writing: ItemReader, ItemProcessor and ItemWriter.

5.1. Readers

ItemReader is an abstraction providing the means to retrieve data from many different types of input – flat files, XML files, databases, JMS etc. – one item at a time. See Appendix A – List of ItemReaders and ItemWriters for a complete list of the available item readers.

In the Podcastpedia batch jobs I use the following specialized ItemReaders:

5.1.1. FlatFileItemReader

which, as the name implies, reads lines of data from a flat file, where each line typically describes a record with fields defined by fixed positions in the file or delimited by some special character (e.g. a comma). This type of ItemReader is used in the first batch job, addNewPodcastJob. The input file, named suggested-podcasts.in, resides on the classpath (src/main/resources) and looks something like the following:

Input file for FlatFileItemReader

FEED_URL; IDENTIFIER_ON_PODCASTPEDIA; CATEGORIES; LANGUAGE; MEDIA_TYPE; UPDATE_FREQUENCY; KEYWORDS; FB_PAGE; TWITTER_PAGE; GPLUS_PAGE; NAME_SUBMITTER; EMAIL_SUBMITTER
http://www.5minutebiographies.com/feed/; 5minutebiographies; people_society, history; en; Audio; WEEKLY; biography, biographies, short biography, short biographies, 5 minute biographies, five minute biographies, 5 minute biography, five minute biography; https://www.facebook.com/5minutebiographies; https://twitter.com/5MinuteBios; ; Adrian Matei; adrianmatei@gmail.com
http://notanotherpodcast.libsyn.com/rss; NotAnotherPodcast; entertainment; en; Audio; WEEKLY; Comedy, Sports, Cinema, Movies, Pop Culture, Food, Games; https://www.facebook.com/notanotherpodcastusa; https://twitter.com/NAPodcastUSA; https://plus.google.com/u/0/103089891373760354121/posts; Adrian Matei; adrianmatei@gmail.com

As you can see, the first line defines the names of the “columns”, and the following lines contain the actual data (delimited by “;”), which needs to be translated into domain objects relevant in this context.

Let’s see now how to configure the FlatFileItemReader:

FlatFileItemReader example

@Bean
public ItemReader<SuggestedPodcast> reader(){
    FlatFileItemReader<SuggestedPodcast> reader = new FlatFileItemReader<SuggestedPodcast>();
    reader.setLinesToSkip(1);//first line is title definition
    reader.setResource(new ClassPathResource("suggested-podcasts.in"));
    reader.setLineMapper(lineMapper());
    return reader;
}

You can specify, among other things, the input resource, the number of lines to skip, and a line mapper.

5.1.1.1. LineMapper

The LineMapper is an interface for mapping lines (strings) to domain objects, typically used to map lines read from a file on a per-line basis. For the Podcastpedia job I used the DefaultLineMapper, which is a two-phase implementation: tokenization of the line into a FieldSet, followed by mapping the FieldSet to an item:

LineMapper default implementation example

@Bean
public LineMapper<SuggestedPodcast> lineMapper() {
    DefaultLineMapper<SuggestedPodcast> lineMapper = new DefaultLineMapper<SuggestedPodcast>();
     
    DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
    lineTokenizer.setDelimiter(";");
    lineTokenizer.setStrict(false);
    lineTokenizer.setNames(new String[]{"FEED_URL", "IDENTIFIER_ON_PODCASTPEDIA", "CATEGORIES", "LANGUAGE", "MEDIA_TYPE", "UPDATE_FREQUENCY", "KEYWORDS", "FB_PAGE", "TWITTER_PAGE", "GPLUS_PAGE", "NAME_SUBMITTER", "EMAIL_SUBMITTER"});
     
    // note: this BeanWrapperFieldSetMapper could map columns to same-named bean
    // properties, but the custom FieldSetMapper below is wired in instead
    BeanWrapperFieldSetMapper<SuggestedPodcast> fieldSetMapper = new BeanWrapperFieldSetMapper<SuggestedPodcast>();
    fieldSetMapper.setTargetType(SuggestedPodcast.class);
     
    lineMapper.setLineTokenizer(lineTokenizer);
    lineMapper.setFieldSetMapper(suggestedPodcastFieldSetMapper());
     
    return lineMapper;
}
  • the DelimitedLineTokenizer splits the input String via the “;” delimiter.
  • if you set the strict flag to false, lines with fewer tokens are tolerated and padded with empty columns, and lines with more tokens are simply truncated.
  • the column names from the first line are set via lineTokenizer.setNames(...);
  • and the FieldSetMapper is set via lineMapper.setFieldSetMapper(...).

Note:
The FieldSet is an “interface used by flat file input sources to encapsulate concerns of converting an array of Strings to Java native types. A bit like the role played by ResultSet in JDBC, clients will know the name or position of strongly typed fields that they want to extract.“
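
To illustrate the ResultSet analogy, here is a hypothetical fragment reading tokens from a fieldSet tokenized from one line of the input file above:

// typed access to the tokens of a single line, by name or by position
String feedUrl  = fieldSet.readString("FEED_URL");
String language = fieldSet.readString(3);   // LANGUAGE is the fourth column (index 3)
int fieldCount  = fieldSet.getFieldCount(); // number of tokens in this line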

5.1.1.2. FieldSetMapper

The FieldSetMapper is an interface used to map the data obtained from a FieldSet into an object. Here’s my implementation, which maps the fieldSet to the SuggestedPodcast domain object that will be passed further to the processor:

FieldSetMapper implementation

public class SuggestedPodcastFieldSetMapper implements FieldSetMapper<SuggestedPodcast> {
 
    @Override
    public SuggestedPodcast mapFieldSet(FieldSet fieldSet) throws BindException {
         
        SuggestedPodcast suggestedPodcast = new SuggestedPodcast();
         
        suggestedPodcast.setCategories(fieldSet.readString("CATEGORIES"));
        suggestedPodcast.setEmail(fieldSet.readString("EMAIL_SUBMITTER"));
        suggestedPodcast.setName(fieldSet.readString("NAME_SUBMITTER"));
        suggestedPodcast.setTags(fieldSet.readString("KEYWORDS"));
         
        //some of the attributes we can map directly into the Podcast entity that we'll insert later into the database
        Podcast podcast = new Podcast();
        podcast.setUrl(fieldSet.readString("FEED_URL"));
        podcast.setIdentifier(fieldSet.readString("IDENTIFIER_ON_PODCASTPEDIA"));
        podcast.setLanguageCode(LanguageCode.valueOf(fieldSet.readString("LANGUAGE")));
        podcast.setMediaType(MediaType.valueOf(fieldSet.readString("MEDIA_TYPE")));
        podcast.setUpdateFrequency(UpdateFrequency.valueOf(fieldSet.readString("UPDATE_FREQUENCY")));
        podcast.setFbPage(fieldSet.readString("FB_PAGE"));
        podcast.setTwitterPage(fieldSet.readString("TWITTER_PAGE"));
        podcast.setGplusPage(fieldSet.readString("GPLUS_PAGE"));
         
        suggestedPodcast.setPodcast(podcast);
 
        return suggestedPodcast;
    }
     
}
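
Note that if the tokenizer’s column names matched the bean’s property names, a BeanWrapperFieldSetMapper (the one instantiated, but not used, in the lineMapper() method above) could replace the custom mapper; a minimal sketch, assuming matching names:

@Bean
public FieldSetMapper<SuggestedPodcast> beanWrapperFieldSetMapper() {
    // maps each FieldSet column to the bean property with the same name, via reflection
    BeanWrapperFieldSetMapper<SuggestedPodcast> fieldSetMapper = new BeanWrapperFieldSetMapper<SuggestedPodcast>();
    fieldSetMapper.setTargetType(SuggestedPodcast.class);
    return fieldSetMapper;
}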

5.1.2. JdbcCursorItemReader

In the second job, notifyEmailSubscribersJob, the reader only reads email subscribers from a single database table; further on, in the processor, a more detailed read (via JPA) is executed to retrieve all the new episodes of the podcasts the user subscribed to. This is a common pattern employed in the batch world. Follow this link for more Common Batch Patterns.

For the initial read, I chose the JdbcCursorItemReader, which is a simple reader implementation that opens a JDBC cursor and continually retrieves the next row in the ResultSet:

JdbcCursorItemReader example

@Bean
public ItemReader<User> notifySubscribersReader(){
     
    JdbcCursorItemReader<User> reader = new JdbcCursorItemReader<User>();
    String sql = "select * from users where is_email_subscriber is not null";
     
    reader.setSql(sql);
    reader.setDataSource(dataSource);
    reader.setRowMapper(rowMapper());      
 
    return reader;
}

Note that I had to set the SQL, the DataSource to read from, and a RowMapper.
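
The dataSource referenced above is assumed to be injected into the configuration class, for example the DataSource that Spring Boot auto-configures from the application properties; a minimal sketch:

// assumed: injected from the DataSource Spring Boot auto-configures based on application.properties
@Autowired
private DataSource dataSource;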

5.1.2.1. RowMapper

The RowMapper is an interface used by JdbcTemplate for mapping rows of a ResultSet on a per-row basis. My implementation of this interface, UserRowMapper, performs the actual work of mapping each row to a result object, but I don’t need to worry about exception handling:

RowMapper implementation

public class UserRowMapper implements RowMapper<User> {
 
    @Override
    public User mapRow(ResultSet rs, int rowNum) throws SQLException {
        User user = new User();
        user.setEmail(rs.getString("email"));
         
        return user;
    }
 
}
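
The rowMapper() bean referenced in the reader configuration above then simply returns this implementation; a minimal sketch:

@Bean
public RowMapper<User> rowMapper() {
    return new UserRowMapper();
}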

5.2. Writers

ItemWriter is an abstraction that represents the output of a Step, one batch or chunk of items at a time. Generally, an item writer has no knowledge of the input it will receive next, only the item that was passed in its current invocation.

The writers for the two jobs presented here are quite simple. They just use external services to send email notifications and post tweets on Podcastpedia’s account. Here is the implementation of the ItemWriter for the first job – addNewPodcastJob:

Writer implementation of ItemWriter

package org.podcastpedia.batch.jobs.addpodcast;
 
import java.util.Date;
import java.util.List;
 
import javax.inject.Inject;
import javax.persistence.EntityManager;
 
import org.podcastpedia.batch.common.entities.Podcast;
import org.podcastpedia.batch.jobs.addpodcast.model.SuggestedPodcast;
import org.podcastpedia.batch.jobs.addpodcast.service.EmailNotificationService;
import org.podcastpedia.batch.jobs.addpodcast.service.SocialMediaService;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
 
public class Writer implements ItemWriter<SuggestedPodcast>{
 
    @Autowired
    private EntityManager entityManager;
     
    @Inject
    private EmailNotificationService emailNotificationService;
     
    @Inject
    private SocialMediaService socialMediaService;
     
    @Override
    public void write(List<? extends SuggestedPodcast> items) throws Exception {
 
        // the chunk size is 1, so at most one item arrives per write call
        if(items.get(0) != null){
            SuggestedPodcast suggestedPodcast = items.get(0);
             
            //first insert the data in the database
            Podcast podcast = suggestedPodcast.getPodcast();
             
            podcast.setInsertionDate(new Date());
            entityManager.persist(podcast);
            entityManager.flush();
             
            //notify submitter about the insertion and post a twitt about it
            String url = buildUrlOnPodcastpedia(podcast);
             
            emailNotificationService.sendPodcastAdditionConfirmation(
                    suggestedPodcast.getName(), suggestedPodcast.getEmail(),
                    url);
            if(podcast.getTwitterPage() != null){
                socialMediaService.postOnTwitterAboutNewPodcast(podcast,
                url);              
            }                  
        }
 
    }
 
    private String buildUrlOnPodcastpedia(Podcast podcast) {
        StringBuffer urlOnPodcastpedia = new StringBuffer(
                "http://www.podcastpedia.org");
        if (podcast.getIdentifier() != null) {
            urlOnPodcastpedia.append("/" + podcast.getIdentifier());
        } else {
            urlOnPodcastpedia.append("/podcasts/");
            urlOnPodcastpedia.append(String.valueOf(podcast.getPodcastId()));
            urlOnPodcastpedia.append("/" + podcast.getTitleInUrl());
        }      
        String url = urlOnPodcastpedia.toString();
        return url;
    }
 
}

As you can see there’s nothing special here, except that the write method has to be overridden; this is where the injected external services EmailNotificationService and SocialMediaService are used to inform the podcast submitter via email about the addition to the podcast directory and, if a Twitter page was submitted, to post a tweet about it on Podcastpedia’s wall. You can find detailed explanations of how to send email via Velocity and how to post on Twitter from Java in my related posts.

5.3. Processors

ItemProcessor is an abstraction that represents the business processing of an item. While the ItemReader reads one item and the ItemWriter writes them out, the ItemProcessor provides a hook to transform the item or apply other business processing. To use your own processor you have to implement the ItemProcessor<I,O> interface, with its only method O process(I item) throws Exception, which returns a potentially modified item, or a new one, for continued processing. If the returned result is null, it is assumed that processing of the item should not continue.
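
Here is a trivial, hypothetical sketch of that contract (not part of the project; it assumes the usual getters and setters on SuggestedPodcast), just to show the filtering semantics:

public class SanitizingProcessor implements ItemProcessor<SuggestedPodcast, SuggestedPodcast> {

    @Override
    public SuggestedPodcast process(SuggestedPodcast item) throws Exception {
        if (item.getEmail() == null) {
            return null; // returning null filters the item out; it never reaches the writer
        }
        item.setName(item.getName().trim()); // a modified item continues down the step
        return item;
    }
}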

The processor of the first job requires a bit more logic, because I have to set the etag and last-modified header attributes, the feed attributes, and the episodes, categories and keywords of the podcast:

ItemProcessor implementation for the job addNewPodcast

public class SuggestedPodcastItemProcessor implements ItemProcessor<SuggestedPodcast, SuggestedPodcast> {
 
    private static final int TIMEOUT = 10;
 
    @Autowired
    ReadDao readDao;
     
    @Autowired
    PodcastAndEpisodeAttributesService podcastAndEpisodeAttributesService;
     
    @Autowired
    private PoolingHttpClientConnectionManager poolingHttpClientConnectionManager; 
     
    @Autowired
    private SyndFeedService syndFeedService;
 
    /**
     * Method used to build the categories, tags and episodes of the podcast
     */
    @Override
    public SuggestedPodcast process(SuggestedPodcast item) throws Exception {
         
        if(isPodcastAlreadyInTheDirectory(item.getPodcast().getUrl())) {
            return null;
        }
         
        String[] categories = item.getCategories().trim().split("\\s*,\\s*");      
 
        item.getPodcast().setAvailability(org.apache.http.HttpStatus.SC_OK);
         
        //set etag and last modified attributes for the podcast
        setHeaderFieldAttributes(item.getPodcast());
         
        //set the other attributes of the podcast from the feed
        podcastAndEpisodeAttributesService.setPodcastFeedAttributes(item.getPodcast());
                 
        //set the categories
        List<Category> categoriesByNames = readDao.findCategoriesByNames(categories);
        item.getPodcast().setCategories(categoriesByNames);
         
        //set the tags
        setTagsForPodcast(item);
         
        //build the episodes
        setEpisodesForPodcast(item.getPodcast());
         
        return item;
    }
    ......
}

The processor of the second job uses the ‘Driving Query’ approach: I expand the data retrieved by the reader with another “JPA read” and group the items into podcasts with episodes, so that they look nice in the emails I am sending out to subscribers:

ItemProcessor implementation of the second job – notifySubscribers

@Scope("step")
public class NotifySubscribersItemProcessor implements ItemProcessor<User, User> {
 
    @Autowired
    EntityManager em;
     
    @Value("#{jobParameters[updateFrequency]}")
    String updateFrequency;
     
    @Override
    public User process(User item) throws Exception {
                 
        String sqlInnerJoinEpisodes = "select e from User u JOIN u.podcasts p JOIN p.episodes e WHERE u.email=?1 AND p.updateFrequency=?2 AND"
                + " e.isNew IS NOT NULL  AND e.availability=200 ORDER BY e.podcast.podcastId ASC, e.publicationDate ASC";
        TypedQuery<Episode> queryInnerJoinepisodes = em.createQuery(sqlInnerJoinEpisodes, Episode.class);
        queryInnerJoinepisodes.setParameter(1, item.getEmail());
        queryInnerJoinepisodes.setParameter(2, UpdateFrequency.valueOf(updateFrequency));      
                 
        List<Episode> newEpisodes = queryInnerJoinepisodes.getResultList();
         
        return regroupPodcastsWithEpisodes(item, newEpisodes);
                 
    }
    .......
}

Note:
If you’d like to find out more about how to use the Apache Http Client to get the etag and last-modified headers, have a look at my post – How to use the new Apache Http Client to make a HEAD request.
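
For context, here is a minimal sketch of such a HEAD request with the Apache HttpClient 4.3 API used in this project (the feed URL is just an example taken from the input file above):

CloseableHttpClient client = HttpClients.createDefault();
HttpHead head = new HttpHead("http://www.5minutebiographies.com/feed/");
CloseableHttpResponse response = client.execute(head);
try {
    // the headers of interest for conditional requests on the next fetch
    Header etag = response.getFirstHeader("ETag");
    Header lastModified = response.getFirstHeader("Last-Modified");
} finally {
    response.close();
}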

6. Execute the batch application

Batch processing can be embedded in web applications and WAR files, but for a start I chose the simpler approach of creating a standalone application that is started from the Java main() method:

Batch processing Java main() method

package org.podcastpedia.batch;
//imports ...;
 
@ComponentScan
@EnableAutoConfiguration
public class Application {
 
    private static final String NEW_EPISODES_NOTIFICATION_JOB = "newEpisodesNotificationJob";
    private static final String ADD_NEW_PODCAST_JOB = "addNewPodcastJob";
 
    public static void main(String[] args) throws BeansException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException, InterruptedException {
         
        Log log = LogFactory.getLog(Application.class);
                 
        SpringApplication app = new SpringApplication(Application.class);
        app.setWebEnvironment(false);
        ConfigurableApplicationContext ctx= app.run(args);
        JobLauncher jobLauncher = ctx.getBean(JobLauncher.class);
                         
        if(ADD_NEW_PODCAST_JOB.equals(args[0])){
            //addNewPodcastJob
            Job addNewPodcastJob = ctx.getBean(ADD_NEW_PODCAST_JOB, Job.class);
            JobParameters jobParameters = new JobParametersBuilder()
            .addDate("date", new Date())
            .toJobParameters(); 
             
            JobExecution jobExecution = jobLauncher.run(addNewPodcastJob, jobParameters);
             
            BatchStatus batchStatus = jobExecution.getStatus();
            while(batchStatus.isRunning()){
                log.info("*********** Still running.... **************");
                Thread.sleep(1000);
            }
            log.info(String.format("*********** Exit status: %s", jobExecution.getExitStatus().getExitCode()));
            JobInstance jobInstance = jobExecution.getJobInstance();
            log.info(String.format("********* Name of the job %s", jobInstance.getJobName()));
             
            log.info(String.format("*********** job instance Id: %d", jobInstance.getId()));
             
            System.exit(0);
             
        } else if(NEW_EPISODES_NOTIFICATION_JOB.equals(args[0])){
            JobParameters jobParameters = new JobParametersBuilder()
            .addDate("date", new Date())
            .addString("updateFrequency", args[1])
            .toJobParameters(); 
             
            jobLauncher.run(ctx.getBean(NEW_EPISODES_NOTIFICATION_JOB,  Job.class), jobParameters);  
        } else {
            throw new IllegalArgumentException("Please provide a valid Job name as first application parameter");
        }
      
        System.exit(0);
    }
     
}

The best explanation of the SpringApplication, @ComponentScan and @EnableAutoConfiguration magic comes from the source – Getting Started – Creating a Batch Service:

“The main() method defers to the SpringApplication helper class, providing Application.class as an argument to its run() method. This tells Spring to read the annotation metadata from Application and to manage it as a component in the Spring application context.

The @ComponentScan annotation tells Spring to search recursively through the org.podcastpedia.batch package and its children for classes marked directly or indirectly with Spring’s @Component annotation. This directive ensures that Spring finds and registers BatchConfiguration, because it is marked with @Configuration, which in turn is a kind of @Component annotation.

The @EnableAutoConfiguration annotation switches on reasonable default behaviors based on the content of your classpath. For example, it looks for any class that implements the CommandLineRunner interface and invokes its run() method.”
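
As a side note (this is not how the project launches its jobs), a CommandLineRunner bean would be an alternative way to run a job right after startup; a hypothetical sketch:

@Component
public class AddPodcastJobRunner implements CommandLineRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job addNewPodcastJob;

    @Override
    public void run(String... args) throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addDate("date", new Date())
                .toJobParameters();
        jobLauncher.run(addNewPodcastJob, jobParameters); // invoked automatically after the context starts
    }
}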

Execution steps:

  • the JobLauncher, which is a simple interface for controlling jobs, is retrieved from the ApplicationContext. Remember it is automatically made available via the @EnableBatchProcessing annotation.
  • based on the first parameter of the application (args[0]), the corresponding Job is retrieved from the ApplicationContext
  • then the JobParameters are prepared, where I use the current date – .addDate("date", new Date()) – so that the job executions are always unique (see the sketch after this list)
  • once everything is in place, the job can be executed: JobExecution jobExecution = jobLauncher.run(addNewPodcastJob, jobParameters);
  • you can use the returned jobExecution to gain access to BatchStatus, exit code, or job name and id.
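
The uniqueness matters: launching a job twice with identical JobParameters addresses the same JobInstance, and if that instance already completed successfully, Spring Batch refuses to run it again. A small sketch with a hypothetical fixed parameter:

JobParameters fixed = new JobParametersBuilder()
        .addString("run", "2014-09-01")
        .toJobParameters();
jobLauncher.run(addNewPodcastJob, fixed); // first launch: a new JobInstance is created and executed
jobLauncher.run(addNewPodcastJob, fixed); // second launch: throws JobInstanceAlreadyCompleteException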

Note: I highly recommend you read and understand the Meta-Data Schema for Spring Batch. It will also help you better understand the Spring Batch Domain objects.

6.1. Running the application on dev and prod environments

To be able to run the Spring Batch / Spring Boot application in different environments I make use of the Spring Profiles capability. By default the application runs with development data (database). If I want the job to use the production database instead, I have to do the following (a programmatic alternative is sketched after the list):

  • provide the environment argument -Dspring.profiles.active=prod
  • have the production database properties configured in an application-prod.properties file on the classpath, right beside the default application.properties file
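
Alternatively (a minimal sketch, not what the project actually does), the profile can be activated programmatically before the application starts:

SpringApplication app = new SpringApplication(Application.class);
app.setAdditionalProfiles("prod"); // picks up application-prod.properties in addition to application.properties
app.setWebEnvironment(false);
ConfigurableApplicationContext ctx = app.run(args);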

Summary

In this tutorial we’ve learned how to configure a Spring Batch project with Spring Boot and Java configuration, how to use some of the most common readers in batch processing, how to configure some simple jobs, and how to start Spring Batch jobs from a main method.


Adrian Matei

Adrian Matei (ama [AT] codingpedia DOT org) is the founder of Podcastpedia.org and Codingpedia.org, computer science engineer, husband, father, curious and passionate about science, computers, software, education, economics, social equity, philosophy.