海运的博客

又一PHP libcurl封装异步并发HTTP客户端

发布时间:January 27, 2015 // 分类:PHP // No Comments

PHP标准库内置curl扩展,不过实现不完整,如multi_socket_action接口,无意中发现pecl http库同样基于libcurl封装,支持更多的libcurl特性,更新也比较快,底层通过libevent(epoll)实现multi_socket_action接口,不过pecl http版本1和版本2 api完全不兼容,使用过程中稳定性及性能并不如PHP内置的curl,好像还有内存泄露,以下为示例代码,基于pecl_http 2.20:

<?php
   function push($client, $url) {
      $req = new http\Client\Request("GET", $url, ["User-Agent"=>"My Client/0.1"]);
      $req->setOptions(array('connecttimeout'=>1, 'timeout'=>1));
      $client->enqueue($req, function($response) use ($client, $req, $url) {
         printf("%s returned '%s' (%d)\n", $response->getTransferInfo("effective_url"), $response->getInfo(), $response->getResponseCode());
         echo $client->count().PHP_EOL;
         global $urls;
         if ($urls) {
            while ($client->count() < 20) {
               $url = array_shift($urls);
               push($client, $url);
            }
            return true; // dequeue
         }
      });
   }

   $client = new http\Client;
   $client->enablePipelining(true);
   $client->enableEvents(true);

   for ($i = 0; $i < 10000; ++$i) {
      $urls[] = "http://192.168.1.3/";
   }
   for ($i = 0; $i < 20; ++$i) {
      $url = array_shift($urls);
      push($client, $url);
   }
   /*
   try{
      var_dump($client->send());
   }
   catch(http\Exception\RuntimeException  $e)
   {
      echo 'Message: ' .$e->getMessage().PHP_EOL;
   }
   */

   while ($client->once()) {
      $client->wait();
   }

Python CURL异步并发HTTP客户端

发布时间:January 7, 2015 // 分类:Python // No Comments

Select模式,类似于php multi curl异步并发,连接数不能太多:

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys
import pycurl
import cStringIO

#最大连接数
num_conn = 20

queue = []
urls = ['https://www.haiyun.me/'] * 10000
for url in urls:
  queue.append(url)

num_urls = len(queue)
num_conn = min(num_conn, num_urls)
print ('----- Getting', num_urls, 'Max conn', num_conn,
       'connections -----')

m = pycurl.CurlMulti()
#初始化handle,可复用
m.handles = []
for i in range(num_conn):
  c = pycurl.Curl()
  c.body = cStringIO.StringIO()
  c.setopt(pycurl.FOLLOWLOCATION, 1)
  c.setopt(pycurl.MAXREDIRS, 5)
  c.setopt(pycurl.CONNECTTIMEOUT, 30)
  c.setopt(pycurl.TIMEOUT, 300)
  c.setopt(pycurl.NOSIGNAL, 1)
  m.handles.append(c)


freelist = m.handles[:]
num_processed = 0
#主循环开始
while num_processed < num_urls:

    #添加请求URL
    while queue and freelist:
      url = queue.pop()
      c = freelist.pop()
      c.setopt(pycurl.URL, url)
      c.setopt(pycurl.WRITEFUNCTION, c.body.write)
      m.add_handle(c)
      c.url = url
      #print url

    #执行请求
    while 1:
      (ret, num_handles) = m.perform()
      if ret != pycurl.E_CALL_MULTI_PERFORM:
        break

    #阻塞一会直到有连接完成
    m.select(1.0)

    #读取完成的连接
    while 1:
      (num_q, ok_list, err_list) = m.info_read()
      for c in ok_list:
        m.remove_handle(c)
        #print c.body.getvalue()
        freelist.append(c)

      for (c, errno, errmsg) in err_list:
        m.remove_handle(c)
        print ('Failed: ', c.url, errno, errmsg)
        freelist.append(c)
      num_processed = num_processed + len(ok_list) + len(err_list)
      if num_q == 0:
        break

for c in m.handles:
  c.fp = None
  c.close()
m.close()

epoll模式,php mult curl不支持此模式,tornado基于pycurl multi_socket_action封装的异步http client,每个client实例维护一个ioloop:

from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop
count = 10000
done = 0
def handle_request(response):
  global done
  done += 1
  if (done == count):
    #结束循环
    IOLoop.instance().stop()

  if response.error:
    print "Error:", response.error
  #else:
    #print response.body
