Oracle Coherence 内存网格

第 11 章 — 基于内存集群的事件处理

 

作者:金辉 (Oracle)


右箭头 第 1 章 — 漫说内存网格
右箭头 第 2&3 章 — 安装和配置开发环境
右箭头 第 4 章 — 开始第一个 Coherence 程序
右箭头 第 5 章 — 在 Coherence 中使用 POF 对象
右箭头 第 6 章 — 分布式内存集群的数据灌注、查询和统计
右箭头 第 7 章 — 监听器处理内存集群中变化的对象
右箭头 第 8 章 — 使用 ZFS 加密
右箭头 第 9 章 — 内存集群与数据库的互操作
右箭头 第 10 章 — Coherence 内存集群的安全访问
右箭头 第 11 章 — 基于内存集群的事件处理

本章,将介绍如何通过使用 EntryProcessor 事件,并通过拦截器获得事件。本章新建一个工程,将创建事件拦截器并 引发事件处理,来演示活动事件的工作框架。

更多的细节介绍,见“Oracle Fusion Middleware Developing Applications with Oracle Coherence”中的“Using Live Events”,和“Oracle Fusion Middleware Java API Reference for Oracle Coherence”。

11.1 介绍

 

Coherence 提供了基于事件处理的框架,使应用程序能够对数据网格内地操作做出反应。事件的框架使用一个基于事件的模型代表集群操作的可观察到的现象,支持的事件包括:

  • 分区服务 partitioned service
  • Cache和
  • 应用事件 application events

这些事件被注册的事件拦截器所消费。事件拦截器需要实现EventInterceptor,并且配置在缓存配置文件中。

11.1.1 什么是事件拦截器

 

应用通过注册事件拦截器 (EventInterceptor) 对活动事件作出反应。拦截器显式的定义接收哪些事件并做出何种反应。任意数量的事件拦截器可以创建并注册为一个特定的缓存或所有缓存由特定的分区管理服务。多个拦截器注册的相同的事件类型自动链接在一起,在单个事件的上下文中执行。(Any number of event interceptors can be created and registered for a specific cache or for all caches managed by a specific partitioned service. Multiple interceptors that are registered for the same event type are automatically chained together and executed in the context of a single event.)

事件拦截器通过实现 EventInterceptor 接口来创建。使用泛型和过滤泛型作为类型参数来定义接口。onEvent 方法提供了处理事件的能力。

@Interceptor 标注用于限制到特定的事件类型和事件,并提供了进一步的拦截配置。@Interceptor 标注包含下面的功能:

  • identifier — 拦截器的唯一标识符。如果在缓存配置里有,可以被重新。
  • entryEvents — 指定拦截器想要订阅到一个事件数组类型。
  • entryProcessorEvents — 指定拦截器想要订阅的一个条目处理事件数组类型。
  • transferEvents — 定义拦截器想要订阅到转换事件类型数组。
  • transactionEvents — 指定拦截器想要订阅到事务事件数组类型。
  • order — 指定拦截器在执行链上的位置。通常的值是高或低,值高表明拦截器放在拦截链的前端,默认是低。这个值能够被缓存配置文件所重写。

@Interceptor 的更多介绍见官网文档。

11.1.2 什么是缓存事件

 

  • 条目事件 (EntryEvent) 用于表现基于缓存条目的一些操作 (inserting, updating,和removing) 。条目事件可以为插入、更新和删除,进行前处理或后处理。
  • 条目处理 (EntryProcessor)  事件用于执行集群内的一条处理条目。条目处理事件有前处理,后处理。

11.1.3 什么是分区的服务事件

 

分区服务事件是 (PartitionedService)包含一组转换事件, 用于表现在storage-enable成员间的转换。

11.1.4 什么是注册事件拦截器

 

注册事件拦截器即可以在缓存配置文件里,也可以在程序里。一个事件拦截器可以被一到多个缓存注册,或者指定的分区服务。一个事件拦截器被缓存注册就只能接受这个缓存的事件。一个事件拦截器被分区服务注册,可以接收这个服务下所有缓存的事件。

在缓存配置文件里,完整的拦截器类名称定义在 <interceptor> 单元的 <cache-name><cache-mapping> 节里。拦截器管理哪个缓存,在 <cache-name> 单元里配置。事件拦截器注册在哪个分区服务上,在 <caching-schemes> 里配置。

除了使用缓存配置文件,事件拦截器也可以程序化注册。关键的类方法注册事件拦截器是 ConfigurableCacheFactory 接口上的 getInterceptorRegistry 方法,及 InterceptorRegistry 接口上的 getEventInterceptor 和 registerEventInterceptor 方法。下面是些实例代码的片段:

ConfigurableCacheFactory ccf = CacheFactory.getConfigurableCacheFactory();
InterceptorRegistry reg = ccf.getInterceptorRegistry();
 
reg.registerEventInterceptor(new TimedTraceInterceptor(), RegistrationBehavior.FAIL);

11.2 创建、注册和执行事件拦截器

 

下面的操作描述如何创建、注册和执行事件拦截器。 在这个练习中计算前操作和后操作的时间差。

11.2.1 创建测量前/后操作时间差的事件拦截器

 

创捷一个事件拦截器:TimedTraceInterceptor 来度量前/后操作的时间差(INSERTING/INSERTED、UPDATING/UPDATED 和 REMOVING/REMOVED)。步骤如下:

1.    新建一个 Application Client 工程:UEMEvents。

配置里选择 CoherenceConfig

不选择 Create a default main

11-figure-01

2.    创建新的 Java 类:TimedTraceInterceptor

com.oracle.handson.chapter

不选择产生 main 方法

实现 TimedTraceInterceptor 接口

11-figure-02

代码见

\chapter11_UEMEvents\appClientModule\com\oracle\handson\chapter11\s02

\TimedTraceInterceptor.java

TimedTraceInterceptor 实现了 EventInterceptor 接口;

@Interceptor 标注提供了唯一标识,和执行顺序;

还包含一个受保护的内生类:EventTimer。来封装每个事件通知的时间。相关时间发送到 Coherence log 显示样本和统计信息。

没有使用过滤器,处理通用的事件。

11.2.2 创建一个类拖延处理事件

 

创建一个类:LazyProcessor,来显式的处理延迟。这个类可以指定处理两个事件见到时间延长,以毫秒计。这类被 EventsTimingExample 所使用,EventsTimingExample 在稍后创建。

The data that the LazyProcessor class produces will be sent across the wire, so the class should use POF (Portable Object Format). You will add the LazyProcessorclass to the POF configuration file in a later step.

创建 LazyProcessor 类步骤如下:

1.    新建类 LazyProcessor,无main方法

2.    这个类实现 PortableObject 口,来做序列化,也扩展 AbstractProcessor 类。代码见

\chapter11_UEMEvents\appClientModule\com\oracle\handson\chapter11\s02\LazyProcessor.java

11.2.3 注册事件拦截器

 

这个工程里,拦截器注册在缓存配置文件里。

1.    项目浏览器里打开 coherence-cache-config.xml 文件另存为 uem112-cache-config.xml

2.    修改缓存配置文件里调用事件拦截器的部分:

11-figure-03

11.2.4 创建 POF 配置

 

pof-config.xml 另存为 uem112-pof-config.xml。

11-figure-04

11.2.5 创建类使用时间的事件拦截器

 

新建类 EventsExamples,触发事件拦截器 TimedTraceInterceptor 的处理。步骤如下:

1.    新建类:EventsExamples,没有 main 方法。

2.    完成该程序代码部分

\chapter11_UEMEvents\appClientModule\com\oracle\handson\chapter11\s02\EventsExamples.java

11.2.6 创建驱动应用调用时间时间示例程序

 

创建一个 Java程序调用 EventsTimingExample,步骤如下:

1.    新建 Java 程序:RunDriver,有 main 方法。

2.    Main 方法中运行 EventsTimingExample

\chapter11_UEMEvents\appClientModule\com\oracle\handson\chapter11\s02\RunDriver.java

11.2.7 创建缓存服务启动配置

 

创建缓存服务器运行配置运行 chapter11_UEMEvents 项目,步骤如下:

1.    右键项目,Run As->Run Configurations.

2.    新的运行配置名称为 chapter112CacheServerUEMEvents

Main 选卡
project chapter11_UEMEvents
Main Class com.tangosol.net.DefaultCacheServer
Coherence 选卡
缓存配置文件 appClientModule/uem112-cache-config.xml
Local storage Enabled (cache server)
Cluster port 3155
Other选卡  
tangosol.pof.config uem112-pof-config.xml
Common 选卡
Shared file \chapter11_UEMEvents
Classpath选卡
11-figure-05

 

