import time
def batch_function(cdf_df: DataFrame, batch_id):
    """foreachBatch sink: report the row count of each change-feed micro-batch.

    Args:
        cdf_df: micro-batch DataFrame delivered by the streaming query.
        batch_id: monotonically increasing micro-batch id (unused here).
    """
    # `_print` is defined elsewhere in the project — presumably a logging
    # helper; verify against the rest of the file.
    row_count = cdf_df.count()
    _print(row_count)
# Stream the table's change data feed, capped at roughly 1K of input per
# trigger so the source is split into many small micro-batches, and hand
# every micro-batch to batch_function.
query = (
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("maxBytesPerTrigger", "1K")
    .table("_temp_max_bytes_calc")
    .select("id", "year", "month")
    .drop_duplicates()
    .writeStream
    .foreachBatch(batch_function)
    .trigger(processingTime="5 seconds")
    .queryName("my query")
    .start()
)

# Give the query a moment to spin up before we start polling its status.
time.sleep(5)

# Poll once per second until the stream has drained: nothing left to read,
# no trigger currently firing, and source initialization finished — then
# stop the query.
while query.isActive:
    drained = (
        not query.status["isDataAvailable"]
        and not query.status["isTriggerActive"]
        and query.status["message"] != "Initializing sources"
    )
    if drained:
        query.stop()
    time.sleep(1)

# Bounded wait for the query to fully terminate (also surfaces any
# streaming exception).
query.awaitTermination(10)