With Apache Spark
Ampool provides a distributed in-memory data store that multiple computational frameworks can access and process data from. Spark is one such distributed data processing framework supported by Ampool. Typically, Spark caches data in its executor memory from underlying disk-based stores for faster processing. The fetched data is immutable and typically cannot be used across multiple Spark jobs unless it is persisted back to the disk store and re-read by another Spark job. Ampool alleviates this inefficiency by supporting mutable, in-memory tables, so that Spark can directly access and process the data at in-memory speed, avoiding the need for a large immutable in-memory cache on the Spark side.
Ampool provides seamless integration between its memory tables and Spark DataFrames. A Spark DataFrame is a distributed collection of data organized into named columns, equivalent to a table in an RDBMS. Spark provides SQL-like query functionality through the DataFrame API (such as filters and aggregations). One can also register a DataFrame as a temporary table and run SQL queries against it, as with any RDBMS.
Ampool tables also allow one to organize data into named columns (with respective types) that are distributed across the nodes of an underlying Ampool cluster. DataFrames and MTables are equivalent and can be mapped column by column with their respective types. This page discusses the integration between Spark DataFrames and Ampool tables (MTable/FTable), which allows one to save a Spark DataFrame as an Ampool Table with the respective column types.
Spark-Ampool deployment configuration
A Spark cluster can remotely access an Ampool data store installed on a separate cluster, although this is not a recommended configuration. In the commonly recommended configuration, Ampool acts as a high-speed, local, persistent data store collocated with the applications running on the compute cluster, while long-term archive stores such as HDFS and S3 act as remote data stores. Ampool seamlessly tiers the data to the remote cluster, so that applications running on the compute cluster do not have to manage data tiering for optimal access and efficiency.
A Spark cluster is typically deployed either in standalone mode or in an existing cluster managed by the YARN or Mesos cluster managers (see the Spark documentation for more details). A Spark cluster that uses Ampool as an in-memory data store should be collocated with the Ampool cluster to benefit from data locality, as shown in the diagram below.
Configure Spark to use Ampool
Typically, no specific configuration is needed for Spark to use Ampool. However, when running the Spark shell or submitting Spark applications that interact with Ampool, you must specify the Spark-Ampool connector jars on the command line using the --jars option. Spark takes care of distributing these connector jars to all the executors. The Ampool-Spark connector package must therefore be available on the Spark client node that invokes spark-submit or spark-shell to run the Spark applications.
If you don't want to distribute the Spark-Ampool connector jars to the executors for every single Spark job, you can install the Spark-Ampool connector package on all the Spark worker nodes (or on the nodes where YARN NodeManagers are running) and configure them so that Spark executors always start with the Ampool connector jars on the classpath.
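One possible way to do this, once the package is installed at the same path on every worker, is Spark's standard `spark.executor.extraClassPath` property. The sketch below is an assumption about a typical setup rather than an Ampool-documented procedure: the install directory `/opt/ampool-spark-<version>` and the object name are hypothetical, and the same property could instead be placed in `spark-defaults.conf`.

```scala
import org.apache.spark.SparkConf

object AmpoolClasspathConf {
  // Hypothetical install location; replace it with the directory where the
  // connector package was untarred on every worker node.
  val connectorHome = "/opt/ampool-spark-<version>"

  // Java classpath wildcards ("dir/*") pick up every jar in a directory.
  // ":" is the classpath separator on Unix-like systems.
  val connectorClasspath: String =
    Seq(s"$connectorHome/lib/*", s"$connectorHome/dependencies/*").mkString(":")

  // spark.executor.extraClassPath is a standard Spark property that prepends
  // entries to the classpath of each executor JVM.
  def withAmpoolExecutors(conf: SparkConf): SparkConf =
    conf.set("spark.executor.extraClassPath", connectorClasspath)
}
```

The driver still needs the connector classes as well, for example through `--jars` or `--driver-class-path` on the spark-submit or spark-shell command line.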
Installing Ampool-Spark connector package
Untar the package in an appropriate directory on the Spark client node (or on all executor nodes). You should have the following:
    ampool-spark-<version>
    ├── dependencies
    │   ├── ampool-client-dependencies-<version>.jar
    │   ├── ampool-core-<version>.jar
    │   ├── fastutil-7.0.9.jar
    │   ├── javax.transaction-api-1.2.jar
    │   ├── log4j-api-2.5.jar
    │   └── log4j-core-2.5.jar
    ├── examples
    │   ├── pom.xml
    │   └── src
    │       └── main
    │           └── java
    │               └── io
    │                   └── ampool
    │                       └── examples
    │                           └── spark
    │                               ├── SparkExampleDF.java
    │                               └── SparkExampleML.java
    ├── lib
    │   └── ampool-spark-<version>.jar
    └── README.txt
The lib directory contains the jar file for this connector, and the dependencies directory contains all dependent libraries.
Set up the Spark and Ampool clusters as recommended above. Details for setting up an Ampool cluster can be found here. Once the Ampool cluster is running, you will need the locator host name and port for the steps that follow.
In Spark applications, the Ampool cluster details can be provided via the following properties:
- ampool.locator.host (default localhost)
- ampool.locator.port (default 10334)
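As a minimal sketch of how an application might assemble these two properties, the helper below reads the locator host and port from optional command-line arguments and falls back to the documented defaults. The object and method names are hypothetical and not part of the connector.

```scala
object AmpoolLocator {
  // Defaults documented above for the two locator properties.
  val DefaultHost = "localhost"
  val DefaultPort = 10334

  /** Builds the locator options from optional command-line arguments:
    * args(0) = locator host, args(1) = locator port. */
  def options(args: Array[String]): Map[String, String] = {
    val host = if (args.length > 0) args(0) else DefaultHost
    val port = if (args.length > 1) args(1).toInt else DefaultPort
    Map(
      "ampool.locator.host" -> host,
      "ampool.locator.port" -> port.toString
    )
  }
}
```

The resulting map can be passed to `.options(...)` on a DataFrame reader or writer that uses the `io.ampool` format.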
Once all the required libraries are available and the cluster is running, you can launch Spark jobs (via spark-shell or spark-submit) as below:
    $ <spark-home>/bin/spark-shell --jars lib/ampool-spark-<version>.jar,dependencies/ampool-client-dependencies-<version>.jar
The above command spawns a Spark shell with the required libraries loaded, which allows you to use the standard DataFrame APIs to save a DataFrame to, and retrieve it from, an Ampool Table.
Make sure that the Ampool version used on the client machine is compatible with the version of the Ampool server cluster you are using.
Using Ampool in Spark Applications
All the examples below work with both MTable and FTable. When an Ampool table is created while writing a DataFrame, the appropriate options should be specified to create the right table type and configuration.
The example snippets below cover the following scenarios:
- Save an existing Spark DataFrame as an Ampool Table (by using the DataFrame column mapping)
- Load an existing Ampool Table as a Spark DataFrame (by using the Ampool Table column mapping)
- Load an existing Ampool Table as a DataFrame with the following operations:
  - Column selections
- Load an existing Ampool Table as a DataFrame, register it as a temporary table, and then use Spark SQL to query data from Ampool
The Column Types
Ampool Tables support most of the types supported by Spark DataFrames. The column types of a Spark DataFrame are mapped to the respective Ampool Table column types.
The following basic Spark types are supported by Ampool:
Ampool also supports the following complex types containing one or more basic types:
In addition to the above types, the Spark ML Vector, a user-defined type, is natively supported by the connector. Both sparse and dense vectors can be natively read from and saved to Ampool. You can save and retrieve Spark DataFrames containing Vectors using Ampool, like any other DataFrame.
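The Vector support described above can be sketched as a round trip through an Ampool table. This is an illustration under assumptions: it targets Spark 2.x (using `SparkSession` and the `org.apache.spark.ml.linalg` package), and the object name, column names, and sample values are hypothetical. Which Vector package a given connector version maps is not stated here, so check it against your installation.

```scala
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, SparkSession}

object AmpoolVectorExample {
  val options = Map(
    "ampool.locator.host" -> "localhost",
    "ampool.locator.port" -> "10334"
  )

  // Builds a two-row DataFrame whose "features" column holds one dense
  // and one sparse vector.
  def buildFrame(spark: SparkSession): DataFrame =
    spark.createDataFrame(Seq(
      (1L, Vectors.dense(0.5, 1.5, 2.5)),
      (2L, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
    )).toDF("id", "features")

  // Saves the DataFrame to the named Ampool table and loads it back.
  // Requires a running Ampool cluster and the connector on the classpath.
  def roundTrip(spark: SparkSession, tableName: String): DataFrame = {
    buildFrame(spark).write.format("io.ampool").options(options).save(tableName)
    spark.read.format("io.ampool").options(options).load(tableName)
  }
}
```

From spark-shell on Spark 2.x, where `spark` is predefined, this could be called as `AmpoolVectorExample.roundTrip(spark, "vector_table").show()`.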
Spark DataFrame Read/Write Options for Ampool
When a DataFrame in Spark is read from or written to an Ampool table, the following options can be used:

| Option | Description | Default |
|---|---|---|
| ampool.locator.host | Hostname of the Ampool cluster locator. | localhost |
| ampool.locator.port | Port number of the Ampool cluster locator. | 10334 |
| ampool.batch.size | Number of records inserted in a single put/append operation while writing a DataFrame to an Ampool MTable or FTable. | 1000 |
| ampool.log.file | Absolute local path of the log file for the Ampool client used by Spark executors interacting with Ampool. | |
| ampool.table.type | Type of the underlying Ampool Table, used when a new table is created while writing the DataFrame. Values are: ordered (ORDERED table), unordered (UNORDERED table), immutable (append-only table). | |
| ampool.enable.persistence | When a new Ampool Table is created while writing the DataFrame, this option specifies the type of persistence. Values are: sync or async. If the option is not specified, persistence is disabled. | |
| ampool.table.redundancy | When a new Ampool Table is created while writing the DataFrame, this option specifies the redundancy factor for the table. | 0 |
| ampool.batch.prefix | Prefix used with each row key in a batch. This is for the use case where multiple DataFrames are stored in the same Ampool MTable in batches. Specifying a unique prefix for each batch/DataFrame is mandatory in that case, because Ampool uses the batch prefix to generate a unique key for each record in the DataFrame. If unique prefixes are not provided for multiple batches, data may be lost because some existing keys may get overwritten. Relevant only for MTable; for FTable the row keys are internal and generated based on the insertion timestamp. | |
| ampool.key.columns | When a new Ampool Table is created while writing the DataFrame, this option specifies a comma-separated list of columns to use as the unique row key. If this option is specified, ampool.batch.prefix should NOT be used. | None |
| ampool.partitioning.column | If a new Ampool FTable is created while writing the DataFrame, this option specifies which column to use for data partitioning. If not specified, the entire row is used as the partitioning key. | None |
| ampool.key.separator | If "ampool.key.columns" has multiple column names, the specified separator is used to join the string representations of the column values to form a row key. | , |
| ampool.max.versions | Maximum number of versions to be kept in memory for versioning. During a read query, all available versions of each row are also retrieved. Valid only for Ampool mutable tables. | |
| ampool.read.filter.latest | During a read query, whether to execute filters on every version of a row or only on the latest version of a row. | false |
| ampool.block.size | Number of rows stored together as a single block in Ampool immutable tables. | 1000 |
| ampool.block.format | Format used to store the data of a single block when it reaches the specified block size. Values are: AMP_BYTES, AMP_SNAPPY, ORC_BYTES. | |

The following block formats are supported for immutable tables:
- AMP_BYTES: The rows are serialized using Ampool-specific encoding and stored individually. This may consume more memory, but there is no additional overhead during a scan to retrieve a row.
- AMP_SNAPPY: The rows are serialized using Ampool-specific encoding and compressed using Snappy compression upon reaching the specified block size. This helps reduce memory usage, but all rows have to be decompressed during a scan.
- ORC_BYTES: All rows of a block are converted to the ORC columnar format upon reaching the specified block size. Each block then contains binary ORC data representing the rows, which is interpreted during a scan.
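To make the interplay of these options concrete, the sketch below builds three option maps: an MTable keyed by columns, an MTable that receives several batches with distinct prefixes, and a partitioned FTable with Snappy-compressed blocks. The option names and values come from the table above; the object, method, and column names are hypothetical. The `joinKey` helper only illustrates how a separator joins values into one string and is not the connector's actual key encoding.

```scala
object AmpoolWriteOptions {
  private val locator = Map(
    "ampool.locator.host" -> "localhost",
    "ampool.locator.port" -> "10334"
  )

  /** Ordered MTable whose row key is built from the given columns.
    * ampool.batch.prefix is deliberately not set together with key columns. */
  def keyedMTable(keyColumns: Seq[String], separator: String = ","): Map[String, String] = {
    require(keyColumns.nonEmpty, "at least one key column is required")
    locator ++ Map(
      "ampool.table.type" -> "ordered",
      "ampool.key.columns" -> keyColumns.mkString(","),
      "ampool.key.separator" -> separator
    )
  }

  /** Ordered MTable that receives several DataFrames; each batch needs its own prefix. */
  def batchedMTable(batchPrefix: String): Map[String, String] =
    locator ++ Map(
      "ampool.table.type" -> "ordered",
      "ampool.batch.prefix" -> batchPrefix
    )

  /** Append-only FTable partitioned on one column, with Snappy-compressed blocks. */
  def partitionedFTable(partitioningColumn: String): Map[String, String] =
    locator ++ Map(
      "ampool.table.type" -> "immutable",
      "ampool.partitioning.column" -> partitioningColumn,
      "ampool.block.format" -> "AMP_SNAPPY"
    )

  /** Illustration only: joins the string form of key-column values with a separator. */
  def joinKey(values: Seq[Any], separator: String = ","): String =
    values.map(_.toString).mkString(separator)
}
```

A write would then look like `df.write.format("io.ampool").options(AmpoolWriteOptions.keyedMTable(Seq("id", "region"), "|")).save("ampool_table")`, where `id` and `region` are assumed column names of `df`.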
Save DataFrame to Ampool
To save an existing DataFrame as an Ampool Table, you can execute the following commands (from spark-shell). If "ampool.table.type" is not specified in the options, the table created by default is an Ampool FTable.
    scala> val myDataFrame: DataFrame = ...
    scala> val options = Map(("ampool.locator.host","localhost"), ("ampool.locator.port","10334"), ("ampool.table.type","ordered"))
    scala> myDataFrame.write.format("io.ampool").options(options).save("ampool_table")
The above commands create an ordered Ampool MTable called ampool_table in an Ampool cluster (using localhost and 10334 as the locator host and port respectively). The schema of the created table is equivalent to the schema of myDataFrame.
For example, if you have JSON data as the source for a DataFrame, you can turn it into a DataFrame backed by an Ampool MTable and run Spark operations on it:
    scala> val options = Map(("ampool.locator.host","localhost"), ("ampool.locator.port","10334"), ("ampool.table.type","ordered"))
    scala> val jsonDataFrame = sqlContext.read.json(path)
    scala> jsonDataFrame.write.format("io.ampool").options(options).save("json_table")
Load DataFrame from Ampool
If you already have an Ampool FTable, it can be loaded as a DataFrame in Spark (from spark-shell):
    scala> val options = Map(("ampool.locator.host","localhost"), ("ampool.locator.port","10334"), ("ampool.table.type","immutable"))
    scala> val ampoolDataFrame = sqlContext.read.format("io.ampool").options(options).load("ampool_table")
    scala> ampoolDataFrame.show()
    scala> ampoolDataFrame.filter("size > 4096").show()
The above commands create a Spark DataFrame from the existing FTable ampool_table. Once the FTable is loaded as a DataFrame, you can execute any supported DataFrame operation on it, and the data is retrieved from Ampool as required.
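Column selection combines with filters in the same way. The following is a minimal sketch assuming Spark 2.x (`SparkSession`); the object name and the `name` column are hypothetical, and `size` follows the filter used above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object AmpoolColumnSelection {
  val options = Map(
    "ampool.locator.host" -> "localhost",
    "ampool.locator.port" -> "10334"
  )

  // Loads the table, keeps only two columns, and returns the rows whose
  // size exceeds 4096. Requires a running Ampool cluster.
  def selectLarge(spark: SparkSession, tableName: String): DataFrame =
    spark.read.format("io.ampool").options(options).load(tableName)
      .select("name", "size") // hypothetical column names
      .filter("size > 4096")
}
```

From spark-shell this could be run as `AmpoolColumnSelection.selectLarge(spark, "ampool_table").show()`.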
Using Spark-SQL over Ampool
You can also load an existing Ampool table as a DataFrame, register it as a temporary table, and then use Spark SQL to query data from Ampool (from spark-shell):
    scala> val options = Map(("ampool.locator.host","localhost"), ("ampool.locator.port","10334"))
    scala> sqlContext.read.format("io.ampool").options(options).load("ampool_table").registerTempTable("my_table")
    scala> val results = sqlContext.sql("select * from my_table where size = 4096")
    scala> results.show()
    scala> println(results.count())
    scala> results.foreach(println)
Once the Ampool Table is registered as a table in Spark, any valid Spark SQL query can be run on it, and the data is retrieved from the Ampool Table as required.
Although the above examples are in Scala, the Python shell can also be used.
Using Custom Spark UDF
You can use custom Spark UDFs along with the Ampool connector. A sample custom UDF (SampleEqualsUDF) is provided in the examples. You need to register the UDF with the SQL context with the appropriate return type. Once registered, it can be used like any built-in UDF in SQL queries, as below:
    scala> import org.apache.spark.sql.types.DataTypes
    scala> val df = sqlContext.read.format("io.ampool").load("ampool_table")
    scala> df.registerTempTable("my_table")
    scala> sqlContext.udf.register("MyEquals", SampleEqualsUDF.SAMPLE_EQUALS_UDF, DataTypes.BooleanType)
    scala> sqlContext.sql("select * from my_table where MyEquals(column_1, 'constant_value')").show()
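A UDF can also be defined inline as a plain Scala function instead of using the packaged sample class. The sketch below assumes Spark 2.x, where `createOrReplaceTempView` replaces the deprecated `registerTempTable`. The object name, the UDF name `MyEqualsIgnoreCase`, and its case-insensitive behavior are illustrative choices, not part of the connector.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object AmpoolInlineUdf {
  // Plain Scala function used as the UDF body: null-safe,
  // case-insensitive string equality.
  val equalsIgnoreCase: (String, String) => Boolean =
    (a, b) => a != null && b != null && a.equalsIgnoreCase(b)

  // Registers the table and the UDF, then queries through Spark SQL.
  // Requires a running Ampool cluster with default locator settings.
  def query(spark: SparkSession, tableName: String): DataFrame = {
    spark.read.format("io.ampool").load(tableName).createOrReplaceTempView("my_table")
    spark.udf.register("MyEqualsIgnoreCase", equalsIgnoreCase)
    spark.sql("select * from my_table where MyEqualsIgnoreCase(column_1, 'constant_value')")
  }
}
```

Because the return type is inferred from the Scala function, no explicit `DataTypes` argument is needed in this form.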
Spark examples - How to run
The examples directory contains a Maven project with more examples. These examples demonstrate the usage of Spark DataFrames and Spark ML with Ampool.
To compile these examples, run the following command from the examples directory:
    $ mvn clean package
Once the examples are compiled, you can specify the Ampool cluster details via arguments and run a specific example as below:
    $ <spark-home>/bin/spark-submit --class io.ampool.examples.spark.SparkExampleDF \
        --jars ../lib/ampool-spark-<version>.jar,../dependencies/ampool-client-dependencies-<version>.jar \
        target/ampool-spark-examples-<version>.jar localhost 10334
    $ <spark-home>/bin/spark-submit --class io.ampool.examples.spark.SparkExampleML \
        --jars ../lib/ampool-spark-<version>.jar,../dependencies/ampool-client-dependencies-<version>.jar \
        target/ampool-spark-examples-<version>.jar localhost 10334
When these examples run, many log messages are displayed on the console. You can suppress them by redirecting standard error to /dev/null (i.e. append 2> /dev/null to the command).
Working with Kerberised environment
- Set up the Spark components while setting up the Hadoop cluster.
- Configure Hadoop with Kerberos.
- Enable the Kerberos client on all user nodes; each user needs a valid Kerberos ticket.
- Set up the Spark connector on the Hadoop cluster nodes as explained in this document.
The following high-level architecture describes:
- How users with Kerberos tickets communicate with a secure Hadoop cluster running the distributed Spark service.
- How the Spark service internally connects to Ampool using the embedded Spark connector (Ampool client).
Connecting to a restricted (unsecured) Ampool cluster
- In this mode, even though the Hadoop cluster is Kerberised, communication between the Spark connector and the Ampool cluster is not secured.
- Recommended only when the Ampool cluster is set up on a restricted, controlled network.
- The following example runs the existing Spark example SparkExampleDF in cluster mode with the above architecture.
    spark-submit --class io.ampool.examples.spark.SparkExampleDF \
        --master yarn --deploy-mode cluster \
        --jars ../lib/ampool-1.4.2-spark_2.1.jar,../dependencies/ampool-client-dependencies-1.4.2.jar,../dependencies/ampool-core-1.4.2.jar,../dependencies/fastutil-7.0.9.jar,../dependencies/javax.transaction-api-1.2.jar,../dependencies/log4j-api-2.6.1.jar,../dependencies/log4j-core-2.6.1.jar,../dependencies/shiro-core-1.2.4.jar \
        ./target/ampool-1.4.2-spark_2.1-examples.jar <LOCATOR_HOST> <LOCATOR_PORT>
Connecting to a secured (Kerberised) Ampool cluster
The following additional security properties are used when connecting to a Kerberised Ampool cluster.

| Property | Description |
|---|---|
| security.enable.kerberos.authc | true or false |
| security.kerberos.service.principal | Service principal of the Ampool cluster |
| security.kerberos.user.principal | Client user principal for the Ampool client |
| security.kerberos.user.keytab | Keytab file path for the client user |

Note: The keytab file provided in the security properties must be present on all worker nodes.
Example code snippet showing usage of security properties:
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    Map<String, String> options = new HashMap<>();
    options.put("ampool.locator.host", locatorHost);
    options.put("ampool.locator.port", String.valueOf(locatorPort));
    options.put("security.kerberos.service.principal", "locator/localhost@TEST.AMPOOL.IO");
    options.put("security.kerberos.user.principal", "ampoolspark@TEST.AMPOOL.IO");
    options.put("security.kerberos.user.keytab", "/tmp/keytabs/ampoolspark.keytab");
    options.put("security.enable.kerberos.authc", "true");

    /** overwrite the existing table if the system property "overwrite" is true;
        otherwise use Spark's default mode, which fails if the table exists **/
    SaveMode saveMode = Boolean.getBoolean("overwrite") ? SaveMode.Overwrite : SaveMode.ErrorIfExists;

    /** save the dataFrame to Ampool as `tableName' **/
    df.write().format("io.ampool").options(options).mode(saveMode).save(tableName);

    System.out.println("########## DATA FROM AMPOOL ############");

    /** load the data-frame from Ampool `tableName' **/
    Dataset<Row> df1 = sqlContext.read().format("io.ampool").options(options).load(tableName);
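For use from spark-shell, the same security options can be assembled as a Scala map. This is a minimal sketch: the object and method names are hypothetical, while the property names are the ones listed in the table above.

```scala
object AmpoolKerberosOptions {
  /** Builds the read/write options for a Kerberised Ampool cluster. */
  def build(locatorHost: String, locatorPort: Int, servicePrincipal: String,
            userPrincipal: String, keytabPath: String): Map[String, String] =
    Map(
      "ampool.locator.host" -> locatorHost,
      "ampool.locator.port" -> locatorPort.toString,
      "security.enable.kerberos.authc" -> "true",
      "security.kerberos.service.principal" -> servicePrincipal,
      "security.kerberos.user.principal" -> userPrincipal,
      // The keytab path must exist on every worker node.
      "security.kerberos.user.keytab" -> keytabPath
    )
}
```

The result is passed to `.options(...)` exactly like the unsecured option maps, for example with the principals and keytab path from the Java snippet above.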
The current Kerberos implementation has a few limitations:
- The Spark connector does not support user impersonation. Therefore, a service user should be used for security.kerberos.user.principal and security.kerberos.user.keytab, as shown above.
- Authorization of operations through Spark is not supported within Ampool ADS, as it doesn't receive user credentials. Therefore, authorization needs to be handled at Spark or at a different level.
In the future, we plan to pass individual user credentials to Ampool ADS through the Spark connector so that operations can be carried out with those credentials.