Skip to content

Commit

Permalink
Merge branch 'main' of github.com:Chainlit/literalai-python into will…
Browse files Browse the repository at this point in the history
…y/eng-1732-remove-promote-endpoint-and-add-promptlineage-blend-change
  • Loading branch information
willydouhard committed Aug 8, 2024
2 parents 8d159b0 + 89e966c commit bf8c2bf
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 8 deletions.
6 changes: 6 additions & 0 deletions literalai/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,7 @@ def create_step(
parent_id: Optional[str] = None,
name: Optional[str] = None,
tags: Optional[List[str]] = None,
root_run_id: Optional[str] = None,
):
"""
Creates a new step with the specified parameters.
Expand All @@ -874,6 +875,7 @@ def create_step(
parent_id (Optional[str]): The ID of the parent step, if any.
name (Optional[str]): The name of the step.
tags (Optional[List[str]]): Tags associated with the step.
root_run_id (Optional[str]): The ID of the root run, if any.
Returns:
The result of the GraphQL helper function for creating a step.
Expand All @@ -890,6 +892,7 @@ def create_step(
parent_id=parent_id,
name=name,
tags=tags,
root_run_id=root_run_id,
)
)

Expand Down Expand Up @@ -2096,6 +2099,7 @@ async def create_step(
parent_id: Optional[str] = None,
name: Optional[str] = None,
tags: Optional[List[str]] = None,
root_run_id: Optional[str] = None,
):
"""
Asynchronously creates a new step with the specified parameters.
Expand All @@ -2111,6 +2115,7 @@ async def create_step(
parent_id (Optional[str]): The ID of the parent step, if any.
name (Optional[str]): The name of the step.
tags (Optional[List[str]]): Tags associated with the step.
root_run_id (Optional[str]): The ID of the root run, if any.
Returns:
The result of the GraphQL helper function for creating a step.
Expand All @@ -2127,6 +2132,7 @@ async def create_step(
parent_id=parent_id,
name=name,
tags=tags,
root_run_id=root_run_id,
)
)

Expand Down
12 changes: 12 additions & 0 deletions literalai/api/gql.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
STEP_FIELDS = """
id
threadId
rootRunId
parentId
startTime
endTime
Expand Down Expand Up @@ -559,6 +560,7 @@
"""
mutation CreateStep(
$threadId: String,
$rootRunId: String,
$type: StepType,
$startTime: DateTime,
$endTime: DateTime,
Expand All @@ -572,6 +574,7 @@
) {
createStep(
threadId: $threadId,
rootRunId: $rootRunId,
type: $type,
startTime: $startTime,
endTime: $endTime,
Expand Down Expand Up @@ -878,12 +881,14 @@
$input: Json!
$expectedOutput: Json
$metadata: Json
$generationId: String
) {
createDatasetItem(
datasetId: $datasetId
input: $input
expectedOutput: $expectedOutput
metadata: $metadata
generationId: $generationId
) {
id
createdAt
Expand All @@ -892,6 +897,7 @@
input
expectedOutput
intermediarySteps
stepId
}
}
"""
Expand All @@ -906,6 +912,7 @@
input
expectedOutput
intermediarySteps
stepId
}
}
"""
Expand All @@ -920,6 +927,7 @@
input
expectedOutput
intermediarySteps
stepId
}
}
"""
Expand All @@ -942,6 +950,7 @@
input
expectedOutput
intermediarySteps
stepId
}
}
"""
Expand All @@ -964,6 +973,7 @@
input
expectedOutput
intermediarySteps
stepId
}
}
"""
Expand Down Expand Up @@ -1099,6 +1109,7 @@ def steps_query_variables_builder(steps):
for id in range(len(steps)):
generated += f"""$id_{id}: String!
$threadId_{id}: String
$rootRunId_{id}: String
$type_{id}: StepType
$startTime_{id}: DateTime
$endTime_{id}: DateTime
Expand All @@ -1123,6 +1134,7 @@ def steps_ingest_steps_builder(steps):
step{id}: ingestStep(
id: $id_{id}
threadId: $threadId_{id}
rootRunId: $rootRunId_{id}
startTime: $startTime_{id}
endTime: $endTime_{id}
type: $type_{id}
Expand Down
2 changes: 2 additions & 0 deletions literalai/api/step_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def create_step_helper(
parent_id: Optional[str] = None,
name: Optional[str] = None,
tags: Optional[List[str]] = None,
root_run_id: Optional[str] = None,
):
variables = {
"threadId": thread_id,
Expand All @@ -30,6 +31,7 @@ def create_step_helper(
"parentId": parent_id,
"name": name,
"tags": tags,
"root_run_id": root_run_id,
}

def process_response(response):
Expand Down
25 changes: 23 additions & 2 deletions literalai/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from literalai.api import AsyncLiteralAPI, LiteralAPI
from literalai.callback.langchain_callback import get_langchain_callback
from literalai.context import active_steps_var, active_thread_var
from literalai.context import active_steps_var, active_thread_var, active_root_run_var
from literalai.environment import EnvContextManager, env_decorator
from literalai.event_processor import EventProcessor
from literalai.evaluation.experiment_item_run import (
Expand All @@ -20,7 +20,8 @@
Step,
StepContextManager,
TrueStepType,
step_decorator, Attachment,
step_decorator,
Attachment,
)
from literalai.observability.thread import ThreadContextManager, thread_decorator

Expand Down Expand Up @@ -153,6 +154,7 @@ def step(
id: Optional[str] = None,
parent_id: Optional[str] = None,
thread_id: Optional[str] = None,
root_run_id: Optional[str] = None,
**kwargs,
):
"""
Expand All @@ -167,6 +169,7 @@ def step(
id (Optional[str]): The id of the step to create.
parent_id (Optional[str]): The id of the parent step.
thread_id (Optional[str]): The id of the parent thread.
root_run_id (Optional[str]): The id of the root run.
Returns:
The wrapper for the step's context.
Expand All @@ -180,6 +183,7 @@ def step(
id=id,
parent_id=parent_id,
thread_id=thread_id,
root_run_id=root_run_id,
**kwargs,
)
else:
Expand All @@ -190,6 +194,7 @@ def step(
id=id,
parent_id=parent_id,
thread_id=thread_id,
root_run_id=root_run_id,
**kwargs,
)

Expand All @@ -201,6 +206,7 @@ def run(
id: Optional[str] = None,
parent_id: Optional[str] = None,
thread_id: Optional[str] = None,
root_run_id: Optional[str] = None,
):
"""
Creates a run where all the subsequents steps will be logged. Works as a decorator or a ContextManager.
Expand All @@ -211,6 +217,7 @@ def run(
id (Optional[str]): The id of the step to create.
parent_id (Optional[str]): The id of the parent step.
thread_id (Optional[str]): The id of the parent thread.
root_run_id (Optional[str]): The id of the root run.
Returns:
The wrapper for the step's context.
Expand All @@ -222,6 +229,7 @@ def run(
id=id,
parent_id=parent_id,
thread_id=thread_id,
root_run_id=root_run_id,
)

def message(
Expand All @@ -235,6 +243,7 @@ def message(
attachments: List[Attachment] = [],
tags: Optional[List[str]] = None,
metadata: Dict = {},
root_run_id: Optional[str] = None,
):
"""
Creates a conversational message step and sends it to Literal AI.
Expand All @@ -251,6 +260,7 @@ def message(
attachments (List[Attachment]): A list of attachments to append to the message.
tags (Optional[List[str]]): A list of tags to add to the message.
metadata (Dict): Metadata to add to the message, in key-value pairs.
root_run_id (Optional[str]): The id of the root run.
Returns:
Message: the created message.
Expand All @@ -266,6 +276,7 @@ def message(
tags=tags,
metadata=metadata,
processor=self.event_processor,
root_run_id=root_run_id,
)
step.end()

Expand Down Expand Up @@ -335,6 +346,7 @@ def start_step(
id: Optional[str] = None,
parent_id: Optional[str] = None,
thread_id: Optional[str] = None,
root_run_id: Optional[str] = None,
**kwargs,
):
"""
Expand All @@ -348,6 +360,7 @@ def start_step(
id (Optional[str]): The id of the step to create.
parent_id (Optional[str]): The id of the parent step.
thread_id (Optional[str]): The id of the parent thread.
root_run_id (Optional[str]): The id of the root run.
Returns:
Step: the created step.
Expand All @@ -359,6 +372,7 @@ def start_step(
parent_id=parent_id,
thread_id=thread_id,
processor=self.event_processor,
root_run_id=root_run_id,
**kwargs,
)
step.start()
Expand All @@ -380,12 +394,19 @@ def get_current_thread(self):
"""
return active_thread_var.get()

def get_current_root_run(self):
"""
Gets the current root run from the context.
"""
return active_root_run_var.get()

def reset_context(self):
"""
Resets the context, forgetting active steps & setting current thread to None.
"""
active_steps_var.set([])
active_thread_var.set(None)
active_root_run_var.set(None)

def flush_and_stop(self):
"""
Expand Down
1 change: 1 addition & 0 deletions literalai/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

active_steps_var = ContextVar[List["Step"]]("active_steps", default=[])
active_thread_var = ContextVar[Optional["Thread"]]("active_thread", default=None)
active_root_run_var = ContextVar[Optional["Step"]]("active_root_run_var", default=None)

active_experiment_item_run_id_var = ContextVar[Optional[str]](
"active_experiment_item_run", default=None
Expand Down
8 changes: 7 additions & 1 deletion literalai/event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ async def to_thread(func, /, *args, **kwargs):
class EventProcessor:
event_queue: queue.Queue
batch: List["StepDict"]
batch_timeout: float = 5.0

def __init__(self, api: "LiteralAPI", batch_size: int = 1, disabled: bool = False):
self.batch_size = batch_size
Expand All @@ -40,6 +41,7 @@ def __init__(self, api: "LiteralAPI", batch_size: int = 1, disabled: bool = Fals
self.disabled = disabled
self.processing_counter = 0
self.counter_lock = threading.Lock()
self.last_batch_time = time.time()
if not self.disabled:
self.processing_thread.start()

Expand All @@ -58,9 +60,13 @@ async def a_add_events(self, event: "StepDict"):
def _process_events(self):
while True:
batch = []
start_time = time.time()
try:
elapsed_time = time.time() - start_time
# Try to fill the batch up to the batch_size
while len(batch) < self.batch_size:
while (
len(batch) < self.batch_size and elapsed_time < self.batch_timeout
):
# Attempt to get events with a timeout
event = self.event_queue.get(timeout=0.5)
batch.append(event)
Expand Down
Loading

0 comments on commit bf8c2bf

Please sign in to comment.