In [1]:
! pip install pdfplumber
! pip install ibm-watson-machine-learning



In [4]:
import html
import os
from operator import itemgetter

import pandas as pd
import pdfplumber
from dotenv import load_dotenv
from ibm_watson_machine_learning import APIClient
from ibm_watsonx_ai.foundation_models import ModelInference
from ibm_watsonx_ai.foundation_models.utils.enums import ModelTypes
from ibm_watsonx_ai.metanames import GenTextParamsMetaNames as GenParams
from pdfplumber.utils import cluster_objects

load_dotenv()

True

In [5]:
# Credentials, read from environment variables (load_dotenv() is called in the imports cell).
# os.getenv returns None for a variable that is not set, so a missing value only
# surfaces later, when the credentials are actually used.
SPACE_ID = os.getenv("wml_space_id")  # not referenced again in the visible cells
IBM_CLOUD_APIKEY = os.getenv("ibm_cloud_apikey")
WX_PROJECT_ID = os.getenv("wx_project_id")
# Passed to ModelInference(credentials=...) by watsonx_ai_api.
# NOTE(review): the URL looks like the us-south regional endpoint - confirm it matches the project's region.
CREDS = {
    "url": "https://us-south.ml.cloud.ibm.com",
    "apikey": IBM_CLOUD_APIKEY
}
# watsonx_ai_api reads this module-level name directly; it is not one of its arguments.
project_id = WX_PROJECT_ID

# Parameters and prompts used for every table-summarization request
model_type = ModelTypes.MIXTRAL_8X7B_INSTRUCT_V01_Q  # handed to watsonx_ai_api as model_id
decoding_method = "greedy"
max_tokens = 500  # sent as GenParams.MAX_NEW_TOKENS
min_tokens = 50  # sent as GenParams.MIN_NEW_TOKENS
repetition_penalty = 1.05
# Instruction text; watsonx_ai_api joins it and the table text with a single space to form the prompt.
instruction = """
Read and understand the table, then transform the given table into a coherent paragraph. The first row is the column heading. Please keep it as similar as to the original text. Do not include any explanation in the output. Make sure to include every word from the given string. Do not use bullet points.
"""

In [6]:
def get_all_file_paths(directory):
    """
    Recursively collect the path of every file found under ``directory``.

    Returns a list of paths, each built by joining the folder that holds the
    file with the file's name. Sub-folders are descended into; the folders
    themselves are not listed.
    """
    return [
        os.path.join(folder, name)
        for folder, _subfolders, names in os.walk(directory)
        for name in names
    ]

def check_bboxes(word, table_bbox):
    """
    Tell whether a word lies strictly inside a table's bounding box.

    ``word`` is a mapping with 'x0', 'top', 'x1' and 'bottom' entries and
    ``table_bbox`` is indexed in that same order. A word whose edge
    coincides with an edge of the box is not counted as inside.
    """
    x0, top, x1, bottom = word['x0'], word['top'], word['x1'], word['bottom']
    starts_after_corner = x0 > table_bbox[0] and top > table_bbox[1]
    return starts_after_corner and x1 < table_bbox[2] and bottom < table_bbox[3]

def clean_and_combine_headers(rows):
    """Drop table columns that hold no text; the first row is kept as the header.

    A column is removed when every cell in it is None or whitespace-only.
    The surviving rows come back as tuples. An empty ``rows`` is returned
    unchanged.
    """
    if not rows:
        return rows

    # Header row first, data rows after it.
    table = [rows[0]] + rows[1:]

    def has_text(column):
        return any(cell is not None and cell.strip() for cell in column)

    # Transpose to columns, keep the ones with text, then transpose back to rows.
    kept_columns = [column for column in zip(*table) if has_text(column)]
    return list(zip(*kept_columns))

