okr/utilities/utils.py
2025-01-02 19:39:39 +01:00

130 lines
4.0 KiB
Python

import json
import streamlit as st
import re
import fitz # PyMuPDF
# Function to construct the prompt
def construct_prompt(prompt_template: str, user_input: str) -> str:
    """Fill the ``{user_input}`` placeholder of a prompt template.

    Args:
        prompt_template (str): A ``str.format``-style template.
        user_input (str): The value substituted for ``user_input``.
    Returns:
        str: The formatted prompt.
    """
    substitutions = {"user_input": user_input}
    return prompt_template.format(**substitutions)
def construct_prompt_for_pdf(prompt_template: str, pdf_input: str) -> str:
    """Fill the ``{pdf_input}`` placeholder of a prompt template.

    Args:
        prompt_template (str): A ``str.format``-style template.
        pdf_input (str): The value substituted for ``pdf_input``.
    Returns:
        str: The formatted prompt.
    """
    substitutions = {"pdf_input": pdf_input}
    return prompt_template.format(**substitutions)
def parse_json_content(cleaned_content: str):
    """
    Parses the cleaned content to extract the first valid JSON value.

    The content is scanned for every ``{`` or ``[`` and a JSON decode is
    attempted from each one in turn. The first candidate that decodes wins,
    so text before or after the JSON block (even text that itself contains
    brackets) does not prevent the block from being found.

    Args:
        cleaned_content (str): The raw content containing JSON data.
    Returns:
        dict or list: The parsed JSON object.
    Raises:
        ValueError: If the content holds no ``{``/``[`` at all, or if none of
            the candidate positions decodes as JSON.
    """
    text = cleaned_content.strip()
    decoder = json.JSONDecoder()
    first_failure = None  # (error, remaining text) of the earliest failed candidate

    for opener in re.finditer(r"[{\[]", text):
        candidate = text[opener.start():]
        try:
            # raw_decode stops at the end of the first JSON value and ignores
            # whatever follows, unlike json.loads which rejects trailing text.
            extracted_data, _ = decoder.raw_decode(candidate)
        except json.JSONDecodeError as err:
            if first_failure is None:
                first_failure = (err, candidate)
            continue
        return extracted_data

    if first_failure is None:
        raise ValueError("No valid JSON found in the content.")

    err, candidate = first_failure
    raise ValueError(
        f"Failed to decode JSON. Error: {err}\nContent:\n{candidate}"
    ) from err
# Function to extract and parse JSON response
def _strip_code_fence(text):
    """Return the body of a ``` fenced block in *text*, or *text* if unfenced.

    A fence tagged ``json`` is preferred over an untagged one; the tag itself
    is removed from the returned body.
    """
    # Splitting on the fence marker leaves the fenced bodies at odd indices.
    bodies = text.split("```")[1::2]
    if not bodies:
        return text
    for body in bodies:
        if body[:4].lower() == "json":
            return body[4:]
    return bodies[0]


def extract_llm_response(response):
    """
    Extracts the objective, key results and hint from a chat-completion response.

    The message text is read from ``response["choices"][0]["message"]["content"]``,
    any Markdown code fence around it is removed, and the remainder is parsed
    as JSON via ``parse_json_content``. The parsed object is expected to hold
    a ``hint`` and a list of ``proposals``; only the first proposal is used.

    Args:
        response (dict): The API response holding the message content.
    Returns:
        tuple: A tuple containing the objective (str), key results (list), and hint (str).
            Missing fields fall back to ``""`` / ``[]``.
    Raises:
        ValueError: If no JSON can be parsed from the message, or if the parsed
            JSON is not an object.
    """
    print("RESPONSE:", response)
    raw_message_content = response["choices"][0]["message"]["content"]
    print("raw_message_content:", raw_message_content)

    # Only the fence markers are removed; the payload itself is left untouched
    # so the word "json" or a backtick inside a JSON string cannot corrupt it.
    cleaned_content = _strip_code_fence(raw_message_content)
    print("cleaned content", cleaned_content)
    parsed_data = parse_json_content(cleaned_content=cleaned_content)
    print("parsed_data:", parsed_data)

    if not isinstance(parsed_data, dict):
        raise ValueError(
            f"Expected a JSON object with 'hint' and 'proposals', got {type(parsed_data).__name__}."
        )

    hint = parsed_data.get("hint", "")
    proposals = parsed_data.get("proposals", [])
    if not proposals:
        return "", [], hint

    first_proposal = proposals[0]  # only the first proposal is surfaced
    objective = first_proposal.get("objective", "")
    key_results = first_proposal.get("key_results", [])
    return objective, key_results, hint
def extract_text_from_pdf(pdf_path):
    """Extract text from a PDF file.

    Args:
        pdf_path: Path of the document, passed straight to ``fitz.open``.
    Returns:
        str: The text of all pages, concatenated in page order.
    """
    # The context manager closes the document even if a page fails to extract.
    with fitz.open(pdf_path) as doc:
        return "".join(page.get_text() for page in doc)
def chunk_text(text, max_chars=3000):
    """Split text into smaller chunks.

    Each chunk is at most ``max_chars`` characters long. A chunk ends just
    before the last newline inside the current window when there is one, so
    the newline starts the following chunk; otherwise the text is cut at
    exactly ``max_chars``. Joining the chunks reproduces ``text``.

    Args:
        text (str): The text to split.
        max_chars (int): Maximum length of a chunk; must be at least 1.
    Returns:
        list: The chunks in order. Text no longer than ``max_chars``
            (including the empty string) yields a single-element list.
    Raises:
        ValueError: If ``max_chars`` is smaller than 1.
    """
    if max_chars < 1:
        raise ValueError("max_chars must be a positive integer.")

    chunks = []
    while len(text) > max_chars:
        split_index = text.rfind("\n", 0, max_chars)
        # -1: no newline in the window. 0: the window *starts* with the
        # newline left over from the previous split; cutting there would emit
        # an empty chunk and never shrink the text, so hard-split instead.
        if split_index <= 0:
            split_index = max_chars
        chunks.append(text[:split_index])
        text = text[split_index:]
    chunks.append(text)
    return chunks