Robust SQL calls from Python

SQL
Python
reliability
Author

Dekel Wainer

Published

May 29, 2025

Making Python code interact with an SQL database is straightforward. Imagine you’re using a cloud SQL database and want to insert data into two tables. You might generate this code:

import os
import pyodbc

conn_str = os.getenv("sql_connection_string")  # secret connection string

with pyodbc.connect(conn_str) as conn:
    with conn.cursor() as cursor:

        insert_query = """
            INSERT INTO some_table (field1, field2, field3)  -- fieldN are the actual fields in the database
            VALUES (?, ?, ?)
        """
        cursor.execute(insert_query, (field1, field2, field3))  # fieldN here are the variables in Python
        conn.commit()

        insert_query = """
            INSERT INTO another_table (field4, field5)
            VALUES (?, ?)
        """
        cursor.execute(insert_query, (field4, field5)) 
        conn.commit()

A vibe-coding developer might stop here, assured by the LLM that this is a good implementation because:

- The connection string isn’t being exposed.
- Placeholders (the ‘?’) are being used, which protects against SQL injection (illustrated below).
- A context manager is used (the ‘with’ statements), which automatically closes the connection to the SQL database.
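The second point is worth a quick illustration. Placeholders matter because the driver sends the values separately from the SQL text, so input is never parsed as SQL. Here is a minimal sketch, continuing from the cursor above and using a made-up user_input variable:

# UNSAFE: user_input becomes part of the SQL text itself, so a crafted value
# like "x'; DROP TABLE some_table; --" can change the meaning of the query
cursor.execute(f"SELECT * FROM some_table WHERE field1 = '{user_input}'")

# SAFE: the value is sent as a bound parameter and is never parsed as SQL
cursor.execute("SELECT * FROM some_table WHERE field1 = ?", (user_input,))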

But it’s not a good implementation, because there’s no error handling. Any communication or schema error will surface as unexpected behavior in the program. Worse still, if an error occurs after the first query is committed but before the second one is, your database ends up containing only partial data.

So a better approach is to add explicit error handling and to commit only after all queries have succeeded. Nothing gets committed unless everything runs smoothly.

conn = None
try:
    with pyodbc.connect(conn_str) as conn:
        with conn.cursor() as cursor:
            insert_query_1 = """
                INSERT INTO some_table (field1, field2, field3)
                VALUES (?, ?, ?)
            """
            cursor.execute(insert_query_1, (field1, field2, field3))

            insert_query_2 = """
                INSERT INTO another_table (field4, field5)
                VALUES (?, ?)
            """
            cursor.execute(insert_query_2, (field4, field5))

        # commit once, only after every statement succeeded
        conn.commit()

except Exception as e:
    print(f"Database operation failed: {e}")
    if conn is not None:
        conn.rollback()  # discard any partially executed, uncommitted work

But what if you don’t have all the information needed to commit in a single place? For instance, if your program collects data across dozens of scripts, no single script has the opportunity to commit everything at once.

Instead, you can import a shared class into each relevant script and append the queries you want to commit to it. Then, at some later point in the program, you loop over that list and write all the queries to the database in one transaction.

# define this class in a helper module
class QueryCollector:
    def __init__(self):
        self.queries = []

    def add(self, query, params):
        # store each query together with its parameters for later execution
        self.queries.append((query, params))

    def execute_all(self, conn_str):
        conn = None
        try:
            with pyodbc.connect(conn_str) as conn:
                with conn.cursor() as cursor:
                    for query, params in self.queries:
                        cursor.execute(query, params)
                # commit once, only after every query succeeded
                conn.commit()
        except Exception as e:
            print(f"Error during batched SQL execution: {e}")
            if conn is not None:
                conn.rollback()  # discard any uncommitted work


# call the class in any script that needs it
collector = QueryCollector()

collector.add(
    "INSERT INTO some_table (field1, field2, field3) VALUES (?, ?, ?)",
    (field1, field2, field3)
)


# commit everything at a later point in the program, once you have collected all your queries
collector.execute_all(os.getenv("sql_connection_string"))
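The snippet above creates the collector inside a single script. To actually gather queries from many scripts, every script has to touch the same instance. Here is a minimal sketch of one way to do that, with hypothetical module names (query_collector.py, ingest_users.py, main.py) and assuming QueryCollector lives in a helper module called sql_helpers:

# query_collector.py -- holds the one shared instance for the whole program
from sql_helpers import QueryCollector

collector = QueryCollector()


# ingest_users.py -- any script that produces data appends to the shared instance
from query_collector import collector

collector.add(
    "INSERT INTO some_table (field1, field2, field3) VALUES (?, ?, ?)",
    (field1, field2, field3),
)


# main.py -- runs last and writes everything in a single transaction
import os

from query_collector import collector

collector.execute_all(os.getenv("sql_connection_string"))

A module-level shared instance is just one design choice; passing the collector around explicitly works equally well and is easier to test.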

Now you have a centralized solution which is easy to expand. For example, you can add more complex error handling, query-level logging, retries, or grouped commits to reduce I/O.
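As one illustration of that kind of extension, here is a hedged sketch of a subclass that adds query-level logging and a simple retry loop. The retry count and delay are arbitrary, and a production version would inspect the specific pyodbc error to decide whether retrying makes sense at all:

import logging
import time

logger = logging.getLogger(__name__)

class RetryingQueryCollector(QueryCollector):
    def execute_all(self, conn_str, retries=3, delay_seconds=5):
        for attempt in range(1, retries + 1):
            conn = None
            try:
                with pyodbc.connect(conn_str) as conn:
                    with conn.cursor() as cursor:
                        for query, params in self.queries:
                            logger.debug("Executing: %s with %s", query, params)
                            cursor.execute(query, params)
                    # commit once, only after every query succeeded
                    conn.commit()
                logger.info("Committed %d queries on attempt %d", len(self.queries), attempt)
                return
            except Exception as e:
                logger.warning("Attempt %d/%d failed: %s", attempt, retries, e)
                if conn is not None:
                    conn.rollback()  # discard the uncommitted work from this attempt
                if attempt < retries:
                    time.sleep(delay_seconds)  # crude fixed backoff before retrying
                else:
                    raise  # give up and let the caller handle the final failure

Because each attempt runs the whole batch in a single transaction, a retry simply re-runs everything from scratch, so the all-or-nothing guarantee is preserved.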