Bivio::SQL::ListSupport
# Copyright (c) 1999-2010 bivio Software, Inc. All rights reserved. # $Id$ package Bivio::SQL::ListSupport; use strict; use Bivio::Base 'SQL.Support'; # C<Bivio::SQL::ListSupport> provides SQL database access for # L<Bivio::Biz::ListModel>s. The loading specification is defined # by L<Bivio::SQL::ListQuery|Bivio::SQL::ListQuery>. # # ListSupport uses the L<Bivio::SQL::Connection> for statement execution. # # A field is a name of the form C<Model.field>. # # A field identity is an array_ref of fields. The first field is the # name that appears in the columns list. The rest are aliases which # are joined in the where clause with "=". # # Outer joins are supported for field identities. An alias may end # with '(+)'. # # # See also L<Bivio::SQL::Support|Bivio::SQL::Support> for more attributes. # # # auth_id : array_ref # # auth_id : string # # A field or field identity which must be equal to # request's I<auth_id> attribute. # # auth_user_id : array_ref # # auth_user_id : string # # A field or field identity which must be equal to # request's I<auth_user_id> attribute. # # can_iterate : boolean [0] # # By default lists can't be iterated. If you set this to true, you can # iterate. # # count_distinct : array_ref # # The column to be used in the COUNT(DISTINCT x) query. # # date : array_ref # # date : string # # Date qualification field, used to qualify queries. # # from : string # # Optionally override the FROM clause. Use this feature with caution. # # group_by : array_ref # # Optionally, a list of fields and field identities that can be used # to group the result. # # other : array_ref # # A list of fields and field identities that have no ordering. # # other_query_keys : array_ref # # Extra keys expected on the query. Used by L<clean_raw|"clean_raw">. # # order_by : array_ref # # A list of fields and field identities that can be used to sort # the result. # # order_by_names : array_ref # # List of columns order_by columns (in order). # # orabug_fetch_all_select : boolean # # If set, then selects must be read to completion. We were seeing ORA-03113 # because the oracle slave was crashing with a SEGV # on the test system only. It isn't happening any more. We don't know # why, but we've taken it out of RealmUserList. I've left the code in, # because it may need to be added quickly. Just add it to internal_initialize # of the list model, e.g. # # # This select causes the oracle db slave to crash. The next # # operation fails. See ListSupport for more details. # orabug_fetch_all_select => 1, # # parent_id : array_ref # # parent_id : string # # A field or field identity which further qualifies a query. # Used when a list "this" points to another list, e.g. # InstrumentSummaryList leads the user to InstrumentTransactionList. # # parent_id_type : string # # The type class of the parent_id field. # # primary_key : array_ref (required) # # The list of fields and field that uniquely identifies a row. # # primary_key_types : array_ref # # List of primary key types in the order of I<primary_key_names>. # # select_value : string # # The raw SQL to be substituted into the SQL query for the column. # # version : int # # The version of this particular combination of fields. It will be # set in all query strings. It should be changed whenever the # declaration changes. It is used to reject an out-dated query. # # want_date : boolean [0] # # #TODO: It may make sense to just use the "date" field. # # want_select : boolean [1] # # Is this going to be in the select? If false, like setting # in_select to false for all columns. # # want_select_distinct : boolean [0] # # Use SELECT DISTINCT instead of SELECT. # # want_level_in_select : boolean [0] # # Add C<LEVEL> to the select. This is an Oracle specific field. # It is used with C<CONNECT BY>. # # want_page_count : boolean [Bivio::Biz::ListModel want_page_count] # # Should the number of pages be calculated for this list if paged? # # where : array_ref # # A list of fields which will be ANDed to rest of the where clause. # If an element matches a column_name or alias, then the appropriate # sql_name for the column_name or alias will be substituted. # # # # The following declaration is taken from # L<Bivio::Biz::Model::ClubUserList|Bivio::Biz::Model::ClubUserList>: # # Bivio::SQL::ListSupport->new({ # version => 1, # order_by => [qw( # RealmOwner.name # ClubUser.mail_mode # RealmUser.role # )], # other => [qw( # User.last_name # User.middle_name # User.first_name # )], # primary_key => [ # [qw(User.user_id ClubUser.user_id RealmOwner.realm_id # RealmUser.user_id)], # ], # auth_id => [qw(ClubUser.club_id RealmUser.realm_id)], # }); # # This declaration will produce the following properties: # # User.last_name # RealmOwner.name # ClubUser.mail_mode # RealmUser.role # User.user_id # ClubUser.club_id # # This is the first version. Any time the field names change, you should change # the version. Field identities do not affect the version, because they do not # affect the external representation, just the implementation of the query. # # You can order this model by I<RealmOwner.name>, I<ClubUser.mail_mode>, or # I<RealmUser.role>. While it may not make the most sense to order by # I<ClubUser.mail_mode>, it is allowed and "why not?". # # The I<User.user_id> and its aliases # I<ClubUser.user_id>, I<RealmOwner.realm_id>, and I<RealmUser.user_id>, # is the C<primary_key> for this ListSupport. It is guaranteed to be # unique to each row of the ListSupport. our($_TRACE); b_use('IO.Trace'); my($_PRIMARY_ID_SQL_VALUE) = b_use('Type.PrimaryId')->to_sql_value('?'); my($_DATE_SQL_VALUE) = b_use('Type.Date')->to_sql_value('?'); my($_CONSTANT_COLS) = [qw(auth_id auth_user_id parent_id)]; my($_LQ) = b_use('SQL.ListQuery'); my($_C) = b_use('SQL.Connection'); sub get_statement { my($self) = @_; # Return the statement for this instance. return $self->internal_get()->{statement}; } sub iterate_next { my($self) = shift; return 0 unless $self->SUPER::iterate_next(@_); my($model, $iterator, $row, $converter) = @_; _add_constant_values($self, $model->get_query, $row, $converter); return 1; } sub iterate_start { # Returns a handle which can be used to iterate the rows with # L<iterate_next|"iterate_next">. L<iterate_end|"iterate_end"> # should be called, too. # # Arguments are the same as L<load|"load">. return _execute_select(@_); } sub load { my($self, $query, $stmt, $where, $params, $die) = @_; # Loads the specified rows with data using the parameterized where_clause # and substitution values. At most the specified max rows will be loaded. # Data will be loaded starting at the specified index into the result set. # # I<where> is added to the internally generated select with I<params>. # # If I<want_this> or I<this> is set, only loads one element. # If no select such just return an empty list. Only local fields. return [] unless _select($self); # Detail or list? return $query->get('this') || $query->unsafe_get('want_first_only') ? _load_this($self, $query, _execute_select(@_), $die) : _load_list(@_); } sub new { my($proto, $decl, $stmt) = @_; # Creates a SQL list support instance from a declaration. A I<decl> is a list of # keyed categories. The keys are described below. The values are either an # array_ref or a string (except I<version>). The array_ref may contain strings # (fields) or array_refs of strings (field identities). A field is composed of a # table qualifier and the column name. The first field in a field identity is # a property. The others are supplied for the I<WHERE> clause. # # The table qualifier is the table name with the trailing I<t> replaced by # a digit. The digits start at I<1>. # # The types of the columns will be extracted from the property # models corresponding to the table names. my($attrs) = { statement => $stmt, # All columns by qualified name columns => {}, # All models by qualified name models => {}, # All fields and field identities by qualified name column_aliases => {}, # The columns returned by select in order (not including auth_id) select_columns => [], # Columns which have no corresponding property model field local_columns => [], # See discussion of =item orabug_fetch_all_select orabug_fetch_all_select => $decl->{orabug_fetch_all_select}, # Default is false map({ $_ => $decl->{$_} ? 1 : 0; } qw(can_iterate want_date)), # Default is true want_select => !defined($decl->{want_select}) || $decl->{want_select} ? 1 : 0, other_query_keys => !defined($decl->{other_query_keys}) || ref($decl->{other_query_keys}) eq 'ARRAY' ? $decl->{other_query_keys} : b_die( $decl->{other_query_keys}, ': invalid other_query_keys'), want_page_count => $decl->{want_page_count}, }; $proto->init_common_attrs($attrs, $decl); # We add this to the declaration in the case that if ($decl->{want_level_in_select}) { $decl->{other} = [] unless ref($decl->{other}); push(@{$decl->{other}}, { name => 'level', type => 'Integer', constraint => 'NOT_NULL', in_select => 1, }, ); } _init_column_lists($attrs, $decl, _init_column_classes($attrs, $decl)); my($self) = $proto->SUPER::new($attrs); $_LQ->initialize_support($self); #TODO: make $self read_only? return $self; } sub _add_constant_values { my($self, $query, $row, $converter) = @_; my($attrs) = $self->internal_get; _map_constant_cols(sub { my($f) = @_; return unless my $i = $self->unsafe_get($f); my($v) = $query->unsafe_get($f); $row->{$i->{name}} ||= $converter ? $i->{type}->$converter($v) : $v; return; }); return $row; } sub _count_pages { my($self, $query, $from_where, $params) = @_; # Sets page_count and adjusts page_number. Returns page_count. my($statement) = $_C->execute( $self->get('select_count') . ' ' . $from_where, $params); my($row_count) = $_C->perf_time_op(sub {$statement->fetchrow_array}); my($page_count) = _page_number($query, $row_count); my($page_number) = $query->get('page_number'); _trace('page_count=', $page_count) if $_TRACE; if ($page_number > $page_count) { _trace('page_number (', $page_number, ') > count') if $_TRACE; $query->put(page_number => $page_number = $page_count); } $query->put(page_count => $page_count, row_count => $row_count); return $page_count; } sub _execute_select { # Prepare and execute the select statement. return $_C->execute((_prepare_statement(@_))[0,1]); } sub _find_list_start { my($self, $query, $sql, $params, $die) = @_; # Returns $rows and $statement after finding first row to return. my($db) = $_C->get_instance; my($statement, $row); my($page_number, $count) = $query->get(qw(page_number count)); my($can_limit_and_offset) = $db->CAN_LIMIT_AND_OFFSET; foreach my $is_second_try (0 .. 1) { # Set prev first, because there is a return in the for loop if ($page_number > $query->FIRST_PAGE) { $query->put(has_prev => 1, prev_page => $page_number - 1); } else { $query->put(has_prev => 0, prev_page => undef, # Avoids problems if page_number is negative page_number => ($page_number = $query->FIRST_PAGE)); } if ($can_limit_and_offset) { # We always get one more, so has_next works $statement = $db->execute( $sql . sprintf(' OFFSET %d LIMIT %d', ($page_number - 1) * $count, $count + 1), $params); return ($row, $statement) if $row = $_C->perf_time_op(sub {$statement->fetchrow_arrayref}); $_C->perf_time_finish($statement); return (undef, undef) if $is_second_try || $page_number == $query->FIRST_PAGE; $can_limit_and_offset = 0; } # No LIMIT/OFFSET, so go through rows serially my($start) = ($page_number - $query->FIRST_PAGE()) * $count; #TODO: Is this needed? $count has to be > 0, no? $start = 0 if $start < 0; $statement = $db->execute($sql, $params); my($num_rows) = 0; 0 while $row = $_C->perf_time_op(sub {$statement->fetchrow_arrayref}) and ++$num_rows <= $start; return ($row, $statement) if $row; $_C->perf_time_finish($statement); unless ($num_rows) { _trace('no rows found') if $_TRACE; return (undef, undef); } $query->put(page_number => $page_number = _page_number($query, $num_rows)); } continue { _trace('last page=', $page_number, ', retrying') if $_TRACE; } ($die || 'Bivio::Die')->throw_die('DB_ERROR', { message => 'unable to find page in list', page_number => $page_number, where => $sql, params => $params, }); # DOES NOT RETURN } sub _init_column_classes { my($attrs, $decl) = @_; # Initialize the column classes. # Returns the beginnings of the where clause my($where) = __PACKAGE__->init_column_classes($attrs, $decl, [@$_CONSTANT_COLS, qw(date primary_key order_by group_by other count_distinct)]); if ($decl->{where}) { my(@decl_where) = (); foreach my $e (@{$decl->{where}}) { if (defined($attrs->{column_aliases}->{$e})) { push(@decl_where, $attrs->{column_aliases}->{$e}->{sql_name}); } elsif (defined($attrs->{models}->{$e})) { #TODO: This doesn't work for qualified columns, but it works for # what I need right now. push(@decl_where, $attrs->{models}->{$e}->{sql_name}); } else { push(@decl_where, $e); } } $where = join(' AND ', grep($_, $where, join(' ', @decl_where))); } foreach my $c (@$_CONSTANT_COLS, qw(date count_distinct)) { Bivio::Die->die("too many $c fields") if @{$attrs->{$c}} > 1; $attrs->{$c} = $attrs->{$c}->[0]; } # order_by may be empty and stays in specified order. my($i) = 0; foreach my $c (@{$attrs->{order_by}}) { $c->{order_by_index} = $i++; } return undef unless %{$attrs->{models}} && $attrs->{want_select}; # primary_key must be at least one column if there are models. Bivio::Die->die('no primary_key fields') unless @{$attrs->{primary_key}} || !%{$attrs->{models}}; # Sort all names in a select alphabetically. $attrs->{primary_key} = [sort {$a->{name} cmp $a->{name}} @{$attrs->{primary_key}}]; # other can be empty. No reformatting necessary # Ensure that (qual) columns defined for all (qual) models and their # primary keys and initialize primary_key_map. __PACKAGE__->init_model_primary_key_maps($attrs); return $where; } sub _init_column_lists { my($attrs, $decl, $where) = @_; # Creates many of the lists in $attrs which are derived from the class # lists (primary_key, order_by). Creates select and select_this # using "where" of field identities and column information already in $attrs # only if "where" is defined (see _init_column_classes). # Lists are sorted to keep Oracle's cache happy across invocations $attrs->{primary_key_names} = [map {$_->{name}} @{$attrs->{primary_key}}]; $attrs->{primary_key_types} = [map {$_->{type}} @{$attrs->{primary_key}}]; # order_by can't be sorted, because order is important $attrs->{order_by_names} = [map {$_->{name}} @{$attrs->{order_by}}]; $attrs->{column_names} = [sort(keys(%{$attrs->{columns}}))]; if ($attrs->{parent_id}) { $attrs->{parent_id_type} = $attrs->{parent_id}->{type}; } foreach my $c (values(%{$attrs->{columns}})) { Bivio::Die->die($c->{name}, ': cannot have a blob in a ListModel') if $c->{type} eq 'Bivio::Type::BLOB'; } # Nothing to select return unless defined($where); # Order select columns alphabetically, ignoring primary_key, primary_id # and auth_id and any other columns with in_select turned off. my(@sel_cols) = sort {$a->{name} cmp $b->{name}} (grep($_->{in_select}, values(%{$attrs->{columns}}))); # Go through the list and delete cols we don't return or in the # case of the primary key, what we return first. Yes, this probably # could be done in one giant grep, but better to get right than # tricky. <g> foreach my $col (@{$attrs->{primary_key}}) { @sel_cols = grep($_ ne $col, @sel_cols); } # Put primary key back on front, if it is part of select $attrs->{can_load_this} = 1; unshift(@sel_cols, grep($_->{in_select} || ($attrs->{can_load_this} = 0), @{$attrs->{primary_key}})); $attrs->{select_columns} = \@sel_cols; # Get names and set select_index my($i) = 0; my(@select_sql_names) = map { $_->{select_index} = $i++; $_->{select_value} || $_->{type}->from_sql_value($_->{sql_name}); } @{$attrs->{select_columns}}; # Create select from all columns $attrs->{decl_from} = $decl->{from}; $attrs->{sql_from} = ' ' . ( $decl->{from} || 'FROM '. join(',', sort(map($_->{model_from_sql}, values(%{$attrs->{models}})))) ); $where =~ s/^\s*AND\s+//i; $attrs->{sql_where} = $where; my($select) = ($decl->{want_select_distinct} ? 'DISTINCT ' : '') . join(',', @select_sql_names); my($select_count) = $decl->{want_select_distinct} ? $attrs->{count_distinct} ? 'DISTINCT ' . $attrs->{count_distinct}->{sql_name} : $select : '*'; $attrs->{select} = "SELECT $select"; $attrs->{select_count} = "SELECT COUNT($select_count)"; return; } sub _load_list { my($self, $query, undef, undef, undef, $die) = @_; my($sql, $params, $from_where) = _prepare_statement(@_); _count_pages($self, $query, $from_where, $params) if $from_where && $query->unsafe_get('want_page_count'); my($attrs) = $self->internal_get; my($count) = $query->get('count'); my($row, $statement) = _find_list_start($self, $query, $sql, $params, $die); return [] unless $row; my($select_columns) = $attrs->{select_columns}; my(@rows); for (;;) { my($i) = 0; push(@rows, _add_constant_values($self, $query, { (map { ($_->{name}, $_->{type}->from_sql_column($row->[$i++])); } @$select_columns), })); last if --$count <= 0; unless ($row = $_C->perf_time_op(sub {$statement->fetchrow_arrayref})) { $_C->perf_time_finish($statement); return \@rows; } } # Is there a next? if ($_C->perf_time_op(sub {$statement->fetchrow_arrayref})) { $query->put(has_next => 1, next_page => $query->get('page_number') + 1); # See discussion of =item orabug_fetch_all_select if ($attrs->{orabug_fetch_all_select}) { 0 while $_C->perf_time_op(sub {$statement->fetchrow_arrayref}); } } $_C->perf_time_finish($statement); # Return the page return \@rows; } sub _load_this { my($self, $query, $statement, $die) = @_; # Load "this" from statement. We search serially through all records. # There doesn't appear to be a better way to do this, because we need # to know "prev". Eventually, this will have to be PL/SQL. my($attrs) = $self->internal_get; $die->throw_die('DIE', 'cannot load this, primary key must be in_select') unless $attrs->{can_load_this}; my($count, $this) = $query->get(qw(count this)); my($want_first) = $query->unsafe_get('want_first_only'); _trace($want_first ? 'looking for first' : ('looking for this ', $attrs->{primary_key_names}, ' = ', $this)) if $_TRACE; my($types) = $attrs->{primary_key_types}; my($prev, $row); my($row_count) = 0; for (;;) { $_C->perf_time_finish($statement), return [] unless $row = $_C->perf_time_op(sub {$statement->fetchrow_arrayref}); $row_count++; # Convert the entire primary key and save in $prev if no match my($j) = 0; my($match) = 1; my(@prev) = map { my($v) = $_->from_sql_column($row->[$j]); #TODO: Should this be "is_equal"? This is probably "good enough". # It will slow it down a lot to make a method call for each # row/attribute. "eq" works in all cases and probably in future. $match &&= $want_first || $this->[$j] eq $v; $j++; $v; } @$types; if ($want_first) { $query->put(this => $this = \@prev); _trace('found first ', $attrs->{primary_key_names}, ' = ', \@prev) if $_TRACE; last; } last if $match; $prev = \@prev; } # Found it, copy all columns of this _trace('found this at row #', $row_count) if $_TRACE; my($i) = 0; my($rows) = [_add_constant_values($self, $query, { (map { ($_->{name}, $_->{type}->from_sql_column($row->[$i++])); } @{$attrs->{select_columns}}), })]; # Set prev if defined $query->put(prev => $prev, has_prev => 1) if $prev; # Set next if more rows my($next) = $_C->perf_time_op(sub {$statement->fetchrow_arrayref}); if ($next) { my($j) = 0; $query->put(has_next => 1, next => [map { $_->from_sql_column($row->[$j++]); } @$types]); # See discussion of =item orabug_fetch_all_select if ($attrs->{orabug_fetch_all_select}) { 0 while $_C->perf_time_op(sub {$statement->fetchrow_arrayref}); } } $_C->perf_time_finish($statement); $query->put(page_number => _page_number($query, $row_count)); return $rows; } sub _map_constant_cols { my($op) = @_; return map($op->($_), @$_CONSTANT_COLS); } sub _merge_where { my($self, $_where) = @_; # Merge any internal, literal where predicates with where clause # returned by internal_pre_load return $_where unless $self->unsafe_get('sql_where'); _trace('sql_where: ', $self->get('sql_where')); return join(' AND ', grep($_, $self->get('sql_where'), $_where)); } sub _page_number { my($query, $num_rows) = @_; # Returns the page number that $num_rows is on. return int(--$num_rows/$query->get('count')) + $query->FIRST_PAGE(); } sub _prepare_ordinal_clauses { my($self, $query) = @_; # Generates the order_by and group_by clauses. my($attrs) = $self->internal_get; my($res) = ''; $res .= ' GROUP BY ' . join( ',', map($_->{type}->to_group_by_value($_->{sql_name}), @{$attrs->{group_by}}), ) if @{$attrs->{group_by}}; my($qob); if (@{$attrs->{order_by}} and $qob = $query->get('order_by') and @$qob) { my $max_i = $query->unsafe_get('want_only_one_order_by') ? 2 : @$qob; $res .= ' ORDER BY'; for (my($i) = 0; $i < $max_i; $i += 2) { my($c) = $attrs->{columns}->{$qob->[$i]}; $res .= ' ' . $c->{type}->to_order_by_value($c->{sql_name}) . ($qob->[$i+1] ? ',' : ' desc,'); } chop($res); } _trace('group_by/order_by: ', $res); return $res; } sub _prepare_query_values { my($self, $stmt, $query) = @_; _map_constant_cols(sub { my($col) = @_; if ($self->get($col) && defined(my $v = $query->unsafe_get($col))) { $stmt->where([$self->get($col)->{name}, [$v]]); } return; }); if ($self->unsafe_get('date')) { my($begin_date, $interval, $end_date) = $query->get(qw(begin_date interval date)); unless ($end_date || $begin_date) { b_warn($interval, ': interval but no date, ignoring; ', $query) if $interval; } else { # Won't have both a begin_date and interval (see ListQuery) $begin_date = $interval->dec($end_date) if $interval; $stmt->where($stmt->GTE($self->get('date')->{name}, [$begin_date])) if $begin_date; $stmt->where($stmt->LTE($self->get('date')->{name}, [$end_date])) if $end_date; } } return; } sub _prepare_statement { my($self, $query, $stmt, $_where_in, $params_in, $die) = @_; _trace('_where: ', $_where_in); $stmt ||= Bivio::SQL::Statement->new; _prepare_query_values($self, $stmt, $query); my($where, $params) = $stmt->build_for_list_support_prepare_statement( $self, $self->get('statement'), _merge_where($self, $_where_in), $params_in); _trace('where: ', $where); return ($where . _prepare_ordinal_clauses($self, $query), $params, undef) if $where =~ s/^WHERE SELECT\b/SELECT/; ($die || 'Bivio::Die')->throw_die('DIE', 'must support select') unless my $select = _select($self); my(@from_where) = (); # if $where has a FROM clause, ignore $sql_from # otherwise, append $where to $sql_from unless ($where && $where =~ /^\s*FROM/is) { push(@from_where, $self->get('sql_from')); } push(@from_where, $where); return ( join(' ', $select, @from_where, _prepare_ordinal_clauses($self, $query)), $params, join(' ', @from_where) ); } sub _select { my($self) = @_; # Ask statement to build select string. return $self->get_statement() ->build_select_for_sql_support($self); } 1;