def flattenTable(serviceName, bucketName):
    """Materialize a per-service gold audit-log table from the silver layer.

    Collects the distinct JSON keys observed in the silver table's
    ``flattened`` column for *serviceName*, builds an all-string schema from
    them, then starts a trigger-once streaming query that parses
    ``requestParams`` out of ``flattened`` and appends to a Delta gold table.

    Parameters
    ----------
    serviceName : str
        Audit-log service whose records are materialized.
    bucketName : str
        Root path holding ``streaming/silver``, ``streaming/gold`` and
        ``checkpoints`` subdirectories.

    Side effects: starts a structured-streaming write to
    ``{bucketName}/streaming/gold/{serviceName}`` (runs once, then stops).
    """
    flattenedStream = spark.readStream.load(f"{bucketName}/streaming/silver")
    flattened = spark.table("audit_logs.silver")

    # Collect every distinct key-list produced by just_keys_udf for this service.
    keys = (
        flattened
        .filter(col("serviceName") == serviceName)
        .select(just_keys_udf(col("flattened")))
        .alias("keys")
        .distinct()
        .collect()
    )
    # just_keys_udf yields a bracketed, comma-separated string ("[k1, k2]");
    # strip the brackets and split to recover the individual key names.
    keysList = [i.asDict()['justKeys(flattened)'][1:-1].split(", ") for i in keys]
    keysDistinct = {j for i in keysList for j in i if j != ""}

    # from_json requires a non-empty schema, so fall back to a placeholder
    # field when no keys were observed for this service.
    schema = StructType()
    if not keysDistinct:
        schema.add(StructField('placeholder', StringType()))
    else:
        for key in keysDistinct:
            schema.add(StructField(key, StringType()))

    # Stream this service's records to its gold Delta table, parsing
    # requestParams from the flattened JSON using the derived schema.
    # mergeSchema lets later runs add newly-observed keys; trigger(once=True)
    # processes all available data and then terminates the query.
    (flattenedStream
        .filter(col("serviceName") == serviceName)
        .withColumn("requestParams", from_json(col("flattened"), schema))
        .drop("flattened")
        .writeStream
        .partitionBy("date")
        .outputMode("append")
        .format("delta")
        .option("checkpointLocation", f"{bucketName}/checkpoints/gold/{serviceName}")
        .option("path", f"{bucketName}/streaming/gold/{serviceName}")
        .option("mergeSchema", True)
        .trigger(once=True)
        .start()
    )
Remove all metastore entries and files before re-running.
Last refresh: Never