понедельник, 1 июля 2013 г.

Python: примеры и тесты, часть 4 - потоки

Опции командной строки.

Это немаловажная деталь для построения консольных приложений в стиле UNIX: обработка опций в командной строке. Для этого предоставляется модуль getopt, который позволяет обрабатывать опции на манер того, как делает одноимённый вызов C. Обрабатываются как короткие опции (односимвольные с одним предшествующим '-'), так и длинные (многосимвольные с двумя предшествующими '--'). Можно задавать опции как с требуемым значением, так и без. В общем: все удовольствия.

Примеры использования будут показаны ниже , при запуске многопоточных приложений.

Потоки и синхронизация.

В описаниях Python везде упоминаются модуль реализации потоков низкого уровня - thread, и модуль реализации потоков высокого уровня - threading, надстроенный над модулем thread. Почти наверняка, доступно ещё много альтернативных реализаций от сторонних разработчиков, что обуславливается лёгкостью разработки тиражирования программного кода Python. Но все они, в конечном итоге, являются обёртками к pthread_t из POSIX API.

Реализация низкого уровня - модуль thread. Модуль крайне скудно описан в документации и литературе, так что пример кода использования я нигде не встретил, и его пришлось писать "с нуля". В модуле thread очень бедные возможности синхронизации - один единственный примитив (LockType = class lock), нечто на манер самого простейшего бинарного мютекса, ограничивающего критическую секцию кода. Для того, чтобы дождаться окончания дочерних потоков (реализовать "барьер", один из наиболее частых случаев) нужно строить искусственную конструкцию, по типу счётного семафора (файл tlspeed.py):

#!/usr/bin/python -O
# -*- coding: utf-8 -*-
from rdtsc import rdtsc
from calibr import calibr, delay_in_cycle
import getopt
import sys
import thread
import time
import string
 
debuglevel = 0
threadnum = 2                     # заказанное число порождаемых потоков
delay = 1
active = 0
numt = 0                          # текущее число активных дочерних потоков
lock = thread.allocate_lock()     # блокировка доступа к числу активных дочерних потоков
wait = thread.allocate_lock()     # блокировка ожидания завершения всех дочерних потоков
barier = { 'numt' : numt, 'lock' : lock, 'wait' : wait }
 
def thrfun( delay, num, tstart ): # функция потока
    st = rdtsc() - tstart
    barier[ 'numt' ] = barier[ 'numt' ] + 1
    barier[ 'lock' ].release()
    ss = "\t%i : %i <= старт: %u" % ( num, id, st )
    if not active : time.sleep( delay )
    else : delay_in_cycle( delay )
    barier[ 'lock' ].acquire()
    barier[ 'numt' ] = barier[ 'numt' ] - 1
    st = rdtsc() - tstart
    print "%s - финиш: %u" % ( ss, st )
    if 0 == barier[ 'numt' ] :
        barier[ 'wait' ].release()
    barier[ 'lock' ].release()
    return
 
opts, args = getopt.getopt( sys.argv[1:], "vt:d:a" )
for opt, arg in opts:             # опции (ключи) командной строки (-v, -t, -d, -a)
    if 0 == cmp( opt[ 1: ], 'v' ): debuglevel = debuglevel + 1
    if 0 == cmp( opt[ 1: ], 't' ): threadnum = string.atoi( arg )
    if 0 == cmp( opt[ 1: ], 'd' ): delay = string.atoi( arg )
    if 0 == cmp( opt[ 1: ], 'a' ): active = 1
if debuglevel > 0 :
    print opts
    print args
    print debuglevel
    print threadnum
 
barier[ 'wait' ].acquire()        # захват блокировки завершения
for n in range( threadnum ):      # запуск threadnum потоков
    barier[ 'lock' ].acquire()
    id = thread.start_new_thread( thrfun, ( delay, n, rdtsc() ) )
    print "\t%i : %i =>" % ( n, id )
barier[ 'wait' ].acquire()        # ожидание завершения всех потоков
print "завершены все %i потоков, \

       завершается ожидавший главный поток" % threadnum

Вот как выглядит исполнение этого примера (3 дочерних потока, каждый из которых выполняется по 3 секунды):

$ ./tlspeed.py -t3 -d3
0 : -1219589312 =>
1 : -1229980864 =>
2 : -1240466624 =>
0 : -1219589312 <= старт: 316450 - финиш: 4993234920
2 : -1240466624 <= старт: 235850 - финиш: 4992812200
1 : -1229980864 <= старт: 162580 - финиш: 4993124550
завершены все 3 потоков, завершается ожидавший главный поток


Здесь довольно странные (для меня) отрицательный значения в качестве идентификаторов отдельных потоков, хотя в документации сказано, что это просто числовое значение ... и, наверное, такие значение тоже имеют право быть.