11.2.8 为运行示例程序创建启动配置

 

1.    右键项,Run As->Run Configurations,复制 chapter112CacheServerUEMEvents,更名为 chapter112CacheClientUEMEventDriver

2.    修改配置

Main 选卡
project chapter11_UEMEvents
Main Class com.oracle.handson.chapter11.s02.RunDriver
Coherence 选卡
缓存配置文件 appClientModule/uem112-cache-config.xml
Local storage Enabled (cache server)
Cluster port 3155
Other选卡  
tangosol.pof.config uem112-pof-config.xml
Common 选卡
Shared file \chapter11_UEMEvents
Classpath选卡
11-figure-06

 

11.2.9 运行时间的事件示例

 

1.    运行缓存服务:chapter112CacheServerUEMEvents

2.    运行示例:chapter112CacheClientUEMEventDriver

11-figure-07

11.3 事件拦截器的前后处理使用方法

 

In this exercise you will create an event interceptor to detect and veto events based on a specified key. To complete this exercise, follow these steps:

Create an Event Interceptor to Detect and Veto Events

Register the Veto Events Event Interceptor

Create a Class to Exercise the Veto Events Event Interceptor

Edit the Driver File for the Veto Events Example

11.3.1 创建事件拦截器

 

演示通过活动事件接受或禁止事件,创建一个事件拦截器: CantankerousInterceptor。拦截器基于针对特定键的事件抛出意外。

1.    新建类:CantankerousInterceptor,不包含 main 方法。

2.    为事件拦截器写代码。Import Event,EventInterceptor,Interceptor,EntryEvent和EntryEvent

示例是当试图插入某键时,抛出意外。如果是前操作,则回滚,如果是后操作,则写入日志。

\chapter11_UEMEvents\appClientModule\com\oracle\handson\chapter11\s03\CantankerousInterceptor.java

11.3.2 缓存配置文件 — 注册事件拦截器

 

复制 uem112-cache-config.xml 保存为 uem113-cache-config.xml 缓存配置文件,增加注册内容,示例如下:

11-figure-08

11.3.3 创建类测试这个拦截器

 

创建类测试 CantankerousInterceptor 事件拦截器。在这个类里,新建一个子类 VetoedEventsExample,The VetoedEventsExamplesubclass initiates the action to be performed by CantankerousInterceptor. The code illustrates the semantics of throwing exceptions in pre- and postcommit events. The exceptions that are expected only to be logged are inserted into a results cache. The entries inserted into the results cache are displayed by using the standard output of the process executing this class.

\chapter11_UEMEvents\appClientModule\com\oracle\handson\chapter11\s03\VetoExample.java

11.3.4 修改测试驱动程序

 

复制 11.2 中创建的 RunDriver 到 s03 下,程序,修改内容如下:

1.    将第 2 节里示例 Events Timing 的 import 进行替换:

import com.oracle.handson. chapter11.s02.EventsExamples.EventsTimingExample;

 

替换为 import VetoedEventsExample 子类。

import com.oracle.handson.chapter11.s03.VetoExample.VetoedEventsExample;

 

2.    将原调用 Timed Events 的代码:

EVENTS_EXAMPLES.put("timing interceptor", new EventsTimingExample());

 

替换为调用 VetoedEventsExample

EVENTS_EXAMPLES.put("veto interceptor", new VetoedEventsExample());

 

11.3.5 运行本节示例应用

新建服务器运行配置
chapter112CacheServerUEMEvents 复制重命名为 chapter113CacheServerUEMEvents
Coherence选卡
缓存配置文件 appClientModule/uem113-cache-config.xml

 

点击启动,待服务器启动完成后,再运行第 2 个服务器,和第 3 个服务器。

新建测试应用运行配置
chapter112CacheClientUEMEventDriver 复制重命名为 chapter113CacheClientUEMEventDriver
Main 类 com.oracle.handson.chapter11.s03.RunDriver
Coherence 选卡
缓存配置文件 appClientModule/uem113-cache-config.xml

 

点运行

11-figure-09

缓存服务输出类似如下

11-figure-10

11.4 使用事件拦截器日志记录分区活动

 

本节创建事件拦截器来记录分区服务的分区事件。

11.4.1 Create a Class to Terminate a JVM and to Enable and Disable Logging

 

