Java APIs

In addition to plugins and connectors, Ampool allows developers to extend its capabilities through Java APIs. This section walks through a few examples of basic MTable usage that are included in the distribution. The following operations and concepts are described:

JavaDocs

Note

As of release 1.2, JavaDocs are not included in the Ampool Core binary distribution. If you intend to develop a Java client for Ampool, please contact us through the Ampool website and we will provide you with the JavaDocs.

Ampool Table Column Types

Ampool tables can store binary data (byte arrays) in columns, as well as various types of Java objects. You can assign one of the supported types to each column when creating a table. If you do not specify a type for a column, the default is BINARY (i.e. byte array). When a column has an assigned type, the APIs internally take care of serialization and deserialization wherever required. See the details for the supported Table Column Types.

Examples

Fully working examples demonstrating the use of the Ampool Java client APIs can be found on any cluster host in the "\<ampool-home>/examples" directory. The "runExamples.sh" script in that directory drives the end-to-end process: compiling the examples, starting a single-node Ampool cluster, running the examples, and shutting down the cluster.

Sample Client Operations

The sample code snippets below show how a Java client connects to the Ampool data store and works with Ampool tables (MTable and FTable). For fully working code and more scenarios, see the Examples mentioned above.

Connecting to Ampool Cluster

To begin, we first obtain a reference to the Ampool cluster that allows us to perform subsequent operations.

Note

For historical reasons Apache Geode uses the term "cache" in many places; this legacy stems from its earlier use as a caching layer. In the following examples the client cache is actually a proxy to Ampool's in-memory data store (the Ampool memory cluster).

In the example below we assume the client application is running on the same host as the locator. Note the client log location being set via properties. The Ampool admin object (Admin) allows DDL operations on Ampool tables.

final Properties props = new Properties();
props.setProperty(Constants.MClientCacheconfig.MONARCH_CLIENT_LOG, "/tmp/MTableClient.log");

AmpoolClient aClient = new AmpoolClient("127.0.0.1", 10334, props);
Admin admin = aClient.getAdmin();

Multiple locators can be specified using the Constants.MonarchLocator.MONARCH_LOCATORS("ampool.monarch.locators") property, the format of value is: "locator1[port1],locator2[port2]...", where locatorX is the hostname or IP address of the host running the locator.

final Properties props = new Properties();
props.setProperty(Constants.MonarchLocator.MONARCH_LOCATORS, "10.0.0.25[10334],10.0.0.26[10334]");

/* when provided locator-port is 0, locator-host argument is ignored */
AmpoolClient aClient = new AmpoolClient("dummy", 0, props);
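The "locator1[port1],locator2[port2]..." format above can be sanity-checked with plain Java before handing the property to the client. The helper below is illustrative only and not part of the Ampool API:

```java
import java.util.regex.Pattern;

public class LocatorFormat {
    // hostname-or-ip followed by [port], comma separated:
    // e.g. "10.0.0.25[10334],10.0.0.26[10334]"
    private static final Pattern LOCATORS =
        Pattern.compile("[\\w.\\-]+\\[\\d+\\](,[\\w.\\-]+\\[\\d+\\])*");

    public static boolean isValid(String locators) {
        return LOCATORS.matcher(locators).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("10.0.0.25[10334],10.0.0.26[10334]")); // true
        System.out.println(isValid("10.0.0.25:10334"));                   // false: wrong separator
    }
}
```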

Creating a Table

Once a connection to the cluster is established, let's create some sample tables:

  • MTable (ORDERED_VERSIONED)

This type of table supports range partitioning to order data based on the row keys.

List<String> columnNames = Arrays.asList("NAME", "ID", "AGE", "SALARY", "DEPT", "DOJ");
MTableDescriptor tableDescriptor = new MTableDescriptor(MTableType.ORDERED_VERSIONED);
Schema schema = new Schema.Builder()
    .column(columnNames.get(0))                     // Column w/ value type as byte-array (implicit)
    .column(columnNames.get(1), BasicTypes.BINARY)  // Column w/ value type as byte-array (explicit)
    .column(columnNames.get(2), BasicTypes.INT)
    .column(columnNames.get(3), BasicTypes.FLOAT)
    .column(columnNames.get(4), BasicTypes.STRING)
    .column(columnNames.get(5), BasicTypes.TIMESTAMP)
    .build();

tableDescriptor.setSchema(schema);       // set schema of the table descriptor.
tableDescriptor.setRedundantCopies(1);   // maintain one redundant copy of the data (i.e. one primary and one secondary replica)
tableDescriptor.setTotalNumOfSplits(4);  // number of buckets to split data into by key; default is 113
tableDescriptor.setMaxVersions(3);       // Number of row versions (default is 1 i.e. no versioning)
tableDescriptor.enableDiskPersistence(MDiskWritePolicy.ASYNCHRONOUS); // here we enable disk persistence, omit to run without persistence

// Set Start/Stop key range as ordered table uses range partitioning
String startKey = "row" + "00000";
String stopKey = "row" + "99999";

// Key range is uniformly split into number of splits/buckets specified above
// You can also optionally assign a specific key range for each bucket.
tableDescriptor.setStartStopRangeKey(startKey, stopKey);

// Create Table
MTable table = admin.createMTable(tableName, tableDescriptor);
  • MTable (UNORDERED)

An unordered table uses hash partitioning on the row keys.

List<String> columnNames = Arrays.asList("NAME", "ID", "AGE", "SALARY", "DEPT", "DOJ");
MTableDescriptor tableDescriptor = new MTableDescriptor(MTableType.UNORDERED);
Schema schema = new Schema.Builder()
    .column(columnNames.get(0))                     // Column w/ value type as byte-array (implicit)
    .column(columnNames.get(1), BasicTypes.BINARY)  // Column w/ value type as byte-array (explicit)
    .column(columnNames.get(2), BasicTypes.INT)
    .column(columnNames.get(3), BasicTypes.FLOAT)
    .column(columnNames.get(4), BasicTypes.STRING)
    .column(columnNames.get(5), BasicTypes.TIMESTAMP)
    .build();

tableDescriptor.setSchema(schema);       // set schema of the table descriptor.
tableDescriptor.setRedundantCopies(1);   // maintain one redundant copy of the data (i.e. one primary and one secondary replica)
tableDescriptor.setTotalNumOfSplits(4);  // number of buckets to split data into by key; default is 113
tableDescriptor.setMaxVersions(3);       // Number of row versions (default is 1 i.e. no versioning)
tableDescriptor.enableDiskPersistence(MDiskWritePolicy.SYNCHRONOUS); // here we enable synchronous disk persistence, omit to run without persistence

// Create Table
MTable table = admin.createMTable(tableName, tableDescriptor);
  • FTable
List<String> columnNames = Arrays.asList("NAME", "ID", "AGE", "SALARY", "DEPT", "DOJ");
FTableDescriptor tableDescriptor = new FTableDescriptor();
Schema schema = new Schema.Builder()
    .column(columnNames.get(0))                     // Column w/ value type as byte-array (implicit)
    .column(columnNames.get(1), BasicTypes.BINARY)  // Column w/ value type as byte-array (explicit)
    .column(columnNames.get(2), BasicTypes.INT)
    .column(columnNames.get(3), BasicTypes.FLOAT)
    .column(columnNames.get(4), BasicTypes.STRING)
    .column(columnNames.get(5), BasicTypes.TIMESTAMP)
    .build();

tableDescriptor.setSchema(schema);       // set schema of the table descriptor.
tableDescriptor.setRedundantCopies(1);   // maintain one redundant copy of the data (i.e. one primary and one secondary replica)
tableDescriptor.setTotalNumOfSplits(4);  // number of buckets to split data into by key; default is 113

// Set the partitioning column
tableDescriptor.setPartitioningColumn(Bytes.toBytes(columnNames.get(1))); //set the partitioning column used to distribute FTable data to multiple buckets

// create table
FTable table = admin.createFTable(tableName, tableDescriptor);
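Zero-padded row keys, as used for the ordered table above ("row00000" through "row99999"), matter because range partitioning orders rows by byte comparison: padding makes lexicographic order agree with numeric order. A standalone sketch (plain Java, no Ampool dependency):

```java
public class RowKeys {
    // Zero-pad the numeric suffix so lexicographic order matches numeric order.
    public static String rowKey(int n) {
        return String.format("row%05d", n);
    }

    public static void main(String[] args) {
        System.out.println(rowKey(7));     // row00007
        System.out.println(rowKey(12345)); // row12345
        // without padding, "row7" would sort AFTER "row12345"; with padding it sorts before
        System.out.println(rowKey(7).compareTo(rowKey(12345)) < 0); // true
    }
}
```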

Refer to an Existing Table

Use the Ampool Client to get an existing table:

  • MTable:
MTable table = aClient.getMTable(tableName);
  • FTable
FTable table = aClient.getFTable(tableName);

Inserting Data

  • MTable

MTable provides two interfaces to insert data: put(Put) and put(List<Put>). The list-based version can be faster when inserting data in batches; concurrent puts from multiple clients or threads are the fastest way to ingest data.

For our simple example, let's use the put(List) method:

public static void putBatch(MTable table, List<String> columnNames,
    Map<String, List<byte[]>> data, int batchSize) {

  List<Put> putList = new ArrayList<Put>(batchSize);

  Iterator<String> iter = data.keySet().iterator();

  while (iter.hasNext()) {
    putList.clear();

    // create the next batch
    for (int i = 0; i < batchSize; i++) {
      String key = iter.next();
      List<byte[]> rowData = data.get(key);

      Put record = new Put(key);

      // add the data for each column
      for (int colIndex = 0; colIndex < columnNames.size(); colIndex++) {
        record.addColumn(Bytes.toBytes(columnNames.get(colIndex)), rowData.get(colIndex));
      }
      // add the record to the batch list
      putList.add(record);

      // if no more data, exit the batch loop
      if (!iter.hasNext()) {
        break;
      }
    }
    // actually put this batch
    if (!putList.isEmpty()) {
      table.put(putList);
    }
  }
}
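The batching pattern above is independent of Ampool. A minimal standalone sketch of the same chunking logic on a plain list (the toBatches helper is illustrative, not an Ampool API):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplit {
    // Split a list of records into batches of at most batchSize,
    // mirroring the loop structure of putBatch above.
    public static <T> List<List<T>> toBatches(List<T> records, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < records.size(); from += batchSize) {
            int to = Math.min(from + batchSize, records.size());
            batches.add(new ArrayList<>(records.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> keys = new ArrayList<>();
        for (int i = 0; i < 10; i++) keys.add(i);
        List<List<Integer>> batches = toBatches(keys, 4);
        System.out.println(batches.size());          // 3 (batches of 4 + 4 + 2)
        System.out.println(batches.get(2).size());   // 2 (the final, partial batch)
    }
}
```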
  • FTable

FTable provides an append() interface to ingest data; it does not support updates to existing records via a put() operation. Multiple records can be ingested in a single batch append, which is more efficient than calling the append operation for each record individually.

For our simple example, let's use the append function:

private static void appendRecords(){
   final FTable fTable = aClient.getFTable(tableName);

   //ingest records using batch append
   Record [] records = new Record[10];
   for (int i = 0; i <10 ; i++) {
     Record record = new Record();
     record.add(columnNames.get(0), Bytes.toBytes("NAME"+i));
     record.add(columnNames.get(1), Bytes.toBytes("ID"+i));
     record.add(columnNames.get(2), 10+i);
     record.add(columnNames.get(3), (float)10000.0*i);
     record.add(columnNames.get(4), "DEPT_"+(i%5));
     record.add(columnNames.get(5), java.sql.Timestamp.valueOf("2000-11-21 00:00:00"));
     records[i] = record;
   }
   fTable.append(records);

   //ingest records using append
   for (int i = 10; i < 20 ; i++) {
     Record record = new Record();
     record.add(columnNames.get(0), Bytes.toBytes("NAME"+i));
     record.add(columnNames.get(1), Bytes.toBytes("ID"+i));
     record.add(columnNames.get(2), 10+i);
     record.add(columnNames.get(3), (float)10000.0*i);
     record.add(columnNames.get(4), "DEPT_"+(i%5));
     record.add(columnNames.get(5), java.sql.Timestamp.valueOf("2000-11-21 00:00:00"));
     fTable.append(record);
   }

}
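Note that java.sql.Timestamp.valueOf expects the full JDBC escape format, yyyy-mm-dd hh:mm:ss[.fffffffff]; a date-only string such as "2000-11-21" is rejected with an IllegalArgumentException. A quick standalone check:

```java
import java.sql.Timestamp;

public class TimestampCheck {
    public static void main(String[] args) {
        // Full JDBC escape format parses fine.
        Timestamp ok = Timestamp.valueOf("2000-11-21 00:00:00");
        System.out.println(ok); // prints "2000-11-21 00:00:00.0"

        // A bare date is rejected.
        try {
            Timestamp.valueOf("2000-11-21");
            System.out.println("parsed");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: date-only string is not a valid timestamp");
        }
    }
}
```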

Get operation

Note

FTable does not support get operation.

Here we just get a row and print the column value lengths:

Note

Keys provided in most API calls must be byte[]. Below we assume in most cases they are byte[]; in some cases we assume String keys and convert them.

Get get = new Get(Bytes.toBytes(key));
Row row = table.get(get);
for (final Cell cell : row.getCells()) {
  System.out.println("ColumnName   => " + Bytes.toString(cell.getColumnName())
      + " AND ColumnValue length  => " + ((byte[]) cell.getColumnValue()).length);
}

Scan operation

  • MTable (Basic scan)
Scan scan = new Scan();
scan.setReturnKeysFlag(true); // we want the keys back (not really needed, as this is the default behavior)

Iterator itr = table.getScanner(scan).iterator();
while(itr.hasNext()) {
  Row row = (Row) itr.next();
  byte[] key = row.getRowId();
  // process the result/row
}
  • MTable (Paged Scan)

MTable also supports a paged, or batched, scan. In this mode no resources or connections are held between batches, so if processing a batch is expected to take a long time, or requires a user to hit "enter", the scan maintains its state without hitting connection timeouts or holding excessive resources.

// very simple example of an interactive scan using paged mode
public static void doPagedScan(MTable table, int scanBatch) {
  Scan scan = new Scan();
  scan.setBatchSize(scanBatch);
  scan.enableBatchMode(); // enable paging/batched mode
  scan.setReturnKeysFlag(true);

  Scanner scanner = table.getScanner(scan);
  Row[] rows = scanner.nextBatch();
  int count = 0, pageCount = 0;
  System.out.println("Paging through results");
  while (rows.length > 0) {
    pageCount++;
    for (int i = 0; i < rows.length; i++) {
      count++;
      byte[] key = rows[i].getRowId();
      System.out.println("" + count + ") " + " rowID: " + Bytes.toString(key));
    }
    System.out.print("End page " + pageCount
        + ", hit enter or type [c | C] to continue, type [e | E] to end: ");
    String input = System.console().readLine();
    if (input.startsWith("e") || input.startsWith("E")) {
      System.out.println("Ending scan.");
      break;
    }
    rows = scanner.nextBatch();
  }
  System.out.println("total page count was " + pageCount);
  System.out.println("total row count was " + count);
  scanner.close(); // ==> need to close, especially if we ended without reading all rows
}

As shown, always close a scanner when done, especially for paged scans, as they have no timeout.

  • FTable (Basic Scan)

The Scan APIs are mostly the same for MTable and FTable; see the FTable examples for more details.

Scan scan = new Scan();
Iterator itr = table.getScanner(scan).iterator();
while(itr.hasNext()) {
  Row row = (Row) itr.next();
  byte[] key = row.getRowId();
  // process the result/row
}

Delete Column Values

Note

FTable does not support delete operation.

Delete the values for columns named "AGE" and "DEPT" for a given row:

Note

Deleting the values for specific columns is the same as updating them with NULL values.

Delete colDelete = new Delete(key);
colDelete.addColumn(Bytes.toBytes("AGE"));
colDelete.addColumn(Bytes.toBytes("DEPT"));
table.delete(colDelete);

Delete Row

Note

FTable does not support delete operation

Delete entire row:

Delete delete = new Delete(Bytes.toBytes(key));
table.delete(delete);

Warning

If row versioning is being used and no specific version is provided in the Delete, all versions of the row are deleted!

checkAndPut

Note

FTable does not support CheckAndPut operation

Check and put allows you to check that a certain condition on a row holds (a given column equals a given value), and if so, modify that or another column value in the row. Currently the checked and modified row must be the same.

// key is String here, so convert to bytes
Put cput = new Put(Bytes.toBytes(key));
// change the value if the check succeeds
cput.addColumn(Bytes.toBytes(columnNames.get(4)),
        Bytes.toBytes("testcheckandput"));
boolean result = false; // place to hold result returned
try {
    result = table.checkAndPut(Bytes.toBytes(key),
            Bytes.toBytes(columnNames.get(5)), // check column 5 value
            Bytes.toBytes("foobar"), cput); // equals foobar
} catch (IOException ioe) {
    ioe.printStackTrace();
}

The returned result holds true if the operation was executed (check passed), or false if not (check failed).

checkAndDelete

Note

FTable does not support checkAndDelete operation

Check and delete allows you to check that a certain condition on a row holds, and if so, delete that or another column value in the row, or the entire row. Currently the checked and modified row must be the same.

Delete a column value (set it to null) if the check passes:

Delete del = new Delete(Bytes.toBytes(key));
del.addColumn(Bytes.toBytes(columnNames.get(4)));
try {
  result = table.checkAndDelete(Bytes.toBytes(key),
          Bytes.toBytes(columnNames.get(1)), Bytes.toBytes("val31"), del);
} catch (Exception exc) {
  exc.printStackTrace();
}

Delete the entire row if the check passes:

Delete del = new Delete(Bytes.toBytes(key));
try {
  result = table.checkAndDelete(Bytes.toBytes(key),
          Bytes.toBytes(columnNames.get(3)), Bytes.toBytes("foobar"), del);
} catch (Exception exc) {
  exc.printStackTrace();
}

Delete Table

  • MTable

This will remove all data, including any persisted on disk!

admin.deleteMTable(tableName);
  • FTable

This will remove all data, including any persisted on disk!

admin.deleteFTable(tableName);

Close System Connection

Always close the connection when you are done to ensure resources are cleaned up promptly.

aClient.close();

Row Versioning

Note

FTable does not support row versioning

Row versioning, if enabled using setMaxVersions(), allows you to keep row history. If no version is specified in an operation, the latest version (or all versions, in the case of a row delete) is affected.

Versions are generally assumed to be timestamps, but they may be any other user-assigned value. To restrict an operation to a given version/timestamp use:

operation.setTimestamp(version); // version is a long

For example:

delete.setTimestamp(400L);

To put data with a specified timestamp use:

Put record = new Put(Bytes.toBytes(key));
record.setTimeStamp(timestamp);
...

If versioning is enabled (max versions > 1) and no timestamp is specified, the system uses the current time at which the operation is executed as the version.

Note

As of v1.2.0, table scans always return the latest version of each entry (row) in the table. In a future release, users will be able to specify a specific version to fetch.