Oracle Coherence In-Memory Data Grid

Chapter 6: Loading, Querying, and Aggregating Data in a Distributed In-Memory Cluster

 

By Jin Hui (金辉), Oracle

Published June 27, 2016



This chapter shows how to load, query, and aggregate data in a Coherence in-memory cluster using the following API packages:

  • aggregator
  • extractor
  • filter

6.1 Introduction

 

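So far, we have stored and retrieved objects in the cluster one at a time. Each call incurs network overhead, especially with partitioned and replicated caches. In addition, every put call returns the previous value stored under the key, which adds further unnecessary overhead. Loading data into the cluster with the putAll method is more efficient.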
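To make the difference concrete, here is a minimal plain-Java sketch. It does not use Coherence at all: CountingMap is a hypothetical stand-in for a cache that simply counts the calls it receives, under the assumption that each call would cost one network round trip. All class and method names are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

class BatchLoadSketch {
    // Store n entries one at a time; returns the number of calls made.
    static int loadOneByOne(int n) {
        CountingMap<Integer, String> cache = new CountingMap<>();
        for (int i = 0; i < n; i++) {
            cache.put(i, "contact-" + i);
        }
        return cache.calls;
    }

    // Store n entries in batches of batchSize; returns the number of calls made.
    static int loadInBatches(int n, int batchSize) {
        CountingMap<Integer, String> cache = new CountingMap<>();
        Map<Integer, String> batch = new HashMap<>();
        for (int i = 0; i < n; i++) {
            batch.put(i, "contact-" + i);
            if (batch.size() == batchSize) {
                cache.putAll(batch);
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            cache.putAll(batch);   // final, partially filled batch
        }
        return cache.calls;
    }

    public static void main(String[] args) {
        System.out.println("one by one: " + loadOneByOne(2500) + " calls");
        System.out.println("batched:    " + loadInBatches(2500, 1024) + " calls");
    }
}

// Hypothetical stand-in for a cache: counts calls instead of using a network.
class CountingMap<K, V> extends HashMap<K, V> {
    int calls = 0;

    @Override
    public V put(K key, V value) {
        calls++;
        return super.put(key, value);   // put also hands back the previous value
    }

    @Override
    public void putAll(Map<? extends K, ? extends V> batch) {
        calls++;                        // the whole batch counts as one call
        for (Map.Entry<? extends K, ? extends V> e : batch.entrySet()) {
            super.put(e.getKey(), e.getValue());
        }
    }
}
```

With 2,500 entries and a batch size of 1,024, the batched version makes three calls instead of 2,500. The loader in section 6.2.4 follows the same batching pattern against a real cache.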

 

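To work through this chapter, you must first complete Chapter 5. It also helps to be familiar with the following:

  • java.io.BufferedReader: reading text files
  • java.lang.String.split: parsing text files
  • java.text.SimpleDateFormat: handling dates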

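6.2 Populating the Cache with Objects

 

This exercise builds a console application that populates a Coherence cache with domain objects. The application uses Coherence's com.tangosol.io.pof.PortableObject interface so that the objects are serialized in POF format.

In this exercise you create a key class for Contact, a generator that produces data for the cache, and a loader that loads the data into the cache.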

1. Create a Class with the Key for the Domain Objects

2. Edit the POF Configuration File

3. Create the Data Generator

4. Create a Console Application to Load the Cache

5. Run the Cache Loading Example

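6.2.1 Creating a Key for the Domain Objects

 

Create a class that serves as the key for the domain objects:

1. Create a class named ContactId, which provides the key used to index employee information. The ID is built from the employee's first and last names, and the object serves as the key for retrieving a Contact object.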

06-figure-01

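Because this class uses POF serialization, it must implement the PortableObject interface, that is, the writeExternal and readExternal methods. It also overrides the equals, hashCode, and toString methods of Object.

Note:
Cache keys and values must be serializable (for example, by implementing java.io.Serializable). Cache keys must also implement hashCode and equals, and those methods must return consistent results on every node in the cluster. This means that hashCode and equals must be based solely on the object's serializable state (that is, its non-transient fields). Built-in Java types such as String, Integer, and Date meet this requirement.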

 

package com.oracle.handson.chapter06;

import java.io.IOException;

import com.tangosol.io.pof.PofReader;

import com.tangosol.io.pof.PofWriter;

import com.tangosol.io.pof.PortableObject;

import com.tangosol.util.Base;

import com.tangosol.util.HashHelper;

public class ContactId implements PortableObject {

// ----- data members ---------------------------------------------------

/**

* First name.

*/

private String FirstName;

/**

* Last name.

*/

private String LastName;

public ContactId() {

// Public no-argument constructor, required so POF can instantiate the object before readExternal

}

/**

* Construct a contact person.

*

*/

public ContactId(String firstName, String lastName) {

super();

FirstName = firstName;

LastName = lastName;

}

@Override

public void readExternal(PofReader arg0) throws IOException {

// Read the fields using the same property indexes that writeExternal uses

FirstName = arg0.readString(0);

LastName = arg0.readString(1);

}

@Override

public void writeExternal(PofWriter arg0) throws IOException {

// Write each field under its own POF property index

arg0.writeString(0, FirstName);

arg0.writeString(1, LastName);

}

public String getFirstName() {

return FirstName;

}

public void setFirstName(String firstName) {

FirstName = firstName;

}

public String getLastName() {

return LastName;

}

public void setLastName(String lastName) {

LastName = lastName;

}

// ----- Object methods -------------------------------------------------

public boolean equals(Object oThat) {

if (this == oThat) {

return true;

}

if (oThat == null || getClass() != oThat.getClass()) {

return false;

}

ContactId that = (ContactId) oThat;

return Base.equals(getFirstName(), that.getFirstName())

&& Base.equals(getLastName(), that.getLastName());

}

public int hashCode() {

return HashHelper.hash(getFirstName(),

HashHelper.hash(getLastName(), 0));

}

public String toString() {

return getFirstName() + " " + getLastName();

}
}
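The consistency requirement on keys described in the note above can be checked without a cluster. The following is a minimal plain-Java sketch, assuming a hypothetical NameKey class that has the same two fields as ContactId but no POF code; a java.util.HashMap stands in for the cache. A lookup with a freshly built key succeeds only because equals and hashCode depend on nothing but the two name fields.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class KeyContractSketch {
    // Hypothetical simplified key: same fields as ContactId, no serialization code.
    static final class NameKey {
        private final String firstName;
        private final String lastName;

        NameKey(String firstName, String lastName) {
            this.firstName = firstName;
            this.lastName = lastName;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            // The class check avoids a ClassCastException for foreign types.
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            NameKey that = (NameKey) o;
            return Objects.equals(firstName, that.firstName)
                    && Objects.equals(lastName, that.lastName);
        }

        @Override
        public int hashCode() {
            // Built from exactly the fields that equals compares.
            return Objects.hash(firstName, lastName);
        }
    }

    // Stores a value, then looks it up with an equal but distinct key instance.
    static String lookupWithFreshKey() {
        Map<NameKey, String> cache = new HashMap<>();
        cache.put(new NameKey("John", "Smith"), "contact record");
        return cache.get(new NameKey("John", "Smith"));
    }

    public static void main(String[] args) {
        System.out.println(lookupWithFreshKey());
    }
}
```

If hashCode were left at its default identity-based implementation, the second NameKey would almost certainly hash differently and the lookup would return null, which is the single-JVM version of the inconsistency the note warns about.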

 

6.2.2 Editing the POF Configuration File

 

Edit the POF configuration file and add a <user-type> entry for ContactId.

<?xml version="1.0"?>

<pof-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns="http://xmlns.oracle.com/coherence/coherence-pof-config"

xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-pof-config http://xmlns.oracle.com/coherence/coherence-pof-config/1.2/coherence-pof-config.xsd">

<user-type-list>

<!-- include all "standard" Coherence POF user types -->

<include>coherence-pof-config.xml</include>

<user-type>

<type-id>1001</type-id>

<class-name>com.oracle.handson.chapter05.Address</class-name>

</user-type>

<user-type>

<type-id>1002</type-id>

<class-name>com.oracle.handson.chapter05.PhoneNumber</class-name>

</user-type>

<user-type>

<type-id>1003</type-id>

<class-name>com.oracle.handson.chapter05.Contact</class-name>

</user-type>

<user-type>

<type-id>1004</type-id>

<class-name>com.oracle.handson.chapter06.ContactId</class-name>

</user-type>

</user-type-list>

<allow-interfaces>true</allow-interfaces>

<allow-subclasses>true</allow-subclasses>

</pof-config>

 

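6.2.3 Creating a Data Generator

 

Create a class named DataGenerator that generates random employee contact names and addresses. It reuses the Address, PhoneNumber, and Contact classes created earlier and uses java.util.Random to produce random names, addresses, telephone numbers, and ages.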

 

06-figure-02

package com.oracle.handson.chapter06;

import com.oracle.handson.chapter05.Address;

import com.oracle.handson.chapter05.Contact;

import com.oracle.handson.chapter05.PhoneNumber;

import com.tangosol.util.Base;

import java.io.BufferedWriter;

import java.io.FileOutputStream;

import java.io.IOException;

import java.io.OutputStream;

import java.io.OutputStreamWriter;

import java.io.PrintWriter;

import java.sql.Date;

import java.util.Collections;

import java.util.Random;

/**

* DataGenerator is a generator of sample contacts.

*/

public class DataGenerator {

// ----- static methods -------------------------------------------------

/**

* Generate contacts.

*/

public static void main(String[] asArg) throws IOException {

String sFile = asArg.length > 0 ? asArg[0] : FILENAME;

int cCon = asArg.length > 1 ? Integer.parseInt(asArg[1]) : 1000;

OutputStream out = new FileOutputStream(sFile);

generate(out, cCon);

out.close();

}

/**

* Generate the contacts and write them to a file.

*/

public static void generate(OutputStream out, int cContacts)

throws IOException {

PrintWriter writer = new PrintWriter(new BufferedWriter(

new OutputStreamWriter(out)));

for (int i = 0; i < cContacts; ++i) {

StringBuffer sb = new StringBuffer(256);

// contact person

sb.append("John,").append(getRandomName()).append(',');

// home and work addresses

sb.append(Integer.toString(Base.getRandom().nextInt(999)))

.append(" Beacon St.,,") /* street1,empty street2 */

.append(getRandomName()) /* random city name */

.append(',').append(getRandomState()).append(',')

.append(getRandomZip())

.append(",US,Yoyodyne Propulsion Systems,")

.append("330 Lectroid Rd.,Grover's Mill,")

.append(getRandomState()).append(',')

.append(getRandomZip()).append(",US,");

// home and work telephone numbers

sb.append("home,")

.append(Base.toDelimitedString(getRandomPhoneDigits(), ","))

.append(",work,")

.append(Base.toDelimitedString(getRandomPhoneDigits(), ","))

.append(',');

// random birth date in millis before or after the epoch

sb.append(getRandomDateInMillis());

writer.println(sb);

}

writer.flush();

}

/**

* Return a random name.

*

*/

private static String getRandomName() {

Random rand = Base.getRandom();

int cCh = 4 + rand.nextInt(7);

char[] ach = new char[cCh];

ach[0] = (char) ('A' + rand.nextInt(26));

for (int of = 1; of < cCh; ++of) {

ach[of] = (char) ('a' + rand.nextInt(26));

}

return new String(ach);

}

/**

* Return a random phone number. The phone number includes access, country,

* area code, and local number.

*

*/

private static int[] getRandomPhoneDigits() {

Random rand = Base.getRandom();

return new int[] { 11, // access code

rand.nextInt(99), // country code

rand.nextInt(999), // area code

rand.nextInt(9999999) // local number

};

}

/**

* Return a random Phone.

*

*/

private static PhoneNumber getRandomPhone() {

int[] anPhone = getRandomPhoneDigits();

return new PhoneNumber((short) anPhone[0], (short) anPhone[1],

(short) anPhone[2], anPhone[3]);

}

/**

* Return a random Zip code.

*

*/

private static String getRandomZip() {

return Base.toDecString(Base.getRandom().nextInt(99999), 5);

}

/**

* Return a random state.

*

*/

private static String getRandomState() {

return STATE_CODES[Base.getRandom().nextInt(STATE_CODES.length)];

}

/**

* Return a random date in millis before or after the epoch.

*

*/

private static long getRandomDateInMillis() {

return (Base.getRandom().nextInt(40) - 20) * Contact.MILLIS_IN_YEAR;

}

/**

* Generate a Contact with random information.

*

*/

public static Contact getRandomContact() {

return new Contact("John", getRandomName(), new Address(

"1500 Boylston St.", null, getRandomName(), getRandomState(),

getRandomZip(), "US"), new Address("8 Yawkey Way", null,

getRandomName(), getRandomState(), getRandomZip(), "US"),

Collections.singletonMap("work", getRandomPhone()), new Date(

getRandomDateInMillis()));

}

// ----- constants ------------------------------------------------------

/**

* US Postal Service two letter postal codes.

*/

private static final String[] STATE_CODES = { "AL", "AK", "AS", "AZ", "AR",

"CA", "CO", "CT", "DE", "DC", "FM", "FL", "GA", "GU", "HI",

"ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MH", "MD", "MA",      

"MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", "NM", "NY",      

"NC", "ND", "MP", "OH", "OK", "OR", "PW", "PA", "PR", "RI", "SC",        

"SD", "TN", "TX", "UT", "VT", "VI", "VA", "WA", "WV", "WI", "WY" };       

/**        

  * Default contacts file name.     

*/        

public static final String FILENAME = "contacts.csv";       

} 

 

Running the DataGenerator class produces random employee records and saves them in CSV format in a file named contacts.csv. A fragment of the file is shown below:

 06-figure-03
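As a rough illustration of the record layout, the sketch below splits one hand-written line that follows the field order DataGenerator emits (name, home address, work address, two phone numbers, birth date in milliseconds). The sample values are made up for illustration and are not taken from a real contacts.csv.

```java
class CsvRecordSketch {
    // Hand-written sample in DataGenerator's field order; values are invented.
    static final String SAMPLE =
            "John,Abcdef,123 Beacon St.,,Qwerty,MA,02134,US,"
            + "Yoyodyne Propulsion Systems,330 Lectroid Rd.,Grover's Mill,NJ,08550,US,"
            + "home,11,1,617,5551212,work,11,1,212,5553434,-315360000000";

    // Splits a record on commas; empty fields in the middle are preserved.
    static String[] parse(String record) {
        return record.split(",");
    }

    public static void main(String[] args) {
        String[] parts = parse(SAMPLE);
        System.out.println("fields:       " + parts.length);
        System.out.println("last name:    " + parts[1]);
        System.out.println("home street2: '" + parts[3] + "'");
        System.out.println("birth millis: " + Long.parseLong(parts[parts.length - 1]));
    }
}
```

The record has 25 fields: 2 for the name, 6 for each address, 5 for each phone number, and 1 for the birth date. String.split keeps the empty second street line in the middle of the record but would drop empty fields at the end, which is harmless here because the last field is always the date.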

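6.2.4 Creating a Console Application to Load the Data

 

Create a Java class named LoaderExample that reads the contacts.csv file and loads the employee data into the cache. To improve efficiency and minimize network traffic, it loads the data with the putAll method.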

 06-figure-03

package com.oracle.handson.chapter06;

import com.tangosol.net.CacheFactory;

import com.tangosol.net.NamedCache;

import com.oracle.handson.chapter06.ContactId;

import com.oracle.handson.chapter05.Address;

import com.oracle.handson.chapter05.PhoneNumber;

import com.oracle.handson.chapter05.Contact;

import java.io.BufferedReader;

import java.io.FileInputStream;

import java.io.IOException;

import java.io.InputStream;

import java.io.InputStreamReader;

import java.sql.Date;

import java.util.HashMap;

import java.util.Map;

/**

* LoaderExample loads data from a CSV file into the cache.

*

*/

public class LoaderExample {

// ----- constants ------------------------------------------------------

/**

* Default cache name.

*/

public static final String CACHENAME = "ContactsCache";

/**

* Load the file into the cache.

*

* @param args
  
* @throws IOException
  
*/
  
public static void main(String[] args) throws IOException {
  
// Optional arguments: input file name, then cache name
  
String sFile = args.length > 0 ? args[0] : DataGenerator.FILENAME;
  
String sCache = args.length > 1 ? args[1] : CACHENAME;
  
System.out.println("input file: " + sFile);
  
System.out.println("cache name: " + sCache);
    
new LoaderExample().load(CacheFactory.getCache(sCache),
    
new FileInputStream(sFile));
    
CacheFactory.shutdown();
    
}
    
/**
    
* Load cache from stream.
    
*
    
*/
    
public void load(NamedCache cache, InputStream in) throws IOException {
    
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
    
Map mapBatch = new HashMap(1024);
    
String sRecord;
    
int cRecord = 0;
    
while ((sRecord = reader.readLine()) != null) {
    
// parse record
  
String[] asPart = sRecord.split(",");
  
int ofPart = 0;
    
String sFirstName = asPart[ofPart++];
    
String sLastName = asPart[ofPart++];
    
ContactId id = new ContactId(sFirstName, sLastName);
    
Address addrHome = new Address(
    
/* streetline1 */asPart[ofPart++],
    
/* streetline2 */asPart[ofPart++],
    
/* city */asPart[ofPart++],
    
/* state */asPart[ofPart++],
    
/* zip */asPart[ofPart++],
    
/* country */asPart[ofPart++]);
    
Address addrWork = new Address(
    
/* streetline1 */asPart[ofPart++],
    
/* streetline2 */asPart[ofPart++],
    
/* city */asPart[ofPart++],
    
/* state */asPart[ofPart++],
    
/* zip */asPart[ofPart++],
      
/* country */asPart[ofPart++]);
      
Map mapTelNum = new HashMap();
      
for (int c = asPart.length - 1; ofPart < c;) {
      
mapTelNum.put(/* type */asPart[ofPart++], new PhoneNumber(
      
/* access code */Short.parseShort(asPart[ofPart++]),
      
/* country code */Short.parseShort(asPart[ofPart++]),
      
/* area code */Short.parseShort(asPart[ofPart++]),
      
/* local num */Integer.parseInt(asPart[ofPart++])));
      
}      

Date dtBirth = new Date(Long.parseLong(asPart[ofPart]));
      
// Construct Contact and add to batch
      
Contact con1 = new Contact(sFirstName, sLastName, addrHome,
      
addrWork, mapTelNum, dtBirth);
      
System.out.println(con1);
      
mapBatch.put(id, con1);
      
++cRecord;
      
if (cRecord % 1024 == 0) {
      
// load batch
      
cache.putAll(mapBatch);
      
mapBatch.clear();
      
System.out.print('.');
      
System.out.flush();
      
}
      
}
      
if (!mapBatch.isEmpty()) {
      
// load final batch
      
cache.putAll(mapBatch);
      
}
      
System.out.println("Added " + cRecord + " entries to cache");
      
}
     
}

 

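6.2.5 Running the Loader

 

Follow these steps to run the loader:

1. Stop any running cache servers.

2. Copy chapter05CacheServer, rename the copy to chapter06CacheServer, and run it.

3. Make sure DataGenerator has been run.

4. Create a run configuration for LoaderExample:

    1) Copy chapter05CacheClientContactDriver.
    2) Rename the copy to chapter06CacheClientLoaderExample.
    3) On the Main tab, make sure the main class is LoaderExample.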

06-figure-05

    4) On the Coherence tab, make sure contacts-cache-config.xml is set as the cache configuration, and select the Disabled (cache client) button.
    5) On the Other tab, make sure the POF configuration file is contacts-pof-config.xml.

5. Click Run.

 06-figure-06

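6.3 Querying and Aggregating Data in the Cache

 

This exercise shows how to query and aggregate data in the cache:

  • Querying for specific data
  • Aggregating information in the cache

After loading complex objects into the cache, you can query and aggregate them in the cluster. The com.tangosol.util.QueryMap interface provides methods for working with the keys and values of a cache. You use filters to restrict the results, and you can define indexes to optimize queries.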

Some commonly used QueryMap methods:

  • Set entrySet(Filter filter): returns the set of entries that satisfy the filter
  • void addIndex(ValueExtractor extractor, boolean fOrdered, Comparator comparator): adds an index
  • Set keySet(Filter filter): like entrySet, but returns only the matching keys, not the values

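In a partitioned topology, Coherence evaluates filters in parallel across the cluster members. The QueryMap interface works with Filter classes; see the documentation of the com.tangosol.util.filter package for the API and further details.

All Coherence NamedCache objects implement the com.tangosol.util.QueryMap interface, so a NamedCache supports searching for keys or entries that satisfy a given condition. The condition itself is represented as an object that implements the com.tangosol.util.Filter interface.

The com.tangosol.util.filter package contains many predefined classes for standard query expressions, such as GreaterFilter, GreaterEqualsFilter, LikeFilter, NotEqualsFilter, InFilter, and others. With these object-based filters you can build queries similar to SQL expressions.

Note:
Coherence does not provide a SQLFilter, because the cluster stores objects rather than two-dimensional tables.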

 

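Filter classes use standard Java method names to express conditions. For example, the following filter expresses the condition that the value returned by getHomeAddress.getState equals Massachusetts (MA):

new EqualsFilter("getHomeAddress.getState", "MA");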

 

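If the object being tested by this filter does not have the named get method, the test fails.

Some examples of using filters:

• Return the people who live in a city whose name begins with S

Set sPeople = cache.entrySet(new LikeFilter("getHomeAddress.getCity", "S%"));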

 

• Return the people who are older than 42

final int nAge = 42;
// Find all contacts who are older than nAge
Set sSeniors = cache.entrySet(new GreaterFilter("getAge", nAge));
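For readers who want to experiment with the idea of filter-based selection without a running cluster, the following plain-Java sketch mirrors the two queries above. It is only an analogy: java.util.function.Predicate stands in for the Coherence Filter interface, a HashMap stands in for the cache, and the Person class, the helper methods, and the sample data are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

class FilterAnalogySketch {
    // Hypothetical stand-in for the cached value class.
    static final class Person {
        private final String city;
        private final int age;

        Person(String city, int age) {
            this.city = city;
            this.age = age;
        }

        String getCity() { return city; }
        int getAge() { return age; }
    }

    // Rough analogue of LikeFilter with a pattern such as "S%".
    static Predicate<Person> cityStartsWith(String prefix) {
        return p -> p.getCity().startsWith(prefix);
    }

    // Rough analogue of GreaterFilter on the age.
    static Predicate<Person> olderThan(int nAge) {
        return p -> p.getAge() > nAge;
    }

    // Rough analogue of keySet(filter): keys whose value passes the test.
    static Set<String> keysMatching(Map<String, Person> cache, Predicate<Person> filter) {
        Set<String> keys = new TreeSet<>();
        for (Map.Entry<String, Person> e : cache.entrySet()) {
            if (filter.test(e.getValue())) {
                keys.add(e.getKey());
            }
        }
        return keys;
    }

    static Map<String, Person> sampleCache() {
        Map<String, Person> cache = new HashMap<>();
        cache.put("Smith", new Person("Salem", 50));
        cache.put("Jones", new Person("Boston", 45));
        cache.put("Stone", new Person("Springfield", 30));
        return cache;
    }

    public static void main(String[] args) {
        Map<String, Person> cache = sampleCache();
        System.out.println(keysMatching(cache, cityStartsWith("S")));
        System.out.println(keysMatching(cache, olderThan(42)));
        // Combining two conditions, in the spirit of AndFilter.
        System.out.println(keysMatching(cache, cityStartsWith("S").and(olderThan(42))));
    }
}
```

The important difference is where the work happens: this sketch scans every entry in one JVM, whereas a Coherence filter is a serializable object that is sent to the cluster members and evaluated there.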

 

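In addition to the entrySet and keySet methods defined by the QueryMap interface, Coherence lets you define indexes with the addIndex method to improve query performance. In a relational database, indexes are defined on the strictly enforced, named columns of a schema; Coherence has no such schema.

To specify which values of the cached objects are indexed, Coherence introduces the concept of a value extractor. The com.tangosol.util.ValueExtractor interface defines an extract method: given an object, a ValueExtractor implementation returns a value derived from it. A simple example is the com.tangosol.util.extractor.ReflectionExtractor class, which extracts a value by calling the named method through reflection: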

new ReflectionExtractor("getCity")

 

Value extractors can be used throughout the Coherence API. Most commonly, they are used to define indexes.
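Conceptually, an index built from an extractor maps each extracted value to the keys of the entries that produce it, so a query on that value does not have to inspect every entry. The sketch below models that idea in plain Java under stated assumptions: it is not how Coherence implements indexes internally, and the Address class, the buildIndex method, and the sample data are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;

class IndexSketch {
    // Hypothetical cached value with one property worth indexing.
    static final class Address {
        private final String state;

        Address(String state) { this.state = state; }

        String getState() { return state; }
    }

    // Builds extracted value -> keys of the entries that have that value.
    static <V> Map<String, Set<String>> buildIndex(Map<String, V> cache,
                                                   Function<V, String> extractor) {
        Map<String, Set<String>> index = new TreeMap<>();   // sorted by extracted value
        for (Map.Entry<String, V> e : cache.entrySet()) {
            String extracted = extractor.apply(e.getValue());
            index.computeIfAbsent(extracted, k -> new TreeSet<>()).add(e.getKey());
        }
        return index;
    }

    static Map<String, Address> sampleCache() {
        Map<String, Address> cache = new HashMap<>();
        cache.put("Smith", new Address("MA"));
        cache.put("Jones", new Address("NY"));
        cache.put("Stone", new Address("MA"));
        return cache;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> byState = buildIndex(sampleCache(), Address::getState);
        // An equality query on the state becomes a single map lookup.
        System.out.println("MA -> " + byState.get("MA"));
        System.out.println("NY -> " + byState.get("NY"));
    }
}
```

In this model the extractor plays the role that a ValueExtractor plays for addIndex: it decides which property of the value the index is keyed on.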

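Another useful extractor is ChainedExtractor, a composite implementation based on an array of extractors. The extractors in the array are applied in order, from left to right, and the result of each extractor becomes the target of the next. For example: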

new ChainedExtractor(new ReflectionExtractor("getHomeAddress"), new ReflectionExtractor("getState"))

 

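This example assumes that the cached Contact objects expose a getHomeAddress method and that the address object it returns exposes a getState method. ChainedExtractor first calls getHomeAddress on each cached Contact object through reflection, then calls getState on the result, again through reflection.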
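The left-to-right chaining described above can be sketched with java.lang.reflect alone. The following is a simplified illustration, not the Coherence implementation: extractChain is a hypothetical helper, and the nested Contact and Address classes are minimal stand-ins for the Chapter 5 domain classes.

```java
import java.lang.reflect.Method;

class ChainedReflectionSketch {
    // Minimal stand-ins for the domain classes.
    public static class Address {
        private final String state;

        public Address(String state) { this.state = state; }

        public String getState() { return state; }
    }

    public static class Contact {
        private final Address homeAddress;

        public Contact(Address homeAddress) { this.homeAddress = homeAddress; }

        public Address getHomeAddress() { return homeAddress; }
    }

    // Calls each named public no-argument method in turn, feeding each
    // result to the next call, and returns the final result.
    static Object extractChain(Object target, String... methodNames) {
        Object current = target;
        for (String name : methodNames) {
            try {
                Method method = current.getClass().getMethod(name);
                current = method.invoke(current);
            } catch (ReflectiveOperationException e) {
                // For example, the object has no public method with this name.
                throw new IllegalArgumentException("cannot call " + name, e);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        Contact contact = new Contact(new Address("MA"));
        System.out.println(extractChain(contact, "getHomeAddress", "getState"));
    }
}
```

The sketch makes one failure mode visible: if any method name in the chain does not exist on the intermediate object, the extraction fails, which matches the earlier remark that a filter test fails when the object lacks the named get method. A null intermediate result is not handled here.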

6.3.1 Creating a Class to Query the Cache

Create a Java class named QueryExample to run the queries.

Use the entrySet method to retrieve employee contact information:

• All employees who live in Massachusetts

cache.entrySet(new EqualsFilter("getHomeAddress.getState", "MA"));

 

• Employees who live in Massachusetts and work elsewhere

cache.entrySet(new AndFilter(
new EqualsFilter("getHomeAddress.getState", "MA"),
new NotEqualsFilter("getWorkAddress.getState", "MA"))); 

 

• Employees who live in a city whose name begins with S:

cache.entrySet(new LikeFilter("getHomeAddress.getCity", "S%")); 

 

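• Employees whose last name begins with S and who live in Massachusetts. This query uses both the key and the value of each entry. Recall that the cache entries use ContactId as the key. KeyExtractor, a value extractor that operates on the key object instead of the value, lets the query read the key's properties: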

cache.entrySet(new AndFilter(
new LikeFilter(new KeyExtractor("getLastName"), "S%", (char) 0, false),
new EqualsFilter("getHomeAddress.getState", "MA"))); 

 

• All employees older than a given age

final int nAge = 42;
setResults = cache.entrySet(new GreaterFilter("getAge", nAge));

 

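• For more about using indexes to improve performance, see the description of the addIndex method in the QueryMap interface. You can adapt the following program to compare query performance with and without indexes.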

package com.oracle.handson.chapter06;

import com.tangosol.net.CacheFactory;

import com.tangosol.net.NamedCache;

import com.tangosol.util.extractor.ChainedExtractor;

import com.tangosol.util.extractor.KeyExtractor;

import com.tangosol.util.extractor.ReflectionExtractor;

import com.tangosol.util.filter.AlwaysFilter;

import com.tangosol.util.filter.AndFilter;

import com.tangosol.util.filter.EqualsFilter;

import com.tangosol.util.filter.GreaterFilter;

import com.tangosol.util.filter.LikeFilter;

import com.tangosol.util.filter.NotEqualsFilter;

import java.util.Iterator;

import java.util.Set;

/**

* QueryExample runs sample queries for contacts.

*

*/

public class QueryExample {

// ----- QueryExample methods ---------------------------------------

public static void main(String[] args) {

NamedCache cache = CacheFactory.getCache("ContactsCache");

query(cache);

}

/**

* Perform the example queries

*

*/

public static void query(NamedCache cache) {

// Add indexes to make queries more efficient

ReflectionExtractor reflectAddrHome = new ReflectionExtractor("getHomeAddress");

// Add an index for the age

cache.addIndex(new ReflectionExtractor("getAge"), true, null);

// Add index for state within home address

cache.addIndex(new ChainedExtractor(reflectAddrHome, new ReflectionExtractor("getState")), true, null);

// Add index for state within work address

cache.addIndex(new ChainedExtractor(new ReflectionExtractor("getWorkAddress"), new ReflectionExtractor("getState")), true, null);

// Add index for city within home address

cache.addIndex(new ChainedExtractor(reflectAddrHome, new ReflectionExtractor("getCity")), true, null);

// Find all contacts who live in Massachusetts

Set setResults = cache.entrySet(new EqualsFilter("getHomeAddress.getState", "MA"));

printResults("MA Residents", setResults);

// Find all contacts who live in Massachusetts and work elsewhere

setResults = cache.entrySet(new AndFilter(new EqualsFilter("getHomeAddress.getState", "MA"), new NotEqualsFilter("getWorkAddress.getState", "MA")));

printResults("MA Residents, Work Elsewhere", setResults);

// Find all contacts whose city name begins with 'S'

setResults = cache.entrySet(new LikeFilter("getHomeAddress.getCity", "S%"));

printResults("City Begins with S", setResults);

final int nAge = 42;

// Find all contacts who are older than nAge

setResults = cache.entrySet(new GreaterFilter("getAge", nAge));

printResults("Age > " + nAge, setResults);

// Find all contacts with last name beginning with 'S' that live

// in Massachusetts. Uses both key and value in the query.

setResults = cache.entrySet(new AndFilter(new LikeFilter(new KeyExtractor("getLastName"), "S%", (char) 0, false), new EqualsFilter(

"getHomeAddress.getState", "MA")));

printResults("Last Name Begins with S and State Is MA", setResults);

}

/**

* Print results of the query

*

* @param sTitle

* the title that describes the results

* @param setResults

* a set of query results

*/

private static void printResults(String sTitle, Set setResults) {

int i = 0;

System.out.println(sTitle);

for (Iterator iter = setResults.iterator(); iter.hasNext();) {

System.out.println(iter.next());

i++;

}

System.out.println("Total number of records: " + i);

}
}

 

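6.3.2 Running the Query Example

Create a run configuration for QueryExample:

1.    Stop any running cache server, restart it, and run the loader program LoaderExample from section 6.2.5.

2.    Copy chapter06CacheClientLoaderExample and rename the copy to chapter06CacheClientQueryExample.

1). On the Main tab, make sure the main class is com.oracle.handson.chapter06.QueryExample.

2). The remaining settings are inherited from chapter06CacheClientLoaderExample and generally need no changes.

3). Make sure DataGenerator and LoaderExample have been run, then run QueryExample.

After the program prints all the query results, the output looks similar to the following: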

 06-figure-07

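6.3.3 Modifying the Query Program to Aggregate Results

You can extend the query program to aggregate the query results. An entry aggregator (com.tangosol.util.InvocableMap.EntryAggregator) processes the selected entries and returns an aggregate result. EntryAggregator instances act as agents that run in parallel against the data in the cluster, so aggregation can benefit from adding cluster members.

There are two ways to aggregate: over a collection of keys, or over the entries selected by a filter. The InvocableMap interface provides an aggregate method for each case:

Object aggregate(Collection keys, InvocableMap.EntryAggregator agg)
Object aggregate(Filter filter, InvocableMap.EntryAggregator agg)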

 

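Add aggregations over the filtered data:

1.    Add the following calculations to QueryExample:

• The number of employees older than a given age, using the GreaterFilter and Count classes:

cache.aggregate(new GreaterFilter("getAge", nAge), new Count())

 

• The lowest age among all employees, using the AlwaysFilter and LongMin classes:

cache.aggregate(AlwaysFilter.INSTANCE, new LongMin("getAge"))

 

• The highest age among all employees, using the AlwaysFilter and LongMax classes:

cache.aggregate(AlwaysFilter.INSTANCE, new LongMax("getAge"))

 

• The average age of all employees, using the AlwaysFilter and DoubleAverage classes:

cache.aggregate(AlwaysFilter.INSTANCE, new DoubleAverage("getAge"))

 

2.    Import the Count, DoubleAverage, LongMax, and LongMin aggregator classes: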

import com.tangosol.util.aggregator.Count;

import com.tangosol.util.aggregator.DoubleAverage;

import com.tangosol.util.aggregator.LongMax;
import com.tangosol.util.aggregator.LongMin;
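To see what the four aggregations compute, it can help to run the same calculations over a small local list first. The sketch below uses only the Java streams API and a made-up list of ages; it illustrates the arithmetic, not the Coherence aggregator classes, and it runs in a single JVM rather than in parallel across a cluster.

```java
import java.util.Arrays;
import java.util.List;
import java.util.LongSummaryStatistics;

class AggregationSketch {
    // Analogue of aggregating Count over the entries selected by a GreaterFilter.
    static long countOlderThan(List<Integer> ages, int nAge) {
        return ages.stream().filter(age -> age > nAge).count();
    }

    // Minimum, maximum, and average in one pass, the local analogue of
    // LongMin, LongMax, and DoubleAverage over all entries.
    static LongSummaryStatistics summarize(List<Integer> ages) {
        return ages.stream().mapToLong(Integer::longValue).summaryStatistics();
    }

    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(25, 43, 50, 38);   // invented sample data
        LongSummaryStatistics stats = summarize(ages);
        System.out.println("count > 42: " + countOlderThan(ages, 42));
        System.out.println("min age: " + stats.getMin());
        System.out.println("avg age: " + stats.getAverage());
        System.out.println("max age: " + stats.getMax());
    }
}
```

For the sample list, two ages exceed 42, the minimum is 25, the maximum is 50, and the average is 39.0. In Coherence, each member computes a partial result over its own entries and the partial results are then combined, which is why adding members can speed up aggregation.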

 

The new program, QueryExample02.java, is as follows:

package com.oracle.handson.chapter06;

import com.tangosol.net.CacheFactory;

import com.tangosol.net.NamedCache;

import com.tangosol.util.aggregator.Count;

import com.tangosol.util.aggregator.DoubleAverage;

import com.tangosol.util.aggregator.LongMax;

import com.tangosol.util.aggregator.LongMin;

import com.tangosol.util.extractor.ChainedExtractor;

import com.tangosol.util.extractor.KeyExtractor;

import com.tangosol.util.extractor.ReflectionExtractor;

import com.tangosol.util.filter.AlwaysFilter;

import com.tangosol.util.filter.AndFilter;

import com.tangosol.util.filter.EqualsFilter;

import com.tangosol.util.filter.GreaterFilter;

import com.tangosol.util.filter.LikeFilter;

import com.tangosol.util.filter.NotEqualsFilter;

import java.util.Iterator;

import java.util.Set;

/**

* QueryExample runs sample queries for contacts.

*/

public class QueryExample02 {

// ----- QueryExample methods ---------------------------------------

public static void main(String[] args) {

NamedCache cache = CacheFactory.getCache("ContactsCache");

query(cache);

}

/**

* Perform the example queries

*

*/

public static void query(NamedCache cache) {

// Add indexes to make queries more efficient

ReflectionExtractor reflectAddrHome = new ReflectionExtractor("getHomeAddress");

cache.addIndex(new ReflectionExtractor("getAge"), true, null);

cache.addIndex(new ChainedExtractor(reflectAddrHome, new ReflectionExtractor("getState")), true, null);

cache.addIndex(new ChainedExtractor(new ReflectionExtractor("getWorkAddress"), new ReflectionExtractor("getState")), true, null);

cache.addIndex(new ChainedExtractor(reflectAddrHome, new ReflectionExtractor("getCity")), true, null);

// Find all contacts who live in Massachusetts

Set setResults = cache.entrySet(new EqualsFilter("getHomeAddress.getState", "MA"));

printResults("MA Residents", setResults);

// Find all contacts who live in Massachusetts and work elsewhere

setResults = cache.entrySet(new AndFilter(new EqualsFilter("getHomeAddress.getState", "MA"), new NotEqualsFilter("getWorkAddress.getState", "MA")));

printResults("MA Residents, Work Elsewhere", setResults);

// Find all contacts whose city name begins with 'S'

setResults = cache.entrySet(new LikeFilter("getHomeAddress.getCity", "S%"));

printResults("City Begins with S", setResults);

final int nAge = 42;

// Find all contacts who are older than nAge

setResults = cache.entrySet(new GreaterFilter("getAge", nAge));

printResults("Age > " + nAge, setResults);

// Find all contacts with last name beginning with 'S' that live

// in Massachusetts. Uses both key and value in the query.

setResults = cache.entrySet(new AndFilter(new LikeFilter(new KeyExtractor("getLastName"), "S%", (char) 0, false), new EqualsFilter(

"getHomeAddress.getState", "MA")));
 printResults("Last Name Begins with S and State Is MA", setResults);

// Count contacts who are older than nAge

System.out.println("count > " + nAge + ": " + cache.aggregate(new GreaterFilter("getAge", nAge), new Count()));

// Find minimum age

System.out.println("min age: " + cache.aggregate(AlwaysFilter.INSTANCE, new LongMin("getAge")));

// Calculate average age

System.out.println("avg age: " + cache.aggregate(AlwaysFilter.INSTANCE, new DoubleAverage("getAge")));

// Find maximum age

System.out.println("max age: " + cache.aggregate(AlwaysFilter.INSTANCE, new LongMax("getAge")));

}

/**

* Print results of the query

*

*/

private static void printResults(String sTitle, Set setResults) {

System.out.println(sTitle);

for (Iterator iter = setResults.iterator(); iter.hasNext();) {

System.out.println(iter.next());

}

}
}

 

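6.3.4 Running the Aggregation Example

 

1. Copy the run configuration to create chapter06CacheClientQueryExample02, with QueryExample02 as the main class.

2. Make sure DataGenerator and LoaderExample have been run, then run chapter06CacheClientQueryExample02. The output looks similar to the following: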

06-figure-08

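6.4 Querying and Aggregating with Expressions

 

The QueryHelper class builds filters and value extractors from Coherence Query Language expressions, so you do not have to construct them by hand. For example, the following call creates a filter for contacts who live in MA and do not work in MA: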

...
QueryHelper.createFilter("homeAddress.state = 'MA' and workAddress.state !='MA'")
...

 

This statement is short and easy to read, and it is equivalent to the following use of the Coherence API:

new AndFilter(new EqualsFilter("getHomeAddress.getState", "MA"),
      new NotEqualsFilter("getWorkAddress.getState", "MA"))

 

For more about the WHERE clause syntax accepted by the QueryHelper API, see the Coherence documentation.

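6.4.1 A Simple Query Example

 

This exercise rewrites the index, query, and aggregation calls of the QueryExample program from the previous section using the QueryHelper API.

1. Import the QueryHelper API:

import static com.tangosol.util.QueryHelper.*;

 

2. The imports of the ChainedExtractor, KeyExtractor, and ReflectionExtractor classes are no longer needed and can be commented out.

3. Likewise, the imports of the AlwaysFilter, AndFilter, EqualsFilter, GreaterFilter, LikeFilter, and NotEqualsFilter classes can be commented out.

4. In the cache.addIndex statements, replace each ReflectionExtractor instantiation with a call to createExtractor from the QueryHelper API.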

Each ReflectionExtractor call is listed first, followed by its createExtractor replacement. Note that the rewritten program creates the two state indexes as unordered (false).

cache.addIndex(new ReflectionExtractor("getAge"), true, null);
becomes
cache.addIndex(createExtractor("age"), true, null);

cache.addIndex(new ChainedExtractor(reflectAddrHome, new ReflectionExtractor("getState")), true, null);
becomes
cache.addIndex(createExtractor("homeAddress.state"), false, null);

cache.addIndex(new ChainedExtractor(new ReflectionExtractor("getWorkAddress"), new ReflectionExtractor("getState")), true, null);
becomes
cache.addIndex(createExtractor("workAddress.state"), false, null);

cache.addIndex(new ChainedExtractor(reflectAddrHome, new ReflectionExtractor("getCity")), true, null);
becomes
cache.addIndex(createExtractor("homeAddress.city"), true, null);

 

5. Where setResults is assigned, replace each *Filter constructor with a call to createFilter and the appropriate Coherence Query Language expression.

Each *Filter call is listed first, followed by its createFilter replacement.

Set setResults = cache.entrySet(new EqualsFilter("getHomeAddress.getState", "MA"));
becomes
Set setResults = cache.entrySet(createFilter("homeAddress.state = 'MA'"));

Set setResults = cache.entrySet(new AndFilter(new EqualsFilter("getHomeAddress.getState", "MA"), new NotEqualsFilter("getWorkAddress.getState", "MA")));
becomes
setResults = cache.entrySet(createFilter("homeAddress.state = 'MA' and workAddress.state != 'MA'"));

Set setResults = cache.entrySet(new LikeFilter("getHomeAddress.getCity", "S%"));
becomes
Set setResults = cache.entrySet(createFilter("homeAddress.city like 'S%'"));

Set setResults = cache.entrySet(new GreaterFilter("getAge", nAge));
becomes
final int nAge = 42;
Object[] aEnv = new Object[] { Integer.valueOf(nAge) };
...
Set setResults = cache.entrySet(createFilter("age > ?1", aEnv));

Set setResults = cache.entrySet(new AndFilter(new LikeFilter(new KeyExtractor("getLastName"), "S%", (char) 0, false), new EqualsFilter("getHomeAddress.getState", "MA")));
becomes
Set setResults = cache.entrySet(createFilter("key(lastName) like 'S%' and homeAddress.state = 'MA'"));

 

6. In the aggregate calls, replace each *Filter object with a call to createFilter and a Coherence Query Language expression.

Each *Filter call is listed first, followed by its createFilter replacement.

System.out.println("count > " + nAge + ": " + cache.aggregate(new GreaterFilter("getAge", nAge), new Count()));
becomes
System.out.println("count > " + nAge + ": " + cache.aggregate(createFilter("age > ?1", aEnv), new Count()));

System.out.println("min age: " + cache.aggregate(AlwaysFilter.INSTANCE, new LongMin("getAge")));
becomes
Filter always = createFilter("true");
System.out.println("min age: " + cache.aggregate(always, new LongMin("getAge")));

System.out.println("avg age: " + cache.aggregate(AlwaysFilter.INSTANCE, new DoubleAverage("getAge")));
becomes
System.out.println("avg age: " + cache.aggregate(always, new DoubleAverage("getAge")));

System.out.println("max age: " + cache.aggregate(AlwaysFilter.INSTANCE, new LongMax("getAge")));
becomes
System.out.println("max age: " + cache.aggregate(always, new LongMax("getAge")));

 

After these replacements, the program is renamed QueryExample03:

package com.oracle.handson.chapter06;

import com.tangosol.net.CacheFactory;

import com.tangosol.net.NamedCache;

import com.tangosol.util.Filter;

import static com.tangosol.util.QueryHelper.*;

import com.tangosol.util.aggregator.Count;

// import com.tangosol.util.extractor.ChainedExtractor;

// import com.tangosol.util.extractor.KeyExtractor;

// import com.tangosol.util.extractor.ReflectionExtractor;

// import com.tangosol.util.aggregator.Count;

import com.tangosol.util.aggregator.DoubleAverage;

import com.tangosol.util.aggregator.LongMax;

import com.tangosol.util.aggregator.LongMin;

// import com.tangosol.util.filter.AlwaysFilter;

// import com.tangosol.util.filter.AndFilter;

// import com.tangosol.util.filter.EqualsFilter;

// import com.tangosol.util.filter.GreaterFilter;

// import com.tangosol.util.filter.LikeFilter;

// import com.tangosol.util.filter.NotEqualsFilter;

import java.util.Iterator;

import java.util.Set;

/**

* QueryExample runs sample queries for contacts.

*

*/

public class QueryExample03 {

// ----- QueryExample methods ---------------------------------------

public static void main(String[] args) {

NamedCache cache = CacheFactory.getCache("ContactsCache");

query(cache);

}

/**

* Perform the example queries

*

*/

public static void query(NamedCache cache) {

// Add indexes to make queries more efficient

// ReflectionExtractor reflectAddrHome =

// new ReflectionExtractor("getHomeAddress");

// Add an index for the age

// cache.addIndex(new ReflectionExtractor("getAge"), true, null);

cache.addIndex(createExtractor("age"), true, null);

// Add index for state within home address

// cache.addIndex(new ChainedExtractor(reflectAddrHome,

// new ReflectionExtractor("getState")), true, null);

cache.addIndex(createExtractor("homeAddress.state"), false, null);

// Add index for state within work address

// cache.addIndex(new ChainedExtractor(

// new ReflectionExtractor("getWorkAddress"),

// new ReflectionExtractor("getState")), true, null);

cache.addIndex(createExtractor("workAddress.state"), false, null);

// Add index for city within home address

// cache.addIndex(new ChainedExtractor(reflectAddrHome,

// new ReflectionExtractor("getCity")), true, null);

cache.addIndex(createExtractor("homeAddress.city"), true, null);

// Find all contacts who live in Massachusetts

// Set setResults = cache.entrySet(new EqualsFilter(

// "getHomeAddress.getState", "MA"));

Set setResults = cache.entrySet(createFilter("homeAddress.state = 'MA'"));

printResults("MA Residents", setResults);

// Find all contacts who live in Massachusetts and work elsewhere

// setResults = cache.entrySet(new AndFilter(

// new EqualsFilter("getHomeAddress.getState", "MA"),

// new NotEqualsFilter("getWorkAddress.getState", "MA")));

setResults = cache.entrySet(createFilter("homeAddress.state = 'MA' and workAddress.state != 'MA'"));

printResults("MA Residents, Work Elsewhere", setResults);

// Find all contacts whose city name begins with 'S'

// setResults = cache.entrySet(new LikeFilter("getHomeAddress.getCity",

// "S%"));

setResults = cache.entrySet(createFilter("homeAddress.city like 'S%'"));

printResults("City Begins with S", setResults);

final int nAge = 42;

Object[] aEnv = new Object[] { Integer.valueOf(nAge) };

// Find all contacts who are older than nAge

// setResults = cache.entrySet(new GreaterFilter("getAge", nAge));

setResults = cache.entrySet(createFilter("age > ?1", aEnv));

printResults("Age > " + nAge, setResults);

// Find all contacts with last name beginning with 'S' that live

// in Massachusetts. Uses both key and value in the query.

// setResults = cache.entrySet(new AndFilter(

// new LikeFilter(new KeyExtractor("getLastName"), "S%",

// (char) 0, false),

// new EqualsFilter("getHomeAddress.getState", "MA")));

setResults = cache.entrySet(createFilter("key(lastName) like 'S%' and homeAddress.state = 'MA'"));

setResults = cache.entrySet(createFilter("key().lastName like 'S%' and homeAddress.state = 'MA'"));

printResults("Last Name Begins with S and State Is MA", setResults);

// Count contacts who are older than nAge

// System.out.println("count > " + nAge + ": "+

// cache.aggregate(new GreaterFilter("getAge", nAge), new Count()));

System.out.println("count > " + nAge + ": " + cache.aggregate(createFilter("age > ?1", aEnv), new Count()));

// Find minimum age

// System.out.println("min age: " +

// cache.aggregate(AlwaysFilter.INSTANCE, new LongMin("getAge")));

Filter always = createFilter("true");

System.out.println("min age: " + cache.aggregate(always, new LongMin("getAge")));

// Calculate average age

// System.out.println("avg age: " +

// cache.aggregate(AlwaysFilter.INSTANCE, new DoubleAverage("getAge")));

System.out.println("avg age: " + cache.aggregate(always, new DoubleAverage("getAge")));

// Find maximum age

// System.out.println("max age: " +

// cache.aggregate(AlwaysFilter.INSTANCE, new LongMax("getAge")));

System.out.println("max age: " + cache.aggregate(always, new LongMax("getAge")));

System.out.println("------QueryLanguageExample completed------");

}

/**

* Print results of the query

*

* @param sTitle

* the title that describes the results

* @param setResults

* a set of query results

*/

private static void printResults(String sTitle, Set setResults) {

System.out.println(sTitle);

for (Iterator iter = setResults.iterator(); iter.hasNext();) {

System.out.println(iter.next());

}

}
}

 

6.4.2 Running the Example

 

1. Stop all running services.

2. Restart chapter06CacheServer.

3. Run chapter06CacheClientLoaderExample.

4. Copy chapter06CacheClientQueryExample02, rename the copy to chapter06CacheClientQueryExample03, and set the main class to com.oracle.handson.chapter06.QueryExample03.

5. Run chapter06CacheClientQueryExample03.

The output looks similar to the following figure:

 06-figure-09

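About the Author

Jin Hui (金辉) is a senior presales consultant and team manager at Oracle, responsible for cloud and middleware products. He has more than ten years of experience in middleware and project management, focuses on enterprise cloud construction and enterprise integration solutions, and is familiar with the industry's main SOA integration products. He has run the Beijing Marathon and the TNF50 trail race. You can reach him by email at arthur.jin@oracle.com.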