import time
 
def batch_function(cdf_df: "DataFrame", batch_id):
    """foreachBatch sink: log the row count of each micro-batch.

    Args:
        cdf_df: the micro-batch DataFrame delivered by the streaming query.
        batch_id: monotonically increasing batch id supplied by Spark
            (unused, but required by the foreachBatch callback signature).
    """
    # Fix: original called `_print`, which is not defined in this file and
    # would raise NameError on the first micro-batch. Use the builtin.
    # NOTE: count() forces a full evaluation of the micro-batch.
    print(cdf_df.count())
 
# Build and start the streaming query: read the Delta change data feed in
# small increments and hand each micro-batch to batch_function.
# NOTE(review): drop_duplicates() on a stream with no watermark keeps all
# seen keys in state indefinitely — presumably acceptable for this temp
# table; confirm for long-running use.
cdf_stream = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .option("maxBytesPerTrigger", "1K")  # tiny trigger size: many small batches
    .table("_temp_max_bytes_calc")
)

deduped = cdf_stream.select("id", "year", "month").drop_duplicates()

query = (
    deduped.writeStream.foreachBatch(batch_function)
    .trigger(processingTime="5 seconds")
    .queryName("my query")
    .start()
)
 
# Give the query a moment to start before polling its status.
time.sleep(5)

# Poll until the query has settled — no data pending, no trigger running,
# and source initialization finished — then stop it.
while query.isActive:
    # Fix: snapshot status once per iteration; the original read
    # query.status three times, and each read is a separate snapshot
    # that can disagree mid-trigger.
    status = query.status
    settled = (
        not status["isDataAvailable"]
        and not status["isTriggerActive"]
        and status["message"] != "Initializing sources"
    )

    if settled:
        query.stop()
        # Fix: break immediately instead of sleeping another second and
        # re-entering the loop (possibly calling stop() again) while
        # isActive flips to False.
        break

    time.sleep(1)

# Block until the query fully terminates, waiting at most 10 seconds.
query.awaitTermination(10)