Bivio::SQL::PropertySupport
# Copyright (c) 1999-2010 bivio Software, Inc. All rights reserved.
# $Id$
package Bivio::SQL::PropertySupport;
use strict;
use Bivio::Base 'SQL.Support';
# C<Bivio::SQL::PropertySupport> is SQL transaction support for
# L<Bivio::Biz::PropertyModel>s. PropertyModel life-cycle methods are
# supported through L<"unsafe_load">, L<"create">, L<"delete">, and
# L<"update">.
#
# Support uses the L<Bivio::SQL::Connection> for connections and
# statement execution.
#
#
# See also L<Bivio::SQL::Support|Bivio::SQL::Support> for more attributes.
#
#
# has_blob : boolean
#
# Is true if the PropertyModel has a BLOB data type. Requires special
# handling in L<Bivio::SQL::Connection|Bivio::SQL::Connection>.
#
# primary_id_name : string
#
# Computed from the columns. If there is a column which matches the table name
# followed by C<_id>, e.g. I<table_name_id> for a table called I<table_name_t>,
# this will be the I<primary_id_name> for the table. See L<create|"create"> for
# how it is set automatically from its corresponding sequence.
#
# select : string
#
# The list of select_columns followed by FROM table. Does not include
# WHERE.
# Trace flag tested in update() before calling _trace. Presumably maintained
# by IO.Trace, which is loaded on the next line -- TODO confirm.
our($_TRACE);
b_use('IO.Trace');
# Class handles resolved once at load time and shared by the subs below.
my($_BLOB) = b_use('Type.BLOB');
my($_C) = b_use('SQL.Constraint');
my($_D) = b_use('Bivio.Die');
my($_DT) = b_use('Type.DateTime');
# Filled in lazily by delete_all() with b_use('Agent.Request').
my($_R);
my($_SC) = b_use('SQL.Connection');
my($_MIN_PRIMARY_ID) = b_use('Type.PrimaryId')->get_min;
# Module configuration. The hash below is the value registered with
# IO.Config; handle_config() replaces $_CFG with whatever it is passed.
b_use('IO.Config')->register(my $_CFG = {
unused_classes => [qw(RealmFile RealmMail RealmMailBounce Website Forum CalendarEvent JobLock Tuple TupleDef TupleSlotType TupleSlotDef TupleUse Motion MotionVote RealmDAG OTP NonuniqueEmail CRMThread TupleTag RealmFileLock TaskLog)],
});
sub create {
    my($self, $new_values, $die) = @_;
    # Inserts a new row built from I<new_values> using the cached insert
    # statement.
    #
    # If the table has a I<primary_id_name> and I<new_values> holds no true
    # value for it, the next id is obtained from the connection
    # (next_primary_id for this table) and stored back into I<new_values>.
    # A caller-supplied primary id is used as is.
    #
    # I<die> must implement L<Bivio::Die::die|Bivio::Die/"die">.
    #
    # Returns nothing.
    my($attrs) = $self->internal_get;
    my($id_field) = $attrs->{primary_id_name};
    #TODO: Need an assertion check about not using sequences when the
    #      caller supplies the primary id, e.g.
    #    $die->throw_die('DIE', {message =>
    #        'special primary_id greater than min primary id',
    #        field => $pid, value => $new_values->{$pid}})
    #        unless $new_values->{$pid} < $_MIN_PRIMARY_ID;
    $new_values->{$id_field} = $_SC->next_primary_id(
        $attrs->{table_name},
        $die,
    ) if $id_field && !$new_values->{$id_field};
    my($columns) = $attrs->{columns};
    # One bind value per column, in column_names order (the same order the
    # insert statement lists its columns).
    my($params) = [map(
        $columns->{$_}->{type}->to_sql_param($new_values->{$_}),
        @{$attrs->{column_names}},
    )];
    $_SC->perf_time_finish(
        $_SC->execute($attrs->{insert}, $params, $die, $attrs->{has_blob}),
    );
    return;
}
sub delete {
    my($self, $values, $die) = @_;
    # Deletes the row identified by the primary key values found in
    # I<values>. Every primary key column must be present (exists) in
    # I<values>; otherwise a DIE is thrown via I<die>, or via Bivio.Die when
    # no I<die> was passed.
    #
    # I<die> must implement L<Bivio::Die::die|Bivio::Die/"die">.
    #
    # Returns 1 if the statement reports any affected rows, 0 otherwise.
    my($attrs) = $self->internal_get;
    my($columns) = $attrs->{columns};
    my(@params);
    foreach my $key (@{$attrs->{primary_key_names}}) {
        unless (exists($values->{$key})) {
            $die ||= $_D;
            $die->throw_die('DIE', {
                message => 'missing primary key value for delete',
                entity => $self,
                column => $key,
            });
        }
        push(@params, $columns->{$key}->{type}->to_sql_param($values->{$key}));
    }
    my($sth) = $_SC->execute(
        $attrs->{delete},
        \@params,
        $die,
        $attrs->{has_blob},
    );
    return $_SC->perf_time_op(sub {
        my($count) = $sth->rows;
        $sth->finish;
        return $count ? 1 : 0;
    });
}
sub delete_all {
    my($self, $query, $die) = @_;
    # Deletes every row matching the (possibly partial) key values in
    # I<query>. An empty I<query> produces no WHERE clause, so
    # Agent.Request->assert_test is called first in that case.
    #
    # Returns the row count reported by the statement, or 0 when that
    # count is false.
    ($_R ||= b_use('Agent.Request'))->assert_test
        if !%$query;
    my($params) = [];
    my($sql) = 'delete from '
        . $self->get('table_name')
        . _prepare_where($self, $query, $params);
    my($sth) = $_SC->execute($sql, $params, $die);
    return $_SC->perf_time_op(sub {
        my($count) = $sth->rows;
        $sth->finish;
        return $count || 0;
    });
}
sub get_children {
    my($self) = @_;
    # Returns the 'children' attribute, i.e. the list that
    # register_child_model prepends to.
    return $self->get('children');
}
sub handle_config {
    my($proto, $cfg) = @_;
    # Configuration callback: stores I<cfg> as the module configuration.
    #
    # unused_classes : array_ref (required)
    #
    # May be empty. List of PropertyModel classes not in use in this
    # application.
    #
    # The invocant is ignored. Returns nothing.
    $_CFG = $cfg;
    return;
}
sub iterate_start {
    my($self, $die, $order_by, $query) = @_;
    # Executes the select for I<query> and returns the result of the
    # connection's execute call: a handle intended for use with
    # L<iterate_next|"iterate_next">. L<iterate_end|"iterate_end">
    # should be called, too.
    #
    # I<query> is formatted like L<unsafe_load|"unsafe_load">. When it is
    # missing or empty no WHERE clause is generated.
    #
    # I<order_by> is an SQL C<ORDER BY> clause without the keywords
    # C<ORDER BY>; it is only appended when true.
    my($params) = [];
    my($sql) = _prepare_select($self, $query, $params);
    $sql .= " ORDER BY $order_by"
        if $order_by;
    return $_SC->execute($sql, $params, $die, $self->get('has_blob'));
}
sub new {
my($proto, $decl) = @_;
# Creates a SQL support instance. I<decl> is defined as follows:
#
#     {
#         version => 1,
#         table_name => 'name',
#         columns => {
#             # Short form: [type, constraint]
#             column_name => [Bivio::Type, Bivio::SQL::Constraint],
#             # Long form: hash with explicit attributes
#             searchable => {
#                 type => 'MyType',
#                 constraint => 'NOT_NULL',
#                 is_searchable => 1,
#             },
#         },
#     }
#
# I<decl> may also carry I<class> and I<cascade_delete_children>
# (the latter defaults to 0).
#
# Dies if table_name is missing or malformed, if the auth_id/other
# categories introduce columns, or if more than one auth_id is declared.
#
# This module takes ownership of I<decl>.
my($attrs) = {
class => $decl->{class},
parents => {},
children => [],
table_name => $decl->{table_name},
columns => {},
primary_key => [],
column_aliases => {},
has_blob => 0,
cascade_delete_children => $decl->{cascade_delete_children} || 0,
};
$proto->init_common_attrs($attrs, $decl);
b_die('you must declare table_name: ', $decl)
unless defined($attrs->{table_name});
# Table names are 1 to 28 word characters followed by the "_t" suffix.
b_die("$attrs->{table_name}: invalid table name, must end in _t")
unless $attrs->{table_name} =~ m!^\w{1,28}_t$!;
_init_columns($proto, $attrs, $decl->{columns});
# Get auth_id and other columns. Remember how many columns exist now so
# we can detect whether the category initialization added any.
my($save_count) = int(keys(%{$attrs->{columns}}));
__PACKAGE__->init_column_classes($attrs, $decl, [qw(auth_id other)]);
__PACKAGE__->init_model_primary_key_maps($attrs);
b_die(
'columns may not be added in "other" or "auth_id" category: ',
[keys(%{$attrs->{columns}})])
unless $save_count == int(keys(%{$attrs->{columns}}));
# auth_id must be at most one column. Turn into that column or undef.
b_die('too many auth_id fields')
if int(@{$attrs->{auth_id}}) > 1;
$attrs->{auth_id} = $attrs->{auth_id}->[0];
# Types of the primary key columns, in primary_key order.
$attrs->{primary_key_types} = [map {$_->{type}} @{$attrs->{primary_key}}];
# Cache as much of the statements as possible
_init_statements($attrs);
return $proto->SUPER::new($attrs);
}
sub register_child_model {
    my($self, @child) = @_;
    # Prepends a new entry to the 'children' list. The entry is a fresh
    # array_ref holding all arguments after the invocant.
    #
    # Returns nothing.
    unshift(@{$self->get('children')}, \@child);
    return;
}
sub unsafe_load {
    my($self, $query, $die) = @_;
    # Selects the single row matching I<query> and returns its values as a
    # hash_ref keyed by column name, each value converted with the column
    # type's from_sql_column.
    #
    # I<query> is processed into the where. Values in the query which
    # are array_refs are converted with
    # L<Bivio::Type::to_sql_param_list|Bivio::Type/"to_sql_param_list">.
    # Other values are processed with
    # L<Bivio::Type::to_sql_param|Bivio::Type/"to_sql_param">.
    #
    # Returns undef if no rows were returned. Throws TOO_MANY if a second
    # row is available.
    #
    # I<die> must implement L<Bivio::Die::die|Bivio::Die/"die">; Bivio.Die
    # is used when it is not supplied.
    my($attrs) = $self->internal_get;
    $die ||= $_D;
    my($params) = [];
    my($sql) = _prepare_select($self, $query, $params);
    my($sth) = $_SC->execute($sql, $params, $die, $attrs->{has_blob});
    my($row, $too_many);
    $_SC->perf_time_op(sub {
        $row = $sth->fetchrow_arrayref;
        # Only probe for a second row when the first one exists.
        if ($row) {
            $too_many = $sth->fetchrow_arrayref ? 1 : 0;
        }
        $sth->finish;
        return;
    });
    unless ($row) {
        return undef;
    }
    if ($too_many) {
        $die->throw_die(TOO_MANY => {
            message => 'too many rows returned',
            sql => $sql,
            params => $params,
        });
    }
    # select_columns is in the same order as the selected SQL columns, so
    # walk both in step.
    my($index) = 0;
    my(@pairs) = map(
        ($_->{name}, $_->{type}->from_sql_column($row->[$index++])),
        @{$attrs->{select_columns}},
    );
    return {@pairs};
}
sub update {
    my($self, $old_values, $new_values, $die) = @_;
    # Updates the row identified by the primary key values in
    # I<old_values>. Only columns which exist in I<new_values> and whose
    # value differs from I<old_values> (per _equals) are written.
    #
    # Does nothing (apart from tracing) when no column changed.
    # Returns nothing.
    my($attrs) = $self->internal_get;
    my($columns) = $attrs->{columns};
    my(@assignments);
    my(@params);
    foreach my $name (@{$attrs->{column_names}}) {
        next
            unless exists($new_values->{$name});
        my($before) = $old_values->{$name};
        my($after) = $new_values->{$name};
        # This works for BLOBs, too. If the scalar_ref is the same,
        # then we don't update.
        next
            if _equals($before, $after);
        my($column) = $columns->{$name};
        push(@assignments, $name . '=' . $column->{sql_pos_param});
        my $param = $column->{type}->to_sql_param($after);
        push(@params, $param);
    }
    unless (@assignments) {
        &_trace(defined($die) ? ($die, ': ') : (), 'no update required')
            if $_TRACE;
        return;
    }
    # Primary key binds come from the old values, since the new values
    # may be changing the key itself.
    my(@pk) = map(
        $columns->{$_}->{type}->to_sql_param($old_values->{$_}),
        @{$attrs->{primary_key_names}},
    );
    # Need to lock the row before updating if blob
    if ($attrs->{has_blob}) {
        $_SC->perf_time_finish(
            $_SC->execute($attrs->{update_lock}, \@pk, $die),
        );
    }
    $_SC->perf_time_finish(
        $_SC->execute(
            $attrs->{update} . join(',', @assignments) . $attrs->{primary_where},
            [@params, @pk],
            $die,
            $attrs->{has_blob},
        ),
    );
    return;
}
sub _add_parent_model {
    my($attrs, $col, $parent_model, $parent_field) = @_;
    # Records that column I<col> refers to I<parent_field> of
    # I<parent_model>: once in $attrs->{parents}, keyed by model and then
    # by column name, and once on the column itself.
    my($fields) = $attrs->{parents}->{$parent_model} ||= {};
    $fields->{$col->{name}} = $parent_field;
    @{$col}{qw(parent_model parent_field)} = ($parent_model, $parent_field);
    return;
}
sub _equals {
    my($v, $v2) = @_;
    # String comparison in which undef is treated as the empty string.
    return (defined($v) ? $v : '') eq (defined($v2) ? $v2 : '');
}
sub _init_columns {
    my($proto, $attrs, $column_cfg) = @_;
    # Initializes the column attributes of I<attrs> from I<column_cfg>:
    # column_names, columns, column_aliases, has_blob, primary_key,
    # primary_key_names and (via _add_parent_model) parents.
    #
    # Each entry in I<column_cfg> is either a hash_ref, which is used
    # directly as the column, or an array_ref [type, constraint].
    #
    # Dies if there are no columns, more than one BLOB column, or no
    # primary key columns.
    #
    # Sort the columns, so in a guaranteed order. Makes for better
    # Oracle caching of prepared statements. We sort first, so
    # primary keys are sorted as well.
    $attrs->{column_names} = [sort(keys(%$column_cfg))];
    b_die('missing columns: ', $attrs)
        unless %$column_cfg;
    # BLOB columns are counted so the "too many BLOBs" check below can
    # actually fire; has_blob itself stays a 0/1 flag.
    my($blob_count) = 0;
    foreach my $n (@{$attrs->{column_names}}) {
        my($cfg) = $column_cfg->{$n};
        my($col) = ref($cfg) eq 'HASH' ? $cfg : {
            type => $cfg->[0],
            constraint => $cfg->[1],
        };
        # Keep the declared type: init_type replaces $col->{type}, and the
        # declaration is needed below to detect "Model.field" references.
        my($type_decl) = $col->{type};
        $col->{sql_name} = $col->{name} = $n;
        $attrs->{columns}->{$n} = $attrs->{column_aliases}->{$n} = $col;
        $col->{constraint} = $_C->from_any($col->{constraint});
        $proto->init_type($col, $type_decl);
        _add_parent_model($attrs, $col, $1, $2)
            if $type_decl =~ /^(.*)\.(.*)$/;
        $col->{is_searchable} = $col->{is_searchable} ? 1 : 0;
        $col->{sql_pos_param} = $col->{type}->to_sql_value('?');
        # A declaration may supply its own insert placeholder.
        $col->{sql_pos_param_for_insert} ||= $col->{type}->to_sql_value('?');
        $blob_count++
            if $_BLOB->is_super_of($col->{type});
        $col->{is_primary_key} = $col->{constraint}->eq_primary_key;
        push(@{$attrs->{primary_key}}, $col)
            if $col->{is_primary_key};
    }
    b_die($attrs->{table_name}, ': too many BLOBs')
        if $blob_count > 1;
    $attrs->{has_blob} = 1
        if $blob_count;
    b_die($attrs->{table_name}, ': no primary keys')
        unless int(@{$attrs->{primary_key}});
    $attrs->{primary_key_names} = [map {$_->{name}} @{$attrs->{primary_key}}];
    return;
}
sub _init_statements {
    my($attrs) = @_;
    # Pre-computes the SQL fragments which do not depend on run-time
    # values and stores them on I<attrs>: select_columns, select, insert,
    # primary_where, delete, update, update_lock and, when a matching
    # column exists, primary_id_name.
    my($table) = $attrs->{table_name};
    my($columns) = $attrs->{columns};
    my($names) = $attrs->{column_names};
    my($pk_names) = $attrs->{primary_key_names};

    $attrs->{select_columns} = [@{$columns}{@$names}];
    my($select_list) = join(
        ',',
        map($columns->{$_}->{type}->from_sql_value($_), @$names),
    );
    $attrs->{select} = 'select ' . $select_list . " from $table ";

    my($insert_values) = join(
        ',',
        map($columns->{$_}->{sql_pos_param_for_insert}, @$names),
    );
    $attrs->{insert} = "insert into $table ("
        . join(',', @$names)
        . ') values ('
        . $insert_values
        . ')';

    my($where) = ' where ' . join(
        ' and ',
        map($_ . '=' . $columns->{$_}->{sql_pos_param}, @$pk_names),
    );
    $attrs->{primary_where} = $where;
    $attrs->{delete} = "delete from $table " . $where;
    $attrs->{update} = "update $table set ";
    # Row lock used by update() for tables with a BLOB; selects only the
    # first primary key column.
    $attrs->{update_lock} = 'select ' . $pk_names->[0]
        . " from $table "
        . $where
        . ' for update';

    # table_name_t => table_name_id; only recorded if such a column exists.
    (my $id_name = $table) =~ s/_t$/_id/;
    $attrs->{primary_id_name} = $id_name
        if $columns->{$id_name};
    return;
}
sub _prepare_select {
    my($self, $query, $params) = @_;
    # Returns the cached select statement followed by the WHERE clause
    # generated for I<query>. Bind values are pushed onto I<params>.
    my $select = $self->get('select');
    my $where = _prepare_where($self, $query, $params);
    return $select . $where;
}
sub _prepare_select_param {
    my($column, $value, $params) = @_;
    # Returns the SQL test for one column of a query and pushes any bind
    # values onto I<params>:
    #
    #   undef value      => "<sql_name> IS NULL" (nothing pushed)
    #   array_ref value  => "<sql_name> IN <value list>"
    #   anything else    => "<sql_name>=<positional parameter>"
    my($type) = $column->{type};
    my($name) = $column->{sql_name};
    return $name . ' IS NULL'
        unless defined($value);
    if (ref($value) eq 'ARRAY') {
        # Convert each literal to a model value before asking the type
        # for the bind values and the IN list.
        my($list) = [map($type->from_literal_for_model_value($_), @$value)];
        push(@$params, @{$type->to_sql_param_list($list)});
        return $name . ' IN ' . $type->to_sql_value_list($list);
    }
    push(@$params, $type->to_sql_param($value));
    return $name . '=' . $column->{sql_pos_param};
}
sub _prepare_where {
    my($self, $query, $params) = @_;
    # Builds the " WHERE ..." clause for I<query>, a hash_ref of column
    # name to value, pushing bind values onto I<params>. Returns the empty
    # string when I<query> is missing or empty. Dies on a field name which
    # is not a column.
    return ''
        unless $query && %$query;
    my($columns) = $self->get('columns');
    my(@tests);
    # Use a sort to force order which (may) help Oracle's cache.
    foreach my $field (sort(keys(%$query))) {
        push(@tests, _prepare_select_param(
            $columns->{$field} || b_die('invalid field name: ', $field),
            $query->{$field},
            $params,
        ));
    }
    return ' WHERE ' . join(' AND ', @tests);
}
1;