# Bivio::SQL::Connection
# Copyright (c) 1999-2010 bivio Software, Inc. All rights reserved.
# $Id$
package Bivio::SQL::Connection;
use strict;
use Bivio::Base 'Bivio.UNIVERSAL';

# C<SQL.Connection> is used to transact with the database.  Instances
# of this module maintain one connection to the database at all times.  They
# will reset the connection if the connection to the database is lost.
#
# B<Agent.Task> depends on the fact that this is the only module
# which modifies the database.

our($_TRACE);
b_use('IO.Trace');
my($_DT) = b_use('Type.DateTime');
my($_D) = b_use('Bivio.Die');
my($_A) = b_use('IO.Alert');
my($_DBI) = b_use('Ext.DBI');
my($_CL) = b_use('IO.ClassLoader');
my($_C) = b_use('IO.Config');
my($_DC) = b_use('Bivio.DieCode');
my($_R);
my($_IDI) = __PACKAGE__->instance_data_index;
# Shared connection instances, keyed by Ext.DBI configuration name.
my($_CONNECTIONS) = {};
# Name used by static calls (see set_dbi_name); undef means DEFAULT_NAME.
my($_DEFAULT_DBI_NAME);
# Number of times we retry a single statement.
my($_MAX_RETRIES) = 3;
b_use('Action.PingReply')->register_handler(__PACKAGE__);
b_use('Bivio.ShellUtil')->register_handler(__PACKAGE__);
b_use('IO.Config')->register(my $_CFG = {
    long_query_seconds => 30,
});

sub CAN_LIMIT_AND_OFFSET {
    # Returns true if the implementation allows C<LIMIT> and C<OFFSET>
    # clauses.  The default is false; subclasses may override.
    return 0;
}

sub MAX_BLOB {
    # Maximum length of a blob.  You cannot retrieve blobs larger than this.
    # You can only have one blob per record.
    #
    # Returns 0x4_000_000
    return 0x4_000_000;
}

sub MAX_PARAMETERS {
    # Maximum number of '?' parameters on a single statement.
    #
    # Returns 100.
    # This value is somewhat arbitrary, but we've tested this up to
    # 230.  The problem is it depends on the statement complexity...
    # Deleting 100 at a time seems like it gets the biggest impact.
    return 100;
}

sub REQUIRE_COMMIT_OR_ROLLBACK {
    # Should a commit/rollback be issued even for queries?  Subclasses may
    # override this, default is false.
    return 0;
}

sub commit {
    # Commits all open transactions.
    return _commit_or_rollback('commit', @_);
}

sub create {
    # B<DEPRECATED: Use L<get_instance|"get_instance">>.
    $_A->warn_deprecated('use get_instance()');
    return shift->get_instance(@_);
}

sub disconnect {
    my($self) = shift;
    # Disconnects from the database.  Static calls are forwarded to the
    # default instance.
    # NOTE(review): _get_connection will open a connection if none exists,
    # only for it to be closed here.
    return _get_instance($self)->disconnect(@_)
        unless ref($self);
    my($fields) = $self->[$_IDI];
    _get_connection($self)->disconnect();
    $fields->{connection_pid} = 0;
    $fields->{connection} = undef;
    return;
}

sub do_execute {
    # Calls the op on each row (as an array ref) until it returns false.
    return _do_execute(shift, 'fetchrow_arrayref', @_);
}

sub do_execute_rows {
    # Calls the op on each row (as a hash ref) until it returns false.
    return _do_execute(shift, 'fetchrow_hashref', @_);
}

