文章
网格计算
作者:金辉 (Oracle)
2016 年 7 月 4 日发布
第 1 章 — 漫说内存网格
第 2&3 章 — 安装和配置开发环境
第 4 章 — 开始第一个 Coherence 程序
第 5 章 — 在 Coherence 中使用 POF 对象
第 6 章 — 分布式内存集群的数据灌注、查询和统计
第 7 章 — 监听器处理内存集群中变化的对象
第 8 章 — 使用 ZFS 加密
第 9 章 — 内存集群与数据库的互操作
第 10 章 — Coherence 内存集群的安全访问
第 11 章 — 基于内存集群的事件处理
本章内容包含三部分
com.tangosol.util.ObservableMap 接口提供了观察发生在内存集群中对象变化的机制。它扩展了 java.util.EventListener 并使用标准 JavaBeans 事件模式。所有 NamedCache 类型的实例都实现了这个接口。为侦听事件,需要在缓存上注册 MapListener(com.tangosol.util.MapListener) 实例。MapListener 实例被客户端所调用。这意味着 listener 代码在客户端进程中执行。
侦听到几种方法:
不同方法的代码示例如下:
void addMapListener(MapListener listener) void addMapListener(MapListener listener, Filter filter, boolean fLite) void addMapListener(MapListener listener, Object oKey, boolean fLite)
com.tangosol.util.MapEvent 类捕获对象的键,旧值与新值。方法当事件发生时可以用 getOldValue或getNewValue。下面的例子演示了如何在缓存中注册事件监听器。
namedCache.addMapListener(new MapListener()
{
public void entryDeleted(MapEvent mapEvent)
{
// TODO... 当删除事件发生时
}
public void entryInserted(MapEvent mapEvent)
{
// TODO... 当写入事件发生时
}
public void entryUpdated(MapEvent mapEvent)
{
// TODO... 当更新事件发生时
}
}
本章介绍如何监听缓存,及相应处理。
在加载数据的应用中,为创建 Contact 对象创建监听的类。把这个类的名字命名为 ObserverExample 并确保有 main 函数。
在这个类里,增加 listener 来显示当缓存中的 Contact 发生的变化。下面的例子,从控制台应用读取输入
BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); String text = console.readLine();
在这个类里,创建一个继承 AbstractMapListner 的 inner class。实现 insert,update 和 delete 缓存中的值。在这个场景中,基于包含在 MapEvent 中的新旧值,多数工作在 entryUpdated 方法中处理。
package com.oracle.handson.chapter07;
import com.tangosol.net.NamedCache;
import com.tangosol.util.AbstractMapListener;
import com.tangosol.util.MapEvent;
import com.oracle.handson.chapter05.Contact;
import com.tangosol.net.CacheFactory;
import java.io.IOException;
/**
* ObserverExample observes changes to contacts.
*/
public class ObserverExample {
public ObserverExample() {
}
// ----- ObserverExample methods -------------------------------------
public static void main(String[] args) {
NamedCache cache = CacheFactory.getCache("ContactsCache");
new ObserverExample().observe(cache);
try {
System.in.read();
} catch (IOException e) {
}
}
/**
* Observe changes to the contacts.
*
* @param cache
* target cache
*/
public void observe(NamedCache cache) {
cache.addMapListener(new ContactChangeListener());
}
// ----- inner class: ContactChangeListener -------------------------
public class ContactChangeListener extends AbstractMapListener {
// ----- MapListener interface
// ------------------------------------------
public void entryInserted(MapEvent event) {
System.out.println(event);
}
public void entryUpdated(MapEvent event) {
Contact contactOld = (Contact) event.getOldValue();
Contact contactNew = (Contact) event.getNewValue();
StringBuffer sb = new StringBuffer();
if (!contactOld.getHomeAddress().equals(contactNew.getHomeAddress())) {
sb.append("Home address ");
}
if (!contactOld.getWorkAddress().equals(contactNew.getWorkAddress())) {
sb.append("Work address ");
}
if (!contactOld.getTelephoneNumbers().equals(contactNew.getTelephoneNumbers())) {
sb.append("Telephone ");
}
if (contactOld.getAge() != contactNew.getAge()) {
sb.append("Birthdate ");
}
sb.append("was updated for ").append(event.getKey());
System.out.println(sb);
}
public void entryDeleted(MapEvent event) {
System.out.println(event.getKey());
}
}
}
1. 为 ObserverExample 创建运行配置。右键工程中的 ObserverExample->Run As,在 Run Configurations 对话框中,我们复制 chapter06CacheClientQueryExample,并更名为 chapter07CacheClientObserverExample
a) 在 Main 选卡中,确定 com.oracle.handson.chapter07.ObserverExample
b) 在 General 选卡 -Coherence 选卡,确保 contacts-cache-config.xml
c) 确保选择 Disabled (cache client)
d) 确保 Cluster port 是 3155
e) Other 选卡,确保 tangosol.pof.config 配置 contacts-pof-config.xml 作为 POF 配置文件
f) Common 选卡,选择 Shared file,并确保是 \chapter03_07 工程
2. 停止所有 Cache servers
3. 启动 CacheServer,复制 chapter06CacheServer为chapter07CacheServer,并运行。
4. 运行 chapter06CacheClientLoaderExample。
5. 运行 chapter07CacheClientObserverExample,这时程序将如下图等待输入:
下一节,将介绍修改缓存中的记录,并返回变更。
在本节中,我们将创建一个 Java 类修改缓存中的条目,并返回修改的记录。
直到现在,我们使用处理缓存条目的方法是 put 和 get 操作。然而在并发访问数据时可用更好的处理。Entry processors(com.tangosol.util.InvocableMap.EntryProcessor) 是执行针对条目进行处理的代理。条目在所在地直接被处理。处理包括 create、update、remove data 或单纯计算。在多节点环境分区模式下,处理是平行执行,所以是支持水平扩展。由于数据不在客户端而在缓存中处理,还节省了 I/O 的开销。
同样的键将在逻辑上被队列化的,由 Entry processors 处理。这允许 lock-free(高性能)处理。com.tangosol.util.InvocableMap接口(被NamedCache 实现) 有如下操作方法:
| Object invoke(Object oKey, InvocableMap.EntryProcessor processor) |
| 针对独立对象调用 EntryProcessor 并返回调用结果。 |
| Map invokeAll(Collection keys, InvocableMap.EntryProcessor processor) |
| 基于键的集合调用 EntryProcessor 并返回每一个调用结果。 |
| Map invokeAll(Filter filter, InvocableMap.EntryProcessor processor) |
| 针对符合 filter 条件的集合调用 EntryProcessor ,并返回每一个调用结果。 |
| 注释: EntryProcessor 类必须在每一个节点的类路径(ClassPath)中。 |
创建一个entry process,可以继承 com.tangosol.util.processes.AbstractProcessor 和实现 process() 方法。比如,下面的代码演示了创建一个 EntryProcessor 实例来变更 Contacts 中的员工对象:
public static class OfficeUpdater extends AbstractProcessor implements PortableObject
...
public Object process(InvocableMap.Entry entry)
{
Contact contact = (Contact) entry.getValue();
contact.setWorkAddress(m_addrWork);
entry.setValue(contact);
return null;
}
为调用 OfficeUpdater 类,我们可以在使用 invokeAll 方法时将 OfficeUpdater 类作为参数。
cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"),new OfficeUpdater(addrWork));
本节内容,将创建一个 Java 类实例化 EntryProcessor 在缓存总更新。上一节的 ObserverExample 类将检测到数据的变化。
1. 在缓存中创建一个类用于更新条目
2. 写代码,查找住在马萨诸塞州,更新他们的工作地址。
一个 inner class 实现 PortableObject 接口(用于序列化和反序列化)并包含一个 EntryProcessor 实例来设置工作地址。使用 Filter 来过滤集群中符合条件的 Contacts。
package com.oracle.handson.chapter07;
import com.tangosol.net.NamedCache;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.processor.AbstractProcessor;
import com.tangosol.util.InvocableMap;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.oracle.handson.chapter05.Address;
import com.oracle.handson.chapter05.Contact;
import com.tangosol.net.CacheFactory;
import java.io.IOException;
/**
* ProcessorExample executes an example EntryProcessor.
*
*/
public class ProcessorExample {
public ProcessorExample() {
}
public static void main(String[] args) {
NamedCache cache = CacheFactory.getCache("ContactsCache");
new ProcessorExample().execute(cache);
}
// ----- ProcessorExample methods -----------------------------------
public void execute(NamedCache cache) {
// People who live in Massachusetts moved to an in-state office
Address addrWork = new Address("200 Newbury St.", "Yoyodyne, Ltd.", "Boston", "MA", "02116", "US");
cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"), new OfficeUpdater(addrWork));
}
// ----- nested class: OfficeUpdater ------------------------------------
/**
* OfficeUpdater updates a contact's office address.
*/
public static class OfficeUpdater extends AbstractProcessor implements PortableObject {
// ----- constructors -------------------------------------------
/**
* Default constructor (necessary for PortableObject implementation).
*/
public OfficeUpdater() {
}
public OfficeUpdater(Address addrWork) {
m_addrWork = addrWork;
}
// ----- InvocableMap.EntryProcessor interface ------------------
public Object process(InvocableMap.Entry entry) {
Contact contact = (Contact) entry.getValue();
contact.setWorkAddress(m_addrWork);
entry.setValue(contact);
System.out.println("Work address was updated for " + contact.getFirstName() + " " + contact.getLastName());
return null;
}
// ----- PortableObject interface -------------------------------
public void readExternal(PofReader reader) throws IOException {
m_addrWork = (Address) reader.readObject(0);
}
public void writeExternal(PofWriter writer) throws IOException {
writer.writeObject(0, m_addrWork);
}
// ----- data members -------------------------------------------
private Address m_addrWork;
}
}
编辑 contacts-pof-config.xml 文件,增加 OfficeUpdater 条目,如下:
...
<user-type>
<type-id>1006</type-id>
<class-name>com.oracle.handson.chapter07.ProcessorExample$OfficeUpdater</class-name>
</user-type>
...
1. 为 ProcessorExample 创建运行配置。简化操作就是复制 7.2.2 中的运行配置 chapter07CacheClientObserverExample,将其变更为 chapter07CacheClientProcessorExample
a) Main 选卡中,类 com.oracle.handson.chapter07.ProcessorExample;
b) 在 General 选卡 -Coherence 选卡,确保 contacts-cache-config.xml;
c) 确保选择 Disabled (cache client);
d) 确保 Cluster port 是 3155;
e) Other 选卡,确保 tangosol.pof.config 配置 contacts-pof-config.xml 作为 POF 配置文件;
f) Common 选卡,选择 Shared file,并确保是 \chapter03_07 工程;
2. 按照如下步骤操作,测试运行 ObserverExample 和 ProcessorExample
3. 停止所有运行的 cache servers
4. 重启 ContactsCacheServer,运行 chapter07CacheServer
5. 运行 chapter06CacheClientLoaderExample,加载数据
6. 运行chapter07CacheClientObserverExample
7. 运行chapter07CacheClientProcessorExample
![]() | 金辉,在 Oracle 负责云和中间件产品的资深售前顾问和团队经理,有十多年的中间件和项目管理经验,专注在企业云构建,企业集成解决方案领域,熟悉业内主要的 SOA 集成产品。参加过北京马拉松和 TNF50 越野比赛。你可以通过邮件 arthur.jin@oracle.com 与他联系。 |