Onik Lab.

Psycopg2 Connection Pool 사용 총 정리

June 21, 2022 | 11 Minute Read • 0 Comments

Python 용 PostgreSQL 모듈인 Psycopg2의 Connection Pool과 관련된 내용을 정리해 보겠습니다.

먼저 관련된 기본 정의나 예제는 좋은 글들이 많아서 해당 부분을 참고할게요.

일단, 이렇게 3개 글을 먼저 한번 쭉 정도를 해 보신 다음에, 이 게시물을 읽는 것을 강력하게 권장하니 참고 바랍니다.


이제 다 읽으셨으면, 본론 들어가겠습니다.

먼저 앞선 게시물을 요약하면 다음과 같습니다.

  • Connection Pool을 사용하면 Python에서 PostgreSQL을 사용하는 데 있어서 Connection 객체를 더욱 효율적으로 사용할 수 있어서 좋다
  • Pool은 AbstractConnectionPool의 추상 클래스, 즉 부모 클래스가 있으며,
  • 실제 구현은 자식 클래스인 SimpleConnectionPool, ThreadedConnectionPool을 선언하는 형태로 이루어진다.
  • SimpleConnectionPool은 단일스레드에서 사용하고, 그렇지 않을 경우 ThreadedConnectionPool을 사용한다.
  • Connection Pool을 사용할 때에는 getconn()을 사용하고, 다 쓰면 putconn()을 사용한다.

일단 이 정도가 되겠습니다.

여기서 추가로 참고해야 할 부분이 있습니다.

  • PersistentConnectionPool은 어디갔나요? → psycopg2.9.3에서 Deprecated 되었습니다.
  • 왜 사용할 때 getconn()이고, 사용을 마칠 때 putconn()인가요?
    • 그것은 ConnectionPool을 생성할 때 minconnection 개수만큼 connection을 미리 만든 다음
    • 실제 사용 시 미리 만든 connection을 가져와야 하므로 getconn()을 사용합니다.
    • 다 썼으면 반납해야죠? 그래서 putconn()을 사용해서 접속 connection을 다시 pool에 넣는겁니다.
    • 실제로 getconn() 함수의 동작은 pop() 함수를 사용하고, putconn() 함수의 동작은 append() 함수를 사용합니다.
    def _getconn(self, key=None):
        # ... 상단 코드 생략

        if self._pool:
            self._used[key] = conn = self._pool.pop()
            self._rused[id(conn)] = key
            return conn

        # ... 하단 코드 생략

    def _putconn(self, conn, key=None, close=False):
        # ... 상단 코드 생략

        if len(self._pool) < self.minconn and not close:
            if not conn.closed:
                status = conn.info.transaction_status
                if status == _ext.TRANSACTION_STATUS_UNKNOWN:
                    # server connection lost
                    conn.close()
                elif status != _ext.TRANSACTION_STATUS_IDLE:
                    # connection in error or in transaction
                    conn.rollback()
                    self._pool.append(conn)
                else:
                    # regular idle connection
                    self._pool.append(conn)

        # ... 하단 코드 생략
  • 그래서 구현은 어떻게 하나요? → 상단 ‘사용하기’ 예제 글에 잘 설명되어 있네요.
    • 그래도 그 글만 보면 추가 설명이 안되므로, 부득이하게 아래 코드와 같이 예제 코드 가져오겠습니다.
from psycopg2 import pool
from contextlib import contextmanager
import atexit

class DBHelper:
    def __init__(self):
        self._connection_pool = None

    def initialize_connection_pool(self):
        self._connection_pool = pool.ThreadedConnectionPool(1, 5, user = "user",
                                              password = "password",
                                              host = "0.0.0.0",
                                              port = "0000",
                                              database = "db")

    @contextmanager
    def get_resource_rdb(self, autocommit=True):
        if self._connection_pool is None:
            self.initialize_connection_pool()

        conn = self._connection_pool.getconn()
        conn.autocommit = autocommit
        cursor = conn.cursor()
        
        try:
            yield cursor, conn
        finally:
            cursor.close()
            self._connection_pool.putconn(conn)

    def shutdown_connection_pool(self):
        if self._connection_pool is not None:
            self._connection_pool.closeall()

db_helper = DBHelper()

@atexit.register
def shutdown_connection_pool():
    db_helper.shutdown_connection_pool()

# 실제 사용 예제
with db_helper.get_resource_rdb() as (cursor, _):
    cursor.execute(query)
    result = cursor.fetchall()

어떻게 보면, 모범 예제라고 볼 수가 있겠지만, 사실 저 코드를 그대로 쓰는 것보다는 상황에 맞게 변형해서 쓸 줄 알아야 합니다. 그러기 위해서는 저 코드에 대한 세부 동작을 명확하게 이해해야 되겠죠?

1. Connection Pool 기본 동작 프로세스