sub execute {
    my($self) = _verify_instance(shift);
    # Executes the specified statement and dies with an appropriate error
    # if it fails.
    #
    # B<NOTE: All calls must go through this>
    #
    # I<die> must implement L<Die.die|Die/"die">.
    #
    # If I<has_blob> is specified, the arguments are scanned for a scalar_ref.
    # If found, the positional parameter is bound properly.  If no scalar_ref
    # is found, then the BLOB is assumed to be an output parameter and
    # I<LongReadLen> and I<LongTruncOk> are set accordingly.
    #
    # We retry on certain errors (see
    # L<internal_get_retry_sleep|"internal_get_retry_sleep">).
    my($sql, $params, $die, $has_blob) = @_;
    my($fields) = $self->[$_IDI];
    $sql = $self->internal_fixup_sql($sql);
    my($err, $errstr, $statement, $die_error);
    my($retries) = 0;
    TRY: {
        # Each attempt starts without a statement, so that a failure before
        # prepare returns (e.g. while obtaining the connection) doesn't read
        # err/errstr from, or finish, a handle left over from a previous try.
        # The old handle will be garbage collected.
        $statement = undef;
        $die_error = undef;
#TODO: should be a Die->catch() but this prints a stack trace, and
#      causes the request to lose attributes?
        my($delta) = 0;
        my($ok) = $self->perf_time_op(
            sub {
                my($res) = $_D->eval(sub {
                    $self->internal_execute(
                        $sql, $params, $has_blob, \$statement);
                    return 1;
                });
                # $@ is a global: capture it before any other code (including
                # perf_time_op and the error handling below) can clobber it.
                $die_error = $@;
                return $res;
            },
            \$delta,
        );
        b_warn($delta, 's: query took a long time: ', $sql, $params)
            if $delta > $_CFG->{long_query_seconds};
        return $statement
            if $ok;
        $err = $statement && $statement->err ? $statement->err + 0 : 0;
        $errstr = $statement && $statement->errstr ? $statement->errstr
            : $die_error;
        # If we get an error, it may be a timed-out connection.  We'll
        # check the connection the next time through.
        $fields->{need_ping} = 1;
        my($sleep) = $self->internal_get_retry_sleep($err, $errstr);
        last TRY
            unless defined($sleep);
        if ($fields->{need_commit}) {
            b_warn($errstr, '; not retrying, partial transaction');
            last TRY;
        }
        if (++$retries > $_MAX_RETRIES) {
            b_warn($errstr, '; max retries hit');
            last TRY;
        }
        # Shouldn't really get here, so put in the logs.
        b_warn(
            'retrying: ', $errstr,
            '; die=', $die,
            '; sql=', $sql,
            '; params=', $params,
            '; retries=', $retries,
        ) if $retries == 1;
        _trace('retry after sleep=', $sleep) if $_TRACE;
        sleep($sleep) if $sleep > 0;
        redo TRY;
    }
    # Unrecoverable error
    my($attrs) = {
        message => $die_error,
        dbi_err => $err,
        dbi_errstr => $errstr,
        sql => $sql,
        sql_params => $params,
    };
    my($die_code) = $self->internal_get_error_code($attrs);
    $_D->eval(sub {
        $self->perf_time_op(sub {$statement->finish})
            if $statement;
    });
    ($die || $_D)->throw_die($die_code, $attrs, caller);
    # DOES NOT RETURN
}

sub execute_one_row {
    # Returns the first row (array ref) of the statement, or undef.
    return _execute_one_row('fetchrow_arrayref', @_);
}

sub execute_one_row_hashref {
    # Returns the first row (hash ref) of the statement, or undef.
    return _execute_one_row('fetchrow_hashref', @_);
}

sub get_dbi_config {
    my($self) = shift;
    # Returns the Ext.DBI configuration for the name passed in, else this
    # instance's dbi_name, else IO.Config's DEFAULT_NAME (static call).
    # NOTE(review): static calls do not consult set_dbi_name(); confirm.
    return $_DBI->get_config(
        @_ ? @_
            : ref($self) ? $self->[$_IDI]->{dbi_name}
            : $_C->DEFAULT_NAME,
    );
}

sub get_instance {
    my($proto, $dbi_name) = @_;
    # Returns the shared connection for $dbi_name (DEFAULT_NAME if undef),
    # creating it with the configured connection class on first use.  The
    # instance is pushed as a transaction resource on the current request,
    # if there is one.
    $dbi_name = $_C->DEFAULT_NAME
        unless defined($dbi_name);
    unless ($_CONNECTIONS->{$dbi_name}) {
        my($module) = b_use($proto->get_dbi_config($dbi_name)->{connection});
        _trace($module) if $_TRACE;
        $_CONNECTIONS->{$dbi_name} = $module->internal_new($dbi_name);
    }
    if (my $req = ($_R ||= b_use('Agent.Request'))->get_current) {
        $req->push_txn_resource($_CONNECTIONS->{$dbi_name});
    }
    return $_CONNECTIONS->{$dbi_name};
}

