Working with Topics using Apache Kafka Native APIs in Oracle Event Hub Cloud Service

 

Before You Begin

Purpose

In this tutorial, you learn to create an Oracle Event Hub Cloud Service - Topic instance and then produce to and consume from that Topic.

Time to Complete

Approximately 40 minutes.

Background

Oracle Event Hub Cloud Service provides a highly available and scalable messaging platform for loading and analyzing data. Users can spin up multiple clusters and create topics. Oracle Event Hub Cloud Service - Topic lets you produce to and consume from streams of data, as you would with a messaging system, and stores those streams in a distributed, scalable cluster.

What Do You Need?

  1. A running Oracle Event Hub Cloud Service - Platform cluster.
  2. Oracle Event Hub Cloud Service account login credentials.
  3. Java 8 or above installed.
  4. Maven 3 or above installed.
  5. Eclipse Integrated Development Environment (IDE) that supports Maven installed.
 

Creating a Topic

Perform the following steps to create an Oracle Event Hub Cloud Service - Topic instance. You can skip this section if you already have an Oracle Event Hub Cloud Service - Topic instance and plan to use it for this demo.

  1. Log in to your Oracle Event Hub Cloud Service - Topic account.

  2. In the Services page, click Create Service.

  3. The Create Service screen appears. Provide the following details and click Next.

    • Service Name: topicdemo
    • Service Description: Example to demo topic
    • Hosted On: platformdemo
    • Number of Partitions: 2
    • Retention Period (Hours): 24
    [Figure: Service page of Create Service wizard]
    Note: platformdemo is the name of the Oracle Event Hub Cloud Service - Platform cluster in which the topic will be created. You can provide a different name if you want to host the topic in a different Oracle Event Hub Cloud Service - Platform cluster.
  4. In the Confirm page, review the details and, if they are correct, click Create.

    [Figure: Confirmation page of Create Service wizard]
  5. Control returns to the Services page, where the new topicdemo service is now listed.

    [Figure: Services page]
  6. Click on the Event Hub icon adjacent to the topicdemo instance to go to the Service Overview page.

  7. In the Service Overview page, observe the Topic field. This is the name of the Topic service that will be used in programs demonstrated in this tutorial.

    [Figure: Service Overview page]
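The values entered in step 3 are ordinary topic settings. As a rough illustration only, the sketch below records them in a small Java class and converts the retention period to milliseconds, the unit used by Kafka's topic-level `retention.ms` setting. The class name `TopicSettings` is hypothetical, and whether the service stores the retention period in exactly this form is an assumption.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical holder for the values entered in the Create Service wizard. */
final class TopicSettings {
    final String serviceName;
    final int partitions;
    final long retentionHours;

    TopicSettings(String serviceName, int partitions, long retentionHours) {
        if (partitions < 1) {
            throw new IllegalArgumentException("A topic needs at least one partition");
        }
        if (retentionHours < 1) {
            throw new IllegalArgumentException("Retention must be at least one hour");
        }
        this.serviceName = serviceName;
        this.partitions = partitions;
        this.retentionHours = retentionHours;
    }

    /** Retention period in milliseconds (assumed to correspond to retention.ms). */
    long retentionMillis() {
        return TimeUnit.HOURS.toMillis(retentionHours);
    }

    public static void main(String[] args) {
        // Values taken from step 3 of this section.
        TopicSettings demo = new TopicSettings("topicdemo", 2, 24);
        System.out.println(demo.serviceName + ": " + demo.partitions
                + " partitions, retention " + demo.retentionMillis() + " ms");
    }
}
```

With the tutorial's values, a record stays available for 24 hours after it is written, regardless of whether a consumer has already read it.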
 

Providing access to the cluster

The Oracle Event Hub Cloud Service - Topic instance created in the previous section is hosted on the Oracle Event Hub Cloud Service - Platform cluster that you selected while creating it.

Note: By default, native access to Kafka is blocked for security reasons. For the purpose of this tutorial, we open it to the public internet. However, when setting up a development, test, or production environment, restrict native access to specific IP addresses.

Perform the following steps to grant access to the Oracle Event Hub Cloud Service - Platform cluster.

  1. Go to your Oracle Event Hub Cloud Service - Platform account.

    [Figure: Services page]
  2. In the Services page, click the icon in the platformdemo cluster, and then click the Access Rule item.

    Note: Select the same cluster that you chose in the Hosted On field while creating the topicdemo instance in the previous section.
    [Figure: Services page]
  3. The Access Rules page is displayed. Click Create Rule.

    [Figure: Access Rules page]
  4. The Create Access Rule screen is displayed. Provide the following details and click Create.

    • Rule Name: eventhub_public_access
    • Description: This rule provides public access to the cluster.
    • Source: PUBLIC_INTERNET
    • Destination: kafka_KAFKA_SERVER
    • Destination Ports: 6667
    • Protocol: TCP
    [Figure: Create Access Rule page]
    Note: This configuration is for public access. You can provide access to specific machines by providing their address in the source field, or by using custom rules.
  5. After you return to the Services page, click the platformdemo cluster to go to the Service Overview page.

  6. In the Service Overview page, expand the Kafka option. Notice the Connect Descriptor element. This is the host name that we will use while connecting to the platformdemo service.

    [Figure: Service Overview page]
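Before writing any Kafka code, it can be useful to confirm that the access rule actually lets your machine reach the broker port. The following is a minimal sketch that uses only the Java standard library. It assumes the Connect Descriptor has the form `host:port` (for example, port 6667 as in the access rule above). The class and method names are hypothetical. A successful TCP connection only shows that the port is open, not that the Kafka service behind it is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks that a broker address accepts TCP connections. */
final class BrokerPortCheck {

    /** Splits "host:port" into its two parts and rejects anything else. */
    static InetSocketAddress parse(String hostAndPort) {
        int colon = hostAndPort.lastIndexOf(':');
        if (colon <= 0 || colon == hostAndPort.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + hostAndPort);
        }
        String host = hostAndPort.substring(0, colon);
        int port = Integer.parseInt(hostAndPort.substring(colon + 1));
        // createUnresolved defers any DNS lookup until we actually connect.
        return InetSocketAddress.createUnresolved(host, port);
    }

    /** Returns true if a TCP connection can be opened within the timeout. */
    static boolean canConnect(String hostAndPort, int timeoutMillis) {
        InetSocketAddress target = parse(hostAndPort);
        try (Socket socket = new Socket()) {
            socket.connect(
                    new InetSocketAddress(target.getHostString(), target.getPort()),
                    timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unknown host: treat all as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.out.println("Usage: java BrokerPortCheck <host:port>");
            return;
        }
        boolean ok = canConnect(args[0], 5000);
        System.out.println(args[0] + (ok ? " is reachable" : " is NOT reachable"));
    }
}
```

If the check reports that the address is not reachable, review the source, destination, and port of the access rule before debugging the producer or consumer.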
 

Setting Up the Project

Perform the following steps to create a Maven project in Eclipse IDE.

  1. Open Eclipse. From the File menu, expand New and then select Other.

    [Figure: Eclipse File menu]
  2. The Select a Wizard page appears. Select Maven Project and click Next.

    [Figure: New Project wizard]
  3. The New Maven Project page appears. Select the Create a simple project (skip archetype selection) option. Click Next.

    [Figure: New Maven Project wizard]
  4. The Artifact section appears. Provide the following details and click Finish.

    • Group Id: com.oracle
    • Artifact Id: oehcsproject
    • Version: 0.0.1-SNAPSHOT
    • Packaging: jar
    • Name: oehcs-demo
    [Figure: Configure project page of New Maven Project wizard]
  5. The oehcsproject project opens. Observe the structure of the project.

    [Figure: Package Explorer containing oehcsproject]
  6. Open the pom.xml file. Observe the <project> tag and its elements.

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                                 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.oracle</groupId>
      <artifactId>oehcsproject</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <name>oehcs-demo</name>
    </project>
    [Figure: pom.xml file]
  7. Add the following code inside the <project> tag.

    <properties>
      <slf4j.version>1.7.10</slf4j.version>
      <zookeeper.version>0.8</zookeeper.version>
      <kafkaapi.version>0.9.0.1</kafkaapi.version>
      <junit.version>4.11</junit.version>
      <json-simple.version>1.1.1</json-simple.version>
    </properties>

    <dependencies>
      <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
      </dependency>
      <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
      </dependency>
      <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>${zookeeper.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>${kafkaapi.version}</version>
      </dependency>
      <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>${json-simple.version}</version>
      </dependency>
      <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
        <scope>test</scope>
      </dependency>
    </dependencies>
    [Figure: pom.xml file]
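After Maven has resolved the dependencies, one quick way to confirm that the Kafka client classes used later in this tutorial are on the project classpath is to try loading them by name. The sketch below is an optional check, not part of the original project. The class name `ClasspathCheck` is hypothetical, and the code itself uses only the Java standard library, so it compiles even if the dependencies are missing.

```java
/** Hypothetical helper: reports whether named classes can be loaded. */
final class ClasspathCheck {

    static boolean isPresent(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The two client classes that the producer and consumer programs import.
        String[] required = {
            "org.apache.kafka.clients.producer.KafkaProducer",
            "org.apache.kafka.clients.consumer.KafkaConsumer"
        };
        for (String name : required) {
            System.out.println(name + " -> " + (isPresent(name) ? "found" : "MISSING"));
        }
    }
}
```

If a class is reported as missing when you run this inside the oehcsproject project, updating the Maven project in Eclipse and checking the pom.xml for typos is a reasonable first step.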
 

Creating the Producer program

Perform the following steps to create the OEHCSProducer.java program.

  1. In the oehcsproject project, right-click the src/main/java folder, select New, and then select Class.

    [Figure: Package Explorer with oehcsproject]
  2. The New Java Class creation wizard appears. Provide the Name as OEHCSProducer and click Finish.

    [Figure: Create a new Java class wizard]
  3. The OEHCSProducer.java file opens. Add the following import statements.

    import java.util.Properties;
    import java.util.Scanner;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
  4. Enter the following code inside the OEHCSProducer class.

    public static void main(String[] args) {
        // Value of the Topic field on the Service Overview page.
        String topicName = "bigdata001-topicdemo";
        // Replace with the Connect Descriptor of your platformdemo cluster.
        String bootstrapServer = "198.168.1.10:6667";

        System.out.println("Starting producer for topic: " + topicName);
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        System.out.println("Input the contents and then press enter to send to the Topic. ");
        Scanner input = new Scanner(System.in);
        // Send each line typed on the console as one record without a key.
        while (input.hasNextLine()) {
            producer.send(new ProducerRecord<String, String>(topicName, input.nextLine()));
        }

        input.close();
        producer.close();
    }
    [Figure: OEHCSProducer.java file]
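The producer hard-codes its settings inside main. One possible variation, shown here only as a sketch, is to build the same settings in a small helper so that the broker address can be passed in instead of edited in the source. The helper uses only `java.util.Properties`; the class name `ProducerSettings` and its method are hypothetical. All values are stored as strings, which Kafka's configuration parsing accepts for numeric settings as well.

```java
import java.util.Properties;
import java.util.TreeSet;

/** Hypothetical helper that collects the producer settings used in this tutorial. */
final class ProducerSettings {

    static Properties forBroker(String bootstrapServer) {
        if (bootstrapServer == null || bootstrapServer.trim().isEmpty()) {
            throw new IllegalArgumentException("bootstrapServer must be host:port");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServer.trim());
        // Wait until all in-sync replicas have acknowledged each record.
        props.setProperty("acks", "all");
        // Do not resend a record if a send attempt fails.
        props.setProperty("retries", "0");
        // Upper bound, in bytes, for one batch of records per partition (16 KiB).
        props.setProperty("batch.size", "16384");
        // Wait up to 1 ms for more records before sending a batch.
        props.setProperty("linger.ms", "1");
        // Total memory, in bytes, for records not yet sent (32 MiB).
        props.setProperty("buffer.memory", "33554432");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Falls back to the placeholder address used in this tutorial.
        String server = args.length > 0 ? args[0] : "198.168.1.10:6667";
        Properties props = forBroker(server);
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            System.out.println(key + " = " + props.getProperty(key));
        }
    }
}
```

The returned object can be passed to the KafkaProducer constructor in place of the props variable built inside OEHCSProducer.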
 

Creating the Consumer program

Perform the following steps to create the OEHCSConsumer.java program.

  1. In the oehcsproject project, right-click the src/main/java folder, select New, and then select Class.

    [Figure: Package Explorer with oehcsproject]
  2. The New Java Class creation wizard appears. Provide the Name as OEHCSConsumer and click Finish.

    [Figure: Create a new Java class wizard]
  3. The OEHCSConsumer.java file opens. Add the following import statements.

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
  4. Enter the following code inside the OEHCSConsumer class.

    public static void main(String[] args) {
        // Value of the Topic field on the Service Overview page.
        String topicName = "bigdata001-topicdemo";
        // Replace with the Connect Descriptor of your platformdemo cluster.
        String bootstrapServer = "198.168.1.10:6667";
        String group = "test_group";

        System.out.println("Starting consumer for topic: " + topicName);
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("group.id", group);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topicName));
        System.out.println("Subscribed to topic: " + consumer.subscription());

        // Poll until the program is stopped from the console.
        while (true) {
            // Wait up to 1000 ms for new records.
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.print("offset = " + record.offset());
                System.out.println(", value = " + record.value());
            }
        }
    }
    [Figure: OEHCSConsumer.java file]
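As with the producer, the consumer settings do not have to live in the source code. The sketch below, which is an illustration rather than part of the original program, loads them from text in the standard Java properties format, applies the tutorial's values as defaults, and fails early if `bootstrap.servers` or `group.id` is missing. The class name `ConsumerSettings` and the idea of keeping the settings in a separate file are assumptions.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical helper that loads and validates consumer settings. */
final class ConsumerSettings {

    private static final String[] REQUIRED = {"bootstrap.servers", "group.id"};

    static Properties parse(String text) {
        Properties props = new Properties();
        // Defaults mirror the values used in OEHCSConsumer; the text may override them.
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("session.timeout.ms", "30000");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Not expected when reading from an in-memory string.
            throw new IllegalStateException("Could not read settings", e);
        }
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalArgumentException("Missing required setting: " + key);
            }
        }
        return props;
    }

    public static void main(String[] args) {
        // Placeholder address and group name taken from this tutorial.
        String text = "bootstrap.servers=198.168.1.10:6667\n"
                + "group.id=test_group\n";
        Properties props = parse(text);
        System.out.println("group.id = " + props.getProperty("group.id"));
        System.out.println("session.timeout.ms = " + props.getProperty("session.timeout.ms"));
    }
}
```

Consumers that share the same `group.id` divide the partitions of a topic among themselves, so a different group name is needed if a second program should receive every record independently.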
 

Running the Producer and Consumer Programs

Perform the following steps to run the OEHCSConsumer and OEHCSProducer programs.

  1. Open the OEHCSConsumer.java program. Right-click in the editor, select Run As, and then select Java Application.

  2. Observe the Console. The following output is displayed.

    Starting consumer for topic: bigdata001-topicdemo
    Subscribed to topic: [bigdata001-topicdemo]
    [Figure: OEHCSConsumer console]
  3. Open the OEHCSProducer.java program. Right-click in the editor, select Run As, and then select Java Application.

  4. Observe the Console. The following output is displayed.

    Starting producer for topic: bigdata001-topicdemo
    Input the contents and then press enter to send to the Topic.
    [Figure: OEHCSProducer console]
  5. Go to the OEHCSProducer console and type the following lines, pressing Enter after each.

    Hello World
    This program demonstrates Oracle Event Hub
    [Figure: OEHCSProducer console]
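  6. Go to the OEHCSConsumer console. The lines that you entered in the OEHCSProducer console are displayed there. The offsets that you see may differ, because Kafka numbers records separately within each partition and continues from any records already in the topic.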

    offset = 1, value = Hello World
    offset = 2, value = This program demonstrates Oracle Event Hub
    [Figure: OEHCSConsumer console]
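The consumer prints each record's offset but not its partition. Because topicdemo was created with two partitions and Kafka assigns offsets independently within each partition, two records can legitimately show the same offset. The following simulation illustrates the idea using plain Java collections. It does not call Kafka, the class name is hypothetical, and the simple round-robin assignment is a simplified stand-in for how a producer spreads records that have no key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simulation only: shows per-partition offsets for records without a key. */
final class PartitionOffsetDemo {

    /** Returns one line per record with the partition and offset it would receive. */
    static List<String> assign(int partitions, List<String> values) {
        if (partitions < 1) {
            throw new IllegalArgumentException("partitions must be at least 1");
        }
        // Each partition keeps its own counter, starting at 0.
        long[] nextOffset = new long[partitions];
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            int partition = i % partitions; // simplified round-robin choice
            long offset = nextOffset[partition]++;
            lines.add("partition = " + partition + ", offset = " + offset
                    + ", value = " + values.get(i));
        }
        return lines;
    }

    public static void main(String[] args) {
        List<String> sent = Arrays.asList(
                "Hello World",
                "This program demonstrates Oracle Event Hub");
        for (String line : assign(2, sent)) {
            System.out.println(line);
        }
    }
}
```

In the real consumer, printing `record.partition()` next to `record.offset()` makes it visible which partition each record came from.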
 

Want to Learn More?