Heute möchte ich meinen ersten Eindruck von Microsoft Fabric Lakehouse mit Ihnen teilen. Microsoft Fabric ist ein neues Produkt im Angebot von Microsoft, das sich derzeit in der Preview-Phase befindet. Sie können sich jedoch registrieren und es 60 Tage lang testen. Fabric enthält mehrere Komponenten wie Data Factory, OneLake Data, Warehouse und Lakehouse, die ich in diesem Beitrag vorstellen werde.
Die Lakehouse-Plattform stützt sich vollständig auf die Spark-Engine. Spark selbst bietet Flexibilität und ermöglicht es uns, verschiedene Programmiersprachen wie Spark, Scala, SQL oder Python in Notebooks für Datenoperationen mit den in OneLake gespeicherten Dateien zu verwenden. Es ist erwähnenswert, dass Lakehouse und Warehouse zwar einige Gemeinsamkeiten aufweisen, aber auch einen bedeutenden Unterschied haben. Das Warehouse bietet ein umfassendes T‑SQL-Erlebnis, während Lakehouse in erster Linie als Spark-basiertes Engineering-Tool dient.
Fabric Lakehouse erstellen
Sie können ein neues Lakehouse erstellen, indem Sie im Menü auf der linken Seite auf die Schaltfläche „Erstellen“ klicken.

Nach einiger Zeit sollte Ihr Lakehouse fertig sein. Über dieses Menü (Bildschirm unten) können Sie auf Tabellen und Dateien in Ihrem Lakehouse zugreifen und neue Elemente wie Datenpipelines (Data Factory), Notizbücher und Dataflows (die neue Version von Dataflow in Data Factory) erstellen.

Die Ausführung von Code in Notizbüchern
Dieser Beitrag wird sich auf die Python/Spark-Funktionen in Fabric konzentrieren, also wähle ich „Neues Notizbuch“ aus dem oberen Menü, um mit der Codeentwicklung zu beginnen.

Ich habe ein Notizbuch erstellt und einen Code geschrieben, um mit Python Daten aus dem Internet zu importieren, den Sie unten finden.
# Welcome to your new notebook
# Type here in the cell editor to add code!
from datetime import datetime, timedelta
import requests
import json
import pandas as pd
from builtin.utils import create_array_first_last_day_of_year
LINK = "/lakehouse/default/Files/nbp/"
def import_gold_prices(date: tuple) -> list[(str, str)]:
"""
This function returns an array of date and gold price.
Args:
date: tuple: date range.
Returns:
list[str, str]: list of prices.
"""
url = f"https://api.nbp.pl/api/cenyzlota/{date[0]}/{date[1]}"
response = requests.get(url)
response.raise_for_status()
data = json.loads(response.text)
gold_prices = []
for cena_zlota in data:
date = cena_zlota["data"]
price = cena_zlota["cena"]
gold_prices.append((date, price))
return gold_prices
def import_usd_prices(date: tuple[str, str]) -> list[(str, str)]:
"""
This function returns an array of date and usd price.
Args:
date: tuple: date range.
Returns:
list[str, str]: list of prices.
"""
url = f"https://api.nbp.pl/api/exchangerates/rates/a/usd/{date[0]}/{date[1]}/"
response = requests.get(url)
response.raise_for_status()
data = json.loads(response.text)
prices = []
for cena_usd in data["rates"]:
date = cena_usd["effectiveDate"]
price = cena_usd["mid"]
prices.append((date, price))
return prices
def save_gold_df(dest_path: str) -> None:
start_year = 2013
end_year = 2023
first_last_days_of_years = create_array_first_last_day_of_year(start_year, end_year)
arr = []
for date in first_last_days_of_years:
arr_gold = import_gold_prices(date)
arr.extend(arr_gold)
pd.DataFrame(arr, columns=["date","price"]).to_csv(f'{dest_path}gold.csv', index=False)
def save_usd_df(dest_path: str) -> None:
"""
Saves USD dataframe
Args:
Returns:
"""
start_year = 2006
end_year = 2023
first_last_days_of_years = create_array_first_last_day_of_year(start_year, end_year)
arr = []
for date in first_last_days_of_years:
gold_prices_data = import_usd_prices(date)
arr.extend(gold_prices_data)
pd.DataFrame(arr, columns=["date","price"]).to_csv(f'{dest_path}usd.csv', index=False)
def import_flat_price(dest_path) -> None:
url = "https://static.nbp.pl/dane/rynek-nieruchomosci/ceny_mieszkan.xlsx"
response = requests.get(url)
response.raise_for_status()
with open(f"{dest_path}flat_prices.xlsx", "wb") as file:
file.write(response.content)
#
if __name__ == "__main__":
save_gold_df("/lakehouse/default/Files/nbp/")
save_usd_df("/lakehouse/default/Files/nbp/")
import_flat_price("/lakehouse/default/Files/nbp/")
Um den Code auszuführen, habe ich auf die Schaltfläche „Alles ausführen“ geklickt.

