With Apache Kafka

Introduction

Ampool ADS is a next-generation, memory-centric data platform that supports efficient storage and retrieval of data for both analytics and OLTP applications. The "ampool-connect-kafka" package is a Kafka sink connector, built on the Kafka Connect framework, for loading data from Kafka topics into Ampool ADS tables.

The connector supports both MTable and FTable table types. Tables must be created manually in Ampool before ampool-connect-kafka can populate them with data.

About Apache Kafka

  • Kafka provides a flexible, scalable, and reliable method to communicate streams of event data from one or more producers to one or more consumers.
  • Kafka Connect is a framework for scalably and reliably connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems.
  • For more details, refer to https://kafka.apache.org/ and https://kafka.apache.org/documentation/#connect

How to Use

Untar the package ampool-connect-kafka-<version>.tar.gz and you should have the following layout:

ampool-connect-kafka-<version>
├── dependencies
│   ├── ampool-client-dependencies-<version>.jar
│   ├── ampool-core-<version>.jar
│   ├── fastutil-7.0.9.jar
│   ├── javax.transaction-api-1.2.jar
│   ├── log4j-api-2.5.jar
│   └── log4j-core-2.5.jar
├── lib
│   └── ampool-connect-kafka-<version>.jar
└── README.md

The lib directory contains the connector jar, and the dependencies directory contains all of its dependent libraries.
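
For example, assuming the tarball is in the current directory, it can be extracted with:

tar -xvzf ampool-connect-kafka-<version>.tar.gz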

Pre-requisites

  • Kafka 0.10.0.1/confluent-oss-3.2.0-2.11
  • Ampool 1.3.0 or higher
  • JDK 1.8

Note

Confluent (https://www.confluent.io) bundles the Schema Registry and the Kafka Connect framework in its 3.0.0 and later releases. For other Kafka distributions, they must be installed separately.

Integration with Cloudera's Hadoop Distribution

  • Add a Kafka service using the latest parcel. The recommended version is KAFKA-2.1.1-1.2.1.1.p0.18-el5.parcel.
  • We recommend using confluent-oss-3.2.0-2.11 to start the Schema Registry externally and to launch the connector.
  • Ampool 1.3.0 or later.

Kafka schema type to Ampool type mapping

Available in version 1.4.0 and later.

Kafka type   Ampool type
BOOLEAN      BasicTypes.BOOLEAN
FLOAT32      BasicTypes.FLOAT
FLOAT64      BasicTypes.DOUBLE
INT8         BasicTypes.BYTE
INT16        BasicTypes.SHORT
INT32        BasicTypes.INT
INT64        BasicTypes.LONG
STRING       BasicTypes.STRING
BYTES        BasicTypes.BINARY

Available in version 1.5.1 and later.

Kafka type              Ampool type
Date                    BasicTypes.Date
Decimal or BigDecimal   BasicTypes.BigDecimal
Time                    BasicTypes.Timestamp
Timestamp               BasicTypes.Timestamp
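
As an illustration of this mapping, the sketch below creates a table with DOUBLE and LONG columns and ingests a matching Avro record into a topic of the same name. The table name typed_test, the column names, and the DOUBLE/LONG schema-json type names are assumptions for illustration (Avro int, long, double, and string fields correspond to BasicTypes.INT, LONG, DOUBLE, and STRING above); the topic would also need to be listed in the connector's topics/ampool.tables properties.

mash>create table --type=UNORDERED --name=typed_test --schema-json="{"id":"INT","price":"DOUBLE","ts":"LONG","name":"STRING"}"

$> $CONFLUENT_HOME/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic typed_test --property value.schema='{"type":"record","name":"record","fields":[{"name":"id","type":"int"},{"name":"price","type":"double"},{"name":"ts","type":"long"},{"name":"name","type":"string"}]}'
{"id": 1, "price": 9.99, "ts": 1500000000000, "name": "sample"}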

Sink connector properties

  • name - Unique name for the connector. Attempting to register another connector with the same name will fail.
  • connector.class - The Java class for the sink connector. Default: io.ampool.kafka.connect.AmpoolSinkConnector
  • tasks.max - The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism.
  • locator.host - The host name or IP address to which the Ampool locator is bound.
  • locator.port - The Ampool locator service port.
  • batch.size - Number of records inserted in a single put/append operation while writing records from a topic to an Ampool table. The default of 1000 is recommended for a use case with about 10 regularly sized columns per record; adjust it according to the number of columns and the size of each value. Default: 1000
  • ampool.tables - Comma-separated list of Ampool table names, e.g. table1,table2,table3. The number of table names must match the number of topic names specified by topics. The tables must already exist, so create them manually in Ampool before ingesting data into the topics.
  • topics - Comma-separated list of Kafka topic names, e.g. topic1,topic2,topic3. Topics and tables are mapped by their position in the respective comma-separated lists, so with the examples above the mapping is [topic1 -> table1, topic2 -> table2, topic3 -> table3].
  • ampool.tables.rowkey.column - Row key column, used for MTables only; useful for mapping a primary key to the row key. Specified in TOPICNAME:ROWKEY format. If not specified, a unique row key is generated from the topic, partition, and offset, with the elements separated by the pipe character. Default: none

Sample Sink connector properties (sink-connector.properties)

name=ampool-sink-connector
connector.class=io.ampool.kafka.connect.AmpoolSinkConnector
tasks.max=1
locator.host=localhost
locator.port=10334
batch.size=10
ampool.tables=test
topics=test
ampool.tables.rowkey.column=test:id

  • Each Kafka topic is mapped to one Ampool table. The Ampool table is expected to exist beforehand with the appropriate configuration (number of buckets, replicas, etc.).
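
For instance, a hypothetical configuration that maps two topics to two pre-created tables, with a row key defined only for the first one, could look like the following (the topic, table, and column names are illustrative):

name=ampool-sink-connector
connector.class=io.ampool.kafka.connect.AmpoolSinkConnector
tasks.max=2
locator.host=localhost
locator.port=10334
batch.size=1000
ampool.tables=customers_table,orders_table
topics=customers,orders
ampool.tables.rowkey.column=customers:customer_id

Here customers maps to customers_table and orders maps to orders_table; if orders_table is an MTable, its rows fall back to the generated topic|partition|offset row key.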

Setting up the Environment (Standalone mode)

  1. Set up a single-node (localhost) Ampool cluster containing one locator and one server, and create a table.

    mash>create table --type=UNORDERED --name=test --schema-json="{"id":"INT","name":"STRING"}"
    
  2. Download and install confluent-oss-3.2.0-2.11, and export its base/home directory as CONFLUENT_HOME.

  3. Extract the ampool-connect-kafka package, e.g. tar -xvzf ampool-connect-kafka-<version>.tar.gz, and treat its base directory ampool-connect-kafka-<version> as AMPOOL_CONNECT_KAFKA_HOME (a combined sketch of these exports appears after step 8). Create a sample configuration file, as shown above, in the AMPOOL_CONNECT_KAFKA_HOME directory and name it ampool-sink.properties.

  4. Create the $CONFLUENT_HOME/share/java/ampool-connect-kafka directory and copy the connector files as shown below.

    #!/bin/bash
    cd $CONFLUENT_HOME
    mkdir -p $CONFLUENT_HOME/share/java/ampool-connect-kafka
    cp $AMPOOL_CONNECT_KAFKA_HOME/lib/ampool-connect-kafka-<version>.jar $CONFLUENT_HOME/share/java/ampool-connect-kafka
    mkdir -p $CONFLUENT_HOME/etc/ampool-connect-kafka
    cp $AMPOOL_CONNECT_KAFKA_HOME/ampool-sink.properties $CONFLUENT_HOME/etc/ampool-connect-kafka/
    
  5. Start ZooKeeper, the Kafka server, and the Schema Registry:

    ./bin/zookeeper-server-start etc/kafka/zookeeper.properties
    ./bin/kafka-server-start etc/kafka/server.properties
    ./bin/schema-registry-start etc/schema-registry/schema-registry.properties
    
  6. Start the ampool-sink-connector (it is launched within a Connect worker). Set the CLASSPATH to include the required dependencies:

    export CLASSPATH="$CONFLUENT_HOME/share/java/ampool-connect-kafka/ampool-connect-kafka-<version>.jar:$AMPOOL_CONNECT_KAFKA_HOME/dependencies/ampool-client-dependencies-<version>.jar:$AMPOOL_CONNECT_KAFKA_HOME/dependencies/ampool-core-<version>.jar"
    ./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/ampool-connect-kafka/ampool-sink.properties
    
  7. Ingest sample data using the Avro console producer; start the console to create the topic and write values:

    $> $CONFLUENT_HOME/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"record","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"}]}'
    # ingest records into the kafka topic
    {"id": 1, "name": "Miss Havisham"}
    {"id": 2, "name": "Joe Gargery"}
    {"id": 3, "name": "Anna Howe"}
    {"id": 4, "name": "Elizabeth Bennett"}
    {"id": 5, "name": "Julien Sorel"}
    
  8. Do a table scan to verify records

    mash>tscan --table=/test
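
For convenience, the environment variables used in steps 2-6 can be set up front; the sketch below assumes both packages were extracted under /opt (the paths are illustrative):

export CONFLUENT_HOME=/opt/confluent-3.2.0
export AMPOOL_CONNECT_KAFKA_HOME=/opt/ampool-connect-kafka-<version>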
    

Note

If the data is available in a JSON file, it can be ingested directly using the following command:
$> $CONFLUENT_HOME/bin/kafka-avro-console-producer --broker-list localhost:9092 --property value.schema='{"type":"record","name":"record","fields":[{"name":"id","type":"int"}, {"name":"name", "type":"string"}]}' --topic test < ./data.json
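
For example, data.json would contain one JSON record per line, matching the Avro schema given with --property value.schema (the values here are illustrative):

{"id": 6, "name": "Sample User"}
{"id": 7, "name": "Another User"}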

Now the data ends up in Ampool ADS, ready for further in-memory processing using Ampool capabilities.

Guideline for Setting up the Environment (Distributed mode)

  1. Configure and start Apache ZooKeeper. Configure etc/kafka/zookeeper.properties as per your setup. The following configuration properties should be checked before starting ZooKeeper (a reference sketch of these configuration files appears after step 4).

    dataDir    - Directory where ZooKeeper should save its data.
    clientPort - Port on which ZooKeeper listens for client connections. Defaults to 2181.
    

    Start the ZooKeeper server:

    ./bin/zookeeper-server-start etc/kafka/zookeeper.properties
    
  2. Set up Apache Kafka brokers on different nodes. On each node, configure etc/kafka/server.properties as per your setup. The following configuration properties should be checked before starting each broker:

    broker.id         - ID of the broker, an integer. Each broker in a cluster needs a unique ID.
    log.dirs          - Directories where Kafka commits its messages; not to be confused with the usual log files.
    port              - Port on which Kafka accepts connections from producers and consumers.
    zookeeper.connect - Comma-separated list of ZooKeeper nodes, e.g. hostname1:port1,hostname2:port2
    

    Start the Kafka broker on each node:

    ./bin/kafka-server-start etc/kafka/server.properties
    
  3. Configure and start the Schema Registry. Configure etc/schema-registry/schema-registry.properties as per your setup. Check the following configuration:

    kafkastore.connection.url - ZooKeeper URL for the Kafka cluster
    

    Start the Schema Registry:

    ./bin/schema-registry-start etc/schema-registry/schema-registry.properties
    
  4. Configure and launch the Ampool sink connector. Configure etc/schema-registry/connect-avro-standalone.properties as per your setup. Check the following configuration:

    # Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
    bootstrap.servers=quickstart.cloudera:9092
    
    # The converters specify the format of data in Kafka and how to translate it into Connect data.
    # Every Connect user will need to configure these based on the format they want their data in
    # when loaded from or stored into Kafka
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://<SCHEMA_REGISTRY_HOST>:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://<SCHEMA_REGISTRY_HOST>:8081
    

    Launch the Ampool sink connector:

    #!/bin/bash
    export CLASSPATH="$CONFLUENT_HOME/share/java/ampool-connect-kafka/ampool-connect-kafka-<version>.jar:$AMPOOL_CONNECT_KAFKA_HOME/dependencies/ampool-client-dependencies-<version>.jar:$AMPOOL_CONNECT_KAFKA_HOME/dependencies/ampool-core-<version>.jar"
    ./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/ampool-connect-kafka/ampool-sink.properties
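
For reference, minimal versions of the ZooKeeper, broker, and Schema Registry settings discussed in steps 1-3 might look like the sketch below; the host names, directory paths, and broker ID are assumptions for illustration:

# etc/kafka/zookeeper.properties
dataDir=/var/zookeeper
clientPort=2181

# etc/kafka/server.properties (one file per broker, each with a unique broker.id)
broker.id=1
log.dirs=/var/kafka-logs
port=9092
zookeeper.connect=zk-host1:2181,zk-host2:2181

# etc/schema-registry/schema-registry.properties
kafkastore.connection.url=zk-host1:2181,zk-host2:2181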