import time


def batch_function(cdf_df: "DataFrame", batch_id: int) -> None:
    """foreachBatch sink: print the row count of each change-feed micro-batch.

    NOTE(review): the original called ``_print``, which is not defined
    anywhere in view; assuming the builtin ``print`` was intended — confirm.
    The ``DataFrame`` annotation is quoted because the name is not imported
    in this file; quoting keeps it lazy and avoids a NameError at def time.
    """
    print(cdf_df.count())


# Read the Delta table's change feed in small (~1 KB per trigger) micro-batches,
# project three columns, drop duplicate rows, and hand each micro-batch to
# batch_function. `spark` is assumed to be provided by the runtime (Databricks).
query = (
    spark.readStream
    .format("delta")
    .options(readChangeFeed="true", maxBytesPerTrigger="1K")
    .table("_temp_max_bytes_calc")
    .select("id", "year", "month")
    .drop_duplicates()
    .writeStream
    .foreachBatch(batch_function)
    .trigger(processingTime="5 seconds")
    .queryName("my query")
    .start()
)

# Give the stream a moment to spin up before polling its status.
time.sleep(5)

# Poll until the query has drained: no data available, no trigger in flight,
# and it is past the "Initializing sources" phase — then stop it.
while query.isActive:
    # Snapshot status once per iteration so all three checks observe a
    # consistent view (each `query.status` access re-fetches live status).
    status = query.status
    stop_conditions = [
        not status["isDataAvailable"],
        not status["isTriggerActive"],
        status["message"] != "Initializing sources",
    ]
    if all(stop_conditions):
        query.stop()
    time.sleep(1)

# Block (up to 10 s) for the stopped query to fully terminate.
query.awaitTermination(10)