• Important Spark configuration

    --conf spark.driver.memory=90g \
    --conf spark.sql.files.maxPartitionBytes=4194304 \
    --conf spark.eventLog.enabled=true \
    --conf spark.eventLog.dir=/opt/airflow/dags/spark-events
  • Partition configuration

    • Input, shuffle, and output partitions
      • control the input partition by using spark.sql.files.maxPartitionBytes
        • the default is 128 MB, but if the job includes an explode operation, consider decreasing this value
      • control the shuffle partition by using spark.sql.shuffle.partitions
        • the default is 200 partitions, but if the job includes an explode operation, consider increasing this value so that each shuffle partition stays small
      • control the output partition by using .option("maxRecordsPerFile", "x")
  • Estimate the number of partitions

    import numpy as np
     
     
    # Back-of-the-envelope sizing for a Spark job: derives the input split
    # size, the shuffle partition count and the output file size from a few
    # hand-set numbers, then prints a summary banner.
    #
    # NOTE: if the shuffle partition count shown in the Spark UI does not
    # change after editing these values, remove the table in MinIO that has
    # the same name as the running Spark app and run again.
    print("\n\n*********************************************************************************")

    # One shuffle partition per 21 input rows.
    CONS_SHUFFLE_RATIO_ROWS = 1 / 21
    CONST_MB_TO_BYTES = 1024 * 1024

    # The only value that has to be supplied for each run.
    total_num_rows = 5932823

    seed_no = 123
    num_cores = 32
    total_available_memory_gb = 125
    reserved_memory_gb = 35
    # Memory left for the driver once the reserved share is set aside.
    driver_memory_gb = total_available_memory_gb - reserved_memory_gb

    # Target input split size, in MB and in bytes
    # (the byte value is what spark.sql.files.maxPartitionBytes takes).
    target_input_max_bytes_per_file_mb = 4
    target_input_max_bytes_per_file_bytes = target_input_max_bytes_per_file_mb * CONST_MB_TO_BYTES

    # Value for the writer option "maxRecordsPerFile".
    max_records_per_file = 1_000

    # Round up so a partial group of rows still gets its own partition.
    num_shuffle_partitions = int(np.ceil(total_num_rows * CONS_SHUFFLE_RATIO_ROWS))

    print(f"{'SEED NO':<30} : {seed_no}")
    print(f"{'NUM CORES':<30} : {num_cores}")
    print(f"{'TOTAL MEMORY DRIVER':<30} : {driver_memory_gb} gb")
    print(f"{'MEMORY PER CORE':<30} : {driver_memory_gb / num_cores} gb")

    print(f"{'MAX INPUT SIZE':<30} : {target_input_max_bytes_per_file_mb} mb / {target_input_max_bytes_per_file_bytes} bytes")
    print(f"{'SHUFFLE PARTITION':<30} : {num_shuffle_partitions} partitions")
    print(f"{'MAX RECORDS PER FILE':<30} : {max_records_per_file} records")
    print("*********************************************************************************\n\n")