项目作者: fantasy-peak

项目描述 :
C++17 and reactor mode task/timer executor
高级语言: C++
项目地址: git://github.com/fantasy-peak/reactor.git
创建时间: 2021-04-22T12:09:16Z
项目社区:https://github.com/fantasy-peak/reactor

开源协议:Apache License 2.0

下载


reactor

Build Status

A C++17 single-file, header-only library based on the reactor pattern. It can add tasks, timers, and file descriptors to a reactor (one loop per thread).

Simple examples

create thread pool

  1. auto thread_pool = std::make_unique<fantasy::ThreadPool>(2);
  2. auto get_thread_id = [] {
  3. std::stringstream ss;
  4. ss << std::this_thread::get_id();
  5. return ss.str();
  6. };
  7. std::future<std::string> ret_1 = thread_pool->enqueue([&] {
  8. spdlog::info("1 thread_id: {}", get_thread_id());
  9. std::this_thread::sleep_for(std::chrono::seconds(1));
  10. return std::string{"hello"};
  11. });
  12. std::future<int32_t> ret_2 = thread_pool->enqueue([&] {
  13. spdlog::info("2 thread_id: {}", get_thread_id());
  14. std::this_thread::sleep_for(std::chrono::seconds(1));
  15. return 999;
  16. });
  17. std::future<double> ret_3 = thread_pool->enqueue([&] {
  18. spdlog::info("3 thread_id: {}", get_thread_id());
  19. std::this_thread::sleep_for(std::chrono::seconds(1));
  20. return 999.1;
  21. });
  22. spdlog::info("ret_1: {}", ret_1.get());
  23. spdlog::info("ret_2: {}", ret_2.get());
  24. spdlog::info("ret_3: {}", ret_3.get());

add a task

  1. fantasy::Reactor reactor;
  2. reactor.run();
  3. // It will run on the reactor thread and does not block the current thread
  4. reactor.callLater([&] {
  5. spdlog::info("task");
  6. });
  7. // It will run on the reactor thread and blocks the current thread
  8. reactor.callNow([&] {
  9. spdlog::info("task");
  10. });

add/remove a timed task

  1. fantasy::Reactor reactor;
  2. reactor.run();
  3. // It will run in one second
  4. reactor.callAt(std::chrono::system_clock::now() + std::chrono::seconds(1),[] {
  5. spdlog::info("callAt");
  6. });
  7. // It will run in five seconds
  8. reactor.callAfter(std::chrono::seconds(5), [] {
  9. spdlog::info("callAfter");
  10. });
  11. // Run every three seconds
  12. reactor.callEvery(std::chrono::seconds(3), [] {
  13. spdlog::info("callEvery");
  14. return fantasy::Reactor::CallStatus::Ok;
  15. });
  16. // Run every day at 05:30:00
  17. auto id = reactor.callEveryDay(fantasy::Time{5, 30, 0, 0}, [] {
  18. spdlog::info("callEveryDay");
  19. return fantasy::Reactor::CallStatus::Ok;
  20. });
  21. // cancel scheduled tasks
  22. reactor.cancel(id);

add a file descriptor to the reactor for reading and writing

  1. fantasy::Reactor reactor;
  2. reactor.run();
  3. std::string recv_buffer;
  4. int servfd;
  5. struct sockaddr_in servaddr, cliaddr;
  6. if ((servfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
  7. spdlog::info("create socket error!");
  8. exit(1);
  9. }
  10. bzero(&servaddr, sizeof(servaddr));
  11. servaddr.sin_family = AF_INET;
  12. servaddr.sin_port = htons(SERVER_PORT);
  13. servaddr.sin_addr.s_addr = htons(INADDR_ANY);
  14. int opt = 1;
  15. setsockopt(servfd, SOL_SOCKET, SO_REUSEADDR, (const void*)&opt, sizeof(opt));
  16. if (bind(servfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0) {
  17. spdlog::info("bind to port {} failure!", SERVER_PORT);
  18. exit(1);
  19. }
  20. if (listen(servfd, LENGTH_OF_LISTEN_QUEUE) < 0) {
  21. spdlog::info("call listen failure!");
  22. exit(1);
  23. }
  24. reactor.callOnRead(servfd, [&](int fd, const std::weak_ptr<fantasy::Reactor::Channel>&) mutable {
  25. socklen_t length = sizeof(cliaddr);
  26. int clifd = ::accept4(fd, (struct sockaddr*)&cliaddr, &length, SOCK_NONBLOCK | SOCK_CLOEXEC);
  27. if (clifd < 0) {
  28. spdlog::error("error comes when call accept!");
  29. return fantasy::Reactor::CallStatus::Ok;
  30. }
  31. reactor.callOnRead(clifd, [&](int fd, const std::weak_ptr<fantasy::Reactor::Channel>& channel_ptr) mutable {
  32. spdlog::info("call callOnRead");
  33. char buffer[BUFFER_SIZE] = {};
  34. auto n = read(fd, buffer, BUFFER_SIZE);
  35. if (n < 0) {
  36. perror("read()");
  37. return fantasy::Reactor::CallStatus::Remove;
  38. };
  39. if (n == 0) {
  40. spdlog::error("client close");
  41. return fantasy::Reactor::CallStatus::Remove;
  42. }
  43. spdlog::info("read: [{}], read buffer len: {}", buffer, n);
  44. recv_buffer = std::string{buffer};
  45. if (auto spt = channel_ptr.lock())
  46. spt->enableWriting();
  47. return fantasy::Reactor::CallStatus::Ok;
  48. });
  49. reactor.callOnWrite(clifd, [&](int fd, const std::weak_ptr<fantasy::Reactor::Channel>& channel_ptr) {
  50. if (recv_buffer.empty())
  51. return fantasy::Reactor::CallStatus::Ok;
  52. spdlog::info("callOnWrite");
  53. char buffer[BUFFER_SIZE] = {};
  54. memcpy(buffer, recv_buffer.c_str(), recv_buffer.size());
  55. recv_buffer.clear();
  56. spdlog::info("buffer: {}", buffer);
  57. auto n = write(fd, buffer, strlen(buffer));
  58. if (n < 0) {
  59. perror("write()");
  60. exit(1);
  61. }
  62. if (auto spt = channel_ptr.lock())
  63. spt->disableWriting();
  64. return fantasy::Reactor::CallStatus::Ok;
  65. });
  66. return fantasy::Reactor::CallStatus::Ok;
  67. });

Documentation

You can use connection pool and client separately

Maintainers

@fantasy-peak