Am veröffentlicht

Automatisiertes Micro-batch Processing in Azure Synapse Analytics

Ein klassisches Data Warehouse ist darauf ausgelegt, den Anwendern tagsüber konsistente Unternehmensdaten für Berichte und Analysen zentral zur Verfügung zu stellen. Um Konflikte zu vermeiden, werden aktuelle operative Daten in der Regel tagsüber gesammelt und die Integration findet nachts statt, wenn keine Anwender auf das Data Warehouse zugreifen. Die Belastung der Computerressourcen ist somit asymmetrisch: in der Nacht sehr hoch, wenn große Datenmengen angeliefert, auf verschiedene Arten transformiert und anschließend in die Datenbanken geladen werden, tagsüber dagegen abhängig von den Abfragen und Auswertungen der Anwender.

Aus diesem Grund kann es sinnvoll sein, das Data Warehouse zum Teil oder Vollständig in die Cloud zu migrieren. Zum einen können Anbieter von Cloudlösungen ihre Rechenkapazitäten kostengünstig anbieten, da in großen Rechenclustern Skaleneffekte zum Tragen kommen, zum anderen werden die Kapazitäten elastisch und skalierbar angeboten und nach Nutzungszeit abgerechnet. Um die Möglichkeiten der Cloud effizient zu nutzen, kann im Zuge dessen auch die klassische Batchverarbeitung neu organisiert werden. Eine erste Alternative ist das Stream Processing, bei dem die Verarbeitung bei Anlieferung in Echtzeit erfolgt, eine zweite das Micro-batch Processing als Hybrid aus beiden Verfahren, bei dem kleine Batches mit kurzer zeitlicher Verzögerung verarbeitet werden. Der Migrationsaufwand ist hierbei gering, wenn die bestehende Logik kaum angepasst werden muss und eventuell vorhandene Transformationsstrecken übernommen werden können. Ein solches Verfahren wird hier anhand von Azure Synapse Analytics dargestellt.

Was ist Synapse Analytics?

Innerhalb der Azure-Landschaft bietet Synapse Analytics eine Plattform für Datenintegration und Data Warehousing. Es bestehen Möglichkeiten zur direkten Anbindung von SQL- oder Azure Cosmos-Datenbanken sowie dem Azure Data Lake Storage, alternativ können vielfältige andere Quellen – wie zum Beispiel MariaDB, Oracle, PostgreSQL oder MongoDB – über Konnektoren angebunden werden, ähnlich zur Azure Data Factory. Auch die Integration orientiert sich an der Data Factory und findet mittels serverloser oder dedizierter SQL-Pools statt, im Bereich Big Data über serverlose Spark-Pools.

Der Datenfluss wird durch Pipelines gesteuert, innerhalb derer verschiedene Aktivitäten logisch zusammengefasst sind. Die Pipelines können unabhängig voneinander arbeiten, aber auch zu komplexen Netzen verknüpft und orchestriert werden. Darüber hinaus ermöglichen Pipelines einen strukturierten Deploymentprozess.

Durch die Vernetzung innerhalb der Azure-Landschaft ist es ein Leichtes, andere Services zu nutzen und etwa Azure Active Directory für Identitätsmanagement und Authentifizierung zu nutzen oder die Daten aus dem Warehouse in PowerBI aufzubereiten. In diesem Beispiel wird das Azure EventGrid genutzt, um die Integration zu automatisieren.

Micro-batch Processing und ELT in der Cloud

Der im Folgenden beschriebene Verarbeitungsprozess unterscheidet sich in zwei Arten vom oben skizzierten. Erstens werden in der klassischen Batchverarbeitung die Daten gesammelt und gemeinsam verarbeitet, häufig über Nacht oder in noch größeren Abständen und ist daher mit hoher Latenz verbunden. Wenn Daten dagegen schnell zur Verfügung stehen sollen, ist ein Stream Processing erforderlich. Oft kommt es aber nicht auf jede Millisekunde an, sondern es reicht, die Daten in kurzen Intervallen oder bei Überschreiten eines Schwellenwertes in einem Micro-batch zu verarbeiten. Somit müssen die Rechenressourcen nicht permanent bereitstehen. Gleichzeitig wird ein eventueller Overhead zum Bereitstellen gering gehalten, da das System nicht für jeden Einzelsatz anlaufen muss. Wie groß ein Micro-batch gewählt wird, hängt von den Anforderungen und der Infrastruktur ab.

Die Verarbeitung geschieht klassischerweise in einem ETL-Prozess, in dem die Daten aus einem Quellsystem zunächst extrahiert, dann transformiert und aufbereitet, schließlich in die Zieldatenbank geladen werden. Dagegen wird beim ELT-Prozess die Reihenfolge geändert und die Daten zunächst in die Zieldatenbank geladen und dann transformiert. Dadurch stehen die Rohdaten schneller zur Verfügung und können, wenn es für Analysen von Vorteil ist, separat von den transformierten Daten gespeichert und bereitgestellt werden. Ein weiterer Vorteil der Separation von Laden und Transformation ist, dass beide Prozesse parallel geschehen können und die Transformation in der Cloud leicht den Anforderungen entsprechend skaliert werden kann. Im klassischen Betrieb dagegen ist es bei wachsenden Datenmengen notwendig, wenn zum Beispiel die Verarbeitung nicht mehr im zur Verfügung stehenden Zeitfenster möglich ist, die Rechenleistung aufwendig zu erweitern. Die Leistungsfähigkeit muss sich in der Cloud also nicht mehr daran orientieren, was als Maximum benötigt wird.

Einrichten von Synapse

Ein solcher Prozess soll nun in Synapse Analytics abgebildet werden. Zunächst wird ein Synapse-Arbeitsbereich erzeugt, der einem Abonnement und einer Ressourcengruppe zugeordnet ist. Zudem wird ein Data-Lake-Storage angelegt, dessen Name Azure-weit eindeutig sein muss, sowie ein Dateisystem für die Daten des Arbeitsbereiches.

Die Bereitstellung erfolgt automatisiert, allerdings sollte im Anschluss überprüft werden, ob im Abonnement alle erforderlichen Ressourcenanbieter registriert worden sind. Für das automatisierte Starten von Pipelines wird das EventGrid verwendet und die Pipeline basiert auf der DataFactory. Wenn einer der Dienste nicht aktiviert ist, kommt es im Folgenden zu Fehlermeldungen.

Einrichten der Pipelines

Der Arbeitsbereich steht anschließend als Weboberfläche zur Verfügung, der Link steht in der Übersicht der Synapse-Ressource. Die Oberfläche ist in verschiedene Bereiche eingeteilt: einer für die Anbindung von Datenbanken, einer zum Entwickeln von SQL-Skripten, Spark-Notebooks oder Datenflüssen, einer für die Integration durch Pipelines. Darüber hinaus gibt es einen Monitor-Bereich zur Übersicht über die Ausführung von Pipelines oder Triggern sowie Aktivitäten der Spark- oder SQL-Pools.

In diesem Beispiel soll die Verarbeitung durch Spark-Notebooks vorgenommen werden, also muss im Bereich “Entwickeln” ein neues Notebook angelegt werden. Im ersten Schritt wird ein von Azure bereitgestellter Testdatensatz von Taxifahren in New York verwendet, der im Parquet-Format vorliegt. Dieses Format ist spaltenorientiert und ermöglicht eine effiziente Verarbeitung und Speicherung von Daten. Als Sprachen sind im Notebook PySpark, Spark, .NET Spark und Spark SQL möglich, für das Beispiel wird PySpark verwendet.

Erstes Notebook: Bereitstellen der Testdaten

Der erste Schritt stellt Testdaten bereit und sieht wie folgt aus:

start_date = parser.parse(‘2009-02-10’)

end_date = parser.parse(‘2009-02-20’)

df = NycTlcYellow(start_date=start_date, end_date=end_date).to_spark_dataframe()

Hierdurch wird ein Spark Dataframe erzeugt, aus dem nun einzelne Tage ausgewählt und als separate Dateien gespeichert werden. Das Zeitfenster ist schmal gewählt, um nur wenige Testsätze für jeden Tag zu erhalten. In der Praxis würden die Daten aus einem operativen System dynamisch angeliefert werden, was für dieses Beispiel allerdings unerheblich ist, hier werden lediglich drei Sätze über eine Schleife erzeugt:

for i in range(3):

timestamp = datetime.datetime.now().strftime(“%Y-%m-%d_%H-%M-%S”)

date_from = datetime.datetime(2009, 2, 11, 10, 0, 0) + timedelta(days = i)

date_to = datetime.datetime(2009, 2, 11, 12, 0, 0) + timedelta(days = i)

 

data_path = “abfss://%s@%s.dfs.core.windows.net/%s” % (data_container_name, account_name, data_relative_path + timestamp + “.data”)

df_filtered = df.filter(date_from <= df[“tpepPickupDateTime”]).filter(df[“tpepPickupDateTime”] <= date_to)

df_filtered.write.parquet(data_path, mode = “overwrite”)

