基于Hyperf开发的任务调度系统
基于Hyperf开发的任务调度系统
基于 Hyperf + Nsq 的一个异步队列库.支持投递任务,DAG任务编排.多个任务使用同一个事务。
1.投递任务
use App\Model\Task;use App\Job\SimpleJob;use App\Kernel\Nsq\Queue;class Example{/*** @desc 测试job队列功能*/public function queue() : void{$task = Task::find(1);$job = new SimpleJob($task);$queue = new Queue('queue');$queue->push($job);}}
2.任务编排协程安全的单连接模式(事务保持、多路复用等条件下,有时必须使用一个连接)
use App\Kernel\Concurrent\ConcurrentMySQLPattern;use App\Dag\Task\Task1;use App\Dag\Task\Task2;use App\Dag\Task\Task3;class Example{public function conCurrentMySQL() : void{$dsn = 'DSN';$user = 'USER';$password = 'PWD';try {$pdo = new \PDO($dsn, $user, $password);$pdo->setAttribute(\PDO::ATTR_EMULATE_PREPARES, true);$c = new ConcurrentMySQLPattern($pdo, $this->logger);$c->beginTransaction();$dag = new \Hyperf\Dag\Dag();$a = \Hyperf\Dag\Vertex::make(function () use ($c){$task = new Task1();return $task->Run($c);}, 'a');$b = \Hyperf\Dag\Vertex::make(function ($results) use ($c){$task = new Task2();return $task->Run($c);}, 'b');$d = \Hyperf\Dag\Vertex::make(function ($results) use ($c, $a, $b){if ($results[$a->key] && $results[$b->key]) {return $c->commit();}return $c->rollback();}, 'd');$e = \Hyperf\Dag\Vertex::make(function ($results) use ($c){$c->close();}, 'e');$results = $dag->addVertex($a)->addVertex($b)->addVertex($d)->addVertex($e)->addEdge($a, $b)->addEdge($b, $d)->addEdge($d, $e)->run();} catch (\PDOException $exception) {echo 'Connection failed: ' . $exception->getMessage();}}}
3.仪表盘
4.TODO: