001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.commons.dbutils; 018 019 import java.sql.Connection; 020 import java.sql.PreparedStatement; 021 import java.sql.ResultSet; 022 import java.sql.SQLException; 023 import java.util.concurrent.Callable; 024 import java.util.concurrent.ExecutorService; 025 import java.util.concurrent.Future; 026 027 import javax.sql.DataSource; 028 029 /** 030 * Executes SQL queries with pluggable strategies for handling 031 * <code>ResultSet</code>s. This class is thread safe. 032 * 033 * @see ResultSetHandler 034 * @since 1.4 035 */ 036 public class AsyncQueryRunner extends AbstractQueryRunner { 037 038 private final ExecutorService executorService; 039 040 /** 041 * Constructor for AsyncQueryRunner. 042 * 043 * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently. 044 */ 045 public AsyncQueryRunner(ExecutorService executorService) { 046 this(null, false, executorService); 047 } 048 049 /** 050 * Constructor for AsyncQueryRunner, allows workaround for Oracle drivers 051 * @param pmdKnownBroken Oracle drivers don't support {@link java.sql.ParameterMetaData#getParameterType(int) }; 052 * if <code>pmdKnownBroken</code> is set to true, we won't even try it; if false, we'll try it, 053 * and if it breaks, we'll remember not to use it again. 054 * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently. 055 */ 056 public AsyncQueryRunner(boolean pmdKnownBroken, ExecutorService executorService) { 057 this(null, pmdKnownBroken, executorService); 058 } 059 060 /** 061 * Constructor for AsyncQueryRunner which takes a <code>DataSource</code>. Methods that do not take a 062 * <code>Connection</code> parameter will retrieve connections from this 063 * <code>DataSource</code>. 064 * 065 * @param ds The <code>DataSource</code> to retrieve connections from. 066 * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently. 067 */ 068 public AsyncQueryRunner(DataSource ds, ExecutorService executorService) { 069 this(ds, false, executorService); 070 } 071 072 /** 073 * Constructor for QueryRunner, allows workaround for Oracle drivers. Methods that do not take a 074 * <code>Connection</code> parameter will retrieve connections from this 075 * <code>DataSource</code>. 076 * 077 * @param ds The <code>DataSource</code> to retrieve connections from. 078 * @param pmdKnownBroken Oracle drivers don't support {@link java.sql.ParameterMetaData#getParameterType(int) }; 079 * if <code>pmdKnownBroken</code> is set to true, we won't even try it; if false, we'll try it, 080 * and if it breaks, we'll remember not to use it again. 081 * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently. 082 */ 083 public AsyncQueryRunner(DataSource ds, boolean pmdKnownBroken, ExecutorService executorService) { 084 super(ds, pmdKnownBroken); 085 this.executorService = executorService; 086 } 087 088 /** 089 * Class that encapsulates the continuation for batch calls. 090 */ 091 protected class BatchCallableStatement implements Callable<int[]> { 092 private String sql; 093 private Object[][] params; 094 private Connection conn; 095 private boolean closeConn; 096 private PreparedStatement ps; 097 098 /** 099 * Creates a new BatchCallableStatement instance. 100 * 101 * @param sql The SQL statement to execute. 102 * @param params An array of query replacement parameters. Each row in 103 * this array is one set of batch replacement values. 104 * @param conn The connection to use for the batch call. 105 * @param closeConn True if the connection should be closed, false otherwise. 106 * @param ps The {@link PreparedStatement} to be executed. 107 */ 108 public BatchCallableStatement(String sql, Object[][] params, Connection conn, boolean closeConn, PreparedStatement ps) { 109 this.sql = sql; 110 this.params = params; 111 this.conn = conn; 112 this.closeConn = closeConn; 113 this.ps = ps; 114 } 115 116 /** 117 * The actual call to executeBatch. 118 * 119 * @return an array of update counts containing one element for each command in the batch. 120 * @throws SQLException if a database access error occurs or one of the commands sent to the database fails. 121 * @see PreparedStatement#executeBatch() 122 */ 123 public int[] call() throws SQLException { 124 int[] ret = null; 125 126 try { 127 ret = ps.executeBatch(); 128 } catch (SQLException e) { 129 rethrow(e, sql, (Object[])params); 130 } finally { 131 close(ps); 132 if (closeConn) { 133 close(conn); 134 } 135 } 136 137 return ret; 138 } 139 } 140 141 /** 142 * Execute a batch of SQL INSERT, UPDATE, or DELETE queries. 143 * 144 * @param conn The <code>Connection</code> to use to run the query. The caller is 145 * responsible for closing this Connection. 146 * @param sql The SQL to execute. 147 * @param params An array of query replacement parameters. Each row in 148 * this array is one set of batch replacement values. 149 * @return A <code>Future</code> which returns the number of rows updated per statement. 150 * @throws SQLException if a database access error occurs 151 */ 152 public Future<int[]> batch(Connection conn, String sql, Object[][] params) throws SQLException { 153 return executorService.submit(this.batch(conn, false, sql, params)); 154 } 155 156 /** 157 * Execute a batch of SQL INSERT, UPDATE, or DELETE queries. The 158 * <code>Connection</code> is retrieved from the <code>DataSource</code> 159 * set in the constructor. This <code>Connection</code> must be in 160 * auto-commit mode or the update will not be saved. 161 * 162 * @param sql The SQL to execute. 163 * @param params An array of query replacement parameters. Each row in 164 * this array is one set of batch replacement values. 165 * @return A <code>Future</code> which returns the number of rows updated per statement. 166 * @throws SQLException if a database access error occurs 167 */ 168 public Future<int[]> batch(String sql, Object[][] params) throws SQLException { 169 Connection conn = this.prepareConnection(); 170 171 return executorService.submit(this.batch(conn, true, sql, params)); 172 } 173 174 /** 175 * Creates a continuation for a batch call, and returns it in a <code>Callable</code>. 176 * @param conn The connection to use for the batch call. 177 * @param closeConn True if the connection should be closed, false otherwise. 178 * @param sql The SQL statement to execute. 179 * @param params An array of query replacement parameters. Each row in 180 * this array is one set of batch replacement values. 181 * @return A <code>Callable</code> which returns the number of rows updated per statement. 182 * @throws SQLException If there are database or parameter errors. 183 */ 184 private Callable<int[]> batch(Connection conn, boolean closeConn, String sql, Object[][] params) throws SQLException { 185 if (conn == null) { 186 throw new SQLException("Null connection"); 187 } 188 189 if (sql == null) { 190 if (closeConn) { 191 close(conn); 192 } 193 throw new SQLException("Null SQL statement"); 194 } 195 196 if (params == null) { 197 if (closeConn) { 198 close(conn); 199 } 200 throw new SQLException("Null parameters. If parameters aren't need, pass an empty array."); 201 } 202 203 PreparedStatement stmt = null; 204 Callable<int[]> ret = null; 205 try { 206 stmt = this.prepareStatement(conn, sql); 207 208 for (int i = 0; i < params.length; i++) { 209 this.fillStatement(stmt, params[i]); 210 stmt.addBatch(); 211 } 212 213 ret = new BatchCallableStatement(sql, params, conn, closeConn, stmt); 214 215 } catch (SQLException e) { 216 close(stmt); 217 close(conn); 218 this.rethrow(e, sql, (Object[])params); 219 } 220 221 return ret; 222 } 223 224 /** 225 * Class that encapsulates the continuation for query calls. 226 * @param <T> The type of the result from the call to handle. 227 */ 228 protected class QueryCallableStatement<T> implements Callable<T> { 229 private String sql; 230 private Object[] params; 231 private Connection conn; 232 private boolean closeConn; 233 private PreparedStatement ps; 234 private ResultSetHandler<T> rsh; 235 236 /** 237 * Creates a new {@code QueryCallableStatement} instance. 238 * 239 * @param conn The connection to use for the batch call. 240 * @param closeConn True if the connection should be closed, false otherwise. 241 * @param ps The {@link PreparedStatement} to be executed. 242 * @param rsh The handler that converts the results into an object. 243 * @param sql The SQL statement to execute. 244 * @param params An array of query replacement parameters. Each row in 245 * this array is one set of batch replacement values. 246 */ 247 public QueryCallableStatement(Connection conn, boolean closeConn, PreparedStatement ps, 248 ResultSetHandler<T> rsh, String sql, Object... params) { 249 this.sql = sql; 250 this.params = params; 251 this.conn = conn; 252 this.closeConn = closeConn; 253 this.ps = ps; 254 this.rsh = rsh; 255 } 256 257 /** 258 * The actual call to {@code handle()} method. 259 * 260 * @return an array of update counts containing one element for each command in the batch. 261 * @throws SQLException if a database access error occurs. 262 * @see ResultSetHandler#handle(ResultSet) 263 */ 264 public T call() throws SQLException { 265 ResultSet rs = null; 266 T ret = null; 267 268 try { 269 rs = wrap(ps.executeQuery()); 270 ret = rsh.handle(rs); 271 } catch (SQLException e) { 272 rethrow(e, sql, params); 273 } finally { 274 try { 275 close(rs); 276 } finally { 277 close(ps); 278 if (closeConn) { 279 close(conn); 280 } 281 } 282 } 283 284 return ret; 285 } 286 287 } 288 289 /** 290 * Creates a continuation for a query call, and returns it in a <code>Callable</code>. 291 * @param conn The connection to use for the query call. 292 * @param closeConn True if the connection should be closed, false otherwise. 293 * @param sql The SQL statement to execute. 294 * @param params An array of query replacement parameters. Each row in 295 * this array is one set of query replacement values. 296 * @return A <code>Callable</code> which returns the result of the query call. 297 * @throws SQLException If there are database or parameter errors. 298 */ 299 private <T> Callable<T> query(Connection conn, boolean closeConn, String sql, ResultSetHandler<T> rsh, Object... params) 300 throws SQLException { 301 PreparedStatement stmt = null; 302 Callable<T> ret = null; 303 304 if (conn == null) { 305 throw new SQLException("Null connection"); 306 } 307 308 if (sql == null) { 309 if (closeConn) { 310 close(conn); 311 } 312 throw new SQLException("Null SQL statement"); 313 } 314 315 if (rsh == null) { 316 if (closeConn) { 317 close(conn); 318 } 319 throw new SQLException("Null ResultSetHandler"); 320 } 321 322 try { 323 stmt = this.prepareStatement(conn, sql); 324 this.fillStatement(stmt, params); 325 326 ret = new QueryCallableStatement<T>(conn, closeConn, stmt, rsh, sql, params); 327 328 } catch (SQLException e) { 329 close(stmt); 330 if (closeConn) { 331 close(conn); 332 } 333 this.rethrow(e, sql, params); 334 } 335 336 return ret; 337 } 338 339 /** 340 * Execute an SQL SELECT query with replacement parameters. The 341 * caller is responsible for closing the connection. 342 * @param <T> The type of object that the handler returns 343 * @param conn The connection to execute the query in. 344 * @param sql The query to execute. 345 * @param rsh The handler that converts the results into an object. 346 * @param params The replacement parameters. 347 * @return A <code>Future</code> which returns the result of the query call. 348 * @throws SQLException if a database access error occurs 349 */ 350 public <T> Future<T> query(Connection conn, String sql, ResultSetHandler<T> rsh, Object... params) throws SQLException { 351 return executorService.submit(query(conn, false, sql, rsh, params)); 352 } 353 354 /** 355 * Execute an SQL SELECT query without any replacement parameters. The 356 * caller is responsible for closing the connection. 357 * @param <T> The type of object that the handler returns 358 * @param conn The connection to execute the query in. 359 * @param sql The query to execute. 360 * @param rsh The handler that converts the results into an object. 361 * @return A <code>Future</code> which returns the result of the query call. 362 * @throws SQLException if a database access error occurs 363 */ 364 public <T> Future<T> query(Connection conn, String sql, ResultSetHandler<T> rsh) throws SQLException { 365 return executorService.submit(this.query(conn, false, sql, rsh, (Object[]) null)); 366 } 367 368 /** 369 * Executes the given SELECT SQL query and returns a result object. 370 * The <code>Connection</code> is retrieved from the 371 * <code>DataSource</code> set in the constructor. 372 * @param <T> The type of object that the handler returns 373 * @param sql The SQL statement to execute. 374 * @param rsh The handler used to create the result object from 375 * the <code>ResultSet</code>. 376 * @param params Initialize the PreparedStatement's IN parameters with 377 * this array. 378 * @return A <code>Future</code> which returns the result of the query call. 379 * @throws SQLException if a database access error occurs 380 */ 381 public <T> Future<T> query(String sql, ResultSetHandler<T> rsh, Object... params) throws SQLException { 382 Connection conn = this.prepareConnection(); 383 return executorService.submit(this.query(conn, true, sql, rsh, params)); 384 } 385 386 /** 387 * Executes the given SELECT SQL without any replacement parameters. 388 * The <code>Connection</code> is retrieved from the 389 * <code>DataSource</code> set in the constructor. 390 * @param <T> The type of object that the handler returns 391 * @param sql The SQL statement to execute. 392 * @param rsh The handler used to create the result object from 393 * the <code>ResultSet</code>. 394 * 395 * @return A <code>Future</code> which returns the result of the query call. 396 * @throws SQLException if a database access error occurs 397 */ 398 public <T> Future<T> query(String sql, ResultSetHandler<T> rsh) throws SQLException { 399 Connection conn = this.prepareConnection(); 400 return executorService.submit(this.query(conn, true, sql, rsh, (Object[]) null)); 401 } 402 403 /** 404 * Class that encapsulates the continuation for update calls. 405 */ 406 protected class UpdateCallableStatement implements Callable<Integer> { 407 private String sql; 408 private Object[] params; 409 private Connection conn; 410 private boolean closeConn; 411 private PreparedStatement ps; 412 413 /** 414 * 415 * 416 * @param conn The connection to use for the batch call. 417 * @param closeConn True if the connection should be closed, false otherwise. 418 * @param ps The {@link PreparedStatement} to be executed. 419 * @param sql The SQL statement to execute. 420 * @param params An array of query replacement parameters. Each row in 421 * this array is one set of batch replacement values. 422 */ 423 public UpdateCallableStatement(Connection conn, boolean closeConn, PreparedStatement ps, String sql, Object... params) { 424 this.sql = sql; 425 this.params = params; 426 this.conn = conn; 427 this.closeConn = closeConn; 428 this.ps = ps; 429 } 430 431 /** 432 * The actual call to {@code executeUpdate()} method. 433 * 434 * @return either (1) the row count for SQL Data Manipulation Language (DML) statements or 435 * (2) 0 for SQL statements that return nothing 436 * @throws SQLException if a database access error occurs. 437 * @see PreparedStatement#executeUpdate() 438 */ 439 public Integer call() throws SQLException { 440 int rows = 0; 441 442 try { 443 rows = ps.executeUpdate(); 444 } catch (SQLException e) { 445 rethrow(e, sql, params); 446 } finally { 447 close(ps); 448 if (closeConn) { 449 close(conn); 450 } 451 } 452 453 return Integer.valueOf(rows); 454 } 455 456 } 457 458 /** 459 * Creates a continuation for an update call, and returns it in a <code>Callable</code>. 460 * @param conn The connection to use for the update call. 461 * @param closeConn True if the connection should be closed, false otherwise. 462 * @param sql The SQL statement to execute. 463 * @param params An array of update replacement parameters. Each row in 464 * this array is one set of update replacement values. 465 * @return A <code>Callable</code> which returns the number of rows updated. 466 * @throws SQLException If there are database or parameter errors. 467 */ 468 private Callable<Integer> update(Connection conn, boolean closeConn, String sql, Object... params) throws SQLException { 469 PreparedStatement stmt = null; 470 Callable<Integer> ret = null; 471 472 if (conn == null) { 473 throw new SQLException("Null connection"); 474 } 475 476 if (sql == null) { 477 if (closeConn) { 478 close(conn); 479 } 480 throw new SQLException("Null SQL statement"); 481 } 482 483 try { 484 stmt = this.prepareStatement(conn, sql); 485 this.fillStatement(stmt, params); 486 487 ret = new UpdateCallableStatement(conn, closeConn, stmt, sql, params); 488 489 } catch (SQLException e) { 490 close(stmt); 491 if (closeConn) { 492 close(conn); 493 } 494 this.rethrow(e, sql, params); 495 } 496 497 return ret; 498 } 499 500 /** 501 * Execute an SQL INSERT, UPDATE, or DELETE query without replacement 502 * parameters. 503 * 504 * @param conn The connection to use to run the query. 505 * @param sql The SQL to execute. 506 * @return A <code>Future</code> which returns the number of rows updated. 507 * @throws SQLException if a database access error occurs 508 */ 509 public Future<Integer> update(Connection conn, String sql) throws SQLException { 510 return executorService.submit(this.update(conn, false, sql, (Object[]) null)); 511 } 512 513 /** 514 * Execute an SQL INSERT, UPDATE, or DELETE query with a single replacement 515 * parameter. 516 * 517 * @param conn The connection to use to run the query. 518 * @param sql The SQL to execute. 519 * @param param The replacement parameter. 520 * @return A <code>Future</code> which returns the number of rows updated. 521 * @throws SQLException if a database access error occurs 522 */ 523 public Future<Integer> update(Connection conn, String sql, Object param) throws SQLException { 524 return executorService.submit(this.update(conn, false, sql, new Object[]{param})); 525 } 526 527 /** 528 * Execute an SQL INSERT, UPDATE, or DELETE query. 529 * 530 * @param conn The connection to use to run the query. 531 * @param sql The SQL to execute. 532 * @param params The query replacement parameters. 533 * @return A <code>Future</code> which returns the number of rows updated. 534 * @throws SQLException if a database access error occurs 535 */ 536 public Future<Integer> update(Connection conn, String sql, Object... params) throws SQLException { 537 return executorService.submit(this.update(conn, false, sql, params)); 538 } 539 540 /** 541 * Executes the given INSERT, UPDATE, or DELETE SQL statement without 542 * any replacement parameters. The <code>Connection</code> is retrieved 543 * from the <code>DataSource</code> set in the constructor. This 544 * <code>Connection</code> must be in auto-commit mode or the update will 545 * not be saved. 546 * 547 * @param sql The SQL statement to execute. 548 * @throws SQLException if a database access error occurs 549 * @return A <code>Future</code> which returns the number of rows updated. 550 */ 551 public Future<Integer> update(String sql) throws SQLException { 552 Connection conn = this.prepareConnection(); 553 return executorService.submit(this.update(conn, true, sql, (Object[]) null)); 554 } 555 556 /** 557 * Executes the given INSERT, UPDATE, or DELETE SQL statement with 558 * a single replacement parameter. The <code>Connection</code> is 559 * retrieved from the <code>DataSource</code> set in the constructor. 560 * This <code>Connection</code> must be in auto-commit mode or the 561 * update will not be saved. 562 * 563 * @param sql The SQL statement to execute. 564 * @param param The replacement parameter. 565 * @throws SQLException if a database access error occurs 566 * @return A <code>Future</code> which returns the number of rows updated. 567 */ 568 public Future<Integer> update(String sql, Object param) throws SQLException { 569 Connection conn = this.prepareConnection(); 570 return executorService.submit(this.update(conn, true, sql, new Object[]{param})); 571 } 572 573 /** 574 * Executes the given INSERT, UPDATE, or DELETE SQL statement. The 575 * <code>Connection</code> is retrieved from the <code>DataSource</code> 576 * set in the constructor. This <code>Connection</code> must be in 577 * auto-commit mode or the update will not be saved. 578 * 579 * @param sql The SQL statement to execute. 580 * @param params Initializes the PreparedStatement's IN (i.e. '?') 581 * parameters. 582 * @throws SQLException if a database access error occurs 583 * @return A <code>Future</code> which returns the number of rows updated. 584 */ 585 public Future<Integer> update(String sql, Object... params) throws SQLException { 586 Connection conn = this.prepareConnection(); 587 return executorService.submit(this.update(conn, true, sql, params)); 588 } 589 590 }