Bivio::Search::Xapian
# Copyright (c) 2006-2010 bivio Software, Inc. All Rights Reserved.
# $Id$
package Bivio::Search::Xapian;
use strict;
use Bivio::Base 'Search.None';
use Bivio::IO::Trace;
use File::Spec ();
use Search::Xapian ();
our($_TRACE);
my($_F) = b_use('IO.File');
my($_P) = b_use('Search.Parser');
my($_A) = b_use('IO.Alert');
my($_M) = b_use('Biz.Model');
# Cached realm_id of the EXEC_REALM RealmOwner; filled in lazily by _lock_id
# and never reset in this module.
my($_LOCK_ID);
#TODO: What is the actual max term length; I've seen errors in the 400 range
# _replace truncates terms to this many characters and skips postings that
# are longer.  NOTE(review): Xapian backends reportedly reject terms over
# ~245 bytes -- confirm before raising this value.
my($_MAX_WORD) = 80;
# Default result window (number of matches) for query().
my($_LENGTH) = b_use('Type.PageSize')->get_default;
# Shared English stemmer used for indexing (_replace) and query parsing.
my($_STEMMER) = Search::Xapian::Stem->new('english');
# Bitwise OR of the QueryParser flags passed to parse_query() in query().
my($_FLAGS) = 0;
foreach my $f (qw(FLAG_BOOLEAN FLAG_PHRASE FLAG_LOVEHATE FLAG_WILDCARD FLAG_PURE_NOT FLAG_PARTIAL)) {
$_FLAGS |= Search::Xapian->$f();
}
# Field name => Xapian document value slot.  _replace writes these values
# and _query_result reads them back.  An array ref is
# [raw value slot, date slot].  The date slot holds Type.Date->to_file_name
# of the value and backs the DateValueRangeProcessor set up in query().
my($_VALUE_MAP) = {
simple_class => 0,
'RealmOwner.realm_id' => 1,
primary_id => 2,
title => 3,
excerpt => 4,
author_user_id => 5,
author => 6,
author_email => 7,
modified_date_time => [8, 9],
};
# db_path: directory holding the Xapian database.
# max_changesets: exported as XAPIAN_MAX_CHANGESETS while execute() runs.
b_use('IO.Config')->register(my $_CFG = {
db_path => b_use('Biz.File')->absolute_path('Xapian'),
max_changesets => 10,
});
my($_L) = b_use('Model.Lock');
my($_D) = b_use('Type.Date');
my($_DT) = b_use('Type.DateTime');
sub EXEC_REALM {
    # Name of the RealmOwner whose realm_id serves as the lock id and as the
    # auth_id of the commit job (see _lock_id).
    my($realm_name) = 'xapian_exec';
    return $realm_name;
}
sub delete_model {
    # Removes the indexed document for a model (or a bare primary id).
    # Called on the class, the deletion is queued on the request's
    # transaction resource (_queue); execute() later replays it on that
    # resource instance, which deletes the document immediately.
    #   $req         - request
    #   $model_or_id - model object (get_primary_id is used) or primary id
    my($proto, $req, $model_or_id) = @_;
    $req->perf_time_op(__PACKAGE__, sub {
        my($id) = ref($model_or_id)
            ? $model_or_id->get_primary_id
            : $model_or_id;
        return _queue($proto, $req, [delete_model => $id])
            unless ref($proto);
        # _delete takes $req as its third argument.  It is forwarded to
        # _write/_acquire_lock, which need it to build the Lock model.
        _delete($proto, _primary_term($id), $req);
        return;
    });
    return;
}
sub destroy_db {
    # Deletes the entire Xapian database directory (config db_path) after
    # taking the indexing lock.  The deletion is logged via IO.Alert.
    my($proto, $req) = @_;
    _acquire_lock($proto, $req);
    my($path) = $_CFG->{db_path};
    $_A->info($path, ': deleting');
    $_F->rm_rf($path);
    return;
}
sub execute {
    # Replays the queued ops of this class's resource stored on $req.
    # Each op is [method_name, @args] and is invoked as
    # $self->method_name($req, @args).  XAPIAN_MAX_CHANGESETS is set from
    # config for the duration of the run.  Returns 0.
    my($proto, $req) = @_;
    my($self) = $req->get(ref($proto) || $proto);
    local($ENV{XAPIAN_MAX_CHANGESETS}) = $_CFG->{max_changesets};
    $req->perf_time_op(__PACKAGE__, sub {
        foreach my $op (@{$self->get('ops')}) {
            _trace($op) if $_TRACE;
            # Copy instead of shift(@$op): the queued ops must stay intact
            # so that running the same resource again (e.g. a retried job)
            # does not call the first argument as a method name.
            my($method, @args) = @$op;
            $self->$method($req, @args);
        }
        return;
    });
    return 0;
}
sub get_db_path {
    # Directory of the Xapian database, from config db_path.
    my($path) = $_CFG->{db_path};
    return $path;
}
sub get_stemmer {
    # Shared English Search::Xapian::Stem instance used by this module.
    my($stemmer) = $_STEMMER;
    return $stemmer;
}
sub get_values_for_primary_id {
    # Same arguments as unsafe_get_values_for_primary_id.  Returns its
    # result when true; otherwise defers to the superclass implementation.
    my($proto, @args) = @_;
    my($values) = scalar($proto->unsafe_get_values_for_primary_id(@args));
    return $values
        if $values;
    return $proto->SUPER::get_values_for_primary_id(@args);
}
sub handle_prepare_commit {
    # Transaction resource hook (this object is registered by _queue via
    # push_txn_resource).  When AgentJob.Dispatcher can enqueue jobs, the
    # ops are handed to a JOB_XAPIAN_COMMIT job authorized as the exec
    # realm.  Otherwise $self is put on $req and execute() runs inline.
    my($self, $req) = @_;
    my($dispatcher) = b_use('AgentJob.Dispatcher');
    unless ($dispatcher->can_enqueue_job($req)) {
        $req->put(ref($self) => $self);
        $self->execute($req);
        return;
    }
    $dispatcher->enqueue(
        $req,
        'JOB_XAPIAN_COMMIT',
        {
            ref($self) => $self,
            auth_id => _lock_id($self, $req),
            auth_user_id => undef,
        },
    );
    return;
}
sub handle_config {
    # IO.Config callback: installs the new configuration hash.
    my($cfg) = $_[1];
    $_CFG = $cfg;
    return;
}
sub update_model {
    # Reindexes a model.
    # Class call:    update_model($req, $model).  Queues
    #                [update_model => simple class name, primary id] on the
    #                model's own request.
    # Instance call: update_model($req, $class, $id), as replayed by
    #                execute().  Loads the model.  If it has is_searchable
    #                and that returns false, the document is deleted.
    #                Otherwise it is replaced with freshly parsed
    #                terms/postings.  Nothing happens if the model cannot be
    #                loaded.
    my($proto, $req, @args) = @_;
    unless (ref($proto)) {
        my($queued) = $args[0];
        return _queue(
            $proto,
            $queued->get_request,
            [
                update_model => $queued->simple_package_name,
                $queued->get_primary_id,
            ],
        );
    }
    my($class, $id) = @args;
    my($model) = $_M->new($req, $class);
    return
        unless $model->unauth_load({$model->get_primary_id_name => $id});
    if ($model->can('is_searchable') && !$model->is_searchable) {
        # Must be removed from the index, e.g. an archived searchable file.
        $proto->delete_model($req, $model);
        return;
    }
    my($postings) = $_P->xapian_terms_and_postings($model);
    $req->perf_time_op(__PACKAGE__, sub {
        _replace($proto, $req, $model, $postings);
        return;
    });
    return;
}
sub query {
    # Searches the index and returns an array_ref of result hashes built by
    # _query_result.  $attr keys:
    #   req                  request (required)
    #   phrase | elite_set   exactly one must be defined.  phrase is parsed
    #                        by Search::Xapian::QueryParser (underscores
    #                        become spaces).  elite_set is an array_ref of
    #                        terms combined with OP_ELITE_SET.
    #   offset, length       result window; defaults 0 and the
    #                        Type.PageSize default
    #   private_realm_ids    array_ref; any document in these realms matches
    #   public_realm_ids     array_ref; only XISPUBLIC:1 documents match
    #   want_all_public      any XISPUBLIC:1 document matches
    #   simple_class         restrict to XSIMPLECLASS:<lc simple_class>
    #   percent_cutoff, weight_cutoff
    #                        passed to set_cutoff when either is true
    # Defaults are written back into $attr.  Returns [] without searching
    # when there are no realm ids and want_all_public is false.  Dies if
    # both or neither of phrase/elite_set are defined.
    my($proto, $attr) = @_;
    my($q);
    my($res) = $attr->{req}->perf_time_op(__PACKAGE__, sub {
        $attr->{offset} ||= 0;
        $attr->{length} ||= $_LENGTH;
        $attr->{private_realm_ids} ||= [];
        $attr->{public_realm_ids} ||= [];
        # Previously misspelled "percent_cuttoff", so the default never
        # reached the key that set_cutoff reads below.
        $attr->{percent_cutoff} ||= 0;
        $attr->{weight_cutoff} ||= 0;
        unless (@{$attr->{private_realm_ids}} || @{$attr->{public_realm_ids}}
            || $attr->{want_all_public}) {
            _trace($attr, ': no realms and not public') if $_TRACE;
            return [];
        }
        my($qp) = Search::Xapian::QueryParser->new;
        $qp->set_stemmer($_STEMMER);
        $qp->set_stemming_strategy(Search::Xapian::STEM_ALL());
        $qp->set_default_op(Search::Xapian->OP_AND);
        my($db) = Search::Xapian::Database->new($proto->get_db_path);
        $qp->set_database($db);
        # Date range support for array-valued $_VALUE_MAP entries, using the
        # date slot that _replace fills.  NOTE(review): the arguments are
        # presumably (slot, prefer_mdy, epoch year for two-digit years) per
        # the Xapian DateValueRangeProcessor API -- confirm.
        foreach my $field (keys(%$_VALUE_MAP)) {
            next
                unless ref($_VALUE_MAP->{$field}) eq 'ARRAY';
            $qp->add_valuerangeprocessor(
                Search::Xapian::DateValueRangeProcessor->new(
                    $_VALUE_MAP->{$field}->[1],
                    1,
                    $_DT->now_as_year - 80,
                ),
            );
        }
        my($phrase) = $attr->{phrase};
        my($elite_set) = $attr->{elite_set};
        b_die("phrase and elite set are mutually incompatible")
            if defined($phrase) && defined($elite_set);
        b_die("one of phrase or elite_set is required")
            unless defined($phrase) || defined($elite_set);
        # Build the main query exactly once.  The old code built it here
        # and then rebuilt it (re-parsing the phrase) inside the OP_AND.
        my($main_query);
        if (defined($phrase)) {
            $phrase =~ s/_/ /g;
            $main_query = $qp->parse_query($phrase, $_FLAGS);
        }
        else {
            $main_query = Search::Xapian::Query->new(
                Search::Xapian->OP_ELITE_SET,
                map(Search::Xapian::Query->new($_), @$elite_set),
            );
        }
        # Matches must satisfy the main query, the optional class filter,
        # and at least one of the realm / public alternatives.
        $q = Search::Xapian::Query->new(
            Search::Xapian->OP_AND,
            $main_query,
            $attr->{simple_class}
                ? Search::Xapian::Query->new(
                    'XSIMPLECLASS:' . lc($attr->{simple_class}))
                : (),
            Search::Xapian::Query->new(
                Search::Xapian->OP_OR,
                map(Search::Xapian::Query->new("XREALMID:$_"),
                    @{$attr->{private_realm_ids}}),
                map(
                    Search::Xapian::Query->new(
                        Search::Xapian->OP_AND,
                        Search::Xapian::Query->new("XREALMID:$_"),
                        Search::Xapian::Query->new('XISPUBLIC:1'),
                    ),
                    @{$attr->{public_realm_ids}},
                ),
                $attr->{want_all_public}
                    ? Search::Xapian::Query->new('XISPUBLIC:1')
                    : (),
            ),
        );
        my($enq) = _read(enquire => $q);
        $enq->set_cutoff($attr->{percent_cutoff}, $attr->{weight_cutoff})
            if $attr->{percent_cutoff} || $attr->{weight_cutoff};
        # Need to make a copy. Xapian is using the Tie interface, and it's
        # implementing it in a strange way.
        my(@res) = $enq->matches($attr->{offset}, $attr->{length});
        return [map(_query_result($proto, $_, $attr->{req}, $attr), @res)];
    });
    # $q is still undef when the search was skipped (no realms); guard so
    # tracing cannot die on an undef invocant.
    _trace(defined($q) ? [$q->get_terms] : [], '->[', $attr->{offset}, '..',
        $attr->{offset} + $attr->{length}, ']: ', $res,
    ) if $_TRACE;
    return $res;
}
sub unsafe_get_values_for_primary_id {
    # Looks up the indexed document for $primary_id.  Returns its result
    # hash (built by _query_result with $attr, default {}) or undef when
    # there is no match.  Any die during the lookup is caught; its message
    # is reported with b_warn and undef is returned.
    my($proto, $primary_id, $model, $attr) = @_;
    my($req) = $model->req;
    my($values);
    my($die) = Bivio::Die->catch_quietly(sub {
        $values = $req->perf_time_op(__PACKAGE__, sub {
            my($found) = scalar(_find($proto, $req, $primary_id));
            return undef
                unless $found;
            return _query_result($proto, $found, $req, $attr || {}) || undef;
        });
        return;
    });
    if ($die) {
        b_warn($die->get('attrs')->{message});
        return undef;
    }
    return $values;
}
sub _acquire_lock {
    # Calls Model.Lock->acquire_unless_exists with the exec realm's id
    # (_lock_id).  Presumably a no-op when the lock is already held --
    # see Model.Lock.
    my($proto, $req) = @_;
    my($lock) = $_M->new($req, 'Lock');
    $lock->acquire_unless_exists(_lock_id($proto, $req));
    #TODO(pjm): lock file may be needed by Xapian to guard against multiple write calls
    #unlink(File::Spec->catfile($_CFG->{db_path}, 'db_lock'));
    return;
}
sub _delete {
    # Deletes the document indexed under $primary_term via _write.
    # Does nothing when $primary_term is false.
    my($self, $primary_term, $req) = @_;
    _write($self, $req, delete_document_by_term => $primary_term)
        if $primary_term;
    return;
}
sub _find {
    # Returns the first item of matches(0, 1) for a query on the primary
    # term of $primary_id.  In scalar context it is undef when nothing
    # matches.  $req is accepted for symmetry but not used.
    my($proto, $req, $primary_id) = @_;
    my($query) = Search::Xapian::Query->new(_primary_term($primary_id));
    my(@matches) = _read(enquire => $query)->matches(0, 1);
    return (@matches)[0];
}
sub _lock_id {
    # realm_id of the RealmOwner named EXEC_REALM.  It is looked up on
    # first use and cached in $_LOCK_ID; dies if the realm does not exist.
    my($proto, $req) = @_;
    return $_LOCK_ID
        if $_LOCK_ID;
    my($owner) = $_M->new($req, 'RealmOwner')
        ->unauth_load_or_die({name => $proto->EXEC_REALM});
    $_LOCK_ID = $owner->get('realm_id');
    return $_LOCK_ID;
}
sub _primary_term {
    # Xapian term identifying the document of primary id $id: 'Q' . $id.
    my($id) = @_;
    return join('', 'Q', $id);
}
sub _queue {
    # Appends $op ([method_name, @args]) to the ops list of this class's
    # transaction resource on $req.  If none exists yet, a new instance with
    # an empty ops list is created and pushed first.
    my($proto, $req, $op) = @_;
    my($self) = $req->unsafe_get_txn_resource($proto);
    unless ($self) {
        $self = $proto->new({ops => []});
        $req->push_txn_resource($self);
    }
    _trace($op) if $_TRACE;
    push(@{$self->get('ops')}, $op);
    return;
}
sub _query_author {
    # Fills author_email and author in $res from the Email and RealmOwner
    # models of author_user_id.  This happens only when _query_model
    # succeeds, author is empty, author_user_id is set, and both models
    # load.  Always returns $res.
    my($proto, $req, $res, $attr) = @_;
    return $res
        unless _query_model($proto, $res, $req, $attr);
    return $res
        if defined($res->{author}) && length($res->{author});
    my($uid) = $res->{author_user_id};
    return $res
        unless $uid;
    my($email) = $_M->new($req, 'Email');
    my($owner) = $_M->new($req, 'RealmOwner');
    return $res
        unless $email->unauth_load({realm_id => $uid})
        && $owner->unauth_load({realm_id => $uid});
    @$res{qw(author_email author)}
        = ($email->get('email'), $owner->get('display_name'));
    return $res;
}
sub _query_model {
    # Loads the model referenced by $res (simple_class + primary_id) into
    # $res->{model}.  If $res has no author_user_id, each $_VALUE_MAP field
    # is overwritten from $proto->excerpt_model.  Returns 1 on success.
    # Returns 0 when $attr->{no_model} is set or the model cannot be loaded.
    my($proto, $res, $req, $attr) = @_;
    return 0
        if $attr->{no_model};
    my($model) = $_M->new($req, $res->{simple_class});
    # The search db may be out of sync with the database.
    return 0
        unless $model->unauth_load(
            {$model->get_primary_id_name => $res->{primary_id}});
    $res->{model} = $model;
    return 1
        if $res->{author_user_id};
    my($excerpt) = $proto->excerpt_model($model);
    foreach my $name (keys(%$_VALUE_MAP)) {
        $res->{$name} = $excerpt->get($name);
    }
    return 1;
}
sub _query_result {
    # Builds a result hash from one Xapian match:
    #  - percent, rank, collapse_count and weight from the match;
    #  - every $_VALUE_MAP field from the document's value slots (the first
    #    slot for array entries), which includes simple_class (0),
    #    RealmOwner.realm_id (1) and primary_id (2).
    # The hash is then passed through _query_author and returned.
    my($proto, $query_result, $req, $attr) = @_;
    my($doc) = $query_result->get_document;
    my(%res) = map({
        my($getter) = "get_$_";
        ($_ => $query_result->$getter());
    } qw(percent rank collapse_count weight));
    foreach my $field (keys(%$_VALUE_MAP)) {
        my($slot) = $_VALUE_MAP->{$field};
        $slot = $slot->[0]
            if ref($slot) eq 'ARRAY';
        $res{$field} = $doc->get_value($slot);
    }
    return _query_author($proto, $req, \%res, $attr);
}
sub _read {
    # Opens a (non-writable) Search::Xapian::Database at db_path and calls
    # method $op on it with the remaining arguments.  The caller is assumed
    # to already hold any lock it needs.
    my($op, @args) = @_;
    my($db) = Search::Xapian::Database->new($_CFG->{db_path});
    return $db->$op(@args);
}
sub _replace {
    # Builds a Xapian document for $model and writes it with
    # replace_document_by_term on the model's primary term.
    #   $parser - result of Search.Parser->xapian_terms_and_postings; nothing
    #             is written when it is false.  Supplies the $_VALUE_MAP
    #             fields, 'terms', 'postings' and xapian_posting_synonyms.
    # Terms are truncated to $_MAX_WORD characters.  Longer postings are
    # skipped.  Each posting is indexed in its stemmed form, plus the
    # unstemmed form when different, plus any synonyms of the stem.
    my($self, $req, $model, $parser) = @_;
    return
        unless $parser;
    my($doc) = Search::Xapian::Document->new;
    $doc->set_data('');
    # keys() rather than each(): each() shares the hash's single iterator.
    # A die part-way through (e.g. from $parser->get) would leave later
    # calls starting mid-hash and silently dropping value slots.
    foreach my $field (keys(%$_VALUE_MAP)) {
        my($index) = $_VALUE_MAP->{$field};
        my($v) = $parser->get($field);
        if (ref($index) eq 'ARRAY') {
            # The second slot holds the date in Type.Date file-name form,
            # used by query()'s DateValueRangeProcessor.
            $doc->add_value($index->[1], $_D->to_file_name($v));
            $index = $index->[0];
        }
        $doc->add_value($index, defined($v) ? $v : '');
    }
    my($primary_term) = _primary_term($model->get_primary_id);
    foreach my $t ($primary_term, @{$parser->get('terms')}) {
        $doc->add_term(substr($t, 0, $_MAX_WORD));
    }
    my($i) = 1;
    foreach my $p (@{$parser->get('postings')}) {
        next
            if length($p) > $_MAX_WORD;
        my($s) = $_STEMMER->stem_word($p);
        $doc->add_posting($p, $i)
            unless $s eq $p;
        $doc->add_posting($s, $i);
        foreach my $syn (@{$parser->xapian_posting_synonyms($s)}) {
            $doc->add_posting($syn, $i);
        }
        $i++;
    }
    _write($self, $req, replace_document_by_term => $primary_term, $doc);
    return;
}
sub _write {
    # Acquires the indexing lock, then opens the writable database at
    # db_path with DB_CREATE_OR_OPEN (creating it if needed).  Calls $op
    # with the remaining arguments and flushes.
    my($proto, $req, $op, @args) = @_;
    _acquire_lock($proto, $req);
    my($writable) = Search::Xapian::WritableDatabase->new(
        $_CFG->{db_path},
        Search::Xapian->DB_CREATE_OR_OPEN,
    );
    $writable->$op(@args);
    $writable->flush;
    return;
}
1;