ФорумПрограммированиеPython → python, postgres и многопоточность

python, postgres и многопоточность

  • bRUtality

    Сообщения: 23 Репутация: N Группа: Кто попало

    Spritz Июнь 21, 2012, 5:23 п.п.

    Доброго времени суток!

    Помогите, пожалуйста, решить следующую проблему.
    Нужно записывать данные в PostgresQL одновременно в нескольких потоках, используя одно и то же соединение (т.к. потоков очень много, и если каждый будет открывать свое соединение, получится существенное замедление работы).
    Вот такой скрипт набросал:

    global conn

    ips = []
    ips = GetIp() #код это функции приводить не буду, она просто достает данные из БД

    for ip in ips:
    dbt = DBThread(ip)
    dbt.start()
    while threading.activeCount() > 1: time.sleep(1)

    conn.close()

    class DBThread(threading.Thread):
    def __init__(self, ip):
    self.ip = ip
    threading.Thread.__init__(self)
    self.cur = conn.cursor()
    self.cpy = StringIO()
    def run(self):
    #здесь идет блок кода, формирующий строку для всавки в БД методом COPY,
    #условно обозначу его
    str_for_copy = get_str()

    self.cpy.write(str_for_copy)
    self.cpy.seek(0)
    self.cur.copy_from(self.cpy, 'my_table', columns = ('column1', 'column2'))
    conn.commit()


    Если потоков немного, порядка 5, то все отрабатывает на ура. При сотне-другой после нескольких успешных записей валится ошибка:

    Exception in thread Thread-19:
    Traceback (most recent call last):
    File "/usr/local/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
    File "/path/script.py", line 91, in run
    cur.copy_from(cpy, 'my_table', columns = ('column1', 'column2'))
    DatabaseError: no COPY in progress


    Exception in thread Thread-218:
    Traceback (most recent call last):
    File "/usr/local/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
    File "/path/script.py", line 57, in run
    rows = cur.fetchall() #это первая строка в методе, использующая cur
    InterfaceError: cursor already closed

    Ощущение, как будто потоки не успевают освобождать какой-то общий ресурс.
    То ли фича питона такая, то ли я где недопонимаю.
  • phpdude

    Сообщения: 26646 Репутация: N Группа: в ухо

    Spritz Июнь 21, 2012, 5:31 п.п., спустя 8 минут 31 секунду

    наверное хочет какую то блокировку на какой то ресурс, перекрытие происходит походу. так ты неверно делаешь просто.

    сделай pooling queue и отпадет проблема.
    Сапожник без сапог
  • bRUtality

    Сообщения: 23 Репутация: N Группа: Кто попало

    Spritz Июнь 21, 2012, 5:36 п.п., спустя 5 минут 11 секунд


    наверное хочет какую то блокировку на какой то ресурс, перекрытие происходит походу. так ты неверно делаешь просто.

    сделай pooling queue и отпадет проблема.

    Вот мне и непонятно, почему блокировка происходит. Ресурс cur приватный, значит для каждого потока должен свой создаваться.
    Пулинг попробую, спасибо за совет.
  • phpdude

    Сообщения: 26646 Репутация: N Группа: в ухо

    Spritz Июнь 21, 2012, 5:39 п.п., спустя 2 минуты 50 секунд

    Ресурс cur приватный, значит для каждого потока должен свой создаваться.

    подозреваю что приватна только ссылка на ресурс.

    по факту ты же приватной ссылке присваиваешь указатель по соединение.

    self.cur = conn.cursor()

    соединение то одно :-)

    ну и получается что у тебя кросс аккесс к соединению.

    я в питоне еще слаб, если можно синхронизацию потоков по курсору сделать - надо сделать и проблема спадет я подозреваю. Это я говорю словами от Явы. Как в питоне локают треды я пока не в курсе, задач не было :)
    Сапожник без сапог
  • bRUtality

    Сообщения: 23 Репутация: N Группа: Кто попало

    Spritz Июнь 21, 2012, 5:46 п.п., спустя 6 минут 28 секунд

    Я питон только начал изучать, могу что-то не понимать пока. Идея была такова: соединение одно, а курсоров - каждому потоку свой. Может, гуру подскажут, как это реализовать?
  • phpdude

    Сообщения: 26646 Репутация: N Группа: в ухо

    Spritz Июнь 21, 2012, 5:48 п.п., спустя 2 минуты 52 секунды

    Идея была такова: соединение одно, а курсоров - каждому потоку свой. Может, гуру подскажут, как это реализовать?

    курсор это всего лишь указатель соединения)) могу быть не прав, но почти уверен в своей правоте. тут либо пулинг либо блокировки
    Сапожник без сапог
  • Nyaah

    Сообщения: 574 Репутация: N Группа: Джедаи

    Spritz Июнь 21, 2012, 5:58 п.п., спустя 9 минут 44 секунды

    Хотел написать очень много матов, но лень. Если коротко, то только по одному коннекту: все потоки будут ждать, пока завершится предыдущий запрос (в случае синхронного соединения), в libpq (ц библиотека для работы с пг) даже для асинхронных запросов в одном коннекте нельзя параллельно пихать запросы: PQsendQuery cannot be called again (on the same connection) until PQgetResult has returned a null pointer, indicating that the command is done.. Питоновский пгконнектор скорее всего обёртка над libpq
    Work, buy, consume, die
  • phpdude

    Сообщения: 26646 Репутация: N Группа: в ухо

    Spritz Июнь 21, 2012, 6 п.п., спустя 2 минуты

    Nyaah, ну вот в этом то и проблема, на эту блокировку и надо процессы блокировать
    Сапожник без сапог
  • Nyaah

    Сообщения: 574 Репутация: N Группа: Джедаи

    Spritz Июнь 21, 2012, 6:01 п.п., спустя 1 минуту 21 секунду

    По поводу параллельных копи и о том, что в питоне многопоточность эмулируется помощью libevent даже говорить не охота. Тут сама задача бессысленна, не то что реализация. Как задача про самолет, под которым движущееся полотно =)
    Work, buy, consume, die
  • phpdude

    Сообщения: 26646 Репутация: N Группа: в ухо

    Spritz Июнь 21, 2012, 6:04 п.п., спустя 2 минуты 18 секунд

    Как задача про самолет, под которым движущееся полотно =)

    если реактивный то взлетит)
    Сапожник без сапог
  • adw0rd

    Сообщения: 22959 Репутация: N Группа: в ухо

    Spritz Июнь 21, 2012, 6:04 п.п., спустя 6 секунд

    http://asvetlov.blogspot.com/2010/11/1.html
    http://asvetlov.blogspot.com/2010/11/2.html
    https://smappi.org/ - платформа по созданию API на все случаи жизни
  • Nyaah

    Сообщения: 574 Репутация: N Группа: Джедаи

    Spritz Июнь 21, 2012, 6:04 п.п., спустя 7 секунд


    Nyaah, ну вот в этом то и проблема, на эту блокировку и надо процессы блокировать
    Блеать, нахер по одной то строке вставлять? =) На синхронизацию и ожидание уйдёт больше времени чем на вставку данных. Понимаю, если вставлять по 1000 строк из 4 потоков, ещё куда ни шло, но чтоб на каждую строку свой поток заводить…
    Work, buy, consume, die
  • adw0rd

    Сообщения: 22959 Репутация: N Группа: в ухо

    Spritz Июнь 21, 2012, 6:06 п.п., спустя 1 минуту 32 секунды

    http://asvetlov.blogspot.com/2011/07/gil.html
    http://asvetlov.blogspot.com/2011/07/signal.html
    https://smappi.org/ - платформа по созданию API на все случаи жизни
  • phpdude

    Сообщения: 26646 Репутация: N Группа: в ухо

    Spritz Июнь 21, 2012, 6:25 п.п., спустя 19 минут 17 секунд



    Nyaah, ну вот в этом то и проблема, на эту блокировку и надо процессы блокировать
    Блеать, нахер по одной то строке вставлять? =) На синхронизацию и ожидание уйдёт больше времени чем на вставку данных. Понимаю, если вставлять по 1000 строк из 4 потоков, ещё куда ни шло, но чтоб на каждую строку свой поток заводить…
    объяснение моей логики простое - чтобы не переписывать уже работающий код :-)

    оптимизация ради оптимизации это хорошо, но иногда лениво :)
    Спустя 8 сек.
    да и пусть сам решает)
    Сапожник без сапог
  • bRUtality

    Сообщения: 23 Репутация: N Группа: Кто попало

    Spritz Июнь 21, 2012, 9:29 п.п., спустя 3 часа 4 минуты 11 секунд

    Всем спасибо за поиск решения моей проблемы!

    Сейчас все объясню. Это реально работающий проект, у которого нагрузка все время возрастает. Суть его очень простая: по snmp идет опрос железок и заносится в базу, а юзеры в онлайн-режиме смотрят свежесобранную статистику. Так вот, сейчас имеется около 600 ip, на каждом около 1000 железок. Если опрашивать последовательно, то статистика запаздывает на 2 часа. Поэтому было принято решение как-то распараллелить сбор данных. Других признаков, кроме как по ip, нету. Я попробовал делать это не через потоки, а отдельным процессом. Результат несложно угадать - т.к. каждый процесс имеет свой коннект, то такое число коннектов нагибает сервер БД. Выше прозвучал вопрос, зачем запускать COPY ради одной строки? Так вот, строк около 1000. А операция COPY, особенно когда пачкой, куда дешевле чем INSERT. Поэтому у меня сейчас нет другой концепции, кроме как запустить много потоков с общим коннектом. Суть проблемы я изложил, если можно предложить что-то более эффективное, то буду благодарен. Или будем делать решать проблему, чтобы потоки не конфликтовали за коннект (как я понял, было предложено два способа: лочить ресурс или использовать пул потоков).

Пожалуйста, авторизуйтесь, чтобы написать комментарий!