I am writing parquet files to S3 using pyspark code (dataframe.write.mode(‘append’).parquet(s3_folder_path)).So the folder contains multiple parquet files with names starting with part-0000-*.snappy.parquet.I wanted to read these in a single table in Datagrok at folder level.I am able to read by browsing the file name,but next time when the s3 folder is refreshed and file name get changed,datagrok shows error “The specified key does not exist error”.
So I am looking for a solution to read parquet part files from a folder.
Hello Shaju,
as I understood, you want to read multiple parquet files into one single dataframe from S3?
Unfortunately, currently the platform can’t do it, only a single parquet file to dataframe is possible. It can be done manually by opening multiple tables and joining them or if you are familiar with the JS programming language you can write a script using platform capabilities that will fetch all data from S3, form multiple dataframes and combine them into one. I can help you with that if it’s the main goal.
Could you clarify about S3 folder refresh, please? What exactly is changing there and how are you opening files?
Hi ppolovyi,
Thanks for your reply.I am using spark to write dataframe in to S3 as parquet format.Spark will name the files based on partitioning, so we cannot specify the filename and names will be starting with “part-00000-random UUID.snappy.parquet”.Each time when spark job writes dataframe to S3 ,the file name will be different.So I am looking to read at folder level without specifying the filename in datagrok.
Hi, to achieve this goal you can use our JS-API:
//name: Read parquet
//language: javascript
// Create connection to S3
const conn = DG.DataConnection.create('<name of connection>', {dataSource: 'S3', region: <region>, accessKey: <IAM access key>, secretKey: <IAM secret key>, bucket: <bucket name>});
// Save connection
conn = await grok.dapi.connections.save(conn);
// Fetch all FileInfos
const files: FileInfo[] = await grok.dapi.files.list(`${conn.nqName}:<path to folder inside bucket>`, true, '.parquet');
//Then read content as Uint8Array
for (const f of files) {
const bytes: Uint8Array= await grok.dapi.files.readAsBytes(f);
const dataframe: DG.DataFrame = await grok.functions.call('Arrow:fromParquet', {'bytes': bytes});
}
Note, that it is possible only using latest version of platform and with installed last version of Datagrok Arrow package. On previous versions of platform it’s not possible to use Arrow package like this.
Sorry, I presented an example using typescript, that implies you have a package, but it’s not a case, as I understood) Next example implies that you have already created a connection and it uses JavaScript as a language.
//name: Read parquet
//language: javascript
for (const f of await grok.dapi.files.list('MyConnection:MyFolder', true, '.parquet')) {
var table = await grok.functions.call('Arrow:fromParquet', {'bytes': await grok.dapi.files.readAsBytes(f)});
grok.shell.addTableView(table);
}
Note, that you need to replace ‘MyConnection:MyFolder’ with yours.
Please, let me know if you managed to do everything you wanted. The version of the platform should be dev or if you have local deployment it should be bleeding-edge container.