001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.commons.dbutils;
018    
019    import java.sql.Connection;
020    import java.sql.PreparedStatement;
021    import java.sql.ResultSet;
022    import java.sql.SQLException;
023    import java.util.concurrent.Callable;
024    import java.util.concurrent.ExecutorService;
025    import java.util.concurrent.Future;
026    
027    import javax.sql.DataSource;
028    
029    /**
030     * Executes SQL queries with pluggable strategies for handling
031     * <code>ResultSet</code>s.  This class is thread safe.
032     *
033     * @see ResultSetHandler
034     * @since 1.4
035     */
036    public class AsyncQueryRunner extends AbstractQueryRunner {
037    
038        private final ExecutorService executorService;
039    
040        /**
041         * Constructor for AsyncQueryRunner.
042         *
043         * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
044         */
045        public AsyncQueryRunner(ExecutorService executorService) {
046            this(null, false, executorService);
047        }
048    
049        /**
050         * Constructor for AsyncQueryRunner, allows workaround for Oracle drivers
051         * @param pmdKnownBroken Oracle drivers don't support {@link java.sql.ParameterMetaData#getParameterType(int) };
052         * if <code>pmdKnownBroken</code> is set to true, we won't even try it; if false, we'll try it,
053         * and if it breaks, we'll remember not to use it again.
054         * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
055         */
056        public AsyncQueryRunner(boolean pmdKnownBroken, ExecutorService executorService) {
057            this(null, pmdKnownBroken, executorService);
058        }
059    
060        /**
061         * Constructor for AsyncQueryRunner which takes a <code>DataSource</code>.  Methods that do not take a
062         * <code>Connection</code> parameter will retrieve connections from this
063         * <code>DataSource</code>.
064         *
065         * @param ds The <code>DataSource</code> to retrieve connections from.
066         * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
067         */
068        public AsyncQueryRunner(DataSource ds, ExecutorService executorService) {
069            this(ds, false, executorService);
070        }
071    
072        /**
073         * Constructor for QueryRunner, allows workaround for Oracle drivers.  Methods that do not take a
074         * <code>Connection</code> parameter will retrieve connections from this
075         * <code>DataSource</code>.
076         *
077         * @param ds The <code>DataSource</code> to retrieve connections from.
078         * @param pmdKnownBroken Oracle drivers don't support {@link java.sql.ParameterMetaData#getParameterType(int) };
079         * if <code>pmdKnownBroken</code> is set to true, we won't even try it; if false, we'll try it,
080         * and if it breaks, we'll remember not to use it again.
081         * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
082         */
083        public AsyncQueryRunner(DataSource ds, boolean pmdKnownBroken, ExecutorService executorService) {
084            super(ds, pmdKnownBroken);
085            this.executorService = executorService;
086        }
087    
088        /**
089         * Class that encapsulates the continuation for batch calls.
090         */
091        protected class BatchCallableStatement implements Callable<int[]> {
092            private String sql;
093            private Object[][] params;
094            private Connection conn;
095            private boolean closeConn;
096            private PreparedStatement ps;
097    
098            /**
099             * Creates a new BatchCallableStatement instance.
100             *
101             * @param sql The SQL statement to execute.
102             * @param params An array of query replacement parameters.  Each row in
103             *        this array is one set of batch replacement values.
104             * @param conn The connection to use for the batch call.
105             * @param closeConn True if the connection should be closed, false otherwise.
106             * @param ps The {@link PreparedStatement} to be executed.
107             */
108            public BatchCallableStatement(String sql, Object[][] params, Connection conn, boolean closeConn, PreparedStatement ps) {
109                this.sql = sql;
110                this.params = params;
111                this.conn = conn;
112                this.closeConn = closeConn;
113                this.ps = ps;
114            }
115    
116            /**
117             * The actual call to executeBatch.
118             *
119             * @return an array of update counts containing one element for each command in the batch.
120             * @throws SQLException if a database access error occurs or one of the commands sent to the database fails.
121             * @see PreparedStatement#executeBatch()
122             */
123            public int[] call() throws SQLException {
124                int[] ret = null;
125    
126                try {
127                    ret = ps.executeBatch();
128                } catch (SQLException e) {
129                    rethrow(e, sql, (Object[])params);
130                } finally {
131                    close(ps);
132                    if (closeConn) {
133                        close(conn);
134                    }
135                }
136    
137                return ret;
138            }
139        }
140    
141        /**
142         * Execute a batch of SQL INSERT, UPDATE, or DELETE queries.
143         *
144         * @param conn The <code>Connection</code> to use to run the query.  The caller is
145         * responsible for closing this Connection.
146         * @param sql The SQL to execute.
147         * @param params An array of query replacement parameters.  Each row in
148         * this array is one set of batch replacement values.
149         * @return A <code>Future</code> which returns the number of rows updated per statement.
150         * @throws SQLException if a database access error occurs
151         */
152        public Future<int[]> batch(Connection conn, String sql, Object[][] params) throws SQLException {
153            return executorService.submit(this.batch(conn, false, sql, params));
154        }
155    
156        /**
157         * Execute a batch of SQL INSERT, UPDATE, or DELETE queries.  The
158         * <code>Connection</code> is retrieved from the <code>DataSource</code>
159         * set in the constructor.  This <code>Connection</code> must be in
160         * auto-commit mode or the update will not be saved.
161         *
162         * @param sql The SQL to execute.
163         * @param params An array of query replacement parameters.  Each row in
164         * this array is one set of batch replacement values.
165         * @return A <code>Future</code> which returns the number of rows updated per statement.
166         * @throws SQLException if a database access error occurs
167         */
168        public Future<int[]> batch(String sql, Object[][] params) throws SQLException {
169            Connection conn = this.prepareConnection();
170    
171            return executorService.submit(this.batch(conn, true, sql, params));
172        }
173    
174        /**
175         * Creates a continuation for a batch call, and returns it in a <code>Callable</code>.
176         * @param conn The connection to use for the batch call.
177         * @param closeConn True if the connection should be closed, false otherwise.
178         * @param sql The SQL statement to execute.
179         * @param params An array of query replacement parameters.  Each row in
180         * this array is one set of batch replacement values.
181         * @return A <code>Callable</code> which returns the number of rows updated per statement.
182         * @throws SQLException If there are database or parameter errors.
183         */
184        private Callable<int[]> batch(Connection conn, boolean closeConn, String sql, Object[][] params) throws SQLException {
185            if (conn == null) {
186                throw new SQLException("Null connection");
187            }
188    
189            if (sql == null) {
190                if (closeConn) {
191                    close(conn);
192                }
193                throw new SQLException("Null SQL statement");
194            }
195    
196            if (params == null) {
197                if (closeConn) {
198                    close(conn);
199                }
200                throw new SQLException("Null parameters. If parameters aren't need, pass an empty array.");
201            }
202    
203            PreparedStatement stmt = null;
204            Callable<int[]> ret = null;
205            try {
206                stmt = this.prepareStatement(conn, sql);
207    
208                for (int i = 0; i < params.length; i++) {
209                    this.fillStatement(stmt, params[i]);
210                    stmt.addBatch();
211                }
212    
213                ret = new BatchCallableStatement(sql, params, conn, closeConn, stmt);
214    
215            } catch (SQLException e) {
216                close(stmt);
217                close(conn);
218                this.rethrow(e, sql, (Object[])params);
219            }
220    
221            return ret;
222        }
223    
224        /**
225         * Class that encapsulates the continuation for query calls.
226         * @param <T> The type of the result from the call to handle.
227         */
228        protected class QueryCallableStatement<T> implements Callable<T> {
229            private String sql;
230            private Object[] params;
231            private Connection conn;
232            private boolean closeConn;
233            private PreparedStatement ps;
234            private ResultSetHandler<T> rsh;
235    
236            /**
237             * Creates a new {@code QueryCallableStatement} instance.
238             *
239             * @param conn The connection to use for the batch call.
240             * @param closeConn True if the connection should be closed, false otherwise.
241             * @param ps The {@link PreparedStatement} to be executed.
242             * @param rsh The handler that converts the results into an object.
243             * @param sql The SQL statement to execute.
244             * @param params An array of query replacement parameters.  Each row in
245             *        this array is one set of batch replacement values.
246             */
247            public QueryCallableStatement(Connection conn, boolean closeConn, PreparedStatement ps,
248                    ResultSetHandler<T> rsh, String sql, Object... params) {
249                this.sql = sql;
250                this.params = params;
251                this.conn = conn;
252                this.closeConn = closeConn;
253                this.ps = ps;
254                this.rsh = rsh;
255            }
256    
257            /**
258             * The actual call to {@code handle()} method.
259             *
260             * @return an array of update counts containing one element for each command in the batch.
261             * @throws SQLException if a database access error occurs.
262             * @see ResultSetHandler#handle(ResultSet)
263             */
264            public T call() throws SQLException {
265                ResultSet rs = null;
266                T ret = null;
267    
268                try {
269                    rs = wrap(ps.executeQuery());
270                    ret = rsh.handle(rs);
271                } catch (SQLException e) {
272                    rethrow(e, sql, params);
273                } finally {
274                    try {
275                        close(rs);
276                    } finally {
277                        close(ps);
278                        if (closeConn) {
279                            close(conn);
280                        }
281                    }
282                }
283    
284                return ret;
285            }
286    
287        }
288    
289        /**
290         * Creates a continuation for a query call, and returns it in a <code>Callable</code>.
291         * @param conn The connection to use for the query call.
292         * @param closeConn True if the connection should be closed, false otherwise.
293         * @param sql The SQL statement to execute.
294         * @param params An array of query replacement parameters.  Each row in
295         * this array is one set of query replacement values.
296         * @return A <code>Callable</code> which returns the result of the query call.
297         * @throws SQLException If there are database or parameter errors.
298         */
299        private <T> Callable<T> query(Connection conn, boolean closeConn, String sql, ResultSetHandler<T> rsh, Object... params)
300                throws SQLException {
301            PreparedStatement stmt = null;
302            Callable<T> ret = null;
303    
304            if (conn == null) {
305                throw new SQLException("Null connection");
306            }
307    
308            if (sql == null) {
309                if (closeConn) {
310                    close(conn);
311                }
312                throw new SQLException("Null SQL statement");
313            }
314    
315            if (rsh == null) {
316                if (closeConn) {
317                    close(conn);
318                }
319                throw new SQLException("Null ResultSetHandler");
320            }
321    
322            try {
323                stmt = this.prepareStatement(conn, sql);
324                this.fillStatement(stmt, params);
325    
326                ret = new QueryCallableStatement<T>(conn, closeConn, stmt, rsh, sql, params);
327    
328            } catch (SQLException e) {
329                close(stmt);
330                if (closeConn) {
331                    close(conn);
332                }
333                this.rethrow(e, sql, params);
334            }
335    
336            return ret;
337        }
338    
339        /**
340         * Execute an SQL SELECT query with replacement parameters.  The
341         * caller is responsible for closing the connection.
342         * @param <T> The type of object that the handler returns
343         * @param conn The connection to execute the query in.
344         * @param sql The query to execute.
345         * @param rsh The handler that converts the results into an object.
346         * @param params The replacement parameters.
347         * @return A <code>Future</code> which returns the result of the query call.
348         * @throws SQLException if a database access error occurs
349         */
350        public <T> Future<T> query(Connection conn, String sql, ResultSetHandler<T> rsh, Object... params) throws SQLException {
351            return executorService.submit(query(conn, false, sql, rsh, params));
352        }
353    
354        /**
355         * Execute an SQL SELECT query without any replacement parameters.  The
356         * caller is responsible for closing the connection.
357         * @param <T> The type of object that the handler returns
358         * @param conn The connection to execute the query in.
359         * @param sql The query to execute.
360         * @param rsh The handler that converts the results into an object.
361         * @return A <code>Future</code> which returns the result of the query call.
362         * @throws SQLException if a database access error occurs
363         */
364        public <T> Future<T> query(Connection conn, String sql, ResultSetHandler<T> rsh) throws SQLException {
365            return executorService.submit(this.query(conn, false, sql, rsh, (Object[]) null));
366        }
367    
368        /**
369         * Executes the given SELECT SQL query and returns a result object.
370         * The <code>Connection</code> is retrieved from the
371         * <code>DataSource</code> set in the constructor.
372         * @param <T> The type of object that the handler returns
373         * @param sql The SQL statement to execute.
374         * @param rsh The handler used to create the result object from
375         * the <code>ResultSet</code>.
376         * @param params Initialize the PreparedStatement's IN parameters with
377         * this array.
378         * @return A <code>Future</code> which returns the result of the query call.
379         * @throws SQLException if a database access error occurs
380         */
381        public <T> Future<T> query(String sql, ResultSetHandler<T> rsh, Object... params) throws SQLException {
382            Connection conn = this.prepareConnection();
383            return executorService.submit(this.query(conn, true, sql, rsh, params));
384        }
385    
386        /**
387         * Executes the given SELECT SQL without any replacement parameters.
388         * The <code>Connection</code> is retrieved from the
389         * <code>DataSource</code> set in the constructor.
390         * @param <T> The type of object that the handler returns
391         * @param sql The SQL statement to execute.
392         * @param rsh The handler used to create the result object from
393         * the <code>ResultSet</code>.
394         *
395         * @return A <code>Future</code> which returns the result of the query call.
396         * @throws SQLException if a database access error occurs
397         */
398        public <T> Future<T> query(String sql, ResultSetHandler<T> rsh) throws SQLException {
399            Connection conn = this.prepareConnection();
400            return executorService.submit(this.query(conn, true, sql, rsh, (Object[]) null));
401        }
402    
403        /**
404         * Class that encapsulates the continuation for update calls.
405         */
406        protected class UpdateCallableStatement implements Callable<Integer> {
407            private String sql;
408            private Object[] params;
409            private Connection conn;
410            private boolean closeConn;
411            private PreparedStatement ps;
412    
413            /**
414             *
415             *
416             * @param conn The connection to use for the batch call.
417             * @param closeConn True if the connection should be closed, false otherwise.
418             * @param ps The {@link PreparedStatement} to be executed.
419             * @param sql The SQL statement to execute.
420             * @param params An array of query replacement parameters.  Each row in
421             *        this array is one set of batch replacement values.
422             */
423            public UpdateCallableStatement(Connection conn, boolean closeConn, PreparedStatement ps, String sql, Object... params) {
424                this.sql = sql;
425                this.params = params;
426                this.conn = conn;
427                this.closeConn = closeConn;
428                this.ps = ps;
429            }
430    
431            /**
432             * The actual call to {@code executeUpdate()} method.
433             *
434             * @return either (1) the row count for SQL Data Manipulation Language (DML) statements or
435             *                (2) 0 for SQL statements that return nothing
436             * @throws SQLException if a database access error occurs.
437             * @see PreparedStatement#executeUpdate()
438             */
439            public Integer call() throws SQLException {
440                int rows = 0;
441    
442                try {
443                    rows = ps.executeUpdate();
444                } catch (SQLException e) {
445                    rethrow(e, sql, params);
446                } finally {
447                    close(ps);
448                    if (closeConn) {
449                        close(conn);
450                    }
451                }
452    
453                return Integer.valueOf(rows);
454            }
455    
456        }
457    
458        /**
459         * Creates a continuation for an update call, and returns it in a <code>Callable</code>.
460         * @param conn The connection to use for the update call.
461         * @param closeConn True if the connection should be closed, false otherwise.
462         * @param sql The SQL statement to execute.
463         * @param params An array of update replacement parameters.  Each row in
464         * this array is one set of update replacement values.
465         * @return A <code>Callable</code> which returns the number of rows updated.
466         * @throws SQLException If there are database or parameter errors.
467         */
468        private Callable<Integer> update(Connection conn, boolean closeConn, String sql, Object... params) throws SQLException {
469            PreparedStatement stmt = null;
470            Callable<Integer> ret = null;
471    
472            if (conn == null) {
473                throw new SQLException("Null connection");
474            }
475    
476            if (sql == null) {
477                if (closeConn) {
478                    close(conn);
479                }
480                throw new SQLException("Null SQL statement");
481            }
482    
483            try {
484                stmt = this.prepareStatement(conn, sql);
485                this.fillStatement(stmt, params);
486    
487                ret = new UpdateCallableStatement(conn, closeConn, stmt, sql, params);
488    
489            } catch (SQLException e) {
490                close(stmt);
491                if (closeConn) {
492                    close(conn);
493                }
494                this.rethrow(e, sql, params);
495            }
496    
497            return ret;
498        }
499    
500        /**
501         * Execute an SQL INSERT, UPDATE, or DELETE query without replacement
502         * parameters.
503         *
504         * @param conn The connection to use to run the query.
505         * @param sql The SQL to execute.
506         * @return A <code>Future</code> which returns the number of rows updated.
507         * @throws SQLException if a database access error occurs
508         */
509        public Future<Integer> update(Connection conn, String sql) throws SQLException {
510            return executorService.submit(this.update(conn, false, sql, (Object[]) null));
511        }
512    
513        /**
514         * Execute an SQL INSERT, UPDATE, or DELETE query with a single replacement
515         * parameter.
516         *
517         * @param conn The connection to use to run the query.
518         * @param sql The SQL to execute.
519         * @param param The replacement parameter.
520         * @return A <code>Future</code> which returns the number of rows updated.
521         * @throws SQLException if a database access error occurs
522         */
523        public Future<Integer> update(Connection conn, String sql, Object param) throws SQLException {
524            return executorService.submit(this.update(conn, false, sql, new Object[]{param}));
525        }
526    
527        /**
528         * Execute an SQL INSERT, UPDATE, or DELETE query.
529         *
530         * @param conn The connection to use to run the query.
531         * @param sql The SQL to execute.
532         * @param params The query replacement parameters.
533         * @return A <code>Future</code> which returns the number of rows updated.
534         * @throws SQLException if a database access error occurs
535         */
536        public Future<Integer> update(Connection conn, String sql, Object... params) throws SQLException {
537            return executorService.submit(this.update(conn, false, sql, params));
538        }
539    
540        /**
541         * Executes the given INSERT, UPDATE, or DELETE SQL statement without
542         * any replacement parameters. The <code>Connection</code> is retrieved
543         * from the <code>DataSource</code> set in the constructor.  This
544         * <code>Connection</code> must be in auto-commit mode or the update will
545         * not be saved.
546         *
547         * @param sql The SQL statement to execute.
548         * @throws SQLException if a database access error occurs
549         * @return A <code>Future</code> which returns the number of rows updated.
550         */
551        public Future<Integer> update(String sql) throws SQLException {
552            Connection conn = this.prepareConnection();
553            return executorService.submit(this.update(conn, true, sql, (Object[]) null));
554        }
555    
556        /**
557         * Executes the given INSERT, UPDATE, or DELETE SQL statement with
558         * a single replacement parameter.  The <code>Connection</code> is
559         * retrieved from the <code>DataSource</code> set in the constructor.
560         * This <code>Connection</code> must be in auto-commit mode or the
561         * update will not be saved.
562         *
563         * @param sql The SQL statement to execute.
564         * @param param The replacement parameter.
565         * @throws SQLException if a database access error occurs
566         * @return A <code>Future</code> which returns the number of rows updated.
567         */
568        public Future<Integer> update(String sql, Object param) throws SQLException {
569            Connection conn = this.prepareConnection();
570            return executorService.submit(this.update(conn, true, sql, new Object[]{param}));
571        }
572    
573        /**
574         * Executes the given INSERT, UPDATE, or DELETE SQL statement.  The
575         * <code>Connection</code> is retrieved from the <code>DataSource</code>
576         * set in the constructor.  This <code>Connection</code> must be in
577         * auto-commit mode or the update will not be saved.
578         *
579         * @param sql The SQL statement to execute.
580         * @param params Initializes the PreparedStatement's IN (i.e. '?')
581         * parameters.
582         * @throws SQLException if a database access error occurs
583         * @return A <code>Future</code> which returns the number of rows updated.
584         */
585        public Future<Integer> update(String sql, Object... params) throws SQLException {
586            Connection conn = this.prepareConnection();
587            return executorService.submit(this.update(conn, true, sql, params));
588        }
589    
590    }