diff --git a/querybook/scripts/init_db b/querybook/scripts/init_db
index a6656b6b..123d8bb9 100755
--- a/querybook/scripts/init_db
+++ b/querybook/scripts/init_db
@@ -2,5 +2,20 @@
 
 cd querybook
 PYTHONPATH=server alembic upgrade head
-PYTHONPATH=server python server/scripts/init_es.py
+
+# Retry Elasticsearch initialization to handle timing issues
+echo "Initializing Elasticsearch indices..."
+for i in {1..30}; do
+    if PYTHONPATH=server python server/scripts/init_es.py; then
+        echo "Elasticsearch initialized successfully"
+        cd -
+        exit 0
+    else
+        echo "Failed to initialize Elasticsearch, retrying in 5 seconds... ($i/30)"
+        sleep 5
+    fi
+done
+
+echo "ERROR: Failed to initialize Elasticsearch after 30 attempts"
 cd -
+exit 1
diff --git a/querybook/server/app/datasource.py b/querybook/server/app/datasource.py
index 124df071..eb6cc394 100644
--- a/querybook/server/app/datasource.py
+++ b/querybook/server/app/datasource.py
@@ -37,13 +37,23 @@ class RequestException(Exception):
         self.status_code = status_code
 
 
-_epoch = datetime.datetime.utcfromtimestamp(0)
+_epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
 
 
 def DATE_MILLISECONDS(dt):
-    """Return miliseconds for the given date"""
-    if isinstance(dt, datetime.date):
-        dt = datetime.datetime.combine(dt, datetime.datetime.min.time())
+    """Return milliseconds since the Unix epoch for the given date/datetime.
+
+    A plain ``date`` is treated as midnight UTC, a naive ``datetime`` as UTC.
+    """
+    # datetime is a subclass of date: test for a pure date explicitly, or every
+    # datetime would be truncated to midnight and lose its own tzinfo.
+    if isinstance(dt, datetime.date) and not isinstance(dt, datetime.datetime):
+        dt = datetime.datetime.combine(
+            dt, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc
+        )
+    # Ensure datetime is timezone-aware (treat naive as UTC)
+    if dt.tzinfo is None:
+        dt = dt.replace(tzinfo=datetime.timezone.utc)
     delta = dt - _epoch
     return delta.total_seconds() * 1000.0
 
