CeleryとScrapyで定期クローリングする【Python】

Scrapyで定期的にクローリングがしたいなぁと思って、Celeryでタスクキュー環境を作ってみました。
実行環境は Windows + RabbitMQ + Celery + Scrapy + flower です。
自分用メモに近いので間違っていたらごめんなさいです。

ブローカーのインストール

WindowsにErlang(OTP 20.3 Windows 64ビットバイナリファイルってやつ)をインストールしたら、ブローカー用にRabbitMQをインストールします。
ブローカーは、タスクを溜め込むためのミドルウェアです。
RabbitMQのインストールと管理画面の有効化(MacOSX/Linux/Windows) - Qiita ←の通りにするとばっちりうまくいきました。
RabbitMQはWindowsのサービスとして立ち上げておきます。

RabbitMQ

Celeryとか諸々のインストール

CeleryはPython製の非同期タスクキューです。
仮想環境を作ってpipで必要なモジュールをインストールします。
20180320現在、Celeryの最新バージョン(4.1.0)はwindowsをサポートしていないので、3系の最後のバージョン(3.1.25)を使用します。

1
2
3
4
5
6
7
8
9
E:\pythonWorkspace>python -m venv CPrj
E:\pythonWorkspace>cd CPrj
E:\pythonWorkspace\CPrj>cd Scripts
E:\pythonWorkspace\CPrj\Scripts>activate
(CPrj) E:\pythonWorkspace\CPrj\Scripts>pip install celery==3.1.25 flower pywin32 scrapy
Collecting celery==3.1.25
...中略...
Successfully installed Automat-0.6.0 PyDispatcher-2.0.5 Twisted-17.9.0 amqp-1.4.9 anyjson-0.3.3 asn1crypto-0.24.0 attrs-17.4.0 babel-2.5.3 billiard-3.3.0.23 celery-3.1.25 cffi-1.11.5 constantly-15.1.0 cryptography-2.1.4 cssselect-1.0.3 flower-0.9.2 hyperlink-18.0.0 idna-2.6 incremental-17.5.0 kombu-3.0.37 lxml-4.2.0 parsel-1.4.0 pyOpenSSL-17.5.0 pyasn1-0.4.2 pyasn1-modules-0.2.1 pycparser-2.18 pytz-2018.3 pywin32-223 queuelib-1.5.0 scrapy-1.5.0 service-identity-17.0.0 six-1.11.0 tornado-5.0.1 w3lib-1.19.0 zope.interface-4.4.3
(CPrj) E:\pythonWorkspace\CPrj\Scripts>

Scrapyプロジェクトの作成

実際にクローリングするScrapyのプロジェクトを作成します。

1
2
3
4
5
6
7
8
9
10
(CPrj) E:\pythonWorkspace\CPrj\Scripts>cd ..
(CPrj) E:\pythonWorkspace\CPrj>scrapy startproject SPrj
New Scrapy project 'SPrj', using template directory 'e:\\pythonworkspace\\cprj\\lib\\site-packages\\scrapy\\templates\\project', created in:
E:\pythonWorkspace\CPrj\SPrj

You can start your first spider with:
cd SPrj
scrapy genspider example example.com

(CPrj) E:\pythonWorkspace\CPrj

ScrapyのSpiderの作成

スパイダーを作成します。
今回はYahooのニュースサイトをクロールするスパイダーにしてみます。

1
2
3
4
5
(CPrj) E:\pythonWorkspace\CPrj>cd SPrj
(CPrj) E:\pythonWorkspace\CPrj\SPrj>scrapy genspider yahoo_news https://news.yahoo.co.jp/
Created spider 'yahoo_news' using template 'basic' in module:
SPrj.spiders.yahoo_news
(CPrj) E:\pythonWorkspace\CPrj\SPrj>

Celeryアプリケーションとタスクの作成

Scrapyのプロジェクトフォルダにcelery_app.pyを作ります。
中身は以下のような感じです。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from celery import Celery
from datetime import timedelta
from celery.schedules import crontab
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

#アプリケーション
app = Celery("celery_app")
app.conf.update(
BROKER_URL = "amqp://",
CELERY_TASK_SERIALIZER = 'json',
CELERY_RESULT_SERIALIZER = 'json',
CELERY_ACCEPT_CONTENT = ['json'],
CELERY_ENABLE_UTC = True,
CELERY_TIMEZONE = "Asia/Tokyo",
CELERY_TASK_RESULT_EXPIRES = 18000,
CELERYBEAT_SCHEDULE = {
"crawl": {
"task": "celery_app.scrapy_run_crawl",
"schedule": timedelta(seconds=30),
},
}
)

#タスク
@app.task
def scrapy_run_crawl():
process = CrawlerProcess(get_project_settings())
process.crawl("yahoo_news")
process.start()

scrapy_run_crawlというタスクの部分で、Scrapyのスパイダーを動かしています。
CELERYBEAT_SCHEDULEは後述するCelery beatというスケジューラ機能のためのものです。
ディレクトリ構成はこんな感じです。

1
2
3
4
5
6
7
8
9
10
11
12
13
CPrj
├ SPrj(Scrapyのプロジェクト)
│ ├ SPrj
│ │ ├ spiders
│ │ ├ __init__.py
│ │ ├ items.py
│ │ ├ middlewares.py
│ │ ├ pipelines.py
│ │ └ settings.py
│ ├ celery_app.py(Celeryアプリ)
├ └ scrapy.cfg
├ ...etcetc
└ ...etcetc

