Statistics
12
Views
0
Downloads
0
Donations
Support
Share
Uploader

高宏飞

Shared on 2026-04-23

AuthorMohamed Said

Learn how to use the queue system to make your applications faster and more reliable while reducing running costs

Tags
No tags
Publish Year: 2022
Language: 英文
Pages: 232
File Format: PDF
File Size: 4.3 MB
Support Statistics
¥.00 · 0times
Text Preview (First 20 pages)
Registered users can read the full content for free

Register as a Gaohf Library member to read the complete e-book online for free and enjoy a better reading experience.

(This page has no text content)
Contents Preface 4 .................................................................................................................... Introduction to Queues 5 .............................................................................................. The Queue System 9 ............................................................................................. Queues in Laravel 11 ............................................................................................. The Advantages of Using Queues 16 ....................................................................... Cookbook 20 ............................................................................................................... Sending Email Verification Messages 22 .................................................................. High Priority Jobs 28 ............................................................................................. Retrying Failed Jobs 32 .......................................................................................... Configuring Automatic Retries 36 ........................................................................... Canceling Abandoned Orders 41 ............................................................................. Sending Webhooks 45 ........................................................................................... Provisioning a Forge Server 49 ................................................................................ Canceling a Conference 54 ..................................................................................... Preventing a Double Refund 60 .............................................................................. Preventing a Double Refund With Unique Jobs 66 ..................................................... Bulk Refunding Invoices 72 .................................................................................... Selling Conference Tickets 83 ................................................................................. Spike Detection 89 ................................................................................................ Processing Uploaded Videos 93 .............................................................................. Sending Monthly Invoices 97 .................................................................................. Controlling The Rate of Job Execution 101 ............................................................... Dealing With API Rate Limits 105 ............................................................................ Dealing With an Unstable Service 109 ..................................................................... Provisioning a Serverless Database 115 ................................................................... Generating Complex Reports 121 ........................................................................... FIFO Queues 126 .................................................................................................. Dispatching Event Listeners to Queue 132 ............................................................... Sending Notifications in the Queue 136 ...................................................................
Dynamic Queue Consumption 138 .......................................................................... Customizing The Job Payload 143 ........................................................................... A Guide to Running and Managing Queues 147 ............................................................... How Laravel Dispatches Jobs 148 ............................................................................ Creation of The Job Payload 155 ............................................................................. How Workers Work 160 ........................................................................................ Choosing the Right Machine Configuration 168 ........................................................ Keeping the Workers Running 174 .......................................................................... Scalability of The Queue System 177 ....................................................................... Scaling With Laravel Horizon 182 ............................................................................ Choosing The Right Queue Driver 187 ..................................................................... Handling Queues on Deployments 195 .................................................................... Designing Reliable Queued Jobs 197 ....................................................................... Managing The State 202 ........................................................................................ Dealing With Failure 207 ........................................................................................ Dispatching Large Batches 211 ............................................................................... Reference 214 ............................................................................................................. Worker Configurations 214 .................................................................................... Job Configurations 218 .......................................................................................... Connection Configurations 221 .............................................................................. Horizon Configurations 223 .................................................................................... Built-in Middleware 225 ........................................................................................ Job Interfaces 228 .................................................................................................
4 Preface Hey, welcome to Laravel Queues in Action! I'm glad you're reading this book and excited to join you in exploring the many ways we can use the queue system to enhance our applications. Over the years, I worked on projects that heavily relied on asynchronous task execution. Also, during my time at Laravel, I worked with hundreds of developers to solve problems that involved the queue system. Along the way, I contributed to enhancing the system by adding new features, fixing bugs, and improving performance. While the Laravel community was growing, we collectively discovered best practices for working with the queue system. This knowledge was shared in separate blog posts here and there. But there still wasn't a comprehensive guide on utilizing its power and dealing with common problems in the community. It was with this in mind that I decided to write this book in 2020. Fast forward to 2022, several Laravel releases came out, and more usage patterns were discovered. These new releases introduced several enhancements to the system that dealt with many performance and developer experience issues. In this second edition, I think that the book is now about giving you the information you need to make educated decisions about how to best use the queue system in your projects. I have done my best to make the book insightful to first-time queue system explorers and long-time professionals. I hope that Laravel Queues in Action proves helpful!
5 CHAPTER 1 Introduction to Queues A web application is a series of procedures that pass data around. It takes data in, performs several operations, and passes it back out. That makes a web application a form of data pipeline. The term pipeline is borrowed from industry in which pipes transport substances from one place to another. In the context of web applications, each pipe—sometimes referred to as stage—in a pipeline performs an operation that validates, transforms, stores, or transports the data. Other stages may not touch the data, but they take action based on it. Figure 1-1. A data pipeline Like an industrial pipeline, the data needs to pass through one pipe to move to the next, and each pipe has a resistance coefficient. The higher the coefficient, the slower the data will move in a pipe. Therefore, adding more pipes with high resistance will result in an overall delay in passing the data out of the final pipe. Take a look at this example controller action: public function store(Request $request) { $validated = $this->validate($request, [ 'email' => 'email' ]);
6 $user = User::create($validated); Mail::to($user)->send(new Welcome()); return UserResource::make($user); } In this data pipeline, the data is passed from the request to the validation stage, the storage stage, the mail sending stage, and the transformation stage before it's passed out to the client making the request. Each stage's resistance coefficient dictates how fast the data will pass through. The slower the operation, the longer it takes for the response to be returned to the client. But unlike industrial pipelines, a single web application instance can only handle one request at any time. If more requests come, they will have to wait until the current request is passed out of the final pipe. That means slow operations not only delay the response of one request but also delay handling other requests. For example, if the machine that hosts your web application can run fifty instances of the web application and all fifty slots are occupied, new requests will have to wait. For a business to be able to handle more traffic, it has three options: Buy more machines (called scaling out).1. Buy more computing resources for the existing machines (called scaling up).2. Task software engineers to decrease the resistance coefficient of all the stages of3. handling a request. Sometimes throwing money at the problem–buying more computing resources–is more efficient. Other times putting more software engineering efforts is more effecient. Decreasing the resistance can be achieved in various ways: Code performance optimization (Figure 1-2).1. Concurrent processing; running several stages in parallel (Figure 1-3).2. Asynchronous execution; removing some stages from the main pipeline and putting it3. into another pipeline (Figure 1-4).
7 Figure 1-2. The 2nd stage was optimized for performance Figure 1-3. The 2nd & 3rd stages were executed concurrently Figure 1-4. The 2nd & 3rd stages were moved to a different pipeline Laravel is a web framework that ships with many tools that help us monitor and enhance the performance of our code. It also provides us with tools we can use for concurrent processing and asynchronous execution.
8 Concurrent Execution For stages that need to complete before sending the response back to the user, we may install Laravel Octane and use the Swoole client. Doing so will allow us to split an operation into several smaller ones, execute them concurrently, and wait for the result before sending it to the user. This pattern is called fan-out, fan-in. Fanning out is the process of distributing work on multiple processors, and fanning in is combining the results of those processes into one set and passing it to the next stage of the pipeline. Concurrent execution in PHP is not supported out of the box. However, the Swoole (and its fork Open Swoole) extension allows us to use multiprocessing to send commands from our main process (pipeline) to one or more worker processes and wait for the results. Laravel Octane utilizes this and abstracts it under the concurrently() method: public function store(Request $request) { $request->user()->can('load_entities'); [$users, $servers] = Octane::concurrently([ fn () => User::all(), fn () => Supplier::all(), ]); return [$users, $servers]; } In this example, the authorization stage will ensure the current logged-in user is allowed to load_entities, then using Octane::concurrently(), we can fan out the tasks of loading users and suppliers to two worker processes to work on them in parallel. Once the results from the two processes come back, they will be moved to the transformation stage before responding to the web client. Octane's concurrency model helps us run several stages of our program simultaneously and shorten the time needed to handle the request. But what if we can move some of the stages completely from the pipeline to a different pipeline? That way, we can remove all the work that doesn't need to happen synchronously–while handling the request–and execute it asynchronously in the background. Take another look at this example:
9 $user = User::create($validated); Mail::to($user)->send(new Welcome()); return UserResource::make($user); We don't have to pass the request through the mail-sending stage before it reaches the transformation stage. Instead, we can pass the request directly from the storage stage to the transformation stage and move the mail sending stage to an asynchronous pipeline to be done later. That's what the queue system that ships with Laravel is for. The Queue System A user interface is how a human interacts with a program, and a programmable interface (API) is how a program interacts with another program. A queue system offers a programmable interface that programs use to communicate asynchronously, not in real- time. One program offloads work to another to be done later. When you send a message through Slack, it will be stored in a database until the receiver(s) can check it out. That's the case for any form of asynchronous communication; messages need to be stored in a place until a receiver is available to read them. The simplest form of storing several items in PHP is arrays. So let's see how a queue would look when its messages are stored in an array: $queue = [ 'Send mail to user #1', 'Send mail to user #2' ]; This queue contains two messages. We can enqueue a new message, and it'll be added to the end of the queue: enqueue('Send mail to user #3'); $queue = [ 'Send mail to user #1',
10 'Send mail to user #2', 'Send mail to user #3' ]; We can also dequeue a message, and it'll be removed from the beginning of the queue: $message = dequeue(); // == Send mail to user #1 $queue = [ 'Send mail to user #2', 'Send mail to user #3' ]; Notice: If you ever heard the term "first-in-first-out (FIFO)" and didn't understand what it means, now you know it means the first message in the queue is the first message that gets processed. Now a message is a call-to-action trigger; its body contains a string, the receiver interprets this string, and the call to action is extracted. In a queue system, the receiver is called a worker. Which is a computer program that constantly dequeues messages from a queue, extracts the call-to-action, and executes it. Figure 1-5 shows a logical view of how the queue system works. Web application instances enqueue jobs in the queue store while workers keep checking it for messages. Once a worker detects a message is present, it dequeues and processes that message.
11 Figure 1-5. Workers in action As you can see, a web application doesn't wait for the message to be processed; it's one-way communication. It just sends it to the queue and continues handling more client requests. On the other hand, workers don't care where the message is coming from. Their only job is to process a message and move to the next. Queues in Laravel Laravel ships with a powerful queue system right out of the box. It supports multiple drivers for storing messages: Database Beanstalkd Redis Amazon SQS Enqueuing messages in Laravel can be done in several ways, and the most basic method is using the Queue facade: use Illuminate\Support\Facades\Queue; Queue::pushRaw('Send mail to user #1'); If you're using the database queue driver, calling pushRaw() will add a new row in the jobs
12 table with the message "Send mail to user #1" stored in the payload field. Enqueuing raw string messages like this is not very useful. Workers don't know how to interpret plain English and extract the action that needs to be triggered. For that reason, Laravel allows us to enqueue class instances: use Illuminate\Support\Facades\Queue; use App\Jobs\SendWelcomeEmail; Queue::push( new SendWelcomeEmail(1) ); Notice: Laravel uses the term "push" instead of "enqueue", and "pop" instead of "dequeue". When you enqueue an object, Laravel will serialize it and build a string payload for the message body. When workers dequeue that message later, they will be able to extract the object and call the proper method to trigger the action. Laravel refers to a message that contains a class instance as a "Job". To create a new job in your application, you may run this artisan command: php artisan make:job SendWelcomeEmail This command will create a SendWelcomeEmail job inside the app/Jobs directory. That job will look like this: namespace App\Jobs; class SendWelcomeEmail implements ShouldQueue { use Dispatchable; use InteractsWithQueue; use Queueable; use SerializesModels; public function __construct() {
13 } public function handle() { // Execute the job logic. } } When a worker dequeues this job, it will execute the handle() method. Inside that method, you should put all your business logic. Notice: Starting Laravel 8.0, you can use __invoke instead of handle for the method name. Another way of sending messages to the queue that workers can interpret is by using a closure (an anonymous function): use Illuminate\Support\Facades\Queue; Queue::push(function() use ($user){ Mail::to($user)->send(new Welcome()); }); You can even use arrow functions: use Illuminate\Support\Facades\Queue; Queue::push( fn() => Mail::to($user)->send(new Welcome()) ); Behind the scenes, Laravel will serialize the closure by converting it into a string form. Then, when the worker picks it up, it can convert it back to its closure form and invoke it. The Command Bus Laravel ships with a command bus that can be used to dispatch jobs to the queue. Dispatching through the command bus allows us to use several functionalities I will show
14 you later. Throughout this book, we will use the command bus to dispatch our jobs instead of the Queue::push() method. Here's an example of using the dispatch() helper, which uses the command bus under the hood: use App\Jobs\SendWelcomeEmail; dispatch( new SendWelcomeEmail(1) ); Or you can use the Bus facade: use Illuminate\Support\Facades\Bus; use App\Jobs\SendWelcomeEmail; Bus::dispatch( new SendWelcomeEmail(1) ); You can also use the dispatch() static method on the job class: use App\Jobs\SendWelcomeEmail; SendWelcomeEmail::dispatch(1); Notice: Arguments passed to the static dispatch() method will be transferred to the job instance automatically. Starting A Worker To start a worker, you need to run the following artisan command: php artisan queue:work
15 This command will run a program on your machine which will bootstrap an instance of the Laravel application and keep checking the queue for jobs to process. $app = require_once __DIR__.'/bootstrap/app.php'; while (true) { $job = $app->dequeue(); $app->process($job); } Similar to the HTTP (web application) pipeline, a worker can only handle a single job at any given time. To dequeue jobs faster, you'll need to start multiple workers to process jobs in parallel. We will look into managing workers later in this book. Now the queue:work program that ships with Laravel loads a single instance of your application and uses it to process all jobs. That's different from how a standard web request is handled. Handling web requests in PHP is done by bootstrapping a fresh application instance exclusively for each request and terminating that instance after the request is handled (Figure 1-6). Figure 1-6. Request handling vs. job handling Bootstrapping an application is a stage with a high resistance coefficient. Therefore, doing it only once in the lifetime of a queue worker has a considerable performance benefit as it will allow the worker to handle more jobs per a given time interval. However, this benefit comes at a cost. When the same application instance is used, it becomes the software engineer's job to take care of managing the memory. A memory leak somewhere in the code will cause the process to consume all available memory in the
16 machine and eventually crash it entirely. That's why PHP starts a fresh instance to handle a web request. To free engineers from managing memory. Notice: Laravel knows the cost of bootstrapping the application for every web request. That's why it gives engineers the option to use Laravel Octane. Web requests are handled by Octane similar to how queued jobs are handled by a queue worker, using the same application instance. Memory management may seem like a highly complex topic, and it is indeed if you are new to the concept. Luckily, Laravel puts some safeguards in place to prevent workers from eating all machine memory. And in the following chapters of this book, we will discuss the topic in detail and go through the best practices to ensure memory consumption is kept in check. Another side effect of bootstrapping the application once is that data may leak between jobs if you are not careful. This will cause bugs that are very hard to debug as they will only happen if a worker dequeues jobs that come in a specific order, which is very hard to replicate. We will talk about that in detail in the upcoming chapters as well. The Advantages of Using Queues Every virtual private server, aka VPS, has a limit for the concurrent number of requests it can handle. Even serverless offerings like AWS Lambda has this limit. Therefore, you need to handle requests faster to increase the throughput of your VPS or Lambda function without scaling your infrastructure. We've discussed this earlier and mentioned that moving slow stages to the queue will help us handle requests faster. Responding faster to the user is also a significant enhancement to the user experience. But faster request handling is not the only reason for using queues. Another significant benefit of using queues is decoupling different stages of your application logic. Let's take a look at an example: public function store(Request $request) { $validated = $request->validate(...); $order = Order::create($validated); Webhooks::each(function($webhook) {
17 $webhook->send($order); }); return OrderResource::make($order); } In this example, the request goes through several stages: Validation stage.1. Storage stage.2. Webhook sending stage.3. Transformation stage.4. The validation, storage, and transformation stages are essential for handling this web request. However, can we say the same about the webhook sending stage? The answer is no. We can definitely send the webhooks later after responding to the user. Sending those webhooks while handling the request could be a swift operation, and removing this stage won't affect the throughput or user experience that much. But, What if one of the webhook receivers went down temporarily? Should we abort the entire request because one receiver failed? Or should we send that failed webhook again? If we retry, how many times? If we abort the request and throw an error, what happens when the user sends the same request again? Some receivers have already received the webhook; are we going to send them the same webhook again? There are a lot of questions here. And the simple answer is decoupling. In a loosely coupled application, different parts can be handled in various manners without affecting other parts. Decoupling the webhook sending stage from the request handling process allows us to monitor each webhook sent and retry when needed. Here's how we can rewrite this controller action: public function store(Request $request) { $validated = $request->validate(...); $order = Order::create($validated); Webhooks::each(function($webhook) { SendWebhook::dispatch($webhook, $order); }); return OrderResource::make($order);
18 } We've swapped the actual webhook sending with dispatching a job that will handle sending the webhook later. Inside the queue, we can configure the job for each webhook to automatically retry several times if the receiver fails. We can also configure how much time we should wait before every retry. We will learn how to do that later in this book. Concurrency To efficiently deliver all webhooks as soon as possible, we can utilize the concurrency offered by Laravel Octane. However, this comes with the task of memory management since the requests will be handled in the same memory space as we explained earlier in this chapter. Using Laravel queues, we can achieve concurrency by running multiple workers to process jobs from the queue. That way, you can leave the request handling process of the application to run in the usual manner of handling each request via a separate process without sharing memory. Rate Limiting Another benefit we get from decoupling is that we can control the rate a particular part of the application is executed. In the example above, we can control the rate at which webhook sending jobs are executed so we don't hit the receiver's rate limit. When integrating with a third-party API that we have no control over, this is crucial. Laravel's rate limiting capabilities allow us to easily control the number of times a particular job is executed and the number of times several instances of a job can be executed concurrently. We'll learn about that in the upcoming chapters. Scheduling Sometimes you may wish to delay a stage in your web application to be performed after a few seconds. For example, you may want to delay delivering a webhook to allow your database replica instances to read newly written records before a third-party application can read them. To achieve that, you may do something like this: $order = Order::create($validated); sleep(2);
19 $webhook->send($order); The sleep(2) stage here will block the execution of the script for 2 seconds before moving to the webhook sending stage. This is obviously something you'll want to avoid in a web application. Using queues, you can schedule a specific job to be executed after a specific period: $order = Order::create($validated); SendWebhook::dispatch($webhook, $order)->delay(2); Using delay(2) while dispatching the job to the queue instructs Laravel to delay processing this job for 2 seconds. In other words, workers will skip this job for 2 seconds before picking it up. During those 2 seconds, the worker will pick up other jobs in the queue so it won't remain idle. Prioritization A computer system is limited by the number of tasks of a particular type it can handle during a given period. Take sending notification emails, for example; if your system can send 1000 emails per hour, you'd want to ensure the most important notifications are sent while less important ones are delayed for later. Using a queue system, you can give a particular queue a higher priority than others. Workers will always process jobs from that queue before all other queues.
20 CHAPTER 2 Cookbook In chapter 1, we used the concepts of industrial pipelines to explain how a Laravel application handles web requests. And we learned how to increase the request throughput by removing some stages from the web pipeline to the queue pipeline. In this chapter, we will walk through several real-world challenges with solutions that run in production. I met these challenges while building products at Laravel and Foodics, and others collected while supporting the framework users for the past six years. Before we can jump into these challenges, let's learn how to configure the queue in a fresh Laravel application. Configuring the Queue Queued jobs need to be stored somewhere, and Laravel ships with several storage drivers. Each of these drivers has its pros and cons, and we will discover those in detail later in this book. However, the database driver will be the easiest to configure on a local development machine. That's why we'll use it for the examples in this book. Besides being easy to set up, the database driver gives us high visibility. We can easily open our favorite GUI tool for managing databases and explore the jobs in the queue. We can also delete jobs and make tweaks to test things out. All we need to get started is to create a database and configure the database connection in the config/database.php configuration file. After that, we need to create the migration that creates the jobs database table: php artisan queue:table Running this artisan command will add a migration inside the database/migrations directory that creates a jobs table. The code inside the migration will look like this: