Oracle Coherence 内存网格

第 7 章 — 监听器处理内存集群中变化的对象

 

作者:金辉 (Oracle)

2016 年 7 月 4 日发布


右箭头 第 1 章 — 漫说内存网格
右箭头 第 2&3 章 — 安装和配置开发环境
右箭头 第 4 章 — 开始第一个 Coherence 程序
右箭头 第 5 章 — 在 Coherence 中使用 POF 对象
右箭头 第 6 章 — 分布式内存集群的数据灌注、查询和统计
右箭头 第 7 章 — 监听器处理内存集群中变化的对象
右箭头 第 8 章 — 使用 ZFS 加密
右箭头 第 9 章 — 内存集群与数据库的互操作
右箭头 第 10 章 — Coherence 内存集群的安全访问
右箭头 第 11 章 — 基于内存集群的事件处理

本章内容包含三部分

  • 简介
  • 创建监听器
  • 响应变化

7.1 简介

 

com.tangosol.util.ObservableMap 接口提供了观察发生在内存集群中对象变化的机制。它扩展了 java.util.EventListener 并使用标准 JavaBeans 事件模式。所有 NamedCache 类型的实例都实现了这个接口。为侦听事件,需要在缓存上注册 MapListener(com.tangosol.util.MapListener)  实例。MapListener 实例被客户端所调用。这意味着 listener 代码在客户端进程中执行。

侦听到几种方法:

  • 侦听所有事件
  • 侦听满足过滤条件的事件
  • 侦听指定对象键的事件

不同方法的代码示例如下:

void addMapListener(MapListener listener)
void addMapListener(MapListener listener, Filter filter, boolean fLite)
void addMapListener(MapListener listener, Object oKey, boolean fLite)

 

com.tangosol.util.MapEvent 类捕获对象的键,旧值与新值。方法当事件发生时可以用 getOldValue或getNewValue。下面的例子演示了如何在缓存中注册事件监听器。

namedCache.addMapListener(new MapListener()
{
public void entryDeleted(MapEvent mapEvent)
    {
    // TODO... 当删除事件发生时
    }
public void entryInserted(MapEvent mapEvent)
    {
    // TODO... 当写入事件发生时
    }
public void entryUpdated(MapEvent mapEvent)
    {
     // TODO... 当更新事件发生时
 }
}

 

7.2 创建缓存的 Listener

 

本章介绍如何监听缓存,及相应处理。

7.2.1 为监听缓存变化创建一个 Java 类

 

在加载数据的应用中,为创建 Contact 对象创建监听的类。把这个类的名字命名为 ObserverExample 并确保有 main 函数。

07-figure-01

在这个类里,增加 listener 来显示当缓存中的 Contact 发生的变化。下面的例子,从控制台应用读取输入

BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
String text = console.readLine();

 

在这个类里,创建一个继承 AbstractMapListner 的 inner class。实现 insert,update 和 delete 缓存中的值。在这个场景中,基于包含在 MapEvent 中的新旧值,多数工作在 entryUpdated 方法中处理。

package com.oracle.handson.chapter07;

import com.tangosol.net.NamedCache;

import com.tangosol.util.AbstractMapListener;
import com.tangosol.util.MapEvent;

import com.oracle.handson.chapter05.Contact;

import com.tangosol.net.CacheFactory;

import java.io.IOException;

/**
 * ObserverExample observes changes to contacts.
 */
public class ObserverExample {

    public ObserverExample() {
    }

    // ----- ObserverExample methods -------------------------------------
    public static void main(String[] args) {
        NamedCache cache = CacheFactory.getCache("ContactsCache");
        new ObserverExample().observe(cache);
        try {
            System.in.read();
        } catch (IOException e) {
        }
    }

    /**
     * Observe changes to the contacts.
     *
     * @param cache
     *            target cache
     */
    public void observe(NamedCache cache) {
        cache.addMapListener(new ContactChangeListener());
    }

    // ----- inner class: ContactChangeListener -------------------------

    public class ContactChangeListener extends AbstractMapListener {
        // ----- MapListener interface
        // ------------------------------------------

        public void entryInserted(MapEvent event) {
            System.out.println(event);
        }

        public void entryUpdated(MapEvent event) {
            Contact contactOld = (Contact) event.getOldValue();
            Contact contactNew = (Contact) event.getNewValue();
            StringBuffer sb = new StringBuffer();

            if (!contactOld.getHomeAddress().equals(contactNew.getHomeAddress())) {
                sb.append("Home address ");
            }
            if (!contactOld.getWorkAddress().equals(contactNew.getWorkAddress())) {
                sb.append("Work address ");
            }
            if (!contactOld.getTelephoneNumbers().equals(contactNew.getTelephoneNumbers())) {
                sb.append("Telephone ");
            }
            if (contactOld.getAge() != contactNew.getAge()) {
                sb.append("Birthdate ");
            }
            sb.append("was updated for ").append(event.getKey());
            System.out.println(sb);
        }

        public void entryDeleted(MapEvent event) {
            System.out.println(event.getKey());
        }
    }
}

 

7.2.2 运行 Listener 例子

 

1. 为 ObserverExample 创建运行配置。右键工程中的 ObserverExample->Run As,在 Run Configurations 对话框中,我们复制 chapter06CacheClientQueryExample,并更名为 chapter07CacheClientObserverExample

a) 在 Main 选卡中,确定 com.oracle.handson.chapter07.ObserverExample

