Databricks notebook#

IMPORTANT To run this in your Databricks workspace, import this Databricks Notebook URL to your workspace: https://ai-cookbook.io/10-min-demo/mosaic-ai-agents-demo-dbx-notebook.html#







TLDR; this notebook will:#

  1. Deploy a RAG application built with Agent Framework to the Agent Evaluation review application

    • The review application is used by your business stakeholders to provide feedback on your app

  2. Evaluate the quality of the application with Agent Evaluation and MLflow

    • These AI-assisted evaluations are used by developers to improve the application’s quality

Products used:#

  • Mosaic AI Agent Framework SDK to quickly and safely build high-quality RAG applications.

  • Mosaic AI Agent Evaluation AI-assisted evaluation tool that determines whether outputs are high-quality. Provides an intuitive UI to get feedback from human stakeholders.

  • Mosaic AI Model Serving Hosts the application’s logic as a production-ready, scalable REST API.

  • MLflow Tracks and manages the application lifecycle, including evaluation results and application code/config

  • Generative AI Cookbook A definitive how-to guide, backed by a code repo, for building high-quality Gen AI apps, developed in partnership with Mosaic AI’s research team.

Requires a single-user cluster running on DBR 14.3+#

%pip uninstall -y mlflow mlflow-skinny
%pip install -U -qqqq databricks-agents mlflow mlflow-skinny databricks-vectorsearch databricks-sdk langchain==0.2.11 langchain_core==0.2.23 langchain_community==0.2.10 
dbutils.library.restartPython()

Setup: Load the necessary data and code from the Databricks Cookbook repo#

The following cell clones the Generative AI cookbook repo from https://github.com/databricks/genai-cookbook into a folder genai-cookbook in the same folder as this notebook using a Git Folder.

Alternatively, you can manually clone the Git repo https://github.com/databricks/genai-cookbook to a folder genai-cookbook.

import os
from databricks.sdk.core import DatabricksError
from databricks.sdk import WorkspaceClient

CURRENT_FOLDER = os.getcwd()
QUICK_START_REPO_URL = "https://github.com/databricks/genai-cookbook.git"
QUICK_START_REPO_SAVE_FOLDER = "genai-cookbook"

if os.path.isdir(QUICK_START_REPO_SAVE_FOLDER):
    raise Exception(
        f"{QUICK_START_REPO_SAVE_FOLDER} folder already exists, please change the variable QUICK_START_REPO_SAVE_FOLDER to be a non-existent path."
    )

# Clone the repo
w = WorkspaceClient()
try:
    w.repos.create(
        url=QUICK_START_REPO_URL, provider="github", path=f"{CURRENT_FOLDER}/{QUICK_START_REPO_SAVE_FOLDER}"
    )
    print(f"Cloned sample code repo to: {QUICK_START_REPO_SAVE_FOLDER}")
except DatabricksError as e:
    if e.error_code == "RESOURCE_ALREADY_EXISTS":
        print("Repo already exists. Skipping creation")
    else:
        # Chain the original error so the underlying cause stays visible in the traceback
        raise Exception(
            f"Failed to clone the quick start code.  You can manually import this by creating a Git folder from the contents of {QUICK_START_REPO_URL} in the {QUICK_START_REPO_SAVE_FOLDER} folder in your workspace and then re-running this Notebook."
        ) from e

Application configuration#

We’ve selected defaults for the following parameters based on your user name, but inspect and change if you prefer to use existing resources. Any missing resources will be created in the next step.

  1. UC_CATALOG & UC_SCHEMA: Unity Catalog and a Schema where the output Delta Tables with the parsed/chunked documents and Vector Search indexes are stored

  2. UC_MODEL_NAME: Unity Catalog location to log and store the chain’s model

  3. VECTOR_SEARCH_ENDPOINT: Vector Search Endpoint to host the resulting vector index

# Use the current user name to create any necessary resources
w = WorkspaceClient()
user_name = w.current_user.me().user_name.split("@")[0].replace(".", "")

# UC Catalog & Schema where output tables/indexes are saved
# If this catalog/schema does not exist, you need create catalog/schema permissions.
UC_CATALOG = f'{user_name}_catalog'
UC_SCHEMA = 'agent_demo'

# UC Model name where the POC chain is logged
UC_MODEL_NAME = f"`{UC_CATALOG}`.`{UC_SCHEMA}`.doc_bot"

# Vector Search endpoint where index is loaded
# If this does not exist, it will be created
VECTOR_SEARCH_ENDPOINT = f'{user_name}_vector_search'
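
To preview which resource names the cell above would produce for a given login, the same string handling can be sketched as a plain function. The function name `derive_resource_names` and the sample email address are hypothetical and used only for illustration; the naming rules are copied from the cell above.

```python
def derive_resource_names(email: str) -> dict:
    """Mirror the notebook's defaults: take the part before '@' and drop the dots."""
    user_name = email.split("@")[0].replace(".", "")
    catalog = f"{user_name}_catalog"
    schema = "agent_demo"
    return {
        "UC_CATALOG": catalog,
        "UC_SCHEMA": schema,
        # Backticks quote the catalog and schema parts of the three-level name
        "UC_MODEL_NAME": f"`{catalog}`.`{schema}`.doc_bot",
        "VECTOR_SEARCH_ENDPOINT": f"{user_name}_vector_search",
    }


# Hypothetical login, used only to show the resulting names
names = derive_resource_names("jane.doe@example.com")
for key, value in names.items():
    print(f"{key} = {value}")
```

Only dots are removed from the user name, so logins containing other characters (for example `+` or `-`) may still need to be adjusted by hand.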

Check if the UC Catalog & Vector Search endpoint exist; create otherwise#

The code in this cell checks whether the resources exist and tries to create them if they do not.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.vectorsearch import EndpointStatusState, EndpointType
from databricks.sdk.service.serving import EndpointCoreConfigInput, EndpointStateReady
from databricks.sdk.errors import ResourceDoesNotExist, NotFound, PermissionDenied
import os
w = WorkspaceClient()

# Create UC Catalog if it does not exist, otherwise, raise an exception
try:
    _ = w.catalogs.get(UC_CATALOG)
    print(f"PASS: UC catalog `{UC_CATALOG}` exists")
except NotFound as e:
    print(f"`{UC_CATALOG}` does not exist, trying to create...")
    try:
        _ = w.catalogs.create(name=UC_CATALOG)
        print(f"PASS: UC catalog `{UC_CATALOG}` created")
    except PermissionDenied as e:
        print(f"FAIL: `{UC_CATALOG}` does not exist, and no permissions to create.  Please provide an existing UC Catalog.")
        raise ValueError(f"Unity Catalog `{UC_CATALOG}` does not exist.")
        
# Create UC Schema if it does not exist, otherwise, raise an exception
try:
    _ = w.schemas.get(full_name=f"{UC_CATALOG}.{UC_SCHEMA}")
    print(f"PASS: UC schema `{UC_CATALOG}.{UC_SCHEMA}` exists")
except NotFound as e:
    print(f"`{UC_CATALOG}.{UC_SCHEMA}` does not exist, trying to create...")
    try:
        _ = w.schemas.create(name=UC_SCHEMA, catalog_name=UC_CATALOG)
        print(f"PASS: UC schema `{UC_CATALOG}.{UC_SCHEMA}` created")
    except PermissionDenied as e:
        print(f"FAIL: `{UC_CATALOG}.{UC_SCHEMA}` does not exist, and no permissions to create.  Please provide an existing UC Schema.")
        raise ValueError(f"Unity Catalog Schema `{UC_CATALOG}.{UC_SCHEMA}` does not exist.")

# Create the Vector Search endpoint if it does not exist
vector_search_endpoints = w.vector_search_endpoints.list_endpoints()
if sum([VECTOR_SEARCH_ENDPOINT == ve.name for ve in vector_search_endpoints]) == 0:
    print(f"Please wait, creating Vector Search endpoint `{VECTOR_SEARCH_ENDPOINT}`.  This can take up to 20 minutes...")
    w.vector_search_endpoints.create_endpoint_and_wait(VECTOR_SEARCH_ENDPOINT, endpoint_type=EndpointType.STANDARD)

# Make sure vector search endpoint is online and ready.
w.vector_search_endpoints.wait_get_endpoint_vector_search_endpoint_online(VECTOR_SEARCH_ENDPOINT)

print(f"PASS: Vector Search endpoint `{VECTOR_SEARCH_ENDPOINT}` exists")
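
The catalog and schema checks above follow the same "get, otherwise create, otherwise fail" pattern. A minimal sketch of that pattern as a reusable helper is shown below. The helper `ensure_resource`, the in-memory `catalogs` set, and the two exception classes are hypothetical stand-ins so the example runs without a workspace; in the notebook, the exceptions would be the ones imported from `databricks.sdk.errors`.

```python
class NotFound(Exception):
    """Stand-in for databricks.sdk.errors.NotFound (assumption for this sketch)."""


class PermissionDenied(Exception):
    """Stand-in for databricks.sdk.errors.PermissionDenied (assumption for this sketch)."""


def ensure_resource(name, get, create):
    """Return 'exists' or 'created'; raise ValueError if creation is not permitted."""
    try:
        get(name)
        return "exists"
    except NotFound:
        try:
            create(name)
            return "created"
        except PermissionDenied as err:
            raise ValueError(f"`{name}` does not exist and could not be created.") from err


# In-memory stand-in for a workspace, used only to exercise the helper
catalogs = {"existing_catalog"}


def get_catalog(name):
    if name not in catalogs:
        raise NotFound(name)


def create_catalog(name):
    catalogs.add(name)


first = ensure_resource("new_catalog", get_catalog, create_catalog)
second = ensure_resource("new_catalog", get_catalog, create_catalog)
print(first, second)
```

The first call creates the missing entry and the second call finds it, which is the behavior you see when re-running the cell above.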

Build & deploy the application#

Below is a high-level overview of the architecture we will deploy:

1/ Create the Vector Search Index#

First, we copy the sample data to a Delta Table and sync it to a Vector Search index. Here, we use the gte-large-en-v1.5 embedding model hosted on Databricks Foundation Model APIs.

# UC locations to store the chunked documents & index
CHUNKS_DELTA_TABLE = f"`{UC_CATALOG}`.`{UC_SCHEMA}`.databricks_docs_chunked"
CHUNKS_VECTOR_INDEX = f"`{UC_CATALOG}`.`{UC_SCHEMA}`.databricks_docs_chunked_index"
from pyspark.sql import SparkSession
from databricks.vector_search.client import VectorSearchClient

# Workspace URL for printing links to the delta table/vector index
workspace_url = SparkSession.getActiveSession().conf.get(
    "spark.databricks.workspaceUrl", None
)

# Vector Search client
vsc = VectorSearchClient(disable_notice=True)

# Load the chunked data to Delta Table & enable change-data capture to allow the table to sync to Vector Search
chunked_docs_df = spark.read.parquet(
    f"file:{CURRENT_FOLDER}/{QUICK_START_REPO_SAVE_FOLDER}/quick_start_demo/chunked_databricks_docs.snappy.parquet"
)
chunked_docs_df.write.format("delta").mode("overwrite").saveAsTable(CHUNKS_DELTA_TABLE)
spark.sql(
    f"ALTER TABLE {CHUNKS_DELTA_TABLE} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
)

print(
    f"View Delta Table at: https://{workspace_url}/explore/data/{UC_CATALOG}/{UC_SCHEMA}/{CHUNKS_DELTA_TABLE.split('.')[-1]}"
)

# Embed and sync chunks to a vector index
print(
    f"Embedding docs & creating Vector Search Index, this will take ~5 - 10 minutes.\nView Index Status at: https://{workspace_url}/explore/data/{UC_CATALOG}/{UC_SCHEMA}/{CHUNKS_VECTOR_INDEX.split('.')[-1]}"
)

index = vsc.create_delta_sync_index_and_wait(
    endpoint_name=VECTOR_SEARCH_ENDPOINT,
    index_name=CHUNKS_VECTOR_INDEX,
    primary_key="chunk_id",
    source_table_name=CHUNKS_DELTA_TABLE,
    pipeline_type="TRIGGERED",
    embedding_source_column="chunked_text",
    embedding_model_endpoint_name="databricks-gte-large-en",
)
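
To make the retrieval idea concrete, here is a toy sketch of what "embed the chunks, then return the most similar ones" means. It is an illustration only: word counts stand in for the embedding model, and the sample chunks and helper names (`embed`, `cosine`, `top_k`) are hypothetical. It does not reflect how Vector Search or the embedding endpoint work internally. The column names `chunk_id` and `chunked_text` match the ones passed to the index above.

```python
import math
from collections import Counter


def embed(text):
    """Toy stand-in for an embedding model: lowercase word counts."""
    return Counter(text.lower().split())


def cosine(a, b):
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(a[token] * b[token] for token in a if token in b)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def top_k(query, chunks, k=1):
    """Return the chunk_id values of the k chunks most similar to the query."""
    query_vector = embed(query)
    scored = [
        (cosine(query_vector, embed(chunk["chunked_text"])), chunk["chunk_id"])
        for chunk in chunks
    ]
    scored.sort(reverse=True)
    return [chunk_id for _, chunk_id in scored[:k]]


# Hypothetical chunks, shaped like rows of the chunked docs table
toy_chunks = [
    {"chunk_id": "a", "chunked_text": "Delta tables store data in the lakehouse"},
    {"chunk_id": "b", "chunked_text": "Vector Search index returns similar chunks"},
    {"chunk_id": "c", "chunked_text": "Model Serving hosts REST endpoints"},
]

best = top_k("how does a vector search index work", toy_chunks, k=1)
print(best)
```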

2/ Deploy to the review application#

Now that our Vector Search index is ready, let’s prepare the RAG chain and deploy it to the review application, backed by a scalable, production-ready REST API on Model Serving.

2.1/ Configuring our Chain parameters#

Databricks makes it easy to parameterize your chain with MLflow Model Configurations. Later, you can tune application quality by adjusting these parameters, such as the system prompt or retrieval settings. Most applications will include many more parameters, but for this demo, we’ll keep the configuration to a minimum.

chain_config = {
    "llm_model_serving_endpoint_name": "databricks-dbrx-instruct",  # the foundation model we want to use
    "vector_search_endpoint_name": VECTOR_SEARCH_ENDPOINT,  # Endpoint for vector search
    "vector_search_index": f"{CHUNKS_VECTOR_INDEX}",
    "llm_prompt_template": """You are an assistant that answers questions. Use the following pieces of retrieved context to answer the question. Some pieces of context may be irrelevant, in which case you should not use them to form the answer.\n\nContext: {context}""", # LLM Prompt template
}

# Here, we define an input example in the schema required by Agent Framework
input_example = {"messages": [ {"role": "user", "content": "What is Retrieval-augmented Generation?"}]}
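
As a rough illustration of how these two pieces fit together, the sketch below pulls the latest user question out of the chat-style payload and fills the `{context}` placeholder of the prompt template with retrieved text. The helper names `last_user_message` and `render_system_prompt` and the sample chunks are hypothetical; the actual logic lives in the sample chain file in the cookbook repo and may differ.

```python
# Same template and input example as in the configuration cell above
llm_prompt_template = """You are an assistant that answers questions. Use the following pieces of retrieved context to answer the question. Some pieces of context may be irrelevant, in which case you should not use them to form the answer.\n\nContext: {context}"""

input_example = {"messages": [{"role": "user", "content": "What is Retrieval-augmented Generation?"}]}


def last_user_message(payload):
    """Return the content of the most recent user message in a chat-style payload."""
    for message in reversed(payload["messages"]):
        if message["role"] == "user":
            return message["content"]
    raise ValueError("payload contains no user message")


def render_system_prompt(template, chunks):
    """Fill the {context} placeholder with the retrieved chunks, separated by blank lines."""
    return template.format(context="\n\n".join(chunks))


question = last_user_message(input_example)
# Hypothetical retrieved chunks, used only to show the rendered prompt
system_prompt = render_system_prompt(llm_prompt_template, ["chunk one", "chunk two"])
print(question)
print(system_prompt)
```

Keeping the template in `chain_config` rather than in the chain code is what lets you change the prompt later without touching the chain itself.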

2.2/ Log the application & view trace#

We first register the chain as an MLflow model and inspect the MLflow Trace to understand what is happening inside the chain.

MLflow trace#


import mlflow

# Log the model to MLflow
with mlflow.start_run(run_name="databricks-docs-bot"):
    logged_chain_info = mlflow.langchain.log_model(
        lc_model=os.path.join(
            os.getcwd(),
            f"{QUICK_START_REPO_SAVE_FOLDER}/quick_start_demo/sample_rag_chain",
        ),  # Chain code file from the quick start repo
        model_config=chain_config,  # Chain configuration set above
        artifact_path="chain",  # Required by MLflow
        input_example=input_example,  # Save the chain's input schema.  MLflow will execute the chain before logging & capture its output schema.
    )

# Test the chain locally to see the MLflow Trace
chain = mlflow.langchain.load_model(logged_chain_info.model_uri)
chain.invoke(input_example)

2.3/ Deploy the application#

Now, we:

  1. Register the application in Unity Catalog

  2. Use Agent Framework to deploy to the Agent Evaluation review application

Alongside the review app, a scalable, production-ready Model Serving endpoint is also deployed.

Agent Evaluation review application#

from databricks import agents
import time
from databricks.sdk.service.serving import EndpointStateReady, EndpointStateConfigUpdate

# Use Unity Catalog to log the chain
mlflow.set_registry_uri('databricks-uc')

# Register the chain to UC
uc_registered_model_info = mlflow.register_model(model_uri=logged_chain_info.model_uri, name=UC_MODEL_NAME)

# Deploy to enable the Review App and create an API endpoint
deployment_info = agents.deploy(model_name=UC_MODEL_NAME, model_version=uc_registered_model_info.version)

# Wait for the Review App to be ready
print("\nWaiting for endpoint to deploy.  This can take 10 - 20 minutes.", end="")
while True:
    # Fetch the endpoint state once per iteration and check both readiness flags
    state = w.serving_endpoints.get(deployment_info.endpoint_name).state
    if state.ready != EndpointStateReady.NOT_READY and state.config_update != EndpointStateConfigUpdate.IN_PROGRESS:
        break
    print(".", end="")
    time.sleep(30)
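
The wait loop above polls until the endpoint reports ready, with no upper bound on how long it waits. If you prefer the cell to fail after a fixed time, one option is a small polling helper with a timeout. This is a minimal sketch: `wait_until` and its parameters are hypothetical names, and the demo uses a fake readiness check so it runs without a workspace.

```python
import time


def wait_until(is_ready, timeout_s=1800.0, interval_s=30.0, sleep=time.sleep):
    """Poll is_ready() until it returns True; raise TimeoutError after timeout_s seconds.

    Returns the number of readiness checks that were made.
    """
    deadline = time.monotonic() + timeout_s
    checks = 1
    while not is_ready():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"not ready after {timeout_s} seconds")
        sleep(interval_s)
        checks += 1
    return checks


# Fake readiness check: not ready twice, then ready
fake_states = iter([False, False, True])
checks = wait_until(lambda: next(fake_states), timeout_s=5, interval_s=0)
print(f"ready after {checks} checks")
```

In the notebook, `is_ready` would wrap a single call to `w.serving_endpoints.get(deployment_info.endpoint_name)` and compare the returned state against `EndpointStateReady.NOT_READY` and `EndpointStateConfigUpdate.IN_PROGRESS`, as the loop above does.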

3/ Use Agent Evaluation to evaluate your application#

3.1/ Have stakeholders chat with your bot to build your evaluation dataset#

Normally, you would now give access to internal domain experts and have them test and review the bot. Your domain experts do NOT need Databricks Workspace access: you can assign permissions to any user in your SSO if you have enabled SCIM.

This is a critical step to build or improve your evaluation dataset: have users ask your bot questions and provide the expected answer when the bot doesn’t respond properly.

Your application automatically captures all stakeholder questions and bot responses, including the MLflow Trace for each, into Delta Tables in your Lakehouse. On top of that, Databricks makes it easy to track feedback from your end users: if the chatbot doesn’t give a good answer and the user gives a thumbs-down, their feedback is included in the Delta Tables.

Your evaluation dataset forms the basis of your development workflow to improve quality: identifying the root causes of quality issues and then objectively measuring the impact of your fixes.


3.2/ Run Evaluation of your Chain#

Now, let’s leverage Agent Evaluation’s specialized AI evaluators to evaluate our model’s performance. Agent Evaluation is integrated into mlflow.evaluate(...); all you need to do is pass model_type="databricks-agent".

For this demo, we use a toy evaluation dataset of four questions. Read more about our best practices on the size of your evaluation dataset.

import pandas as pd

sample_eval_set = [
    {
        "request_id": "5482",
        "request": "What happens if I try to access an index that is out of bounds in an array using the [ ] operator in Databricks SQL when spark.sql.ansi.enabled is set to false?",
        "expected_response": "If you try to access an index that is out of bounds in an array using the [ ] operator in Databricks SQL when spark.sql.ansi.enabled is set to false, Databricks will return NULL instead of raising an error.",
    },
    {
        "request_id": "2112",
        "request": "Why is a long-running stage in my Spark job only showing one task, and how can I resolve this issue?",
        "expected_response": "A long-running stage with one task in a Spark job could be due to several reasons such as:\n\n1. Expensive User Defined Functions (UDFs) on small data\n2. Window function without a PARTITION BY statement\n3. Reading from an unsplittable file type like gzip\n4. Setting the multiLine option when reading a JSON or CSV file\n5. Schema inference of a large file\n6. Use of repartition(1) or coalesce(1)\n\nTo resolve this issue, you can:\n\n1. Optimize your UDFs or replace them with built-in functions if possible.\n2. Ensure that you have a proper PARTITION BY statement in your window functions.\n3. Avoid using unsplittable file types like gzip. Instead, use splittable file types like snappy or lz4.\n4. Avoid setting the multiLine option when reading JSON or CSV files.\n5. Perform schema inference on a small sample of your data and then apply it to the entire dataset.\n6. Avoid using repartition(1) or coalesce(1) unless necessary.\n\nBy implementing these changes, you should be able to resolve the issue of a long-running stage with only one task in your Spark job.",
    },
    {
        "request_id": "5054",
        "request": "How can I represent 4-byte single-precision floating point numbers in Databricks SQL and what are their limits?",
        "expected_response": "4-byte single-precision floating point numbers can be represented in Databricks SQL using the `FLOAT` or `REAL` syntax. The range of numbers that can be represented is from -3.402E+38 to +3.402E+38, including negative infinity, positive infinity, and NaN (not a number). Here are some examples of how to represent these numbers:\n\n* `+1F` represents 1.0\n* `5E10F` represents 5E10\n* `5.3E10F` represents 5.3E10\n* `-.1F` represents -0.1\n* `2.F` represents 2.0\n* `-5555555555555555.1F` represents -5.5555558E15\n* `CAST(6.1 AS FLOAT)` represents 6.1\n\nNote that `FLOAT` is a base-2 numeric type, so the representation of base-10 literals may not be exact. If you need to accurately represent fractional or large base-10 numbers, consider using the `DECIMAL` type instead.",
    },
    {
        "request_id": "2003",
        "request": "How can I identify the reason for failing executors in my Databricks workspace, and what steps can I take to resolve memory issues?",
        "expected_response": "1. Identify failing executors: In your Databricks workspace, navigate to the compute's Event log to check for any explanations regarding executor failures. Look for messages indicating spot instance losses or cluster resizing due to autoscaling. If using spot instances, refer to 'Losing spot instances' documentation. For autoscaling, refer to 'Learn more about cluster resizing' documentation.\n\n2. Check executor logs: If no information is found in the event log, go to the Spark UI and click the Executors tab. Here, you can access logs from failed executors to investigate further.\n\n3. Identify memory issues: If the above steps do not provide a clear reason for failing executors, it is likely a memory issue. To dig into memory issues, refer to the 'Spark memory issues' documentation.\n\n4. Resolve memory issues: To resolve memory issues, consider the following steps:\n\n   a. Increase executor memory: Allocate more memory to executors by adjusting the 'spark.executor.memory' property in your Spark configuration.\n\n   b. Increase driver memory: Allocate more memory to the driver by adjusting the 'spark.driver.memory' property in your Spark configuration.\n\n   c. Use off-heap memory: Enable off-heap memory by setting the 'spark.memory.offHeap.enabled' property to 'true' and allocating off-heap memory using the 'spark.memory.offHeap.size' property.\n\n   d. Optimize data processing: Review your data processing workflows and optimize them for memory efficiency. This may include reducing data shuffling, using broadcast variables, or caching data strategically.\n\n   e. Monitor memory usage: Monitor memory usage in your Databricks workspace to identify potential memory leaks or inefficient memory utilization. Use tools like the Spark UI, Ganglia, or Grafana to monitor memory usage.",
    },
]

eval_df = pd.DataFrame(sample_eval_set)
display(eval_df)
with mlflow.start_run(run_id=logged_chain_info.run_id):
    # Evaluate
    eval_results = mlflow.evaluate(
        data=eval_df,  # Your evaluation set
        model=logged_chain_info.model_uri,  # previously logged model
        model_type="databricks-agent",  # activate Mosaic AI Agent Evaluation
    )
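
Before passing a hand-written evaluation set to `mlflow.evaluate(...)`, it can help to sanity-check the rows. The sketch below assumes every row should carry the three fields used in this demo (`request_id`, `request`, `expected_response`) with unique IDs; that is a convention of this example, and Agent Evaluation itself may accept rows with fewer fields. The helper name `validate_eval_rows` and the sample rows are hypothetical.

```python
REQUIRED_KEYS = ("request_id", "request", "expected_response")


def validate_eval_rows(rows):
    """Return a list of human-readable problems; an empty list means the rows look fine."""
    problems = []
    seen_ids = set()
    for position, row in enumerate(rows):
        for key in REQUIRED_KEYS:
            value = row.get(key)
            if not isinstance(value, str) or not value.strip():
                problems.append(f"row {position}: missing or empty '{key}'")
        request_id = row.get("request_id")
        if request_id in seen_ids:
            problems.append(f"row {position}: duplicate request_id '{request_id}'")
        seen_ids.add(request_id)
    return problems


# Hypothetical rows: the second one repeats an ID and has an empty request
good_rows = [{"request_id": "1", "request": "What is a Delta Table?", "expected_response": "A table format."}]
bad_rows = good_rows + [{"request_id": "1", "request": "", "expected_response": "An answer."}]

print(validate_eval_rows(good_rows))
for problem in validate_eval_rows(bad_rows):
    print(problem)
```

With the notebook's data, the same check could be run as `validate_eval_rows(sample_eval_set)` before building `eval_df`.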

What’s next?#

Code-based quick starts#

| Time required | Outcome | Link |
|---|---|---|
| 🕧 10 minutes | Sample RAG app deployed to web-based chat app that collects feedback | |
| 🕧🕧🕧 60 minutes | POC RAG app with your data deployed to a chat UI that can collect feedback from your business stakeholders | Deploy POC w/ your data |
| 🕧🕧 30 minutes | Comprehensive quality/cost/latency evaluation of your POC app | Evaluate your POC; Identify the root causes of quality issues |