[MCOL-1362] Add an export function that utilizes (sequential) write from Spark workers Created: 2018-04-24 Updated: 2023-10-26 Resolved: 2018-11-30 |
|
| Status: | Closed |
| Project: | MariaDB ColumnStore |
| Component/s: | None |
| Affects Version/s: | None |
| Fix Version/s: | 1.2.2 |
| Type: | New Feature | Priority: | Major |
| Reporter: | Jens Röwekamp (Inactive) | Assignee: | David Thompson (Inactive) |
| Resolution: | Fixed | Votes: | 0 |
| Labels: | None | ||
| Issue Links: |
|
| Sprint: | 2018-20 |
| Description |
|
The current export function calls collect() on the DataFrame and thereby materializes it entirely in the Spark driver's memory. Depending on the DataFrame size, this can consume an excessive amount of driver memory. On the upside, this approach only needs the Bulk Write SDK installed on the Spark driver, and it avoids concurrency problems because the driver is the only process writing to ColumnStore.

An alternative is to export each DataFrame partition directly from the worker that holds it. This reduces driver memory usage, but every worker then needs the ColumnStore Bulk Write SDK installed, and we might run into concurrency problems if multiple processes write simultaneously to the same table.

This ticket covers the export from worker nodes rather than the driver. Depending on the concurrency problem, we might want to consider writing sequentially from each worker, or writing in parallel to different tables and joining them afterwards. |
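The two options described above can be contrasted in a short sketch. This is not the connector's actual implementation; `writeRowsToColumnStore` is a hypothetical stand-in for the ColumnStore Bulk Write SDK, which must be installed wherever the write actually executes.

```scala
import org.apache.spark.sql.{DataFrame, Row}

// Hypothetical stand-in for the ColumnStore Bulk Write SDK call;
// the real SDK must be installed on whichever node runs this.
def writeRowsToColumnStore(db: String, table: String, rows: Iterator[Row]): Unit = ???

// Driver-side export (current behaviour): collect() pulls the whole
// DataFrame into driver memory before anything is written, so only the
// driver needs the SDK, but memory usage scales with the DataFrame size.
def exportFromDriver(db: String, table: String, df: DataFrame): Unit =
  writeRowsToColumnStore(db, table, df.collect().iterator)

// Worker-side export (this ticket): each partition is written directly
// by the executor that holds it, so nothing is materialized on the
// driver, but every worker needs the SDK and writes may overlap.
def exportFromWorkersSketch(db: String, table: String, df: DataFrame): Unit =
  df.rdd.foreachPartition(part => writeRowsToColumnStore(db, table, part))
```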
| Comments |
| Comment by Jens Röwekamp (Inactive) [ 2018-11-01 ] |
|
In today's call with dshjoshi and dthompson we decided to implement sequential write from the Spark workers to ColumnStore, to avoid the extra step of collecting the DataFrame at the Spark driver. |
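The sequential worker-side write decided on above could be sketched as follows. This is an illustrative outline only, assuming Spark's `SparkContext.runJob` is invoked one partition at a time; `writeBatch` is a hypothetical placeholder for the Bulk Write SDK call, not part of the actual connector.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Hypothetical placeholder for the ColumnStore Bulk Write SDK.
def writeBatch(db: String, table: String, rows: Iterator[Row]): Unit = ???

// Sequential worker-side export: runJob() is submitted once per
// partition, so at any moment only one worker is writing to the target
// table. This avoids concurrent-write conflicts at the cost of write
// parallelism, while still skipping the collect() step on the driver.
def exportSequentially(sc: SparkContext, db: String, table: String, rdd: RDD[Row]): Unit =
  for (p <- 0 until rdd.getNumPartitions)
    sc.runJob(rdd, (it: Iterator[Row]) => writeBatch(db, table, it), Seq(p))
```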
| Comment by Jens Röwekamp (Inactive) [ 2018-11-10 ] |
|
Introduced a new Scala Spark connector function: ColumnStoreExporter.exportFromWorkers("database", "table", RDD, [partitions], [path to Columnstore.xml]). It exports an RDD from the Spark worker nodes instead of the Spark driver. In my test case this decreased ingestion time by 59% compared to ColumnStoreExporter.export().

I wasn't able to implement it for PySpark, as its runJob() function seems to behave differently from Scala's. To use exportFromWorkers(), the Bulk Write SDK and Columnstore.xml need to be installed on the Spark driver and on all workers.

I added a simple test to the regression suite (derived from export()'s test) and executed it successfully on CentOS 7 and Windows 10. For QA:
|
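A minimal usage sketch of the new function, based solely on the signature given in the comment above. The import path and the database/table names are assumptions for illustration; the optional partition count and Columnstore.xml path arguments are omitted.

```scala
import org.apache.spark.sql.SparkSession
// Assumed import path for the connector class; verify against the
// installed Bulk Write SDK distribution.
import com.mariadb.columnstore.api.connector.ColumnStoreExporter

val spark = SparkSession.builder().appName("cs-worker-export").getOrCreate()
val df = spark.sql("SELECT * FROM staging_view")  // hypothetical source

// Writes each partition of the RDD directly from the worker holding it,
// instead of collecting the DataFrame on the driver as export() does.
// Requires the Bulk Write SDK and Columnstore.xml on driver and workers.
ColumnStoreExporter.exportFromWorkers("test_db", "test_table", df.rdd)
```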