服务提供者接口 我需在在服务提供者中提供两个接口供服务消费者调用,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @RequestMapping("/getbook6") public List<Book> book6(String ids) { System.out.println("ids>>>>>>>>>>>>>>>>>>>>>" + ids); ArrayList<Book> books = new ArrayList<>(); books.add(new Book("《李自成》", 55, "姚雪垠", "人民文学出版社")); books.add(new Book("中国文学简史", 33, "林庚", "清华大学出版社")); books.add(new Book("文学改良刍议", 33, "胡适", "无")); books.add(new Book("ids", 22, "helloworld", "haha")); return books; } @RequestMapping("/getbook6/{id}") public Book book61(@PathVariable Integer id) { Book book = new Book("《李自成》2", 55, "姚雪垠2", "人民文学出版社2"); return book; }
第一个接口是一个批处理接口,第二个接口是一个处理单个请求的接口。在批处理接口中,服务消费者传来的ids参数格式是1,2,3,4…这种格式,正常情况下我们需要根据ids查询到对应的数据,然后组装成一个集合返回,我这里为了处理方便,不管什么样的请求统统都返回一样的数据集;处理单个请求的接口就比较简单了,不再赘述。
服务消费者 OK,服务提供者处理好之后,接下来我们来看看服务消费者要怎么处理。
BookService 首先在BookService中添加两个方法用来调用服务提供者提供的接口,如下:
1 2 3 4 5 6 7 8 9 public Book test8(Long id) { return restTemplate.getForObject("http://HELLO-SERVICE/getbook6/{1}", Book.class, id); } public List<Book> test9(List<Long> ids) { System.out.println("test9---------"+ids+"Thread.currentThread().getName():" + Thread.currentThread().getName()); Book[] books = restTemplate.getForObject("http://HELLO-SERVICE/getbook6?ids={1}", Book[].class, StringUtils.join(ids, ",")); return Arrays.asList(books); }
test8用来调用提供单个id的接口,test9用来调用批处理的接口,在test9中,我将test9执行时所处的线程打印出来,方便我们观察执行结果,另外,在RestTemplate中,如果返回值是一个集合,我们得先用一个数组接收,然后再转为集合。
BookBatchCommand OK,BookService中的方法准备好了后,我们就可以来创建一个BookBatchCommand,这是一个批处理命令,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class BookBatchCommand extends HystrixCommand<List<Book>> { private List<Long> ids; private BookService bookService; public BookBatchCommand(List<Long> ids, BookService bookService) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CollapsingGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("CollapsingKey"))); this.ids = ids; this.bookService = bookService; } @Override protected List<Book> run() throws Exception { return bookService.test9(ids); } }
这个类实际上和我们在上篇博客中介绍的类差不多,都是继承自HystrixCommand,用来处理合并之后的请求,在run方法中调用BookService中的test9方法。
BookCollapseCommand 接下来我们需要创建BookCollapseCommand继承自HystrixCollapser来实现请求合并。如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class BookCollapseCommand extends HystrixCollapser<List<Book>, Book, Long> { private BookService bookService; private Long id; public BookCollapseCommand(BookService bookService, Long id) { super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("bookCollapseCommand")).andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100))); this.bookService = bookService; this.id = id; } @Override public Long getRequestArgument() { return id; } @Override protected HystrixCommand<List<Book>> createCommand(Collection<CollapsedRequest<Book, Long>> collapsedRequests) { List<Long> ids = new ArrayList<>(collapsedRequests.size()); ids.addAll(collapsedRequests.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList())); BookBatchCommand bookBatchCommand = new BookBatchCommand(ids, bookService); return bookBatchCommand; } @Override protected void mapResponseToRequests(List<Book> batchResponse, Collection<CollapsedRequest<Book, Long>> collapsedRequests) { System.out.println("mapResponseToRequests"); int count = 0; for (CollapsedRequest<Book, Long> collapsedRequest : collapsedRequests) { Book book = batchResponse.get(count++); collapsedRequest.setResponse(book); } } }
关于这个类,我说如下几点:
1.首先在构造方法中,我们设置了请求时间窗为100ms,即请求时间间隔在100ms之内的请求会被合并为一个请求。 2.createCommand方法主要用来合并请求,在这里获取到各个单个请求的id,将这些单个的id放到一个集合中,然后再创建出一个BookBatchCommand对象,用该对象去发起一个批量请求。 3.mapResponseToRequests方法主要用来为每个请求设置请求结果。该方法的第一个参数batchResponse表示批处理请求的结果,第二个参数collapsedRequests则代表了每一个被合并的请求,然后我们通过遍历batchResponse来为collapsedRequests设置请求结果。
OK,所有的这些操作完成后,我们就可以来测试啦。
测试 我们在服务消费者端创建访问接口,来测试合并请求,测试接口如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @RequestMapping("/test7") @ResponseBody public void test7() throws ExecutionException, InterruptedException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); BookCollapseCommand bc1 = new BookCollapseCommand(bookService, 1l); BookCollapseCommand bc2 = new BookCollapseCommand(bookService, 2l); BookCollapseCommand bc3 = new BookCollapseCommand(bookService, 3l); BookCollapseCommand bc4 = new BookCollapseCommand(bookService, 4l); Future<Book> q1 = bc1.queue(); Future<Book> q2 = bc2.queue(); Future<Book> q3 = bc3.queue(); Book book1 = q1.get(); Book book2 = q2.get(); Book book3 = q3.get(); Thread.sleep(3000); Future<Book> q4 = bc4.queue(); Book book4 = q4.get(); System.out.println("book1>>>"+book1); System.out.println("book2>>>"+book2); System.out.println("book3>>>"+book3); System.out.println("book4>>>"+book4); context.close(); }
关于这个测试接口我说如下两点:
1.首先要初始化HystrixRequestContext 2.创建BookCollapseCommand类的实例来发起请求,先发送3个请求,然后睡眠3秒钟,再发起1个请求,这样,前3个请求就会被合并为一个请求,第四个请求因为间隔的时间比较久,所以不会被合并,而是单独创建一个线程去处理。
OK,我们来看看执行结果,如下:
通过注解实现请求合并 OK,上面这种请求合并方式写起来稍微有一点麻烦,我们可以使用注解来更优雅的实现这一功能。首先在BookService中添加两个方法,如下:
1 2 3 4 5 6 7 8 9 10 11 @HystrixCollapser(batchMethod = "test11",collapserProperties = {@HystrixProperty(name ="timerDelayInMilliseconds",value = "100")}) public Future<Book> test10(Long id) { return null; } @HystrixCommand public List<Book> test11(List<Long> ids) { System.out.println("test9---------"+ids+"Thread.currentThread().getName():" + Thread.currentThread().getName()); Book[] books = restTemplate.getForObject("http://HELLO-SERVICE/getbook6?ids={1}", Book[].class, StringUtils.join(ids, ",")); return Arrays.asList(books); }
在test10方法上添加@HystrixCollapser注解实现请求合并,用batchMethod属性指明请求合并后的处理方法,collapserProperties属性指定其他属性。
OK,在BookService中写好之后,直接调用就可以了,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @RequestMapping("/test8") @ResponseBody public void test8() throws ExecutionException, InterruptedException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); Future<Book> f1 = bookService.test10(1l); Future<Book> f2 = bookService.test10(2l); Future<Book> f3 = bookService.test10(3l); Book b1 = f1.get(); Book b2 = f2.get(); Book b3 = f3.get(); Thread.sleep(3000); Future<Book> f4 = bookService.test10(4l); Book b4 = f4.get(); System.out.println("b1>>>"+b1); System.out.println("b2>>>"+b2); System.out.println("b3>>>"+b3); System.out.println("b4>>>"+b4); context.close(); }
和前面的一样,前三个请求会进行合并,第四个请求会单独执行,OK,执行结果如下:
总结 请求合并的优点已经看到了,多个请求被合并为一个请求进行一次性处理,可以有效节省网络带宽和线程池资源,但是,有优点必然也有缺点,设置请求合并之后,本来一个请求可能5ms就搞定了,但是现在必须再等10ms看看还有没有其他的请求一起的,这样一个请求的耗时就从5ms增加到15ms了,不过,如果我们要发起的命令本身就是一个高延迟的命令,那么这个时候就可以使用请求合并了,因为这个时候时间窗的时间消耗就显得微不足道了,另外高并发也是请求合并的一个非常重要的场景。