sub handle_commit {
    shift->commit(@_);
    return;
}

sub handle_config {
    my(undef, $cfg) = @_;
    $_CFG = $cfg;
    return;
}

sub handle_ping_reply {
    return shift->ping_connection;
}

sub handle_piped_exec_child {
    # Drops every cached DBI handle.  InactiveDestroy (a DBI handle
    # attribute) is set first, presumably so destroying the handle in this
    # process doesn't close the session the parent still uses.
    foreach my $self (values(%$_CONNECTIONS)) {
        next unless my $fields = $self->[$_IDI];
        # Only touch a real handle; don't autovivify a plain hash.
        if (my $connection = $fields->{connection}) {
            $connection->{InactiveDestroy} = 1;
        }
        $fields->{connection} = undef;
    }
    return;
}

sub handle_rollback {
    shift->rollback(@_);
    return;
}

sub internal_clear_ping {
    my($self) = @_;
    # Clears the need_ping state.
    my($fields) = $self->[$_IDI];
    $fields->{need_ping} = 0;
    return;
}

sub internal_dbi_connect {
    my(undef, $dbi_name) = @_;
    # Connects to the database.  Returns the database handle.
    return $_DBI->connect($dbi_name);
}

sub internal_execute {
    my($self, $sql, $params, $has_blob, $statement) = @_;
    # Executes sql.  Exception must be caught by caller.  Sets $$statement
    # as soon as prepare returns (even if execute then fails).
    my($fields) = $self->[$_IDI];
    _trace_sql($sql, $params) if $_TRACE;
#TODO: Need to investigate problems and performance of cached statements
#TODO: If do cache, then make sure not "active" when making call.
    $$statement = _get_connection($self)->prepare($sql);
    # Only need a commit if there has been data modification language
    # Tightly coupled with PropertySupport
    my($is_select) = $sql =~ /^\s*select/i && $sql !~ /\bfor\s+update\b/i;
    # On a read-only database, modifying statements are prepared but never
    # executed.
    return
        if !$is_select && $fields->{db_is_read_only};
    $params = $self->internal_prepare_blob($is_select, $params, $statement)
        if $has_blob;
    $fields->{need_commit} = 1
        unless $is_select;
    ref($params) ? $$statement->execute(@$params) : $$statement->execute();
    return;
}

sub internal_fixup_sql {
    my($self, $sql) = @_;
    # Strips a trailing ORDER BY from SELECT COUNT(*) FROM queries.
    $sql =~ s/\border\s+by\s.*$//is
        if $sql =~ /^\s*SELECT\s+COUNT\(\*\)\s+FROM\s/is;
    return $sql;
}

sub internal_get_dbi_connection {
    my($self) = @_;
    # Returns the raw DBI connection.
    return _get_connection($self);
}

sub internal_get_error_code {
    my($self, $die_attrs) = @_;
    # Converts the database error into an appropriate error code.  Subclasses
    # should override this to handle constraint violations.
    # Unexpected error is treated as an assertion fault
    $die_attrs->{program_error} = 1;
    return $_DC->DB_ERROR;
}

sub internal_get_retry_sleep {
    # Returns the number of seconds to sleep for the specified transient
    # error code.  0 indicates retry immediately, undef indicates don't
    # retry.
    return undef;
}

sub internal_new {
    my($proto, $dbi_name) = @_;
    # Creates a new connection which uses the specified DBI config name.
    # Do not call this method directly, use
    # L<get_instance|"get_instance">.
    my($self) = $proto->SUPER::new;
    $self->[$_IDI] = {
        dbi_name => $dbi_name,
        db_is_read_only => 0,
        connection => undef,
        # Set to the pid that creates the connection.  Ensures all
        # children use a different connection.
        connection_pid => 0,
        need_commit => 0,
        # If there is an error, this will be true.  _get_connection
        # checks the connection with a ping to make sure it is still
        # alive.
        need_ping => 0,
    };
    return $self;
}