이게 제일 중요합니다. 이 부분을 이해하지 못하면 응용 예제 코드 못 짭니다.

  • pool.ThreadedConnectionPool() 생성자를 통해서 Pool에서 사용할 변수를 선언해야 합니다.
  • 실제 사용은 getconn() 함수를 사용합니다.
  • cursor()를 선언하여, connection 및 cursor 변수를 활용하여 SQL을 수행합니다.
  • 다 썼으면 cursor.close()로 닫아주고, putconn() 함수를 사용하여 사용을 해제합니다.

그런데 위에 쓴 부분은 Connection Pool에 대해서 위의 글들도 읽어보고 기본적으로 이해를 하셨다면, 아마 쉽게 이해할 수 있을 것입니다.

2. ThreadedConnectionPool 예제 코드 이해 및 프로세스

1) Class 선언 및 사용

  • 일단, Connection Pool 자체를 Class 형태로 선언해서 사용한다는 것을 알 수 있습니다.
  • 실제 접속 Pool 생성, 사용, 해제 등의 모든 부분을 하나의 Class Method인 get_resource_rdb() 함수에 몰아서 넣었다는 것을 확인할 수 있습니다.

2) Contextmanager

  • 몰아서 넣어서 사용하기 위한 가장 좋은 방법은 무엇일까요? 바로 contextmanager를 사용하는 것입니다. 그래서 @contextmanager Decorator가 붙은 것입니다. 그리고 저 함수를 실제로 사용할 때에는 위와 같이 with를 사용해서 블록 처리를 하게끔 했고요.
  • 하지만 contextmanager는 사용의 편리성과 명확한 코드 정리를 위해서 사용되는 것이지 필수 사용은 아닙니다. 만약에 contextmanager를 사용하지 않겠다고 할 경우에는 아래와 같이 구현할 수 있습니다. (꼭 아래와 같이 구현하라는 것은 아닙니다 주의 바람)
db = DBHelper()
db.initialize_connection_pool()
conn = db._connection_pool.getconn()
cursor = conn.cursor()

# 실제 사용할 SQL 구문
cursor.execute(query)
result = cursor.fetchall()

cursor.close()
db._connection_pool.putconn(conn)
  • 막상 예제 코드 작성을 해보니까. 썩 그렇게 깔끔하진 않네요. 그래서 contextmanager를 쓰는 것이 보기에도 깔끔하고 관리 차원에서도 역시 좋은 것 같습니다.

3) atexit

  • 그리고 @atexit.register Decorator가 붙은 함수를 확인할 수 있습니다. 이 부분은 쓰는 것이 좋습니다. 왜냐하면 Connection Pool도 모두 사용하고 난 다음에 닫아줘야 하니까, atexit 모듈을 추가해서 프로그램 종료 시 사용할 함수를 지정하고 그 함수에다가 Connection Pool을 닫아주는 것이 좋기 때문입니다.
  • 저는 실제로 저렇게 쓰지 않았고 약간 다르게 썼습니다.
def shutdown_connection_pool(db):
    db.shutdown_connection_pool()

def main():
    # ...
    atexit.register(shutdown_connection_pool, db_helper)
  • 아마도? 상관 없을 것 같습니다. 이의제기하거나 피드백 필요하면 댓글 환영합니다.

3. Multi-Thread Application 개발 적용

  • 어쩌면 제가 이 글을 쓰는 것이 아마도 지금 쓰는 내용이 그 이유가 되지 않을까 개인적으로 생각해 봅니다.
  • 사실 단일 스레드에서는 SimpleConnectionPool을 쓰면 된다고 위에도 썼고, 다른 글에도 언급되어 있습니다. 그러면 멀티 스레드에서는 어떻게 써야 할까요? 그냥 위 예제 참고하면 쉽게 쓸 수 있지 않을까 생각하고, 실제로 그렇습니다.
  • 그러나 만약, 위 예제 코드가 가지는 의미도 완전히 이해하지 못 한 상태에서 그대로 복사 붙여넣기만 한 다음에, Multi-Thread로 생성된 각 Thread 별로 get_resource_rdb() 함수를 사용했다가는 무슨 일이 벌어질까요.
class DBHelper:
    # ... 생략

    @contextmanager
    def get_resource_rdb(self, autocommit=True):
        if self._connection_pool is None:
            self.initialize_connection_pool()

        conn = self._connection_pool.getconn()
        conn.autocommit = autocommit
        cursor = conn.cursor()
        
        try:
            yield cursor, conn
        finally:
            cursor.close()
            self._connection_pool.putconn(conn)
def main():
  global db
  db = DBHelper()
  Thread(target=function1).start()
  Thread(target=function2).start()
  
def function1():
  with db_helper.get_resource_rdb() as (cursor, _):
    cursor.execute("select * from table1")
    result = cursor.fetchall()

def function2():
  with db_helper.get_resource_rdb() as (cursor, _):
    cursor.execute("select * from table2")
    result = cursor.fetchall()

(전 사실 global을 별로 좋아하진 않지만, 예제 코드 쉽게 쓰려고 그냥 저렇게 쓴 것입니다.)

  • 결과는 에러가 발생합니다.
