Ruby - многопоточность

Традиционные программы имеют один поток выполнения: операторы или инструкции, составляющие программу, выполняются последовательно, пока программа не завершится.

Многопоточная программа имеет более одного потока выполнения. Внутри каждого потока операторы выполняются последовательно, но сами потоки могут выполняться параллельно, например, на многоядерном ЦП. Часто на машине с одним ЦП несколько потоков фактически не выполняются параллельно, но параллелизм моделируется путем чередования выполнения потоков.

Ruby позволяет легко писать многопоточные программы с помощью класса Thread . Потоки Ruby - это легкий и эффективный способ добиться параллелизма в вашем коде.

Создание потоков Ruby

Чтобы начать новый поток, просто свяжите блок с вызовом Thread.new . Будет создан новый поток для выполнения кода в блоке, и исходный поток немедленно вернется из Thread.new и возобновит выполнение со следующим оператором -

# Thread #1 is running here
Thread.new {
   # Thread #2 runs this code
}
# Thread #1 runs this code

пример

Вот пример, показывающий, как мы можем использовать многопоточную программу Ruby.

#!/usr/bin/ruby

def func1
   i = 0
   while i<=2
      puts "func1 at: #{Time.now}"
      sleep(2)
      i = i+1
   end
end

def func2
   j = 0
   while j<=2
      puts "func2 at: #{Time.now}"
      sleep(1)
      j = j+1
   end
end

puts "Started At #{Time.now}"
t1 = Thread.new{func1()}
t2 = Thread.new{func2()}
t1.join
t2.join
puts "End at #{Time.now}"

Это даст следующий результат -

Started At Wed May 14 08:21:54 -0700 2008
func1 at: Wed May 14 08:21:54 -0700 2008
func2 at: Wed May 14 08:21:54 -0700 2008
func2 at: Wed May 14 08:21:55 -0700 2008
func1 at: Wed May 14 08:21:56 -0700 2008
func2 at: Wed May 14 08:21:56 -0700 2008
func1 at: Wed May 14 08:21:58 -0700 2008
End at Wed May 14 08:22:00 -0700 2008

Жизненный цикл потока

Новые потоки создаются с помощью Thread.new . Вы также можете использовать синонимы Thread.start и Thread.fork .

Нет необходимости запускать поток после его создания, он начинает работать автоматически, когда ресурсы ЦП становятся доступными.

Класс Thread определяет ряд методов для запроса и управления потоком во время его работы. Поток запускает код в блоке, связанном с вызовом Thread.new, а затем останавливается.

Значение последнего выражения в этом блоке является значением потока и может быть получено путем вызова метода value объекта Thread. Если поток завершился, значение сразу же возвращает значение потока. В противном случае метод значения блокируется и не возвращается, пока поток не завершится.

Метод класса Thread.current возвращает объект Thread, представляющий текущий поток. Это позволяет потокам манипулировать собой. Метод класса Thread.main возвращает объект Thread, представляющий основной поток. Это начальный поток выполнения, который начался при запуске программы Ruby.

Вы можете дождаться завершения определенного потока, вызвав метод этого потока Thread.join . Вызывающий поток будет заблокирован, пока данный поток не завершится.

Потоки и исключения

Если исключение возникает в основном потоке и нигде не обрабатывается, интерпретатор Ruby выводит сообщение и завершает работу. В потоках, кроме основного потока, необработанные исключения вызывают остановку выполнения потока.

Если нить t завершается из-за необработанного исключения, а другой поток sвызывает t.join или t.value, затем исключение, возникшее вt поднят в потоке s.

Если Thread.abort_on_exception имеет значение false , условие по умолчанию, необработанное исключение просто убивает текущий поток, а все остальные продолжают выполняться.

Если вы хотите, чтобы какое-либо необработанное исключение в каком-либо потоке приводило к завершению работы интерпретатора, установите для метода класса Thread.abort_on_exception значение true .

t = Thread.new { ... }
t.abort_on_exception = true

Переменные потока

Поток обычно может получить доступ к любым переменным, которые находятся в области видимости при создании потока. Переменные, локальные для блока потока, являются локальными для потока и не являются общими.

Класс Thread имеет специальное средство, которое позволяет создавать локальные переменные потока и обращаться к ним по имени. Вы просто обрабатываете объект потока, как если бы это был хэш, записывая элементы с помощью [] = и считывая их обратно с помощью [].

В этом примере каждый поток записывает текущее значение переменной count в переменную threadlocal с ключом mycount .

#!/usr/bin/ruby

count = 0
arr = []

10.times do |i|
   arr[i] = Thread.new {
      sleep(rand(0)/10.0)
      Thread.current["mycount"] = count
      count += 1
   }
