海运的博客

使用Gearman搭建分布式任务分发平台

发布时间:January 9, 2014 // 分类:消息队列 // No Comments

使用yum直接安装:

yum install gearmand

安装PHP扩展:

yum install libgearman re2c
wget http://pecl.php.net/get/gearman-1.1.2.tgz
tar zxvf gearman-1.1.2.tgz 
cd gearman-1.1.2
phpize 
./configure 
make && make install
echo "extension=gearman.so" >> /etc/php.ini

启动任务分发进程:

gearmand -d --keepalive --libtokyocabinet-file /tmp/gearmand.tch

PHP客户端,提交任务:

<?php  
   $client= new GearmanClient();  
   $client->addServer("127.0.0.1", 4730); 
   //发送任务到Job,处理函数和数据
   //echo $client->do("reverse", "Hello World!");  
   echo $client->doBackground("reverse", "Hello World!");  
?> 

PHP执行端:

<?php  
   $worker= new GearmanWorker();  
   $worker->addServer("127.0.0.1", 4730); 
   //处理接收到数据的回调函数
   $worker->addFunction("reverse", "reverse_function");  
   while ($worker->work());  
   function reverse_function($job)  
   {  
      for($i = 0; $i < 10; $i++){  
         sleep(1);  
         echo "{$i}\n";  
      }  
      return $job->workload();  
   }  
?>  

Kyoto Cabinet和LevelDB实现任务队列

发布时间:January 7, 2014 // 分类:消息队列 // No Comments

根据Tokyo Tyrant自带的Lua队列脚本修改,Kyoto Cabinet和Tokyo Tyrant的API有很大不同。

kt = __kyototycoon__
db = kt.db

-- 记录日志
if kt.thid == 0 then
   kt.log("system", "the Lua script has been loaded")
end

-- 入队列
function enqueue(inmap, outmap)
   local key = inmap.key
   local value = inmap.value
   --队列值自增1,空从0开始
   local id = db:increment_double(key, 1)
   if not id then
      return kt.RVEINTERNAL 
   end 
   key = string.format("%s-%012d", key, id)
   if not db:add(key, value) then
      return kt.RVEINTERNAL
   end
   outmap[key] = "ok"
   return kt.RVSUCCESS
end

-- 出队列
function dequeue(inmap, outmap)
   local key = inmap.key
   local max = inmap.max
   max = tonumber(max)
   if not max or max < 1 then
      max = 1
   end
   key = string.format("%s-", key)
   --匹配队列前缀,返回多个匹配的key
   local keys = db:match_prefix(key, max)
   local res = ""
   for i = 1, #keys do
      local key = keys[i]
      local value = db:get(key)
      if db:remove(key) and value then
         --要返回的结果
         outmap[keys[i]] = value
      end
   end
   return kt.RVSUCCESS
end

-- 查看队列大小
function queuesize(inmap, outmap)
   local key = inmap.key
   key = string.format("%s-", key)
   local keys = db:match_prefix(key)
   outmap.size = #keys
   return kt.RVSUCCESS
end

-- 重置队列ID从0开始 
function queuereset(inmap, outmap)
   local key = inmap.key
   if not key then
      return kt.RVEINVALID
   end
   if not db:remove(key) then
      local err = db:error()
      if err:code() == kt.Error.NOREC then
         return kt.RVELOGIC
      end
      return kt.RVEINTERNAL
   end
   return kt.RVSUCCESS
end

使用:

ktremotemgr script -host 192.168.1.3 -port 1978 enqueue key queue value value1
ktremotemgr script -host 192.168.1.3 -port 1978 dequeue key queue max 10
curl "http://192.168.1.3:1978/rpc/play_script?name=enqueue&_key=queue&_value=value1"
curl "http://192.168.1.3:1978/rpc/play_script?name=dequeue&_key=queue&_max=10"

Tokyo Tyrant和Tokyo Cabinet实现任务队列

发布时间:January 7, 2014 // 分类:消息队列 // No Comments

Tokyo Tyrant官方提供的Lua脚本扩展:

-- 入队列
function enqueue(key, value)
   --通过队列名为key记录入队列的尾部,自加1为当次入队列的起始ID
   local id = _adddouble(key, 1)
   if not id then
      return nil
   end
   --队列key格式为队列名称\t加12位数字
   key = string.format("%s\t%012d", key, id)
   if not _putkeep(key, value) then
      return nil
   end
   return "ok"
end

-- 出队列
function dequeue(key, max)
   max = tonumber(max)
   if not max or max < 1 then
      max = 1
   end
   key = string.format("%s\t", key)
   --匹配队列前缀,返回匹配的多个key
   local keys = _fwmkeys(key, max)
   local res = ""
   for i = 1, #keys do
      local key = keys[i]
      local value = _get(key)
      --删除要出队列的key并返回队列值
      if _out(key) and value then
         res = res .. value .. "\n"
      end
   end
   return res
end

-- 查看队列大小
function queuesize(key)
   key = string.format("%s\t", key)
   local keys = _fwmkeys(key)
   //队列数组大小
   return #keys
end
tcrmgr ext -port 1978 127.0.0.1 enqueue myqueue value1
tcrmgr ext -port 1978 127.0.0.1 dequeue myqueue 10

通过TT入队列:

