2 changes: 1 addition & 1 deletion bigframes/blob/_functions.py
@@ -69,7 +69,7 @@ def _output_bq_type(self):
def _create_udf(self):
"""Create Python UDF in BQ. Return name of the UDF."""
udf_name = str(
-            self._session._loader._storage_manager.generate_unique_resource_id()
+            self._session._anon_dataset_manager.generate_unique_resource_id()
)

func_body = inspect.getsource(self._func)
2 changes: 1 addition & 1 deletion bigframes/dataframe.py
@@ -3775,7 +3775,7 @@ def to_gbq(

# The client code owns this table reference now
temp_table_ref = (
-            self._session._temp_storage_manager.generate_unique_resource_id()
+            self._session._anon_dataset_manager.generate_unique_resource_id()
)
destination_table = f"{temp_table_ref.project}.{temp_table_ref.dataset_id}.{temp_table_ref.table_id}"

53 changes: 26 additions & 27 deletions bigframes/session/__init__.py
@@ -70,13 +70,14 @@
import bigframes.dtypes
import bigframes.functions._function_session as bff_session
import bigframes.functions.function as bff
+from bigframes.session import bigquery_session
import bigframes.session._io.bigquery as bf_io_bigquery
+import bigframes.session.anonymous_dataset
import bigframes.session.clients
import bigframes.session.executor
import bigframes.session.loader
import bigframes.session.metrics
import bigframes.session.planner
-import bigframes.session.temp_storage
import bigframes.session.validation

# Avoid circular imports.
@@ -247,14 +248,26 @@ def __init__(

self._metrics = bigframes.session.metrics.ExecutionMetrics()
self._function_session = bff_session.FunctionSession()
-        self._temp_storage_manager = (
-            bigframes.session.temp_storage.AnonymousDatasetManager(
+        self._anon_dataset_manager = (
+            bigframes.session.anonymous_dataset.AnonymousDatasetManager(
self._clients_provider.bqclient,
location=self._location,
session_id=self._session_id,
kms_key=self._bq_kms_key_name,
)
)
+        # Session temp tables don't support specifying kms key, so use anon dataset if kms key specified
+        self._session_resource_manager = (
+            bigquery_session.SessionResourceManager(
+                self.bqclient,
+                self._location,
+            )
+            if (self._bq_kms_key_name is None)
+            else None
+        )
+        self._temp_storage_manager = (
+            self._session_resource_manager or self._anon_dataset_manager
+        )
self._executor: bigframes.session.executor.Executor = (
bigframes.session.executor.BigQueryCachingExecutor(
bqclient=self._clients_provider.bqclient,
@@ -375,7 +388,7 @@ def _allows_ambiguity(self) -> bool:

@property
def _anonymous_dataset(self):
-        return self._temp_storage_manager.dataset
+        return self._anon_dataset_manager.dataset

def __hash__(self):
# Stable hash needed to use in expression tree
@@ -388,9 +401,11 @@ def close(self):

# Protect against failure when the Session is a fake for testing or
# failed to initialize.
-        temp_storage_manager = getattr(self, "_temp_storage_manager", None)
-        if temp_storage_manager:
-            self._temp_storage_manager.clean_up_tables()
+        if anon_dataset_manager := getattr(self, "_anon_dataset_manager", None):
+            anon_dataset_manager.close()
+
+        if session_resource_manager := getattr(self, "_session_resource_manager", None):
+            session_resource_manager.close()

remote_function_session = getattr(self, "_function_session", None)
if remote_function_session:
@@ -906,8 +921,6 @@ def read_csv(
engine=engine,
write_engine=write_engine,
)
-        table = self._temp_storage_manager.allocate_temp_table()
Reviewer comment (Collaborator):

Does this mean the table isn't created right away? If so, I think we might need to supply a session ID in the load job, if available.

Edit: I see this was moved to read_bigquery_load_job, which makes sense to me. Aside: with more hybrid engine stuff in the future, I can imagine some cases where to_gbq() would be doing a load job to a user-managed table, but I suppose that would probably use a very different job config, anyway.
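
For illustration, a minimal sketch of supplying a session ID to a load job, assuming `LoadJobConfig.connection_properties` is available in the installed `google-cloud-bigquery`; the client, URI, destination, and session ID below are all placeholders:

```python
from google.cloud import bigquery

client = bigquery.Client()  # placeholder client

# Attach an existing BigQuery session to the load job so the load runs
# inside that session. Assumes LoadJobConfig.connection_properties exists;
# "placeholder-session-id" stands in for a real session ID.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    autodetect=True,
)
job_config.connection_properties = [
    bigquery.ConnectionProperty("session_id", "placeholder-session-id")
]

load_job = client.load_table_from_uri(
    "gs://example-bucket/data.csv",  # placeholder source URI
    "example_project.example_dataset.example_table",  # placeholder destination
    job_config=job_config,
)
load_job.result()  # block until the load completes
```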


if engine is not None and engine == "bigquery":
if any(param is not None for param in (dtype, names)):
not_supported = ("dtype", "names")
@@ -967,9 +980,7 @@ def read_csv(
)

job_config = bigquery.LoadJobConfig()
-            job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
job_config.source_format = bigquery.SourceFormat.CSV
-            job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
job_config.autodetect = True
job_config.field_delimiter = sep
job_config.encoding = encoding
@@ -983,9 +994,8 @@ def read_csv(
elif header > 0:
job_config.skip_leading_rows = header

-            return self._loader._read_bigquery_load_job(
+            return self._loader.read_bigquery_load_job(
filepath_or_buffer,
-                table,
job_config=job_config,
index_col=index_col,
columns=columns,
@@ -1052,18 +1062,12 @@ def read_parquet(
engine=engine,
write_engine=write_engine,
)
-        table = self._temp_storage_manager.allocate_temp_table()

if engine == "bigquery":
job_config = bigquery.LoadJobConfig()
-            job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
job_config.source_format = bigquery.SourceFormat.PARQUET
-            job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
job_config.labels = {"bigframes-api": "read_parquet"}

-            return self._loader._read_bigquery_load_job(
-                path, table, job_config=job_config
-            )
+            return self._loader.read_bigquery_load_job(path, job_config=job_config)
else:
if "*" in path:
raise ValueError(
@@ -1106,8 +1110,6 @@ def read_json(
engine=engine,
write_engine=write_engine,
)
-        table = self._temp_storage_manager.allocate_temp_table()

if engine == "bigquery":

if dtype is not None:
@@ -1131,16 +1133,13 @@ def read_json(
)

job_config = bigquery.LoadJobConfig()
-            job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
-            job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
job_config.autodetect = True
job_config.encoding = encoding
job_config.labels = {"bigframes-api": "read_json"}

-            return self._loader._read_bigquery_load_job(
+            return self._loader.read_bigquery_load_job(
path_or_buf,
-                table,
job_config=job_config,
)
else:
@@ -1713,7 +1712,7 @@ def _start_query_ml_ddl(

def _create_object_table(self, path: str, connection: str) -> str:
"""Create a random id Object Table from the input path and connection."""
-        table = str(self._loader._storage_manager.generate_unique_resource_id())
+        table = str(self._anon_dataset_manager.generate_unique_resource_id())

import textwrap
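
The rest of `_create_object_table` is collapsed in this diff; presumably it builds object-table DDL along these lines (a hedged sketch, not the PR's actual statement; every identifier below is a placeholder):

```python
import textwrap

# Hypothetical sketch of the DDL _create_object_table might build from its
# path/connection inputs; the real statement is collapsed in this diff.
table = "example-project.example_dataset.example_object_table"  # placeholder
connection = "example-project.us.example-connection"  # placeholder
path = "gs://example-bucket/images/*"  # placeholder

sql = textwrap.dedent(
    f"""
    CREATE EXTERNAL TABLE `{table}`
    WITH CONNECTION `{connection}`
    OPTIONS(
        object_metadata = 'SIMPLE',
        uris = ['{path}']
    )
    """
)
```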

bigframes/session/anonymous_dataset.py
@@ -18,13 +18,14 @@

import google.cloud.bigquery as bigquery

-import bigframes.constants as constants
+from bigframes import constants
+from bigframes.session import temporary_storage
import bigframes.session._io.bigquery as bf_io_bigquery

_TEMP_TABLE_ID_FORMAT = "bqdf{date}_{session_id}_{random_id}"


-class AnonymousDatasetManager:
+class AnonymousDatasetManager(temporary_storage.TemporaryStorageManager):
"""
Responsible for allocating and cleaning up temporary gbq tables used by a BigFrames session.
"""
@@ -38,19 +39,23 @@ def __init__(
kms_key: Optional[str] = None
):
self.bqclient = bqclient
self.location = location
self._location = location
self.dataset = bf_io_bigquery.create_bq_dataset_reference(
self.bqclient,
-            location=self.location,
+            location=self._location,
api_name="session-__init__",
)

self.session_id = session_id
self._table_ids: List[bigquery.TableReference] = []
self._kms_key = kms_key

-    def allocate_and_create_temp_table(
-        self, schema: Sequence[bigquery.SchemaField], cluster_cols: Sequence[str]
+    @property
+    def location(self):
+        return self._location
+
+    def create_temp_table(
+        self, schema: Sequence[bigquery.SchemaField], cluster_cols: Sequence[str] = []
) -> bigquery.TableReference:
"""
        Allocates and creates a table in the anonymous dataset.
@@ -99,7 +104,8 @@ def generate_unique_resource_id(self) -> bigquery.TableReference:
)
return self.dataset.table(table_id)

-    def clean_up_tables(self):
+    def close(self):
+        """Delete tables that were created with this session's session_id."""
for table_ref in self._table_ids:
self.bqclient.delete_table(table_ref, not_found_ok=True)
self._table_ids.clear()
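
Both managers now subclass the new `temporary_storage` module, which this diff does not include; a plausible sketch of its interface, inferred from the two implementations (class shape and typing here are assumptions, not the actual file):

```python
# Hypothetical sketch of bigframes/session/temporary_storage.py, inferred
# from its two subclasses in this diff; the real module may differ.
from typing import Sequence

import google.cloud.bigquery as bigquery


class TemporaryStorageManager:
    """Interface for session-scoped temporary table storage."""

    @property
    def location(self) -> str:
        raise NotImplementedError

    def create_temp_table(
        self, schema: Sequence[bigquery.SchemaField], cluster_cols: Sequence[str] = []
    ) -> bigquery.TableReference:
        raise NotImplementedError

    def close(self) -> None:
        raise NotImplementedError
```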
21 changes: 9 additions & 12 deletions bigframes/session/bigquery_session.py
@@ -23,6 +23,7 @@
import google.cloud.bigquery as bigquery

from bigframes.core.compile import googlesql
+from bigframes.session import temporary_storage

KEEPALIVE_QUERY_TIMEOUT_SECONDS = 5.0

@@ -32,21 +33,22 @@
logger = logging.getLogger(__name__)


-class SessionResourceManager:
+class SessionResourceManager(temporary_storage.TemporaryStorageManager):
"""
Responsible for allocating and cleaning up temporary gbq tables used by a BigFrames session.
"""

-    def __init__(
-        self, bqclient: bigquery.Client, location: str, *, kms_key: Optional[str] = None
-    ):
+    def __init__(self, bqclient: bigquery.Client, location: str):
self.bqclient = bqclient
-        self.location = location
-        self._kms_key = kms_key
+        self._location = location
self._session_id: Optional[str] = None
self._sessiondaemon: Optional[RecurringTaskDaemon] = None
self._session_lock = threading.RLock()

+    @property
+    def location(self):
+        return self._location

def create_temp_table(
self, schema: Sequence[bigquery.SchemaField], cluster_cols: Sequence[str] = []
) -> bigquery.TableReference:
@@ -56,17 +58,13 @@ def create_temp_table(
with self._session_lock:
table_ref = bigquery.TableReference(
bigquery.DatasetReference(self.bqclient.project, "_SESSION"),
-                uuid.uuid4().hex,
+                f"bqdf_{uuid.uuid4()}",
)
job_config = bigquery.QueryJobConfig(
connection_properties=[
bigquery.ConnectionProperty("session_id", self._get_session_id())
]
)
-            if self._kms_key:
-                job_config.destination_encryption_configuration = (
-                    bigquery.EncryptionConfiguration(kms_key_name=self._kms_key)
-                )

ibis_schema = ibis_bq.BigQuerySchema.to_ibis(list(schema))

@@ -87,7 +85,6 @@ def create_temp_table(
ddl = f"CREATE TEMP TABLE `_SESSION`.{googlesql.identifier(table_ref.table_id)} ({fields_string}){cluster_string}"

job = self.bqclient.query(ddl, job_config=job_config)

job.result()
# return the fully qualified table, so it can be used outside of the session
return job.destination
14 changes: 6 additions & 8 deletions bigframes/session/executor.py
@@ -53,7 +53,7 @@
import bigframes.session._io.bigquery as bq_io
import bigframes.session.metrics
import bigframes.session.planner
-import bigframes.session.temp_storage
+import bigframes.session.temporary_storage

# Max complexity that should be executed as a single query
QUERY_COMPLEXITY_LIMIT = 1e7
@@ -195,7 +195,7 @@ class BigQueryCachingExecutor(Executor):
def __init__(
self,
bqclient: bigquery.Client,
-        storage_manager: bigframes.session.temp_storage.AnonymousDatasetManager,
+        storage_manager: bigframes.session.temporary_storage.TemporaryStorageManager,
bqstoragereadclient: google.cloud.bigquery_storage_v1.BigQueryReadClient,
*,
strictly_ordered: bool = True,
@@ -221,7 +221,7 @@ def to_sql(
enable_cache: bool = True,
) -> str:
if offset_column:
-            array_value, internal_offset_col = array_value.promote_offsets()
+            array_value, _ = array_value.promote_offsets()
node = (
self.replace_cached_subtrees(array_value.node)
if enable_cache
@@ -248,7 +248,7 @@ def execute(
job_config = bigquery.QueryJobConfig()
# Use explicit destination to avoid 10GB limit of temporary table
if use_explicit_destination:
-            destination_table = self.storage_manager.allocate_and_create_temp_table(
+            destination_table = self.storage_manager.create_temp_table(
array_value.schema.to_bigquery(), cluster_cols=[]
)
job_config.destination = destination_table
@@ -392,7 +392,7 @@ def peek(
job_config = bigquery.QueryJobConfig()
# Use explicit destination to avoid 10GB limit of temporary table
if use_explicit_destination:
-            destination_table = self.storage_manager.allocate_and_create_temp_table(
+            destination_table = self.storage_manager.create_temp_table(
array_value.schema.to_bigquery(), cluster_cols=[]
)
job_config.destination = destination_table
@@ -645,9 +645,7 @@ def _sql_as_cached_temp_table(
cluster_cols: Sequence[str],
) -> bigquery.TableReference:
assert len(cluster_cols) <= _MAX_CLUSTER_COLUMNS
-        temp_table = self.storage_manager.allocate_and_create_temp_table(
-            schema, cluster_cols
-        )
+        temp_table = self.storage_manager.create_temp_table(schema, cluster_cols)

# TODO: Get default job config settings
job_config = cast(