#默认client是基于ioloop实现的,配置使用Pycurl
AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient",max_clients=20)
http_client = AsyncHTTPClient()
for i in range(count):
  http_client.fetch("https://www.haiyun.me/", handle_request)
#死循环
IOLoop.instance().start()      

基于epoll的multi curl在lan环境下效果不如select,因为所有Socket都在活跃状态,所有的callback都被唤醒,会导致资源的竞争。既然都是要处理所有的Socket,直接遍历是最简单最有效的方式.
为更好的性能建议libcurl/pycurl开启异步DNS解析

自用完美PHP异步并发multi curl

发布时间:December 7, 2014 // 分类:PHP // No Comments

修改自https://code.google.com/p/rolling-curl/

<?php
   /*
   Authored by Josh Fraser (www.joshfraser.com)
   Released under Apache License 2.0

   Maintained by Alexander Makarov, http://rmcreative.ru/

   $Id$
   */

   /**
   * Class that represent a single curl request
   */
   class RollingCurlRequest {
      public $url = false;
      public $method = 'GET';
      public $post_data = null;
      public $headers = null;
      public $options = null;
      public $info = null;
      public $callback;
      public $recursion = false;

      /**
      * @param string $url
      * @param string $method
      * @param  $post_data
      * @param  $headers
      * @param  $options
      * @return void
      */
      function __construct($url, $options = null, $info = null, $method = "GET", $post_data = null, $headers = null  ) {
         $this->url = $url;
         $this->method = $method;
         $this->post_data = $post_data;
         $this->headers = $headers;
         $this->options = $options;
         $this->info = $info;
      }

      /**
      * @return void
      */
      public function __destruct() {
         unset($this->url, $this->method, $this->post_data, $this->headers, $this->options);
      }
   }

   /**
   * RollingCurl custom exception
   */
   class RollingCurlException extends Exception {
   }

   /**
   * Class that holds a rolling queue of curl requests.
   *
   * @throws RollingCurlException
   */
   class RollingCurl {
      /**
      * @var int
      *
      * Window size is the max number of simultaneous connections allowed.
      *
      * REMEMBER TO RESPECT THE SERVERS:
      * Sending too many requests at one time can easily be perceived
      * as a DOS attack. Increase this window_size if you are making requests
      * to multiple servers or have permission from the receving server admins.
      */
      private $window_size = 5;

      //private $master = 'NULL';
      //保存连接数量
      public $current_size =0;
      /**
      * @var float
      *
      * Timeout is the timeout used for curl_multi_select.
      */
      private $timeout = 10;

      /**
      * @var array
      *
      * Set your base options that you want to be used with EVERY request.
      */
      protected $options = array(
         CURLOPT_SSL_VERIFYPEER => 0,
         CURLOPT_RETURNTRANSFER => 1,
         CURLOPT_VERBOSE => 0,
         CURLOPT_TIMEOUT => 20,
         CURLOPT_DNS_CACHE_TIMEOUT => 3600,
         CURLOPT_CONNECTTIMEOUT => 10,
         CURLOPT_ENCODING => 'gzip,deflate',
         CURLOPT_FOLLOWLOCATION => 1,
         CURLOPT_MAXREDIRS => 2,
         CURLOPT_USERAGENT => 'Mozilla/5.0 (Windows NT 6.3; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0',
         //CURLOPT_HEADER => 1
      );

      /**
      * @var array
      */
      private $headers = array(

         'Connection: Keep-Alive',
         'Keep-Alive: 300',
         'Expect:'
      );

      /**
      * @var Request[]
      *
      * The request queue
      */
      private $requests = array();

      /**
      * @var RequestMap[]
      *
      * Maps handles to request indexes
      */
      private $requestMap = array();

      /**
      * @param  $callback
      * Callback function to be applied to each result.
      *
      * Can be specified as 'my_callback_function'
      * or array($object, 'my_callback_method').
      *
      * Function should take three parameters: $response, $info, $request.
      * $response is response body, $info is additional curl info.
      * $request is the original request
      *
      * @return void
      */
      function __construct($callback = null) {
         $this->callback = $callback;
      }

      /**
      * @param string $name
      * @return mixed
      */
      public function __get($name) {
         return (isset($this->{$name})) ? $this->{$name} : null;
      }

      /**
      * @param string $name
      * @param mixed $value
      * @return bool
      */
      public function __set($name, $value) {
         // append the base options & headers
         if ($name == "options" || $name == "headers") {
            $this->{$name} = $value + $this->{$name};
         } else {
            $this->{$name} = $value;
         }
         return true;
      }

      /**
      * Add a request to the request queue
      *
      * @param Request $request
      * @return bool
      */
      public function add($request) {
         $this->requests[] = $request;
         return true;
      }

      /**
      * Create new Request and add it to the request queue
      *
      * @param string $url
      * @param string $method
      * @param  $post_data
      * @param  $headers
      * @param  $options
      * @return bool
      */
      public function request($url, $method = "GET", $post_data = null, $headers = null, $options = null) {
         $this->requests[] = new RollingCurlRequest($url, $method, $post_data, $headers, $options);
         return true;
      }

      /**
      * Perform GET request
      *
      * @param string $url
      * @param  $headers
      * @param  $options
      * @return bool
      */
      public function get($url, $headers = null, $options = null) {
         return $this->request($url, "GET", null, $headers, $options);
      }

      /**
      * Perform POST request
      *
      * @param string $url
      * @param  $post_data
      * @param  $headers
      * @param  $options
      * @return bool
      */
      public function post($url, $post_data = null, $headers = null, $options = null) {
         return $this->request($url, "POST", $post_data, $headers, $options);
      }

      /**
      * Execute processing
      *
      * @param int $window_size Max number of simultaneous connections
      * @return string|bool
      */
      public function execute($window_size = null) {
         // rolling curl window must always be greater than 1
         if (sizeof($this->requests) == 1) {
            return $this->single_curl();
         } else {
            // start the rolling curl. window_size is the max number of simultaneous connections
            return $this->rolling_curl($window_size);
         }
      }

      /**
      * Performs a single curl request
      *
      * @access private
      * @return string
      */
      private function single_curl() {
         $ch = curl_init();
         $request = array_shift($this->requests);
         //获取选项及header
         $options = $this->get_options($request);
         curl_setopt_array($ch, $options);
         $output = curl_exec($ch);
         $info = curl_getinfo($ch);
         //处理错误
         if (curl_error($ch))
         $info['error'] = curl_error($ch);

         // it's not neccesary to set a callback for one-off requests
         if ($request->callback) {
            $callback = $request->callback;
            if (is_callable($callback)) {
               call_user_func($callback, $output, $info, $request);
            }
         }
         else
         return $output;
         return true;
      }

      /**
      * Performs multiple curl requests
      *
      * @access private
      * @throws RollingCurlException
      * @param int $window_size Max number of simultaneous connections
      * @return bool
      */
      private function rolling_curl($window_size = null) {
         if ($window_size)
         $this->window_size = $window_size;

         // make sure the rolling window isn't greater than the # of urls
         if (sizeof($this->requests) < $this->window_size)
         $this->window_size = sizeof($this->requests);

         if ($this->window_size < 2) {
            throw new RollingCurlException("Window size must be greater than 1");
         }

         $master = curl_multi_init();

         //首次执行填满请求
         for ($i = 0; $i < $this->window_size; $i++) {
            $ch = curl_init();

            $options = $this->get_options($this->requests[$i]);

            curl_setopt_array($ch, $options);
            curl_multi_add_handle($master, $ch);

            $key = (int) $ch;
            //ch重用队列
            $chs[$key] = $ch;
            //请求map,后续根据返回信息的ch获取原始请求信息
            $this->requestMap[$key] = $i;
            $this->current_size++;
         }

         do {
            //执行句柄内所有连接,包括后来新加入的连接
            do {
               //running变量返回正在处理的curl数量,0表示当前没有正在执行的curl
               $execrun = curl_multi_exec($master, $running);
            } while ($execrun == CURLM_CALL_MULTI_PERFORM); // 7.20.0后弃用

            if ($execrun != CURLM_OK)
            echo "ERROR!\n " . curl_multi_strerror($execrun);

            //阻塞一会等待有数据可读,返回可读数量,失败为-1,避免一直循环占用CPU
            if ($running)
            curl_multi_select($master, $this->timeout);

            //读取返回的连接,并加入新的连接
            while ($done = curl_multi_info_read($master)) {

               //获取完成的句柄
               $ch = $done['handle'];
               //获取返回的请求信息
               $info = curl_getinfo($ch);
               //获取返回内容
               $output = curl_multi_getcontent($ch);
               //处理错误信息
               //if (curl_error($ch))  
               if ($done['result'] != CURLE_OK) 
               $info['error'] = curl_error($ch);

               //根据请求映射是哪个请求返回的信息,即请求数组中第i个请求
               $key = (int) $ch;
               $request = $this->requests[$this->requestMap[$key]];
               //发送返回信息到回调函数
               $callback = $request->callback;
               if (is_callable($callback)) {
                  //移除请求信息和请求映射
                  unset($this->requests[$this->requestMap[$key]]);
                  unset($this->requestMap[$key]);
                  $this->current_size--;
                  //回调函数
                  call_user_func($callback, $output, $info, $request);
               }
               //删除完成的句柄
               curl_multi_remove_handle($master, $done['handle']);

               //判断队列内的连接是否用完
               if (isset($this->requests[$i])) {
                  //重用之前完成的ch
                  $ch = $chs[$key];
                  //var_dump($ch);
                  $options = $this->get_options($this->requests[$i]);
                  curl_setopt_array($ch, $options);
                  //增加新的连接
                  curl_multi_add_handle($master, $ch);

                  //添加到request Maps,用于返回信息时根据handle找到相应连接
                  $key = (int) $ch;
                  $this->requestMap[$key] = $i;
                  $this->current_size++;
                  $i++;
               } 
            }

         } while ($this->current_size) ;
         curl_multi_close($master);
         return true;
      }

      //返回是否还有活动连接
      public function state() {
         return curl_multi_select($this->master, $this->timeout);
      }

      /**
      * Helper function to set up a new request by setting the appropriate options
      *
      * @access private
      * @param Request $request
      * @return array
      */
      private function get_options($request) {
         //获取类内选项设置
         $options = $this->__get('options');
         if (ini_get('safe_mode') == 'Off' || !ini_get('safe_mode')) {
            $options[CURLOPT_FOLLOWLOCATION] = 1;
            $options[CURLOPT_MAXREDIRS] = 5;
         }

         //附加类内设置到请求选项中
         if ($request->options) {
            $options = $request->options + $options;
         }

         //获取类内head设置
         $headers = $this->__get('headers');

         //附加header
         if ($request->headers) {
            $headers = $request->headers + $headers;
         }

         // set the request URL
         $options[CURLOPT_URL] = $request->url;

         // posting data w/ this request?
         if ($request->post_data) {
            $options[CURLOPT_POST] = 1;
            $options[CURLOPT_POSTFIELDS] = $request->post_data;
         }
         if ($headers) {
            $options[CURLOPT_HEADER] = 0;
            $options[CURLOPT_HTTPHEADER] = $headers;
         }

         return $options;
      }

      /**
      * @return void
      */
      public function __destruct() {
         unset($this->window_size, $this->callback, $this->options, $this->headers, $this->requests);
      }

      Function test() {

         var_dump($this->requests);
      }
   }

