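Dubbo is a service framework open-sourced by Alibaba that multiplexes all requests over a single long-lived connection. Its transport layer sits on an NIO framework and supports netty, mina, and grizzly, with netty as the default. Two things about dubbo interest me most:
1. What does the client-side thread model look like?
A traditional IO client works in request-response mode: send the request --> wait for the remote response. Dubbo is asynchronous underneath and every request reuses the single long-lived connection, so a call never blocks on IO; it blocks on a timed Future wait instead.
2. What does the server-side thread model look like?
This one is well established. Most servers today are built on NIO: a group of io threads handles IO, and a group of worker threads handles business logic.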
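As a rough illustration of that io-thread / worker-thread split, here is a minimal sketch that uses only java.util.concurrent. It is not how netty or dubbo implement it: the class name TwoPoolServer, the pool sizes, and the string "request" are all assumptions made for the example, and real io threads would be selector loops reading sockets.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Hypothetical illustration of the io-thread / worker-thread split.
class TwoPoolServer {
    // Small pool standing in for the NIO selector threads.
    private final ExecutorService ioThreads = Executors.newFixedThreadPool(2, daemonFactory("io"));
    // Larger pool that runs the (possibly slow) business logic.
    private final ExecutorService workers = Executors.newFixedThreadPool(8, daemonFactory("worker"));

    // Every thread created by this factory carries the same fixed name.
    private static ThreadFactory daemonFactory(String name) {
        return runnable -> {
            Thread t = new Thread(runnable, name);
            t.setDaemon(true);
            return t;
        };
    }

    // The first stage runs on an io thread and only "decodes" the request.
    // The second stage runs on a worker thread, so the io thread is free
    // again as soon as the hand-off is done.
    CompletableFuture<String> handle(String request) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName() + " decoded " + request, ioThreads)
                .thenApplyAsync(decoded -> decoded + ", " + Thread.currentThread().getName() + " handled it",
                        workers);
    }

    void shutdown() {
        ioThreads.shutdown();
        workers.shutdown();
    }

    public static void main(String[] args) {
        TwoPoolServer server = new TwoPoolServer();
        System.out.println(server.handle("ping").join()); // io decoded ping, worker handled it
        server.shutdown();
    }
}
```

The point of the split is that slow business code can never stall the threads that read and write sockets.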
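I. Quick start
The best way to learn dubbo is to get it running quickly. Dubbo is a fairly heavyweight product, though, and I ran into a few problems along the way.
Server side: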
    import java.io.IOException;

    import com.alibaba.dubbo.config.ApplicationConfig;
    import com.alibaba.dubbo.config.ProtocolConfig;
    import com.alibaba.dubbo.config.ServiceConfig;
    import com.duitang.dboss.client.test.BlogQueryService;
    import com.duitang.dboss.client.test.BlogQueryServiceImpl;

    public class DubboServerTester {

        public static void main(String[] args) throws IOException {
            BlogQueryService blogQueryService = new BlogQueryServiceImpl();

            ApplicationConfig application = new ApplicationConfig();
            application.setName("dubbo-test");

            ProtocolConfig protocol = new ProtocolConfig();
            protocol.setName("dubbo");
            protocol.setPort(8989);
            protocol.setThreads(200);

            // RegistryConfig registry = new RegistryConfig();
            // registry.setAddress("10.20.130.230:9090");
            // registry.setUsername("aaa");
            // registry.setPassword("bbb");

            // This instance is heavyweight: it wraps the connection to the registry.
            // Cache it yourself, otherwise memory and connections may leak.
            ServiceConfig<BlogQueryService> service = new ServiceConfig<BlogQueryService>();
            service.setApplication(application);
            // service.setRegistry(registry);
            service.setRegister(false); // no registry configured, so disable registration
            service.setProtocol(protocol); // use setProtocols() for multiple protocols
            service.setInterface(BlogQueryService.class);
            service.setRef(blogQueryService);
            service.setVersion("1.0.0");

            // Export the service (registration is disabled by setRegister(false) above)
            service.export();

            System.out.println("Press any key to exit.");
            System.in.read();
        }
    }

Note: by default, exporting a dubbo service depends on a RegistryConfig, and export fails with an error if none is configured. You can disable registration with service.setRegister(false).
Client side:
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;

    import com.alibaba.dubbo.config.ApplicationConfig;
    import com.alibaba.dubbo.config.ReferenceConfig;
    import com.duitang.dboss.client.test.BlogQueryService;

    public class DubboClientTester {

        public static void main(String[] args) throws InterruptedException, IOException {
            ApplicationConfig application = new ApplicationConfig();
            application.setName("dubbo-test");

            ReferenceConfig<BlogQueryService> reference = new ReferenceConfig<BlogQueryService>();
            // Connect directly to the provider instead of going through a registry
            reference.setUrl("dubbo://127.0.0.1:8989/com.duitang.dboss.client.test.BlogQueryService");
            reference.setTimeout(500);
            reference.setConnections(10);
            reference.setApplication(application);
            reference.setInterface(BlogQueryService.class);
            reference.setVersion("1.0.0");
            final BlogQueryService blogQueryService = reference.get();

            // Time a single call first
            long begin = System.currentTimeMillis();
            System.out.println(blogQueryService.test());
            long end = System.currentTimeMillis();
            System.out.println(" cost:" + (end - begin));

            // Then issue 100000 calls from a pool of 50 threads
            ExecutorService es = Executors.newFixedThreadPool(50, new NamedThreadFactory("my test"));
            List<Callable<String>> tasks = new ArrayList<Callable<String>>();
            for (int i = 0; i < 100000; ++i) {
                tasks.add(new Callable<String>() {

                    @Override
                    public String call() throws Exception {
                        System.out.println("run");
                        System.out.println(blogQueryService.test());
                        System.out.println("run success");
                        return null;
                    }
                });
            }

            // invokeAll blocks until every task has finished
            List<Future<String>> futurelist = es.invokeAll(tasks);
            for (Future<String> future : futurelist) {
                try {
                    future.get(); // the result is always null; this only surfaces failures
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                System.out.println("------------------------------------------------------------------------------------------------------------------------------------------------\r\n");
            }
            es.shutdown();
            System.out.println("end");
            System.in.read();
        }

        // Thread factory that gives pool threads a readable name prefix
        static class NamedThreadFactory implements ThreadFactory {

            private static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
            private final AtomicInteger mThreadNum = new AtomicInteger(1);
            private final String mPrefix;
            private final boolean mDaemo;
            private final ThreadGroup mGroup;

            public NamedThreadFactory() {
                this("pool-" + POOL_SEQ.getAndIncrement(), false);
            }

            public NamedThreadFactory(String prefix) {
                this(prefix, false);
            }

            public NamedThreadFactory(String prefix, boolean daemo) {
                mPrefix = prefix + "-thread-";
                mDaemo = daemo;
                SecurityManager s = System.getSecurityManager();
                mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
            }

            public Thread newThread(Runnable runnable) {
                String name = mPrefix + mThreadNum.getAndIncrement();
                Thread ret = new Thread(mGroup, runnable, name, 0);
                ret.setDaemon(mDaemo);
                return ret;
            }

            public ThreadGroup getThreadGroup() {
                return mGroup;
            }
        }
    }
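1. setUrl("") connects directly to the remote service.
2. Note that by default there is only one connection; setConnections() sets the size of the connection pool. Under high load even a single NIO connection becomes a bottleneck, and a connection pool lets more connections share dubbo's request load, which raises the system's throughput.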
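To make the idea of several connections sharing the load concrete, here is a minimal sketch of a round-robin selector. The class name RoundRobinPool and the round-robin policy itself are assumptions for illustration; I am not claiming this is the exact selection code inside dubbo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin selector over a fixed set of connections.
class RoundRobinPool<C> {
    private final List<C> connections;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinPool(List<C> connections) {
        if (connections.isEmpty()) {
            throw new IllegalArgumentException("need at least one connection");
        }
        this.connections = new ArrayList<C>(connections); // defensive copy
    }

    // Picks the next connection. floorMod keeps the index valid even after
    // the counter overflows to a negative value.
    C next() {
        int i = Math.floorMod(index.getAndIncrement(), connections.size());
        return connections.get(i);
    }

    public static void main(String[] args) {
        RoundRobinPool<String> pool =
                new RoundRobinPool<String>(Arrays.asList("conn-0", "conn-1", "conn-2"));
        for (int n = 0; n < 4; n++) {
            System.out.println(pool.next()); // conn-0, conn-1, conn-2, conn-0
        }
    }
}
```

With setConnections(10) the pool would hold ten entries, and concurrent callers would be spread across them instead of queueing writes on one socket.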
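II. Code flow
The focus here is the client-side call path, which has three parts:
1). Initialize and establish the connection.
2). Send the request.
3). Wait for the remote response.
(i). Initialization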
1. DubboProtocol.initClient()
2. Exchangers.connect(URL url, ExchangeHandler handler)
3. Exchangers.getExchanger(url).connect(url, handler)
4. HeaderExchanger.connect(URL url, ExchangeHandler handler)
5. return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
6. Transporters.getTransporter().connect(URL url, ChannelHandler handler)
7. NettyTransporter.connect(URL url, ChannelHandler listener)
8. new NettyClient(url, listener) // default values: timeout=1000; connectTimeout=3000
9. NettyClient.doOpen() // creates netty's ClientBootstrap
bootstrap = new ClientBootstrap(channelFactory);
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout()); // note: the value passed here is timeout, not connectTimeout
10. AbstractClient.connect()
11. NettyClient.doConnect() // if the remote address cannot be reached, a timeout exception is thrown and the flow ends
ChannelFuture future = bootstrap.connect(getConnectAddress());
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
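The bounded wait in step 11 can be sketched without netty. In the example below a CompletableFuture stands in for netty's ChannelFuture, and the names ConnectWait and awaitConnected are hypothetical. It also simplifies one thing: netty's awaitUninterruptibly only reports whether the future completed in time, while this sketch folds "completed" and "succeeded" into a single boolean.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the bounded wait in doConnect().
class ConnectWait {
    // Returns true only if the connect attempt finished successfully within
    // connectTimeoutMillis; false if it failed or is still pending.
    static boolean awaitConnected(CompletableFuture<Void> connectFuture, long connectTimeoutMillis) {
        try {
            connectFuture.get(connectTimeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // still pending when the wait ran out
        } catch (ExecutionException e) {
            return false; // the attempt itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> reachable = CompletableFuture.<Void>completedFuture(null);
        CompletableFuture<Void> unreachable = new CompletableFuture<Void>(); // never completes
        System.out.println(awaitConnected(reachable, 50));   // true
        System.out.println(awaitConnected(unreachable, 50)); // false
    }
}
```

A caller would turn the false result into the timeout exception mentioned in step 11.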
(ii). Sending the request
1.DubboInvoker.doInvoke(Invocation invocation) //currentClient.request(invocation, timeout).get()
2.HeaderExchangeClient.request(invocation, timeout)
3.HeaderExchangeChannel.request(Invocation invocation,timeout)
4.AbstractPeer.send(Request request)
5.NettyChannel.send(Object message, boolean sent)
6.NioClientSocketChannel.write(message)
7.NettyHandler.writeRequested(ChannelHandlerContext ctx, MessageEvent e)
8.AbstractPeer.sent(Channel ch, Request request)
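The send path above only writes the request; what lets many callers share one connection is that each request carries an id and a pending future is registered under that id before the write. Here is a minimal sketch of that correlation. PendingRequests, its method names, and the BiConsumer "writer" are assumptions for illustration, with CompletableFuture standing in for dubbo's own future class.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Hypothetical request/response correlation over one shared connection.
class PendingRequests {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<String>> pending =
            new ConcurrentHashMap<Long, CompletableFuture<String>>();

    // Registers a future under a fresh id, then hands (id, payload) to the
    // writer. The caller blocks on the returned future, not on the socket.
    CompletableFuture<String> request(String payload, BiConsumer<Long, String> writer) {
        long id = nextId.getAndIncrement();
        CompletableFuture<String> future = new CompletableFuture<String>();
        pending.put(id, future); // register before writing so a fast reply is never lost
        writer.accept(id, payload);
        return future;
    }

    // Called from the io thread when a response carrying this id arrives.
    void onResponse(long id, String body) {
        CompletableFuture<String> future = pending.remove(id);
        if (future != null) {
            future.complete(body);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) throws Exception {
        PendingRequests client = new PendingRequests();
        // Fake "remote end": answers on another thread, echoing the payload.
        CompletableFuture<String> reply = client.request("hello",
                (id, payload) -> CompletableFuture.runAsync(() -> client.onResponse(id, "echo:" + payload)));
        System.out.println(reply.get(1, TimeUnit.SECONDS)); // echo:hello
    }
}
```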
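(iii). Waiting for the remote response
Inside DubboInvoker.doInvoke(Invocation invocation), the actual call is currentClient.request(invocation, timeout).get(). request() returns a DefaultFuture, and get() blocks until the response arrives or the timeout expires. While the caller is blocked, netty's io thread receives the remote response; the arrival of a response raises an io event that invokes NettyHandler.messageReceived.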
1.NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
2.AbstractPeer.received(Channel ch, Object msg)
3.MultiMessageHandler.received(Channel channel, Object message)
4.AllChannelHandler.received(Channel channel, Object message)
5.DecodeHandler.received(Channel channel, Object message)
6.HeaderExchangeHandler.received(Channel channel, Object message)
7.DefaultFuture.received(Channel channel, Response response) // note: this is a static method
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
}
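Steps 2 to 6 above are a stack of handlers in which each one wraps the next and does one job before delegating. The sketch below shows that wrapping pattern in miniature. None of these classes are dubbo's: the names are hypothetical, "decoding" is reduced to trimming a string, and the thread-pool hand-off that a dispatching handler would perform is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical miniature of a wrapped handler chain.
class HandlerChainDemo {
    interface Handler {
        void received(Object message);
    }

    // Unpacks a batch and forwards each message on its own.
    static class MultiHandler implements Handler {
        private final Handler next;

        MultiHandler(Handler next) {
            this.next = next;
        }

        public void received(Object message) {
            if (message instanceof List) {
                for (Object single : (List<?>) message) {
                    next.received(single);
                }
            } else {
                next.received(message);
            }
        }
    }

    // Stand-in for a decode step: here it only trims whitespace.
    static class DecodeStep implements Handler {
        private final Handler next;

        DecodeStep(Handler next) {
            this.next = next;
        }

        public void received(Object message) {
            next.received(String.valueOf(message).trim());
        }
    }

    // End of the chain: records what finally arrived.
    static class Collector implements Handler {
        final List<String> seen = new ArrayList<String>();

        public void received(Object message) {
            seen.add((String) message);
        }
    }

    public static void main(String[] args) {
        Collector business = new Collector();
        Handler chain = new MultiHandler(new DecodeStep(business));
        chain.received(Arrays.asList(" a ", " b ")); // a batch of two messages
        chain.received(" c ");                       // a single message
        System.out.println(business.seen); // [a, b, c]
    }
}
```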
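III. The core of the dubbo client
I think the core of the dubbo client is DefaultFuture. No remote call blocks on IO; every call blocks on the Future's timed wait. Below I leave out the remote call and extract just the future mechanism.
Here is the implementation: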
    package executor;

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.atomic.AtomicLong;

    // CommandResponse (constructed from an id and a result, with a getId() accessor)
    // lives in the same package; its source is not shown in this post.
    public class Commands {

        // senders play the role of the calling side, receivers the role of the io threads
        private ExecutorService senders = Executors.newCachedThreadPool();
        private ExecutorService receivers = Executors.newCachedThreadPool();
        private AtomicLong counter = new AtomicLong();

        public CommandResponse execute(Callable<Object> task, int timeout) {
            Future<Object> result = senders.submit(task);
            long id = counter.getAndIncrement();
            // The future registers itself under the id before the receiver is started
            CommandFuture commandFuture = new CommandFuture(id);
            receivers.submit(new ReceiveWorker(id, result));
            // The caller blocks here, on the future, until a response or the timeout
            return commandFuture.get(timeout);
        }

        static class ReceiveWorker implements Runnable {

            private Future<Object> result;
            private Long id;

            public ReceiveWorker(Long id, Future<Object> result) {
                super();
                this.result = result;
                this.id = id;
            }

            @Override
            public void run() {
                try {
                    Object obj = result.get();
                    // Wake up whoever is waiting on the future with this id
                    CommandFuture.received(new CommandResponse(id, obj));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }

        public void shutdown() {
            senders.shutdown();
            receivers.shutdown();
        }
    }
    package executor;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    // CommandResponse and TimeoutException are not shown in this post. The
    // TimeoutException thrown below must be an unchecked exception (for example one
    // defined in this package), because get() declares no checked exceptions.
    public class CommandFuture {

        private final Lock lock = new ReentrantLock();
        private final Condition done = lock.newCondition();
        private final Long id;
        // volatile because isDone() reads it without holding the lock
        private volatile CommandResponse response;

        // All pending futures, keyed by request id
        private static final Map<Long, CommandFuture> FUTURES = new ConcurrentHashMap<Long, CommandFuture>();

        public CommandFuture(Long id) {
            this.id = id;
            FUTURES.put(id, this);
        }

        public boolean isDone() {
            return response != null;
        }

        public CommandResponse get(int timeout) {
            if (!isDone()) {
                long start = System.currentTimeMillis();
                lock.lock();
                try {
                    while (!isDone()) {
                        done.await(timeout, TimeUnit.MILLISECONDS);
                        if (isDone() || System.currentTimeMillis() - start >= timeout) {
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    lock.unlock();
                }
                if (!isDone()) {
                    // Drop the pending entry so that timed-out calls do not pile up in FUTURES
                    FUTURES.remove(id);
                    throw new TimeoutException("timeout");
                }
            }
            return response;
        }

        // Stores the response and wakes the thread blocked in get()
        public void doReceived(CommandResponse response) {
            lock.lock();
            try {
                this.response = response;
                if (done != null) {
                    done.signal();
                }
            } finally {
                lock.unlock();
            }
        }

        // Static entry point: looks up the pending future by response id
        public static void received(CommandResponse response) {
            try {
                CommandFuture future = FUTURES.remove(response.getId());
                if (future != null) {
                    future.doReceived(response);
                } else {
                    // No pending future, for example because the caller already timed out
                    System.out.println("some error!");
                }
            } finally {
                // CHANNELS.remove(response.getId());
            }
        }
    }
Below is the jstack output: