Skip to content

Commit

Permalink
Merge pull request #818 from lupko/flight-docs
Browse files Browse the repository at this point in the history
RELATED: CQ-670 - docs improvements
  • Loading branch information
no23reason authored Sep 24, 2024
2 parents ad77e6d + 78e1fd9 commit 70b181a
Show file tree
Hide file tree
Showing 7 changed files with 430 additions and 107 deletions.
25 changes: 17 additions & 8 deletions gooddata-flexfun/gooddata_flexfun/flexfun/flex_fun.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,25 @@ class FlexFun(abc.ABC):
- a full schema of the result and all its columns must be known up-front
- the function may return only subset of columns if the caller only
asks for certain columns
- the function receives `parameters` (arguments) that describe the context
in which it was invoked. The function _may_ use this information as input
to algorithms that it uses to generate the result data
NOTE: this is only about trimming columns - the result cardinality is
the same just that some columns are not returned.
- the caller may indicate to the function that out of all columns in the
schema, it is only interested in a subset of `columns`
NOTE: the list of `columns` is provided as hint for column-trimming purposes.
The function _should_ use the `columns` hint and only return the desired columns.
The function _may_ ignore this and always return all columns / superset of columns:
the caller will ignore the extra information.
At the same time, the `columns` hint is _not_ an indication that the function
should perform particular aggregation or do computation differently. To determine this,
the function _must_ inspect the `parameters` to actually determine what the caller
is interested in.
Programming detail: a new instance of the FlexFun will be created
for every call using the `create` method.
TODO: rename the class... i'm not very creative right now
"""

Name: Optional[str] = None
Expand Down Expand Up @@ -77,9 +86,9 @@ def call(
Function call.
:param parameters: parameters sent from the GoodData Cloud / FlexQuery.
:param columns: hints which columns SHOULD be returned; the FlexFun may decide to ignore
:param columns: hints which columns _should_ be returned; the FlexFun may decide to ignore
this and always return all columns. The extraneous columns will be trimmed when received
by FlexQuery.
by FlexQuery. See comments of FlexFun class to learn more.
:param headers: Flight RPC headers
:return: result of the call
"""
Expand Down
32 changes: 2 additions & 30 deletions gooddata-flexfun/gooddata_flexfun/flexfun/flex_fun_task.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,14 @@
# (C) 2024 GoodData Corporation
from collections.abc import Iterable
from typing import Optional, Union

import pyarrow
import structlog
from gooddata_flight_server import ArrowData, FlightDataTaskResult, Task, TaskError, TaskResult
from gooddata_flight_server import FlightDataTaskResult, Task, TaskError, TaskResult

from gooddata_flexfun.flexfun.flex_fun import FlexFun

_LOGGER = structlog.get_logger("gooddata_flexfun.task")


class _FlexFunResult(FlightDataTaskResult):
    """
    Task result that wraps the Arrow data returned by a FlexFun call.

    The wrapped data is handed out as-is; the only extra bookkeeping is
    whether the data can be consumed once or repeatedly.
    """

    __slots__ = ("_result",)

    def __init__(self, result: ArrowData) -> None:
        """
        :param result: Arrow data produced by the function
        """
        # A reader can be drained only once, so results backed by a
        # RecordBatchReader are flagged as single-use. A table, on the other
        # hand, stays readable for as long as the result is kept in the
        # system (which is influenced by the result TTL setting).
        reader_backed = isinstance(result, pyarrow.RecordBatchReader)
        super().__init__(single_use_data=reader_backed)

        self._result = result

    def get_schema(self) -> pyarrow.Schema:
        """Return the schema of the wrapped data."""
        return self._result.schema

    def _get_data(self) -> Union[Iterable[ArrowData], ArrowData]:
        """Return the wrapped data without any transformation."""
        return self._result

    def _close(self) -> None:
        """Close the underlying reader (if any) and drop the reference to the data."""
        wrapped = self._result
        if isinstance(wrapped, pyarrow.RecordBatchReader):
            wrapped.close()

        self._result = None


class FlexFunTask(Task):
__slots__ = ("_fun", "_parameters", "_columns", "_headers")

Expand Down Expand Up @@ -73,7 +45,7 @@ def run(self) -> Union[TaskResult, TaskError]:
headers=self._headers,
)

return _FlexFunResult(result)
return FlightDataTaskResult.for_data(result)

def on_task_cancel(self) -> None:
_LOGGER.info("flexfun_task_cancel", fun=self._fun.Name, task_id=self._task_id)
Expand Down
51 changes: 1 addition & 50 deletions gooddata-flexfun/gooddata_flexfun/flexfun/flight_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,56 +184,7 @@ def do_get(
if task_id is None or not len(task_id):
raise ErrorInfo.bad_argument("Incorrect ticket payload. The ticket payload does not specify 'task_id'.")

task_result = self._ctx.task_executor.wait_for_result(task_id)
if task_result is None:
raise ErrorInfo.for_reason(
ErrorCode.INVALID_TICKET,
f"Unable to serve data for task '{task_id}'. The task result is not present.",
).to_user_error()

result = task_result.result
if not isinstance(result, FlightDataTaskResult):
raise ErrorInfo.for_reason(
ErrorCode.INTERNAL_ERROR,
f"An internal error has occurred while attempting read result for '{task_id}'."
f"While the result exists, it is of an unexpected type. "
f"This is a bug in FlexFun server implementation.",
).to_internal_error()

rlock, data = result.acquire_data()

def _on_end(_: Optional[pyarrow.ArrowException]) -> None:
"""
Once the request that streams the data out is done, make sure
to release the read-lock. Single-use results are closed at
this point because the data cannot be read again anyway.
"""
rlock.release()

if result.single_use_data:
# note: results with single-use data can only ever have one active
# reader (e.g. this one). since the rlock is now released the
# close will proceed without chance of being blocked
try:
result.close()
except Exception:
# log and sink these Exceptions - not much to do
_LOGGER.error("do_get_close_failed", exc_info=True)

finalizer = self.call_finalizer_middleware(context)
finalizer.register_on_end(_on_end)

if isinstance(data, pyarrow.Table):
_LOGGER.info("do_get_table", task_id=task_id, num_rows=data.num_rows)

return pyarrow.flight.RecordBatchStream(data)
elif isinstance(data, pyarrow.RecordBatchReader):
_LOGGER.info("do_get_reader", task_id=task_id)

return pyarrow.flight.RecordBatchStream(data)

_LOGGER.info("do_get_generator", task_id=task_id)
return pyarrow.flight.GeneratorStream(data)
return self.do_get_task_result(context, self._ctx.task_executor, task_id)
except Exception:
_LOGGER.error("do_get_failed", exc_info=True)
raise
Expand Down
202 changes: 201 additions & 1 deletion gooddata-flight-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,55 @@ def my_service(ctx: gf.ServerContext) -> gf.FlightServerMethods:
# ... create and return server methods ...
```

### Authentication

Currently, the server supports two modes of authentication:

- no authentication
- token-based authentication, which also lets you plug in custom token verification logic

The token verification method that comes built-in with the server is a simple one: the token is
an arbitrary, secret value shared between server and client. You configure the list of valid secret
tokens at server start-up and then at your discretion distribute these secret values to clients.

By default, the server runs with no authentication. To turn on token-based authentication,
you have to:

- Set the `authentication_method` setting to `token`.

By default, the server will use the built-in token verification strategy
called `EnumeratedTokenVerification`.

- Configure the secret tokens.

You can do this using an environment variable: `GOODDATA_FLIGHT_ENUMERATED_TOKENS__TOKENS='["", ""]'`.
Put the secret token(s) inside the quotes. Alternatively, you can code tokens into a configuration file
such as this:

```toml
[enumerated_tokens]
tokens = ["", ""]
```

IMPORTANT: never commit secrets to your VCS.

With this setup in place, the server will expect the Flight clients to include the token in the
`authorization` header in the form of `Bearer <token>`. The token must be present on every
call.

Here is an example of how to make a call that includes the `authorization` header:

```python
import pyarrow.flight

def example_call_using_tokens():
    """List the flights available on the server, authenticating with a bearer token."""
    # the token travels in the `authorization` header of every call
    auth_headers = [(b"authorization", b"Bearer <token>")]
    opts = pyarrow.flight.FlightCallOptions(headers=auth_headers)
    client = pyarrow.flight.FlightClient("grpc+tls://localhost:17001")

    flights = client.list_flights(b"", opts)
    for flight in flights:
        print(flight)
```

## Developer Manual

This part of the documentation explains additional capabilities of the server.
Expand All @@ -219,7 +268,158 @@ a configured amount of time (see `task_result_ttl_sec` setting). The infrastruct
your task may generate result that can be consumed either repeatedly (say Arrow Tables) or just
once (say RecordBatchReader backed by live stream).

TODO: continue & add examples
Here is an example showing how to code a task, how to integrate its execution and how to
send out data that it generated:

```python
from typing import Union, Any

import pyarrow.flight

import gooddata_flight_server as gf


class MyServiceTask(gf.Task):
    """Example task that generates Arrow data for a single service invocation."""

    def __init__(self, task_specific_payload: Any, cmd: bytes):
        """
        :param task_specific_payload: whatever the task needs in order to do its work
        :param cmd: the raw command; it is passed on to the base Task
        """
        super().__init__(cmd)

        self._task_specific_payload = task_specific_payload

    def run(self) -> Union[gf.TaskResult, gf.TaskError]:
        """Generate the data and wrap it into a task result."""
        # Cancellation is supported: the task code may check for it at any
        # point, and the check raises an exception if the task was cancelled.
        #
        # Remember to clean up after yourself when that happens.
        self.check_cancelled()

        # ... this is the place to do the actual work that produces the data
        generated: pyarrow.RecordBatchReader = some_method_to_generate_data()

        # The finished data has to be wrapped in a result implementing the
        # FlightDataTaskResult interface. Built-in implementations exist for
        # Arrow Table and Arrow RecordBatchReader.
        #
        # If the result (or the resources bound to it) needs special
        # handling, you can implement a result class of your own.
        task_result = gf.FlightDataTaskResult.for_data(generated)
        return task_result


class DataServiceMethods(gf.FlightServerMethods):
    """Example Flight RPC methods that run a task and then serve the data it produced."""

    def __init__(self, ctx: gf.ServerContext) -> None:
        self._ctx = ctx

    def _prepare_flight_info(self, task_result: gf.TaskExecutionResult) -> pyarrow.flight.FlightInfo:
        """
        Turn the outcome of a task execution into a FlightInfo.

        :param task_result: outcome of the task execution
        :return: FlightInfo whose ticket carries the task identifier
        :raises: the task's own error if the task failed, or a COMMAND_CANCELLED
            error if the task was cancelled
        """
        failure = task_result.error
        if failure is not None:
            raise failure.as_flight_error()

        if task_result.cancelled:
            cancelled_info = gf.ErrorInfo.for_reason(
                gf.ErrorCode.COMMAND_CANCELLED,
                f"Service call was cancelled. Invocation task was: '{task_result.task_id}'.",
            )
            raise cancelled_info.to_server_error()

        schema = task_result.result.get_schema()
        # the original command is bounced back to the caller in the descriptor
        descriptor = pyarrow.flight.FlightDescriptor.for_command(task_result.cmd)
        # the ticket is the task identifier; `do_get` decodes it again
        ticket = pyarrow.flight.Ticket(ticket=task_result.task_id.encode())
        endpoint = pyarrow.flight.FlightEndpoint(
            ticket=ticket,
            locations=[self._ctx.location],
        )

        return pyarrow.flight.FlightInfo(
            schema=schema,
            descriptor=descriptor,
            endpoints=[endpoint],
            total_records=-1,
            total_bytes=-1,
        )

    def get_flight_info(
        self,
        context: pyarrow.flight.ServerCallContext,
        descriptor: pyarrow.flight.FlightDescriptor,
    ) -> pyarrow.flight.FlightInfo:
        """Run a task for the command in the descriptor and describe where to pick up its data."""
        cmd = descriptor.command
        # this is where the command gets parsed & validated
        some_parsed_command = ...

        # Create the custom task. It usually receives the parsed command so
        # that it knows what to do. The 'raw' command is needed too, because
        # it should be bounced back in the FlightInfo.
        task = MyServiceTask(task_specific_payload=some_parsed_command, cmd=cmd)
        executor = self._ctx.task_executor
        executor.submit(task)

        # block until the task completes
        outcome = executor.wait_for_result(task_id=task.task_id)

        # With the task done, either build the FlightInfo or raise an
        # exception if the task failed. The ticket in the FlightInfo should
        # contain the task identifier.
        return self._prepare_flight_info(outcome)

    def do_get(
        self,
        context: pyarrow.flight.ServerCallContext,
        ticket: pyarrow.flight.Ticket,
    ) -> pyarrow.flight.FlightDataStream:
        """Serve the data of a previously executed task."""
        # the caller comes to pick up the data; the ticket should be the task identifier
        task_id = ticket.ticket.decode()

        # This utility method on the base class takes care of everything
        # needed to correctly create a FlightDataStream from the task result
        # (or die trying, in case the task result is no longer present or the
        # result indicates that the task has failed).
        return self.do_get_task_result(context, self._ctx.task_executor, task_id)
```

### Custom token verification strategy

At the moment, the built-in token verification strategy supported by the server is the
most basic one. In cases when this strategy is not good enough, you can code your own
and plug it into the server.

The `TokenVerificationStrategy` interface sets the contract for your custom strategy. You
implement this class inside a Python module and then tell the server to load that
module.

For example, you create a module `my_service.auth.custom_token_verification` where you
implement the verification strategy:

```python
import gooddata_flight_server as gf
import pyarrow.flight
from typing import Any


class MyCustomTokenVerification(gf.TokenVerificationStrategy):
    """Example skeleton of a custom token verification strategy."""

    def verify(self, call_info: pyarrow.flight.CallInfo, token: str) -> Any:
        """
        Verify the token that arrived with a call.

        :param call_info: information about the Flight RPC call being made
        :param token: the token to verify
        """
        # implement your arbitrary logic here;
        #
        # see method and class documentation to learn more
        raise NotImplementedError

    @classmethod
    def create(cls, ctx: gf.ServerContext) -> "gf.TokenVerificationStrategy":
        """
        Create an instance of the strategy.

        :param ctx: server context; settings are available via `ctx.settings`
        :return: new instance of the strategy
        """
        # code has chance to read any necessary settings from `ctx.settings`
        # property and then use those values to construct the class
        #
        # see method and class documentation to learn more
        #
        # note: the return annotation is qualified with `gf.` because only the
        # `gf` alias is imported; a bare "TokenVerificationStrategy" forward
        # reference would not resolve
        return cls()
```

Then, you can use the `token_verification` setting to tell the server to look up
and load token verification strategy from `my_service.auth.custom_token_verification` module.

Using a custom verification strategy, you can implement support for, say, JWT tokens or look
up valid tokens inside some database.

NOTE: As is, the server infrastructure does not concern itself with how the clients actually
obtain the valid tokens. At the moment, this is outside of this project's scope. You can distribute
tokens to clients using some procedure or implement custom APIs where clients have to log in
in order to obtain a valid token.

### Logging

Expand Down
Loading

0 comments on commit 70b181a

Please sign in to comment.