Curl多线程

 工作中经常需要采集一些内容,所以深入研究了一下PHP CURL多线程,不得不说这个扩展很变态。经过几十次的修改终于写出一个完美的CURL类,有多厉害用“终极”二字形容也不为过。PHP文档中资料甚少,到网上也看到一些类,感觉实在太烂,有些类资源占用很高,有些稍微好点的效率真不敢恭维,说白了就是个原始的多线程而已,根本无法充分利用CPU和带宽。

官网:https://github.com/ares333/CurlMulti

 

本类的特点:
运行绝对稳定。
设置一个并发就会始终以这个并发数进行工作,即使通过回调函数添加任务也不影响。
CPU占用极低,绝大部分CPU消耗在用户的回调函数上。
内存利用率高,任务数量较多(15W个任务占用内存会超过256M)可以使用回调函数添加任务,个数自定。
能够最大限度的占用带宽。
链式任务,比如一个任务需要从多个不同的地址采集数据,可以通过回调一气呵成。
能够对CURL错误进行多次尝试,次数自定(大并发一开始容易产生CURL错误,网络状况或对方服务器稳定性也有可能产生CURL错误)。
回调函数相当灵活,可以多种类型任务同时进行(比如下载文件,抓取网页,分析404可以在一个PHP进程中同时进行)。
可以非常容易的定制任务类型,比如检查404,获取redirect的最后url等。
可以设置缓存,挑战产品节操。

不足:
不能充分利用多核CPU(可以开多个进程解决,需要自己处理任务分割等逻辑),可以用pthreads解决!
最大并发500(或512?),经过测试是CURL 内部限制,超过最大并发会导致总是返回失败。
目前没有断点续传功能。
目前任务是原子性的,不能对一个大文件分为几部分分别开线程下载。

我这里有34W张图片需要下载,先下载13W截图如下


每秒钟下载300张左右图片,每张图片300K左右,下载速度约100MB/s(1G的网卡用的差不多了),速度和下载文件总数有非常小的偏差,这个数值是根据CURL的传输字节数计算而来。速度超过100MB/s的时候SSH基本连接不上了。。。
iostat

主要瓶颈在IO上,否则速度会更快。

采集http://www.lyricsmode.com/的歌词速度也很快,一共60多万歌词页,平均每秒钟能稳定采集1200页(边采集边分析边入库,否则3000+页每秒都不稀奇),处理采集的HTML的速度根本追不上采集的速度,瓶颈应该是对方服务器速度,那个没有截图,有兴趣的可以试一下。

================================================================
完整文档如下
================================================================

尽可能展示类的特性。此demo环境是本机,所以性能不能达到最高,只做为使用说明,为了高性能请到服务器调试。选择http://www.1ting.com/作为目标站点,这个站比较典型,而且国内速度也很快。

本文用到两个类(这两个类在demo包中已经包含):
CUrl:多线程类。
PHPQuery:HTML分析,项目地址 http://code.google.com/p/phpquery/。如果google被封这里有较新版本 phpQuery-0.9.5.386.zip。这里用的压缩包中的 phpQuery/phpQuery.php这个单文件类,如果你不会用这个类没关系,用正则分析也可以。

必要的文件包含和初始化实例代码中都省略了。
代码非常完整,可以直接运行,demo请在命令行模式运行(你懂的)。
代码非常简洁。。。
PHPQuery需要php-dom扩展,PHP环境没有的话需要安装。
数据库操作使用PDO,所以也需要PDO扩展。
既然是demo,那么程序上我就一切从简了。
phpQuery很NB,很多变态的用法自己去研究吧,基本可以说无所不能。如果使用phpQuery必须在回调函数中手动调用phpQuery::unloadDocuments();释放,否则phpQuery处理的文档全部都在内存中!!!
并发情况可以用360的连接查看器查看。

CUrl一些必要内容:

  1. CUrl类单线程和多线程都可以使用缓存,并且缓存机制一样。
  2. CUrl类抓取HTML单线程和多线程返回的内容结构一致
    $result=array('info'=>array(),'content'=>'');
    $result['info']是curl_info()的内容,$result['content']是抓取的html文件。
  3. curl配置分为三个级别,优先级由低到高如下,优先级高的会覆盖优先级低的配置。
    默认:如私有方法init()中所示。
    类级别:保存在 $opt这个公有属性中,此CURL对象的所有操作中都会起作用。
    任务级别:多线程任务中添加任务时指定,只在当前任务中起作用。
  4. 类中用到回调函数的地方都使用call_user_func_array(),所以必须先从PHP手册中把这个函数搞明白了。