使用方法:

<?php
   require("class/RollingCurl.php");
   function callback($response, $info, $request) {
      print_r($response);
      print_r($info);
      print_r($request);
   }
   $rc = new RollingCurl();
   $rc->window_size = 2;
   for ($i = 1; $i < 1000; $i++) {
      $url = "http://www.baidu.com/";
      $request = new RollingCurlRequest($url);
      $request->options = array(CURLOPT_COOKIEJAR => '/tmp/ck.cookie', CURLOPT_COOKIEFILE => '/tmp/ck.cookie');
      $request->headers = array('Referer: https://www.haiyun.me');
      $request->callback = 'callback';
      $rc->add($request);
   }
   $res = $rc->execute();

PHP异步并发执行multi cURL详解

发布时间:January 4, 2014 // 分类:PHP // No Comments

官方提供的例子,一次执行多个连接,然后阻塞所有链接完成取内容,整个过程是阻塞的。

<?php
// 创建单个cURL资源
$ch1 = curl_init();
$ch2 = curl_init();

// 设置URL和相应的选项
curl_setopt($ch1, CURLOPT_URL, "https://www.haiyun.me/");
curl_setopt($ch1, CURLOPT_HEADER, 0);
curl_setopt($ch1, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch2, CURLOPT_URL, "https://www.haiyun.me/");
curl_setopt($ch2, CURLOPT_HEADER, 0);
curl_setopt($ch2, CURLOPT_RETURNTRANSFER, 1);

