Bivio::Agent::Job::Dispatcher
# Copyright (c) 2000-2006 bivio Software, Inc. All rights reserved.
# $Id$
package Bivio::Agent::Job::Dispatcher;
use strict;
use Bivio::Base 'Agent.Dispatcher';
# C<Agent.Job::Dispatcher> is used to queue tasks at the end
# of other dispatcher tasks. There is only one queue. It is cleared
# by L<Agent.Task|Agent.Task> on errors. You may
# not queue new jobs during L<execute_queue|"execute_queue">.
my($_R) = b_use('AgentJob.Request');
my($_TI) = b_use('Agent.TaskId');
# Don't allow queueing while in execute.
my($_SELF);
my($_IN_EXECUTE) = 0;
my(@_QUEUE);
__PACKAGE__->initialize;
sub can_enqueue_job {
my($proto, $req) = @_;
return exists($ENV{MOD_PERL}) && b_use('AgentHTTP.Request')->is_blesser_of($req)
? 1 : 0;
}
sub create_request {
# (self, hash_ref) : Agent.Request
# Creates and returns a request.
my($self, $params) = @_;
return $_R->new($params);
}
sub discard_queue {
# (self) : undef
# Clears the queue. May be called at any time.
@_QUEUE = ();
return;
}
sub enqueue {
# (self, Agent.Request, any, hash_ref) : undef
# Enqueue I<task> with I<params>. The I<auth_id> and and auth_user_id are
# extracted from I<req>, if they are not supplied in I<params>. I<params> may
# not contain any models. All models must be freshly loaded for each job.
#
# May not be called during L<execute_queue|"execute_queue">.
my($self, $req, $task_id, $params) = @_;
b_die('not allowed to call enqueue in execute_queue')
if $_IN_EXECUTE;
# No models please
while (my($k, $v) = each(%$params)) {
b_die('models may not be queued: ', $k, '=', $v)
if b_use('Biz.Model')->is_super_of($v);
}
# Validate task
$task_id = $_TI->from_any($task_id);
# Extract params from request
$params->{task_id} = $task_id;
my($u) = $req->get('auth_user');
$params->{auth_user_id} ||= $u ? $u->get('realm_id') : undef;
#TODO: Need to define this list more clearly
foreach my $p (qw(auth_id Bivio::UI::Facade UI.Facade is_secure client_addr)) {
$params->{$p} = $req->unsafe_get($p)
unless exists($params->{$p});
}
# Enqueue and add as a txn resource (may end up calling handle_rollback
# multiple times, but the routine is re-enterable).
$req->push_txn_resource($self);
push(@_QUEUE, $params);
return;
}
sub execute_queue {
# (proto) : undef
# Processes the queue. Called from Mail or HTTP dispatcher after
# request completes.
die('recursive call to execute_queue') if $_IN_EXECUTE;
$_IN_EXECUTE = 1;
# Iterate through each item in the queue
while (@_QUEUE) {
my($params) = shift(@_QUEUE);
b_info($$, ' JOB_START: ', $params);
my($die) = $_SELF->process_request({%$params});
b_info(
$$,
$die ? (' JOB_ERROR: ', $params, ' ', $die) : (' JOB_END: ', $params),
);
}
$_IN_EXECUTE = 0;
return;
}
sub handle_commit {
# (self) : undef
# Commit called, do nothing.
return;
}
sub handle_rollback {
# (self) : undef
# Rollback called, clear queue.
my($self) = @_;
$self->discard_queue;
return;
}
sub initialize {
# (proto) : undef
# Called on first request.
my($proto) = @_;
return if $_SELF;
$_SELF = $proto->new;
$_SELF->SUPER::initialize();
return;
}
sub queue_is_empty {
# (self) : boolean
# Returns true if the queue is empty.
return @_QUEUE ? 0 : 1;
}
1;