Queue Service

Configuration

Queues allow you to defer the processing of a time consuming task, such as sending an e-mail, until a later time, thus drastically speeding up the web requests to your application.

The queue configuration file is stored in config/queue.php. In this file you will find connection configurations for each of the queue drivers that are included, such as a database, Beanstalkd, IronMQ, Amazon SQS and Redis.

Two special queue drivers are also available:

  • The sync queue driver runs a job immediately as it is queued. This is useful for development.
  • The null queue driver simply discards queued jobs so they are never executed.

Driver prerequisites

Before using the Amazon SQS, Beanstalkd, IronMQ or Redis drivers you will need to install Drivers plugin.

Basic usage

Pushing a job onto the queue

To push a new job onto the queue, use the Queue::push method:

Queue::push('SendEmail', ['message' => $message]);

The first argument given to the push method is the name of the class that should be used to process the job. The second argument is an array of data that should be passed to the handler. A job handler should be defined like so:

class SendEmail
{
    public function fire($job, $data)
    {
        //
    }
}

Notice the only method that is required is fire, which receives a Job instance as well as the array of data that was pushed onto the queue.

Specifying a custom handler method

If you want the job to use a method other than fire, you may specify the method when you push the job:

Queue::push('SendEmail@send', ['message' => $message]);

Specifying a queue name for a job

You may also specify the queue / tube a job should be sent to:

Queue::push('SendEmail@send', ['message' => $message], 'emails');

Delaying the execution of a job

Sometimes you may wish to delay the execution of a queued job. For instance, you may wish to queue a job that sends a customer an e-mail 15 minutes after sign-up. You can accomplish this using the Queue::later method:

$date = Carbon::now()->addMinutes(15);

Queue::later($date, 'SendEmail', ['message' => $message]);

In this example, we're using the Carbon date library to specify the delay we wish to assign to the job. Alternatively, you may pass the number of seconds you wish to delay as an integer.

NOTE: The Amazon SQS service has a delay limit of 900 seconds (15 minutes).

Queues and models

If your queued job takes a model in its data, only the identifier for the model will be serialized onto the queue. When the job is actually handled, the queue system will automatically re-retrieve the full model instance from the database.

It's all totally transparent to your application and prevents issues that can arise from serializing full model instances.

Deleting a processed job

Once you have processed a job, it must be deleted from the queue, which can be done via the delete method on the Job instance:

public function fire($job, $data)
{
    // Process the job...

    $job->delete();
}

Releasing a job back onto the queue

If you wish to release a job back onto the queue, you may do so via the release method:

public function fire($job, $data)
{
    // Process the job...

    $job->release();
}

You may also specify the number of seconds to wait before the job is released:

$job->release(5);

Checking the number of run attempts

If an exception occurs while the job is being processed, it will automatically be released back onto the queue. You may check the number of attempts that have been made to run the job using the attempts method:

if ($job->attempts() > 3) {
    //
}

Marking a job as failed

While a job is generally marked as failed if an exception is thrown while running the job and the retries are exhausted, you may wish to fail the job manually under certain circumstances.

To mark a job as failed, you can use the fail method:

$job->fail();

Accessing the job ID

You may also access the job identifier:

$job->getJobId();

Queueing closures

Instead of pushing a job class into the queue, you may also push a closure for simple tasks that need to be executed out of the current request cycle.

Pushing a closure onto the queue

Queue::push(function () use ($id, $jobId) {
    Account::delete($id);

    $job = Job::get($jobId);
    $job->delete();
});

NOTE: Instead of making objects available to queued Closures via the use directive, consider passing primary keys and re-pulling the associated models from within your queue job. This often avoids unexpected serialization behavior.

When using Iron.io push queues, you should take extra precaution queueing Closures. The end-point that receives your queue messages should check for a token to verify that the request is actually from Iron.io. For example, your push queue end-point should be something like: https://example.com/queue/receive?token=SecretToken. You may then check the value of the secret token in your application before marshalling the queue request.

Running the queue worker

Winter includes some console commands that will process jobs in the queue.

To process new jobs as they are pushed onto the queue, run the queue:work command:

php artisan queue:work

Once this task has started, it will continue to run until it is manually stopped. You may use a process monitor such as Supervisor to ensure that the queue worker does not stop running.

Queue worker processes store the booted application state in memory. They will not recognize changes in your code after they have been started. When deploying changes, restart queue workers.

Processing a single job

To process only the first job on the queue, use the --once option:

php artisan queue:work --once

Specifying the connection & Queue

You may also specify which queue connection the worker should utilize:

php artisan queue:work --once connection

You may pass a comma-delimited list of queue connections to the work command to set queue priorities:

php artisan queue:work --once --queue=high,low

In this example, jobs on the high queue will always be processed before moving onto jobs from the low queue.

Specifying the job timeout parameter

You may also set the length of time (in seconds) each job should be allowed to run:

php artisan queue:work --once --timeout=60

Specifying queue sleep duration

In addition, you may specify the number of seconds to wait before polling for new jobs:

php artisan queue:work --once --sleep=5

Note that the queue only "sleeps" if no jobs are on the queue. If more jobs are available, the queue will continue to work them without sleeping.

Daemon queue worker

By default queue:work will process jobs without ever re-booting the framework. This results in a significant reduction of CPU usage when compared to the queue:work --once command, but at the added complexity of needing to drain the queues of currently executing jobs during your deployments.

To start a queue worker in daemon mode, simply omit the --once flag:

php artisan queue:work connection

php artisan queue:work connection --sleep=3

php artisan queue:work connection --sleep=3 --tries=3

You may use the php artisan help queue:work command to view all of the available options.

Deploying with daemon queue workers

