Add LLM analysis with UDXFs

Integrate OpenAI models into your data pipelines with user-defined exchange functions

This tutorial shows you how to add LLM-powered analysis to your data using user-defined exchange functions (UDXFs). You’ll learn how to integrate OpenAI’s API into Xorq pipelines, validate schemas automatically, and process text data at scale.

After completing this tutorial, you’ll know how to create UDXFs that add AI-generated columns to tables, handle schema validation, and batch-process API calls efficiently.

Prerequisites

You need:

  • Xorq installed (see Install Xorq)
  • OpenAI Python client: pip install openai
  • OpenAI API key (get one at platform.openai.com)
  • Basic understanding of pandas DataFrames

Set up your API key

Before writing code, set your OpenAI API key as an environment variable.

macOS or Linux:

export OPENAI_API_KEY="your-api-key-here"

Windows (Command Prompt):

set OPENAI_API_KEY=your-api-key-here

Tip: Keep your API key secure

Never commit API keys to version control. Use environment variables or a secrets manager in production.
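A minimal sketch of failing fast when the key is missing, assuming you would rather see a descriptive message than the bare KeyError that os.environ["OPENAI_API_KEY"] raises. The helper name require_api_key is hypothetical and is not part of Xorq or the OpenAI client.

```python
import os


def require_api_key(var_name="OPENAI_API_KEY"):
    """Return the key stored in the environment, or raise a descriptive error.

    Hypothetical helper for illustration; not part of Xorq or the OpenAI client.
    """
    # Treat an unset variable and a blank value the same way.
    key = os.environ.get(var_name, "").strip()
    if not key:
        raise RuntimeError(
            f"{var_name} is not set. Export it in your shell before running the script."
        )
    return key
```

You could call require_api_key() at the top of a script so a missing key surfaces before any pipeline code runs.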

How this tutorial works

You’ll build a Python file incrementally. Each section adds new code to the file.

Each step shows the code in two forms:

  • Complete code: The full runnable file at this stage
  • Changes: Just the lines you’re adding (shown as diff)

Create a file called sentiment_udxf.py and build it section by section.

Tip

Think of UDXFs as functions that transform tables through external APIs. Input schema validation catches errors before API calls. Output schema validation keeps results consistent. Batch processing handles thousands of rows efficiently.

Set up the OpenAI client

Start by creating a cached OpenAI client that you reuse across API calls.

Add this to your sentiment_udxf.py file:

This is your complete sentiment_udxf.py file at this stage. It imports the OpenAI client, creates a cached singleton instance that reuses the same connection across API calls, and includes a test block to verify initialization.

import functools
import os
from openai import OpenAI


@functools.cache
def get_client():
    client = OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
    )
    return client


if __name__ == "__main__":
    client = get_client()
    print("OpenAI client initialized")
1. Use @functools.cache to create the client once and reuse it across calls (singleton pattern).
2. Test that the client initializes correctly when you run the file.

This diff shows the initial setup code. You’re importing the OpenAI library and creating a cached client function that initializes once and reuses the same instance across multiple API calls, saving memory and connection overhead.

+ import functools
+ import os
+ from openai import OpenAI
+ 
+ @functools.cache
+ def get_client():
+     client = OpenAI(
+         api_key=os.environ["OPENAI_API_KEY"],
+     )
+     return client
+ 
+ if __name__ == "__main__":
+     client = get_client()
+     print("OpenAI client initialized")

Run the file to verify your setup:

python sentiment_udxf.py

The output shows:

OpenAI client initialized

What just happened? You created a singleton OpenAI client. The @functools.cache decorator creates the client once and reuses it across batch processing, which saves memory and connection overhead.
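The caching behavior can be checked offline. The sketch below swaps the OpenAI client for a stand-in class (FakeClient and get_fake_client are hypothetical names) and shows that functools.cache runs the function body once and returns the same object afterwards.

```python
import functools


class FakeClient:
    """Stand-in for an API client so the example runs without network access."""

    instances_created = 0

    def __init__(self):
        FakeClient.instances_created += 1


@functools.cache
def get_fake_client():
    # The body runs only on the first call; later calls return the cached object.
    return FakeClient()


first = get_fake_client()
second = get_fake_client()
print(first is second)               # True
print(FakeClient.instances_created)  # 1
```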

Create the sentiment extraction function

Now you’ll write the function that calls OpenAI’s API to analyze sentiment.

Update your sentiment_udxf.py file:

This version adds the sentiment extraction function that calls OpenAI’s API. It includes a 30-second timeout for API calls, handles empty strings gracefully, creates a focused prompt for sentiment analysis, and wraps the API call in try-except for error handling.

import functools
import os
from openai import OpenAI

@functools.cache
def get_client():
    client = OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
    )
    return client


request_timeout = 30


def extract_sentiment(text):
    if text == "":
        return "NEUTRAL"
    

    messages = [
        {
            "role": "system",
            "content": "You are an AI language model trained to analyze and detect the sentiment of forum comments.",
        },
        {
            "role": "user",
            "content": f"Analyze the following comment and determine if the sentiment is: positive, negative or neutral. "
            f"Return only a single word: POSITIVE, NEGATIVE, or NEUTRAL: {text}",
        },
    ]
    

    try:
        response = get_client().chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages,
            max_tokens=30,
            temperature=0,
            timeout=request_timeout,
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        return f"ERROR: {e}"

if __name__ == "__main__":

    test_text = "This is an amazing product! I love it."
    result = extract_sentiment(test_text)
    print(f"Sentiment: {result}")
1. Set a 30-second timeout for LLM API calls (longer text may need more time).
2. Define the extraction function that takes text and returns sentiment.
3. Create a two-message prompt: system context (defines the AI’s role) plus user instruction (the specific task).
4. Call OpenAI’s API with temperature=0 for near-deterministic output. The try-except block wraps the call, and .strip() removes surrounding whitespace.
5. Test the function on sample text.

This diff adds the sentiment extraction function. You’re setting a 30-second timeout, creating a function that handles empty input, building a two-part prompt for OpenAI, and wrapping the API call in error handling that returns a string beginning with “ERROR:” instead of crashing.

 import functools
 import os
 from openai import OpenAI
 
 @functools.cache
 def get_client():
     client = OpenAI(
         api_key=os.environ["OPENAI_API_KEY"],
     )
     return client
 
+ request_timeout = 30
+ 
+ def extract_sentiment(text):
+     if text == "":
+         return "NEUTRAL"
+     
+     messages = [
+         {
+             "role": "system",
+             "content": "You are an AI language model trained to analyze and detect the sentiment of forum comments.",
+         },
+         {
+             "role": "user",
+             "content": f"Analyze the following comment and determine if the sentiment is: positive, negative, or neutral. "
+             f"Return only a single word, either POSITIVE, NEGATIVE or NEUTRAL: {text}",
+         },
+     ]
+     
+     try:
+         response = get_client().chat.completions.create(
+             model="gpt-3.5-turbo",
+             messages=messages,
+             max_tokens=30,
+             temperature=0,
+             timeout=request_timeout,
+         )
+         return response.choices[0].message.content.strip()
+     except Exception as e:
+         return f"ERROR: {e}"
+ 
 if __name__ == "__main__":
+     test_text = "This is an amazing product! I love it."
+     result = extract_sentiment(test_text)
+     print(f"Sentiment: {result}")

Run the file:

python sentiment_udxf.py

The output shows:

Sentiment: POSITIVE
Note

The function handles empty strings (returns “NEUTRAL”) and API errors (returns error message) without crashing. This error handling becomes critical when processing thousands of rows where some might have missing or malformed text.

Understanding the parameters: temperature=0 makes results as repeatable as possible (the same input almost always produces the same output), max_tokens=30 limits the response length (you only need one word), and timeout=request_timeout (30 seconds) prevents hanging on slow API calls.
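Even with a one-word instruction, a model reply can differ in case or carry trailing punctuation. A minimal sketch of normalizing the reply, assuming you want anything unexpected mapped to a fallback value; normalize_label, ALLOWED_LABELS, and the "UNKNOWN" fallback are hypothetical and are not used elsewhere in this tutorial.

```python
ALLOWED_LABELS = ("POSITIVE", "NEGATIVE", "NEUTRAL")


def normalize_label(raw, allowed=ALLOWED_LABELS, fallback="UNKNOWN"):
    """Map a raw model reply onto one of the allowed labels.

    Hypothetical helper: strips whitespace and surrounding punctuation,
    ignores case, and returns `fallback` for any other reply.
    """
    cleaned = raw.strip().strip(".!\"'").upper()
    return cleaned if cleaned in allowed else fallback


print(normalize_label(" Positive.\n"))        # POSITIVE
print(normalize_label("It seems positive"))   # UNKNOWN
```

One possible place for it is around the return value of extract_sentiment, so the sentiment column only ever holds the expected labels or the fallback.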

Create the batch processing function

This raises a question: how do you apply this to an entire DataFrame?

Here’s where you create a function that processes DataFrames in batches. You map the sentiment extraction over a specific column and add results to a new column.

Update your sentiment_udxf.py file:

This version adds DataFrame batch processing. It imports pandas and utility libraries, creates a curried function that applies sentiment extraction to an entire DataFrame column, uses function composition to chain URL decoding with sentiment analysis, and adds results as a new column.

import functools
import os
from openai import OpenAI

import pandas as pd
from urllib.parse import unquote_plus
import toolz
from xorq.common.utils.toolz_utils import curry

@functools.cache
def get_client():
    client = OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
    )
    return client

request_timeout = 30

def extract_sentiment(text):
    if text == "":
        return "NEUTRAL"
    
    messages = [
        {
            "role": "system",
            "content": "You are an AI language model trained to analyze and detect the sentiment of forum comments.",
        },
        {
            "role": "user",
            "content": f"Analyze the following comment and determine if the sentiment is: positive, negative or neutral. "
            f"Return only a single word, either POSITIVE, NEGATIVE or NEUTRAL: {text}",
        },
    ]
    
    try:
        response = get_client().chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages,
            max_tokens=30,
            temperature=0,
            timeout=request_timeout,
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        return f"ERROR: {e}"


@curry
def get_sentiment_batch(df: pd.DataFrame, input_col, append_col):
    return df.assign(
        **{
            append_col: df[input_col].map(
                toolz.compose(extract_sentiment, unquote_plus)
            )
        }
    )

if __name__ == "__main__":

    test_df = pd.DataFrame({
        "text": [
            "This product is terrible",
            "Best purchase ever!",
            "It's okay, nothing special"
        ]
    })
    

    result_df = get_sentiment_batch(
        test_df, 
        input_col="text", 
        append_col="sentiment"
    )
    print("Batch processing results:")
    print(result_df)
1. Import pandas for DataFrames, unquote_plus for URL decoding, toolz for function composition, and curry for partial application.
2. Use @curry to create a partially applicable function that chains URL decoding with sentiment extraction.
3. Create test data with three different sentiments.
4. Apply the batch function and inspect results.

This diff adds batch processing capabilities. You’re importing pandas and functional programming utilities, creating a curried function that processes entire DataFrame columns, and composing URL decoding with sentiment extraction to handle web-scraped text that may contain encoded characters.

 import functools
 import os
 from openai import OpenAI
+ import pandas as pd
+ from urllib.parse import unquote_plus
+ import toolz
+ from xorq.common.utils.toolz_utils import curry
 
 @functools.cache
 def get_client():
     client = OpenAI(
         api_key=os.environ["OPENAI_API_KEY"],
     )
     return client
 
 request_timeout = 30
 
 def extract_sentiment(text):
     if text == "":
         return "NEUTRAL"
     
     messages = [
         {
             "role": "system",
             "content": "You are an AI language model trained to analyze and detect the sentiment of forum comments.",
         },
         {
             "role": "user",
             "content": f"Analyze the following comment and determine if the sentiment is: positive, negative or neutral. "
             f"Return only a single word, either POSITIVE, NEGATIVE or NEUTRAL: {text}",
         },
     ]
     
     try:
         response = get_client().chat.completions.create(
             model="gpt-3.5-turbo",
             messages=messages,
             max_tokens=30,
             temperature=0,
             timeout=request_timeout,
         )
         return response.choices[0].message.content.strip()
     except Exception as e:
         return f"ERROR: {e}"
 
+ @curry
+ def get_sentiment_batch(df: pd.DataFrame, input_col, append_col):
+     return df.assign(
+         **{
+             append_col: df[input_col].map(
+                 toolz.compose(extract_sentiment, unquote_plus)
+             )
+         }
+     )
+ 
 if __name__ == "__main__":
+     test_df = pd.DataFrame({
+         "text": [
+             "This product is terrible",
+             "Best purchase ever!",
+             "It's okay, nothing special"
+         ]
+     })
+     
+     result_df = get_sentiment_batch(
+         test_df, 
+         input_col="text", 
+         append_col="sentiment"
+     )
+     print("Batch processing results:")
+     print(result_df)

Run the file:

python sentiment_udxf.py

You’ll see output like:

Batch processing results:
                         text sentiment
0    This product is terrible  NEGATIVE
1         Best purchase ever!  POSITIVE
2  It's okay, nothing special   NEUTRAL

What does success look like? You’ve processed multiple rows in one function call. The @curry decorator lets you pre-configure the column names, then apply the function to any DataFrame. The unquote_plus handles URL-encoded text that might appear in web-scraped data.

Currying simplifies pipeline code: instead of repeating column names in every call, you configure them once and reuse the configured function everywhere.
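The two ideas in this step, configuring arguments once and composing functions right to left, can be tried with the standard library alone. This is a sketch under assumptions: functools.partial stands in for @curry, a small compose helper mirrors the right-to-left order of toolz.compose, plain dicts stand in for DataFrame rows, and fake_sentiment replaces the API call so the example runs offline.

```python
from functools import partial
from urllib.parse import unquote_plus


def compose(*funcs):
    """Right-to-left composition: compose(f, g)(x) == f(g(x))."""
    def composed(value):
        for func in reversed(funcs):
            value = func(value)
        return value
    return composed


def fake_sentiment(text):
    # Stand-in for extract_sentiment so the example runs without an API key.
    return "POSITIVE" if "love" in text.lower() else "NEUTRAL"


def label_rows(rows, input_col, append_col):
    """Return new row dicts with `append_col` computed from `input_col`."""
    decode_then_score = compose(fake_sentiment, unquote_plus)
    return [{**row, append_col: decode_then_score(row[input_col])} for row in rows]


# Configure the column names once, then reuse the configured function.
label_text = partial(label_rows, input_col="text", append_col="sentiment")

rows = [{"text": "I+love+it%21"}, {"text": "It%27s+okay"}]
print(label_text(rows))
```

The URL-encoded input is decoded before scoring, which is the same order that toolz.compose(extract_sentiment, unquote_plus) applies.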

Define input and output schemas

Now you’ll specify what columns your UDXF needs and what columns it adds. Schema validation is how Xorq catches errors before making expensive API calls.

Update your sentiment_udxf.py file:

This version adds schema validation. It imports Xorq’s schema utilities and defines input and output schemas with non-null string requirements. The validators check that the required input columns exist and specify which output columns get added, so schema problems surface before any API calls.

import functools
import os
from openai import OpenAI
import pandas as pd
from urllib.parse import unquote_plus
import toolz
from xorq.common.utils.toolz_utils import curry

import xorq.api as xo
from xorq.flight.utils import (
    schema_concat,
    schema_contains,
)

@functools.cache
def get_client():
    client = OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
    )
    return client

request_timeout = 30

def extract_sentiment(text):
    if text == "":
        return "NEUTRAL"
    
    messages = [
        {
            "role": "system",
            "content": "You are an AI language model trained to analyze and detect the sentiment of forum comments.",
        },
        {
            "role": "user",
            "content": f"Analyze the following comment and determine if the sentiment is: positive, negative or neutral. "
            f"Return only a single word, either POSITIVE, NEGATIVE or NEUTRAL: {text}",
        },
    ]
    
    try:
        response = get_client().chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages,
            max_tokens=30,
            temperature=0,
            timeout=request_timeout,
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        return f"ERROR: {e}"

@curry
def get_sentiment_batch(df: pd.DataFrame, input_col, append_col):
    return df.assign(
        **{
            append_col: df[input_col].map(
                toolz.compose(extract_sentiment, unquote_plus)
            )
        }
    )


input_col = "text"
append_col = "sentiment"


schema_requirement = xo.schema({input_col: "!str"})


schema_append = xo.schema({append_col: "!str"})


maybe_schema_in = schema_contains(schema_requirement)


maybe_schema_out = schema_concat(to_concat=schema_append)

if __name__ == "__main__":

    print(f"Input validation: table must contain '{input_col}' column (string, not null)")
    print(f"Output transformation: adds '{append_col}' column (string, not null)")
1. Import Xorq’s schema utilities for validation.
2. Define column names as variables for consistency across the file.
3. Create input schema requiring a non-null string column named “text” (!str means non-nullable).
4. Create output schema defining the new column to append.
5. Use schema_contains to validate that input tables have the required column (they can have more columns).
6. Use schema_concat to specify what column gets added to the output (preserves all existing columns).
7. Print schema validation rules to verify they’re configured correctly.

This diff adds schema definitions and validators. You’re importing Xorq’s schema utilities, defining required input columns with non-null constraints, specifying what output columns get added, and creating validation functions that catch schema mismatches at pipeline build time before making expensive API calls.

 import functools
 import os
 from openai import OpenAI
 import pandas as pd
 from urllib.parse import unquote_plus
 import toolz
 from xorq.common.utils.toolz_utils import curry
+ import xorq.api as xo
+ from xorq.flight.utils import (
+     schema_concat,
+     schema_contains,
+ )
 
 @functools.cache
 def get_client():
     client = OpenAI(
         api_key=os.environ["OPENAI_API_KEY"],
     )
     return client
 
 request_timeout = 30
 
 def extract_sentiment(text):
     if text == "":
         return "NEUTRAL"
     
     messages = [
         {
             "role": "system",
             "content": "You are an AI language model trained to analyze and detect the sentiment of forum comments.",
         },
         {
             "role": "user",
             "content": f"Analyze the following comment and determine if the sentiment is: positive, negative or neutral. "
             f"Return only a single word, either POSITIVE, NEGATIVE or NEUTRAL: {text}",
         },
     ]
     
     try:
         response = get_client().chat.completions.create(
             model="gpt-3.5-turbo",
             messages=messages,
             max_tokens=30,
             temperature=0,
             timeout=request_timeout,
         )
         return response.choices[0].message.content.strip()
     except Exception as e:
         return f"ERROR: {e}"
 
 @curry
 def get_sentiment_batch(df: pd.DataFrame, input_col, append_col):
     return df.assign(
         **{
             append_col: df[input_col].map(
                 toolz.compose(extract_sentiment, unquote_plus)
             )
         }
     )
 
+ input_col = "text"
+ append_col = "sentiment"
+ schema_requirement = xo.schema({input_col: "!str"})
+ schema_append = xo.schema({append_col: "!str"})
+ maybe_schema_in = schema_contains(schema_requirement)
+ maybe_schema_out = schema_concat(to_concat=schema_append)
+ 
 if __name__ == "__main__":
+     print(f"Input validation: table must contain '{input_col}' column (string, not null)")
+     print(f"Output transformation: adds '{append_col}' column (string, not null)")

Run the file:

python sentiment_udxf.py

The output shows:

Input validation: table must contain 'text' column (string, not null)
Output transformation: adds 'sentiment' column (string, not null)

Understanding schema functions: The !str syntax means “non-nullable string”. If a table has null values in the text column, then the validation fails before any API calls. This prevents wasted requests on invalid data.

The schema_contains function checks that input tables have at least the required columns (they can have more). The schema_concat function adds new columns to whatever schema came in, preserving all existing columns.
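To make the two checks concrete, here is a plain-Python model that treats a schema as a dict of column name to type string. It illustrates the idea only and is not how Xorq implements schema_contains or schema_concat; the function names contains and concat are hypothetical.

```python
def contains(required, actual):
    """True when every required column is present in `actual` with the same type."""
    return all(actual.get(name) == dtype for name, dtype in required.items())


def concat(actual, to_concat):
    """Return `actual` extended with the columns in `to_concat`, keeping existing ones."""
    return {**actual, **to_concat}


required = {"text": "!str"}
appended = {"sentiment": "!str"}
table_schema = {"id": "int64", "text": "!str"}

print(contains(required, table_schema))      # True: extra columns are allowed
print(contains(required, {"id": "int64"}))   # False: the text column is missing
print(concat(table_schema, appended))
```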

Create the UDXF

You combine your processing function with schemas to create a reusable UDXF.

Update your sentiment_udxf.py file:

This is the complete sentiment_udxf.py file with the UDXF created. It combines the batch processing function with schema validators into a single reusable component called “SentimentAnalyzer” that you can apply to any table with a text column.

import functools
import os
from openai import OpenAI
import pandas as pd
from urllib.parse import unquote_plus
import toolz
from xorq.common.utils.toolz_utils import curry
import xorq.api as xo
from xorq.flight.utils import (
    schema_concat,
    schema_contains,
)

@functools.cache
def get_client():
    client = OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
    )
    return client

request_timeout = 30

def extract_sentiment(text):
    if text == "":
        return "NEUTRAL"
    
    messages = [
        {
            "role": "system",
            "content": "You are an AI language model trained to analyze and detect the sentiment of forum comments.",
        },
        {
            "role": "user",
            "content": f"Analyze the following comment and determine if the sentiment is: positive, negative or neutral. "
            f"Return only a single word, either POSITIVE, NEGATIVE or NEUTRAL: {text}",
        },
    ]
    
    try:
        response = get_client().chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages,
            max_tokens=30,
            temperature=0,
            timeout=request_timeout,
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        return f"ERROR: {e}"

@curry
def get_sentiment_batch(df: pd.DataFrame, input_col, append_col):
    return df.assign(
        **{
            append_col: df[input_col].map(
                toolz.compose(extract_sentiment, unquote_plus)
            )
        }
    )

input_col = "text"
append_col = "sentiment"
schema_requirement = xo.schema({input_col: "!str"})
schema_append = xo.schema({append_col: "!str"})
maybe_schema_in = schema_contains(schema_requirement)
maybe_schema_out = schema_concat(to_concat=schema_append)


do_sentiment_udxf = xo.expr.relations.flight_udxf(
    process_df=get_sentiment_batch(
        input_col=input_col, 
        append_col=append_col
    ),
    maybe_schema_in=maybe_schema_in,
    maybe_schema_out=maybe_schema_out,
    name="SentimentAnalyzer",
)

if __name__ == "__main__":

    print(f"Created UDXF: SentimentAnalyzer")
1. Call flight_udxf with your curried function, input validation, output transformation, and a descriptive name.
2. Verify the UDXF was created successfully.

This diff creates the UDXF by calling flight_udxf with your batch processing function, input schema validator, output schema transformer, and a descriptive name. This packages everything into a reusable component you can apply to any compatible table.

 input_col = "text"
 append_col = "sentiment"
 schema_requirement = xo.schema({input_col: "!str"})
 schema_append = xo.schema({append_col: "!str"})
 maybe_schema_in = schema_contains(schema_requirement)
 maybe_schema_out = schema_concat(to_concat=schema_append)
 
+ do_sentiment_udxf = xo.expr.relations.flight_udxf(
+     process_df=get_sentiment_batch(
+         input_col=input_col, 
+         append_col=append_col
+     ),
+     maybe_schema_in=maybe_schema_in,
+     maybe_schema_out=maybe_schema_out,
+     name="SentimentAnalyzer",
+ )
+ 
 if __name__ == "__main__":
+     print(f"Created UDXF: SentimentAnalyzer")

Run the file:

python sentiment_udxf.py

The output shows:

Created UDXF: SentimentAnalyzer

What just happened? You created a reusable transformation that you can apply to any table. The UDXF validates schemas, processes data through your function, and integrates with Xorq’s deferred execution. You can use this UDXF across multiple pipelines, and it behaves the same way in each.

Think of it this way: you’ve packaged your LLM logic into a verified component. The schema validation means you catch errors at pipeline build time, not at execution time after you’ve made thousands of API calls.

Apply the UDXF to table data

Now you’ll use the UDXF to analyze real HackerNews comments about ClickHouse.

Update your sentiment_udxf.py file:

This is the complete working implementation that applies your UDXF to real data. It fetches the HackerNews dataset, builds a pipeline that filters for ClickHouse mentions, and pipes the text through your sentiment analyzer to add a sentiment column to the results.

import functools
import os
from openai import OpenAI
import pandas as pd
from urllib.parse import unquote_plus
import toolz
from xorq.common.utils.toolz_utils import curry
import xorq.api as xo
from xorq.flight.utils import (
    schema_concat,
    schema_contains,
)

@functools.cache
def get_client():
    client = OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
    )
    return client

request_timeout = 30

def extract_sentiment(text):
    if text == "":
        return "NEUTRAL"
    
    messages = [
        {
            "role": "system",
            "content": "You are an AI language model trained to analyze and detect the sentiment of forum comments.",
        },
        {
            "role": "user",
            "content": f"Analyze the following comment and determine if the sentiment is: positive, negative or neutral. "
            f"Return only a single word, either POSITIVE, NEGATIVE or NEUTRAL: {text}",
        },
    ]
    
    try:
        response = get_client().chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages,
            max_tokens=30,
            temperature=0,
            timeout=request_timeout,
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        return f"ERROR: {e}"

@curry
def get_sentiment_batch(df: pd.DataFrame, input_col, append_col):
    return df.assign(
        **{
            append_col: df[input_col].map(
                toolz.compose(extract_sentiment, unquote_plus)
            )
        }
    )

input_col = "text"
append_col = "sentiment"
schema_requirement = xo.schema({input_col: "!str"})
schema_append = xo.schema({append_col: "!str"})
maybe_schema_in = schema_contains(schema_requirement)
maybe_schema_out = schema_concat(to_concat=schema_append)

do_sentiment_udxf = xo.expr.relations.flight_udxf(
    process_df=get_sentiment_batch(
        input_col=input_col, 
        append_col=append_col
    ),
    maybe_schema_in=maybe_schema_in,
    maybe_schema_out=maybe_schema_out,
    name="SentimentAnalyzer",
)

if __name__ == "__main__":

    hn = xo.examples.hn_posts_nano.fetch(table_name="hackernews")
    

    expr = (
        hn.order_by(hn.time.desc())
        .filter(
            xo.or_(
                hn.text.cast(str).like("%ClickHouse%"),
                hn.title.cast(str).like("%ClickHouse%"),
            )
        )
        .select(hn.text)
        .limit(2)
        .pipe(do_sentiment_udxf)
    )
    

    print(f"Pipeline built (deferred execution)")
    print(f"Expression type: {type(expr)}")
    

    df = expr.execute()
    

    print("\nResults:")
    print(df)
1. Fetch the HackerNews example dataset.
2. Build a pipeline: sort by time descending, filter for ClickHouse mentions, select text column, limit to two rows, pipe through the UDXF.
3. Verify the pipeline is built but not executed yet (deferred execution).
4. Call .execute() to trigger the pipeline and make API calls.
5. Display the DataFrame with sentiment analysis.

This diff adds the pipeline that applies your UDXF to real data. You’re loading the HackerNews dataset, building a deferred expression that filters and transforms data, piping it through your UDXF, and executing the entire pipeline to get sentiment analysis results.

 if __name__ == "__main__":
+     hn = xo.examples.hn_posts_nano.fetch(table_name="hackernews")
+     
+     expr = (
+         hn.order_by(hn.time.desc())
+         .filter(
+             xo.or_(
+                 hn.text.cast(str).like("%ClickHouse%"),
+                 hn.title.cast(str).like("%ClickHouse%"),
+             )
+         )
+         .select(hn.text)
+         .limit(2)
+         .pipe(do_sentiment_udxf)
+     )
+     
+     print(f"Pipeline built (deferred execution)")
+     print(f"Expression type: {type(expr)}")
+     
+     df = expr.execute()
+     
+     print("\nResults:")
+     print(df)

Run the file:

python sentiment_udxf.py

You’ll see output like:

Pipeline built (deferred execution)
Expression type: <class 'xorq.expr.relations.FlightExchangeExpr'>

Results:
                                                text  sentiment
0  ClickHouse is incredibly fast for analytics...  POSITIVE
1  I tried ClickHouse but had issues with dup...  NEGATIVE
Note

Nothing executes until you call .execute(). The .pipe(do_sentiment_udxf) creates a deferred expression that represents “apply this UDXF to this table.” Xorq validates schemas at build time but doesn’t make API calls until execution.

Understanding deferred execution: you can chain multiple operations (filter, select, UDXFs) and Xorq optimizes the entire pipeline before execution. If the input schema doesn’t match what the UDXF expects, then you get an error immediately at build time, not after processing starts.
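The build-then-execute split can be illustrated with a toy class. This is a sketch of the general idea of deferred execution, not Xorq’s implementation: Deferred, expensive_step, and calls are hypothetical names, and the toy performs no optimization or schema validation.

```python
class Deferred:
    """Toy deferred pipeline: records steps and runs them only on execute()."""

    def __init__(self, rows, steps=()):
        self._rows = rows
        self._steps = tuple(steps)

    def pipe(self, func):
        # Return a new pipeline with one more recorded step; nothing runs yet.
        return Deferred(self._rows, self._steps + (func,))

    def execute(self):
        result = list(self._rows)
        for step in self._steps:
            result = step(result)
        return result


calls = []


def expensive_step(rows):
    calls.append(len(rows))  # record that the step actually ran
    return [row.upper() for row in rows]


expr = Deferred(["fast", "slow"]).pipe(lambda rows: rows[:1]).pipe(expensive_step)
print(calls)           # [] because nothing has executed yet
print(expr.execute())  # ['FAST']
print(calls)           # [1]
```

Because the limiting step is recorded before the expensive one, the expensive step sees a single row, much as .limit(2) keeps the number of API calls small in the pipeline above.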

What does success look like? You’ve analyzed real text data with OpenAI’s API through a validated pipeline. The sentiment column was added based on your schema definition. If you had 10,000 rows instead of 2, then the exact same code works because the UDXF handles batch processing.

Note

UDXFs integrate LLM APIs into Xorq’s execution model. You get schema validation (catches errors at build time), deferred execution (optimizes the pipeline), and composable transformations (chain operations). The pattern scales from 10 rows to 10 million with the same code.
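At larger row counts, some API calls are likely to fail transiently (timeouts, rate limits). A minimal retry sketch with exponential backoff, under assumptions: call_with_retry is a hypothetical helper, and it only helps around code that raises on failure, so it would wrap the raw API call rather than extract_sentiment, which converts exceptions into “ERROR:” strings.

```python
import time


def call_with_retry(func, *args, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call func(*args), retrying on any exception with exponential backoff.

    Hypothetical helper: waits base_delay, then 2 * base_delay, and so on,
    and re-raises the last error once `attempts` calls have failed.
    """
    for attempt in range(attempts):
        try:
            return func(*args)
        except Exception:
            if attempt == attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)


# Offline demonstration with a function that fails twice before succeeding.
state = {"calls": 0}


def flaky_label(text):
    state["calls"] += 1
    if state["calls"] < 3:
        raise TimeoutError("simulated timeout")
    return "POSITIVE"


delays = []  # collect the requested delays instead of actually sleeping
print(call_with_retry(flaky_label, "great tool", sleep=delays.append))  # POSITIVE
print(delays)  # [1.0, 2.0]
```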

Handle different use cases

Now that you’ve built a sentiment analyzer, you might wonder: does this pattern work for other LLM tasks? The answer is yes. The same UDXF structure adapts to any text processing task where you need schema validation and batch processing.

Think of the UDXF pattern as a template. You keep the structure (extraction function, batch processor, schemas, UDXF creation) and swap in different prompts and column names. Here are three common variations you can test right now.

To try these variations: Create a new Python file for each one, copy the code below, and run it. Each example includes test data so you can verify it works immediately.

This variation classifies text into predefined categories. You’d use this to route customer tickets, categorize feedback, or organize documents by topic.

Create a file called classification_udxf.py:

import functools
import os
from openai import OpenAI
import pandas as pd
from urllib.parse import unquote_plus
import toolz
from xorq.common.utils.toolz_utils import curry
import xorq.api as xo
from xorq.flight.utils import schema_concat, schema_contains

@functools.cache
def get_client():
    return OpenAI(api_key=os.environ["OPENAI_API_KEY"])

def classify_topic(text):
    """Classify text into categories: TECHNICAL, BUSINESS, SUPPORT"""
    if text == "":
        return "UNKNOWN"
    
    messages = [
        {
            "role": "system",
            "content": "You are an AI trained to classify text into categories.",
        },
        {
            "role": "user",
            "content": f"Classify this text into one of: TECHNICAL, BUSINESS, SUPPORT. "
            f"Return only a single word: {text}",
        },
    ]
    
    try:
        response = get_client().chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages,
            max_tokens=30,
            temperature=0,
            timeout=30,
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        return f"ERROR: {e}"

@curry
def get_classification_batch(df: pd.DataFrame, input_col, append_col):
    return df.assign(
        **{
            append_col: df[input_col].map(
                toolz.compose(classify_topic, unquote_plus)
            )
        }
    )

classification_udxf = xo.expr.relations.flight_udxf(
    process_df=get_classification_batch(input_col="text", append_col="topic"),
    maybe_schema_in=schema_contains(xo.schema({"text": "!str"})),
    maybe_schema_out=schema_concat(to_concat=xo.schema({"topic": "!str"})),
    name="TopicClassifier",
)

if __name__ == "__main__":
    # Test with sample support tickets
    test_tickets = pd.DataFrame({
        "text": [
            "Server is down and users can't access the application",
            "How much does the enterprise plan cost per month?",
            "I forgot my password and need to reset it"
        ]
    })
    
    result = get_classification_batch(
        test_tickets,
        input_col="text",
        append_col="topic"
    )
    print("Classification results:")
    print(result)

Run the file:

python classification_udxf.py

The output shows:

Classification results:
                                             text      topic
0  Server is down and users can't access the a...  TECHNICAL
1  How much does the enterprise plan cost per ...   BUSINESS
2       I forgot my password and need to reset it    SUPPORT

When to use this: You have text data that falls into discrete categories and you need consistent classification across thousands of rows. This works well for routing systems, content organization, and automated triaging.
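Routing code usually expects exactly one of the listed labels, but a model can reply with extra punctuation, different casing, or a full sentence, and `classify_topic` returns an `ERROR: ...` string when the API call fails. The sketch below shows one way to map a raw reply onto the allowed set. `normalize_topic` and `ALLOWED_TOPICS` are hypothetical names introduced for illustration; they are not part of Xorq or the OpenAI client.

```python
# Hypothetical helper, not part of Xorq or the OpenAI client.
ALLOWED_TOPICS = {"TECHNICAL", "BUSINESS", "SUPPORT"}


def normalize_topic(raw, allowed=ALLOWED_TOPICS, fallback="UNKNOWN"):
    """Map a raw model reply onto one of the allowed labels."""
    if not isinstance(raw, str):
        return fallback
    # Keep only letters so replies like 'Technical.' or '"SUPPORT"' still match.
    cleaned = "".join(ch for ch in raw.upper() if ch.isalpha())
    # Anything else (full sentences, error strings) becomes the fallback label.
    return cleaned if cleaned in allowed else fallback


print(normalize_topic("Technical."))
print(normalize_topic("ERROR: request timed out"))
```

If this behavior suits your data, one option is to add the helper to the existing composition, for example `toolz.compose(normalize_topic, classify_topic, unquote_plus)`, so the `topic` column only ever contains a known label or `UNKNOWN`.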

This variation condenses long text into concise summaries. You’d use this to create executive summaries, generate email previews, or extract key points from documents.

Create a file called summarization_udxf.py:

import functools
import os
from openai import OpenAI
import pandas as pd
from urllib.parse import unquote_plus
import toolz
from xorq.common.utils.toolz_utils import curry
import xorq.api as xo
from xorq.flight.utils import schema_concat, schema_contains

@functools.cache
def get_client():
    return OpenAI(api_key=os.environ["OPENAI_API_KEY"])

def summarize_text(text):
    """Summarize text into one concise sentence"""
    if text == "":
        return ""
    
    messages = [
        {
            "role": "system",
            "content": "You are an AI trained to summarize text concisely.",
        },
        {
            "role": "user",
            "content": f"Summarize this in one sentence (max 20 words): {text}",
        },
    ]
    
    try:
        response = get_client().chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages,
            max_tokens=50,
            temperature=0,
            timeout=3,
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        return f"ERROR: {e}"

@curry
def get_summary_batch(df: pd.DataFrame, input_col, append_col):
    return df.assign(
        **{
            append_col: df[input_col].map(
                toolz.compose(summarize_text, unquote_plus)
            )
        }
    )

summarization_udxf = xo.expr.relations.flight_udxf(
    process_df=get_summary_batch(input_col="text", append_col="summary"),
    maybe_schema_in=schema_contains(xo.schema({"text": "!str"})),
    maybe_schema_out=schema_concat(to_concat=xo.schema({"summary": "!str"})),
    name="TextSummarizer",
)

if __name__ == "__main__":
    # Test with sample product reviews
    test_reviews = pd.DataFrame({
        "text": [
            "This product exceeded my expectations in every way. The build quality is exceptional, "
            "the features work flawlessly, and customer support was incredibly helpful when I had questions.",
            "I had several issues with the setup process. The documentation was unclear and I spent hours "
            "troubleshooting. However, once it was working, the performance has been solid.",
        ]
    })
    
    result = get_summary_batch(
        test_reviews,
        input_col="text",
        append_col="summary"
    )
    print("Summarization results:")
    print(result)

Run the file:

python summarization_udxf.py

The output shows:

Summarization results:
                                                text                                summary
0  This product exceeded my expectations in ev...  Exceptional product with great quality and support.
1  I had several issues with the setup process...  Setup was difficult but performance is solid.

When to use this: You have long-form text and need to extract the main point quickly, or you’re building previews for a user interface. This works well for document management systems, email clients, and content feeds.
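The prompt asks for at most 20 words, but nothing in the file enforces that, and a failed call leaves an `ERROR: ...` string in the `summary` column. A minimal sketch of a post-check follows. `check_summary` is a hypothetical helper and the four status labels are assumptions chosen for this example.

```python
def check_summary(summary, max_words=20):
    """Label a summary cell as 'ok', 'empty', 'error', or 'too_long'.

    Hypothetical helper; the status names are illustrative only.
    """
    if not isinstance(summary, str) or summary.strip() == "":
        return "empty"
    # summarize_text reports failures as strings that start with "ERROR:".
    if summary.startswith("ERROR:"):
        return "error"
    # Count whitespace-separated words against the limit stated in the prompt.
    if len(summary.split()) > max_words:
        return "too_long"
    return "ok"


print(check_summary("Setup was difficult but performance is solid."))
print(check_summary("ERROR: request timed out"))
```

On a result DataFrame, `result["summary"].map(check_summary).value_counts()` gives a quick count of how many rows need a retry or a manual look.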

This variation extracts structured information from unstructured text. You’d use this to build knowledge graphs, extract metadata, or identify key entities in documents.

Create a file called entity_extraction_udxf.py:

import functools
import os
from openai import OpenAI
import pandas as pd
from urllib.parse import unquote_plus
import toolz
from xorq.common.utils.toolz_utils import curry
import xorq.api as xo
from xorq.flight.utils import schema_concat, schema_contains

@functools.cache
def get_client():
    return OpenAI(api_key=os.environ["OPENAI_API_KEY"])

def extract_entities(text):
    """Extract company names and products from text"""
    if text == "":
        return "[]"
    
    messages = [
        {
            "role": "system",
            "content": "You are an AI trained to extract entities from text.",
        },
        {
            "role": "user",
            "content": f"Extract all company names and products from this text. "
            f"Return as JSON array of objects with 'type' and 'name' fields: {text}",
        },
    ]
    
    try:
        response = get_client().chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages,
            max_tokens=100,
            temperature=0,
            timeout=3,
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        return f"ERROR: {e}"

@curry
def get_entities_batch(df: pd.DataFrame, input_col, append_col):
    return df.assign(
        **{
            append_col: df[input_col].map(
                toolz.compose(extract_entities, unquote_plus)
            )
        }
    )

entity_extraction_udxf = xo.expr.relations.flight_udxf(
    process_df=get_entities_batch(input_col="text", append_col="entities"),
    maybe_schema_in=schema_contains(xo.schema({"text": "!str"})),
    maybe_schema_out=schema_concat(to_concat=xo.schema({"entities": "!str"})),
    name="EntityExtractor",
)

if __name__ == "__main__":
    # Test with sample news articles
    test_articles = pd.DataFrame({
        "text": [
            "Apple announced the new MacBook Pro with M3 chip at their annual event yesterday.",
            "Microsoft partners with OpenAI to integrate ChatGPT into Office 365 suite.",
        ]
    })
    
    result = get_entities_batch(
        test_articles,
        input_col="text",
        append_col="entities"
    )
    print("Entity extraction results:")
    print(result)

Run the file:

python entity_extraction_udxf.py

The output shows:

Entity extraction results:
                                                text                                           entities
0  Apple announced the new MacBook Pro with M3...  [{"type":"company","name":"Apple"},{"type":"...
1  Microsoft partners with OpenAI to integrate...  [{"type":"company","name":"Microsoft"},{"ty...

When to use this: You’re building structured databases from unstructured text, or you need to track mentions of specific entities across documents. This works well for competitive intelligence, knowledge management, and automated tagging systems.
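The `entities` column holds the model's reply as a string, so downstream code still has to parse it. The reply may not be valid JSON: it can be cut off by `max_tokens`, wrapped in a Markdown code fence, or replaced by an `ERROR: ...` string. The sketch below is one defensive way to parse it, using only the standard library. `parse_entities` is a hypothetical helper, and returning an empty list for unusable replies is an assumption made for this example.

```python
import json

FENCE = "`" * 3  # Markdown code-fence marker that some replies include


def parse_entities(raw):
    """Return a list of dicts with 'type' and 'name', or [] if unusable."""
    if not isinstance(raw, str) or raw.startswith("ERROR:"):
        return []
    text = raw.strip()
    # Drop fence lines if the model wrapped its JSON in a code fence.
    if text.startswith(FENCE):
        lines = [ln for ln in text.splitlines() if not ln.startswith(FENCE)]
        text = "\n".join(lines)
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Covers empty replies and JSON truncated by max_tokens.
        return []
    if not isinstance(parsed, list):
        return []
    # Keep only objects that carry both fields requested in the prompt.
    return [
        item for item in parsed
        if isinstance(item, dict) and "type" in item and "name" in item
    ]


print(parse_entities('[{"type": "company", "name": "Apple"}]'))
```

Keeping the UDXF output as a plain string and parsing it in a later step means the `!str` output schema stays unchanged.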

Understanding the pattern: all three variations follow an identical structure. You define a per-row function (what the LLM does), wrap it in a batch processor (how it runs on DataFrames), declare the input and output schemas (which column is required and which column is added), and create the UDXF (package it for reuse). The infrastructure stays the same while the prompt and the output column name change.
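The shared part of the three per-row functions can be sketched as a single function that takes the prompt template as a parameter. This is an illustration of the pattern, not code from Xorq: `run_prompt`, `fake_complete`, and the `complete` parameter are hypothetical names, and `fake_complete` stands in for the OpenAI call so the example runs offline.

```python
from functools import partial


def run_prompt(text, template, complete, empty_result=""):
    """Per-row function: fill the prompt template and ask the model.

    `complete` is any callable that takes a prompt string and returns the
    reply text. It is a stand-in for the OpenAI call used in the tutorial.
    """
    if text == "":
        return empty_result
    try:
        return complete(template.format(text=text)).strip()
    except Exception as e:
        # Same convention as the tutorial files: report the failure in the cell.
        return f"ERROR: {e}"


def fake_complete(prompt):
    """Offline stand-in for an LLM call, used only to exercise the pattern."""
    return " SUPPORT " if "password" in prompt else " TECHNICAL "


# Only the template and the empty-input result differ between tasks.
classify = partial(
    run_prompt,
    template="Classify this text into one of: TECHNICAL, BUSINESS, SUPPORT. "
    "Return only a single word: {text}",
    complete=fake_complete,
    empty_result="UNKNOWN",
)

print(classify("I forgot my password and need to reset it"))
print(classify(""))
```

To use this with a real model, you would replace `fake_complete` with a function that calls `get_client().chat.completions.create(...)` and returns the message content, then map `classify` over the input column inside the batch processor as the files above do.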

Here’s what makes this powerful: if you can describe the task to an LLM in a prompt, you can turn it into a UDXF. Text translation? Same pattern. Content moderation? Same pattern. Data extraction? Same pattern. Input schema validation rejects a table that lacks the required column before any API call is made, and deferred execution means nothing runs until you execute the expression.

Tip

Once you’ve tested these three examples, create your own UDXF for a different task. Copy one of the files above, modify the prompt in the extraction function, update the column names, and test it with your own data. The pattern is the same regardless of the LLM task.

What does success look like? You understand that UDXFs aren’t just for sentiment analysis. Any text processing task that needs schema validation and batch processing fits this pattern. You’ve tested three working examples and can adapt them to your specific use cases. The structure stays consistent while the prompts change.

What you learned

You’ve learned how to integrate LLM APIs into Xorq pipelines using UDXFs. Here’s what you accomplished:

  • Created a cached OpenAI client using the singleton pattern
  • Wrote a sentiment extraction function with error handling
  • Built a batch processing function using currying
  • Defined input validation and output schemas
  • Created a UDXF that combines all components
  • Applied the UDXF to table data with deferred execution
  • Executed the pipeline and analyzed results

The key insight? UDXFs are Xorq’s pattern for external API integration. Schema validation catches errors early. Deferred execution delays work until you run the pipeline. Currying makes functions reusable. The pattern works with any API, such as OpenAI, Anthropic, or custom models. The same code applies whether the table has 10 rows or 10 million, although each row triggers its own API call, so latency and cost grow with the row count.

Next steps

Now that you know how to add LLM analysis with UDXFs, continue learning: