001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.commons.dbutils;
018
019 import java.sql.Connection;
020 import java.sql.PreparedStatement;
021 import java.sql.ResultSet;
022 import java.sql.SQLException;
023 import java.util.concurrent.Callable;
024 import java.util.concurrent.ExecutorService;
025 import java.util.concurrent.Future;
026
027 import javax.sql.DataSource;
028
029 /**
030 * Executes SQL queries with pluggable strategies for handling
031 * <code>ResultSet</code>s. This class is thread safe.
032 *
033 * @see ResultSetHandler
034 * @since 1.4
035 */
036 public class AsyncQueryRunner extends AbstractQueryRunner {
037
038 private final ExecutorService executorService;
039
040 /**
041 * Constructor for AsyncQueryRunner.
042 *
043 * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
044 */
045 public AsyncQueryRunner(ExecutorService executorService) {
046 this(null, false, executorService);
047 }
048
049 /**
050 * Constructor for AsyncQueryRunner, allows workaround for Oracle drivers
051 * @param pmdKnownBroken Oracle drivers don't support {@link java.sql.ParameterMetaData#getParameterType(int) };
052 * if <code>pmdKnownBroken</code> is set to true, we won't even try it; if false, we'll try it,
053 * and if it breaks, we'll remember not to use it again.
054 * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
055 */
056 public AsyncQueryRunner(boolean pmdKnownBroken, ExecutorService executorService) {
057 this(null, pmdKnownBroken, executorService);
058 }
059
060 /**
061 * Constructor for AsyncQueryRunner which takes a <code>DataSource</code>. Methods that do not take a
062 * <code>Connection</code> parameter will retrieve connections from this
063 * <code>DataSource</code>.
064 *
065 * @param ds The <code>DataSource</code> to retrieve connections from.
066 * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
067 */
068 public AsyncQueryRunner(DataSource ds, ExecutorService executorService) {
069 this(ds, false, executorService);
070 }
071
072 /**
073 * Constructor for QueryRunner, allows workaround for Oracle drivers. Methods that do not take a
074 * <code>Connection</code> parameter will retrieve connections from this
075 * <code>DataSource</code>.
076 *
077 * @param ds The <code>DataSource</code> to retrieve connections from.
078 * @param pmdKnownBroken Oracle drivers don't support {@link java.sql.ParameterMetaData#getParameterType(int) };
079 * if <code>pmdKnownBroken</code> is set to true, we won't even try it; if false, we'll try it,
080 * and if it breaks, we'll remember not to use it again.
081 * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
082 */
083 public AsyncQueryRunner(DataSource ds, boolean pmdKnownBroken, ExecutorService executorService) {
084 super(ds, pmdKnownBroken);
085 this.executorService = executorService;
086 }
087
088 /**
089 * Class that encapsulates the continuation for batch calls.
090 */
091 protected class BatchCallableStatement implements Callable<int[]> {
092 private String sql;
093 private Object[][] params;
094 private Connection conn;
095 private boolean closeConn;
096 private PreparedStatement ps;
097
098 /**
099 * Creates a new BatchCallableStatement instance.
100 *
101 * @param sql The SQL statement to execute.
102 * @param params An array of query replacement parameters. Each row in
103 * this array is one set of batch replacement values.
104 * @param conn The connection to use for the batch call.
105 * @param closeConn True if the connection should be closed, false otherwise.
106 * @param ps The {@link PreparedStatement} to be executed.
107 */
108 public BatchCallableStatement(String sql, Object[][] params, Connection conn, boolean closeConn, PreparedStatement ps) {
109 this.sql = sql;
110 this.params = params;
111 this.conn = conn;
112 this.closeConn = closeConn;
113 this.ps = ps;
114 }
115
116 /**
117 * The actual call to executeBatch.
118 *
119 * @return an array of update counts containing one element for each command in the batch.
120 * @throws SQLException if a database access error occurs or one of the commands sent to the database fails.
121 * @see PreparedStatement#executeBatch()
122 */
123 public int[] call() throws SQLException {
124 int[] ret = null;
125
126 try {
127 ret = ps.executeBatch();
128 } catch (SQLException e) {
129 rethrow(e, sql, (Object[])params);
130 } finally {
131 close(ps);
132 if (closeConn) {
133 close(conn);
134 }
135 }
136
137 return ret;
138 }
139 }
140
141 /**
142 * Execute a batch of SQL INSERT, UPDATE, or DELETE queries.
143 *
144 * @param conn The <code>Connection</code> to use to run the query. The caller is
145 * responsible for closing this Connection.
146 * @param sql The SQL to execute.
147 * @param params An array of query replacement parameters. Each row in
148 * this array is one set of batch replacement values.
149 * @return A <code>Future</code> which returns the number of rows updated per statement.
150 * @throws SQLException if a database access error occurs
151 */
152 public Future<int[]> batch(Connection conn, String sql, Object[][] params) throws SQLException {
153 return executorService.submit(this.batch(conn, false, sql, params));
154 }
155
156 /**
157 * Execute a batch of SQL INSERT, UPDATE, or DELETE queries. The
158 * <code>Connection</code> is retrieved from the <code>DataSource</code>
159 * set in the constructor. This <code>Connection</code> must be in
160 * auto-commit mode or the update will not be saved.
161 *
162 * @param sql The SQL to execute.
163 * @param params An array of query replacement parameters. Each row in
164 * this array is one set of batch replacement values.
165 * @return A <code>Future</code> which returns the number of rows updated per statement.
166 * @throws SQLException if a database access error occurs
167 */
168 public Future<int[]> batch(String sql, Object[][] params) throws SQLException {
169 Connection conn = this.prepareConnection();
170
171 return executorService.submit(this.batch(conn, true, sql, params));
172 }
173
174 /**
175 * Creates a continuation for a batch call, and returns it in a <code>Callable</code>.
176 * @param conn The connection to use for the batch call.
177 * @param closeConn True if the connection should be closed, false otherwise.
178 * @param sql The SQL statement to execute.
179 * @param params An array of query replacement parameters. Each row in
180 * this array is one set of batch replacement values.
181 * @return A <code>Callable</code> which returns the number of rows updated per statement.
182 * @throws SQLException If there are database or parameter errors.
183 */
184 private Callable<int[]> batch(Connection conn, boolean closeConn, String sql, Object[][] params) throws SQLException {
185 if (conn == null) {
186 throw new SQLException("Null connection");
187 }
188
189 if (sql == null) {
190 if (closeConn) {
191 close(conn);
192 }
193 throw new SQLException("Null SQL statement");
194 }
195
196 if (params == null) {
197 if (closeConn) {
198 close(conn);
199 }
200 throw new SQLException("Null parameters. If parameters aren't need, pass an empty array.");
201 }
202
203 PreparedStatement stmt = null;
204 Callable<int[]> ret = null;
205 try {
206 stmt = this.prepareStatement(conn, sql);
207
208 for (int i = 0; i < params.length; i++) {
209 this.fillStatement(stmt, params[i]);
210 stmt.addBatch();
211 }
212
213 ret = new BatchCallableStatement(sql, params, conn, closeConn, stmt);
214
215 } catch (SQLException e) {
216 close(stmt);
217 close(conn);
218 this.rethrow(e, sql, (Object[])params);
219 }
220
221 return ret;
222 }
223
224 /**
225 * Class that encapsulates the continuation for query calls.
226 * @param <T> The type of the result from the call to handle.
227 */
228 protected class QueryCallableStatement<T> implements Callable<T> {
229 private String sql;
230 private Object[] params;
231 private Connection conn;
232 private boolean closeConn;
233 private PreparedStatement ps;
234 private ResultSetHandler<T> rsh;
235
236 /**
237 * Creates a new {@code QueryCallableStatement} instance.
238 *
239 * @param conn The connection to use for the batch call.
240 * @param closeConn True if the connection should be closed, false otherwise.
241 * @param ps The {@link PreparedStatement} to be executed.
242 * @param rsh The handler that converts the results into an object.
243 * @param sql The SQL statement to execute.
244 * @param params An array of query replacement parameters. Each row in
245 * this array is one set of batch replacement values.
246 */
247 public QueryCallableStatement(Connection conn, boolean closeConn, PreparedStatement ps,
248 ResultSetHandler<T> rsh, String sql, Object... params) {
249 this.sql = sql;
250 this.params = params;
251 this.conn = conn;
252 this.closeConn = closeConn;
253 this.ps = ps;
254 this.rsh = rsh;
255 }
256
257 /**
258 * The actual call to {@code handle()} method.
259 *
260 * @return an array of update counts containing one element for each command in the batch.
261 * @throws SQLException if a database access error occurs.
262 * @see ResultSetHandler#handle(ResultSet)
263 */
264 public T call() throws SQLException {
265 ResultSet rs = null;
266 T ret = null;
267
268 try {
269 rs = wrap(ps.executeQuery());
270 ret = rsh.handle(rs);
271 } catch (SQLException e) {
272 rethrow(e, sql, params);
273 } finally {
274 try {
275 close(rs);
276 } finally {
277 close(ps);
278 if (closeConn) {
279 close(conn);
280 }
281 }
282 }
283
284 return ret;
285 }
286
287 }
288
289 /**
290 * Creates a continuation for a query call, and returns it in a <code>Callable</code>.
291 * @param conn The connection to use for the query call.
292 * @param closeConn True if the connection should be closed, false otherwise.
293 * @param sql The SQL statement to execute.
294 * @param params An array of query replacement parameters. Each row in
295 * this array is one set of query replacement values.
296 * @return A <code>Callable</code> which returns the result of the query call.
297 * @throws SQLException If there are database or parameter errors.
298 */
299 private <T> Callable<T> query(Connection conn, boolean closeConn, String sql, ResultSetHandler<T> rsh, Object... params)
300 throws SQLException {
301 PreparedStatement stmt = null;
302 Callable<T> ret = null;
303
304 if (conn == null) {
305 throw new SQLException("Null connection");
306 }
307
308 if (sql == null) {
309 if (closeConn) {
310 close(conn);
311 }
312 throw new SQLException("Null SQL statement");
313 }
314
315 if (rsh == null) {
316 if (closeConn) {
317 close(conn);
318 }
319 throw new SQLException("Null ResultSetHandler");
320 }
321
322 try {
323 stmt = this.prepareStatement(conn, sql);
324 this.fillStatement(stmt, params);
325
326 ret = new QueryCallableStatement<T>(conn, closeConn, stmt, rsh, sql, params);
327
328 } catch (SQLException e) {
329 close(stmt);
330 if (closeConn) {
331 close(conn);
332 }
333 this.rethrow(e, sql, params);
334 }
335
336 return ret;
337 }
338
339 /**
340 * Execute an SQL SELECT query with replacement parameters. The
341 * caller is responsible for closing the connection.
342 * @param <T> The type of object that the handler returns
343 * @param conn The connection to execute the query in.
344 * @param sql The query to execute.
345 * @param rsh The handler that converts the results into an object.
346 * @param params The replacement parameters.
347 * @return A <code>Future</code> which returns the result of the query call.
348 * @throws SQLException if a database access error occurs
349 */
350 public <T> Future<T> query(Connection conn, String sql, ResultSetHandler<T> rsh, Object... params) throws SQLException {
351 return executorService.submit(query(conn, false, sql, rsh, params));
352 }
353
354 /**
355 * Execute an SQL SELECT query without any replacement parameters. The
356 * caller is responsible for closing the connection.
357 * @param <T> The type of object that the handler returns
358 * @param conn The connection to execute the query in.
359 * @param sql The query to execute.
360 * @param rsh The handler that converts the results into an object.
361 * @return A <code>Future</code> which returns the result of the query call.
362 * @throws SQLException if a database access error occurs
363 */
364 public <T> Future<T> query(Connection conn, String sql, ResultSetHandler<T> rsh) throws SQLException {
365 return executorService.submit(this.query(conn, false, sql, rsh, (Object[]) null));
366 }
367
368 /**
369 * Executes the given SELECT SQL query and returns a result object.
370 * The <code>Connection</code> is retrieved from the
371 * <code>DataSource</code> set in the constructor.
372 * @param <T> The type of object that the handler returns
373 * @param sql The SQL statement to execute.
374 * @param rsh The handler used to create the result object from
375 * the <code>ResultSet</code>.
376 * @param params Initialize the PreparedStatement's IN parameters with
377 * this array.
378 * @return A <code>Future</code> which returns the result of the query call.
379 * @throws SQLException if a database access error occurs
380 */
381 public <T> Future<T> query(String sql, ResultSetHandler<T> rsh, Object... params) throws SQLException {
382 Connection conn = this.prepareConnection();
383 return executorService.submit(this.query(conn, true, sql, rsh, params));
384 }
385
386 /**
387 * Executes the given SELECT SQL without any replacement parameters.
388 * The <code>Connection</code> is retrieved from the
389 * <code>DataSource</code> set in the constructor.
390 * @param <T> The type of object that the handler returns
391 * @param sql The SQL statement to execute.
392 * @param rsh The handler used to create the result object from
393 * the <code>ResultSet</code>.
394 *
395 * @return A <code>Future</code> which returns the result of the query call.
396 * @throws SQLException if a database access error occurs
397 */
398 public <T> Future<T> query(String sql, ResultSetHandler<T> rsh) throws SQLException {
399 Connection conn = this.prepareConnection();
400 return executorService.submit(this.query(conn, true, sql, rsh, (Object[]) null));
401 }
402
403 /**
404 * Class that encapsulates the continuation for update calls.
405 */
406 protected class UpdateCallableStatement implements Callable<Integer> {
407 private String sql;
408 private Object[] params;
409 private Connection conn;
410 private boolean closeConn;
411 private PreparedStatement ps;
412
413 /**
414 *
415 *
416 * @param conn The connection to use for the batch call.
417 * @param closeConn True if the connection should be closed, false otherwise.
418 * @param ps The {@link PreparedStatement} to be executed.
419 * @param sql The SQL statement to execute.
420 * @param params An array of query replacement parameters. Each row in
421 * this array is one set of batch replacement values.
422 */
423 public UpdateCallableStatement(Connection conn, boolean closeConn, PreparedStatement ps, String sql, Object... params) {
424 this.sql = sql;
425 this.params = params;
426 this.conn = conn;
427 this.closeConn = closeConn;
428 this.ps = ps;
429 }
430
431 /**
432 * The actual call to {@code executeUpdate()} method.
433 *
434 * @return either (1) the row count for SQL Data Manipulation Language (DML) statements or
435 * (2) 0 for SQL statements that return nothing
436 * @throws SQLException if a database access error occurs.
437 * @see PreparedStatement#executeUpdate()
438 */
439 public Integer call() throws SQLException {
440 int rows = 0;
441
442 try {
443 rows = ps.executeUpdate();
444 } catch (SQLException e) {
445 rethrow(e, sql, params);
446 } finally {
447 close(ps);
448 if (closeConn) {
449 close(conn);
450 }
451 }
452
453 return Integer.valueOf(rows);
454 }
455
456 }
457
458 /**
459 * Creates a continuation for an update call, and returns it in a <code>Callable</code>.
460 * @param conn The connection to use for the update call.
461 * @param closeConn True if the connection should be closed, false otherwise.
462 * @param sql The SQL statement to execute.
463 * @param params An array of update replacement parameters. Each row in
464 * this array is one set of update replacement values.
465 * @return A <code>Callable</code> which returns the number of rows updated.
466 * @throws SQLException If there are database or parameter errors.
467 */
468 private Callable<Integer> update(Connection conn, boolean closeConn, String sql, Object... params) throws SQLException {
469 PreparedStatement stmt = null;
470 Callable<Integer> ret = null;
471
472 if (conn == null) {
473 throw new SQLException("Null connection");
474 }
475
476 if (sql == null) {
477 if (closeConn) {
478 close(conn);
479 }
480 throw new SQLException("Null SQL statement");
481 }
482
483 try {
484 stmt = this.prepareStatement(conn, sql);
485 this.fillStatement(stmt, params);
486
487 ret = new UpdateCallableStatement(conn, closeConn, stmt, sql, params);
488
489 } catch (SQLException e) {
490 close(stmt);
491 if (closeConn) {
492 close(conn);
493 }
494 this.rethrow(e, sql, params);
495 }
496
497 return ret;
498 }
499
500 /**
501 * Execute an SQL INSERT, UPDATE, or DELETE query without replacement
502 * parameters.
503 *
504 * @param conn The connection to use to run the query.
505 * @param sql The SQL to execute.
506 * @return A <code>Future</code> which returns the number of rows updated.
507 * @throws SQLException if a database access error occurs
508 */
509 public Future<Integer> update(Connection conn, String sql) throws SQLException {
510 return executorService.submit(this.update(conn, false, sql, (Object[]) null));
511 }
512
513 /**
514 * Execute an SQL INSERT, UPDATE, or DELETE query with a single replacement
515 * parameter.
516 *
517 * @param conn The connection to use to run the query.
518 * @param sql The SQL to execute.
519 * @param param The replacement parameter.
520 * @return A <code>Future</code> which returns the number of rows updated.
521 * @throws SQLException if a database access error occurs
522 */
523 public Future<Integer> update(Connection conn, String sql, Object param) throws SQLException {
524 return executorService.submit(this.update(conn, false, sql, new Object[]{param}));
525 }
526
527 /**
528 * Execute an SQL INSERT, UPDATE, or DELETE query.
529 *
530 * @param conn The connection to use to run the query.
531 * @param sql The SQL to execute.
532 * @param params The query replacement parameters.
533 * @return A <code>Future</code> which returns the number of rows updated.
534 * @throws SQLException if a database access error occurs
535 */
536 public Future<Integer> update(Connection conn, String sql, Object... params) throws SQLException {
537 return executorService.submit(this.update(conn, false, sql, params));
538 }
539
540 /**
541 * Executes the given INSERT, UPDATE, or DELETE SQL statement without
542 * any replacement parameters. The <code>Connection</code> is retrieved
543 * from the <code>DataSource</code> set in the constructor. This
544 * <code>Connection</code> must be in auto-commit mode or the update will
545 * not be saved.
546 *
547 * @param sql The SQL statement to execute.
548 * @throws SQLException if a database access error occurs
549 * @return A <code>Future</code> which returns the number of rows updated.
550 */
551 public Future<Integer> update(String sql) throws SQLException {
552 Connection conn = this.prepareConnection();
553 return executorService.submit(this.update(conn, true, sql, (Object[]) null));
554 }
555
556 /**
557 * Executes the given INSERT, UPDATE, or DELETE SQL statement with
558 * a single replacement parameter. The <code>Connection</code> is
559 * retrieved from the <code>DataSource</code> set in the constructor.
560 * This <code>Connection</code> must be in auto-commit mode or the
561 * update will not be saved.
562 *
563 * @param sql The SQL statement to execute.
564 * @param param The replacement parameter.
565 * @throws SQLException if a database access error occurs
566 * @return A <code>Future</code> which returns the number of rows updated.
567 */
568 public Future<Integer> update(String sql, Object param) throws SQLException {
569 Connection conn = this.prepareConnection();
570 return executorService.submit(this.update(conn, true, sql, new Object[]{param}));
571 }
572
573 /**
574 * Executes the given INSERT, UPDATE, or DELETE SQL statement. The
575 * <code>Connection</code> is retrieved from the <code>DataSource</code>
576 * set in the constructor. This <code>Connection</code> must be in
577 * auto-commit mode or the update will not be saved.
578 *
579 * @param sql The SQL statement to execute.
580 * @param params Initializes the PreparedStatement's IN (i.e. '?')
581 * parameters.
582 * @throws SQLException if a database access error occurs
583 * @return A <code>Future</code> which returns the number of rows updated.
584 */
585 public Future<Integer> update(String sql, Object... params) throws SQLException {
586 Connection conn = this.prepareConnection();
587 return executorService.submit(this.update(conn, true, sql, params));
588 }
589
590 }