
Import Traces from 3rd Party Systems

On occasion, it is not possible to instrument your Python or JavaScript code with Weave's simple integrations to obtain real-time traces of your GenAI application. Often, these traces only become available to you later, in CSV or JSON format.

In this cookbook, we explore the lower-level Weave Python API to extract data from a CSV file and import it into Weave to drive insights and rigorous evaluations.

The sample dataset assumed in this cookbook has the following structure:

conversation_id,turn_index,start_time,user_input,ground_truth,answer_text
1234,1,2024-09-04 13:05:39,This is the beginning, ["This was the beginning"], That was the beginning
1235,1,2024-09-04 13:02:11,This is another trace,, That was another trace
1235,2,2024-09-04 13:04:19,This is the next turn,, That was the next turn
1236,1,2024-09-04 13:02:10,This is a 3 turn conversation,, Woah thats a lot of turns
1236,2,2024-09-04 13:02:30,This is the second turn, ["That was definitely the second turn"], You are correct
1236,3,2024-09-04 13:02:53,This is the end,, Well good riddance!

To understand the import decisions in this cookbook, you should know that Weave traces have parent-child relationships that are 1:many and continuous: a single parent may have multiple children, and a parent may itself be the child of another parent.

We therefore use conversation_id as the parent identifier and turn_index as the child identifier, which gives us complete conversation logging.
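
For example, the three rows with conversation_id 1236 in the sample above become one parent call with three children, named the way the import code below names them:

conversation-1236
├── 1236-1
├── 1236-2
└── 1236-3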

Modify the variables below as needed.

Set up the environment

We install and import all needed packages. We set WANDB_API_KEY in our environment so that we can easily log in with wandb.login() (in Colab, this should be provided as a secret).

We set the name of the file we upload to Colab in name_of_file, and the W&B project we want to log the traces to in name_of_wandb_project.

NOTE: name_of_wandb_project may also be in the format of {team_name}/{project_name} to specify a team to log the traces into.

We then fetch a Weave client by calling weave.init().

%pip install wandb weave pandas --quiet
import os

import pandas as pd
import wandb
from google.colab import userdata

import weave

## Write sample data file to disk
with open("/content/import_cookbook_data.csv", "w") as f:
    f.write(
        "conversation_id,turn_index,start_time,user_input,ground_truth,answer_text\n"
    )
    f.write(
        '1234,1,2024-09-04 13:05:39,This is the beginning, ["This was the beginning"], That was the beginning\n'
    )
    f.write(
        "1235,1,2024-09-04 13:02:11,This is another trace,, That was another trace\n"
    )
    f.write(
        "1235,2,2024-09-04 13:04:19,This is the next turn,, That was the next turn\n"
    )
    f.write(
        "1236,1,2024-09-04 13:02:10,This is a 3 turn conversation,, Woah thats a lot of turns\n"
    )
    f.write(
        '1236,2,2024-09-04 13:02:30,This is the second turn, ["That was definitely the second turn"], You are correct\n'
    )
    f.write("1236,3,2024-09-04 13:02:53,This is the end,, Well good riddance!\n")


os.environ["WANDB_API_KEY"] = userdata.get("WANDB_API_KEY")
name_of_file = "/content/import_cookbook_data.csv"
name_of_wandb_project = "import-weave-traces-cookbook"

wandb.login()
weave_client = weave.init(name_of_wandb_project)
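
If you are not running in Colab, google.colab and its userdata secrets will not be available. A minimal alternative sketch, assuming you want to be prompted for the key whenever it is not already set in your environment:

# Non-Colab alternative (sketch): prompt for the W&B API key if it
# is not already present in the environment.
import getpass
import os

if "WANDB_API_KEY" not in os.environ:
    os.environ["WANDB_API_KEY"] = getpass.getpass("Enter your W&B API key: ")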

Data Loading

We load the data into a pandas DataFrame and sort it by conversation_id and turn_index so that parents and children are correctly ordered.

This results in a two-column DataFrame with the turns of each conversation collected as an array under conversation_data.

## Load data and shape it
df = pd.read_csv(name_of_file)

sorted_df = df.sort_values(["conversation_id", "turn_index"])


# Function to create an array of dictionaries for each conversation
def create_conversation_dict_array(group):
    return group.drop("conversation_id", axis=1).to_dict("records")


# Group the dataframe by conversation_id and apply the aggregation
result_df = (
    sorted_df.groupby("conversation_id")
    .apply(create_conversation_dict_array)
    .reset_index()
)
result_df.columns = ["conversation_id", "conversation_data"]

# Show how our aggregation looks
result_df.head()
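
For the sample file, the aggregation should look roughly like this (turn dictionaries abridged):

conversation_id  conversation_data
1234             [{'turn_index': 1, 'start_time': '2024-09-04 13:05:39', ...}]
1235             [{'turn_index': 1, ...}, {'turn_index': 2, ...}]
1236             [{'turn_index': 1, ...}, {'turn_index': 2, ...}, {'turn_index': 3, ...}]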

Log the Traces to Weave

We now iterate through our DataFrame:

  • We create a parent call for every conversation_id
  • We iterate through the turn array to create child calls sorted by their turn_index

Important concepts of the lower-level Python API:

  • A Weave call is equivalent to a Weave trace; a call may have a parent or children associated with it.
  • A Weave call may have other things associated with it, such as feedback and metadata. We only associate inputs and outputs here, but you may want to add more in your import if your data provides it (see the sketch after this list).
  • A Weave call is normally created and finished in real time as your application runs. Because this is an after-the-fact import, we create and finish each call once our objects are defined and tied to one another.
  • The op value of a call is how Weave categorizes calls of the same makeup. In this example, all parent calls are of the Conversation type, and all child calls are of the Turn type. You may modify this as you see fit.
  • A call may have inputs and an output. Inputs are defined at creation; an output is defined when the call is finished.
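
As a sketch of the feedback point above: if your source data also carried, say, a user rating per turn, you could attach it right after creating each call. This is a minimal, hypothetical example assuming the feedback API exposed on Weave Call objects; the rating value and note do not come from our sample CSV.

# Sketch: attach hypothetical feedback to an imported call.
demo_call = weave_client.create_call(op="Turn", inputs={"user_input": "hi"})
demo_call.feedback.add("user_rating", {"value": 5})  # hypothetical rating column
demo_call.feedback.add_note("imported from CSV")
weave_client.finish_call(call=demo_call, output={"answer_text": "hello"})
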
# Log traces to Weave

# Iterate through our aggregated conversations
for _, row in result_df.iterrows():
    # Define our conversation parent;
    # we are now creating a "call" with the weave_client we defined before
    parent_call = weave_client.create_call(
        # The op value will register this as a Weave Op,
        # which will allow us to retrieve these calls as a group easily in the future
        op="Conversation",
        # We set the inputs of our high-level conversation to all the turns under it
        inputs={
            "conversation_data": row["conversation_data"][:-1]
            if len(row["conversation_data"]) > 1
            else row["conversation_data"]
        },
        # Our Conversation parent does not have a further parent
        parent=None,
        # The name under which this specific conversation will appear in the UI
        display_name=f"conversation-{row['conversation_id']}",
    )

    # We set the output of the parent to be the last turn in the conversation
    parent_output = row["conversation_data"][-1]

    # We now iterate through all the conversation turns for the parent
    # and log them as children of the conversation
    for item in row["conversation_data"]:
        item_id = f"{row['conversation_id']}-{item['turn_index']}"

        # We create a call again here, to be categorized under the conversation
        call = weave_client.create_call(
            # We qualify a single conversation trace as a "Turn"
            op="Turn",
            # We provide all inputs of the turn, including the RAG 'ground_truth'
            inputs={
                "turn_index": item["turn_index"],
                "start_time": item["start_time"],
                "user_input": item["user_input"],
                "ground_truth": item["ground_truth"],
            },
            # We set this to be a child of the parent we defined
            parent=parent_call,
            # We provide a name for it to be identified by in Weave
            display_name=item_id,
        )

        # We set the output of the call to be the answer
        output = {
            "answer_text": item["answer_text"],
        }

        # Because these are traces that already happened, we finish the single-turn call
        weave_client.finish_call(call=call, output=output)
    # Now that we have logged all its children, we also finish the parent call
    weave_client.finish_call(call=parent_call, output=parent_output)

Result: Traces are Logged to Weave

Traces:

[Screenshot: the imported conversation traces in the Weave UI]

Operations:

[Screenshot: the Conversation and Turn ops in the Weave UI]

Bonus: Export your traces to run rigorous evaluations!

Once our traces are in Weave and we have an understanding of what the conversations look like, we may want to export them to another process to run Weave Evaluations.


To do this, we fetch all conversations from W&B through our simple query API and create a dataset from the results.

## This cell does not run by default; comment out the line below to execute this script
%%script false --no-raise-error
## Get all Conversation traces for evaluation and prepare dataset for eval

# We create a query filter that brings us all our Conversation objects
# The ref shown below is specific to your project, and you can obtain it by
# going into your project's Operations in the UI, clicking on the "Conversations"
# object, then the "Use" tab in the side panel.
weave_ref_for_conversation_op = "weave:///wandb-smle/import-weave-traces-cookbook/op/Conversation:tzUhDyzVm5bqQsuqh5RT4axEXSosyLIYZn9zbRyenaw"
filter = weave.trace_server.trace_server_interface.CallsFilter(
    op_names=[weave_ref_for_conversation_op],
)

# We execute the query
conversation_traces = weave_client.get_calls(filter)

rows = []

# We go through our conversation traces and construct dataset rows from them
for single_conv in conversation_traces:
    # In this example, we may only care about conversations that utilized our RAG
    # pipeline, so we filter for such conversations
    is_rag = False
    for single_trace in single_conv.inputs["conversation_data"]:
        if single_trace["ground_truth"] is not None:
            is_rag = True
            break
    if single_conv.output["ground_truth"] is not None:
        is_rag = True

    # Once we've identified a conversation to have used RAG, we add it to our dataset
    if is_rag:
        inputs = []
        ground_truths = []
        answers = []

        # We go through every turn in the conversation
        for turn in single_conv.inputs["conversation_data"]:
            inputs.append(turn.get("user_input", ""))
            ground_truths.append(turn.get("ground_truth", ""))
            answers.append(turn.get("answer_text", ""))
        ## Account for conversations that are a single turn
        if (
            len(single_conv.inputs["conversation_data"]) != 1
            or single_conv.inputs["conversation_data"][0].get("turn_index")
            != single_conv.output.get("turn_index")
        ):
            inputs.append(single_conv.output.get("user_input", ""))
            ground_truths.append(single_conv.output.get("ground_truth", ""))
            answers.append(single_conv.output.get("answer_text", ""))

        data = {"question": inputs, "contexts": ground_truths, "answer": answers}

        rows.append(data)

# With our dataset rows created, we create the Dataset object and
# publish it back to Weave for later retrieval
dset = weave.Dataset(name="conv_traces_for_eval", rows=rows)
weave.publish(dset)

Result

[Screenshot: the published conv_traces_for_eval dataset in the Weave UI]

To learn more about evaluations, check out our Quickstart on using your newly created dataset to evaluate your RAG application!
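
As a head start, here is a minimal sketch of pulling the published dataset back down and wiring it into a Weave Evaluation. The exact_match scorer and your_model are hypothetical placeholders, not part of this cookbook; see the Quickstart for a complete walkthrough.

# Sketch: retrieve the published dataset and use it in an evaluation.
dataset = weave.ref("conv_traces_for_eval").get()

@weave.op()
def exact_match(answer: list, output: dict) -> dict:
    # Hypothetical scorer: compares the model output against the logged answers
    return {"match": output.get("answer") == answer}

# evaluation = weave.Evaluation(dataset=dataset, scorers=[exact_match])
# await evaluation.evaluate(your_model)  # your_model: your RAG app as a weave.Model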