海运的博客

Tokyo Tyrant和Tokyo Cabinet实现任务队列

发布时间:January 7, 2014 // 分类:消息队列 // No Comments

Tokyo Tyrant官方提供的Lua脚本扩展:

-- 入队列
function enqueue(key, value)
   --通过队列名为key记录入队列的尾部,自加1为当次入队列的起始ID
   local id = _adddouble(key, 1)
   if not id then
      return nil
   end
   --队列key格式为队列名称\t加12位数字
   key = string.format("%s\t%012d", key, id)
   if not _putkeep(key, value) then
      return nil
   end
   return "ok"
end

-- 出队列
function dequeue(key, max)
   max = tonumber(max)
   if not max or max < 1 then
      max = 1
   end
   key = string.format("%s\t", key)
   --匹配队列前缀,返回匹配的多个key
   local keys = _fwmkeys(key, max)
   local res = ""
   for i = 1, #keys do
      local key = keys[i]
      local value = _get(key)
      --删除要出队列的key并返回队列值
      if _out(key) and value then
         res = res .. value .. "\n"
      end
   end
   return res
end

-- 查看队列大小
function queuesize(key)
   key = string.format("%s\t", key)
   local keys = _fwmkeys(key)
   //队列数组大小
   return #keys
end
tcrmgr ext -port 1978 127.0.0.1 enqueue myqueue value1
tcrmgr ext -port 1978 127.0.0.1 dequeue myqueue 10

通过TT入队列:

<?php
   function dequeue(){
      $tt = new TokyoTyrant("localhost", '1978');   
      $content = $tt -> ext('dequeue', '0', 'queue', 10);
      $content = preg_split('/\n/', $content, -1, PREG_SPLIT_NO_EMPTY);
      return $content;
   }
   //队列名称
   $file = 'file.csv';
   $queuename = 'queue';
   $tt = new TokyoTyrant("localhost", '1978');   
   $content = file_get_contents($file);
   $content = preg_split('/\n/', $content, -1, PREG_SPLIT_NO_EMPTY); //分割字符串为数组
   //队列开始ID
   //print_r($content);
   foreach($content as $key => $value)  
   {
      if ($key !== '')  
      {
         $tmpvalue = explode(',',$value);
         preg_match_all("/[0-9]{1}/", $tmpvalue[0], $arrNum, PREG_SET_ORDER);
         if (count($arrNum) > 8)
         {
            $tt->ext('enqueue','0', $queuename, $value );
            //echo $value."\n";
         }
      }
   }
?>

大数据通过TC PHP扩展入队列:

<?php
   //队列名称
   $file = 'file.txt';
   $queuename = 'queue';
   $db = new TCHDB();
   $db->open('/var/queue/db.tch', TCHDB::OWRITER | TCHDB::OCREAT);
   $content = file_get_contents($file);
   $content = preg_split('/\n/', $content, -1, PREG_SPLIT_NO_EMPTY); //分割字符串为数组
   //队列开始ID
   $id = 1;
   //print_r($content);
   foreach($content as $key => $value)  
   {
      if ($key !== '')  
      {
         $id = sprintf('%012d', $id);
         //echo $key.$value."\n";
         $db->put("$queuename-$id", $value);
         $id++;
      }
   }
   $db->out($queuename);
?>

标签:none

评论已关闭

分类
最新文章
最近回复
  • opnfense: 谢谢博主!!!解决问题了!!!我之前一直以为内置的odhcp6就是唯一管理ipv6的方式
  • liyk: 这个方法获取的IPv6大概20分钟之后就会失效,默认路由先消失,然后Global IPV6再消失
  • 海运: 不好意思,没有。
  • zongboa: 您好,請問一下有immortalwrt設定guest Wi-Fi的GUI教學嗎?感謝您。
  • 海运: 恩山有很多。
  • swsend: 大佬可以分享一下固件吗,谢谢。
  • Jimmy: 方法一 nghtp3步骤需要改成如下才能编译成功: git clone https://git...
  • 海运: 地址格式和udpxy一样,udpxy和msd_lite能用这个就能用。
  • 1: 怎么用 编译后的程序在家里路由器内任意一台设备上运行就可以吗?比如笔记本电脑 m参数是笔记本的...
  • 孤狼: ups_status_set: seems that UPS [BK650M2-CH] is ...