Day 1
- Try to embrace spark.sql(...)
Day 2
- HIVE METASTORE (?)
- Delta Lake managed and external tables
- managed → if you drop the table, both the Parquet data files and the table metadata are dropped
- external → only the table metadata is dropped; the data files remain at the external location
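A minimal sketch of the difference via spark.sql; the table names and the /mnt/demo path are hypothetical:

```python
# Managed table: Delta files live in the metastore-managed location.
spark.sql("CREATE TABLE IF NOT EXISTS managed_sales (id INT, amount DOUBLE)")

# External table: Delta files live at the LOCATION you provide.
spark.sql("""
    CREATE TABLE IF NOT EXISTS external_sales (id INT, amount DOUBLE)
    LOCATION '/mnt/demo/external_sales'
""")

spark.sql("DROP TABLE managed_sales")   # removes metadata AND data files
spark.sql("DROP TABLE external_sales")  # removes metadata only; files at the location remain
```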
- Delta Lake DESCRIBE TABLE (shows the table's columns and metadata)
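A quick sketch with a hypothetical table name:

```python
# DESCRIBE EXTENDED lists the columns plus table details such as Location,
# Provider, and Type (MANAGED vs EXTERNAL).
spark.sql("DESCRIBE EXTENDED sales").show(truncate=False)
```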
- Create table: CREATE SCHEMA IF NOT EXISTS ${da.schema_name}. What is the difference with DeltaTable.createIfNotExists?
- Defining the table schema while creating the table is recommended
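A sketch of both approaches, assuming hypothetical schema and table names; DeltaTable.createIfNotExists is the Python builder counterpart of the SQL DDL, and both let you declare the schema up front:

```python
from delta.tables import DeltaTable

# SQL DDL with an explicit schema (recommended).
spark.sql("CREATE SCHEMA IF NOT EXISTS demo_schema")
spark.sql("""
    CREATE TABLE IF NOT EXISTS demo_schema.sales (
        order_id   INT,
        amount     DOUBLE,
        order_date DATE
    )
""")

# Equivalent Python builder API from the delta-spark package.
(DeltaTable.createIfNotExists(spark)
    .tableName("demo_schema.sales_py")
    .addColumn("order_id", "INT")
    .addColumn("amount", "DOUBLE")
    .addColumn("order_date", "DATE")
    .execute())
```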
- SQLite → a database that uses the local file system
- Use CREATE OR REPLACE TEMP VIEW {table_name} ({all_col_schema}) to read and infer the schema before creating the table, or you can use CREATE OR REPLACE TABLE sales_unparsed (find out more about this and CTAS) [link]
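A sketch of the temp-view-then-CTAS pattern, with a hypothetical CSV path, view name, and columns:

```python
# Declare a temp view over raw files with an explicit column schema ...
spark.sql("""
    CREATE OR REPLACE TEMP VIEW sales_tmp_vw
        (order_id BIGINT, amount DOUBLE, order_ts STRING)
    USING CSV
    OPTIONS (path = '/mnt/demo/raw/sales', header = 'true', delimiter = ',')
""")

# ... then materialize a Delta table with CTAS; the schema comes from the query result.
spark.sql("""
    CREATE OR REPLACE TABLE sales_unparsed AS
    SELECT * FROM sales_tmp_vw
""")
```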
- You can enrich your Delta table by adding more metadata information
- You can use pyspark.sql.functions as F to get the input_file_name or current_timestamp to enrich your table, as in the sketch below
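A minimal sketch, assuming a hypothetical raw_sales source and sales_enriched target:

```python
from pyspark.sql import functions as F

# Add file provenance and load time as extra metadata columns.
enriched_df = (spark.table("raw_sales")
    .withColumn("source_file", F.input_file_name())
    .withColumn("ingest_time", F.current_timestamp()))

enriched_df.write.format("delta").mode("append").saveAsTable("sales_enriched")
```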
- AS A BEST PRACTICE, DON'T USE PARTITIONING IN DELTA LAKE IF THE TABLE IS SMALL
- You can use a DEEP or SHALLOW clone of a Delta table source to use it for model development or testing
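A short sketch via spark.sql; the prod.sales source and the clone names are hypothetical. SHALLOW CLONE copies only the metadata and references the source data files, while DEEP CLONE also copies the data files:

```python
# Cheap copy for dev/test work: metadata only, data files referenced from the source.
spark.sql("CREATE OR REPLACE TABLE sales_dev SHALLOW CLONE prod.sales")

# Full, independent copy: metadata and data files.
spark.sql("CREATE OR REPLACE TABLE sales_backup DEEP CLONE prod.sales")
```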
- Avoid using the Spark RDD API; just use the higher-level APIs because they include query optimization
- You can use sparkdf.schema to get the schema and use that schema to create another table
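A sketch of reusing one DataFrame's schema to set up another table; the table names are hypothetical:

```python
# Grab the schema (a StructType) from an existing DataFrame ...
source_df = spark.table("sales_enriched")
schema = source_df.schema

# ... and reuse it to create an empty DataFrame / new table with the same structure.
empty_df = spark.createDataFrame([], schema)
empty_df.write.format("delta").saveAsTable("sales_copy_layout")
```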
- df.collect() gets all data from all executors to the driver (beware of driver memory on large results)
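A tiny sketch (table and column names hypothetical); keep collected results small, since everything lands in driver memory:

```python
# collect() moves every row of the (small!) result from the executors to the driver.
rows = spark.table("sales_enriched").limit(10).collect()
for row in rows:
    print(row["order_id"], row["ingest_time"])
```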
- You can use df.createOrReplaceTempView to work with the Spark SQL API; it registers the DataFrame as a temporary view for the session
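A minimal sketch; the view name is hypothetical:

```python
# Register a DataFrame as a temporary view, then query it with the SQL API.
df = spark.range(5).withColumnRenamed("id", "order_id")
df.createOrReplaceTempView("orders_vw")

spark.sql("SELECT count(*) AS n_orders FROM orders_vw").show()
```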
- Spark DataFrames are immutable
- count(*) counts rows including nulls; use count(colname) to skip null values in that column
- Convert from timestamp to date format
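A sketch of both points, assuming a hypothetical sales_vw view with a nullable email column and an order_ts timestamp column:

```python
from pyspark.sql import functions as F

spark.sql("""
    SELECT
        count(*)     AS row_count,        -- counts every row, nulls included
        count(email) AS non_null_emails   -- skips rows where email IS NULL
    FROM sales_vw
""").show()

# Timestamp -> date conversion with the DataFrame API.
dated_df = spark.table("sales_vw").withColumn("order_date", F.to_date("order_ts"))
```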
- You can use the . (dot) syntax to access nested data
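A sketch, assuming a hypothetical events_raw table with a user struct column:

```python
# Assume user is STRUCT<id: BIGINT, email: STRING> (hypothetical).
events = spark.table("events_raw")

# Dot syntax reaches into the struct without flattening it; the same works in SQL.
ids_df = events.select("user.id", "user.email")
spark.sql("SELECT user.id, user.email FROM events_raw")
```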
- Infer all tables or DataFrames to the PySpark native schema
- The reason many Spark schemas use structs is to reduce memory usage; if you want to explode, consider selecting only the needed columns first. Exploding a DataFrame is expensive!
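A sketch of pruning before exploding, assuming a hypothetical orders_raw table where items is an array of structs:

```python
from pyspark.sql import functions as F

orders = spark.table("orders_raw")  # items: ARRAY<STRUCT<sku: STRING, qty: INT>> (hypothetical)

# Select only what you need BEFORE exploding: explode multiplies rows, and every
# extra column you carry along gets duplicated across all of those rows.
exploded = (orders
    .select("order_id", F.explode("items").alias("item"))
    .select("order_id", "item.sku", "item.qty"))
```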
Day 3 (20230419)
- Incremental data ingestion with Auto Loader (see the sketch after the schema_hints note below)
- To enable schema evolution for future needs, you need to track the schema evolution (ref: Delta Lake schema evolution format)
- To avoid data being dropped due to an unmatched schema or data type, you can use a column called _rescued_data (check how to use)
- schema_hints (cloudFiles.schemaHints): pin the types of specific columns while the rest of the schema is inferred
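A minimal Auto Loader sketch tying these pieces together (incremental ingestion, schema tracking, schema hints, rescued data); the paths, table name, and column name are hypothetical:

```python
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    # Tracks the inferred schema and its evolution across runs.
    .option("cloudFiles.schemaLocation", "/mnt/demo/_schemas/orders")
    # Pin the type of one column; everything else stays inferred.
    .option("cloudFiles.schemaHints", "amount DOUBLE")
    .load("/mnt/demo/raw/orders")
    # Records that don't match the schema land in the _rescued_data column
    # instead of being dropped.
    .writeStream
    .option("checkpointLocation", "/mnt/demo/_checkpoints/orders")
    .outputMode("append")
    .table("orders_bronze"))
```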
- Checkpoints are really important in Structured Streaming
- Streaming should be:
- Highly available
- Replayable
- Durable
- Idempotent
- You can view a streaming DataFrame in a streaming manner using readStream(...).createOrReplaceTempView(). Then you can check or apply data transformations using the Spark SQL API, and write the result back using spark.table("tempViewName").writeStream(...) (see DL 6.3L)
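A sketch of that pattern; the table names and checkpoint path are hypothetical:

```python
# Read a stream, expose it as a temp view, transform it with SQL, then write it back out.
(spark.readStream
    .table("orders_bronze")
    .createOrReplaceTempView("orders_streaming_vw"))

transformed = spark.sql("""
    SELECT order_id, amount, to_date(order_ts) AS order_date
    FROM orders_streaming_vw
""")

(transformed.writeStream
    .option("checkpointLocation", "/mnt/demo/_checkpoints/orders_silver")
    .outputMode("append")
    .table("orders_silver"))
```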
- Streaming trigger(availableNow=True)
- Streaming trigger(once=True) is useful when you need to run the job just once. Because it stores the checkpoint, the next time it runs it picks up the data from the last checkpoint. But it is recommended to use trigger(availableNow=True) instead.
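A sketch of a one-shot run; table names and checkpoint path are hypothetical. availableNow processes everything that is currently available (possibly in several micro-batches) and then stops:

```python
(spark.readStream
    .table("orders_bronze")
    .writeStream
    .trigger(availableNow=True)          # preferred one-shot trigger
    # .trigger(once=True)                # older one-shot trigger
    .option("checkpointLocation", "/mnt/demo/_checkpoints/orders_once")
    .table("orders_once_demo"))
```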
MULTI-HOP Architecture