Co-processors

This section describes the MCoprocessor functionality available with MTable. Co-processors are user code modules that exceute before or after supported trigger events The MCoprocesor class provides a framework to run the user's custom code on an Ampool server. MTable supports two types of co-processors:

Note

Currently, co-processors are not upgradable in place; i.e., in order to upgrade a co-processor the old one must be removed and the new one installed.

Observer Co-Processors

Observer co-processors are just like database triggers, i.e. they execute user provided custom code after/before the occurrence of certain events (for exmaple after/before a get or put). The code is automatically executed before/after the corresponding operations are performed.

Writing a Table Observer

1. Implement a Table Observer Co-Processor

Write an observer class that extends the MBaseRegionObserver class. The TableObserverExample shows how a user can override the ‘preGet()’ method. For all supported observer APIs refer to the io.ampool.monarch.table.coprocessor.MTableObserver interface.

The example uses the preGet method here to first check if the queried rowkey is admin or not. If it is not admin, the code returns without allowing the system to perform the get operation; if it is admin, the operation is allowed.

public class TableObserverExample extends MBaseRegionObserver {

  private static final byte[] ADMIN = Bytes.toBytes("admin");
  @Override
  public void preGet(MObserverContext context, Get get) {
    //If rowKey="admin", skip this operation.
    if(Bytes.equals(get.getRowKey(), ADMIN)){
      System.out.println("For RowKey  = ADMIN skip the region.get operation, instead return value as [VALUE_FILTERED_PRE_GET]");
      context.setBypass(true);
    }
  }
  @Override
  public void postGet(MObserverContext context, Get get) {
   //postGet related code
  }
}
2. Register Co-processor on the Server

Ensure that TableObserverExample class is in the server's classpath or provide the jar file containing this class in server classpath while starting it.

Note

Co-processors must be manually placed on all server nodes currently.

3. Test Co-processor from the Client

Write a program to test your co-processor. This can be a simple program that adds a few rows and does a single Mtable get operation.

In your program add the example table observer co-processor using MTableDecriptor addCoprocessor(String className) function:

MTableDescriptor tableDescriptor = new MTableDescriptor();
tableDescriptor.addCoprocessor("io.ampool.monarch.table.coprocessor.TableObserverExample");

Endpoint Co-processors

Endpoint co-processors push distributed computation to the place where the data resides (somewhat similar to the stored procedures in an RDBMS).

Public APIs:

/**
 * Creates an instance of the given co-processor class and invokes the specified {@code methodName} on a associated table split having {@code row}.
 *
 * @param className
 * @param methodName - method to be executed in a co-processor
 * @param row
 * @param request
 * @return returns list of results.
 */
List<Object> coprocessorService(String className, String methodName, byte[] row, MExecutionRequest request);

/**
 * Creates an instance of the given co-processor class and invokes the specified {@code methodName} on a associated table splits having keys in the range
 * {@code startKey} and {@code endKey}.
 * <p>
 *
 * @param className  - Class of co-processor
 * @param methodName - method to be executed in a co-processor
 * @param startKey   - startKey. It can be {@code null}, in such case it means first key of table.
 * @param endKey     - endKey. It can be {@code null}, in such case it means last key of table.
 * @param request    - {@link MExecutionRequest}
 * @return Returns the results against each TableSplit.
 */
Map<Integer, List<Object>> coprocessorService(String className, String methodName, byte[] startKey, byte[] endKey, MExecutionRequest request);

Writing an Endpoint Co-processor

1. Implement a End-point Co-processor

Write an endpoint class that extends the MCoprocessor class.

public class SampleRowCountCoprocessor extends MCoprocessor {
  public SampleRowCountCoprocessor() {
  }