sub internal_prepare_blob {
    my($self, $is_select, $params, $statement) = @_;
    # Prepares a query or update of a blob field.
    # Returns the altered statement params.
    if ($is_select) {
        # Returns a value.  For older DBD::Oracle implementations, we
        # need to set the value on every $statement.  Newer imps,
        # set it once per connection.
        $$statement->{LongReadLen} = int($self->MAX_BLOB * 1.1);
        $$statement->{LongTruncOk} = 0;
        return $params;
    }
    # Passing a value, possibly a blob (scalar ref).
    my($i) = 1;
    foreach my $p (@$params) {
        if (ref($p)) {
            # I wonder if it stores a reference or a copy?
            $$statement->bind_param($i++, $$p, $self->internal_get_blob_type);
        }
        else {
            $$statement->bind_param($i++, $p);
        }
    }
    # Parameters are bound, so don't pass them on
    return undef;
}

sub is_read_only {
    my($self) = shift;
    # Returns true if the current database connection is to a read-only
    # database.
    return _get_instance($self)->is_read_only(@_)
        unless ref($self);
    my($fields) = $self->[$_IDI];
    return $fields->{db_is_read_only};
}

sub map_execute {
    # Returns an array ref of op results (array-ref rows).
    return _map_execute(shift, 'fetchrow_arrayref', @_);
}

sub map_execute_rows {
    # Returns an array ref of op results (hash-ref rows).
    return _map_execute(shift, 'fetchrow_hashref', @_);
}

sub next_primary_id {
    my($self) = shift;
    # Subclasses should return the next sequence number for the specified
    # table.
    return _get_instance($self)->next_primary_id(@_)
        unless ref($self);
    $_D->die('abstract method');
    # DOES NOT RETURN
}

sub perf_time_finish {
    my($self, $st) = @_;
    return $self->perf_time_op(sub {$st->finish});
}

sub perf_time_op {
    shift;
    return ($_R ||= b_use('Agent.Request'))->perf_time_op(__PACKAGE__, @_);
}

sub ping_connection {
    my($self) = shift;
    # Ensures the connection is valid.
    return _get_instance($self)->ping_connection(@_)
        unless ref($self);
    my($fields) = $self->[$_IDI];
    $fields->{need_ping} = 1;
    return _get_connection($self);
}

sub rollback {
    # Rolls back all open transactions.
    return _commit_or_rollback('rollback', @_);
}

sub set_dbi_name {
    my(undef, $name) = @_;
    # Sets the name of the L<Ext.DBI|Ext.DBI> configuration
    # to use.  The default is C<undef>.  Returns the previous name.
    #
    # Doesn't do anything if I<name> is not different from the current name.
    #
    # The name selected will become the default database for all static calls.
    return $name
        if defined($name) == defined($_DEFAULT_DBI_NAME)
        && (!defined($name) || $name eq $_DEFAULT_DBI_NAME);
    my($old) = $_DEFAULT_DBI_NAME;
    $_DEFAULT_DBI_NAME = $name;
    _trace('default db set to ', $name) if $_TRACE;
    return $old;
}

sub _commit_or_rollback {
    my($method, $self) = splice(@_, 0, 2);
    # Wrapper for commit() and rollback().  Skips the DBI call when nothing
    # needs it, and never calls it on a read-only database.
    return _get_instance($self)->$method(@_)
        unless ref($self);
    my($fields) = $self->[$_IDI];
    return
        unless $self->REQUIRE_COMMIT_OR_ROLLBACK || $fields->{need_commit};
    _trace($method) if $_TRACE;
    _get_connection($self)->$method()
        unless $fields->{db_is_read_only};
    $fields->{need_commit} = 0;
    return;
}

sub _do_execute {
    # (self, string, code_ref, @_) : undef
    # Fetches rows with $method and passes each to $op until $op returns
    # false or the rows run out.
    my($self) = _verify_instance(shift);
    my($method, $op) = (shift, shift);
    my($st) = $self->execute(@_);
    return
        unless $st->{Active};
    while (my $row = $self->perf_time_op(sub {$st->$method})) {
        last unless $op->($row);
    }
    $self->perf_time_finish($st);
#TODO: Clears cached handle
#    $self->finish_statement($st);
    return;
}

sub _execute_one_row {
    my($method, $self) = (shift, shift);
    # Executes the statement, fetches one row with $method, and finishes the
    # statement.
    return _execute_one_row($method, _get_instance($self), @_)
        unless ref($self);
    my($sth) = $self->execute(@_);
    return $self->perf_time_op(sub {
        my($row) = $sth->$method;
        $sth->finish;
        return $row;
    });
}