end

arr.each {|t| t.join; print t["mycount"], ", " }
puts "count = #{count}"

Это дает следующий результат -

8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10

Основной поток ожидает завершения подпотоков и затем распечатывает значение счетчика, захваченное каждым из них.

Приоритеты потоков

Первым фактором, влияющим на планирование потоков, является приоритет потока: потоки с высоким приоритетом планируются раньше потоков с низким приоритетом. Точнее, поток получит процессорное время только в том случае, если нет потоков с более высоким приоритетом, ожидающих запуска.

Вы можете установить и запросить приоритет объекта Ruby Thread с приоритетом = и приоритетом . Вновь созданный поток запускается с тем же приоритетом, что и поток, который его создал. Основной поток запускается с приоритетом 0.

Невозможно установить приоритет потока перед его запуском. Однако поток может повышать или понижать свой собственный приоритет в качестве первого действия, которое он выполняет.

Исключение темы

Если два потока имеют общий доступ к одним и тем же данным, и хотя бы один из потоков изменяет эти данные, вы должны позаботиться о том, чтобы ни один поток никогда не мог увидеть данные в несогласованном состоянии. Это называется исключением потока .

Mutex- это класс, реализующий простую блокировку семафоров для взаимоисключающего доступа к некоторому общему ресурсу. То есть только один поток может удерживать блокировку в данный момент времени. Другие потоки могут выбрать ожидание в очереди, пока блокировка станет доступной, или могут просто выбрать немедленное получение ошибки, указывающей, что блокировка недоступна.

Помещая все обращения к общим данным под контроль мьютекса , мы обеспечиваем согласованность и атомарную работу. Давайте попробуем примеры, первый без mutax и второй с mutax -

Пример без Mutax

#!/usr/bin/ruby
require 'thread'

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      count1 += 1
      count2 += 1
   end
end
spy = Thread.new do
   loop do
      difference += (count1 - count2).abs
   end
end
sleep 1
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

Это даст следующий результат -

count1 :  1583766
count2 :  1583766
difference : 0
#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      mutex.synchronize do
         count1 += 1
         count2 += 1
      end
   end
end
spy = Thread.new do
   loop do
      mutex.synchronize do
         difference += (count1 - count2).abs
      end
   end
end
sleep 1
mutex.lock
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

Это даст следующий результат -

count1 :  696591
count2 :  696591
difference : 0

Обработка тупика

Когда мы начинаем использовать объекты Mutex для исключения потоков, мы должны быть осторожны, чтобы избежать взаимоблокировки . Тупик - это состояние, которое возникает, когда все потоки ожидают получения ресурса, удерживаемого другим потоком. Поскольку все потоки заблокированы, они не могут снять удерживаемые блокировки. И поскольку они не могут снять блокировки, никакой другой поток не может получить эти блокировки.

Здесь на помощь приходят условные переменные . Переменная условия просто семафор , который связан с ресурсом и используется в защите определенного мьютекса . Когда вам нужен недоступный ресурс, вы ждете переменной условия. Это действие снимает блокировку с соответствующего мьютекса . Когда какой-либо другой поток сигнализирует о доступности ресурса, исходный поток прекращает ожидание и одновременно восстанавливает блокировку критической области.

пример

#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

cv = ConditionVariable.new
a = Thread.new {
   mutex.synchronize {
      puts "A: I have critical section, but will wait for cv"
      cv.wait(mutex)
      puts "A: I have critical section again! I rule!"
   }
}

puts "(Later, back at the ranch...)"

b = Thread.new {
   mutex.synchronize {
      puts "B: Now I am critical, but am done with cv"
      cv.signal
      puts "B: I am still critical, finishing up"
   }
}
a.join
b.join

Это даст следующий результат -

A: I have critical section, but will wait for cv
(Later, back at the ranch...)
B: Now I am critical, but am done with cv
B: I am still critical, finishing up
A: I have critical section again! I rule!

Состояния потоков

Существует пять возможных возвращаемых значений, соответствующих пяти возможным состояниям, как показано в следующей таблице. Метод статуса возвращает состояние потока.

Состояние потока Возвращаемое значение
Работоспособен бежать
Спать Спать
Прерывание прерывание
Прекращено нормально ложный
Прекращено за исключением ноль

Методы класса потока

Следующие методы предоставляются классом Thread и применимы ко всем потокам, доступным в программе. Эти методы будут вызываться с использованием имени класса Thread следующим образом:

Thread.abort_on_exception = true
Sr. No. Методы и описание
1

Thread.abort_on_exception