Die Geschwindigkeit der Ausführung war beeindruckend. Anstatt minutenlang zu warten, wie bei Azure Synapse Spark Pool oder Databricks, dauerte es nur 5 Sekunden, um mein Skript zu starten und auszuführen. Der Bildschirm unten zeigt das Ergebnis der Code-Ausführung. Wie Sie sehen können, sind die Dateien im rechten Menü zugänglich. Ich kann auf sie in meinem Notebook über ein Menü im Linux-Stil zugreifen.

Schön, die Dateien wurden importiert, und jetzt brauche ich ein Skript, um sie zu bereinigen und in das erwartete Format zu konvertieren. Ich habe ein weiteres Notebook und ein Skript im selben Lakehouse erstellt. Ich konnte keine Option finden, die es mir erlaubt, Abhängigkeiten auf dem Cluster zu installieren, aber ich kann den Befehl „pip install“ ausführen, um die erforderlichen Python-Bibliotheken in mein Notebook zu importieren.
pip install duckdb
import pandas as pd
import duckdb
from builtin.utils import convert_to_last_day_of_quarter, generate_days_in_years
def clean_flats(dest_path):
flats = pd.read_excel(f"{dest_path}flat_prices.xlsx", header=6, usecols="X:AO", sheet_name="Rynek pierwotny")
cities = ['Białystok','Bydgoszcz','Gdańsk','Gdynia','Katowice','Kielce','Kraków','Lublin','Łódź','Olsztyn','Opole',
'Poznań','Rzeszów','Szczecin','Warszawa','Wrocław','Zielona Góra']
flats.columns = flats.columns.str.replace(".1", "")
flats.columns = flats.columns.str.replace("*", "")
flats = flats[flats['Kwartał'].notna()]
flats_unpivot = pd.melt(flats, id_vars='Kwartał', value_vars=cities)
flats_unpivot['date'] = flats_unpivot.apply(lambda row: convert_to_last_day_of_quarter(row['Kwartał']),axis=1)
flats_unpivot['date'] = pd.to_datetime(flats_unpivot['date'])
flats_unpivot['city'] = flats_unpivot['variable']
flats_unpivot.to_parquet(f"{dest_path}flats_price.parquet")
def clean_currency(dest_path):
gold = pd.read_csv(f"{dest_path}gold.csv")
gold['date'] = pd.to_datetime(gold['date'])
gold['currency'] = 'gold'
calendar = pd.DataFrame(generate_days_in_years(2006,2023), columns=["date","last_date"])
calendar['date'] = pd.to_datetime(calendar['date'])
usd = pd.read_csv(f"{dest_path}usd.csv")
usd['date'] = pd.to_datetime(usd['date'])
usd['currency'] = 'usd'
currency = pd.concat([gold, usd], ignore_index=True, sort=False)
# fill gups
usd = duckdb.sql("""
select
date,
price,
currency,
from
(
select
row_number() over (partition by currency, a.date order by b.date desc) lp,
a.date,
b.date org_date,
b.price,
currency,
from
calendar a left join currency b on b.date between a.date - INTERVAL 3 DAY and a.date
)
WHERE
lp = 1
order by date
""").to_df()
usd.to_parquet(f"{dest_path}currency.parquet")
if __name__ == "__main__":
clean_flats("/lakehouse/default/Files/nbp/")
clean_currency("/lakehouse/default/Files/nbp/")
clean_currency("/lakehouse/default/Files/nbp/")
Wiederum wurde mein Code innerhalb weniger Sekunden ausgeführt, und ich konnte die neuen Dateien in meinem Lakehouse sehen.

