Project author: kcenon

Project description:
Asynchronous TCP communication system to support a micro-service architecture.
Language: C++
Repository: git://github.com/kcenon/messaging_system.git
Created: 2021-02-22T01:55:38Z
Project community: https://github.com/kcenon/messaging_system

License: BSD 3-Clause "New" or "Revised" License


Messaging System

A high-performance C++20 messaging framework designed for distributed applications. It is built on lock-free thread pools and features type-safe containers, PostgreSQL integration, and asynchronous TCP networking.

🚀 Key Features

🎯 Core Modules

  • Container Module: Type-safe, SIMD-optimized data containers with thread-safe operations
  • Database Module: PostgreSQL integration with connection pooling and prepared statements
  • Network Module: Asynchronous TCP client/server with coroutine-based I/O
  • Thread System: Lock-free thread pools with hazard pointer memory management

🌟 Advanced Capabilities

  • Lock-free Performance: Up to 2.48M jobs/second throughput with sub-microsecond latency
  • Type Safety: Comprehensive type system with variant values and memory safety
  • Python Bindings: Complete Python API for rapid prototyping and integration
  • Cross-platform: Linux, macOS, and Windows support with ARM64 optimization

📋 Requirements

System Requirements

  • Compiler: C++20 compatible (GCC 10+, Clang 12+, MSVC 2019+)
  • Build System: CMake 3.16 or later
  • Package Manager: vcpkg (included as submodule)
  • Platforms: Linux, macOS 10.15+, Windows 10+

Runtime Dependencies

  • PostgreSQL: 12+ (for database module)
  • Python: 3.8+ (for Python bindings, optional)

Development Dependencies

  • Testing: Google Test 1.17+ (included via vcpkg)
  • Benchmarking: Google Benchmark (included via vcpkg)
  • Documentation: Doxygen 1.8+ (optional)

Thread System Integration

This messaging system now includes the latest thread_system with:

  • Lock-free Components: High-performance lock-free queues and thread pools
  • Typed Thread Pools: Priority-based job scheduling (RealTime, Batch, Background)
  • Hazard Pointers: Safe memory reclamation for lock-free data structures
  • Performance: 2.14x throughput improvement with lock-free implementations

🏗️ Quick Start

1. Clone and Setup

    git clone <repository-url> messaging_system
    cd messaging_system
    git submodule update --init --recursive

2. Install Dependencies

    # Automated dependency installation
    ./dependency.sh

    # On macOS with Homebrew
    brew install postgresql cmake

    # On Ubuntu/Debian (the PostgreSQL client headers are packaged as libpq-dev)
    sudo apt update && sudo apt install libpq-dev cmake build-essential

3. Build the Project

    # Quick build (Release mode)
    ./build.sh

    # Build with tests
    ./build.sh --tests

    # Clean build
    ./build.sh --clean

    # Build specific modules only
    ./build.sh --no-database --no-python

4. Run Tests

    # Run all tests
    ./build.sh --tests

    # Or run individual test suites
    cd build/bin
    ./container_test
    ./database_test
    ./network_test
    ./integration_test

Manual CMake Build

    mkdir build && cd build
    cmake .. -DCMAKE_TOOLCHAIN_FILE=../vcpkg/scripts/buildsystems/vcpkg.cmake \
        -DCMAKE_BUILD_TYPE=Release \
        -DUSE_UNIT_TEST=ON
    cmake --build . --parallel

📚 Module Overview

📦 Container Module (container/)

Type-safe, high-performance data containers

  • Thread Safety: Lock-free and mutex-based options
  • Value Types: Bool, numeric (int8-int64), string, bytes, nested containers
  • SIMD Optimization: ARM NEON and x86 AVX support for numeric operations
  • Serialization: Binary and JSON serialization with compression
  • Memory Management: Efficient variant storage with minimal allocations
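The "variant storage" idea above can be illustrated with a minimal sketch built only on the standard library. The names `typed_values`, `value`, and `bytes` are hypothetical and are not the container module's actual API; the sketch only shows how a `std::variant` lets a lookup succeed when the stored type matches the requested one and fail otherwise.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <variant>
#include <vector>

// Hypothetical names for illustration; not the container module's real API.
using bytes = std::vector<std::uint8_t>;
using value = std::variant<bool, std::int64_t, double, std::string, bytes>;

class typed_values {
public:
    // Stores or overwrites the value under `key`.
    void set(const std::string& key, value v) { values_[key] = std::move(v); }

    // Returns the value only if the key exists AND holds exactly type T.
    template <typename T>
    std::optional<T> get(const std::string& key) const {
        auto it = values_.find(key);
        if (it == values_.end()) return std::nullopt;
        if (const T* p = std::get_if<T>(&it->second)) return *p;
        return std::nullopt;  // key exists, but with a different type
    }

    std::size_t size() const { return values_.size(); }

private:
    std::map<std::string, value> values_;
};
```

Asking for the wrong type yields an empty `std::optional` instead of a silent conversion, which is the property "type-safe" usually refers to in this context.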

🗄️ Database Module (database/)

PostgreSQL integration with enterprise features

  • Connection Pool: Thread-safe connection management with configurable pool sizes
  • Prepared Statements: Query optimization and SQL injection protection
  • Async Operations: Non-blocking database operations with coroutine support
  • Transaction Support: ACID compliance with nested transaction handling
  • Error Recovery: Automatic connection recovery and query retry mechanisms
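As a rough illustration of thread-safe connection management with a fixed pool size, the following sketch uses a mutex, a condition variable, and an RAII lease. It is an assumption-laden stand-in: `connection`, `connection_pool`, and `lease` are hypothetical names, and the `connection` struct merely holds a connection string rather than talking to PostgreSQL.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a real PostgreSQL connection handle.
struct connection {
    std::string conninfo;
};

class connection_pool {
public:
    connection_pool(const std::string& conninfo, std::size_t size) {
        for (std::size_t i = 0; i < size; ++i)
            idle_.push_back(std::make_unique<connection>(connection{conninfo}));
    }

    // RAII lease: hands the connection back to the pool on destruction.
    class lease {
    public:
        lease(connection_pool& pool, std::unique_ptr<connection> conn)
            : pool_(&pool), conn_(std::move(conn)) {}
        lease(lease&&) = default;
        ~lease() {
            if (conn_) pool_->release(std::move(conn_));
        }
        connection* operator->() { return conn_.get(); }

    private:
        connection_pool* pool_;
        std::unique_ptr<connection> conn_;
    };

    // Blocks until a connection is idle, then leases it to the caller.
    lease acquire() {
        std::unique_lock<std::mutex> lock(mutex_);
        available_.wait(lock, [this] { return !idle_.empty(); });
        auto conn = std::move(idle_.back());
        idle_.pop_back();
        return lease(*this, std::move(conn));
    }

    std::size_t idle_count() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return idle_.size();
    }

private:
    void release(std::unique_ptr<connection> conn) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            idle_.push_back(std::move(conn));
        }
        available_.notify_one();  // wake one waiter, if any
    }

    mutable std::mutex mutex_;
    std::condition_variable available_;
    std::vector<std::unique_ptr<connection>> idle_;
};
```

Tying the return of a connection to the lease's destructor means an early return or an exception in the caller cannot leak a connection from the pool.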

🌐 Network Module (network/)

High-performance TCP messaging infrastructure

  • Asynchronous I/O: ASIO-based coroutine implementation
  • Client/Server: Full-duplex messaging with session management
  • Protocol Support: Binary and text protocols with custom message framing
  • Pipeline Processing: Message transformation and routing capabilities
  • Load Balancing: Built-in support for distributed server architectures
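"Custom message framing" is needed because TCP delivers a byte stream with no message boundaries. One common approach, shown here purely as an assumption about how framing could work and not as the network module's actual wire format, is a 4-byte big-endian length prefix. The helper names `encode_frame` and `decode_frame` are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>

// Prepends a 4-byte big-endian length to the payload.
std::string encode_frame(const std::string& payload) {
    const auto n = static_cast<std::uint32_t>(payload.size());
    std::string frame;
    frame.reserve(4 + payload.size());
    for (int shift = 24; shift >= 0; shift -= 8)
        frame.push_back(static_cast<char>((n >> shift) & 0xFF));
    frame += payload;
    return frame;
}

// Tries to extract one complete frame from the front of `buffer`.
// On success the frame is removed from the buffer and its payload returned.
// Returns std::nullopt when more bytes are needed.
std::optional<std::string> decode_frame(std::string& buffer) {
    if (buffer.size() < 4) return std::nullopt;
    std::uint32_t n = 0;
    for (std::size_t i = 0; i < 4; ++i)
        n = (n << 8) | static_cast<std::uint8_t>(buffer[i]);
    if (buffer.size() - 4 < n) return std::nullopt;  // payload incomplete
    std::string payload = buffer.substr(4, n);
    buffer.erase(0, 4 + static_cast<std::size_t>(n));
    return payload;
}
```

A receiver would append every chunk read from the socket to `buffer` and call `decode_frame` in a loop until it returns `std::nullopt`, which handles both partial frames and several frames arriving in one read.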

🧵 Thread System (thread_system/)

Lock-free concurrent processing framework

  • Lock-free Queues: Multi-producer, multi-consumer queues with hazard pointers
  • Typed Scheduling: Priority-based job scheduling (RealTime, Batch, Background)
  • Memory Safety: Hazard pointer-based memory reclamation
  • Performance Monitoring: Real-time metrics collection and reporting
  • Cross-platform: Optimized implementations for different CPU architectures
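The priority ordering described under "Typed Scheduling" can be sketched with a plain `std::priority_queue`. This is deliberately a single-threaded, non-lock-free illustration of the ordering rule only; `job_priority` and `priority_scheduler` are hypothetical names, and the real thread_system is assumed to work differently internally.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Priority levels named after the ones listed above; lower value runs first.
enum class job_priority { real_time = 0, batch = 1, background = 2 };

class priority_scheduler {
public:
    void submit(job_priority priority, std::function<void()> work) {
        queue_.push(entry{priority, next_sequence_++, std::move(work)});
    }

    // Runs the most urgent pending job; returns false if none is pending.
    bool run_next() {
        if (queue_.empty()) return false;
        auto work = queue_.top().work;  // copy, because top() is const
        queue_.pop();
        work();
        return true;
    }

private:
    struct entry {
        job_priority priority;
        std::uint64_t sequence;  // preserves FIFO order within a priority
        std::function<void()> work;
    };
    // Orders entries so that the most urgent, oldest job ends up on top.
    struct later {
        bool operator()(const entry& a, const entry& b) const {
            if (a.priority != b.priority) return a.priority > b.priority;
            return a.sequence > b.sequence;
        }
    };
    std::priority_queue<entry, std::vector<entry>, later> queue_;
    std::uint64_t next_sequence_ = 0;
};
```

The sequence number matters: without it, two jobs of the same priority could run in either order, since `std::priority_queue` gives no stability guarantee.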

💡 Usage Examples

Basic Container Operations

    #include <container/container.h>

    #include <memory>
    #include <string>

    using namespace container_module;

    int main() {
        // Create a type-safe container
        auto container = std::make_shared<value_container>();

        // Set the routing header: source, target, and message type
        container->set_source("client_01", "session_1");
        container->set_target("server", "main");
        container->set_message_type("user_data");

        // Serialize and deserialize
        std::string serialized = container->serialize();
        auto restored = std::make_shared<value_container>(serialized);

        return 0;
    }

Network Client/Server

    #include <network/messaging_server.h>
    #include <network/messaging_client.h>

    #include <memory>

    using namespace network_module;

    int main() {
        // Create server
        auto server = std::make_shared<messaging_server>("main_server");
        server->start_server(8080);

        // Create client
        auto client = std::make_shared<messaging_client>("client_01");
        client->start_client("127.0.0.1", 8080);

        // Server and client automatically handle messaging

        // Cleanup
        client->stop_client();
        server->stop_server();

        return 0;
    }

Database Operations

    #include <database/database_manager.h>

    using namespace database;

    int main() {
        // Configure database
        database_manager manager;
        manager.set_mode(database_types::postgres);

        // Connect and execute queries
        if (manager.connect("host=localhost dbname=mydb")) {
            auto result = manager.select_query("SELECT * FROM users");
            // Process results...
            manager.disconnect();
        }

        return 0;
    }

Python Integration

    from messaging_system import Container, MessagingClient, Value
    import time

    # Create and populate a container
    container = Container()
    container.create(
        target_id="server",
        source_id="python_client",
        message_type="data_update",
        values=[
            Value("user_id", "9", "12345"),
            Value("message", "f", "Hello from Python!"),
            Value("timestamp", "9", str(int(time.time())))
        ]
    )

    # Network client
    client = MessagingClient(
        source_id="py_client",
        connection_key="secure_key",
        recv_callback=lambda msg: print(f"Received: {msg}")
    )

    if client.start("localhost", 8080):
        # Send container as message
        serialized = container.serialize()
        client.send_raw_message(serialized)
        client.stop()

📊 Performance Characteristics

Benchmark Results (AMD Ryzen/Intel Core, 16 threads)

    Component     Metric          Lock-free        Mutex-based      Improvement
    Thread Pool   Throughput      2.48M jobs/sec   1.16M jobs/sec   2.14x
    Job Queue     Latency         250 ns           1.2 μs           4.8x
    Container     Serialization   15M ops/sec      12M ops/sec      1.25x
    Network       Connections     10K concurrent   8K concurrent    1.25x
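The Improvement column appears to be the ratio between the two measured columns, with the direction flipped for latency because lower is better there. A short sketch of that arithmetic, using hypothetical helper names and the figures from the table, is:

```cpp
#include <cassert>
#include <cmath>

// Throughput-style metrics: higher is better, so improvement = new / baseline.
double throughput_improvement(double lock_free, double mutex_based) {
    return lock_free / mutex_based;
}

// Latency-style metrics: lower is better, so improvement = baseline / new.
double latency_improvement(double lock_free_ns, double mutex_based_ns) {
    return mutex_based_ns / lock_free_ns;
}

// Rounds to two decimal places, matching the precision used in the table.
double round2(double x) { return std::round(x * 100.0) / 100.0; }
```

For example, 2.48M / 1.16M jobs/sec is about 2.138, which rounds to the listed 2.14x, and 1200 ns / 250 ns gives the listed 4.8x.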

Performance Features

  • Sub-microsecond Latency: Job scheduling and message processing
  • Linear Scalability: Performance scales with CPU cores to 32 or more threads
  • Memory Efficiency: Hazard pointer-based memory reclamation frees retired nodes without garbage-collection pauses
  • SIMD Optimization: ARM NEON and x86 AVX for numeric operations
  • Zero-copy Operations: Minimize copies and memory allocations in hot paths

🏗️ Architecture

    ┌─────────────────────────────────────────────────────────┐
    │                    Application Layer                    │
    ├─────────────────────────────────────────────────────────┤
    │      C++ API         Python Bindings        REST API    │
    ├─────────────────────────────────────────────────────────┤
    │     Container           Database             Network    │
    │      Module              Module              Module     │
    ├─────────────────────────────────────────────────────────┤
    │                Thread System (Lock-free)                │
    │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
    │ │Thread Pools │ │ Job Queues  │ │   Hazard Pointers   │ │
    │ └─────────────┘ └─────────────┘ └─────────────────────┘ │
    ├─────────────────────────────────────────────────────────┤
    │                    Foundation Layer                     │
    │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
    │ │  Utilities  │ │ Monitoring  │ │       Logging       │ │
    │ └─────────────┘ └─────────────┘ └─────────────────────┘ │
    └─────────────────────────────────────────────────────────┘

🧪 Testing

Test Coverage

  • Container Tests: 15 unit tests covering serialization, threading, SIMD operations
  • Database Tests: 14 unit tests covering connections, queries, error handling
  • Network Tests: 14 unit tests covering client/server, sessions, protocols
  • Integration Tests: 8 cross-module integration tests
  • Performance Tests: Benchmark suite with regression detection

Running Tests

    # All tests with coverage
    ./build.sh --tests

    # Specific test modules (from the project root)
    cd build/bin
    ./container_test --gtest_filter="ContainerTest.*"
    ./database_test --gtest_filter="DatabaseTest.SingletonAccess"
    ./network_test --gtest_filter="NetworkTest.ServerClientIntegration"

    # Performance benchmarks (from the project root)
    cd build
    ./thread_system/bin/lockfree_performance_benchmark
    ./thread_system/bin/container_benchmark

🤝 Contributing

  1. Fork the repository
  2. Create a feature branch: git checkout -b feature/amazing-feature
  3. Test your changes: ./build.sh --tests
  4. Format code: Follow the project’s C++ style guide
  5. Commit changes: git commit -m 'feat: add amazing feature'
  6. Push to branch: git push origin feature/amazing-feature
  7. Submit a Pull Request

Development Guidelines

  • Follow C++20 best practices and RAII principles
  • Maintain thread safety in all public APIs
  • Add unit tests for new functionality
  • Update documentation for API changes
  • Use meaningful commit messages (conventional commits)

📚 Documentation

📄 License

BSD 3-Clause License

Copyright (c) 2021-2025, Messaging System Contributors
All rights reserved.

See LICENSE file for full license text.