[MCOL-1362] Add an export function that utilizes (sequential) write from Spark workers Created: 2018-04-24  Updated: 2023-10-26  Resolved: 2018-11-30

Status: Closed
Project: MariaDB ColumnStore
Component/s: None
Affects Version/s: None
Fix Version/s: 1.2.2

Type: New Feature Priority: Major
Reporter: Jens Röwekamp (Inactive) Assignee: David Thompson (Inactive)
Resolution: Fixed Votes: 0
Labels: None

Issue Links:
Relates
relates to MCOL-1852 Spark Exporter uses collect() instead... Closed
relates to MCOL-1077 Two applications using Bulk Insert API Closed
Sprint: 2018-20

 Description   

The current export function calls collect() on the DataFrame and thereby materializes it in the Spark driver's memory. This can lead to excessive memory usage (depending on the DataFrame's size). This approach only requires the bulk write SDK to be installed on Spark's driver and avoids concurrency problems, as the driver is the only process writing to ColumnStore.

Another option is to export each of the DataFrame's partitions directly from the workers. This would reduce memory usage. On the downside, every worker needs the CS bulk write API installed, and we might run into concurrency problems if multiple processes write simultaneously to the same table.

This ticket covers exporting from the worker nodes, not from the driver.

Depending on the severity of the concurrency problem, we might want to consider writing sequentially from each worker, or writing in parallel to different tables and joining them afterwards.



 Comments   
Comment by Jens Röwekamp (Inactive) [ 2018-11-01 ]

In today's call with dshjoshi and dthompson we decided to implement sequential writes from the Spark workers to ColumnStore, avoiding the extra step of collecting the DataFrame at the Spark driver.

Comment by Jens Röwekamp (Inactive) [ 2018-11-10 ]

Introduced a new Scala Spark Connector function:

ColumnStoreExporter.exportFromWorkers("database", "table", RDD, [partitions], [path to Columnstore.xml])

It exports an RDD from the Spark worker nodes instead of the Spark driver. In my test case this decreased the ingestion time by 59% compared to ColumnStoreExporter.export().
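For illustration, here is a minimal, self-contained Scala sketch of the pattern exportFromWorkers() follows; it is not the connector's actual implementation. Each partition is written sequentially in its own transaction, and the ids of failed partitions are reported so they can be re-ingested later. Plain collections stand in for an RDD, and writePartition is a hypothetical stand-in for the Bulk Write SDK.

```scala
object WorkerExportSketch {
  type Row = Seq[Any]

  // Hypothetical stand-in for the Bulk Write SDK: ingests one partition in
  // its own transaction. Returns true on commit, false on rollback.
  // failOn simulates which partitions hit an ingestion error.
  def writePartition(partitionId: Int, rows: Seq[Row], failOn: Set[Int]): Boolean =
    if (failOn.contains(partitionId)) false // simulated failure: transaction rolls back
    else true                               // all rows of this partition committed

  // Export the given partitions (default: all) sequentially, one transaction
  // each, and return the ids of the partitions whose transaction failed.
  def exportPartitions(data: Vector[Seq[Row]],
                       partitions: Option[Seq[Int]] = None,
                       failOn: Set[Int] = Set.empty): Seq[Int] = {
    val ids = partitions.getOrElse(data.indices)
    ids.filterNot(id => writePartition(id, data(id), failOn))
  }
}
```

Because each partition commits independently, a failure mid-export leaves the already committed partitions in the table; the returned ids correspond to what the optional partitions parameter lets you re-ingest.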

I wasn't able to implement it for PySpark, as its runJob() function seems to behave differently from Scala's.

To use exportFromWorkers(), the Bulk Write SDK and Columnstore.xml need to be installed on the Spark driver and on every worker.
When using exportFromWorkers(), sequential transactions are executed from the workers. This means we no longer have a single transaction (that rolls back in an error case), as with export(), but as many transactions as the exported RDD has partitions. I therefore added an optional partitions parameter so that only the failed partitions can be re-ingested in an error case.
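As a hypothetical illustration of the recovery workflow this enables (the names and the export signature below are assumptions for the sketch, not the connector's real API), the driver can run one full export and then re-run only the partitions that failed:

```scala
object RetryFailedPartitions {
  // Assumed shape of a worker-side export: takes the partition ids to
  // ingest and returns the ids whose transaction failed. A real run would
  // call the connector's exportFromWorkers(...) with the partitions
  // parameter instead of this function type.
  type Export = Seq[Int] => Seq[Int]

  // Run a full export, then a single retry restricted to failed partitions.
  // Returns the ids that are still failing after the retry.
  def exportWithRetry(allPartitions: Seq[Int],
                      firstPass: Export,
                      retryPass: Export): Seq[Int] = {
    val failed = firstPass(allPartitions)
    if (failed.isEmpty) failed else retryPass(failed)
  }
}
```

Since partitions that already committed are not touched again, the retry pass only re-ingests the data that never made it into the table.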

I added a simple test to the regression suite (derived from export()'s test) and executed it successfully on CentOS 7 and Windows 10.

For QA:

  • execute regression test suite
  • if you want to: set up a multi-node Spark cluster and test manual recovery cases using the optional partitions parameter (to ingest only the failed partitions)
Generated at Thu Feb 08 02:28:10 UTC 2024 using Jira 8.20.16#820016-sha1:9d11dbea5f4be3d4cc21f03a88dd11d8c8687422.