项目作者: n040661

项目描述 :
drpc-proxy-copy是基于使用storm DRPC的RPC服务,解耦业务代码与storm框架代码的一个简单框架; 在某些场景下,有使用DRPC但不注重使用storm的流式计算的需求,通常情况下使用DRPCServer做为服务提供方接收请求,bolt中处理业务,ReturnResults返回结果;bolt中会将业务代码与storm代码交织、耦合,为后期升级、扩展留下难题。 DRPC-Proxy提供解耦业务与storm,服务消费方使用动态代理生调用DRPCClient与DRPCServer通讯,DRPCServer将请求匹配到对应的服务提供方,最终结果由DRPCServer返回给消费方。参考rongkang
高级语言: Java
项目地址: git://github.com/n040661/drpc-proxy-copy.git
创建时间: 2018-04-09T13:00:35Z
项目社区:https://github.com/n040661/drpc-proxy-copy

开源协议:

下载


“# drpc-proxy-copy”
DRPC-Proxy是基于使用storm DRPC的RPC服务,解耦业务代码与storm框架代码的一个简单框架;
在某些场景下,有使用DRPC但不注重使用storm的流式计算的需求,通常情况下使用DRPCServer做为服务提供方接收请求,bolt中处理业务,ReturnResults返回结果;bolt中会将业务代码与storm代码交织、耦合,为后期升级、扩展留下难题。
DRPC-Proxy提供解耦业务与storm,服务消费方使用动态代理生调用DRPCClient与DRPCServer通讯,DRPCServer将请求匹配到对应的服务提供方,最终结果由DRPCServer返回给消费方。

DRPC-Proxy 特点

  • 解耦storm与业务代码,开发过程中对storm无感知
  • 使用简单,导入jar包,properties中添加相关服务的配置,pom.xml中添加依赖及profile
  • 支持三种模式开发,脱离storm进行业务开发-rely模式,LocalDRPC模式,Remote模式
  • 提供spring环境下的支持,无spring亦可
  • 异常可远程抛出
  • 对DRPC无封装,使用原生代码调用
  • 集成AKKA,保证单线程下bolt对高并发的支持

Module 说明

proxy : 基于接口的drpc调用

proxy-spring : 支持spring环境的基于接口的drpc调用

demo:

demo-customer-spring : spring环境服务消费者

demo-server : 服务接口

demo-serviceimpl : 服务提供者

用法(spring环境)

服务接口API

  1. public interface UserService {
  2. User getUser(String name) throws MyException;
  3. }