// 创建批处理cURL句柄
$mh = curl_multi_init();

// 增加单个句柄到批处理
curl_multi_add_handle($mh,$ch1);
curl_multi_add_handle($mh,$ch2);

$active = null;
// 执行批处理句柄,循环任务直到全部执行,不等待返回结果
do {
    $mrc = curl_multi_exec($mh, $active);
} while ($mrc == CURLM_CALL_MULTI_PERFORM);

//循环判断任务是否执行完成
while ($active && $mrc == CURLM_OK) {
    //阻塞等待cURL批处理中的活动连接,失败时返回-1,不等于-1代表还有活动连接
    if (curl_multi_select($mh) != -1) {
        //有活动连接时继续执行批处理句柄
        do {
            $mrc = curl_multi_exec($mh, $active);
        } while ($mrc == CURLM_CALL_MULTI_PERFORM);
    }
}

//获取单个URL返回的内容
var_dump(curl_multi_getcontent($ch1));
// 关闭全部句柄
curl_multi_remove_handle($mh, $ch1);
curl_multi_remove_handle($mh, $ch2);
curl_multi_close($mh);
?>

非阻塞实现http://code.google.com/p/rolling-curl/,不过在循环时是根据curl_multi_exec返回的运行状态判断的,大量链接时经常没完成就跳出,修改了下根据循环的队列是否完成来判断。