def clean_sme_text(table, filename):
    """Rewrite the 'Subject Matter Experts:' label so that it names the document.

    Each row of ``table`` is replaced in place by a list in which every cell
    containing the label has it swapped for
    'For {filename}, the subject matter experts are:'. Empty and None cells
    pass through untouched. The same ``table`` object is returned.
    """
    label = "Subject Matter Experts:"
    labelled = f"For {filename}, the subject matter experts are:"

    for index in range(len(table)):
        rewritten = []
        for cell in table[index]:
            if cell and label in cell:
                cell = cell.replace(label, labelled)
            rewritten.append(cell)
        table[index] = rewritten
    return table

def remove_bullet_points(lines):
    """Strip main bullets and fold sub-bullets into the text line that precedes them.

    ``lines`` mixes text lines (str) and tables (list); tables are passed
    through untouched and in position. For text lines:

    * the main bullet character is removed;
    * a line that ends with ' o' (after stripping) is a sub-bullet: the
      marker is cut off and the remaining text is held back;
    * when the next non-sub-bullet item arrives, the held sub-bullets are
      appended to the last emitted text line as ' (a, b, c)'.

    Sub-bullets are only ever appended to a text line. While the last
    emitted item is a table (or nothing has been emitted yet) they stay
    pending and are attached once a text line is available; any that are
    still pending at the end with no text line to attach to are dropped.

    Returns a new list; the input list is not modified.
    """
    # "\u2022" is the bullet itself. The second marker is the same character after
    # its UTF-8 bytes were mis-decoded as cp1252, kept so already-garbled text is cleaned too.
    bullet_markers = ("\u2022", "\u00e2\u20ac\u00a2")
    processed_lines = []
    sub_bullets = []

    def attach_pending():
        # "+=" with a str on a list would extend the table with single characters,
        # so pending sub-bullets are only attached when the last item is text.
        if sub_bullets and processed_lines and not isinstance(processed_lines[-1], list):
            processed_lines[-1] += ' (' + ', '.join(sub_bullets) + ')'
            sub_bullets.clear()

    for line in lines:
        if isinstance(line, list):  # table
            attach_pending()
            processed_lines.append(line)
            continue

        for marker in bullet_markers:
            line = line.replace(marker, "")
        stripped_line = line.strip()

        if stripped_line.endswith(' o'):
            # Sub-bullet: cut the trailing marker and hold the text back.
            sub_bullet_text = stripped_line[:-2].strip()
            if sub_bullet_text:
                sub_bullets.append(sub_bullet_text)
        else:
            attach_pending()
            processed_lines.append(line)

    # Sub-bullets that closed the input
    attach_pending()

    return processed_lines

def watsonx_ai_api(creds, model_id, decoding_method, max_tokens, min_tokens, repetition_penalty, instruction, context):
    """Send one text-generation request to watsonx.ai.

    The prompt is ``instruction`` and ``context`` joined by a single space.
    The decoding method, new-token limits and repetition penalty are
    forwarded as generation parameters, together with a two-newline stop
    sequence. The project is taken from the module-level ``project_id``.

    Returns whatever ``ModelInference.generate_text`` returns for the prompt.
    """
    inference = ModelInference(
        model_id=model_id,
        params={
            GenParams.DECODING_METHOD: decoding_method,
            GenParams.MAX_NEW_TOKENS: max_tokens,
            GenParams.MIN_NEW_TOKENS: min_tokens,
            GenParams.REPETITION_PENALTY: repetition_penalty,
            GenParams.STOP_SEQUENCES: ["\n\n"],
        },
        credentials=creds,
        project_id=project_id,
    )

    prompt = " ".join((instruction, context))
    return inference.generate_text(prompt)

def _page_lines(page, page_number, header_threshold, footer_threshold):
    """Return one page's content, ordered by vertical position, as text lines (str) and tables (list of rows)."""
    # Find tables, their bounding boxes and their cell data
    tables = page.find_tables()
    table_bboxes = [found.bbox for found in tables]
    tables_data = [{'table': found.extract(), 'top': found.bbox[1]} for found in tables]

    # Keep only words outside every table and outside the header/footer bands
    page_height = page.height
    non_table_words = [
        word for word in page.extract_words()
        if not any(check_bboxes(word, bbox) for bbox in table_bboxes)
        and word['bottom'] < page_height - footer_threshold
        # The header only exists from page 2 onwards
        and (page_number == 1 or word['top'] > header_threshold)
    ]

    # Cluster words and tables by their 'top' so each table lands where it sits on the page.
    # A cluster can hold words and one or more tables at once; everything in it is kept.
    lines = []
    for cluster in cluster_objects(non_table_words + tables_data, itemgetter('top'), tolerance=5):
        words = [item['text'] for item in cluster if 'text' in item]
        if words:
            lines.append(' '.join(words))
        lines.extend(item['table'] for item in cluster if 'table' in item)
    return lines