Schließlich kann ich meinen Bericht auf der Grundlage der gesammelten und umgewandelten Dateien erstellen. Ich habe das Notizbuch „Bericht“ erstellt und den folgenden Code ausgeführt, um die Berichtsdatei zu erstellen.
pip install duckdb
import pandas as pd
import duckdb
from builtin.utils import convert_to_last_day_of_quarter, generate_days_in_years
def report(source_path):
calendar = pd.DataFrame(generate_days_in_years(2006,2023), columns=["date","last_date"])
flats_price = pd.read_parquet(f"{source_path}flats_price.parquet")
currency = pd.read_parquet(f"{source_path}currency.parquet")
df_data = duckdb.sql("""
select
a.date,
a.value flat_price,
b.price gold,
b.price*31 ounce,
c.price usd,
a.value / c.price flat_price_usd,
a.value / (b.price * 31) flat_price_gold,
(a.value - lag(a.value) over (order by a.date))/lag(a.value) over (order by a.date) m2mgrow
from
flats_price a
left join currency b on a.date = b.date and b.currency = 'gold'
left join currency c on a.date = c.date and c.currency = 'usd'
where
city = 'Warszawa'
order by a.date
""").to_df()
df_data.to_csv(f"{source_path}data.csv", encoding='utf-8', index=False)
df_data.to_parquet(f"{source_path}flats_report.parquet")
if __name__ == "__main__":
report("/lakehouse/default/Files/nbp/")
Wie Sie vielleicht bemerken, verwende ich Spark nicht, weil ich einen Code repliziere, den ich lokal auf meinem Computer entwickelt und getestet habe. Außerdem ist nicht jeder Data Scientist mit Spark vertraut, und es gibt Anwendungsfälle, in denen es nicht benötigt wird.
Wenn Sie es jedoch vorziehen, mit Apache Spark zu arbeiten, können Sie natürlich Spark-Befehle verwenden, um Daten umzuwandeln, wie im folgenden Beispiel gezeigt. Wenn Sie Ihre Dateien im Delta-Format speichern, stehen Ihnen außerdem Tabellen in der Lakehouse-Struktur unter dem Knoten „tables“ zur Verfügung.
df = spark.read.parquet("Files/nbp/flats_price.parquet")
df.write.mode("overwrite").saveAsTable("flats_report")
df = spark.read.parquet("Files/nbp/currency.parquet")
df.write.mode("overwrite").saveAsTable("dim_currency")

Fabric Pipelines
Wenn wir die Entwicklung des Notebooks abgeschlossen haben, können wir mit der Prozessorchestrierung unter Verwendung von Fabric fortfahren. Fabric unterstützt dies durch das Modul Data Factory, das eine breite Palette von Aktivitäten bietet. Zu diesen Aktivitäten gehören das Kopieren von Daten aus externen Quellen, die Ausführung gespeicherter Prozeduren, die Iteration durch Elemente, die Ausführung von Notizbüchern und vieles mehr.

Hier sehen Sie den Fluss, den ich für die Orchestrierung meines Prozesses zur Analyse von Pauschalpreisen erstellt habe. Mithilfe von Pipelines können wir Abhängigkeiten zwischen den Schritten des Prozesses herstellen.
SQL Endpoint
Die Funktion, die mich am meisten interessiert, ist der SQL-Endpunkt, bei dem es sich um ein automatisch aus einem Lakehouse generiertes Warehouse handelt. Damit kann ich eine von Spark erstellte Tabelle mit SQL abfragen und sie dann mit Power BI nutzen. Unten finden Sie einen Bericht, den ich auf der Grundlage der Ausgabe des Berichtsnotizbuchs erstellt habe. Es ist wirklich erstaunlich, wie schnell man verschiedene Technologien miteinander verbinden kann, um seine Daten zu visualisieren.

Anmerkung:
Um die Dateien mit SQL Endpoint abzufragen, müssen Sie sie im Delta-Format speichern.
df = spark.read.parquet("Files/nbp/flats_report.parquet")
# df now is a Spark DataFrame containing parquet data from "Files/nbp/flats_report.parquet".
df.write.format("delta").mode("overwrite").saveAsTable("flats_report1")
Zusammenfassung
Microsoft Fabric befindet sich noch in der Vorschauphase, aber es scheint sich in Richtung einer einheitlichen Umgebung für die Arbeit mit Daten innerhalb Ihres Unternehmens zu bewegen. Diese Lösung zielt darauf ab, die Integration von Data Engineering, Data Science und Power BI Reporting zu erleichtern. Es ist jedoch anzumerken, dass sie noch nicht produktionsreif ist und einige Verbesserungen erfordert. Kleine Aktualisierungen sind erforderlich, um die Benutzerfreundlichkeit zu verbessern; ich habe zum Beispiel eine Weile gebraucht, um herauszufinden, wie man den Namen eines Notizbuchs ändert. Außerdem ist es wichtig zu erwähnen, dass die Notizbücher nicht dieselben Ressourcen nutzen, so dass man für jedes Notizbuch separate Dateien bereitstellen muss (z. B. ein Python-Modul). Ich frage mich, was mit bestehenden Tools wie ADF und Azure Synapse in Bezug auf die Fabric-Implementierung passieren wird. Nichtsdestotrotz arbeitet Microsoft aktiv daran, und wir können erwarten, neue Funktionen in diesem leistungsstarken Tool zu sehen.
Erfahren Sie mehr über Lösungen im Bereich Data Management oder besuchen Sie eines unserer kostenlosen Webinare.
Quelle: medium.com