站内搜索: 请输入搜索关键词

当前页面: 开发资料首页J2EE 专题使用JMS客户端利用空闲的计算机资源

使用JMS客户端利用空闲的计算机资源

摘要: 使用JMS客户端利用空闲的计算机资源

使用JMS客户端利用空闲的计算机资源

摘要

  在企业中,许多计算机由于在其上执行工作的性质而未得到充分利用,或者因为过了上班时间而干脆得不到使用。在许多情况下,应用服务器耗光了宝贵的CPU(尤其是在执行CPU密集型的数学运算时),而网络上的其他计算机则闲置一旁。本文提出了一种框架,用于把Java消息服务(Java Messaging Service,JMS)客户端放置在这些未充分利用的计算机上,以分担一些通常应由服务器执行的工作。该客户端可以监听某个要执行的工作单元的请求队列,然后在应答队列中做出响应。此外,本文还给出了一个BEA WebLogic Integration 8.1架构的例子,它通过把一个工作流以及相关的Java控件用作替代框架,把工作分发到远程客户端上,从而把工作单元可靠地分发给JMS请求队列。

简介

  本文提出了一种J2EE框架,用于解决把工作分配给未充分利用的计算机资源这个难题。具体来说,可以把JMS客户端放置在这些未充分利用的计算机上,从而分担一些通常应由服务器执行的工作。该客户端可以监听某个要执行的工作单元的请求队列,然后在应答队列上做出响应。可以使用一组消息驱动bean获取应答队列上的响应消息,以便进行进一步的处理。此外,还可以使用一种servlet实现来管理性地启动用于创建(要发送给JMS客户端的)工作单元的整个子流程,并使用它来终止这个子流程。

  我使用常见的BEA WebLogic Server作为把离散的工作单元分配给分布式JMS客户端的例子。在另一个更为复杂的例子中,BEA WebLogic Integration (WLI)工作流也执行类似的分发任务,但是通过对请求队列进行监控,它在灵活性、Java控件的可重用性和可伸缩性等方面要更好一些。

使用的例子

  业内有相当多的例子可以演示如何使用JMS框架来充分利用计算机进行并行处理。例如:

使用常规的WebLogic Server来分发工作单元

  借鉴最后一个例子,我将构建一个简单的例子,用于执行矩阵乘法,同时说明如何使用JMS框架把计算工作分发给企业中的计算机资源。JMS客户端将收到一个工作单元实例,之后,它将会调用其doWork()方法。在这个简单的例子中,doWork()方法将把2个3×3的矩阵相乘,然后把结果保存到一个结果矩阵中。
接着,JMS客户端使用工作单元实例的一个副本(工作是在这上面执行的)对应答队列做出响应。一个消息驱动bean将接受已完成的工作。图1说明了我将要讨论的各个组件:


图 1.该WebLogic Server实现中的各个组件

  这种方法以常规方式使用了JMS系统。在下面的内容中,我将引入一些代码,并考虑几个扩展问题。

工作单元类

  JMS请求队列上的每个类都将实现一个UnitOfWork接口,该接口有一个特别有趣的方法,叫做doWork():

public interface UnitOfWork extends java.io.Serializable {
// This method executes itself o­n the client machine
public void doWork();
// This method prints the current contents of performed work
public void print();
// This method stores the instance into a backing store
public void store(); }

  在我们这个简洁而直观的例子中,我使用一个SimpleMatri类实现了UnitOfWork接口:

public class SimpleMatrix implements UnitOfWork {         
private Integer m1[][]; private Integer m2[][];
private Integer result[][]; private Integer rows = new Integer(3);
private Integer cols = new Integer(3);
// May initialize m1 and m2 by locating records from a database
public SimpleMatrix() { }
// This method actually multiplies m1 x m2 and stores in result
public void doWork() { }
// This method stores result into a backing store
public void store() { }
// This method prints the current contents of result
public void print() { } }

  方法的实现相当简单,限于文章的篇幅,这里就不再进行说明。请参见所附的示例代码,其中给出了完整实现。这里的要点在于,这个SimpleMatrix实例被传递给一个JMS客户端,该客户端只要调用doWork()即可利用其CPU来执行工作。对于这个例子,我不会实际从数据库中检索矩阵或者把矩阵保存到数据库中,但是在实际应用中,这是必须完成的工作。