服务提供者

  1. public class UserServiceImpl implements UserService {
  2. public User getUser(String name) throws MyException{
  3. User user = new User();
  4. if("tom".equals(name)){
  5. user.setAge(12);
  6. user.setId(111L);
  7. user.setName("tom");
  8. }else {
  9. throw new MyOnlyException("业务异常");
  10. }
  11. return user;
  12. }

drpcproxy-provider.properties

  1. service.impls=\
  2. # com.zph1000.demo.serviceimpl.TestServiceImpl,\
  3. com.zph1000.demo.serviceimpl.UserServiceImpl
  4. drpc.spout.num=1
  5. drpc.dispatch.bolt.num=1
  6. drpc.result.bolt.num=1
  7. drpc.spout.name=spout_name
  8. drpc.topology.name=topology_name

启动脚本

  1. storm jar provider.jar com.zph0000.demo.ConfigMain drpcSpoutName topologyName

服务消费者

  1. public class Runner {
  2. public static void main(String[] args) {
  3. ServiceImplFactory.init();
  4. UserService userService = ServiceImplFactory.newInstance(UserService.class);
  5. User user = userService.getUser("tom");
  6. System.out.println("------------user:"+user.toString());
  7. }
  8. }

drpcproxy-consumer.properties

  1. drpc.client.config.storm.thrift.transport=org.apache.storm.security.auth.SimpleTransportPlugin
  2. drpc.client.config.storm.nimbus.retry.times=3
  3. drpc.client.config.storm.nimbus.retry.interval.millis=10000
  4. drpc.client.config.storm.nimbus.retry.intervalceiling.millis=60000
  5. drpc.client.config.drpc.max_buffer_size=104857600
  6. drpc.client.host=192.168.1.81
  7. drpc.client.port=3772
  8. drpc.client.timeout=50000
  9. topology.mapping.config.zph1000-service-provider=\
  10. # com.zph0000.demo.service.TestService,\
  11. com.zph0000.demo.service.UserService
  12. #topology.mapping.config.zph1000-service-provider-spring=\
  13. # com.zph1000.demo.service.UserService
  14. #remote,local,rely
  15. drpc.pattern=${profiles.pattern}

用法2(spring环境-springboot)

服务接口API

  1. public interface UserService {
  2. UserDto getUser(Long id);
  3. }

服务提供者

  1. @Service
  2. @Transactional
  3. public class UserServiceImpl implements UserService {
  4. @Autowired
  5. private UserRepository userRepository;
  6. public UserDto getUser(Long id) {
  7. return convert(userRepository.findOne(id));
  8. }
  9. }

drpcproxy-provider.properties

  1. #业务所在的包名,使用AnnotationConfigApplicationContext创建spring上下文环境 ,建议使用springboot,可支持基于xml构建上下文
  2. service.impls=com.zph0000.demo
  3. drpc.spout.num=1
  4. drpc.dispatch.bolt.num=1
  5. drpc.result.bolt.num=1
  6. drpc.spout.name=spout_name
  7. drpc.topology.name=topology_name

启动脚本

  1. storm jar provider.jar com.zph1000.proxy.SpringMain drpcSpoutName topologyName

服务消费者

  1. @RestController
  2. @RequestMapping("user")
  3. public class UserController {
  4. @Autowired
  5. private UserService userService;
  6. @RequestMapping(value = "/{id}",method = RequestMethod.GET)
  7. public UserDto getUser(@PathVariable("id") Long id){
  8. return userService.getUser(id);
  9. }
  10. }

drpcproxy-consumer.properties

  1. drpc.client.config.storm.thrift.transport=org.apache.storm.security.auth.SimpleTransportPlugin
  2. drpc.client.config.storm.nimbus.retry.times=3
  3. drpc.client.config.storm.nimbus.retry.interval.millis=10000
  4. drpc.client.config.storm.nimbus.retry.intervalceiling.millis=60000
  5. drpc.client.config.drpc.max_buffer_size=104857600
  6. drpc.client.host=192.168.1.81
  7. drpc.client.port=3772
  8. drpc.client.timeout=5000
  9. topology.mapping.config.zph1000-service-provider-spring=\
  10. # com.zph1000.demo.service.GroupService,\
  11. com.zph1000.demo.service.UserService

StormConfig

  1. @Profile({"local","remote"})
  2. @Configuration
  3. @ServiceScan(basePackages = "com.zph0000.demo.service",
  4. // excludeClasses = {UserService.class},
  5. rpcHandleBeanRef="stormDrpcHandle")
  6. public class StormConfig {
  7. @Profile("local")
  8. @Scope("singleton")
  9. @Bean("stormDrpcHandle")
  10. public RpcHandle getStormLocalRpcHandle(){
  11. StormLocalDrpcHandle drpcHandle = null;
  12. try {
  13. Set<String> serviceImpls = ServiceImplFactory.loadServiceImpls();
  14. SpringBoltHandle springBoltHandle = new SpringBoltHandle(serviceImpls.toArray(new String[serviceImpls.size()]));
  15. drpcHandle = new StormLocalDrpcHandle(springBoltHandle);
  16. } catch (IOException e) {
  17. throw new RuntimeException("初始化stormDrpcHandle失败");
  18. }
  19. return drpcHandle;
  20. }
  21. @Bean
  22. @ConfigurationProperties("drpc.client")
  23. public DrpcClientConfig getDrpcClientConfig(){
  24. return new DrpcClientConfig();
  25. }
  26. @Bean
  27. @ConfigurationProperties("topology.mapping")
  28. public TopologyMapping getTopologyMapping(){
  29. return new TopologyMapping();
  30. }
  31. @Profile("remote")
  32. @Bean("stormDrpcHandle")
  33. public RpcHandle getStormRemoteRpcHandle(DrpcClientConfig clientConfig,TopologyMapping topologyMapping){
  34. Config config = new Config();
  35. config.putAll(clientConfig.getConfig());
  36. return new StormRemoteDrpcHandle(config,clientConfig.getHost(),clientConfig.getPort(),clientConfig.getTimeout(),topologyMapping.getConfig());
  37. }
  38. class DrpcClientConfig{
  39. private String host;
  40. private Integer port;
  41. private Integer timeout;
  42. private Map<String,String> config ;
  43. //getter setter
  44. }
  45. class TopologyMapping {
  46. Map<String, Set<String>> config;
  47. //getter setter
  48. }
  49. }