新建类:RedistributionInvocable,定义集群成员中被执行的三个状态:

  • DISABLE:禁止通过 RedistributionInterceptor 事件拦截器日志记录。
  • ENABLE:允许通过 RedistributionInterceptor 事件拦截器日志记录。
  • KILL:如果调用 RedistributionInvocable 则终止 JVM。

日志记录的禁止或运行通过 AtomicBoolean 类,比如:

public static final AtomicBoolean ENABLED = new AtomicBoolean(false)

 

终止 JVM,则调用 System.exit。

类产生的数据将被在集群内传播,所以类必须是 POF 的,后面将配置POF文件。

创建 RedistributionInvocable 类步骤如下:

1.    新建类 RedistributionInvocable,确保默认包是 com.oracle.handson.chapter11.s04,不产生 main 方法。

2.    写代码,定义不同状态,具体代码见

\chapter11_UEMEvents\appClientModule\com\oracle\handson\chapter11\s04\RedistributionInvocable.java

11.4.2 为日志记录分区活动创建事件拦截器

 

新建事件拦截器:RedistributionInterceptor,来为分区服务记录分区事件。步骤如下:

1.    新建类:RedistributionInterceptor。确保包路径 com.oracle.handson.chapter11.s04。不选择产生 main 方法。

2.    写代码记录分区事件。

Import RedistributionInvocable
Import CacheFactory
Import EventInterceptor
Import Interceptor
Import TransferEvent
实现 EventInterceptor<TransferEvent> 接口

 

示例代码见

\chapter11_UEMEvents\appClientModule\com\oracle\handson\chapter11\s04\RedistributionInterceptor.java

11.4.3 创建事件触发程序

 

创建新类:LogExample,触发被 RedistributionInterceptor 所拦截处理的事件。步骤如下:

1.    新建类:LogExample,没有 main 方法。

2.    为 LogExample 写程序:

具体代码见

\chapter11_UEMEvents\appClientModule\com\oracle\handson\chapter11\s04\LogExample.java

11.4.4 缓存配置文件里注册日志分区活动事件拦截器

 

1.    复制 uem113-cache-config.xml,另存为 uem114-cache-config.xml

2.    新增事件拦截器

11-figure-11

11.4.5 配置 POF 配置文件

 

除了 RedistributionInvocable 以外,所有记录分区活动事件的信息是在集群成员自己内部。而被 RedistributionInvocable 处理的信息将跨集群成员,所以必须是 POF 的。

1.    复制uem112-pof-config.xml文件,另存为uem114-pof-config.xml。

2.    定义com.oracle.handson.chapter11.s04.RedistributionInvocable。

 11-figure-12

11.4.6 修改测试运行驱动程序

 

1.    复制 11.3 中创建的 RunDriver 到 s04 下,程序,修改内容如下:

2.    修改原引用 Vetoed Events 的部分,

import com.oracle.handson.VetoExample.VetoedEventsExample;

 

替换为

import com.oracle.handson.LogExample.RedistributionEventsExample;

 

3.    修改原调用 Vetoed Events 的部分

EVENTS_EXAMPLES.put("veto interceptor", new VetoedEventsExample());

 

替换为

EVENTS_EXAMPLES.put("redistribution interceptor", new RedistributionEventsExample());

 

11.4.7 运行记录分区事件

 

新建服务器运行配置
chapter113CacheServerUEMEvents 复制重命名为 chapter114CacheServerUEMEvents
Coherence选卡
缓存配置文件 appClientModule/uem114-cache-config.xml
Other 选卡  
Tangosol.pof.config uem114-pof-config.xml

 

点击启动,待服务器启动完成后,再运行第 2 个服务器,和第 3 个服务器。

新建测试应用运行配置
chapter113CacheClientUEMEventDriver 复制重命名为 chapter114CacheClientUEMEventDriver
main com.oracle.handson.chapter11.s04.RunDriver
Coherence选卡
缓存配置文件 appClientModule/uem114-cache-config.xml
Other 选卡  
Tangosol.pof.config uem114-pof-config.xml

 

点运行,Client 输出如下图

11-figure-13

缓存服务器输出如下:

 11-figure-14

关于作者

 niehao

金辉,在 Oracle 负责云和中间件产品的资深售前顾问和团队经理,有十多年的中间件和项目管理经验,专注在企业云构建,企业集成解决方案领域,熟悉业内主要的 SOA 集成产品。参加过北京马拉松和 TNF50 越野比赛。你可以通过邮件 arthur.jin@oracle.com 与他联系。