DB Connection Leak in Item Reader when using AsyncTaskExecutor

In the Spring Batch job we have developed, we use an AsyncTaskExecutor to allow parallel processing at the step level, with the concurrency limit set to 5. We use the JdbcCursorItemReader to pick up the input records that we need to process, and it is configured to obtain its connections from a connection pool. During job execution we have noticed that the job aborts because the pool runs out of connections, which leads us to believe that the JdbcCursorItemReader is not releasing its connections back to the pool. Any suggestions?

Below is my batch XML configuration:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/batch"
    xmlns:beans="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">

    <beans:import resource="../launch-context.xml" />

    <beans:bean id="wsStudentItemReader"
        class="org.springframework.batch.item.database.JdbcCursorItemReader"    
        scope="step">
        <beans:property name="dataSource" ref="rptDS" />
        <beans:property name="sql"
            value="SELECT * FROM STUDENTS WHERE BATCH_ID=?" />
        <beans:property name="preparedStatementSetter">
            <beans:bean class="com.test.BatchDtSetter"
                autowire="byName">
                <beans:property name="batchId" value="#{jobParameters[batchId]}" /> 
            </beans:bean>
        </beans:property>
        <beans:property name="rowMapper" ref="wsRowMapper" />
    </beans:bean>


    <beans:bean id="outputWriter"
        class="org.springframework.batch.item.support.ClassifierCompositeItemWriter">
        <beans:property name="classifier" ref="writerClassifier" />
    </beans:bean>

    <beans:bean id="writerClassifier"
        class="com.test.WriterClassifier">
        <beans:property name="codeFailWriter" ref="failJdbcBatchItemWriter" />
        <beans:property name="codePassWriter" ref="passJdbcBatchItemWriter" />
    </beans:bean>

    <beans:bean id="failJdbcBatchItemWriter"
        class="org.springframework.batch.item.database.JdbcBatchItemWriter">
        <beans:property name="dataSource" ref="rptDS" />
        <beans:property name="sql"
            value="DELETE FROM STUDENTS WHERE BATCH_ID=?" />
        <beans:property name="itemPreparedStatementSetter" ref="FailStatusSetter" />
    </beans:bean>

<beans:bean id="FailStatusSetter" class="com.test.FailStatusSetter" />

    <beans:bean id="passJdbcBatchItemWriter"
        class="com.test.PassBatchItemWriter">
    </beans:bean>

    <beans:bean id="WSListnr"
        class="com.test.WSBatchListnr">
        <beans:property name="dataSource" ref="rptDS" />
    </beans:bean>


    <beans:bean id="wsRowMapper" class="com.test.WSReqMapper" />
    <beans:bean id="wsReqPrcsr" 
         class="com.test.WSReqProc">
         <beans:property name="dataSource" ref="rptDS" />
    </beans:bean>
    <beans:bean id="wsReqPrepStmtSetter" class="com.test.wsStudentSetter" />

    <step id="initiateStep">
        <tasklet ref="initiateStepTask" />
    </step>
    <step id="wsStudentGenStep">
        <tasklet task-executor="taskExecutor">
            <chunk reader="wsStudentItemReader" processor="wsReqPrcsr"
                writer="outputWriter" commit-interval="4" skip-limit="20">
                <skippable-exception-classes>
                    <include class="java.lang.Exception" />
                </skippable-exception-classes>
            </chunk>
            <listeners> 
                <listener ref="WSListnr" />
            </listeners>            
        </tasklet>
    </step>
    <job id="wsStudent">
        <step id="wsStudentFileGenIntialStep" parent="initiateStep"
            next="wsStudentFileGenStep" />
        <step id="wsStudentFileGenStep" parent="wsStudentGenStep" />
    </job>


    <beans:bean id="initiateStepTask" class="com.test.Initializer"
        scope="step">
    </beans:bean>

    <beans:bean id="taskExecutor"
       class="org.springframework.core.task.SimpleAsyncTaskExecutor">
       <beans:property name="concurrencyLimit" value="2"/>
   </beans:bean>

</beans:beans>

Config.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:beans="http://www.springframework.org/schema/beans"
    xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"
    xsi:schemaLocation="
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd">

    <context:component-scan base-package="com.test" />
    <context:property-placeholder location="classpath:batch.properties" />

    <beans:bean id="websiteDS"
        class="org.springframework.jndi.JndiObjectFactoryBean">
        <beans:property name="jndiName" value="java:jboss/datasources/WebSiteDS" />
        <beans:property name="lookupOnStartup" value="false" />
        <beans:property name="cache" value="true" />
        <beans:property name="proxyInterface" value="javax.sql.DataSource" />
    </beans:bean>

    <beans:bean id="rptDS"
        class="org.springframework.jndi.JndiObjectFactoryBean">
        <beans:property name="jndiName" value="java:jboss/datasources/ReportingDS" />
        <beans:property name="lookupOnStartup" value="false" />
        <beans:property name="cache" value="true" />
        <beans:property name="proxyInterface" value="javax.sql.DataSource" />
    </beans:bean>


    <beans:bean id="transactionManager"
        class="org.springframework.jdbc.datasource.DataSourceTransactionManager"
        lazy-init="true">
        <beans:property name="dataSource" ref="rptDS" />
    </beans:bean>


    <beans:bean id="jobRepository"
        class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"
        p:databaseType="db2" p:transactionManager-ref="transactionManager"
        p:dataSource-ref="batchDS" p:isolation-level-for-create="ISOLATION_REPEATABLE_READ" />


    <beans:bean id="jobLauncher" 
        class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <beans:property name="jobRepository" ref="jobRepository" /> 
    </beans:bean>

    <beans:bean id="jobOperator"
        class="org.springframework.batch.core.launch.support.SimpleJobOperator"
        p:jobLauncher-ref="jobLauncher" p:jobExplorer-ref="jobExplorer"
        p:jobRepository-ref="jobRepository" p:jobRegistry-ref="jobRegistry" />

    <beans:bean id="jobExplorer"
        class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean"
        p:dataSource-ref="batchDS" />

    <beans:bean id="jobRegistry"
        class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
    <beans:bean
        class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
        <beans:property name="jobRegistry" ref="jobRegistry" />
    </beans:bean>


    <beans:bean id="completionPolicy" class="org.springframework.batch.repeat.policy.DefaultResultCompletionPolicy"/> 


</beans:beans>

See also the question "SimpleAsyncTaskExecutor trying to read record after completion of processing", as the two issues might be related.

Answers


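The issue was resolved by replacing the JdbcCursorItemReader with a JdbcPagingItemReader. The cursor-based reader is not thread-safe, since it holds one connection and ResultSet open for the duration of the step, whereas the paging reader is thread-safe and issues a separate query for each page.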
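
For illustration, the reader bean from the question could be rewritten roughly as follows. This is only a sketch under assumptions, not the exact configuration that was used: STUDENT_ID is a hypothetical unique column standing in for whatever key the STUDENTS table really has, the page size of 4 simply mirrors the commit-interval, and the sort key property name can differ between Spring Batch versions (newer releases also offer sortKeys).

```xml
<beans:bean id="wsStudentItemReader"
    class="org.springframework.batch.item.database.JdbcPagingItemReader"
    scope="step">
    <beans:property name="dataSource" ref="rptDS" />
    <beans:property name="queryProvider">
        <!-- Builds a paging query suited to the database behind rptDS -->
        <beans:bean
            class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
            <beans:property name="dataSource" ref="rptDS" />
            <beans:property name="selectClause" value="SELECT *" />
            <beans:property name="fromClause" value="FROM STUDENTS" />
            <beans:property name="whereClause" value="WHERE BATCH_ID = :batchId" />
            <!-- Assumption: STUDENT_ID is a hypothetical unique column; paging needs a unique sort key -->
            <beans:property name="sortKey" value="STUDENT_ID" />
        </beans:bean>
    </beans:property>
    <!-- Named parameter replaces the PreparedStatementSetter used by the cursor reader -->
    <beans:property name="parameterValues">
        <beans:map>
            <beans:entry key="batchId" value="#{jobParameters[batchId]}" />
        </beans:map>
    </beans:property>
    <beans:property name="pageSize" value="4" />
    <beans:property name="rowMapper" ref="wsRowMapper" />
    <!-- Restart state is not meaningful when several threads share the reader -->
    <beans:property name="saveState" value="false" />
</beans:bean>
```

Because each page is fetched with its own short-lived query, a connection is borrowed from the pool only while a page is being read, rather than being held open for the whole step.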

Also refer: SimpleAsyncTaskExecutor trying to read record after completion of processing

Thanks @MichaelPralow