Servlet工作创建程序

  可以使用一个servlet来创建这些UnitOfWork实例。尽管WebLogic Server启动类可以执行同样的功能,但出于管理的目的,从安全的Web浏览器发送消息给servlet要更加容易。(另一种可选的实现是使用Web服务。)如果在servlet上安置了安全性,通过身份验证的用户可以在查询字符串中传递命令,以便开始交付工作单元给JMS请求队列或停止交付。我将给出一个servlet的主干例子,以说明其中的一些有用方法。

public class WorkServlet extends HttpServlet {  
... private QueueSender qsender;
private ObjectMessage msg;
private int numMessages = 5; ...
// This places the unit of work o­n the request queue
public synchronized boolean sendMessages(int numberOfMessages,

PrintWriter o) {
for(int i=0; i SimpleMatrix simple = new SimpleMatrix();
// Send message to the request queue
try {
msg.setObject(simple);
msg.setJMSReplyTo(QueueResponder);
qsender.send(msg);
if (o != null)
o.println("Sent a Message to queue.");
} catch (Exception e)
{ System.out.println("WorkServlet:sendMessages:Cannot" +
"send message, exception raised");
e.printStackTrace(); return false; }
} return true; }
// This may place a few units of work o­nto the queue o­n startup
public void init(ServletConfig config) throws ServletException {
... } // Responds to a Query String action with keywords
// Action = SEND : place unit of work object o­nto queue
// Action = CONNECT : connect to JMS server //
Action = DISCONNECT : disconnect from JMS server
public void doGet(HttpServletRequest request,
HttpServletResponse response)
throws ServletException, IOException {
doPost( request, response ); }
public void doPost(HttpServletRequest request,
HttpServletResponse response)

throws ServletException, IOException {
... if (request.getParameter("Action").equals("SEND")

sendMessages(numberMessages, response.getWriter()); } }


  JMS客户端类的任务仅仅是接受请求队列上的消息,
调用对象上的doWork()方法在这台计算机上执行工作,
然后把结果返回给应答队列,消息驱动bean从应答队列中获取结果,以便进行进一步的处理和保存。可以检查它是否是文本消息,然后告诉客户端停止处理,从而允许发送控件消息给客户端。当然,在实际应用中,消息可能包含客户端的名称,这样就不会造成所有的客户端都停止处理。

  使用UnitOfWork接口的优点在于,JMS客户端只要编写一次,就可以用于以后实现该接口的任何类。这使得JMS客户端具有很大的通用性,可以不加修改地应用到许多不同的场景中。只需把编译后的UnitOfWork接口以及它所有的实现类都包含在客户端的类路径中。

  在这个简化模型中,客户端需要等待消息到达以启动处理。在实际情况中,客户端的方法等待的条件可以是一天中的某个时刻,比如下午5点,或者是计算机的CPU负载低于某个阈值。需要把此类逻辑添加到客户端,使之与计算机的使用安排更加一致。

消息驱动bean接收程序

  消息驱动bean实例将监听应答队列,看看有没有已完成的工作对象单元。下面给出一个例子的主干部分:

public class MessageWorkBean implements MessageDrivenBean,                                           
MessageListener { ...
// This method will receive a unit of work object to store
public void o­nMessage(Message msg) {
ObjectMessage om = (ObjectMessage) msg;
try { UnitOfWork unit = (UnitOfWork)om.getObject();
unit.print(); unit.store(); }
catch(JMSException ex) {
log("Message Driven Bean: Could not retrieve Unit of Work.");
ex.printStackTrace(); } } }

  这里有一个有趣的方法叫做onMessage()。这个方法的用途仅仅是从应答队列接收已完成的对象。接着,它将调用其print()和store()方法。我的目标是让服务器把它对这个工作单元的处理工作分发给其他计算机。我已经通过JMS客户端实现了这一点,并使用消息驱动bean把结果返回给服务器。

可扩展性方面的考虑

  在这个框架的实际实现中,我们应该要解决几个问题,从而让例子变得可以扩展。

  还应该考虑本文结尾处的参考资料。此外,对其他服务器也适用的考虑事项是把客户端部分交付给各台计算机的方式。一种方式是自愿,即每台计算机的所有者都下载一个可以在客户端计算机上配置和运行的安装程序。另一种方式是使用商业软件分发包,它可以自动下载客户端的最新版本,并把它安装在客户端计算机上。

使用WebLogic Integration工作流来分发工作