Альтернативный вариант, это то, что декларируется как реализация высокого уровня - модуль threading. Здесь всё гораздо проще (но в кое-чём и сложнее ... например в передаче параметров в потоковую функцию уже после создания объекта класса Thread, непосредственно перед запуском на выполнение). Но в этом модуле уже представлено достаточно много самых разных примитивов синхронизации:  Lock, RLock, Condition, Semaphore, Event, Queue - это сильно облегчает описание взаимодействий потоков. Пример, эквивалентный предыдущему, может выглядеть как-то так:

#!/usr/bin/python -O
# -*- coding: utf-8 -*-
from rdtsc import rdtsc
from calibr import calibr, delay_in_cycle
import getopt
import sys
import threading
import time
import string
 
debuglevel = 0
threadnum = 2
delay = 1
active = 0
 
def thrfun( *args ):     
   # функция потока
    st = rdtsc() - args[ 2 ] # время старта потока
    ss = "\t%i : %s <= старт: %u" % \
         ( args[ 1 ], threading.currentThread().getName(), st )
    if not active : time.sleep( args[ 0 ] )
    else : delay_in_cycle( args[ 0 ] )
    st = rdtsc() - args[ 2 ] # время завершения потока
    print "%s - финиш: %u" % ( ss, st )
    return
 
opts, args = getopt.getopt( sys.argv[1:], "vt:d:a" )
for opt, arg in opts:        # опции (ключи) командной строки (-v, -t, -d, -a)
    if 0 == cmp( opt[ 1: ], 'v' ): debuglevel = debuglevel + 1
    if 0 == cmp( opt[ 1: ], 't' ): threadnum = string.atoi( arg )
    if 0 == cmp( opt[ 1: ], 'd' ): delay = string.atoi( arg )
    if 0 == cmp( opt[ 1: ], 'a' ): active = 1
if debuglevel > 0 :
    print opts
    print args
    print debuglevel
    print threadnum
 
threads = []
for n in range( threadnum ): # создание и запуск потоков
    parm = [ delay, n, 0 ]
    t = threading.Thread( target=thrfun, args=parm )
    threads.append( t )
    t.setDaemon( 1 )
    print "\t%i : %s =>" % ( n, t.getName() )
    parm[ 2 ] = rdtsc()
    t.start()
for n in range( threadnum ): # ожидание завершения всех потоков
    threads[ n ].join()
print "завершены все %i потоков, \

       завершается ожидавший главный поток" % threadnum

Вот как выглядит выполнение такого варианта (4 потока, выполняющиеся по 2 секунды), теперь вместо численных идентификаторов потока для их различения возникают имена потоков:

$ ./thspeed.py -t4 -d2
0 : Thread-1 =>
1 : Thread-2 =>
2 : Thread-3 =>
3 : Thread-4 =>
0 : Thread-1 <= старт: 558390 - финиш: 3329127080
2 : Thread-3 <= старт: 247300 - финиш: 3328592890
3 : Thread-4 <= старт: 211740 - финиш: 3328294390
1 : Thread-2 <= старт: 312410 - финиш: 3328573940
завершены все 4 потоков, завершается ожидавший главный поток

Обратите внимание на порядок старта потоков (временные метки) и на порядок их завершения. И в примере с низкоуровневым модулем thread, и здесь, в примере с высокоуровневым модулем threading, код допускает в качестве паузы выполнения потока использовать не пассивное ожидание (time.sleep()), а активное ожидание выполнением пустых циклов. При этом картина достаточно радикально меняется, что всегда возникает в параллелизмах (но это я оставлю для анализа любознательному уму заинтересованных читателей):

$ ./thspeed.py -t4 -d2 -a
0 : Thread-1 =>
1 : Thread-2 =>
2 : Thread-3 =>
3 : Thread-4 =>
1 : Thread-2 <= старт: 12039350 - финиш: 3339395210
0 : Thread-1 <= старт: 683100 - финиш: 3340604010
2 : Thread-3 <= старт: 21162120 - финиш: 3346219540
3 : Thread-4 <= старт: 41049890 - финиш: 3366116170
завершены все 4 потоков, завершается ожидавший главный поток


Функция активной задержки (delay_in_cycle()) реализована в собственном модуле calibr.py (он упоминался раньше) как:

def delay_in_cycle( delay = 1 ):
    t = time.time()
    while time.time() - t < delay :
        time.localtime()


Реализация потоков в Python вызывает ряд вопросов:

  • Является ли сама исполняющая система Python (виртуальная машина, исполняющая байт-код) многопоточной? То есть, распараллеливается ли многопоточное Python-приложение по числу доступных на оборудовании процессоров (ядер)? Мои наблюдения в системном мониторе показывают, что, похоже, это так и есть. 
  • Переключение между потоками Python-приложение должно происходить не по системному таймеру (как это происходит с потоками POSIX), а по полному завершению выполнения очередного байт-кода в потоке (об этом сказано и не в одном месте в документации). Более того, частота, с которой интерпретатор проверяет и переключает потоки, может быть переустановлена вызовом sys.setcheckinterval(). По умолчанию (утверждается) интерпретатор проверяет переключение потоков каждые 10 команд байт-кода (чем запросить эту величину из интерпретатора я не нашёл). Имеет ли эта модель какие-то последствия?
   
Отправить комментарий