The simplest way to deploy an application using daemon queue workers is to put the application in maintenance mode at the beginning of your deployment. This can be done using the backend settings area. Once the application is in maintenance mode, Winter will not accept any new jobs off of the queue, but will continue to process existing jobs.

The easiest way to restart your workers is to include the following command in your deployment script:

php artisan queue:restart

This command will instruct all queue workers to restart after they finish processing their current job.

NOTE: This command relies on the cache system to schedule the restart. By default, APCu does not work for CLI commands. If you are using APCu, add apc.enable_cli=1 to your APCu configuration.

Coding for daemon queue workers

Daemon queue workers do not restart the platform before processing each job. Therefore, you should be careful to free any heavy resources before your job finishes. For example, if you are doing image manipulation with the GD library, you should free the memory with imagedestroy when you are done.

Similarly, your database connection may disconnect when being used by long-running daemon. You may use the Db::reconnect method to ensure you have a fresh connection.

Using a system daemon

While running a queue worker as a daemon through php artisan is good for development, using a system daemon manager on your server to run and monitor the queue worker will provide many benefits such as auto-restarting, better logging to the system and more control over the configuration.

Below are two examples of common system daemon managers.

Systemd

systemd is a common process manager in use on more recent releases of Linux distributions. It covers an array of system components and offers tight control and simple configuration. Since it is pre-installed on many Linux distrubtions to date, there is no extra software required to be installed to take advantage of it.

Registering the queue worker service

You can register a new service to run the queue worker by running the following command as the webhost user in your CLI terminal:

systemctl --user edit --force --full queue-worker.service

This will open up a terminal editor for you to enter in the service configuration. A typical configuration will be made up of the following:

[Unit]
Description=Runs a queue worker for Winter CMS
OnFailure=failure-notify@%n.service

[Service]
Restart=always
# You can also use "Restart=on-failure" to restart only on a failure in the queue worker.
WorkingDirectory=/var/www/html
ExecStart=/usr/bin/php artisan queue:work

[Install]
WantedBy=default.target

You should use your project's root folder as the WorkingDirectory definition. The ExecStart definition should specify the direct path to your PHP executable, and the queue:work command and arguments you wish to use.

Once you save your configuration, you will need to enable it.

systemctl --user enable queue-worker.service

If you make any changes to your service's configuration, you must reload the configuration in systemd, which can be done by running the following:

systemctl --user daemon-reload

Starting / stopping the daemon

To start your queue worker daemon, simply run the following:

systemctl --user start queue-worker.service

And to stop the queue worker daemon:

systemctl --user stop queue-worker.service

Finally, to restart it:

systemctl --user restart queue-worker.service

NOTE: If you run the following commands when logged in via an SSH session, your daemon may stop as soon as you logout. You can tell your OS to keep alive any of your processes by running the following command, substituting the <user> placeholder with the user running the service: loginctl enable-linger <user>

Checking the status of the daemon

If you wish to check on the status of your queue worker, you can run the following:

systemctl --user status queue-worker.service

By default, systemd will show whether your service is active or not, and provides a small snippet of the logs for the service.

You can also get the full logs by querying journalctl:

journalctl --user -u queue-worker.service

Supervisor

Supervisor is a process monitor for the Linux operating system, and will automatically restart your queue:work process if it fails. To install Supervisor on Ubuntu, you may use the following command:

sudo apt-get install supervisor

Configuring Supervisor

Supervisor configuration files are typically stored in the /etc/supervisor/conf.d directory. Within this directory, you may create any number of configuration files that instruct supervisor how your processes should be monitored. For example, let's create a winter-worker.conf file that starts and monitors a queue:work process:

[program:winter-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /path/to/winter/artisan queue:work --sleep=3 --tries=3
autostart=true
autorestart=true
user=winter
numprocs=8
redirect_stderr=true
stdout_logfile=/path/to/winter/worker.log

In this example, the numprocs directive will instruct Supervisor to run 8 queue:work processes and monitor all of them, automatically restarting them if they fail. Of course, you should change the queue:work portion of the command directive to reflect your desired queue connection. The user directive should be changed to the name of a user that has permission to run the command.

Starting Supervisor

Once the configuration file has been created, you may update the Supervisor configuration and start the processes using the following commands:

sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start winter-worker:*

For more information on Supervisor, consult the Supervisor documentation.

Failed jobs

Since things don't always go as planned, sometimes your queued jobs will fail. Don't worry, it happens to the best of us! There is a convenient way to specify the maximum number of times a job should be attempted. After a job has exceeded this amount of attempts, it will be inserted into a failed_jobs table. The failed jobs table name can be configured via the config/queue.php configuration file.

You can specify the maximum number of times a job should be attempted using the --tries switch on the queue:work command:

php artisan queue:work connection-name --tries=3

If you would like to register an event that will be called when a queue job fails, you may use the Queue::failing method. This event is a great opportunity to notify your team via e-mail or another third party service.

Queue::failing(function ($connection, $job, $data) {
    //
});

You may also define a failed method directly on a queue job class, allowing you to perform job specific actions when a failure occurs:

public function failed($data)
{
    // Called when the job is failing...
}

The original array of data will also be automatically passed onto the failed method.

Retrying failed jobs

To view all of your failed jobs, you may use the queue:failed Artisan command:

php artisan queue:failed

The queue:failed command will list the job ID, connection, queue, and failure time. The job ID may be used to retry the failed job. For instance, to retry a failed job that has an ID of 5, the following command should be issued:

php artisan queue:retry 5

If you would like to delete a failed job, you may use the queue:forget command:

php artisan queue:forget 5

To delete all of your failed jobs, you may use the queue:flush command:

php artisan queue:flush
Copyright © 2024 Winter CMS