  前面给出了一种把工作单元分发给客户端的直观方法,即使用servlet和消息驱动bean。尽管该方法实现起来相当容易,但是它不能解决的问题还很多,比如如何以自支持的方式启动过程,定时把请求交付给请求队列。当然,我们不希望让管理员编写一个shell脚本来不停地调用该servlet。此外,还应该以一种应用程序可以预先控制的方式限制所使用的请求数量。考虑到这一点,下面给出一个更加复杂的例子,用于把工作单元分发给远程JMS客户端并对其做出响应,从而利用未充分使用的计算机。
该方法使用了两个在BEA WebLogic Workshop中开发的WebLogic Integration (WLI)工作流,即Java流程定义(Java Process Definition,JPD)文件,它是BPEL/J (Business Process Engineering Language for Java)的前身。BPEL/J是在JSR 207中定义的。第一个工作流响应某些Web服务请求而启动,并执行初始化以通过一个JMS控件订阅JMS请求队列。该工作流使用一个Timer控件不停地进行循环,并定时唤醒一个while循环,从而在请求队列上放置更多的工作单元。该工作流还将使用一个定制Java控件(在本文相关代码中给出)来浏览请求队列,以便决定是否需要在队列上放置更多请求来防止队列出现过载。最后,工作流还将等待来自Web服务的停止消息,然后停止处理。第二个工作流执行的任务与前面例子中的消息驱动bean相同,因为它将对响应队列中的消息做出响应,以便从出队的响应队列调用print()和store()方法。这是一个生存期很短的工作流,而WebLogic Integration将按照要求产生足够的实例。

浏览JMS队列

  WebLogic Integration被用作一种为远程流程构造和汇编服务的机制。有现成的组件程序集,即Java控件,它使得开发人员可以轻松地构建复合应用程序而不需要进行大量的开发。尽管WebLogic Integration提供了开箱即用的JMS控件,用于在使用JMS时抽象化内部细节,在某些情况下,由于要细粒度地访问底层方法,最好还是创建一个可重用的定制控件。在这个示例框架中,我需要浏览工作请求队列,以统计在队列中等待的工作项的数量,然后决定能否在队列中放入更多工作项,而不会引起队列过载。为此,我们编写了一个定制Java控件,JMSBrowse,它有一个这样的方法:

public interface JMSBrowse extends Control {  
int numberOfElementsInQueue(String qFactory, String qName); }

   这个控件的实现使用了JMS QueueBrowser类来查看一个带有给定的JMS连接工厂的给定JMS队列。它返回队列中等待处理的实例个数。本文所附的代码中提供了完整实现。

启动和停止工作流的Web服务

  为了启动和停止负责把工作单元分发给请求队列的WebLogic Integration流程,我们创建了一个Java Web Service (JWS),它服从JSR 181,带有两个方法。

public classControlWebService implements                
com.bea.jws.WebService {
/** * @common:control */
private Controls.JMSStopControlMessage JMSStopControl;
/** * @common:control */
private Controls.JMSControlMessage JMSControl;
static final long serialVersionUID = 1L; /**
* @common:operation */ public void startFlow() {
JMSControl.subscribe();
JMSControl.sendTextMessage("start");
JMSControl.unsubscribe(); }
/** * @common:operation */
public void stopFlow() {
JMSStopControl.subscribe();
JMSStopControl.sendTextMessage("stop");
JMSStopControl.unsubscribe(); } }

   该Web服务不是直接调用工作流,而是把一条消息放在JMS队列中,然后调用Worker.Message把消息发送给分发JPD。这解除了Web服务实现与工作流之间的耦合,以保持其模块性。在WebLogic Integration中,有一个概念叫做事件生成器,可以使用WebLogic Integration Administration Console对它进行配置您可以把事件生成器配置为从JMS Worker.Message中取出消息,然后将其交付给一个Message Broker通道(逻辑概念)。分发工作流监听/UnitOfWork/StartWorkflow通道,该通道被绑定在与JMS Worker.Message队列相关联的JMS事件生成器上。只要有一个String “start”消息交付到此通道上,工作流就会开始工作。类似地,开始之后,分发工作流就会在它的一个Event Choice节点中监听Message Broker通道(/UnitOfWork/StopWorkflow),以便从Worker.StopMessage JMS队列接收”stop”消息。然后,事件生成器再次把Worker.StopMessage队列上的JMS消息关联到/UnitOfWork/StopWorkflow通道,以便交付消息。