demo需要的数据表

  CREATE TABLE `album` (
    `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
    `artist_id` int(11) NOT NULL,
    `name` varchar(255) NOT NULL,
    `url` varchar(255) NOT NULL,
    PRIMARY KEY (`id`)
  ) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
   
  CREATE TABLE `artist` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `name` varchar(255) NOT NULL,
    `pic` varchar(255) NOT NULL,
    `url` varchar(255) NOT NULL,
    PRIMARY KEY (`id`)
  ) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
   
  CREATE TABLE `songlist` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `artist_id` int(11) NOT NULL,
    `name` varchar(255) NOT NULL,
    `album_url` varchar(255) NOT NULL,
    PRIMARY KEY (`id`)
  ) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

压缩包中有程序运行的所有必要文件,导入curl.sql到test数据库,修改init.php中的配置就可以直接运行了!!
命令行:
php demo1.php
php demo2.php
php demo3.php
...
然后数据表中就有数据了。。。
demo中都没有phpQuery::unloadDocuments()操作,请自行加上。
windows命令行中文可能乱码导致非常奇怪的错误(尤其是demo5),windows命令行默认是GBK编码,可以php中用iconv转码,或修改命令行默认编码(没测试),所有代码在linux下测试完全正确。

完整Demo从此正式开始...

1.单线程用法

CUrl类有两个单线程的方法read(),download()。

  $result=$curl->read($url);
  $html=phpQuery::newDocumentHTML($result['content']);
  $li=$html['ul.allSinger li a'];
  $st=$db->prepare("insert into artist(name,url) values(?,?)");
  foreach($li as $v){
      $v=pq($v);
      $st->execute(array(trim($v->text()),trim($v->attr('href'))));
  }

用单线程方法抓取女歌手列表页,用PHPQuery进行分析,取出歌手名子和详情页地址并存入artist表,本次示例抓取了2417个歌手。

2.简单多线程抓取

先介绍一些必要内容,下面展示多线程其他特性的时候就不在描述这些必要内容。

add($url,$p,$f)方法:
添加一个任务到队列

  1. $url是一个数组$url[0]保存目标url,如果$url[1]未设置表示是一个抓取html的任务,如果$url[1]设置了表示是一个下载任务,$url[1]必须是下载文件的本地绝对路径,$url[2]是任务级curl设置。
  2. $p是任务完成后的回调函数 ,$p[0]是需要调用的回调函数,$p[1]是需要传递的参数。抓取和下载任务的回调函数第一个参数总是一个数组,保存有抓取的结果,从第二个参数开始才是$p[1]中设置的参数,抓取任务和下载任务第一个参数的唯一区别就是抓取任务多一个content内容。
  3. $f是错误处理的回调函数,$f[0]是回调函数,$f[1]是需要传递的参数。某个任务由于curl错误并且尝试了n次(公有属性$maxTry设置最大尝试次数,默认3)失败就会调用这个函数,回调函数的第一个参数是错误信息,从第二个参数开始才是$f[1]的内容。 如果没有设置错误回调,错误信息会输出到标准输出。

go()方法:
前序操作都做完了开始正式跑。

status($debug=false)方法:
这个函数应该在回调函数中调用用来显示当前时刻的curl状况,如果$debug为true会显示详细的运行信息。注意:这个函数本身会输出信息,所以前面不要加echo,函数本身已经进行了格式化,所以函数后边不要输出"n" 。

26.69%:当前任务的进度,如果是链式任务这个数字可能会降低,因为链式任务会动态加入大量任务。
645/2417(0):以数量方式显示进度,括号中是缓存命中的数量。
30/s:每秒钟完成的任务数,主要由目标服务器性能,网络性能,本机性能,并发数决定。
550.37KB/s:抓取速度。
11.29MB:已经抓取的数据量。
ETA 58s:在当前速度下任务完成剩余时间。

现在我要抓取那2000多个歌手的专辑列表,用add()方法一次性把这些任务添加到任务队列,go()方法启动,curl首先创建n个并发(并发数在$limit公有属性中设置,默认30),有任务完成就会调用回调函数处理,处理完毕之后丢弃html并从任务队列中拿出一个任务添加到并发进程中,当所有并发任务完成并且任务队列为空的时候curl结束运行。

01 $baseUrl='http://www.1ting.com';
02 $artistList=$db->query("select id,url from artist")->fetchAll();
03 foreach($artistList as $v){
04     $url=array($baseUrl.$v['url']);
05     $callback=array('demo2_cb1',array($v['id']));
06     $curl->add($url,$callback);
07 }
08 $curl->go();
09  
10 //处理歌手详情页的回调函数
11 function demo2_cb1($r,$id){
12     global $db,$curl;
13     if($r['info']['http_code']==200){
14         $html=phpQuery::newDocumentHTML($r['content']);
15         $list=$html['div.albumList ul li a.albumLink'];
16         if(!empty($list)){
17             $st=$db->prepare('insert into album(artist_id,name,url) values(?,?,?)');
18             foreach($list as $v){
19                 $v=pq($v);
20                 $st->execute(array($id,trim($v->find('span.albumName')->text()),trim($v->attr('href'))));
21             }
22         }
23     }
24     $curl->status();
25 }

从之前抓取结果中取所有歌手地址添加到任务队列,go()开始运行。回调函数负责处理抓取结果并入库。

3.链式抓取

假如现在我只需要歌曲列表,不需要其他信息。常规做法是从歌手列表抓取专辑列表并保存到数据库,然后从专辑列表抓取歌曲列表,现在专辑列表入库的操作可以省略了。

01 $baseUrl='http://www.1ting.com';
02 $artistList=$db->query("select id,url from artist")->fetchAll();
03 foreach($artistList as $v){
04     $url=array($baseUrl.$v['url']);
05     $callback=array('demo3_cb1',array($v['id']));
06     $curl->add($url,$callback);
07 }
08 $curl->go();
09  
10 //处理歌手详情页的回调函数
11 function demo3_cb1($r,$id){
12     global $db,$curl,$baseUrl;
13     if($r['info']['http_code']==200){
14         $html=phpQuery::newDocumentHTML($r['content']);
15         $list=$html['div.albumList ul li a.albumLink'];
16         if(!empty($list)){
17             foreach($list as $v){
18                 $v=pq($v);
19                 $url=array($baseUrl.trim($v->attr('href')));
20                 //继续传递歌手id
21                 $callback=array('demo3_cb2',array($id,$url[0]));
22                 $curl->add($url,$callback);
23             }
24         }
25     }
26     $curl->status(0);
27 }
28  
29 //处理专辑详情页的回调函数
30 function demo3_cb2($r,$id,$url){
31     global $db,$curl;
32     if($r['info']['http_code']==200){
33         $html=phpQuery::newDocumentHTML($r['content']);
34         $list=$html['#song-list tr td.songTitle a.songName'];
35         if(!empty($list)){
36             $st=$db->prepare('insert into songlist(artist_id,name,album_url) values(?,?,?)');
37             foreach($list as $v){
38                 $v=pq($v);
39                 $st->execute(array($id,trim($v->text()),$url));
40             } 
41         }
42     }
43 }

回调函数可以无限级连接,也可以根据不同情况添加不同的回调,各种神奇用法自己研究吧。

4.超大量任务处理

假设2中抓取专辑列表的任务数量非常大,不能一次性都添加进去,这时候需要通过回调函数动态添加任务。

01 $baseUrl='http://www.1ting.com';
02 $curl->task='demo4_addTask';
03 $curl->go();
04  
05 //取还没有添加的任务
06 function demo4_addTask(){
07     global $baseUrl,$db,$curl;
08     static $lastId=0;
09     $limit=100;
10     $list=$db->query("select id,url from artist where id>$lastId order by id limit $limit")->fetchAll();
11     foreach($list as $v){
12         $url=array($baseUrl.$v['url']);
13         $callback=array('demo4_cb1',array($v['id']));
14         $curl->add($url,$callback);
15     }
16     $lastId=$v['id'];
17 }
18  
19 //处理歌手详情页的回调函数
20 function demo4_cb1($r,$id){
21     global $db,$curl;
22     if($r['info']['http_code']==200){
23         $html=phpQuery::newDocumentHTML($r['content']);
24         $list=$html['div.albumList ul li a.albumLink'];
25         if(!empty($list)){
26             $st=$db->prepare('insert into album(artist_id,name,url) values(?,?,?)');
27             foreach($list as $v){
28                 $v=pq($v);
29                 $st->execute(array($id,trim($v->find('span.albumName')->text()),trim($v->attr('href'))));
30             }
31         }
32     }
33     $curl->status();
34 }

原理比较简单,如果任务队列为空curl就会尝试调用task属性指定的回调函数,具体任务怎么添加由自己控制。

任务总数500变成动态增长的。

5.多线程下载

现在我要多线程下载所有歌手的图片,图片是歌手详情页左上角那一张。

  $dir=dirname(__FILE__).'/pic';
02 $baseUrl='http://www.1ting.com';
03 $artistList=$db->query("select id,name,url from artist")->fetchAll();
04 foreach($artistList as $v){
05     $url=array($baseUrl.$v['url']);
06     $callback=array('demo5_cb1',array($v['id'],$v['name']));
07     $curl->add($url,$callback);
08 }
09 $curl->go();
10  
11 //处理歌手详情页的回调函数
12 function demo5_cb1($r,$id,$name){
13     global $db,$curl,$dir;
14     if($r['info']['http_code']==200){
15         $html=phpQuery::newDocumentHTML($r['content']);
16         $list=$html['dl.singerInfo dt img'];
17         if(!empty($list)){
18             foreach($list as $v){
19                 $v=pq($v);
20                 $picUrl=$v->attr('src');
21                 $ext=pathinfo($picUrl,PATHINFO_EXTENSION);
22                 if(!empty($name) && !empty($ext)){
23                     $filename=$name.'.'.$ext;
24                     $file=$dir.'/'.$filename;
25                     $url=array($picUrl,$file);
26                     $callback=array('demo5_cb2',array($id,$filename));
27                     $curl->add($url,$callback);
28                 }
29             }
30         }
31     }
32     //$curl->status();
33 }
34  
35 //图片下载完成回调函数
36 function demo5_cb2($r,$id,$filename){
37     global $db;
38     if($r['info']['http_code']==200){
39         if($db->exec("update artist set pic='$filename' where id=$id")){
40             echo $r['info']['url']."n";    
41         }
42     }  
43 }

这下载速度嗷嗷快啊,机器NB你也可以达到100MB/s的神速(不是100Mbit/s)。
注意:下载顺序不是按任务顺序来的,是由每个线程的速度决定的,所以数据表中查看的时候需要给pic字段排下序。

6.自定义任务

除了抓取html,下载各种类型的文件,还可以自定义任何类型的任务。
比如检查链接404,设置代理,检查链接的301次数等等。
现在我要检查歌手页的404情况。因为任务类型统一,使用类级别配置就可以。
任务级别的配置自己研究吧。

01 $baseUrl='http://www.1ting.com';
02 $artistList=$db->query("select id,url from artist")->fetchAll();
03 $curl->opt=array(CURLOPT_NOBODY=>true);
04 foreach($artistList as $v){
05     $url=array($baseUrl.$v['url']);
06     $callback=array('demo6_cb1',array($url[0]));
07     $curl->add($url,$callback);
08 }
09 $curl->go();
10  
11 //处理歌手详情页的回调函数
12 function demo6_cb1($r,$url){
13     echo $r['info']['http_code']."t".$url."n";
14 }

没有全部检查,应该没有404页面。。。

7.使用缓存

有时候会遇到这样一种情况,由于网速或使用了代理等原因抓取的速度非常慢,而且由于抓取规则修改调整需要多次抓取,这时候使用缓存可以极大的提高效率。

缓存通过公有属性$cache控制,默认是这个样子
$cache=array('on'=>false,'dir'=>null,'expire'=>86400);

类自动建立hash之后的子目录,子目录数上限16^3,如果有必要$cache['dir']自己再套一层hash目录即可。

还是以2中抓取专辑列表为例。

01 $baseUrl='http://www.1ting.com';
02 $artistList=$db->query("select id,url from artist")->fetchAll();
03 $curl->cache['on']=true;
04 $curl->cache['dir']=dirname(__FILE__).'/cache';
05 foreach($artistList as $v){
06     $url=array($baseUrl.$v['url']);
07     $callback=array('demo7_cb1',array($v['id']));
08     $curl->add($url,$callback);
09 }
10 $curl->go();
11  
12 //处理歌手详情页的回调函数
13 function demo7_cb1($r,$id){
14     global $db,$curl;
15     if($r['info']['http_code']==200){
16         $html=phpQuery::newDocumentHTML($r['content']);
17         $list=$html['div.albumList ul li a.albumLink'];
18         if(!empty($list)){
19             $st=$db->prepare('insert into album(artist_id,name,url) values(?,?,?)');
20             foreach($list as $v){
21                 $v=pq($v);
22                 $st->execute(array($id,trim($v->find('span.albumName')->text()),trim($v->attr('href'))));
23             }
24         }
25     }
26     $curl->status();
27 }

第一次抓去了188个,第二次抓取这188个就直接读缓存了。

8.多线程XMLRPC

xmlrpc一般使用一些封装好的类进行操作,这样执行起来比较方便,但是只能单线程。本类可以非常简单的实现xmlrpc多线程调用,经过测试,大并发下比单线程速度提高2个数量级此代码不在demo包中,因为需要apikey和新的数据表,所以只演示原理,用法和结果。

last.fm有一个更正歌手名字的服务,文档看这里 http://www.last.fm/api/show/artist.getCorrection

01 private $xmlRpcServer = 'http://ws.audioscrobbler.com/2.0/';
02  
03 /**
04  * 利用last的api更正歌手的名字
05  */
06 function artistNameCorrect() {
07     $curl = $this->curl;
08     $header = array();
09     $header[] = "Content-type: text/xml";
10     $curl->opt[CURLOPT_HTTPHEADER] = $header;
11     $opt = array(CURLOPT_POSTFIELDS => $this->getXmlRpc('artist.getCorrection', array('artist' => 'Capone -N- Noreaga')));
12     $curl->add(array($this->xmlRpcServer, null, $opt), array(array($this, 'artistNameCorrectCb1')));
13     $curl->go();
14 }
15  
16 function artistNameCorrectCb1($r) {
17     header("Content-Type: text/xml");
18     $r = $r['content'];
19     $r = new SimpleXMLElement($r);
20     $r = $r->params->param->value->string;
21     $r = stripslashes($r);
22     $r = new SimpleXMLElement($r);
23     echo $r->asXML();
24 }
25  
26 //获取请求xml
27 private function getXmlRpc($method, $param) {
28     $member = '';
29     foreach ($param as $k => $v) {
30         $member.="
31         <member>
32             <name>$k</name>
33             <value>
34                 <string>$v</string>
35             </value>
36         </member>";
37     }
38     $xml = "
39     <methodCall>
40         <methodName>$method</methodName>
41         <params>
42             <param>
43             <value>
44                 <struct>
45                     $member
46                     <member>
47                         <name>api_key</name>
48                         <value>
49                             <string>{$this->apiKey}</string>
50                         </value>
51                     </member>
52                 </struct>
53             </value>
54             </param>
55         </params>
56     </methodCall>";
57     return $xml;
58 }

有了前7步的基础,代码很容易理解,其中细节自己琢磨。回调函数输出如下结果

2013-02-06_144032

因为不断升级,API可能有变化,具体变化查看public方法的注释即可,教程和demo就不再更新。
旧版:CUrl
这是一个框架类命名空间什么的个别地方需要自己调整。

完整demo下载(curl-demo.zip)

内部代码经过n次修改变化很大,目标网站基本已经改版导致demo代码不一定可用,旧版本就不在维护。

原文地址:http://blog.phpdr.net/curl%E5%A4%9A%E7%BA%BF%E7%A8%8B.html

300*300
 文章首页关于迷茫时代关于我写意人生
版权所有:迷茫时代 All rights reserved   
执行时间:0.01697 秒