influx db storage hinzu
This commit is contained in:
52
kurse.py
52
kurse.py
@@ -1,7 +1,10 @@
|
|||||||
import pandas as pd
|
import pandas as pd
|
||||||
import yfinance as yf
|
import yfinance as yf
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
def lade_kurse(csv_datei: str, ausgabe_datei: str = "kurse.csv"):
|
def lade_kurse(csv_datei: str, ausgabe_datei: str = "kurse.csv",
|
||||||
|
influx_url: str = None, influx_token: str = None,
|
||||||
|
influx_org: str = None, influx_bucket: str = None):
|
||||||
# CSV lesen, erwartet Spalte 'Symbol'
|
# CSV lesen, erwartet Spalte 'Symbol'
|
||||||
daten = pd.read_csv(csv_datei)
|
daten = pd.read_csv(csv_datei)
|
||||||
if 'Symbol' not in daten.columns:
|
if 'Symbol' not in daten.columns:
|
||||||
@@ -28,6 +31,49 @@ def lade_kurse(csv_datei: str, ausgabe_datei: str = "kurse.csv"):
|
|||||||
df_kurse.to_csv(ausgabe_datei, index=False)
|
df_kurse.to_csv(ausgabe_datei, index=False)
|
||||||
print(f"Kurse wurden erfolgreich in {ausgabe_datei} gespeichert.")
|
print(f"Kurse wurden erfolgreich in {ausgabe_datei} gespeichert.")
|
||||||
|
|
||||||
# Beispielaufruf (entferne Kommentar zur Nutzung)
|
# Optional: in InfluxDB schreiben (InfluxDB 2.x client)
|
||||||
lade_kurse('meine_aktien.csv')
|
if influx_url and influx_token and influx_org and influx_bucket:
|
||||||
|
try:
|
||||||
|
from influxdb_client import InfluxDBClient, Point, WritePrecision
|
||||||
|
from influxdb_client.client.write_api import SYNCHRONOUS
|
||||||
|
except ImportError:
|
||||||
|
print("Package 'influxdb-client' nicht installiert. Installiere mit: pip install influxdb-client")
|
||||||
|
return
|
||||||
|
|
||||||
|
client = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org)
|
||||||
|
# synchronen Schreibmodus verwenden, damit keine Hintergrund-Threads offen bleiben
|
||||||
|
write_api = client.write_api(write_options=SYNCHRONOUS)
|
||||||
|
punkte = []
|
||||||
|
jetzt = datetime.utcnow()
|
||||||
|
for eintrag in kurs_liste:
|
||||||
|
symbol = eintrag.get("Symbol")
|
||||||
|
kurs = eintrag.get("Letzter_Kurs")
|
||||||
|
# Nur tatsächliche Kurse schreiben
|
||||||
|
if kurs is None:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
punkt = Point("aktien") \
|
||||||
|
.tag("symbol", str(symbol)) \
|
||||||
|
.field("letzter_kurs", float(kurs)) \
|
||||||
|
.time(jetzt, WritePrecision.S)
|
||||||
|
punkte.append(punkt)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Fehler beim Erstellen des Influx-Punktes für {symbol}: {e}")
|
||||||
|
|
||||||
|
if punkte:
|
||||||
|
try:
|
||||||
|
write_api.write(bucket=influx_bucket, org=influx_org, record=punkte)
|
||||||
|
# write_api synchron geschlossen, dann client schließen
|
||||||
|
write_api.close()
|
||||||
|
print(f"{len(punkte)} Kurse in InfluxDB-Bucket '{influx_bucket}' geschrieben.")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Fehler beim Schreiben in InfluxDB: {e}")
|
||||||
|
client.close()
|
||||||
|
|
||||||
|
# Beispielaufruf (entferne Kommentar zur Nutzung)
|
||||||
|
lade_kurse('meine_aktien.csv',
|
||||||
|
influx_url='http://192.168.178.102:8086',
|
||||||
|
influx_token='aPJDBRd27Te5IEz1k7SM5mii-PZzmhXFsCkKxzddm-NSc6EIzimCEGPt_iXT6kRj0IWHo8OhFu9lEnapTMWMQA==',
|
||||||
|
influx_org='inflx-aktien',
|
||||||
|
influx_bucket='aktienkurse')
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user