b) 在 General 选卡 -Coherence 选卡,确保 contacts-cache-config.xml

c) 确保选择 Disabled (cache client)

d) 确保 Cluster port 是 3155

e) Other 选卡,确保 tangosol.pof.config 配置 contacts-pof-config.xml 作为 POF 配置文件

f) Common 选卡,选择 Shared file,并确保是 \chapter03_07 工程

2. 停止所有 Cache servers

3. 启动 CacheServer,复制 chapter06CacheServer为chapter07CacheServer,并运行。

4. 运行 chapter06CacheClientLoaderExample。

5. 运行 chapter07CacheClientObserverExample,这时程序将如下图等待输入:

07-figure-02

下一节,将介绍修改缓存中的记录,并返回变更。

7.3 如何响应缓存中的变化

 

在本节中,我们将创建一个 Java 类修改缓存中的条目,并返回修改的记录。

直到现在,我们使用处理缓存条目的方法是 put 和 get 操作。然而在并发访问数据时可用更好的处理。Entry processors(com.tangosol.util.InvocableMap.EntryProcessor) 是执行针对条目进行处理的代理。条目在所在地直接被处理。处理包括 create、update、remove data 或单纯计算。在多节点环境分区模式下,处理是平行执行,所以是支持水平扩展。由于数据不在客户端而在缓存中处理,还节省了 I/O 的开销。

同样的键将在逻辑上被队列化的,由 Entry processors 处理。这允许 lock-free(高性能)处理。com.tangosol.util.InvocableMap接口(被NamedCache 实现) 有如下操作方法:

Object invoke(Object oKey, InvocableMap.EntryProcessor processor)
针对独立对象调用 EntryProcessor 并返回调用结果。
Map invokeAll(Collection keys, InvocableMap.EntryProcessor processor)
基于键的集合调用 EntryProcessor 并返回每一个调用结果。
Map invokeAll(Filter filter, InvocableMap.EntryProcessor processor)
针对符合 filter 条件的集合调用 EntryProcessor ,并返回每一个调用结果。

 

 注释:
EntryProcessor 类必须在每一个节点的类路径(ClassPath)中。

 

创建一个entry process,可以继承 com.tangosol.util.processes.AbstractProcessor 和实现 process() 方法。比如,下面的代码演示了创建一个 EntryProcessor 实例来变更 Contacts 中的员工对象:

public static class OfficeUpdater extends AbstractProcessor implements PortableObject
  ...
    public Object process(InvocableMap.Entry entry) 
        {
        Contact contact = (Contact) entry.getValue();
        contact.setWorkAddress(m_addrWork);
        entry.setValue(contact);
        return null;
        }

 

为调用 OfficeUpdater 类,我们可以在使用 invokeAll 方法时将 OfficeUpdater 类作为参数。

cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"),new OfficeUpdater(addrWork));

 

  本节内容,将创建一个 Java 类实例化 EntryProcessor 在缓存总更新。上一节的 ObserverExample 类将检测到数据的变化。

7.3.1 新建一个拥有更新已存条目的类

 

1. 在缓存中创建一个类用于更新条目

2. 写代码,查找住在马萨诸塞州,更新他们的工作地址。