<?php
   $isruning = 1;
   do {
      //执行句柄内所有连接,包括后来新加入的连接
      do {
         $execrun = curl_multi_exec($master, $running);
      } while ($execrun == CURLM_CALL_MULTI_PERFORM);
      //while (($execrun = curl_multi_exec($master, $running)) == CURLM_CALL_MULTI_PERFORM) ;
      //if ($execrun != CURLM_OK)
      //break;
      //有连接返回立即处理,并加入新的连接
      while ($done = curl_multi_info_read($master)) {

         //获取返回的信息
         $info = curl_getinfo($done['handle']);
         $output = curl_multi_getcontent($done['handle']);

         //发送返回信息到回调函数
         $callback = $this->callback;
         if (is_callable($callback)) {
            //获取返回信息的句柄
            $key = (string) $done['handle'];
            //根据请求映射是哪个请求返回的信息
            $request = $this->requests[$this->requestMap[$key]];
            unset($this->requestMap[$key]);
            call_user_func($callback, $output, $info, $request);
         }

         //判断队列内的连接是否用完
         if ($i < sizeof($this->requests) && isset($this->requests[$i]) && $i < count($this->requests)) {
            $ch = curl_init();
            $options = $this->get_options($this->requests[$i]);
            curl_setopt_array($ch, $options);
            //增加新的连接
            curl_multi_add_handle($master, $ch);

            //添加到request Maps,用于返回信息时根据handle找到相应连接
            $key = (string) $ch;
            $this->requestMap[$key] = $i;
            $i++;
         } 
         else {
            //循环结束
            $isruning = 0;
         }

         //删除完成的句柄
         curl_multi_remove_handle($master, $done['handle']);
      }

      // Block for data in / output; error handling is done by curl_multi_exec
      if ($running)
      curl_multi_select($master, $this->timeout);

   } while ($isruning);
?>
分类
最新文章
最近回复
  • opnfense: 谢谢博主!!!解决问题了!!!我之前一直以为内置的odhcp6就是唯一管理ipv6的方式
  • liyk: 这个方法获取的IPv6大概20分钟之后就会失效,默认路由先消失,然后Global IPV6再消失
  • 海运: 不好意思,没有。
  • zongboa: 您好,請問一下有immortalwrt設定guest Wi-Fi的GUI教學嗎?感謝您。
  • 海运: 恩山有很多。
  • swsend: 大佬可以分享一下固件吗,谢谢。
  • Jimmy: 方法一 nghtp3步骤需要改成如下才能编译成功: git clone https://git...
  • 海运: 地址格式和udpxy一样,udpxy和msd_lite能用这个就能用。
  • 1: 怎么用 编译后的程序在家里路由器内任意一台设备上运行就可以吗?比如笔记本电脑 m参数是笔记本的...
  • 孤狼: ups_status_set: seems that UPS [BK650M2-CH] is ...