Change Data Capture

Introduction

Change data capture (CDC) is a way to identify and track changes made to data. Ampool mutable tables allow you to insert, update, and delete data. Ampool also lets you track these changes and, if required, propagate them to an external data source. This is done via a write-behind listener that processes all changes made to an Ampool table. The write-behind listener works asynchronously and lets you batch a fixed number of changes and then process them together.

Ampool provides an interface, MAsyncEventListener, that must be implemented and configured for a table. The listener is invoked asynchronously, based on the configuration, for any changes made to the table. These changes can then be processed as required. The following changes are propagated to the listener:

* Create a new row
* Update an existing row
* Delete an existing row
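
The batching behaviour described above can be pictured with a small, self-contained sketch. It is not Ampool's implementation: the class `WriteBehindBatcher`, its methods, and the explicit clock argument are hypothetical names chosen for illustration. It assumes a batch is dispatched either when a fixed number of changes has been queued or when the oldest queued change has waited for a given interval, and that a batch the listener reports as failed stays queued for a later retry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Illustrative write-behind batcher; NOT Ampool's implementation. */
final class WriteBehindBatcher<E> {
  private final int batchSize;               // dispatch once this many changes are queued
  private final long batchTimeIntervalMs;    // or once the oldest queued change is this old
  private final Predicate<List<E>> listener; // returns true if the batch was processed
  private final List<E> pending = new ArrayList<>();
  private long oldestPendingAtMs;

  WriteBehindBatcher(int batchSize, long batchTimeIntervalMs, Predicate<List<E>> listener) {
    this.batchSize = batchSize;
    this.batchTimeIntervalMs = batchTimeIntervalMs;
    this.listener = listener;
  }

  /** Queues one change and dispatches if the batch is full. */
  void enqueue(E event, long nowMs) {
    if (pending.isEmpty()) {
      oldestPendingAtMs = nowMs;
    }
    pending.add(event);
    if (pending.size() >= batchSize) {
      dispatch();
    }
  }

  /** Called periodically; dispatches a partial batch once the interval has elapsed. */
  void tick(long nowMs) {
    if (!pending.isEmpty() && nowMs - oldestPendingAtMs >= batchTimeIntervalMs) {
      dispatch();
    }
  }

  private void dispatch() {
    // Hand the listener a copy so it cannot mutate the queue.
    if (listener.test(new ArrayList<>(pending))) {
      pending.clear();
    }
    // Assumption: a failed batch stays queued and is retried on the next dispatch.
  }

  int pendingCount() {
    return pending.size();
  }

  public static void main(String[] args) {
    WriteBehindBatcher<String> batcher = new WriteBehindBatcher<>(3, 5, batch -> {
      System.out.println("batch: " + batch);
      return true;
    });
    batcher.enqueue("create r1", 0);
    batcher.enqueue("update r1", 1);
    batcher.enqueue("delete r1", 2); // third change fills the batch
    batcher.enqueue("create r2", 10);
    batcher.tick(15);                // interval elapsed, partial batch is dispatched
  }
}
```

Passing the current time in explicitly keeps the sketch deterministic; a real queue would use its own dispatcher threads and clock.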

How to use

The following are required to use change data capture with Ampool tables:

* Implement the MAsyncEventListener interface (in Java) and create a jar file with the required classes. Dependent classes should not be included in the jar file as they are already available.
* Make the target jar file available on all members, either by copying it manually or by using the MASH command deploy.
* Specify the target jar file path(s) using the --classpath option when starting the Ampool servers.

These steps ensure that your custom write-behind listener implementation is available on all running Ampool servers and is ready to be invoked when required.

Once the Ampool servers are running as expected, you can create a table and configure it to use the custom listener by specifying the fully qualified class name.

Sample Listener

The following is an example of a write-behind listener:

package com.example.ampool;

import java.io.IOException;
import java.util.List;

import io.ampool.monarch.table.*;
import org.apache.geode.internal.logging.LogService;
import org.apache.logging.log4j.Logger;

public class MyWriteBehindListener implements MAsyncEventListener {
  private static final Logger logger = LogService.getLogger();

  public MyWriteBehindListener() throws IOException {
    // Initialize the listener and make connection to the external data source, if required
  }

  @Override
  public boolean processEvents(List<CDCEvent> events) {
    try {
      for (CDCEvent event : events) {
        final MEventSequenceID sequenceId = event.getEventSequenceID();
        final MEventOperation operation = event.getOperation();
        final Row row = event.getRow();

        logger.debug("Processing CDCEvent: sequenceId= {}, operation= {}, row= {}", sequenceId, operation, row);

        // process the events based on the type of the operation.
        switch (operation) {
          case CREATE:
            // handle newly created rows
            break;
          case UPDATE:
            // handle updates to the existing rows
            break;
          case DELETE:
            // handle the deletions
            break;
        }
      }
    } catch (Exception e) {
      // Log the error and indicate the failure
      logger.error("Error while processing events.", e);
      return false;
    }
    // indicate that the events were processed successfully
    return true;
  }

  @Override
  public void close() {
    // the cleanup to be done, if any
  }
}
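
As a sketch of how the three cases in the switch might be filled in, the self-contained example below mirrors changes into an in-memory map that stands in for the external data source. Everything in it is hypothetical: `ReplicaApplierSketch`, `Op`, and `Change` are stand-ins rather than Ampool types, and it assumes that an update carries only the changed columns. It keeps the same contract as the listener above, returning true on success and false on failure.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: applies change events to a stand-in external store. */
final class ReplicaApplierSketch {
  enum Op { CREATE, UPDATE, DELETE }

  /** Hypothetical stand-in for a change event: operation, row key, column values. */
  static final class Change {
    final Op op;
    final String key;
    final Map<String, Object> columns;

    Change(Op op, String key, Map<String, Object> columns) {
      this.op = op;
      this.key = key;
      this.columns = columns;
    }
  }

  /** Stand-in for the external data source: row key mapped to column values. */
  private final Map<String, Map<String, Object>> replica = new HashMap<>();

  /** Returns true if the whole batch was applied, false otherwise. */
  boolean processEvents(List<Change> events) {
    try {
      for (Change c : events) {
        switch (c.op) {
          case CREATE:
            replica.put(c.key, new HashMap<>(c.columns));
            break;
          case UPDATE:
            // Assumption: an update carries only the changed columns.
            replica.computeIfAbsent(c.key, k -> new HashMap<>()).putAll(c.columns);
            break;
          case DELETE:
            replica.remove(c.key);
            break;
          default:
            throw new IllegalStateException("Unexpected operation: " + c.op);
        }
      }
      return true;
    } catch (RuntimeException e) {
      // Indicate the failure to the caller.
      return false;
    }
  }

  Map<String, Object> get(String key) {
    return replica.get(key);
  }

  public static void main(String[] args) {
    ReplicaApplierSketch sketch = new ReplicaApplierSketch();
    boolean ok = sketch.processEvents(List.of(
        new Change(Op.CREATE, "row-1", Map.of("c_1", "a", "c_2", 1)),
        new Change(Op.UPDATE, "row-1", Map.of("c_2", 2))));
    System.out.println(ok + " " + sketch.get("row-1"));
  }
}
```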

Creating Table with Listener

The listener details can be provided in the table descriptor that is eventually used to create the table as shown below.

    MTableDescriptor td = new MTableDescriptor(MTableType.ORDERED_VERSIONED);
    Schema schema = new Schema.Builder()
        .column("c_1", BasicTypes.STRING)
        .column("c_2", BasicTypes.INT)
        .column("c_3", BasicTypes.LONG)
        .build();
    td.setSchema(schema);

    // attach the listener to the table via a CDC stream
    CDCConfig config = td.createCDCConfig();
    td.addCDCStream("MyListener", "com.example.ampool.MyWriteBehindListener", config);

    // create the Ampool client with required details
    AmpoolClient aClient = new AmpoolClient();
    MAdmin admin = aClient.getAdmin();
    MTable table = admin.createMTable("my_table", td);

Configuration Attributes

The following attributes (CDCConfig) can be provided for the listener when creating the table. These attributes can differ between tables that use the same write-behind listener:

| Attribute | Description | Default Value |
| --- | --- | --- |
| DiskStoreName | The disk-store to be used for persistence; it must already be created | - |
| MaximumQueueMemory | The maximum amount of memory (in MB) | 100 MB |
| DiskSynchronous | Whether or not the writing to the disk is synchronous | true |
| BatchSize | The batch size | 100 |
| BatchTimeInterval | The batch time interval (in milliseconds) | 5 ms |
| Persistent | Whether or not to persist the stream | true |
| BatchConflationEnabled | Whether to enable batch conflation | false |
| DispatcherThreads | The number of dispatcher threads | 5 |
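
To give a feel for what batch conflation generally means, the sketch below collapses several updates to the same row within one batch so that only the most recent one is delivered. It illustrates the general idea only. Ampool's exact conflation rules are not described here, and `BatchConflationSketch`, `Update`, and `conflate` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: keeps the latest update per row key within a batch. */
final class BatchConflationSketch {
  /** Hypothetical update event: row key plus the new value. */
  static final class Update {
    final String key;
    final String value;

    Update(String key, String value) {
      this.key = key;
      this.value = value;
    }

    @Override
    public String toString() {
      return key + "=" + value;
    }
  }

  /** Returns the batch with superseded updates removed. */
  static List<Update> conflate(List<Update> batch) {
    Map<String, Update> latest = new LinkedHashMap<>();
    for (Update u : batch) {
      // Drop the superseded update so the newest one keeps its arrival position.
      latest.remove(u.key);
      latest.put(u.key, u);
    }
    return new ArrayList<>(latest.values());
  }

  public static void main(String[] args) {
    List<Update> batch = List.of(
        new Update("row-1", "a"),
        new Update("row-2", "b"),
        new Update("row-1", "c"));
    System.out.println(conflate(batch));
  }
}
```

With conflation the external data source receives fewer writes, at the cost of not seeing intermediate values of a row.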