工作中经常需要采集一些内容,所以深入研究了一下PHP CURL多线程,不得不说这个扩展很变态。经过几十次的修改终于写出一个完美的CURL类,有多厉害用“终极”二字形容也不为过。PHP文档中资料甚少,到网上也看到一些类,感觉实在太烂,有些类资源占用很高,有些稍微好点的效率真不敢恭维,说白了就是个原始的多线程而已,根本无法充分利用CPU和带宽。
官网:https://github.com/ares333/CurlMulti
本类的特点:
运行绝对稳定。
设置一个并发就会始终以这个并发数进行工作,即使通过回调函数添加任务也不影响。
CPU占用极低,绝大部分CPU消耗在用户的回调函数上。
内存利用率高,任务数量较多(15W个任务占用内存会超过256M)可以使用回调函数添加任务,个数自定。
能够最大限度的占用带宽。
链式任务,比如一个任务需要从多个不同的地址采集数据,可以通过回调一气呵成。
能够对CURL错误进行多次尝试,次数自定(大并发一开始容易产生CURL错误,网络状况或对方服务器稳定性也有可能产生CURL错误)。
回调函数相当灵活,可以多种类型任务同时进行(比如下载文件,抓取网页,分析404可以在一个PHP进程中同时进行)。
可以非常容易的定制任务类型,比如检查404,获取redirect的最后url等。
可以设置缓存,挑战产品节操。
不足:
不能充分利用多核CPU(可以开多个进程解决,需要自己处理任务分割等逻辑),可以用pthreads解决!
最大并发500(或512?),经过测试是CURL 内部限制,超过最大并发会导致总是返回失败。
目前没有断点续传功能。
目前任务是原子性的,不能对一个大文件分为几部分分别开线程下载。
我这里有34W张图片需要下载,先下载13W截图如下
每秒钟下载300张左右图片,每张图片300K左右,下载速度约100MB/s(1G的网卡用的差不多了),速度和下载文件总数有非常小的偏差,这个数值是根据CURL的传输字节数计算而来。速度超过100MB/s的时候SSH基本连接不上了。。。
iostat
主要瓶颈在IO上,否则速度会更快。
采集http://www.lyricsmode.com/的歌词速度也很快,一共60多万歌词页,平均每秒钟能稳定采集1200页(边采集边分析边入库,否则3000+页每秒都不稀奇),处理采集的HTML的速度根本追不上采集的速度,瓶颈应该是对方服务器速度,那个没有截图,有兴趣的可以试一下。
================================================================
完整文档如下
================================================================
尽可能展示类的特性。此demo环境是本机,所以性能不能达到最高,只做为使用说明,为了高性能请到服务器调试。选择http://www.1ting.com/作为目标站点,这个站比较典型,而且国内速度也很快。
本文用到两个类(这两个类在demo包中已经包含):
CUrl:多线程类。
PHPQuery:HTML分析,项目地址 http://code.google.com/p/phpquery/。如果google被封这里有较新版本 phpQuery-0.9.5.386.zip。这里用的压缩包中的 phpQuery/phpQuery.php这个单文件类,如果你不会用这个类没关系,用正则分析也可以。
必要的文件包含和初始化实例代码中都省略了。
代码非常完整,可以直接运行,demo请在命令行模式运行(你懂的)。
代码非常简洁。。。
PHPQuery需要php-dom扩展,PHP环境没有的话需要安装。
数据库操作使用PDO,所以也需要PDO扩展。
既然是demo,那么程序上我就一切从简了。
phpQuery很NB,很多变态的用法自己去研究吧,基本可以说无所不能。如果使用phpQuery必须在回调函数中手动调用phpQuery::unloadDocuments();释放,否则phpQuery处理的文档全部都在内存中!!!
并发情况可以用360的连接查看器查看。
CUrl一些必要内容:
- CUrl类单线程和多线程都可以使用缓存,并且缓存机制一样。
- CUrl类抓取HTML单线程和多线程返回的内容结构一致
$result=array('info'=>array(),'content'=>'');
$result['info']是curl_info()的内容,$result['content']是抓取的html文件。
- curl配置分为三个级别,优先级由低到高如下,优先级高的会覆盖优先级低的配置。
默认:如私有方法init()中所示。
类级别:保存在 $opt这个公有属性中,此CURL对象的所有操作中都会起作用。
任务级别:多线程任务中添加任务时指定,只在当前任务中起作用。
- 类中用到回调函数的地方都使用call_user_func_array(),所以必须先从PHP手册中把这个函数搞明白了。
demo需要的数据表
|
`id` int(10) unsigned NOT NULL AUTO_INCREMENT, |
|
`artist_id` int(11) NOT NULL, |
|
`name` varchar(255) NOT NULL, |
|
`url` varchar(255) NOT NULL, |
|
) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; |
|
`id` int(11) NOT NULL AUTO_INCREMENT, |
|
`name` varchar(255) NOT NULL, |
|
`pic` varchar(255) NOT NULL, |
|
`url` varchar(255) NOT NULL, |
|
) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; |
|
CREATE TABLE `songlist` ( |
|
`id` int(11) NOT NULL AUTO_INCREMENT, |
|
`artist_id` int(11) NOT NULL, |
|
`name` varchar(255) NOT NULL, |
|
`album_url` varchar(255) NOT NULL, |
|
) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; |
压缩包中有程序运行的所有必要文件,导入curl.sql到test数据库,修改init.php中的配置就可以直接运行了!!
命令行:
php demo1.php
php demo2.php
php demo3.php
...
然后数据表中就有数据了。。。
demo中都没有phpQuery::unloadDocuments()操作,请自行加上。
windows命令行中文可能乱码导致非常奇怪的错误(尤其是demo5),windows命令行默认是GBK编码,可以php中用iconv转码,或修改命令行默认编码(没测试),所有代码在linux下测试完全正确。
完整Demo从此正式开始...
1.单线程用法
CUrl类有两个单线程的方法read(),download()。
|
$result=$curl->read($url); |
|
$html=phpQuery::newDocumentHTML($result['content']); |
|
$li=$html['ul.allSinger li a']; |
|
$st=$db->prepare("insert into artist(name,url) values(?,?)"); |
|
$st->execute(array(trim($v->text()),trim($v->attr('href')))); |
用单线程方法抓取女歌手列表页,用PHPQuery进行分析,取出歌手名子和详情页地址并存入artist表,本次示例抓取了2417个歌手。
2.简单多线程抓取
先介绍一些必要内容,下面展示多线程其他特性的时候就不在描述这些必要内容。
add($url,$p,$f)方法:
添加一个任务到队列
- $url是一个数组$url[0]保存目标url,如果$url[1]未设置表示是一个抓取html的任务,如果$url[1]设置了表示是一个下载任务,$url[1]必须是下载文件的本地绝对路径,$url[2]是任务级curl设置。
- $p是任务完成后的回调函数 ,$p[0]是需要调用的回调函数,$p[1]是需要传递的参数。抓取和下载任务的回调函数第一个参数总是一个数组,保存有抓取的结果,从第二个参数开始才是$p[1]中设置的参数,抓取任务和下载任务第一个参数的唯一区别就是抓取任务多一个content内容。
- $f是错误处理的回调函数,$f[0]是回调函数,$f[1]是需要传递的参数。某个任务由于curl错误并且尝试了n次(公有属性$maxTry设置最大尝试次数,默认3)失败就会调用这个函数,回调函数的第一个参数是错误信息,从第二个参数开始才是$f[1]的内容。 如果没有设置错误回调,错误信息会输出到标准输出。
go()方法:
前序操作都做完了开始正式跑。
status($debug=false)方法:
这个函数应该在回调函数中调用用来显示当前时刻的curl状况,如果$debug为true会显示详细的运行信息。注意:这个函数本身会输出信息,所以前面不要加echo,函数本身已经进行了格式化,所以函数后边不要输出"n" 。
26.69%:当前任务的进度,如果是链式任务这个数字可能会降低,因为链式任务会动态加入大量任务。
645/2417(0):以数量方式显示进度,括号中是缓存命中的数量。
30/s:每秒钟完成的任务数,主要由目标服务器性能,网络性能,本机性能,并发数决定。
550.37KB/s:抓取速度。
11.29MB:已经抓取的数据量。
ETA 58s:在当前速度下任务完成剩余时间。
现在我要抓取那2000多个歌手的专辑列表,用add()方法一次性把这些任务添加到任务队列,go()方法启动,curl首先创建n个并发(并发数在$limit公有属性中设置,默认30),有任务完成就会调用回调函数处理,处理完毕之后丢弃html并从任务队列中拿出一个任务添加到并发进程中,当所有并发任务完成并且任务队列为空的时候curl结束运行。
02 |
$artistList=$db->query("select id,url from artist")->fetchAll(); |
03 |
foreach($artistList as $v){ |
04 |
$url=array($baseUrl.$v['url']); |
05 |
$callback=array('demo2_cb1',array($v['id'])); |
06 |
$curl->add($url,$callback); |
11 |
function demo2_cb1($r,$id){ |
13 |
if($r['info']['http_code']==200){ |
14 |
$html=phpQuery::newDocumentHTML($r['content']); |
15 |
$list=$html['div.albumList ul li a.albumLink']; |
17 |
$st=$db->prepare('insert into album(artist_id,name,url) values(?,?,?)'); |
20 |
$st->execute(array($id,trim($v->find('span.albumName')->text()),trim($v->attr('href')))); |
从之前抓取结果中取所有歌手地址添加到任务队列,go()开始运行。回调函数负责处理抓取结果并入库。
3.链式抓取
假如现在我只需要歌曲列表,不需要其他信息。常规做法是从歌手列表抓取专辑列表并保存到数据库,然后从专辑列表抓取歌曲列表,现在专辑列表入库的操作可以省略了。
02 |
$artistList=$db->query("select id,url from artist")->fetchAll(); |
03 |
foreach($artistList as $v){ |
04 |
$url=array($baseUrl.$v['url']); |
05 |
$callback=array('demo3_cb1',array($v['id'])); |
06 |
$curl->add($url,$callback); |
11 |
function demo3_cb1($r,$id){ |
12 |
global $db,$curl,$baseUrl; |
13 |
if($r['info']['http_code']==200){ |
14 |
$html=phpQuery::newDocumentHTML($r['content']); |
15 |
$list=$html['div.albumList ul li a.albumLink']; |
19 |
$url=array($baseUrl.trim($v->attr('href'))); |
21 |
$callback=array('demo3_cb2',array($id,$url[0])); |
22 |
$curl->add($url,$callback); |
30 |
function demo3_cb2($r,$id,$url){ |
32 |
if($r['info']['http_code']==200){ |
33 |
$html=phpQuery::newDocumentHTML($r['content']); |
34 |
$list=$html['#song-list tr td.songTitle a.songName']; |
36 |
$st=$db->prepare('insert into songlist(artist_id,name,album_url) values(?,?,?)'); |
39 |
$st->execute(array($id,trim($v->text()),$url)); |
回调函数可以无限级连接,也可以根据不同情况添加不同的回调,各种神奇用法自己研究吧。
4.超大量任务处理
假设2中抓取专辑列表的任务数量非常大,不能一次性都添加进去,这时候需要通过回调函数动态添加任务。
02 |
$curl->task='demo4_addTask'; |
06 |
function demo4_addTask(){ |
07 |
global $baseUrl,$db,$curl; |
10 |
$list=$db->query("select id,url from artist where id>$lastId order by id limit $limit")->fetchAll(); |
12 |
$url=array($baseUrl.$v['url']); |
13 |
$callback=array('demo4_cb1',array($v['id'])); |
14 |
$curl->add($url,$callback); |
20 |
function demo4_cb1($r,$id){ |
22 |
if($r['info']['http_code']==200){ |
23 |
$html=phpQuery::newDocumentHTML($r['content']); |
24 |
$list=$html['div.albumList ul li a.albumLink']; |
26 |
$st=$db->prepare('insert into album(artist_id,name,url) values(?,?,?)'); |
29 |
$st->execute(array($id,trim($v->find('span.albumName')->text()),trim($v->attr('href')))); |
原理比较简单,如果任务队列为空curl就会尝试调用task属性指定的回调函数,具体任务怎么添加由自己控制。
任务总数500变成动态增长的。
5.多线程下载
现在我要多线程下载所有歌手的图片,图片是歌手详情页左上角那一张。
|
$dir=dirname(__FILE__).'/pic'; |
03 |
$artistList=$db->query("select id,name,url from artist")->fetchAll(); |
04 |
foreach($artistList as $v){ |
05 |
$url=array($baseUrl.$v['url']); |
06 |
$callback=array('demo5_cb1',array($v['id'],$v['name'])); |
07 |
$curl->add($url,$callback); |
12 |
function demo5_cb1($r,$id,$name){ |
13 |
global $db,$curl,$dir; |
14 |
if($r['info']['http_code']==200){ |
15 |
$html=phpQuery::newDocumentHTML($r['content']); |
16 |
$list=$html['dl.singerInfo dt img']; |
20 |
$picUrl=$v->attr('src'); |
21 |
$ext=pathinfo($picUrl,PATHINFO_EXTENSION); |
22 |
if(!empty($name) && !empty($ext)){ |
23 |
$filename=$name.'.'.$ext; |
24 |
$file=$dir.'/'.$filename; |
25 |
$url=array($picUrl,$file); |
26 |
$callback=array('demo5_cb2',array($id,$filename)); |
27 |
$curl->add($url,$callback); |
36 |
function demo5_cb2($r,$id,$filename){ |
38 |
if($r['info']['http_code']==200){ |
39 |
if($db->exec("update artist set pic='$filename' where id=$id")){ |
40 |
echo $r['info']['url']."n"; |
这下载速度嗷嗷快啊,机器NB你也可以达到100MB/s的神速(不是100Mbit/s)。
注意:下载顺序不是按任务顺序来的,是由每个线程的速度决定的,所以数据表中查看的时候需要给pic字段排下序。
6.自定义任务
除了抓取html,下载各种类型的文件,还可以自定义任何类型的任务。
比如检查链接404,设置代理,检查链接的301次数等等。
现在我要检查歌手页的404情况。因为任务类型统一,使用类级别配置就可以。
任务级别的配置自己研究吧。
02 |
$artistList=$db->query("select id,url from artist")->fetchAll(); |
03 |
$curl->opt=array(CURLOPT_NOBODY=>true); |
04 |
foreach($artistList as $v){ |
05 |
$url=array($baseUrl.$v['url']); |
06 |
$callback=array('demo6_cb1',array($url[0])); |
07 |
$curl->add($url,$callback); |
12 |
function demo6_cb1($r,$url){ |
13 |
echo $r['info']['http_code']."t".$url."n"; |
没有全部检查,应该没有404页面。。。
7.使用缓存
有时候会遇到这样一种情况,由于网速或使用了代理等原因抓取的速度非常慢,而且由于抓取规则修改调整需要多次抓取,这时候使用缓存可以极大的提高效率。
缓存通过公有属性$cache控制,默认是这个样子
$cache=array('on'=>false,'dir'=>null,'expire'=>86400);
类自动建立hash之后的子目录,子目录数上限16^3,如果有必要$cache['dir']自己再套一层hash目录即可。
还是以2中抓取专辑列表为例。
02 |
$artistList=$db->query("select id,url from artist")->fetchAll(); |
03 |
$curl->cache['on']=true; |
04 |
$curl->cache['dir']=dirname(__FILE__).'/cache'; |
05 |
foreach($artistList as $v){ |
06 |
$url=array($baseUrl.$v['url']); |
07 |
$callback=array('demo7_cb1',array($v['id'])); |
08 |
$curl->add($url,$callback); |
13 |
function demo7_cb1($r,$id){ |
15 |
if($r['info']['http_code']==200){ |
16 |
$html=phpQuery::newDocumentHTML($r['content']); |
17 |
$list=$html['div.albumList ul li a.albumLink']; |
19 |
$st=$db->prepare('insert into album(artist_id,name,url) values(?,?,?)'); |
22 |
$st->execute(array($id,trim($v->find('span.albumName')->text()),trim($v->attr('href')))); |
第一次抓去了188个,第二次抓取这188个就直接读缓存了。
8.多线程XMLRPC
xmlrpc一般使用一些封装好的类进行操作,这样执行起来比较方便,但是只能单线程。本类可以非常简单的实现xmlrpc多线程调用,经过测试,大并发下比单线程速度提高2个数量级。此代码不在demo包中,因为需要apikey和新的数据表,所以只演示原理,用法和结果。
last.fm有一个更正歌手名字的服务,文档看这里 http://www.last.fm/api/show/artist.getCorrection
06 |
function artistNameCorrect() { |
09 |
$header[] = "Content-type: text/xml"; |
10 |
$curl->opt[CURLOPT_HTTPHEADER] = $header; |
11 |
$opt = array(CURLOPT_POSTFIELDS => $this->getXmlRpc('artist.getCorrection', array('artist' => 'Capone -N- Noreaga'))); |
12 |
$curl->add(array($this->xmlRpcServer, null, $opt), array(array($this, 'artistNameCorrectCb1'))); |
16 |
function artistNameCorrectCb1($r) { |
17 |
header("Content-Type: text/xml"); |
19 |
$r = new SimpleXMLElement($r); |
20 |
$r = $r->params->param->value->string; |
21 |
$r = stripslashes($r); |
22 |
$r = new SimpleXMLElement($r); |
27 |
private function getXmlRpc($method, $param) { |
29 |
foreach ($param as $k => $v) { |
40 |
<methodName>$method</methodName> |
49 |
<string>{$this->apiKey}</string> |
有了前7步的基础,代码很容易理解,其中细节自己琢磨。回调函数输出如下结果
因为不断升级,API可能有变化,具体变化查看public方法的注释即可,教程和demo就不再更新。
旧版:CUrl
这是一个框架类命名空间什么的个别地方需要自己调整。
完整demo下载(curl-demo.zip)
内部代码经过n次修改变化很大,目标网站基本已经改版导致demo代码不一定可用,旧版本就不在维护。
原文地址:http://blog.phpdr.net/curl%E5%A4%9A%E7%BA%BF%E7%A8%8B.html