예외가 발생했습니다. PoolError
trying to put unkeyed connection
  File "D:\workspace\3.7\q1.py", line 148, in get_resource_rdb
    self._connection_pool.putconn(conn)
  File "D:\workspace\3.7\q1.py", line 468, in trend_func
    logfile.close()
  • 도대체 왜 이런 일이 생긴 것일까요? 그것은 다중 스레드와 contextmanager가 어떻게 동작하는 지를 이해해야 합니다.

일단, main() 함수에서 여러 개의 Thread를 실행하는 동작을 날리면, 당연히 Thread를 기준으로 독립적으로 동작하게 될 것입니다.

Multithread

당연히 아시겠지만, Thread 동작은 순서를 가지는 것을 의미하는 것은 아닙니다. 그래서 2개의 Thread에서 동시에 with 구문을 사용하여 get_resource_rdb() 함수를 호출하는 상황이 발생했을 때 순차적으로 동작하는 것이 절대로 아닙니다.

실제 문제가 되는 부분은 다름 아닌 이 부분입니다.

    def get_resource_rdb(self, autocommit=True):
        if self._connection_pool is None:
            self.initialize_connection_pool()

Connection Pool을 Initialize를 해 줘야 하는데,

아래와 같이 실행되는 것이 아니라,

  • 먼저 실행된 Thread에서 connection pool이 없음을 확인해서 생성을 해 준 다음
  • 다음 실행된 Thread에서 connection pool이 생성된 것을 확인해서 다음 절차로 넘어감

다음과 같은 형태로 실행되는 것으로 인식합니다.

  • 먼저 실행된 Thread에서 connection pool이 없음을 확인했고
  • 다음 실행된 Thread에서도 역시 connection pool이 없음을 확인하였음

그렇게 되면 프로세스는 어떠한 형태로 넘어갈까요?

  • 1번 스레드에서 Connection Pool이 없는 것을 보고 생성합니다.
  • 1번 스레드에서 getconn() 함수를 수행해서 connection 정보(999번)를 가져옵니다.
  • 1번 스레드의 Query를 실행합니다.
  • 2번 스레드에서 Connection Pool이 없는 것을 보고 다시 생성합니다.
  • 2번 스레드에서 getconn() 함수를 수행해서 connection 정보(998번)를 가져옵니다.
  • 2번 스레드의 Query를 실행합니다.
  • 1번 스레드에서 putconn() 함수를 수행해서 connection 정보를 다시 넣어야 하는데??
  • 어디갔을까요? 없어졌습니다.

왜 없어졌냐.

  • 실제 getconn() 함수의 동작은 999번 connection을 가져오는 대신, 가져온 connection 정보를 connection_pool 변수에 저장합니다.
  • 그리고 putconn() 함수가 동작할 때 999번 connection 사용이 종료된 것을 확인하여, 가져온 connection 정보는 지우고 999번 connection을 다시 pool에 저장하는 개념이죠.
  • 다시 말하면 자전거 대여소에서 999번 자전거를 빌려가고 대여 대장에 999번을 쓴 다음, 대여를 완료해서 999번 자전거를 반납하면 대여소에 자전거가 다시 보관되고 대여 대장의 999번은 밑줄 긋고 지워지겠죠.

한마디로 Connection Pool의 connection 정보는 대여 시스템으로 보셔야 합니다.

2번 스레드에서 Connection Pool을 다시 생성했다는 것은, 기존 자전거 대여소가 폐업하고 새로 자전거 대여소를 세웠다는 뜻으로 보셔야 할 것입니다.

그래서 먼저 빌렸던 999번 자전거를 반납해야 하는데, 새로 세운 대여소에서 ‘우리 그런 자전거 없는데요?’ 하고 안받아준다는 것이고 그래서 생긴 에러라는 것입니다.

그럼 도대체 어떻게 해결하나요? 아래와 같이 해야겠죠.

class DBHelper:
    def __init__(self):
        self.initialize_connection_pool()

    @contextmanager
    def get_resource_rdb(self, autocommit=True):

        conn = self._connection_pool.getconn()
        conn.autocommit = autocommit
        cursor = conn.cursor()
        
        try:
            yield cursor, conn
        finally:
            cursor.close()
            self._connection_pool.putconn(conn)

def main():
  global db
  db = DBHelper()
  Thread(target=function1).start()
  Thread(target=function2).start()
  

Class 변수 생성할 때 아예 Connection Pool도 같이 선언해서 1개로 확실하게 만들어주고, 실제 사용할 때에는 기존 생성된 Pool로부터 getconn() / putconn()만 처리해주는 것입니다.

  • 기존 동작이 생성, 사용, 반납을 get_resource_rdb() 함수에서 처리했다면,
  • 변경 동작은 사용, 반납을 get_resource_rdb() 함수에서 처리하고, 생성은 사전에 먼저 합니다.

그렇게 하면 아무런 문제 없이 올바르게 동작하는 것을 확인할 수 있습니다.

이제 Python의 PostgreSQL Connection Pool에 대한 더욱 고차원적인(?) 이해를 할 수 있을 것으로 기대해 보겠습니다.

이상 글 마치겠습니다.