Clean Log Table Python
Customer Guide: Python PII Cleaning Script
This document provides instructions for setting up and using the Python-based PII (Personally Identifiable Information) cleaning script. This script serves as a powerful, high-accuracy alternative to SQL-based cleaning methods by leveraging Microsoft's Presidio library to detect and redact sensitive data from your logs before making them available for integration.
How It Works
The script is designed to run as a continuous, incremental process. It connects to your ClickHouse database and performs the following steps:
- Checks Progress: It looks at a local file (last_processed_timestamp.txt) to find the timestamp of the last log entry it processed. If the file doesn't exist, it starts from the beginning.
- Fetches New Data: It queries your raw source table for any new log entries that have arrived since the last run.
- Cleans with Presidio: For each new log, it uses the Presidio engine to analyze the configured text column (e.g., message) for sensitive information like names, addresses, credit card numbers, and IP addresses.
- Inserts Clean Data: It inserts the cleaned, PII-free version of the logs into a new, secure destination table.
- Saves Progress: It updates the last_processed_timestamp.txt file with the timestamp of the last log it processed, ensuring it won't process the same data twice.
This process is designed to be run on a schedule (e.g., every 5 minutes), creating a near real-time, sanitized copy of your log data.
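The incremental loop described above can be sketched without a database. This is a minimal illustration under stated assumptions, not the script itself: the in-memory list stands in for the source table, `redact` is a hypothetical stand-in for the Presidio step, and the checkpoint lives in a temporary file.

```python
import os
import tempfile

def read_checkpoint(path, default="1970-01-01 00:00:00"):
    """Return the saved timestamp, or the default when no state file exists."""
    if os.path.exists(path):
        with open(path) as f:
            return f.read().strip()
    return default

def write_checkpoint(path, timestamp):
    """Persist the timestamp of the last processed row."""
    with open(path, "w") as f:
        f.write(timestamp)

def redact(text):
    # Hypothetical stand-in for Presidio: masks one known name only.
    return text.replace("Alice", "<REDACTED>")

def run_once(source_rows, destination, state_path):
    """Process rows newer than the checkpoint, then advance the checkpoint."""
    last = read_checkpoint(state_path)
    # Fixed-width 'YYYY-MM-DD HH:MM:SS' strings compare correctly as text.
    new_rows = sorted(r for r in source_rows if r[0] > last)
    for timestamp, message in new_rows:
        destination.append((timestamp, redact(message)))
    if new_rows:
        write_checkpoint(state_path, new_rows[-1][0])
    return len(new_rows)

state_path = os.path.join(tempfile.mkdtemp(), "last_processed_timestamp.txt")
source = [("2024-01-01 10:00:00", "Alice logged in"),
          ("2024-01-01 10:05:00", "disk usage at 80%")]
cleaned = []
first_run = run_once(source, cleaned, state_path)    # no state file yet: both rows
source.append(("2024-01-01 10:10:00", "Alice logged out"))
second_run = run_once(source, cleaned, state_path)   # only the newly arrived row
print(first_run, second_run, cleaned)
```

Running `run_once` again with no new rows leaves both the destination and the checkpoint unchanged, which is the property the scheduled runs rely on.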
Step 1: Prerequisites
Before you run the script, please ensure the following requirements are met on the machine where the script will run.
1. Install Python Libraries
You will need Python 3.7+ and the following libraries. You can install them using pip:
pip install clickhouse-driver presidio-analyzer presidio-anonymizer "spacy==3.7.2"
2. Download Language Model
Presidio uses the spaCy library for natural language processing. Download the English language model with this command:
python -m spacy download en_core_web_lg
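Before the first run, it can help to confirm that the packages and the language model are importable. The following is a small sketch using only the standard library; the helper name `find_missing` is illustrative and not part of the script.

```python
import importlib.util

# Import names for the packages and the spaCy model installed above.
REQUIRED_MODULES = [
    "clickhouse_driver",
    "presidio_analyzer",
    "presidio_anonymizer",
    "spacy",
    "en_core_web_lg",
]

def find_missing(module_names):
    """Return the names that cannot be located on the current Python path."""
    return [name for name in module_names if importlib.util.find_spec(name) is None]

missing = find_missing(REQUIRED_MODULES)
if missing:
    print("Missing:", ", ".join(missing))
else:
    print("All prerequisites found.")
```

This only checks that each module can be found; it does not load the model or verify versions.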
3. ClickHouse User Permissions
The script requires a ClickHouse user with the following permissions:
- READ (SELECT) access on the source table (e.g., customer_db.logs).
- WRITE (INSERT) and CREATE TABLE access on the target database (e.g., customer_db) so it can create the cleaned table and insert data into it.
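As a rough illustration, the permissions above map onto ClickHouse GRANT statements along these lines. The user name `pii_cleaner` is hypothetical, and the statements would be run by an administrator; check them against your ClickHouse version and access-control setup before use.

```python
def build_grants(user, database, source_table):
    """Return GRANT statements matching the permissions listed above."""
    return [
        # Read access on the raw source table only.
        f"GRANT SELECT ON {database}.{source_table} TO {user}",
        # Create the cleaned table and insert into it within the target database.
        f"GRANT CREATE TABLE, INSERT ON {database}.* TO {user}",
    ]

# 'pii_cleaner' is a placeholder user name for this example.
for statement in build_grants("pii_cleaner", "customer_db", "logs"):
    print(statement + ";")
```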
Step 2: Configuration
Open the Python script (clickhouse_presidio_incremental_script.py) in an editor and modify the Configuration section at the top.
- CLICKHOUSE_HOST: The hostname or IP address of your ClickHouse server.
- ADMIN_USER: The ClickHouse username you created in the prerequisite step.
- DATABASE: The name of your ClickHouse database.
- SOURCE_TABLE: The name of your original table containing raw logs (e.g., logs).
- CLEANED_TABLE: The name for the new table that will store the sanitized logs (e.g., llm_logs_presidio_cleaned).
- COLUMN_TO_CLEAN: The name of the column within your logs that contains the text you want to anonymize (e.g., message). It must also appear in COLUMNS_TO_TRANSFER.
- COLUMNS_TO_TRANSFER: A list of all columns you want to copy from the source table to the cleaned table. The list must include event_timestamp, and its order must match the column order of the cleaned table, because rows are inserted positionally. If you change this list, update the CREATE TABLE statement in create_cleaned_table to match.
- ENTITIES_TO_REDACT: A list of PII types you want Presidio to find and redact. The script includes a comprehensive default list. You can customize this based on your needs.
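The script looks up COLUMN_TO_CLEAN and event_timestamp inside COLUMNS_TO_TRANSFER with `list.index`, which raises a `ValueError` if either is missing. A small pre-flight check along the following lines can surface such mistakes with clearer messages. `validate_config` is a hypothetical helper, not part of the script.

```python
def validate_config(columns_to_transfer, column_to_clean, timestamp_column="event_timestamp"):
    """Return a list of problems found in the column settings (empty if none)."""
    problems = []
    if len(set(columns_to_transfer)) != len(columns_to_transfer):
        problems.append("COLUMNS_TO_TRANSFER contains duplicate names")
    if column_to_clean not in columns_to_transfer:
        problems.append(f"COLUMN_TO_CLEAN '{column_to_clean}' is not in COLUMNS_TO_TRANSFER")
    if timestamp_column not in columns_to_transfer:
        problems.append(f"'{timestamp_column}' is not in COLUMNS_TO_TRANSFER")
    return problems

# The default configuration from the script passes the check.
good = validate_config(
    ["event_timestamp", "level", "message", "user_id", "llm_model"], "message"
)
# A configuration missing both the text column and the timestamp column.
bad = validate_config(["level", "message"], "body")
print(good)
print(bad)
```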
Step 3: Running the Script
Initial Run
The first time you run the script, it will process all existing data from your source table. This may take a long time depending on the volume of your historical logs.
- Navigate to the directory containing the script in your terminal.
- Run the script:
python clickhouse_presidio_incremental_script.py
- The script will prompt you to enter the password for the ClickHouse user.
- It will then begin the backfill process, creating the new table and populating it with cleaned data.
Scheduling for Continuous Updates
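After the initial backfill is complete, you should run the script on a regular schedule to process new data as it arrives. A cron job is a standard tool for this on Linux or macOS. Because the script as written prompts for the password interactively, which a cron job cannot answer, a scheduled run needs the password supplied non-interactively (for example, read from an environment variable).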
For example, to run the script every 5 minutes, you can add the following line to your crontab:
# Edit your crontab file
crontab -e
# Add this line, replacing the paths with the correct ones for your system
*/5 * * * * /usr/bin/python3 /path/to/your/clickhouse_presidio_incremental_script.py
This ensures that your cleaned table is consistently updated with the latest PII-free logs, ready for secure integration.
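Two details matter for unattended runs: the password prompt needs a non-interactive source, and the relative state file path is resolved against the working directory, which under cron is typically the user's home directory rather than the script's directory. The sketch below shows one possible adjustment. The environment variable name `CLICKHOUSE_PASSWORD` and both helper functions are assumptions for illustration, not part of the script.

```python
import getpass
import os

def get_password(user, env_var="CLICKHOUSE_PASSWORD"):
    """Read the password from the environment, prompting only if it is unset."""
    password = os.environ.get(env_var)
    if password is not None:
        return password
    return getpass.getpass(f"Enter password for ClickHouse user '{user}': ")

def state_file_path(script_file, name="last_processed_timestamp.txt"):
    """Place the state file next to the script instead of the working directory."""
    return os.path.join(os.path.dirname(os.path.abspath(script_file)), name)

# In practice the scheduler's environment would define this variable;
# it is set here only so the example runs without a prompt.
os.environ["CLICKHOUSE_PASSWORD"] = "example-only"
print(get_password("default") == "example-only")
print(state_file_path("/path/to/your/clickhouse_presidio_incremental_script.py"))
```

Inside the script, `STATE_FILE = state_file_path(__file__)` would keep the checkpoint in one place regardless of where the job is launched from. Keeping a password in an environment variable has its own security trade-offs, so treat this as a starting point.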
Full Python Script
Below is the complete clickhouse_presidio_incremental_script.py file.
import getpass
from clickhouse_driver import Client
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
import os
from datetime import datetime
# --- Configuration ---
CLICKHOUSE_HOST = 'localhost'
ADMIN_USER = 'default'
DATABASE = 'customer_db'
# --- Table and Column Configuration ---
SOURCE_TABLE = 'logs'
CLEANED_TABLE = 'llm_logs_presidio_cleaned'
COLUMN_TO_CLEAN = 'message'
COLUMNS_TO_TRANSFER = [
    'event_timestamp',
    'level',
    'message',
    'user_id',
    'llm_model'
]
# --- Processing Configuration ---
BATCH_SIZE = 1000
# File to store the timestamp of the last processed row
STATE_FILE = 'last_processed_timestamp.txt'
# --- Presidio Configuration ---
ENTITIES_TO_REDACT = [
    "CREDIT_CARD", "CRYPTO", "DATE_TIME", "EMAIL_ADDRESS", "IBAN_CODE",
    "IP_ADDRESS", "NRP", "LOCATION", "PERSON", "PHONE_NUMBER", "MEDICAL_LICENSE"
]

class PresidioAnonymizer:
    """A helper class to encapsulate Presidio functionality."""

    def __init__(self):
        self.analyzer = AnalyzerEngine()
        self.anonymizer = AnonymizerEngine()

    def anonymize_text(self, text):
        """Analyzes and anonymizes a given text, replacing PII."""
        if not text or not isinstance(text, str):
            return text
        try:
            analyzer_results = self.analyzer.analyze(
                text=text, entities=ENTITIES_TO_REDACT, language='en'
            )
            anonymized_result = self.anonymizer.anonymize(
                text=text,
                analyzer_results=analyzer_results,
                operators={"DEFAULT": OperatorConfig("replace", {"new_value": "<REDACTED>"})}
            )
            return anonymized_result.text
        except Exception as e:
            print(f"Error during anonymization: {e}")
            # Do not return the raw text on failure: it would reach the cleaned table unredacted
            return "<REDACTION_FAILED>"

def get_last_processed_timestamp():
    """Reads the last processed timestamp from the state file."""
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE, 'r') as f:
            timestamp_str = f.read().strip()
            # Ensure the timestamp is in the correct format for ClickHouse
            return timestamp_str
    # Return a very old date if the file doesn't exist to process all records
    return '1970-01-01 00:00:00'

def save_last_processed_timestamp(timestamp):
    """Saves the latest processed timestamp to the state file."""
    with open(STATE_FILE, 'w') as f:
        # Convert datetime object to string if necessary
        f.write(str(timestamp))
    print(f"State updated. Last processed timestamp is now: {timestamp}")

def create_cleaned_table(client):
    """Creates the destination table if it doesn't already exist."""
    create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {DATABASE}.{CLEANED_TABLE} (
        event_timestamp DateTime,
        level String,
        message String,
        user_id String,
        llm_model String
    ) ENGINE = MergeTree()
    ORDER BY event_timestamp;
    """
    print(f"Ensuring destination table '{CLEANED_TABLE}' exists...")
    client.execute(create_table_query)
    print("Table ready.")

def main():
    """Main function to run the incremental ETL and anonymization process."""
    anonymizer = PresidioAnonymizer()
    total_processed = 0
    try:
        admin_password = getpass.getpass(f"Enter password for ClickHouse user '{ADMIN_USER}': ")
        client = Client(host=CLICKHOUSE_HOST, user=ADMIN_USER, password=admin_password, database=DATABASE)
        print("\nSuccessfully connected to ClickHouse.")
        create_cleaned_table(client)
        last_timestamp = get_last_processed_timestamp()
        print(f"\nStarting PII cleaning process for data newer than: {last_timestamp}")
        columns_str = ", ".join(COLUMNS_TO_TRANSFER)
        column_to_clean_index = COLUMNS_TO_TRANSFER.index(COLUMN_TO_CLEAN)
        timestamp_column_index = COLUMNS_TO_TRANSFER.index('event_timestamp')
        while True:
            # Query for new data since the last run.
            # Note: the strict '>' combined with LIMIT means that if a full batch ends
            # partway through rows sharing one timestamp, the rest of those rows are skipped.
            select_query = f"""
                SELECT {columns_str} FROM {DATABASE}.{SOURCE_TABLE}
                WHERE event_timestamp > toDateTime('{last_timestamp}')
                ORDER BY event_timestamp
                LIMIT {BATCH_SIZE}
            """
            source_data = client.execute(select_query)
            if not source_data:
                print("No new data to process.")
                break
            cleaned_batch = []
            latest_timestamp_in_batch = None
            for row in source_data:
                mutable_row = list(row)
                text_to_clean = mutable_row[column_to_clean_index]
                cleaned_text = anonymizer.anonymize_text(text_to_clean)
                mutable_row[column_to_clean_index] = cleaned_text
                cleaned_batch.append(mutable_row)
                # Keep track of the latest timestamp in the current batch
                latest_timestamp_in_batch = row[timestamp_column_index]
            print(f"Inserting {len(cleaned_batch)} cleaned rows...")
            client.execute(f'INSERT INTO {DATABASE}.{CLEANED_TABLE} VALUES', cleaned_batch)
            # After successful insertion, update the state file
            if latest_timestamp_in_batch:
                save_last_processed_timestamp(latest_timestamp_in_batch)
                last_timestamp = latest_timestamp_in_batch  # for the next loop iteration
            total_processed += len(source_data)
            # If we processed less than the batch size, we've caught up.
            if len(source_data) < BATCH_SIZE:
                break
        print("\n-------------------------------------------")
        print(f"✅ Run complete! Total new rows processed in this run: {total_processed}")
        print(f"Cleaned data is available in the table: '{DATABASE}.{CLEANED_TABLE}'")
        print("-------------------------------------------")
    except Exception as e:
        print(f"\nAn error occurred: {e}")
    finally:
        if 'client' in locals() and client.connection.connected:
            client.disconnect()
            print("\nDisconnected from ClickHouse.")

if __name__ == "__main__":
    main()
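
One edge case in the batching loop is worth illustrating. The query filters with a strict `>` and a `LIMIT`, so when a full batch ends partway through a group of rows that share one timestamp, the remaining rows of that group are never selected. The sketch below reproduces this with in-memory data (integers stand in for timestamps) and shows one possible mitigation, which is not part of the original script: drop the last timestamp group from any full batch so the next query picks it up whole. `trim_partial_timestamp` and `run` are hypothetical helpers.

```python
def trim_partial_timestamp(batch, batch_size, ts_index=0):
    """Drop trailing rows that share the last timestamp of a full batch.

    A full batch may have been cut off in the middle of a group of rows with
    the same timestamp. Removing that last group keeps the checkpoint at a
    timestamp whose rows were all seen, so the next query fetches the group whole.
    """
    if len(batch) < batch_size:
        return batch  # short batch: nothing was cut off
    last_ts = batch[-1][ts_index]
    trimmed = [row for row in batch if row[ts_index] != last_ts]
    if not trimmed:
        # No progress is possible when one timestamp fills the whole batch.
        raise ValueError("every row shares one timestamp; increase the batch size")
    return trimmed

def run(rows, batch_size, trim):
    """Simulate the fetch/process/checkpoint loop over rows sorted by timestamp."""
    processed, checkpoint = [], 0
    while True:
        # Stand-in for: SELECT ... WHERE ts > checkpoint ORDER BY ts LIMIT batch_size
        fetched = [r for r in rows if r[0] > checkpoint][:batch_size]
        if not fetched:
            break
        batch = trim_partial_timestamp(fetched, batch_size) if trim else fetched
        processed.extend(batch)
        checkpoint = batch[-1][0]
        if len(fetched) < batch_size:
            break
    return processed

rows = [(1, "a"), (1, "b"), (2, "c"), (2, "d"), (3, "e")]
naive = run(rows, batch_size=3, trim=False)   # loses the second row at timestamp 2
safe = run(rows, batch_size=3, trim=True)     # processes every row exactly once
print(naive)
print(safe)
```

The trade-off is that a single timestamp with at least as many rows as the batch size cannot be handled this way, so the batch size would need to exceed the largest expected group.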
Updated 9 days ago