def process_file(pdf_path, html_path):
    """Preprocess a PDF file and write the result as an HTML file.

    For every page, the text outside tables (minus the header and footer
    bands) is emitted as one <p> per line, with bullets normalized by
    remove_bullet_points. Every table that still has a header row and at
    least one data row after cleaning is sent to watsonx.ai, and the returned
    paragraph is emitted as a <p> in the table's position. Tables that do not
    meet that condition are skipped.

    Text and model output are HTML-escaped before being written, so
    characters such as '<' and '&' in the PDF cannot break the markup.

    Args:
        pdf_path: path of the PDF to read; its basename (without extension)
            is used as the document name in the subject-matter-expert text.
        html_path: path of the HTML file to write (UTF-8).
    """
    footer_threshold = 50  # footer region
    header_threshold = 50  # header region
    filename = os.path.splitext(os.path.basename(pdf_path))[0]  # Extract the filename without extension

    paragraphs = []
    with pdfplumber.open(pdf_path) as pdf:
        # Process page by page
        for page_number, page in enumerate(pdf.pages, start=1):
            lines = _page_lines(page, page_number, header_threshold, footer_threshold)

            # Group and format main bullet and sub-bullets
            for line in remove_bullet_points(lines):
                if isinstance(line, list):  # This is a table
                    table = clean_and_combine_headers(line)
                    table = clean_sme_text(table, filename)  # Replace SME text in the table
                    if len(table) > 1 and len(table[0]) > 0:  # Ensure the table has the correct structure
                        df = pd.DataFrame(table[1:], columns=table[0])
                        table_string = "Input: " + df.to_string(index=False) +"Output: "
                        # Pass the table information to watsonx.ai API to have it summarized.
                        result = watsonx_ai_api(
                            CREDS, model_type, decoding_method, max_tokens, min_tokens,
                            repetition_penalty, instruction, table_string
                        )
                        paragraphs.append(str(result))
                else:  # This is text
                    paragraphs.append(line)

    # Escape once, at the point where text becomes markup
    body = "".join(f"<p>{html.escape(text, quote=False)}</p>" for text in paragraphs)

    # Write the html file
    with open(html_path, "w", encoding="utf-8") as f:
        f.write(f"<html><body>{body}</body></html>")

In [None]:
# Folder paths
directory = [
    "Filtered Pdf's/Complexity- High/Business/", "Filtered Pdf's/Complexity- High/Technical/",
    "Filtered Pdf's/Complexity- Medium/Business/", "Filtered Pdf's/Complexity- Medium/Technical/",
    "Filtered Pdf's/Complexity- Easy/Business/", "Filtered Pdf's/Complexity- Easy/Technical/",
]

# Find all file paths ("folder" rather than "dir", which would shadow the builtin for the rest of the session)
final_path = []
for folder in directory:
    file_paths = get_all_file_paths(folder)
    final_path.extend(file_paths)

# Create the output folder if it does not exist yet
output_dir = 'html2'
os.makedirs(output_dir, exist_ok=True)

# Convert every PDF; files with any other extension are skipped.
# NOTE(review): the output name uses only the PDF's basename, so two PDFs with the
# same name in different input folders write to the same HTML file.
for pdf_path in final_path:
    filename, extension = os.path.splitext(os.path.basename(pdf_path))
    if extension.lower() == '.pdf':  # also accepts ".PDF"
        html_path = os.path.join(output_dir, f"{filename}.html")
        process_file(pdf_path, html_path)
        print(f"{filename}.html is created.")