一个 inner class 实现 PortableObject 接口(用于序列化和反序列化)并包含一个 EntryProcessor 实例来设置工作地址。使用 Filter 来过滤集群中符合条件的 Contacts。

package com.oracle.handson.chapter07;

import com.tangosol.net.NamedCache;

import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.processor.AbstractProcessor;
import com.tangosol.util.InvocableMap;

import com.tangosol.io.pof.PortableObject;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;

import com.oracle.handson.chapter05.Address;
import com.oracle.handson.chapter05.Contact;

import com.tangosol.net.CacheFactory;

import java.io.IOException;

/**
 * ProcessorExample executes an example EntryProcessor.
 *
 */
public class ProcessorExample {
    public ProcessorExample() {
    }

    public static void main(String[] args) {
        NamedCache cache = CacheFactory.getCache("ContactsCache");
        new ProcessorExample().execute(cache);
    }

    // ----- ProcessorExample methods -----------------------------------

    public void execute(NamedCache cache) {
        // People who live in Massachusetts moved to an in-state office
        Address addrWork = new Address("200 Newbury St.", "Yoyodyne, Ltd.", "Boston", "MA", "02116", "US");

        cache.invokeAll(new EqualsFilter("getHomeAddress.getState", "MA"), new OfficeUpdater(addrWork));
    }

    // ----- nested class: OfficeUpdater ------------------------------------

    /**
     * OfficeUpdater updates a contact's office address.
     */
    public static class OfficeUpdater extends AbstractProcessor implements PortableObject {
        // ----- constructors -------------------------------------------

        /**
         * Default constructor (necessary for PortableObject implementation).
         */
        public OfficeUpdater() {
        }

        public OfficeUpdater(Address addrWork) {
            m_addrWork = addrWork;
        }

        // ----- InvocableMap.EntryProcessor interface ------------------

        public Object process(InvocableMap.Entry entry) {
            Contact contact = (Contact) entry.getValue();

            contact.setWorkAddress(m_addrWork);
            entry.setValue(contact);
            System.out.println("Work address was updated for " + contact.getFirstName() + " " + contact.getLastName());
            return null;
        }

        // ----- PortableObject interface -------------------------------

        public void readExternal(PofReader reader) throws IOException {
            m_addrWork = (Address) reader.readObject(0);
        }

        public void writeExternal(PofWriter writer) throws IOException {
            writer.writeObject(0, m_addrWork);
        }

        // ----- data members -------------------------------------------

        private Address m_addrWork;
    }
}

 

7.3.2 编辑修改 POF 配置文件

 

编辑 contacts-pof-config.xml 文件,增加 OfficeUpdater 条目,如下:

07-figure-03

...
<user-type>
       <type-id>1006</type-id>
      <class-name>com.oracle.handson.chapter07.ProcessorExample$OfficeUpdater</class-name>
</user-type>
...

 

7.3.3 运行更新缓存的例子

 

1. 为 ProcessorExample 创建运行配置。简化操作就是复制 7.2.2 中的运行配置 chapter07CacheClientObserverExample,将其变更为 chapter07CacheClientProcessorExample

a) Main 选卡中,类 com.oracle.handson.chapter07.ProcessorExample;

b) 在 General 选卡 -Coherence 选卡,确保 contacts-cache-config.xml;

c) 确保选择 Disabled (cache client);

d) 确保 Cluster port 是 3155;

e) Other 选卡,确保 tangosol.pof.config 配置 contacts-pof-config.xml 作为 POF 配置文件;

f) Common 选卡,选择 Shared file,并确保是 \chapter03_07 工程;

2. 按照如下步骤操作,测试运行 ObserverExample 和 ProcessorExample

3. 停止所有运行的 cache servers

4. 重启 ContactsCacheServer,运行 chapter07CacheServer

5. 运行 chapter06CacheClientLoaderExample,加载数据

6. 运行chapter07CacheClientObserverExample

7. 运行chapter07CacheClientProcessorExample

07-figure-04

关于作者

 niehao

金辉,在 Oracle 负责云和中间件产品的资深售前顾问和团队经理,有十多年的中间件和项目管理经验,专注在企业云构建,企业集成解决方案领域,熟悉业内主要的 SOA 集成产品。参加过北京马拉松和 TNF50 越野比赛。你可以通过邮件 arthur.jin@oracle.com 与他联系。