scrapy.cfgファイルと同フォルダにcelery_app.pyファイルを配置しているのは、process.crawl(“yahoo_news”)でスパイダーを見つけられなかったからです。
うーん、この辺はあまり調べていないのでよく分かりません。

今回はタスクが一つの単純な構成でしたので、celery_app.pyファイル一つにまとめましたが、規模が大きくなると、celery_tasks.pyに分けたりceleryconfig.pyを使うことになるのだと思います。

広告

Celeryのワーカーの起動

タスクを実際に実行するワーカーを起動します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
(CPrj) E:\pythonWorkspace\CPrj\SPrj>celery worker -A celery_app -l info
-------------- celery@xxxxx-PC v3.1.25 (Cipater)
---- **** -----
--- * *** * -- Windows-7-6.1.7601-SP1
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: celery_app:0x3c6c278
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery


[tasks]
. celery_app.add
. celery_app.scrapy_run_crawl

[2018-03-19 14:16:06,385: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2018-03-19 14:16:06,425: INFO/MainProcess] mingle: searching for neighbors
[2018-03-19 14:16:07,574: INFO/MainProcess] mingle: all alone
[2018-03-19 14:16:07,609: WARNING/MainProcess] celery@xxxxx-PC ready.

これでワーカーが待機状態となります。

タスクをブローカーに送る

では、タスクをブローカー(メッセージキュー)に送ってみます。
ワーカー以外にもう一つコマンドプロンプトを立ち上げて、コマンドラインから直接pythonを叩きます。

1
2
3
4
5
6
(CPrj) E:\pythonWorkspace\CPrj\SPrj> python
Python 3.6.4 (v3.6.4:d48eceb, Dec 19 2017, 06:54:40) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from celery_app import scrapy_run_crawl
>>> scrapy_run_crawl.delay()
<AsyncResult: 157d503b-d8b1-41af-8901-bec16f582a2e>

delay()は、タスクメッセージを送るショートカットです。
送られたタスクはメッセージキューに送られて、ワーカーが取り出して処理します。
こんな風にワーカー側がログを吐いてくれます。

1
2
3
4
[2018-03-19 14:22:50,985: INFO/MainProcess] Received task: celery_app.scrapy_run_crawl[157d503b-d8b1-41af-8901-bec16f582a2e]
[2018-03-19 14:22:51,005: INFO/Worker-1] Scrapy 1.5.0 started (bot: SPrj)
[2018-03-19 14:22:51,005: WARNING/Worker-1] 2018-03-19 14:22:51 [scrapy.utils.log] INFO: Scrapy 1.5.0 started (bot: SPrj)
...以下略...

タスクの定期実行

先ほどと同じように、ワーカーとは別プロンプトから、Celery beatというスケジューラを起動させます。

1
2
3
4
5
6
7
8
9
10
11
(CPrj) E:\pythonWorkspace\CPrj\SPrj>celery beat -A celery_app -l info
celery beat v3.1.25 (Cipater) is starting.
__ - ... __ - _
Configuration ->
. broker -> amqp://guest:**@localhost:5672//
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> now (0s)
[2018-03-19 14:32:15,369: INFO/MainProcess] beat: Starting...

前述したcelery_app.pyのCELERYBEAT_SCHEDULEの部分がスケジューラの箇所です。
上の例で言えば、30秒に1回ScrapyのスパイダーがYahooをクロールする、という風にスケジューリングされたタスクです。
crontabを使えば以下のような時刻指定も可能です。

1
2
3
4
5
6
7
CELERYBEAT_SCHEDULE = {
"crawl": {
"task": "celery_app.scrapy_run_crawl",
#毎日15:25分に実行
"schedule": crontab(hour=15, minute=25,),
},
}

flowerで処理を確認する

最後に、Celeryのタスクをモニタリングするwebベースのツールflowerを起動させます。
特に設定は必要ありません。
また別コマンドラインから、celery_app.pyがあるディレクトリで、

1
2
3
4
5
6
7
8
9
10
11
12
13
14
(CPrj) E:\pythonWorkspace\CPrj\SPrj>flower -A celery_app --port=5555
[I 180319 15:34:30 command:139] Visit me at http://localhost:5555
[I 180319 15:34:30 command:144] Broker: amqp://guest:**@localhost:5672//
[I 180319 15:34:30 command:147] Registered tasks:
['celery.backend_cleanup',
'celery.chain',
'celery.chord',
'celery.chord_unlock',
'celery.chunks',
'celery.group',
'celery.map',
'celery.starmap',
'celery_app.scrapy_run_crawl']
[I 180319 15:34:30 mixins:231] Connected to amqp://guest:**@127.0.0.1:5672//

と、します。
あとはブラウザで http://localhost:5555/ にアクセスします。
こんな風になっていれば成功です。

Celery flower

タスクの処理される様子は、リアルタイムで更新されます。

Celery flower

ざざーーっと設定したので見落としがあるかもしれませんが、自分用メモですのでどうぞご容赦ください。


開発環境
WinOS : Windows7(64bit)
Python : 3.6.4(64bit)
Erlang : 20.3
RabbitMQ: 3.7.4
Celery : 3.1.25
flower : 0.9.2
Scrapy : 1.5.0

広告