-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat: Experimental transpilation of unannotated python callables #17419
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4692,13 +4692,17 @@ def _prepare_export( | |
| return array_value, id_overrides | ||
|
|
||
| def map(self, func, na_action: Optional[str] = None) -> DataFrame: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'd love to see an example in the docstrings, especially if this is compatible with the polars engine and would thus make for a relatively speedy, flake-free doctest. |
||
| if not isinstance(func, bigframes.functions.Udf): | ||
| from bigframes._config import options | ||
|
|
||
| if not isinstance(func, bigframes.functions.Udf) and not ( | ||
| options.experiments.enable_python_transpiler and callable(func) | ||
| ): | ||
| raise TypeError("the first argument must be callable") | ||
|
|
||
| if na_action not in {None, "ignore"}: | ||
| raise ValueError(f"na_action={na_action} not supported") | ||
|
|
||
| expr = ops.func_to_op(func).as_expr(ex.free_var("input")) | ||
| expr = ops.func_to_expr(func).apply(ex.free_var("input")) | ||
| if na_action == "ignore": | ||
| # True case, predicate, False case | ||
| expr = ops.where_op.as_expr( | ||
|
|
@@ -4718,11 +4722,47 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): | |
| ) | ||
| warnings.warn(msg, category=bfe.FunctionAxisOnePreviewWarning) | ||
|
|
||
| if not isinstance(func, bigframes.functions.Udf): | ||
| from bigframes._config import options | ||
|
|
||
| if not isinstance(func, bigframes.functions.Udf) and not ( | ||
| options.experiments.enable_python_transpiler and callable(func) | ||
| ): | ||
| raise ValueError( | ||
| "For axis=1 a BigFrames BigQuery function must be used." | ||
| ) | ||
|
|
||
| if ( | ||
| not isinstance(func, bigframes.functions.Udf) | ||
| and options.experiments.enable_python_transpiler | ||
| and callable(func) | ||
| ): | ||
| from bigframes.operations.to_op import CallableExpression | ||
|
|
||
| callable_expr = CallableExpression.from_callable( | ||
| func, unpack_mode=False | ||
| ) | ||
|
|
||
| # Bind the extra arguments (args and kwargs) starting from parameter 1 | ||
| callable_expr = callable_expr.bind_partial(*args, _offset=1, **kwargs) | ||
| expr = callable_expr.expr | ||
|
|
||
| # Now bind the remaining free variables to the DataFrame columns: | ||
| col_bindings = {} | ||
| block = self._get_block() | ||
| for col in self.columns: | ||
| if col in expr.free_variables: | ||
| col_id = block.resolve_label_exact(col) | ||
|
Comment on lines
+4752
to
+4754
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Would we expect any trouble from mixing the high-level (DataFrame columns) representation and the lower-level (block column labels) representation? Also, this looks relatively familiar, such as in our |
||
| if col_id is not None: | ||
| col_bindings[col] = ex.deref(col_id) | ||
|
|
||
| expr = expr.bind_variables(col_bindings) | ||
|
|
||
| # Project the expression on the DataFrame block to get a new Series! | ||
| block, result_id = self._get_block().project_expr(expr) | ||
| from bigframes.series import Series | ||
|
|
||
| return Series(block.select_column(result_id)) | ||
|
|
||
| if func.udf_def.signature.is_row_processor: | ||
| # Early check whether the dataframe dtypes are currently supported | ||
| # in the bigquery function | ||
|
|
@@ -4776,8 +4816,14 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): | |
| ) | ||
|
|
||
| # Apply the function | ||
| expr = ops.func_to_expr(func).expr | ||
| if not ( | ||
| isinstance(expr, ex.OpExpression) | ||
| and isinstance(expr.op, ops.NaryOp) | ||
| ): | ||
| raise TypeError(f"Expected OpExpression with NaryOp, got {expr}") | ||
| result_series = rows_as_json_series._apply_nary_op( | ||
| ops.func_to_op(func), | ||
| expr.op, | ||
| list(args), | ||
| ) | ||
|
|
||
|
|
@@ -4837,8 +4883,8 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): | |
|
|
||
| series_list = [self[col] for col in self.columns] | ||
| op_list = series_list[1:] + list(args) | ||
| result_series = series_list[0]._apply_nary_op( | ||
| ops.func_to_op(func), op_list | ||
| result_series = series_list[0]._apply_callable_expr( | ||
| ops.func_to_expr(func), op_list | ||
| ) | ||
| result_series.name = None | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,31 +11,210 @@ | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| from __future__ import annotations | ||
|
|
||
| import dataclasses | ||
| import inspect | ||
| import typing | ||
|
|
||
| import bigframes.core.expression as ex | ||
| import bigframes.core.identifiers as ids | ||
| import bigframes.dtypes as dtypes | ||
| from bigframes._config import options | ||
| from bigframes.functions import Udf | ||
| from bigframes.functions.udf_def import BigqueryUdf, PythonUdf | ||
| from bigframes.operations import base_ops, remote_function_ops | ||
|
|
||
|
|
||
| def func_to_op(op) -> base_ops.NaryOp: | ||
| @dataclasses.dataclass(frozen=True) | ||
| class ArgumentSpec: | ||
| """ | ||
| Information about a single argument to a function | ||
| """ | ||
|
|
||
| name: str | ||
| default_value: typing.Any | ||
| is_varargs: bool | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we want to track keyword-only arguments and/or the kwargs dictionary separately? Or maybe that's not really inferable from the Python AST? |
||
|
|
||
|
|
||
| @dataclasses.dataclass(frozen=True) | ||
| class CallableExpression(ex.Expression): | ||
| """ | ||
| Convert various bigframes, python functions into bigframes operations. | ||
| Encodes a calling convention and an expression to bind arguments to. | ||
| """ | ||
|
|
||
| expr: ex.Expression | ||
| arg_specs: typing.Sequence[ArgumentSpec] | ||
|
|
||
| @classmethod | ||
| def from_callable( | ||
| cls, func: typing.Callable, unpack_mode: bool = False | ||
| ) -> CallableExpression: | ||
| sig = inspect.signature(func) | ||
| arg_specs = [] | ||
| for name, param in sig.parameters.items(): | ||
| is_varargs = param.kind == inspect.Parameter.VAR_POSITIONAL | ||
| arg_specs.append( | ||
| ArgumentSpec( | ||
| name=name, | ||
| default_value=param.default, | ||
| is_varargs=is_varargs, | ||
| ) | ||
| ) | ||
|
|
||
| from bigframes.core.bytecode import dis_to_expr | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. "dis" is hard for me to understand without context. Could we use a more descriptive name? |
||
|
|
||
| expr = dis_to_expr(func, unpack_mode=unpack_mode) | ||
| return cls(expr=expr, arg_specs=arg_specs) | ||
|
|
||
| def apply(self, *args, **kwargs) -> ex.Expression: | ||
| """ | ||
| Apply the arguments to the expression. | ||
|
|
||
| All args are expected to be column references, or scalars. | ||
| """ | ||
| return self.bind_partial(*args, _offset=0, **kwargs).expr | ||
|
|
||
| def bind_partial( | ||
| self, | ||
| *args, | ||
| _offset: int = 0, | ||
| **kwargs, | ||
| ) -> CallableExpression: | ||
| """ | ||
| Bind a subset of arguments and return a new CallableExpression with the remaining unbound arguments. | ||
| """ | ||
| bindings: dict[typing.Hashable, ex.Expression] = {} | ||
| pos_idx = 0 | ||
| allowed_params = self.arg_specs[_offset:] | ||
| allowed_names = {spec.name for spec in allowed_params} | ||
|
|
||
| # Validate unexpected keyword arguments | ||
| for key in kwargs: | ||
| if key not in allowed_names: | ||
| raise TypeError(f"got an unexpected keyword argument '{key}'") | ||
|
|
||
| def to_expr(val): | ||
| if isinstance(val, ex.Expression): | ||
| return val | ||
| return ex.const(val) | ||
|
|
||
| This should handle anything that might be passed to eg map, combine, other pandas methods that take a function. | ||
| for spec in allowed_params: | ||
| if spec.is_varargs: | ||
| raise NotImplementedError( | ||
| "varargs in compiled python functions is not supported" | ||
| ) | ||
|
|
||
| It should raise a TypeError if the object is not a supported type. | ||
| if pos_idx < len(args): | ||
| if spec.name in kwargs: | ||
| raise TypeError( | ||
| f"got multiple values for keyword argument '{spec.name}'" | ||
| ) | ||
| bindings[spec.name] = to_expr(args[pos_idx]) | ||
| pos_idx += 1 | ||
| elif spec.name in kwargs: | ||
| bindings[spec.name] = to_expr(kwargs[spec.name]) | ||
| elif spec.default_value is not inspect.Parameter.empty: | ||
| bindings[spec.name] = to_expr(spec.default_value) | ||
| else: | ||
| raise TypeError(f"missing required argument: '{spec.name}'") | ||
|
|
||
| Args: | ||
| op: The object to convert. | ||
| if pos_idx < len(args): | ||
| raise TypeError( | ||
| f"too many positional arguments: expected {len(allowed_params)}, got {len(args)}" | ||
| ) | ||
|
|
||
| Returns: | ||
| A bigframes operations. | ||
| new_expr = self.expr.bind_variables(bindings, allow_partial_bindings=True) | ||
| remaining_specs = list(self.arg_specs[:_offset]) | ||
| return CallableExpression(expr=new_expr, arg_specs=remaining_specs) | ||
|
|
||
| @property | ||
| def column_references(self) -> typing.Tuple[ids.ColumnId, ...]: | ||
| return self.expr.column_references | ||
|
|
||
| @property | ||
| def free_variables(self) -> typing.Tuple[typing.Hashable, ...]: | ||
| return self.expr.free_variables | ||
|
|
||
| @property | ||
| def is_const(self) -> bool: | ||
| return self.expr.is_const | ||
|
|
||
| @property | ||
| def is_resolved(self) -> bool: | ||
| return False | ||
|
|
||
| @property | ||
| def output_type(self) -> dtypes.ExpressionType: | ||
| raise ValueError( | ||
| "CallableExpression does not have a fixed output type until arguments are applied." | ||
| ) | ||
|
|
||
| def bind_refs( | ||
| self, | ||
| bindings: typing.Mapping[ids.ColumnId, ex.Expression], | ||
| allow_partial_bindings: bool = False, | ||
| ) -> CallableExpression: | ||
| return dataclasses.replace( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. These bind and transform functions are pretty similar to the other implementations, right? Maybe we could use some sort of mixin class to implement them? |
||
| self, | ||
| expr=self.expr.bind_refs( | ||
| bindings, allow_partial_bindings=allow_partial_bindings | ||
| ), | ||
| ) | ||
|
|
||
| def bind_variables( | ||
| self, | ||
| bindings: typing.Mapping[typing.Hashable, ex.Expression], | ||
| allow_partial_bindings: bool = False, | ||
| ) -> CallableExpression: | ||
| arg_names = {spec.name for spec in self.arg_specs} | ||
| filtered_bindings = {k: v for k, v in bindings.items() if k not in arg_names} | ||
| return dataclasses.replace( | ||
| self, | ||
| expr=self.expr.bind_variables( | ||
| filtered_bindings, allow_partial_bindings=allow_partial_bindings | ||
| ), | ||
| ) | ||
|
|
||
| def transform_children( | ||
| self, t: typing.Callable[[ex.Expression], ex.Expression] | ||
| ) -> ex.Expression: | ||
| new_expr = t(self.expr) | ||
| if new_expr != self.expr: | ||
| return dataclasses.replace(self, expr=new_expr) | ||
| return self | ||
|
|
||
|
|
||
| def func_to_expr(op, unpack_mode: bool = False) -> CallableExpression: | ||
| """ | ||
| Convert various bigframes, python functions into bigframes CallableExpression. | ||
| """ | ||
| # TODO(b/517578802): Handle numpy ufuncs, builtin functions, etc. | ||
| if isinstance(op, Udf): | ||
| bq_op: base_ops.NaryOp | ||
| if isinstance(op.udf_def, BigqueryUdf): | ||
| return remote_function_ops.RemoteFunctionOp(function_def=op.udf_def) | ||
| bq_op = remote_function_ops.RemoteFunctionOp(function_def=op.udf_def) | ||
| elif isinstance(op.udf_def, PythonUdf): | ||
| return remote_function_ops.PythonUdfOp(function_def=op.udf_def) | ||
| bq_op = remote_function_ops.PythonUdfOp(function_def=op.udf_def) | ||
| else: | ||
| raise TypeError(f"Unsupported UDF definition: {op.udf_def}") | ||
|
|
||
| inputs_expr = tuple( | ||
| ex.free_var(arg.name) for arg in op.udf_def.signature.inputs | ||
| ) | ||
| expr = ex.OpExpression(bq_op, inputs_expr) | ||
|
|
||
| arg_specs = [ | ||
| ArgumentSpec( | ||
| name=arg.name, | ||
| default_value=inspect.Parameter.empty, | ||
| is_varargs=False, | ||
| ) | ||
| for arg in op.udf_def.signature.inputs | ||
| ] | ||
| return CallableExpression(expr=expr, arg_specs=arg_specs) | ||
|
|
||
| elif options.experiments.enable_python_transpiler and callable(op): | ||
| return CallableExpression.from_callable(op, unpack_mode=unpack_mode) | ||
|
|
||
| else: | ||
| raise TypeError(f"Unsupported function type: {op}") | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I like to make custom exceptions that subclass PreviewWarning for a more explicit opt-in, but that is probably overkill in retrospect.