  这实际上创建了一种与启动和停止分发工作流的实现解耦合的面向服务方法。通过Web服务客户端,或者使用所提供的WebLogic Integration Workshop Test Browser,可以轻松对Web服务进行测试。

  分发工作流

  图2说明了负责分发工作单元的DistributeFlow.jpd、我们的简单矩阵对象以及请求队列的相关部分:


图2. 用于分发工作单元的工作流(单击图像查看大图)

  while循环不断地循环,直到一条stop消息改变布尔变量的值,才跳出循环并结束工作流。Event Choice等待两个Control Receive回调的其中一个。第一个回调是通过刚刚描述的Web服务从一个Message Broker通道接收一条Stop消息。第二个回调对一个Timer控件做出响应,我们已经通过该控件的属性面板对它进行了设置,每5秒钟发生一次。这将使处理继续,而下一个行为将调用定制Java控件来浏览Worker.Request队列,以获得等待处理的请求的个数。接下来,决策节点检查请求的个数是否已经超出请求的最大个数(此处的最大个数被设置为5,其值保存在一个变量中)。如果尚未超出,就会调用一个执行节点,然后使用JMS控件在请求队列中放入5个矩阵对象,如下所示:

public void perform() throws Exception {   
for(int i = 0 ; i < maxInQueue; i++) {
matrix = new SimpleMatrix();
jmsControl.sendObjectMessage(matrix); } }

响应工作流的JMS客户端

  响应工作流的JMS客户端与前面在WebLogic Server部分中描述的JMS客户端几乎一模一样。惟一的区别在于,现在客户端使用一条字节消息(而不是对象消息)对响应队列做出响应。客户端把SimpleMatrix对象转换为一个字节数组,并将其传递给响应队列。这样做的理由是,与绑定到响应队列的事件生成器相关联的Message Broker通道只能够监听数据流,即String、XML Bean或字节数组。相关代码被设计用于对WebLogic Integration请求消息和普通的WebLogic Server请求消息做出响应。

  接收一个已完成的工作单元的工作流如图3所示:


图3.接收程序的工作流

  这里的重要行为是perform节点,它用于把字节数组转换为一个对象并调用print()和store()方法。

public void perform() throws Exception {   
ByteArrayInputStream arrayInputStream = new
ByteArrayInputStream(rawData.byteValue());
ObjectInputStream objectInputStream = new
ObjectInputStream(arrayInputStream);
UnitOfWork unit = (UnitOfWork) objectInputStream.readObject();
unit.print(); unit.store(); objectInputStream.close(); }

使用WebLogic Integration工作流

  您已经了解到,使用工作流、Java控件和Message Broker通道可以提供一种更加完善的方式,把工作分发给未完全利用的计算机。只要在流程流中添加更多的行为节点,就可以让处理过程变得像您所期望的那样面面俱到。例如,该工作流可以有一个审计控件,用于在把所有发送到内部日志文件的请求放到队列中之前对其进行审计。该工作流可以把请求重定向到其他JMS队列,只要修改JMS控件的属性值即可。为了实现可扩展性,甚至可以让远程Web服务启动多个工作流实例。最后,基于业务安排,Timer控件的时间间隔可以更小。

  使用Message Broker通道和事件生成器的另一个好处在于,WebLogic Integration Administration Console可以监控事件生成器的响应消息的数量,以便实现进一步的控制。通过控制台,您可以挂起和恢复事件生成器及通道,从而对生产事件做出响应。
这种灵活性使得WebLogic Integration工作流成为一种极具吸引力的方法。

结束语

  使用远程JMS客户端来分发工作的优点在于,它有效地利用了网络计算机来进行某种类型的批处理工作,同时减轻了原来服务器的负担。这种方法的一个著名例子是Search for Extraterrestrial Intelligence (SETI@home)系统,它利用全世界的PC来执行工作单元。本文使用了一种JMS客户端的框架,还讨论了如何部署这类解决方案以实现可扩展性,力求让这种方法通用化。本文还讨论了用于把工作分发给远程客户端的多种方法,并提供了一种面向服务的方法作为首选。


↑返回目录
前一篇: 用Web Service进行企业级的门户集成
后一篇: JavaMail 1.4 发布