  public long rowCount(MCoprocessorContext context) {
    MExecutionRequest request = context.getRequest();
    long rowCount = 0L;

    try {
      Scan e = request.getScanner();
      e.setFilter(new KeyOnlyFilter());
      Iterator itr = context.getMTableRegion().getScanner(e).iterator();
      while (itr.hasNext()) {
        ++rowCount;
      }
      return rowCount;
    } catch (Exception var7) {
      throw new MCoprocessorException("Error in scanning results");
    }
  }
}
2. Register Co-processor on the Server

For all servers insure that this endpoint class is in the server classpath or provide a jar containing this class in server classpath while starting it.

3. Test Co-processor from the Client

Write a simple program to test it.

public class MTableCoprocessorExample {

  private static final String TABLE_NAME = "EmployeeTable";
  private static final String COL1 = "NAME";
  private static final String COL2 = "ID";
  private static final String COL3 = "AGE";
  private static final String COL4 = "SALARY";

  final static int numBuckets = 113;
  final static int numOfEntries = 1000;

  private static String ROW_COUNT_COPROCESSOR_CLASS = "io.ampool.quickstart.SampleRowCountCoprocessor";

  private static AmpoolClient client;

  public static void main(String args[]) {

    Properties props = new Properties();
    props.setProperty(Constants.MClientCacheconfig.MONARCH_CLIENT_LOG,
        "/tmp/MTableCoprocessorExample.log");
    client = new AmpoolClient("localhost", 10334, props);
    System.out.println("Connection to monarch distributed system is successfully done!");

    try {
      MTable mtable = createTable(TABLE_NAME);
      insertRows(mtable);

      System.out.println("Running aggregation client to get row count for table " + TABLE_NAME);

      Scan scan = new Scan();
      String startKey = "rowkey-0";
      String stopKey = "rowkey-" + (numOfEntries);
      scan.setStartRow(Bytes.toBytes(startKey));
      MExecutionRequest request = new MExecutionRequest();
      request.setScanner(scan);

      Map<Integer, List<Object>> collector = mtable.coprocessorService(ROW_COUNT_COPROCESSOR_CLASS,
          "rowCount", scan.getStartRow(), scan.getStopRow(), request);

      final Iterator<Entry<Integer, List<Object>>> entryItr = collector.entrySet().iterator();
      long rowCount = collector.values().stream().mapToLong(value ->
                    value.stream().map(val -> (Long) val).reduce(0L, (prev, current) -> prev + current)
            ).sum();
      System.out.println("Row count: " + rowCount);
    } catch (MCoprocessorException cce) {

    }
  }

  private static MTable createTable(String tableName) {
    MTableDescriptor tableDescriptor = new MTableDescriptor();
    Schema schema =
        new Schema.Builder().column(COL1).column(COL2).column(COL3).column(COL4).build();
    tableDescriptor.setSchema(schema);
    tableDescriptor.setRedundantCopies(1);
    tableDescriptor.setTotalNumOfSplits(numBuckets);

    Admin admin = client.getAdmin();
    MTable mtable = admin.createMTable(tableName, tableDescriptor);
    return mtable;
  }

  private static void insertRows(MTable mtable) {
    for (int keyIndex = 0; keyIndex < numOfEntries; keyIndex++) {
      Put myput1 = new Put(Bytes.toBytes("rowkey-" + padWithZero(keyIndex, 3)));
      myput1.addColumn(Bytes.toBytes(COL1), Bytes.toBytes("col" + keyIndex));
      myput1.addColumn(Bytes.toBytes(COL2), Bytes.toBytes(keyIndex + 10));
      myput1.addColumn(Bytes.toBytes(COL3), Bytes.toBytes(keyIndex + 10));
      myput1.addColumn(Bytes.toBytes(COL4), Bytes.toBytes(keyIndex + 10));
      mtable.put(myput1);
    }
  }

  private static String padWithZero(final int value, final int maxSize) {
    String valueString = String.valueOf(value);
    for (int index = valueString.length(); index <= maxSize; index++) {
      valueString = "0" + valueString;
    }
    return valueString;
  }

}

Please see the API documentation for more information on using this interface.