Bivio::SQL::Connection::Postgres
# Copyright (c) 2001-2013 bivio Software, Inc. All rights reserved.
# $Id$
package Bivio::SQL::Connection::Postgres;
use strict;
use Bivio::Base 'Bivio::SQL::Connection';
use Bivio::Die;
use Bivio::DieCode;
use Bivio::IO::Trace;
use Bivio::TypeError;
use DBI ();
# C<Bivio::SQL::Connection::Postgres>
our($_TRACE);
sub CAN_LIMIT_AND_OFFSET {
# : boolean
# Postgres supports C<LIMIT> and C<OFFSET>.
return 1;
}
sub REQUIRE_COMMIT_OR_ROLLBACK {
# : boolean
# Returns true.
return 1;
}
sub get_dbi_prefix {
# (proto, hash_ref) : string
# Returns the PostgreSQL DBI connection prefix.
my($self, $cfg) = @_;
my($res) = 'dbi:Pg:';
if ($cfg->{host}) {
$res .= "host=$cfg->{host};";
if ($cfg->{host}) {
$res .= "port=$cfg->{port};";
}
}
$res .= 'dbname=';
return $res;
}
sub get_settings {
# (self) : hash_ref
# Returns the postgres config (aka 'settings')
my($self) = @_;
my($res);
$self->do_execute_rows(
sub {
my($row) = @_;
$res->{$row->{name}} = $row->{setting};
return 1;
},
'select name, setting from pg_settings'
);
return $res;
}
sub internal_execute {
# (self) : undef
# Ignores annoying warnings.
my($prev) = $SIG{__WARN__};
local($SIG{__WARN__}) = sub {
my($msg) = @_;
return
if $msg =~ /NOTICE:\s+CREATE TABLE . PRIMARY KEY will create implicit index/;
return $prev && $prev->(@_);
};
return shift->SUPER::internal_execute(@_);
}
sub internal_fixup_sql {
# (self, string) : string
# Fixes the Oracle SQL to conform to Postgres's requirements.
my($self, $sql) = @_;
$sql = $self->SUPER::internal_fixup_sql($sql);
# Julian date format is 'J SSSS'
$sql =~ s/('J SSSS)S'/$1'/igs;
# Timestamp instead of date
$sql =~ s/\bDATE\b/TIMESTAMP/igs;
$sql =~ s/\bTO_DATE\(/TO_TIMESTAMP\(/igs;
# No 'by' on sequence increments
$sql =~ s/(\sINCREMENT\s+)BY\b/$1/igs;
# blobs
$sql =~ s/\bBLOB\b/BYTEA/igs;
$sql =~ s/\bTEXT64K\b/TEXT/igs;
$sql = _fixup_outer_join($sql)
if $sql =~ /\(\+\)/;
_trace($sql) if $_TRACE;
return $sql;
}
sub internal_get_blob_type {
# (self) : hash_ref
# Returns the bind_param() value for a BLOB.
return {
pg_type => DBD::Pg::PG_BYTEA(),
};
}
sub internal_get_error_code {
# (self, string) : Type.Enum
# Converts the database error message into an appropriate error code. Returns
# undef if the message is not translatable.
my($self, $die_attrs) = @_;
if ($die_attrs->{dbi_errstr} =~
/Cannot insert a duplicate key into unique index (\w+)/i
# Relaxed since they seem to change it often
|| $die_attrs->{dbi_errstr} =~
/duplicate key.*? unique .*?"(\w+)"/i) {
return _interpret_constraint_violation($self, $die_attrs, $1);
}
$die_attrs->{dbi_errstr} =~
s/(Unterminated quoted string)/$1; There is a null character in one of the parameters/;
return shift->SUPER::internal_get_error_code(@_);
}
sub internal_get_retry_sleep {
# (self, int, string) : int
# Returns the number of seconds to sleep for the specified transient
# error code. 0 indicates retry immediately, undef indicates don't
# retry.
my($self, $error, $message) = @_;
# retry in 15 seconds if database is gone. may have rebooted database
return 15 if $error == -1 && $message =~ /backend closed the channel/;
return 1 if $error == -1
&& $message =~ /server closed the connection unexpectedly/;
return undef;
}
sub next_primary_id {
# (self, string, ref) : string
# Returns the next primary id sequence number for the specified table.
my($self, $table_name, $die) = @_;
my($sql) = "select nextval('".substr($table_name, 0, -2)."_s')";
return $self->execute($sql, [], $die)->fetchrow_array;
}
sub _fixup_outer_join {
# (string) : string
# Replaces Oracle style outer joins (+) with Postgres syntax LEFT JOIN.
my($sql) = @_;
# find the outer join expression, remove it from the WHERE clause
# and add it to the FROM section with the LEFT JOIN . ON . syntax
# example:
#
# select * from ec_payment_t, ec_subscription_t, realm_owner_t
# where ec_payment_id.realm_id=realm_owner_t.realm_id
# and ec_payment_t.ec_payment_id=ec_subscription_t.ec_payment_id(+)
#
# becomes:
#
# select * from ec_payment_t LEFT JOIN ec_subscription_t ON
# ec_payment_t.ec_payment_id=ec_subscription_t.ec_payment_id, realm_owner_t
# where ec_payment_id.realm_id=realm_owner_t.realm_id
#
# Another case is:
# SELECT (SELECT
# SUM(policy_t.state_tax)
# FROM policy_t, filing_event_t
# WHERE policy_t.broker_user_id=broker_t.user_id
# ) AS state_tax_due
# FROM tax_deposit_t,broker_t,user_t,broker_tax_payment_t
# WHERE broker_t.user_id=broker_tax_payment_t.broker_user_id(+)
# becomes:
# SELECT (SELECT
# SUM(policy_t.state_tax)
# FROM policy_t, filing_event_t
# WHERE policy_t.broker_user_id=broker_t.user_id
# ) AS state_tax_due
# FROM tax_deposit_t,broker_t
# LEFT JOIN broker_tax_payment_t
# ON (broker_t.user_id = broker_tax_payment_t.broker_user_id),
# user_t
# WHERE broker_t.user_id=broker_tax_payment_t.broker_user_id
#
my($relations) = [];
my($prefix, $from_where) = _split_at_from($sql);
_trace('prefix=', $prefix, '; from_where=', $from_where) if $_TRACE;
while ($from_where =~ /\(\+\)/) {
$from_where =~ s/\b(FROM)(?:POSTGRES-FIXME)?\b(.+?)([\w\.]+)\s*\=\s*([\w\.]+)\(\+\)(?:\s+AND\b)?/FROMPOSTGRES-FIXME$2/is
|| Bivio::Die->die('failed to find outer join: ', $from_where);
push(@$relations, [$3, $4]);
}
return unless @$relations;
b_die('too weird outer join: ', $from_where)
if $from_where =~ /POSTGRES-FIXME.*POSTGRES-FIXME/s;
_trace('from_where=', $from_where, '; relations=', $relations) if $_TRACE;
my($joins) = {};
foreach my $r (@$relations) {
my($left, $right) = @$r;
my($source_table) = lc(_parse_table_name($left));
my($target_table) = lc(_parse_table_name($right));
if ($joins->{$source_table}) {
# We already added the LEFT JOIN $target_table in $joins
next if $joins->{$source_table}
=~ s/(?<=LEFT JOIN $target_table ON \()/$left = $right AND /is;
}
# Remove target_table from FROM, and save LEFT JOIN in $joins
$from_where =~ s/(\sFROMPOSTGRES-FIXME\s.*?)\b((?:\w+\s+)?$target_table)\b,?/$1/s
|| Bivio::Die->die('failed to remove ', $target_table, ': ', $from_where);
$joins->{$source_table} .= " LEFT JOIN $2 ON ($left = $right)";
}
# Remove target table(s) from FROM and add $joins to FROM
foreach my $source_table (sort(keys(%$joins))) {
$from_where =~ s/(FROMPOSTGRES-FIXME)(.*?\b$source_table\b)(?=\s*,|\s+WHERE\b|\s+ON\b|\s+LEFT JOIN\b)/$1$2$joins->{$source_table}/is
|| Bivio::Die->die('failed to insert outer join: ',
$source_table, ' "',
$joins->{$source_table}, '" into ', $from_where);
}
# remove extra commas, trailing where, trailing and
$from_where =~ s/\bFROMPOSTGRES-FIXME\b/FROM/sg;
$from_where =~ s/,\s*(?=\sWHERE\s)//is;
$from_where =~ s/\s+AND\s+OFFSET/ OFFSET/is;
$from_where =~ s/\s+WHERE\s+OFFSET/ OFFSET/is;
# Really should have an SQL lexicon...
$from_where =~ s/\s(?:WHERE|AND)(?=\s*$|\s*\)|\s*(?:HAVING|GROUP|ORDER|UNION|INTERSECT)\b)//is;
return $prefix . $from_where;
}
sub _interpret_constraint_violation {
# (self, hash_ref, string) : Type.Enum
# Will set "columns" and "table" in attrs. Returns die code that is
# appropriate for the constraint violation.
my($self, $attrs, $constraint) = @_;
my($die_code);
# Ignore errors, die_code will be undef in this case and result in a
# server error
Bivio::Die->eval(sub {
# rollback because Postgres won't let other queries on this txn
$self->rollback;
# Try to find the constraint columns (assumes it is an index)
my($statement) = $self->internal_get_dbi_connection()->prepare(<<"EOF");
SELECT class2.relname, attname
FROM pg_class class1, pg_class class2, pg_index, pg_attribute
WHERE class1.oid=pg_attribute.attrelid
AND class1.oid=pg_index.indexrelid
AND pg_index.indrelid=class2.oid
AND class1.relname=?
EOF
$statement->execute($constraint);
my($cols) = [];
my($table);
while (my $row = $statement->fetchrow_arrayref) {
$table = lc($row->[0]);
push(@$cols, lc($row->[1]));
}
# This is an operation error, not db error. Don't need to ping.
$self->internal_clear_ping;
# Found the constraint?
if ($table) {
# Save the state for the die message
$attrs->{columns} = $cols;
$attrs->{table} = $table;
_trace($constraint, ': found ', $table, '.', $cols)
if $_TRACE;
if (7 == $attrs->{dbi_err}) {
# duplicate key
$attrs->{type_error} = Bivio::TypeError->EXISTS;
$die_code = Bivio::DieCode->DB_CONSTRAINT;
}
}
else {
# returns undef for die_code
_trace($constraint,
': constraint query returned nothing') if $_TRACE;
}
return 1;
});
_trace($constraint, ':', $@) if $_TRACE && $@;
return $die_code;
}
sub _parse_table_name {
# (string) : string
# Parses the table name from a table_name.field string.
my($str) = @_;
$str =~ /^(\w+)\./
|| Bivio::Die->die("didn't find table: ", $str);
return $1;
}
sub _split_at_from {
my($sql) = @_;
my($from_re) = qr{(\sFROM\s)}i;
my($parts) = [split($from_re, $sql)];
my($prefix) = '';
while (defined(my $p = shift(@$parts))) {
return ($prefix, join('', $p, @$parts))
if $p =~ $from_re
&& @{[$prefix =~ /\(/sg]} == @{[$prefix =~ /\)/sg]};
$prefix .= $p;
}
b_die('could not find FROM in: ', $sql);
# DOES NOT RETURN
}
1;