diff --git a/querybook/server/datasources/query_execution.py b/querybook/server/datasources/query_execution.py
index e70122b3..9b6ab563 100644
--- a/querybook/server/datasources/query_execution.py
+++ b/querybook/server/datasources/query_execution.py
@@ -15,6 +15,7 @@ from app.auth.permission import (
 from clients.common import FileDoesNotExist
 from lib.export.all_exporters import ALL_EXPORTERS, get_exporter
 from lib.result_store import GenericReader
+from lib.utils.serialize import serialize_value
 from lib.query_analysis.templating import (
     QueryTemplatingError,
     get_templated_variables_in_string,
@@ -162,7 +163,7 @@ def create_query_execution(
             session=session,
         )
 
-        query_execution_dict = query_execution.to_dict(with_query_review=True)
+        query_execution_dict = serialize_value(query_execution.to_dict(with_query_review=True))
 
         if data_doc:
             socketio.emit(
@@ -238,11 +239,11 @@ def cancel_query_execution(query_execution_id):
             completed_at=datetime.utcnow(),
         )
 
-    execution_dict = logic.update_query_execution(
+    execution_dict = serialize_value(logic.update_query_execution(
         query_execution_id,
         status=QueryExecutionStatus.CANCEL,
         completed_at=datetime.utcnow(),
-    ).to_dict()
+    ).to_dict())
 
     socketio.emit(
         "query_cancel",
diff --git a/querybook/server/datasources_socketio/datadoc.py b/querybook/server/datasources_socketio/datadoc.py
index d7455cd9..2f41e7a2 100644
--- a/querybook/server/datasources_socketio/datadoc.py
+++ b/querybook/server/datasources_socketio/datadoc.py
@@ -165,7 +165,7 @@ def on_leave_room(data_doc_id):
 
 
 @register_socket("disconnect", namespace=DATA_DOC_NAMESPACE)
-def disconnect():
+def disconnect(*args, **kwargs):
     data_doc_ids = rooms(request.sid, namespace=DATA_DOC_NAMESPACE)
     for data_doc_id in data_doc_ids:
         leave_room(data_doc_id)
diff --git a/querybook/server/datasources_socketio/query_execution.py b/querybook/server/datasources_socketio/query_execution.py
index 9c6a2f8a..7b3668db 100644
--- a/querybook/server/datasources_socketio/query_execution.py
+++ b/querybook/server/datasources_socketio/query_execution.py
@@ -65,7 +65,7 @@ def on_leave_room(query_execution_id):
 
 
 @register_socket("disconnect", namespace=QUERY_EXECUTION_NAMESPACE)
-def disconnect():
+def disconnect(*args, **kwargs):
     query_execution_ids = rooms(request.sid, namespace=QUERY_EXECUTION_NAMESPACE)
     for query_execution_id in query_execution_ids:
         leave_room(query_execution_id)
diff --git a/querybook/server/lib/query_executor/clients/trino.py b/querybook/server/lib/query_executor/clients/trino.py
index 35e9839d..658a91d9 100644
--- a/querybook/server/lib/query_executor/clients/trino.py
+++ b/querybook/server/lib/query_executor/clients/trino.py
@@ -22,7 +22,12 @@ class TrinoCursor(PrestoCursorMixin[trino.dbapi.Cursor, List[Any]], CursorBaseCl
 
     def poll(self):
         try:
-            self.rows.extend(self._cursor._query.fetch())
-            self._cursor._iterator = iter(self.rows)
+            # Drain the client's result iterator, but never the iter(self.rows)
+            # installed below by an earlier poll: list.extend() over an
+            # iterator of the same list keeps appending and never terminates.
+            if self._cursor._iterator is not getattr(self, "_rows_iterator", None):
+                self.rows.extend(self._cursor._iterator)
+            self._rows_iterator = iter(self.rows)
+            self._cursor._iterator = self._rows_iterator
             poll_result = self._cursor.stats
             completed = self._cursor._query._finished
diff --git a/querybook/server/lib/utils/json.py b/querybook/server/lib/utils/json.py
index c8fb2250..daa43c0f 100644
--- a/querybook/server/lib/utils/json.py
+++ b/querybook/server/lib/utils/json.py
@@ -1,4 +1,5 @@
 from datetime import datetime, date
+from decimal import Decimal
 import json
 
 from flask import json as flask_json
@@ -27,6 +28,8 @@ class JSONEncoder(flask_json.JSONEncoder):
             return self.datetime_formatter(obj)
         elif isinstance(obj, date):
             return self.date_formatter(obj)
+        elif isinstance(obj, Decimal):
+            return float(obj)
         elif isinstance(obj, Row):
             return list(obj)
 
diff --git a/querybook/server/lib/utils/utils.py b/querybook/server/lib/utils/utils.py
index d1add40e..244fab5b 100644
--- a/querybook/server/lib/utils/utils.py
+++ b/querybook/server/lib/utils/utils.py
@@ -2,7 +2,7 @@ from contextlib import contextmanager
 import inspect
 import signal
 import subprocess
-from datetime import datetime, date
+from datetime import datetime, date, timezone
 from functools import wraps
 from typing import Optional, Union
 
@@ -11,7 +11,7 @@ import gevent
 from lib.logger import get_logger
 
 LOG = get_logger(__file__)
-_epoch = datetime.utcfromtimestamp(0)
+_epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
 
 
 def DATE_TO_UTC(dt: date) -> int:
@@ -23,11 +23,10 @@ def DATE_TO_UTC(dt: date) -> int:
     return the result in seconds instead of ms
     at least for the time being.
     """
-    return (
-        int((datetime.combine(dt, datetime.min.time()) - _epoch).total_seconds())
-        if dt
-        else None
-    )
+    if not dt:
+        return None
+    dt_combined = datetime.combine(dt, datetime.min.time(), tzinfo=timezone.utc)
+    return int((dt_combined - _epoch).total_seconds())
 
 
 def DATETIME_TO_UTC(dt: datetime) -> int:
@@ -39,7 +38,12 @@ def DATETIME_TO_UTC(dt: datetime) -> int:
     return the result in seconds instead of ms
     at least for the time being.
     """
-    return int((dt - _epoch).total_seconds()) if dt else None
+    if not dt:
+        return None
+    # Ensure datetime is timezone-aware (treat naive as UTC)
+    if dt.tzinfo is None:
+        dt = dt.replace(tzinfo=timezone.utc)
+    return int((dt - _epoch).total_seconds())
 
 
 def DATE_STRING(dt):
diff --git a/querybook/server/logic/datadoc_collab.py b/querybook/server/logic/datadoc_collab.py
index 76a0ce5c..9fb371ed 100644
--- a/querybook/server/logic/datadoc_collab.py
+++ b/querybook/server/logic/datadoc_collab.py
@@ -33,7 +33,7 @@ def update_datadoc(doc_id, fields, sid="", session=None):
         session=session,
         **fields,
     )
-    doc_dict = doc.to_dict()
+    doc_dict = serialize_value(doc.to_dict())
 
     socketio.emit(
         "data_doc_updated",
@@ -74,7 +74,7 @@ def restore_data_doc(
         "data_doc_restored",
         (
             sid,
-            restored_datadoc.to_dict(with_cells=True),
+            serialize_value(restored_datadoc.to_dict(with_cells=True)),
             commit_message,
             user.get_name(),
         ),
@@ -190,7 +190,7 @@ def paste_data_cell(
         (
             sid,
             index,
-            data_cell.to_dict(),
+            serialize_value(data_cell.to_dict()),
         ),
         namespace=DATA_DOC_NAMESPACE,
         room=doc_id,
diff --git a/requirements/engine/trino.txt b/requirements/engine/trino.txt
index 86cb0ed2..c3b91e72 100644
--- a/requirements/engine/trino.txt
+++ b/requirements/engine/trino.txt
@@ -1 +1,2 @@
-trino==0.305.0
+trino==0.336.0
+sqlalchemy-trino==0.5.0