How to Read and Write Deeply Nested Partitioned Files Using Apache Spark

by SkillAiNest

If you use Apache Spark to build your data pipelines, you may need to export or copy data from a source to a destination while preserving the partition folder structure between the two.

When I researched how to do this in Spark, I found very few resources that offer an end-to-end solution that works, especially when the partitions are deeply nested and you don't know in advance which values those folders will take (for example, year=*/month=*/day=*/hour=*/*.csv).

In this tutorial, I provide such an implementation using Spark.

Here’s what we will cover:

  Prerequisites
  Setup
  First attempts
  My solution
  Conclusion

Prerequisites

You also need a basic understanding of distributed computing with frameworks such as Hadoop and Spark, as well as some experience programming in a JVM-based language such as Scala, which the sample code below uses.
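If you want to compile and run the example yourself, a minimal sbt build definition along the lines below should be enough; the Scala and Spark versions shown are placeholders of my own, so substitute whatever your environment provides:

// build.sbt -- a minimal, illustrative build definition; the versions are assumptions, not a prescription
name := "partitioned-reader-writer"

scalaVersion := "2.12.18"   // any Scala version supported by your Spark distribution

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.0" % "provided"   // brings in spark-core transitively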

Setup

I am assuming that you have partition folders created at the source with the pattern below (a standard date-based partitioning scheme):

year=*/month=*/day=*/hour=*

Importantly, as I mentioned above, I am assuming that you do not know the exact names of these folders, only that they follow a consistent pattern.
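For example, the source tree might look something like this (the specific dates and file names here are made up purely for illustration):

data/partitioned_files_source/user/year=2024/month=05/day=01/hour=00/part-00000.csv
data/partitioned_files_source/user/year=2024/month=05/day=01/hour=01/part-00000.csv
data/partitioned_files_source/user/year=2024/month=05/day=02/hour=13/part-00000.csv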

First attempts

  1. If you think of using recursiveFileLookup and pathGlobFilter as options for both reading and writing, that won't work, because those options are only available on the reader.

  2. If you think of looping through every possible year/month/day/hour combination, reading and writing each one and skipping the combinations for which no partition folder exists, that can work, but it won't be very efficient. A rough sketch of this approach is shown right after this list.
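To make the inefficiency of option 2 concrete, here is a purely illustrative sketch of what that brute-force approach could look like. The date range is an assumption of mine, and it presumes spark, sourceBasePath, and destinationBasePath are already in scope (they are defined in the full example later in this post):

// Illustrative only: probe every candidate hour-level folder over an assumed date range
// and copy each one that exists. This costs one filesystem check per candidate and one
// small Spark job per existing partition, which is what makes it slow.
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val start = LocalDateTime.of(2024, 1, 1, 0, 0)                       // assumed range start
val hours = ChronoUnit.HOURS.between(start, LocalDateTime.of(2024, 12, 31, 23, 0))

(0L to hours).map(start.plusHours).foreach { ts =>
    val suffix = f"year=${ts.getYear}/month=${ts.getMonthValue}%02d/day=${ts.getDayOfMonth}%02d/hour=${ts.getHour}%02d"
    val src = s"$sourceBasePath/$suffix"
    if (fs.exists(new Path(src))) {                                  // one check per candidate folder
        spark.read.format("csv").option("header", "true").load(src)
             .write.format("csv").option("header", "true").mode("overwrite")
             .save(s"$destinationBasePath/$suffix")                  // one job per partition folder
    }
}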

My solution

After some trial and error, and some digging through Stack Overflow and the Spark documentation, I settled on the idea of using a combination of input_file_name(), regexp_extract(), and the partitionBy() write API to achieve the end goal. You can find a complete sample below:

package main.scala.blog

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{input_file_name, regexp_extract}

object PartitionedReaderWriter {

    def main(args: Array[String]): Unit = {

        // Initial setup: create (or reuse) a Spark session
        val spark = SparkSession
                    .builder
                    .appName("PartitionedReaderWriterApp")
                    .getOrCreate()

        val sourceBasePath = "data/partitioned_files_source/user"

        // Read every CSV file under the nested partition folders in one pass
        val sourceDf = spark.read
                            .format("csv")
                            .schema("State STRING, Color STRING, Count INT")
                            .option("header", "true")
                            .option("pathGlobFilter", "*.csv")      // only pick up CSV files
                            .option("recursiveFileLookup", "true")  // descend through the nested folders
                            .load(sourceBasePath)

        val destinationBasePath = "data/partitioned_files_destination/user"

        // Recover the partition values from each row's source file path
        // and store them as auxiliary columns
        val writeDf = sourceDf
                        .withColumn("year", regexp_extract(input_file_name(), "year=(\\d{4})", 1))
                        .withColumn("month", regexp_extract(input_file_name(), "month=(\\d{2})", 1))
                        .withColumn("day", regexp_extract(input_file_name(), "day=(\\d{2})", 1))
                        .withColumn("hour", regexp_extract(input_file_name(), "hour=(\\d{2})", 1))

        // Write the data back out, partitioned by the recovered columns
        writeDf.write
                .format("csv")
                .option("header", "true")
                .mode("overwrite")
                .partitionBy("year", "month", "day", "hour")
                .save(destinationBasePath)

        spark.stop()
    }
}

Here’s what is happening in the code above:

  1. Within the main method, you start with the usual setup code that creates a Spark session.

  2. You read the data from sourceBasePath using Spark's read API with the csv format (you can optionally supply a schema). The recursiveFileLookup and pathGlobFilter options make Spark read recursively through the nested folders and pick up only csv files, respectively.

  3. The next section contains the core logic: input_file_name() returns the full path of the file each row came from, and regexp_extract() pulls the year, month, day, and hour values out of the corresponding subfolder names in that path, storing them as auxiliary columns on the DataFrame.

  4. Finally, you write the DataFrame back out in csv format and, crucially, use partitionBy to declare the previously created auxiliary columns as partition columns. You then save the DataFrame to destinationBasePath.

  5. After the copy is complete, you stop the Spark session by calling its stop() API.
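As an optional sanity check (not part of the listing above), you could read the destination back before calling stop() and confirm that Spark rediscovers the four partition columns; a minimal sketch:

// Optional sanity check: read the destination back and confirm that partition discovery
// picks the year/month/day/hour folders back up as columns.
val checkDf = spark.read
                   .format("csv")
                   .option("header", "true")
                   .load(destinationBasePath)

checkDf.printSchema()   // the schema should end with the four recovered partition columns
checkDf.select("year", "month", "day", "hour").distinct().orderBy("year", "month", "day", "hour").show()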

Conclusion

In this article, I have shown you how to efficiently export or copy deeply nested partitioned data files using Apache Spark. I hope you found it useful!
