Skip to content

Commit 28afde3

Browse files
committed
Synchronized build
1 parent 8dd5c75 commit 28afde3

File tree

6 files changed

+104
-104
lines changed

6 files changed

+104
-104
lines changed

blog/entries/apache-airflow-testing-with-pytest/index.html

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -347,9 +347,9 @@ <h2 id="smoketesting-can-the-airflow-daemon-load-the-dags">Smoketesting: Can th
347347
this.</p>
348348
<p>The most basic test you'll want is to determine whether your DAGs can load
349349
without errors. To do this, you can use the following function:</p>
350-
<div class="hll"><pre><span></span><span class="kn">from</span> <span class="nn">airflow.models</span> <span class="kn">import</span> <span class="n">DagBag</span>
350+
<div class="hll"><pre><span></span><span class="kn">from</span><span class="w"> </span><span class="nn">airflow.models</span><span class="w"> </span><span class="kn">import</span> <span class="n">DagBag</span>
351351

352-
<span class="k">def</span> <span class="nf">test_dags_load_with_no_errors</span><span class="p">():</span>
352+
<span class="k">def</span><span class="w"> </span><span class="nf">test_dags_load_with_no_errors</span><span class="p">():</span>
353353
<span class="n">dag_bag</span> <span class="o">=</span> <span class="n">DagBag</span><span class="p">(</span><span class="n">include_examples</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span>
354354
<span class="n">dag_bag</span><span class="o">.</span><span class="n">process_file</span><span class="p">(</span><span class="s1">&#39;common_api_workflows.py&#39;</span><span class="p">)</span>
355355
<span class="k">assert</span> <span class="nb">len</span><span class="p">(</span><span class="n">dag_bag</span><span class="o">.</span><span class="n">import_errors</span><span class="p">)</span> <span class="o">==</span> <span class="mi">0</span>
@@ -363,10 +363,10 @@ <h2 id="hint-use-functions-to-create-dags">Hint: Use functions to create DAGs.</
363363
<em>can</em> be loaded). This may seem obvious, but none of the Airflow documentation
364364
uses this pattern. Here is an example of a function that creates a simple dag,
365365
and a test of the function:</p>
366-
<div class="hll"><pre><span></span><span class="kn">from</span> <span class="nn">airflow</span> <span class="kn">import</span> <span class="n">DAG</span>
367-
<span class="kn">from</span> <span class="nn">airflow.operators.bash_operator</span> <span class="kn">import</span> <span class="n">BashOperator</span>
366+
<div class="hll"><pre><span></span><span class="kn">from</span><span class="w"> </span><span class="nn">airflow</span><span class="w"> </span><span class="kn">import</span> <span class="n">DAG</span>
367+
<span class="kn">from</span><span class="w"> </span><span class="nn">airflow.operators.bash_operator</span><span class="w"> </span><span class="kn">import</span> <span class="n">BashOperator</span>
368368

369-
<span class="k">def</span> <span class="nf">create_dag</span><span class="p">(</span>
369+
<span class="k">def</span><span class="w"> </span><span class="nf">create_dag</span><span class="p">(</span>
370370
<span class="n">source</span><span class="p">,</span>
371371
<span class="n">script_location</span><span class="p">,</span>
372372
<span class="n">dag_id</span><span class="p">,</span>
@@ -397,7 +397,7 @@ <h2 id="hint-use-functions-to-create-dags">Hint: Use functions to create DAGs.</
397397

398398
<span class="k">return</span> <span class="n">dag</span>
399399

400-
<span class="k">def</span> <span class="nf">test_create_dag_creates_correct_dependencies</span><span class="p">():</span>
400+
<span class="k">def</span><span class="w"> </span><span class="nf">test_create_dag_creates_correct_dependencies</span><span class="p">():</span>
401401
<span class="n">dag</span> <span class="o">=</span> <span class="n">create_dag</span><span class="p">(</span>
402402
<span class="s1">&#39;test_source&#39;</span><span class="p">,</span>
403403
<span class="s1">&#39;test_script_location&#39;</span><span class="p">,</span>

blog/entries/crawling-500-million/index.html

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -323,13 +323,13 @@ <h4 id="rate-limiting-with-token-buckets-and-error-circuit-breakers">Rate limiti
323323
<h5 id="backing-off-with-circuit-breakers">Backing off with circuit breakers</h5><p>If our heuristic fails to correctly approximate the bandwidth capabilities of a site, we are going to start encountering problems. For one, we might exceed the server-side rate limit, which means we will see <code>429 Rate Limit Exceeded</code> and <code>403 Forbidden</code> errors instead of the images we're trying to crawl. Worse yet, the upstream source might continue to happily serve requests while we suck up all of their traffic capacity, resulting in other users being unable to view the images. Clearly, in either scenario, we need to either reduce our crawl rate or even give up crawling the source entirely if it appears that we are impacting their uptime.</p>
324324
<p>To handle these situations, we have two tools in our toolbox: a sliding window recording the status code of every request made we've made to each domain in the last 60 seconds, and a list of the last 50 statuses for each website. If the number of errors in our one minute window exceed 10%, something is wrong; we should wait a minute before trying again. If we have encountered many errors in a row, however, that suggests that we're having trouble with a particular site, so we ought to give up crawling the source and raise an alert.</p>
325325
<p>Workers can keep track of this information in sorted sets in Redis. For the sliding error window, we'll sort each request by its timestamp, which will make it easy and cheap for us to expire status codes beyond the sliding window interval. Maintaining a list of the last N response codes is even easier; we just stick the status code in a list associated with the source.</p>
326-
<div class="hll"><pre><span></span><span class="k">class</span> <span class="nc">StatsManager</span><span class="p">:</span>
327-
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">redis</span><span class="p">):</span>
326+
<div class="hll"><pre><span></span><span class="k">class</span><span class="w"> </span><span class="nc">StatsManager</span><span class="p">:</span>
327+
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">redis</span><span class="p">):</span>
328328
<span class="bp">self</span><span class="o">.</span><span class="n">redis</span> <span class="o">=</span> <span class="n">redis</span>
329329
<span class="bp">self</span><span class="o">.</span><span class="n">known_sources</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span>
330330

331331
<span class="nd">@staticmethod</span>
332-
<span class="k">async</span> <span class="k">def</span> <span class="nf">_record_window_samples</span><span class="p">(</span><span class="n">pipe</span><span class="p">,</span> <span class="n">source</span><span class="p">,</span> <span class="n">status</span><span class="p">):</span>
332+
<span class="k">async</span> <span class="k">def</span><span class="w"> </span><span class="nf">_record_window_samples</span><span class="p">(</span><span class="n">pipe</span><span class="p">,</span> <span class="n">source</span><span class="p">,</span> <span class="n">status</span><span class="p">):</span>
333333
<span class="w"> </span><span class="sd">&quot;&quot;&quot; Insert a status into all sliding windows. &quot;&quot;&quot;</span>
334334
<span class="n">now</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">monotonic</span><span class="p">()</span>
335335
<span class="c1"># Time-based sliding windows</span>
@@ -373,12 +373,12 @@ <h5 id="enforcing-rate-limits-with-token-buckets">Enforcing rate limits with tok
373373
<p>The answer is to implement a distributed token bucket system. The idea behind this is that each crawler has to obtain a token from Redis before making a request. Every second, the crawl monitor sets a variable containing the number of requests that can be made against a source. Each crawler process decrements the counter before making a request. If the decremented result is above zero, the worker is cleared to crawl. Otherwise, the rate limit has been reached and we should wait until a token has been obtained.</p>
374374
<p>The beauty of token buckets is their simplicity, performance, and resilience against failure. If our crawler monitor process dies, crawling halts completely; making a request is not possible without first acquiring a token. This is a much better alternative to the guard rails completely disappearing with the crawl monitor and allowing unbounded crawling. Further, since decrementing a counter and retrieving the result is an atomic operation in Redis, there's no risk of race conditions and therefore no need for locking. This is a huge boon for performance, as the overhead of coordinating and blocking on every single request would rapidly bog down our crawling system.</p>
375375
<p>To ensure that all crawling is performed at the correct speed, I wrapped <code>aiohttp.ClientSession</code> with a rate limited version of the class.</p>
376-
<div class="hll"><pre><span></span><span class="k">class</span> <span class="nc">RateLimitedClientSession</span><span class="p">:</span>
377-
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">aioclient</span><span class="p">,</span> <span class="n">redis</span><span class="p">):</span>
376+
<div class="hll"><pre><span></span><span class="k">class</span><span class="w"> </span><span class="nc">RateLimitedClientSession</span><span class="p">:</span>
377+
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">aioclient</span><span class="p">,</span> <span class="n">redis</span><span class="p">):</span>
378378
<span class="bp">self</span><span class="o">.</span><span class="n">client</span> <span class="o">=</span> <span class="n">aioclient</span>
379379
<span class="bp">self</span><span class="o">.</span><span class="n">redis</span> <span class="o">=</span> <span class="n">redis</span>
380380

381-
<span class="k">async</span> <span class="k">def</span> <span class="nf">_get_token</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">source</span><span class="p">):</span>
381+
<span class="k">async</span> <span class="k">def</span><span class="w"> </span><span class="nf">_get_token</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">source</span><span class="p">):</span>
382382
<span class="n">token_key</span> <span class="o">=</span> <span class="sa">f</span><span class="s1">&#39;</span><span class="si">{</span><span class="n">CURRTOKEN_PREFIX</span><span class="si">}{</span><span class="n">source</span><span class="si">}</span><span class="s1">&#39;</span>
383383
<span class="n">tokens</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">redis</span><span class="o">.</span><span class="n">decr</span><span class="p">(</span><span class="n">token_key</span><span class="p">))</span>
384384
<span class="k">if</span> <span class="n">tokens</span> <span class="o">&gt;=</span> <span class="mi">0</span><span class="p">:</span>
@@ -389,7 +389,7 @@ <h5 id="enforcing-rate-limits-with-token-buckets">Enforcing rate limits with tok
389389
<span class="n">token_acquired</span> <span class="o">=</span> <span class="kc">False</span>
390390
<span class="k">return</span> <span class="n">token_acquired</span>
391391

392-
<span class="k">async</span> <span class="k">def</span> <span class="nf">get</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">url</span><span class="p">,</span> <span class="n">source</span><span class="p">):</span>
392+
<span class="k">async</span> <span class="k">def</span><span class="w"> </span><span class="nf">get</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">url</span><span class="p">,</span> <span class="n">source</span><span class="p">):</span>
393393
<span class="n">token_acquired</span> <span class="o">=</span> <span class="kc">False</span>
394394
<span class="k">while</span> <span class="ow">not</span> <span class="n">token_acquired</span><span class="p">:</span>
395395
<span class="n">token_acquired</span> <span class="o">=</span> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_token</span><span class="p">(</span><span class="n">source</span><span class="p">)</span>
@@ -400,7 +400,7 @@ <h5 id="scheduling-tasks-somewhat-intelligently">Scheduling tasks (somewhat) int
400400
<p>For instance, imagine that each worker is able to handle 5000 simultaneous crawling tasks, and every one of those tasks is tied to a tiny website with a very low rate limit. That means that our entire worker, which is capable of handling hundreds of crawl and analysis jobs per second, is stuck making one request per second until some faster tasks appear in the queue.</p>
401401
<p>In other words, we need to make sure that each worker process isn't jamming itself up with a single source. We have a <a href="https://en.wikipedia.org/wiki/Scheduling_(computing%29">scheduling problem</a>. We've naively implemented first-come-first-serve and need to switch to a different scheduling strategy.</p>
402402
<p>There are innumerable ways to address scheduling problems. Since there are only a few dozen sources in our system, we can get away with using a stupid scheduling algorithm: give each source equal capacity in every worker. In other words, if there are 5000 tasks to distribute and 30 sources, we can allocate 166 simultaneous tasks to each source per worker. That's plenty for our purposes. There are obvious drawbacks of this approach in that eventually there will be so many sources that we start starving high rate limit sources of work. We'll cross that bridge when we come to it; it's better to use the simplest possible approach we can get away with instead of spending all of our time on solving hypothetical future problems.</p>
403-
<div class="hll"><pre><span></span> <span class="k">async</span> <span class="k">def</span> <span class="nf">_schedule</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">task_schedule</span><span class="p">):</span>
403+
<div class="hll"><pre><span></span> <span class="k">async</span> <span class="k">def</span><span class="w"> </span><span class="nf">_schedule</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">task_schedule</span><span class="p">):</span>
404404
<span class="n">raw_sources</span> <span class="o">=</span> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">redis</span><span class="o">.</span><span class="n">smembers</span><span class="p">(</span><span class="s1">&#39;inbound_sources&#39;</span><span class="p">)</span>
405405
<span class="n">sources</span> <span class="o">=</span> <span class="p">[</span><span class="nb">str</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="s1">&#39;utf-8&#39;</span><span class="p">)</span> <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">raw_sources</span><span class="p">]</span>
406406
<span class="n">num_sources</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">sources</span><span class="p">)</span>
@@ -428,7 +428,7 @@ <h5 id="designing-for-testability">Designing for testability</h5><p>It's quite d
428428
<p>For example, the crawl monitor usually has to talk to our CC Search API (for assessing source size), Redis, and Kafka to do its job of regulating the crawl; instead of setting up a brittle and complicated integration test with all of those dependencies, we just instantiate some mock objects and pass them in. Now we can easily test individual components such as the error circuit breaker.</p>
429429
<p><em><center>Testing our crawl monitor's circuit breaking functionality with mock dependencies</center></em></p>
430430
<div class="hll"><pre><span></span><span class="nd">@pytest</span><span class="o">.</span><span class="n">fixture</span>
431-
<span class="k">def</span> <span class="nf">source_fixture</span><span class="p">():</span>
431+
<span class="k">def</span><span class="w"> </span><span class="nf">source_fixture</span><span class="p">():</span>
432432
<span class="w"> </span><span class="sd">&quot;&quot;&quot; Mocks the /v1/sources endpoint response. &quot;&quot;&quot;</span>
433433
<span class="k">return</span> <span class="p">[</span>
434434
<span class="p">{</span>
@@ -446,7 +446,7 @@ <h5 id="designing-for-testability">Designing for testability</h5><p>It's quite d
446446
<span class="p">]</span>
447447

448448

449-
<span class="k">def</span> <span class="nf">create_mock_monitor</span><span class="p">(</span><span class="n">sources</span><span class="p">):</span>
449+
<span class="k">def</span><span class="w"> </span><span class="nf">create_mock_monitor</span><span class="p">(</span><span class="n">sources</span><span class="p">):</span>
450450
<span class="n">response</span> <span class="o">=</span> <span class="n">FakeAioResponse</span><span class="p">(</span><span class="n">status</span><span class="o">=</span><span class="mi">200</span><span class="p">,</span> <span class="n">body</span><span class="o">=</span><span class="n">sources</span><span class="p">)</span>
451451
<span class="n">session</span> <span class="o">=</span> <span class="n">FakeAioSession</span><span class="p">(</span><span class="n">response</span><span class="o">=</span><span class="n">response</span><span class="p">)</span>
452452
<span class="n">redis</span> <span class="o">=</span> <span class="n">FakeRedis</span><span class="p">()</span>
@@ -455,7 +455,7 @@ <h5 id="designing-for-testability">Designing for testability</h5><p>It's quite d
455455

456456

457457
<span class="nd">@pytest</span><span class="o">.</span><span class="n">mark</span><span class="o">.</span><span class="n">asyncio</span>
458-
<span class="k">async</span> <span class="k">def</span> <span class="nf">test_error_circuit_breaker</span><span class="p">(</span><span class="n">source_fixture</span><span class="p">):</span>
458+
<span class="k">async</span> <span class="k">def</span><span class="w"> </span><span class="nf">test_error_circuit_breaker</span><span class="p">(</span><span class="n">source_fixture</span><span class="p">):</span>
459459
<span class="n">sources</span> <span class="o">=</span> <span class="n">source_fixture</span>
460460
<span class="n">redis</span><span class="p">,</span> <span class="n">monitor</span> <span class="o">=</span> <span class="n">create_mock_monitor</span><span class="p">(</span><span class="n">sources</span><span class="p">)</span>
461461
<span class="n">redis</span><span class="o">.</span><span class="n">store</span><span class="p">[</span><span class="s1">&#39;statuslast50req:example&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="p">[</span><span class="sa">b</span><span class="s1">&#39;500&#39;</span><span class="p">]</span> <span class="o">*</span> <span class="mi">50</span>

0 commit comments

Comments
 (0)