<?php
   function dequeue(){
      $tt = new TokyoTyrant("localhost", '1978');   
      $content = $tt -> ext('dequeue', '0', 'queue', 10);
      $content = preg_split('/\n/', $content, -1, PREG_SPLIT_NO_EMPTY);
      return $content;
   }
   //队列名称
   $file = 'file.csv';
   $queuename = 'queue';
   $tt = new TokyoTyrant("localhost", '1978');   
   $content = file_get_contents($file);
   $content = preg_split('/\n/', $content, -1, PREG_SPLIT_NO_EMPTY); //分割字符串为数组
   //队列开始ID
   //print_r($content);
   foreach($content as $key => $value)  
   {
      if ($key !== '')  
      {
         $tmpvalue = explode(',',$value);
         preg_match_all("/[0-9]{1}/", $tmpvalue[0], $arrNum, PREG_SET_ORDER);
         if (count($arrNum) > 8)
         {
            $tt->ext('enqueue','0', $queuename, $value );
            //echo $value."\n";
         }
      }
   }
?>

大数据通过TC PHP扩展入队列:

<?php
   //队列名称
   $file = 'file.txt';
   $queuename = 'queue';
   $db = new TCHDB();
   $db->open('/var/queue/db.tch', TCHDB::OWRITER | TCHDB::OCREAT);
   $content = file_get_contents($file);
   $content = preg_split('/\n/', $content, -1, PREG_SPLIT_NO_EMPTY); //分割字符串为数组
   //队列开始ID
   $id = 1;
   //print_r($content);
   foreach($content as $key => $value)  
   {
      if ($key !== '')  
      {
         $id = sprintf('%012d', $id);
         //echo $key.$value."\n";
         $db->put("$queuename-$id", $value);
         $id++;
      }
   }
   $db->out($queuename);
?>

Debian7安装队列服务HTTPSQS

发布时间:December 31, 2013 // 分类:消息队列 // No Comments

aptitude install gcc make zlib1g-dev libbz2-dev
yum install gcc zlib-devel bzip2-devel
wget https://github.com/downloads/libevent/libevent/libevent-2.0.21-stable.tar.gz
tar zxvf libevent-2.0.21-stable.tar.gz 
cd libevent-2.0.21-stable/
./configure --prefix=/usr/local/libevent-2.0.21-stable/
make && make install
cd ../

wget http://fallabs.com/tokyocabinet/tokyocabinet-1.4.48.tar.gz
tar zxvf tokyocabinet-1.4.48.tar.gz
cd tokyocabinet-1.4.48/
./configure --prefix=/usr/local/tokyocabinet-1.4.48/
make && make install
cd ../

wget http://httpsqs.googlecode.com/files/httpsqs-1.7.tar.gz
tar zxvf httpsqs-1.7.tar.gz
cd httpsqs-1.7/
sed -i 's/12/21/g' Makefile
sed -i 's/47/48/g' Makefile
make && make install

修改源码添加出多个队列支持:

/* 出多个队列 */
else if (strcmp(httpsqs_input_opt, "mget") == 0 && httpsqs_input_num >= 0 && httpsqs_input_num <= 10000) 
{
    int i;
    for( i=0; i<httpsqs_input_num; i++ )
    {
        int queue_get_value = 0;
        queue_get_value = httpsqs_now_getpos((char *)httpsqs_input_name);
        if (queue_get_value == 0) {
            evbuffer_add_printf(buf, "%s", "HTTPSQS_GET_END");
            break;
        } else {
            char queue_name[300] = {0}; /* 队列名称的总长度,用户输入的队列长度少于256字节 */
            sprintf(queue_name, "%s:%d", httpsqs_input_name, queue_get_value);
            char *httpsqs_output_value;
            httpsqs_output_value = tcbdbget2(httpsqs_db_tcbdb, queue_name);
            if (httpsqs_output_value) {
                memset(queue_name, '\0', 300);
                sprintf(queue_name, "%d", queue_get_value);    
                evhttp_add_header(req->output_headers, "Pos", queue_name);
                evbuffer_add_printf(buf, "%s\n", httpsqs_output_value);
                free(httpsqs_output_value);
            } else {
                evbuffer_add_printf(buf, "%s", "HTTPSQS_GET_END");
                break;
            }
        }
    }
}

源码分析:

将队列读取点和写入点保存为key:queue:getpos/putpos的值,队列内容key为queue:1/queue:2形式
通过httpsqs_read_putpos获取写入/读取点值
读取队列内容时httpsqs_now_read_pos对值判断及更新值,对key:queue:+1
通过tcbdbput2/tcbdget2写入/读取队列内容

更多:http://blog.s135.com/httpsqs/

分类
最新文章
最近回复
  • opnfense: 谢谢博主!!!解决问题了!!!我之前一直以为内置的odhcp6就是唯一管理ipv6的方式
  • liyk: 这个方法获取的IPv6大概20分钟之后就会失效,默认路由先消失,然后Global IPV6再消失
  • 海运: 不好意思,没有。
  • zongboa: 您好,請問一下有immortalwrt設定guest Wi-Fi的GUI教學嗎?感謝您。
  • 海运: 恩山有很多。
  • swsend: 大佬可以分享一下固件吗,谢谢。
  • Jimmy: 方法一 nghtp3步骤需要改成如下才能编译成功: git clone https://git...
  • 海运: 地址格式和udpxy一样,udpxy和msd_lite能用这个就能用。
  • 1: 怎么用 编译后的程序在家里路由器内任意一台设备上运行就可以吗?比如笔记本电脑 m参数是笔记本的...
  • 孤狼: ups_status_set: seems that UPS [BK650M2-CH] is ...