Возвращает состояние глобального прерывания при возникновении исключения . По умолчанию - false . Если установлено значение true , все потоки будут прерваны (процесс завершится (0)), если в каком-либо потоке возникнет исключение.

2

Thread.abort_on_exception=

Если установлено значение true , все потоки будут прерваны, если возникнет исключение. Возвращает новое состояние.

3

Thread.critical

Возвращает состояние критического состояния глобального потока .

4

Thread.critical=

Устанавливает состояние критического состояния глобального потока и возвращает его. Если установлено значение true , запрещает планирование любого существующего потока. Не блокирует создание и запуск новых потоков. Некоторые операции потока (такие как остановка или завершение потока, засыпание в текущем потоке и создание исключения) могут привести к тому, что поток будет запланирован даже в критическом разделе.

5

Thread.current

Возвращает текущий выполняющийся поток.

6

Thread.exit

Завершает текущий запущенный поток и планирует запуск другого потока. Если этот поток уже отмечен для уничтожения, exit возвращает Thread. Если это основной поток или последний поток, выйдите из процесса.

7

Thread.fork { block }

Синоним Thread.new.

8

Thread.kill( aThread )

Заставляет данный поток выйти

9

Thread.list

Возвращает массив объектов Thread для всех потоков, которые либо выполняются, либо остановлены. Нить.

10

Thread.main

Возвращает основной поток процесса.

11

Thread.new( [ arg ]* ) {| args | block }

Создает новый поток для выполнения инструкций, заданных в блоке, и начинает его выполнение. Любые аргументы, переданные в Thread.new , передаются в блок.

12

Thread.pass

Вызывает планировщик потоков для передачи выполнения другому потоку.

13

Thread.start( [ args ]* ) {| args | block }

В основном то же, что и Thread.new . Однако, если класс Thread является подклассом, то вызов start в этом подклассе не вызовет метод инициализации подкласса .

14

Thread.stop

Останавливает выполнение текущего потока, переводя его в состояние сна , и планирует выполнение другого потока. Сбрасывает критическое состояние на ложное.

Методы экземпляра потока

Эти методы применимы к экземпляру потока. Эти методы будут вызываться как использующие экземпляр потока следующим образом:

#!/usr/bin/ruby

thr = Thread.new do   # Calling a class method new
   puts "In second thread"
   raise "Raise exception"
end
thr.join   # Calling an instance method join
Sr. No. Методы и описание
1

thr[ aSymbol ]

Ссылка на атрибут - возвращает значение локальной переменной потока с использованием символа или имени aSymbol . Если указанная переменная не существует, возвращает ноль .

2

thr[ aSymbol ] =

Назначение атрибута - устанавливает или создает значение локальной переменной потока, используя символ или строку.

3

thr.abort_on_exception

Возвращает статус прерывания при условии исключения для thr . По умолчанию - false .

4

thr.abort_on_exception=

Если установлено значение true , все потоки (включая основную программу) прерываются, если в thr возникает исключение . Процесс фактически завершится (0) .

5

thr.alive?

Возвращает истину, если th работает или спит.

6

thr.exit

Завершает Чет и графики другой поток для запуска. Если этот поток уже отмечен для уничтожения, exit возвращает Thread . Если это основной поток или последний поток, завершает процесс.

7

thr.join

Вызывающий поток приостановит выполнение и запустит th . Не возвращается до Чет выходов. Любые неподключенные потоки будут уничтожены при выходе из основной программы.

8

thr.key?

Возвращает истину, если данная строка (или символ) существует как локальная переменная потока.

9

thr.kill

Синоним Thread.exit .

10

thr.priority

Возвращает приоритет thr . По умолчанию - ноль; потоки с более высоким приоритетом будут выполняться раньше потоков с более низким приоритетом.

11

thr.priority=

Устанавливает приоритет th в целое число. Потоки с более высоким приоритетом будут выполняться перед потоками с более низким приоритетом.

12

thr.raise( anException )

Вызывает исключение из троллейбуса . Звонящий не должен быть заброшен .

13

thr.run

Просыпается Чет , что делает его пригодным для планирования. Если не в критическом разделе, то вызывает планировщик.

14

thr.safe_level

Возвращает действующий безопасный уровень для th .

15

thr.status

Возвращает статус thr : sleep, если th спит или ожидает ввода-вывода, запускается, если th выполняется, false, если th завершился нормально, и nil, если th завершился с исключением.

16

thr.stop?

Возвращает true, если th мертв или спит.

17

thr.value

Ожидает завершения th через Thread.join и возвращает его значение.

18

thr.wakeup

Знаки Thr в качестве приемлемых для планирования, он все еще может оставаться заблокированы на I / O, однако.