Der Pfad innerhalb des Data Lakes ist parametrisiert über container_name, account_name und data_relative_path. Zudem wird ein Zeitstempel angehängt, um die Datensätze zu unterscheiden. Über Filter wird das passende Zeitintervall ausgewählt.

Zusätzlich werden Metadaten abgelegt, die den Zeitpunkt der Lieferung, den letzten Zugriff, die Anzahl der enthaltenen Datensätze sowie ein Kennzeichen, ob die Daten bereits eingelesen wurden, enthalten. Dies geschieht in einem separaten Container, der Pfad ist ebenfalls parametrisiert.

Zweites Notebook: Laden der Testdaten

Der zweite Teil besteht daraus, die zuvor bereitgestellten Daten entgegenzunehmen und gesammelt zu laden. Dieses Notebook wird automatisiert gestartet, wenn im Data Lake eine neue Datei angelegt wird. Als erstes werden die Metadaten ausgelesen und die Dateien ausgewählt, die neu geliefert und noch nicht verarbeitet wurden.

Über einen Schwellenwert wird die Größe des Micro-batches gesteuert. Inder Praxis wäre das Ziel der Verarbeitung das Data Warehouse, hier werden die Datensätze gesammelt in einem Dataframe abgelegt. Wenn der Schwellenwert überschritten ist, wird zunächst ein leeres Dataframe anhand eines zuvor gespeicherten Schemas angelegt.

if df_meta.agg({“datasets” : “sum”}).first()[0] > dataset_threshold:

schema_path = “abfss://%s@%s.dfs.core.windows.net/%s” % (meta_container_name, account_name, “nyctlc1.json”)

schema_rdd = spark.sparkContext.wholeTextFiles(schema_path)

schema_txt = json.loads(str(schema_rdd.collect()[0][1]))

schema = StructType.fromJson(schema_txt)

df_total = spark.createDataFrame([], schema)

Anschließend werden alle noch nicht gelesenen Parquet-Dateien durchlaufen und an das leere Dataframe angefügt. In den Metadaten wird dabei der Datensatz auf „read“ gesetzt und der Zeitstempel des Zugriffs aktualisiert.

for timestamp in df_meta.select(df_metadata[“created”]).collect()[0]:

timestamp = timestamp[0]

data_path = “abfss://%s@%s.dfs.core.windows.net/%s” % (data_container_name, account_name, data_relative_path + timestamp + “.data”)

df_new = spark.read.parquet(data_path)

df_total = df_total.union(df_new)

 

timestamp_new = datetime.datetime.now().strftime(“%Y-%m-%d_%H-%M-%S”)

df_meta_update = df_meta.filter(df_meta[“created”] == timestamp).withColumn(“read”, lit(True)).withColumn(“accessed”, lit(timestamp_new))

df_meta = df_meta.filter(df_meta[“created”] != timestamp).union(df_meta_update)

 

out_path = “abfss://%s@%s.dfs.core.windows.net/%s” % (out_container_name, account_name, out_relative_path)

df_total.write.parquet(out_path, mode = “append”)

Aufbau der Pipelines

Die beiden Notebooks werden über eine Aktivität jeweils zunächst in eine Pipeline eingebunden und parametrisiert. Anschließend können die Variablen als dynamischer Inhalt, also als einer der Pipeline zugeordneten Variable, verwendet werden.

Für die zweite Pipeline, die die Testdaten lädt, wird ein Trigger gesetzt, der die Ausführung automatisch startet, sobald ein Blob im Data Lake erstellt wird. In der Konfiguration kann über den Blobpfad selektiert werden, wann ein Ereignis ausgelöst wird:

Vom Speicherkonto wird eine Benachrichtigung an das EventGrid gesendet, wenn eine Datei angelegt wird. Das EventGrid ist mit Azure verknüpft, so dass der Trigger durch dieses Ereignis ausgelöst wird.

Ergebnis

Mit den gewählten Parametern werden drei Parquet-Dateien mit jeweils knapp über 40.000 Datensätzen geschrieben, wie der Auszug der angelegten Metadaten zeigt:

Dabei wird bei jeder Datei ein Event erzeugt, das die Pipeline mit dem zweiten Notebook startet. Da jedoch der Schwellenwert auf 100.000 gesetzt ist, findet eine Verarbeitung erst mit der dritten Datei statt.

Im Anschluss sind die gelieferten Dateien in eine zusammengefasst und würden in der Praxis weiter verarbeitet werden, außerdem sind in den Metadaten der Zeitpunkt des Zugriffes sowie der Status „read“ aktualisiert.