Spring Cloud Stream入门

这篇文章与Pivotal的Spring产品和技术营销经理Ben Wilcock共同撰写。?已上传文件 ?
?已注册新用户 ?
?下了订单 sound这些听起来像是我们的应用程序体系结构的许多部分可能感兴趣的事件,对吗?例如,当在我们的网站上下订单时,我们需要一个电话来处理付款,一个电话来预订股票,以及一个电话来开始挑选,盘点和运输产品的过程。对于单个订单来说,还算不错。我们的商店可以直接向这些后端服务提出一些请求,并且不应造成太多的管理费用。但是,如果我们真的很擅长销售产品,会发生什么情况?每秒处理100个订单突然意味着我们的前端每秒对后端服务进行300次呼叫。如果我们再添加一项服务(例如,向内部销售信息中心报告),则现在每秒有400个呼叫。这会带来很多开销如果相反,我们可以让我们的网站立即提醒整个体系结构怎么办?它会大喊:“嘿我进行了一次销售”,所有感兴趣的组件都可以采取适当的措施。这意味着我们在添加其他服务时不需要更新前端,而我们的新服务只需要知道要听的内容即可。为什么选择Spring Cloud Stream?以上是事件驱动架构的示例,在该架构中,我们的服务不是逐个接触每个服务,而是发出状态更改。如果上传了文件,我们的文件服务可以将其发送到消息传递平台,然后我们的Super Duper Image Resizer 3000服务可以侦听并自动生成大小不同的配置文件图像。 Pivotal自己的Richard Seroter专门就这个主题写了一篇文章,读起来很棒。理查德(Richard)在他的博客文章中谈到消息传递是一种将事件快速可靠地传递给许多消费者的方式,他还谈到了我们今天要谈论的话题:Spring Cloud Stream.Kafka和RabbitMQ都是我们的忠实粉丝作为事件流平台,因此在此演示中,我们将使用Kafka。不管您选择使用哪种方法,使其易于产生和使用事件对于开发人员而言都很重要。我使用了许多框架,这些框架从底层消息队列中抽象出来,但是没有一个框架像Spring Cloud Stream一样容易和灵活。我的队友本·威尔考克(Ben Wilcock)整理了一个演示,真正演示了启动和运行有多么容易。让我们一起旋转一下-然后,您可以在此处下载完整的源代码。为演示做准备我们的演示只需要几件事,即Docker和Docker Compose,当然,您最喜欢的发行版也是如此。 JDK(甚至可以采用我们赞助的OpenJDK)。为了简化起见,该演示包含一个Docker Compose配置,该配置将同时设置Kafka和RabbitMQ,尽管出于我们的目的,我们仅使用Kafka。我们可以使用一个简单的命令将其旋转:docker-compose up这将读取我们的docker-compose.yml文件,下载必要的容器映像,运行它们并对其进行配置。片刻之后,Kafka应该就可以启动并运行了。准备发送事件我们的演示由两个Spring微服务组成,一个用于产生事件,另一个用于消耗事件。在我们的虚拟场景中,消息生产者将创建银行贷款申请流,我们的处理者将检查这些申请是否应被批准或拒绝。首先生成一些要发送到Kafka的消息,该消息的代码位于loansource目录中。此处有一些代码文件。 Loan.java文件定义了一个贷款对象,Statuses.java文件定义了一个贷款可以进入的所有状态。但是,有趣的是LoansourceApplication.java文件,它实际上是在生成我们的消息。可以想象,Spring及其依赖项自动为我们处理了许多组件连接。让我们看看LoansourceApplication.java看看它是如何工作的。
公共供应商 supplyLoan(){
  return()-> {
    字符串rName = names.get(new Random()。nextInt(names.size()));
    长rAmount = amount.get(new Random()。nextInt(amounts.size()));
    贷款=新贷款(UUID.randomUUID()。toString(),rName,rAmount);
    log.info(“ {} {} for ${} for {}”,loan.getStatus(),loan.getUuid(),loan.getAmount(),loan.getName());
    退还贷款;
  };
} Supplier <>是Java函数数据类型。因为只有一个@Bean方法返回此类型,所以Spring Cloud Stream确切知道下一步该怎么做。默认情况下,它将每秒触发一次此功能,并将结果发送到名为Output的默认MessageChannel。这个函数方法的好处是它仅包含业务逻辑,因此您可以使用自己喜欢的测试方法对其进行测试。我们可以使用application.properties文件中的spring.cloud.function.definition属性来显式声明我们使用哪个函数bean想要绑定到绑定目标,但是对于仅定义一个@Bean的情况,则没有必要。同样,如果要使用其他轮询间隔,则可以在application.properties文件中使用spring.integration.poller.fixed-delay属性。唯一剩下的问题是:“ Spring如何知道我们正在写的Kafka?”为此,我们来看一下pom.xml:
org.springframework.cloud
春天云流粘结剂卡夫卡
在我们的代码中提供这种依赖关系会告诉Spring,“我想将这些消息发送给Kafka”。由于我们的Kafka服务器正在默认端口上的localhost上进行侦听,因此我们不需要在application.properties文件中提供任何其他配置,但是如果不是这样,我们当然可以提供主机名,端口等信息,身份验证等我们可以运行代码并激活“kafka''配置文件,我们已将其配置为包含Kafka SCS绑定的配置文件,我们应该看到它开始产生消息:
./mvnw软件包spring-boot:run -DskipTests = true -Pkafka稍后,我们将看到我们的应用程序开始创建新的贷款并将其发送到Kafka:2019-10-15 … LoansourceApplication:PENDING 9eff9b58-e1f1- Donald的474d-8f1d-aa4db8dbb75a售价$10000000
2019-10-15 … LoansourceApplication:PENDING d507c06c-81bb-4a98-8f85-38f74af36984为$100 for Jacinda
2019-10-15 … LoansourceApplication:待售$100 for Jacinda的19fc86a4-d461-470c-8005-423ce1a258e7
2019-10-15 … LoansourceApplication:PENDING 33f3756c-ea9b-472f-bad2-73f1647188b1为$10000 for Vladimir
2019-10-15 … LoansourceApplication:Pending 1625d10f-c1c8-4e75-8fe8-10ce363ef56f为Theresa,价格为$10000000如果您愿意,还可以使用KafDrop在浏览器中查看消息。只需将浏览器指向localhost:9000,您应该会看到一个UI,该UI可让您查看存储在Kafka中的消息:接收事件我们这里只有一半的方程式,但是我们还需要一些东西来消耗和处理这些事件。为此,我们将在“贷款检查”目录中查找。对于演示的后半部分,我们的贷款检查器将观察每个申请并批准或拒绝。如果获得批准,则将批准消息发送到已批准的主题,否则,将拒绝消息发送至拒绝的主题。您可以从此处推断出,线下的其他系统可以侦听并提取这些消息以进行进一步处理。例如,付款系统可能会监听已批准的贷款以开始处理。我们将看到此处的代码略有不同,只是指向不同的主题。我们在LoanCheckApplication.java中看到@EnableBinding(LoanProcessor.class)批注,这意味着我们所有的通道绑定定义都在LoanProcessor类中找到。在LoanProcessor.java文件中,我们将看到定义了我们正在侦听的MessageChannel名为输出,与生产者写入的默认主题匹配。此外,我们定义了两个将要写入,已批准和已拒绝的MessageChannel。对于每种方法,我们还定义了在这些通道上收到消息时要调用的方法。
公共接口LoanProcessor {
  字符串APPLICATIONS_IN =“输出”;
  字符串APPROVED_OUT =“已批准”;
  字符串DECLINED_OUT =“拒绝”;

  @Input(APPLICATIONS_IN)
  SubscribableChannel sourceOfLoanApplications();

  @Output(APPROVED_OUT)
  MessageChannel批准();

  @输出(DECLINED_OUT)
  MessageChannel拒绝();
}最后,如果我们查看LoanChecker.java文件,我们可以看到这与调用哪种方法有联系。我们将看到我们有一个方法checkAndSortLoans,其@StreamListener批注与我们先前定义的输入相匹配:@StreamListener(LoanProcessor.APPLICATIONS_IN)
公共无效checkAndSortLoans(贷款){
  log.info(“ {} {} for ${} for {}”,loan.getStatus(),loan.getUuid(),loan.getAmount(),loan.getName());

  如果(loan.getAmount()> MAX_AMOUNT){
    loan.setStatus(Statuses.DECLINED.name());
    handler.declined()。send(message(loan));
  }其他{
    loan.setStatus(Statuses.APPROVED.name());
    Processor.approved()。send(message(loan));
  }
}我们可以像打开贷款一样启动此代码,方法是打开一个单独的终端并运行以下命令:cd loancheck
./mvnw软件包spring-boot:run -DskipTests = true -Pkafka过了一会儿,我们将开始看到待处理的消息通过,然后分类为批准或拒绝:2019-10-15 … LoanChecker:待处理95a887cf -ab5f-48c4-b03b-556675446cfc为$1000 for Kim
2019-10-15 … LoanChecker:已批准95a887cf-ab5f-48c4-b03b-556675446cfc为$1000 for Kim
2019-10-15 … LoanChecker:悬而未决的a15f13fe-fc9a-40fb-b6f0-24106a18c0cd for $100000000 for Angela
2019-10-15 … LoanChecker:拒绝了a15f13fe-fc9a-40fb-b6f0-24106a18c0cd为US $100000000为Angela
封装UpSpring Cloud Stream为潜在的复杂消息传递平台提供了极其强大的抽象,将产生消息的行为变成了几行代码。如果您的基础架构需要更改,并且您需要迁移到新的消息传递平台,则除了pom文件外,没有一行代码需要更改。无论您使用的是Kafka,RabbitMQ还是GCP Pub / Sub或Azure Event Hub等云提供商的解决方案,Spring Cloud Stream都意味着它可以轻松快速地启动并运行。在Pivotal的技术营销团队工作,重点是为Pivotal客户以及Cloud Foundry,BOSH和Knative社区提供技术培训。在加入Pivotal之前,Brian从事软件的开发和运营,主要负责Cloud Foundry和BOSH的工作,涉及金融,娱乐和技术等许多行业的公司。他喜欢在许多技术领域中学习和试验,更重要的是分享在此过程中获得的经验教训。
          
          
          
          

          

          
          
          
        

资讯来源:由0x资讯编译自HACKERNOON。版权归作者所有,原文链接:https://hackernoon.com/getting-started-with-spring-cloud-stream-l84p2w5b。未经许可,不得转载
提示:投资有风险,入市需谨慎,本资讯不作为投资理财建议。请理性投资,切实提高风险防范意识;如有发现的违法犯罪线索,可积极向有关部门举报反映。
你可能还喜欢