sub _get_connection {
    my($self) = @_;
    # static _get_connection(self) : connection
    #
    # Returns a cached database connection for this process.  Checks the
    # connection for validity.
    my($fields) = $self->[$_IDI];
    if ($fields->{connection_pid} != $$) {
        if ($fields->{connection}) {
            # This disconnects the parent process' connection.  Make sure we
            # rollback any pending transactions, because by default,
            # disconnect commits.
            $_D->eval(sub {
                $fields->{connection}->ping && $fields->{connection}->rollback;
            });
            $_D->eval(sub {$fields->{connection}->disconnect});
            b_warn("reconnecting to database: pid=$$");
            # Make sure we don't enter this code again.
            $fields->{connection} = undef;
        }
        _trace("creating connection: pid=$$") if $_TRACE;
        $fields->{connection} = $self->internal_dbi_connect(
            $fields->{dbi_name});
        $fields->{db_is_read_only}
            = $_DBI->get_config($fields->{dbi_name})->{is_read_only};
        # Got a connection which will be reused on next call.  We don't
        # need to ping it (just in case parent process had an error on
        # the connection).
        $fields->{connection_pid} = $$;
        $fields->{need_ping} = 0;
    }
    elsif ($fields->{need_ping}) {
        # Got an error on a previous use of this connection.  Make
        # sure it is still valid.
        $fields->{need_ping} = 0;
        unless ($_D->eval(sub {$fields->{connection}->ping})) {
            # Dead connection: clearing connection_pid sends the recursive
            # call down the branch above, which rolls back/disconnects the
            # old handle (best effort) and creates a new connection.
            $fields->{connection_pid} = 0;
            return _get_connection($self);
        }
        # Current connection is valid
    }
    return $fields->{connection};
}

sub _get_instance {
    my($proto) = @_;
    # Returns $proto itself if it is an instance, else the default instance.
    return ref($proto) ? $proto : $proto->get_instance($_DEFAULT_DBI_NAME);
}

sub _map_execute {
    # (self, string, @_) : undef
    # (self, string, code_ref, @_) : undef
    # Returns an array ref of $op applied to each row.  The default $op
    # returns the single column for one-column rows, else a copy of the row.
    my($self) = _verify_instance(shift);
    my($method) = shift;
    my($op) = ref($_[0]) eq 'CODE' ? shift : sub {
        my($row) = @_;
        return @$row == 1 ? $row->[0] : [@$row];
    };
    my($st) = $self->execute(@_);
    my($res) = [];
    return $res
        unless $st->{Active};
    while (1) {
        last unless my $row = $self->perf_time_op(sub {$st->$method});
        push(@$res, $op->($row));
    }
    $self->perf_time_finish($st);
#TODO: Clears cached handle
#    $self->finish_statement($st);
    return $res;
}

sub _trace_sql {
    my($sql, $params) = @_;
    # Traces $sql with each '?' replaced by its parameter.  Placeholders are
    # replaced left to right in a single pass, so a '?' inside an already
    # substituted value is never mistaken for a placeholder.  Placeholders
    # beyond the last parameter are left as is.
    my(@values) = ref($params) ? @$params : ();
    $sql =~ s{\?}{@values ? _trace_sql_value(shift(@values)) : '?'}eg;
    _trace($sql);
    return;
}

sub _trace_sql_quote {
    my($v) = @_;
    # Returns $v formatted as an SQL string literal (for tracing only).
    $v = $_A->format_args($v);
    chomp($v);
    $v =~ s/'/''/sg;
    return qq{'$v'};
}

sub _trace_sql_value {
    my($v) = @_;
    # Formats one bind parameter for _trace_sql: NULL for undef, <blob> for
    # references, quoted unless it consists solely of digits.
    return !defined($v) ? 'NULL'
        : ref($v) ? '<blob>'
        : $v =~ /\D/ ? _trace_sql_quote($v)
        : $v;
}

sub _verify_instance {
    # Returns the first argument if it is an instance, else the default
    # instance.
    return shift
        if ref($_[0]);
    return _get_instance(shift);
}

1;