package MangoX::Queue; use Mojo::Base 'Mojo::EventEmitter'; use Carp 'croak'; use DateTime; use DateTime::Duration; use Mojo::Log; use Mango::BSON ':bson'; use MangoX::Queue::Delay; no warnings 'experimental::smartmatch'; our $VERSION = '0.03'; # A logger has 'log' => sub { Mojo::Log->new->level('error') }; # The Mango::Collection representing the queue has 'collection'; # A MangoX::Queue::Delay has 'delay' => sub { MangoX::Queue::Delay->new }; # How long to wait before assuming a job has failed has 'timeout' => sub { $ENV{MANGOX_QUEUE_JOB_TIMEOUT} // 60 }; # How many times to retry a job before giving up has 'retries' => sub { $ENV{MANGOX_QUEUE_JOB_RETRIES} // 5 }; # Store Mojo::IOLoop->timer IDs has 'consumers' => sub { {} }; # Store plugins has 'plugins' => sub { {} }; sub new { my $self = shift->SUPER::new(@_); croak qq{No Mango::Collection provided to constructor} unless ref($self->collection) eq 'Mango::Collection'; return $self; } sub plugin { my ($self, $name, $options) = @_; croak qq{Plugin $name already loaded} if exists $self->plugins->{$name}; { no strict 'refs'; unless($name->can('new')) { eval "require $name" or croak qq{Failed to load plugin $name: $@}; } } $self->plugins->{$name} = $name->new(%$options); $self->plugins->{$name}->register($self); return $self->plugins->{$name}; } sub get_options { my ($self) = @_; return { query => { '$or' => [{ status => { '$in' => [ 'Pending' ] } },{ status => { '$in' => [ 'Retrieved' ] }, retrieved => { '$lt' => DateTime->now->subtract_duration(DateTime::Duration->new(seconds => $self->timeout)) } }] }, update => { '$set' => { status => 'Retrieved', retrieved => DateTime->now, }, '$inc' => { attempt => 1, } }, sort => bson_doc( # Sort by priority, then in order of creation 'priority' => 1, 'created' => -1, ), new => 0, # Get the original object (so we can see status etc) }; } sub enqueue { my ($self, @args) = @_; # args maybe # - 'job_name' # - foo => bar, 'job_name' # - 'job_name', $callback # - foo => bar, 'job_name', $callback my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef; my $job = pop @args; my %args; %args = (@args) if scalar @args; my $db_job = { priority => $args{priority} // 1, created => $args{created} // DateTime->now, data => $job, status => $args{status} // 'Pending', attempt => 1, }; if($callback) { return $self->collection->insert($db_job => sub { my ($collection, $error, $oid) = @_; $db_job->{_id} = $oid; $self->emit_safe(enqueued => $db_job) if $self->has_subscribers('enqueued'); $callback->($db_job); }); } else { my $id = $self->collection->insert($db_job); $db_job->{_id} = $id; $self->emit_safe(enqueued => $db_job) if $self->has_subscribers('enqueued'); return $db_job; } } sub watch { my ($self, $id_or_job, $status, $callback) = @_; my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job; $status //= 'Complete'; # args # - watch $queue $id, 'Status' => $callback if($callback) { # Non-blocking $self->log->debug("Waiting for $id on status $status in non-blocking mode"); return Mojo::IOLoop->timer(0 => sub { $self->_watch_nonblocking($id, $status, $callback) }); } else { # Blocking $self->log->debug("Waiting for $id on status $status in blocking mode"); return $self->_watch_blocking($id, $status); } } sub _watch_blocking { my ($self, $id, $status) = @_; while(1) { my $doc = $self->collection->find_one({'_id' => $id}); $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No')); if($doc && ((!ref($status) && $doc->{status} eq $status) || (ref($status) eq 'ARRAY' && $doc->{status} ~~ @$status))) { return 1; } else { $self->delay->wait; } } } sub _watch_nonblocking { my ($self, $id, $status, $callback) = @_; $self->collection->find_one({'_id' => $id} => sub { my ($cursor, $err, $doc) = @_; $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No')); if($doc && ((!ref($status) && $doc->{status} eq $status) || (ref($status) eq 'ARRAY' && $doc->{status} ~~ @$status))) { $self->log->debug("Status is $status"); $self->delay->reset; $callback->($doc); } else { $self->log->debug("Job not found or status doesn't match"); $self->delay->wait(sub { return unless Mojo::IOLoop->is_running; Mojo::IOLoop->timer(0 => sub { $self->_watch_nonblocking($id, $status, $callback) }); }); return undef; } }); } sub requeue { my ($self, $job, $callback) = @_; $job->{status} = 'Pending'; return $self->update($job, $callback); } sub dequeue { my ($self, $id_or_job, $callback) = @_; # TODO option to not remove on dequeue? my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job; if($callback) { $self->collection->remove({'_id' => $id} => sub { $callback->(); $self->emit_safe(dequeued => $id_or_job) if $self->has_subscribers('dequeued'); }); } else { $self->collection->remove({'_id' => $id}); $self->emit_safe(dequeued => $id_or_job) if $self->has_subscribers('dequeued'); } } sub get { my ($self, $id_or_job, $callback) = @_; my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job; if($callback) { return $self->collection->find_one({'_id' => $id} => sub { my ($collection, $error, $doc) = @_; $callback->($doc); }); } else { return $self->collection->find_one({'_id' => $id}); } } sub update { my ($self, $job, $callback) = @_; if($callback) { return $self->collection->find_one({'_id' => $job->{_id}} => sub { my ($collection, $error, $doc) = @_; $callback->($doc); }); } else { return $self->collection->update({'_id' => $job->{_id}}, $job, {upsert => 1}); } } sub fetch { my ($self, @args) = @_; # fetch $queue status => 'Complete', sub { my $job = shift; } my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef; my %args; %args = (@args) if scalar @args; $self->log->debug("In fetch"); if($callback) { $self->log->debug("Fetching in non-blocking mode"); my $consumer_id = (scalar keys %{$self->consumers}) + 1; $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking(\%args, $consumer_id, $callback, 1) }); return $consumer_id; } else { $self->log->debug("Fetching in blocking mode"); return $self->_consume_blocking(\%args, 1); } } sub consume { my ($self, @args) = @_; # consume $queue status => 'Failed', sub { my $job = shift; } my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef; my %args; %args = (@args) if scalar @args; $self->log->debug("In consume"); if($callback) { $self->log->debug("consuming in non-blocking mode"); my $consumer_id = (scalar keys %{$self->consumers}) + 1; $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking(\%args, $consumer_id, $callback, 0) }); $self->log->debug("Timer scheduled, consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id}); return $consumer_id; } else { $self->log->debug("consuming in blocking mode"); return $self->_consume_blocking(\%args, 0); } } sub release { my ($self, $consumer_id) = @_; $self->log->debug("Releasing consumer $consumer_id with timer ID: " . $self->consumers->{$consumer_id}); Mojo::IOLoop->remove($self->consumers->{$consumer_id}); delete $self->consumers->{$consumer_id}; return; } sub _consume_blocking { my ($self, $args, $fetch) = @_; while(1) { my $opts = $self->get_options; $opts->{query} = $args if scalar keys %$args; my $doc = $self->collection->find_and_modify($opts); $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No')); if($doc) { $self->emit_safe(consumed => $doc) if $self->has_subscribers('consumed'); return $doc; } else { last if $fetch; $self->delay->wait; } } } sub _consume_nonblocking { my ($self, $args, $consumer_id, $callback, $fetch) = @_; my $opts = $self->get_options; $opts->{query} = $args if scalar keys %$args; $self->collection->find_and_modify($opts => sub { my ($cursor, $err, $doc) = @_; $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No')); if($doc) { $self->delay->reset; $self->emit_safe(consumed => $doc) if $self->has_subscribers('consumed'); $callback->($doc); return unless Mojo::IOLoop->is_running; return if $fetch; return unless exists $self->consumers->{$consumer_id}; $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking($args, $consumer_id, $callback, 0) }); $self->log->debug("Timer rescheduled (recursive immediate), consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id}); } else { return unless Mojo::IOLoop->is_running; return if $fetch; $self->delay->wait(sub { return unless exists $self->consumers->{$consumer_id}; $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking($args, $consumer_id, $callback, 0) }); $self->log->debug("Timer rescheduled (recursive delayed), consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id}); }); return undef; } }); } 1; =encoding utf8 =head1 NAME MangoX::Queue - A MongoDB queue implementation using Mango =head1 DESCRIPTION L is a MongoDB backed queue implementation using L to support blocking and non-blocking queues. L makes no attempt to handle the L connection, database or collection - pass in a collection to the constructor and L will use it. The collection can be plain, capped or sharded. =head1 SYNOPSIS use Mango; use MangoX::Queue; my $mango = Mango->new("mongodb://localhost:27017"); my $collection = $mango->db('my_db')->collection('my_queue'); my $queue = MangoX::Queue->new(collection => $collection); # To add a job my $id = enqueue $queue 'test'; # Blocking enqueue $queue 'test' => sub { my $id = shift; }; # Non-blocking # To set options my $id = enqueue $queue priority => 1, created => DateTime->now, 'test'; # Blocking enqueue $queue priority => 1, created => DateTime->now, 'test' => sub { my $id = shift; }; # Non-blocking # To watch for a specific job status watch $queue $id; # Blocking watch $queue $id, 'Complete' => sub { # Non-blocking # Job status is 'Complete' }; # To fetch a job my $job = fetch $queue; # Blocking fetch $queue sub { # Non-blocking my ($job) = @_; # ... }; # To get a job by id my $job = get $queue $id; # Blocking get $queue $id => sub { my $job = shift; }; # Non-blocking # To requeue a job my $id = requeue $queue $job; # Blocking requeue $queue $job => sub { my $id = shift; }; # Non-blocking # To dequeue a job dequeue $queue $id; # Blocking dequeue $queue $id => sub { }; # Non-blocking # To consume a queue while(my $job = consume $queue) { # Blocking # ... } my $consumer = consume $queue sub { # Non-blocking my ($job) = @_; # ... }; # To stop consuming a queue release $queue $consumer; # To listen for events on $queue enqueued => sub ( my ($queue, $job) = @_; }; on $queue dequeued => sub ( my ($queue, $job) = @_; }; on $queue consumed => sub { my ($queue, $job) = @_; }; # To register a plugin plugin $queue 'MangoX::Queue::Plugin::Statsd'; =head1 ATTRIBUTES L implements the following attributes. =head2 collection my $collection = $queue->collection; $queue->collection($mango->db('foo')->collection('bar')); my $queue = MangoX::Queue->new(collection => $collection); The L representing the MongoDB queue collection. =head2 delay my $delay = $queue->delay; $queue->delay(MangoX::Queue::Delay->new); The L responsible for dynamically controlling the delay between queue queries. =head2 plugins my $plugins = $queue->plugins; Returns a hash containing the plugins registered with this queue. =head2 retries my $retries = $queue->retries; $queue->retries(5); The number of times a job will be picked up from the queue before it is marked as failed. =head2 timeout my $timeout = $queue->timeout; $queue->timeout(10); The time (in seconds) a job is allowed to stay in Retrieved state before it is released back into Pending state. Defaults to 60 seconds. =head1 EVENTS L inherits from L and emits the following events =head2 consumed on $queue consumed => sub { my ($queue, $job) = @_; # ... }; Emitted when an item is consumed (either via consume or fetch) =head2 dequeued on $queue dequeued => sub { my ($queue, $job) = @_; # ... }; Emitted when an item is dequeued =head2 enqueued on $queue enqueued => sub { my ($queue, $job) = @_; # ... }; Emitted when an item is enqueued =head1 METHODS L implements the following methods. =head2 consume # In blocking mode while(my $job = consume $queue) { # ... } while(my $job = $queue->consume) { # ... } # In non-blocking mode consume $queue sub { my ($job) = @_; # ... }; $queue->consume(sub { my ($job) = @_; # ... }); Waits for jobs to arrive on the queue, sleeping between queue checks using L or L. Currently sets the status to 'Retrieved' before returning the job. =head2 dequeue my $job = fetch $queue; dequeue $queue $job; Dequeues a job. Currently removes it from the collection. =head2 enqueue enqueue $queue 'job name'; enqueue $queue [ 'some', 'data' ]; enqueue $queue +{ foo => 'bar' }; $queue->enqueue('job name'); $queue->enqueue([ 'some', 'data' ]); $queue->enqueue({ foo => 'bar' }); Add an item to the queue. Currently uses priority 1 with a job status of 'Pending'. =head2 fetch # In blocking mode my $job = fetch $queue; my $job = $queue->fetch; # In non-blocking mode fetch $queue sub { my ($job) = @_; # ... }; $queue->fetch(sub { my ($job) = @_; # ... }); Fetch a single job from the queue, returning undef if no jobs are available. Currently sets job status to 'Retrieved'. =head2 get my $job = get $queue $id; Gets a job from the queue by ID. Doesn't change the job status. =head2 get_options my $options = $queue->get_options; Returns the L options hash used by find_and_modify to identify and update available queue items. Wait for a job to enter a certain status. =head2 release my $consumer = consume $queue sub { # ... }; release $queue $consumer; Releases a non-blocking consumer from watching a queue. =head2 requeue my $job = fetch $queue; requeue $queue $job; Requeues a job. Sets the job status to 'Pending'. =head2 update my $job = fetch $queue; $job->{status} = 'Failed'; update $queue $job; Updates a job in the queue. =head2 watch # In blocking mode my $id = enqueue $queue 'test'; watch $queue $id, 'Complete'; # blocks until job is complete # In non-blocking mode my $id = enqueue $queue 'test'; watch $queue $id, 'Complete' => sub { # ... }; =head1 SEE ALSO L, L =cut