📜  Ruby-多线程

📅  最后修改于: 2020-10-16 06:07:21             🧑  作者: Mango


传统程序只有一个执行线程,构成该程序的语句或指令将顺序执行,直到程序终止。

多线程程序具有多个执行线程。在每个线程中,语句是按顺序执行的,但是线程本身可以在例如多核CPU上并行执行。通常在单个CPU计算机上,实际上不是并行执行多个线程,而是通过交错执行线程来模拟并行性。

Ruby使使用Thread类编写多线程程序变得容易。 Ruby线程是在代码中实现并发的轻量级高效方法。

创建Ruby线程

要启动新线程,只需将一个块与对Thread.new的调用相关联。将创建一个新线程来执行该块中的代码,原始线程将立即从Thread.new返回并使用下一条语句恢复执行-

# Thread #1 is running here
Thread.new {
   # Thread #2 runs this code
}
# Thread #1 runs this code

这是一个示例,显示了如何使用多线程Ruby程序。

#!/usr/bin/ruby

def func1
   i = 0
   while i<=2
      puts "func1 at: #{Time.now}"
      sleep(2)
      i = i+1
   end
end

def func2
   j = 0
   while j<=2
      puts "func2 at: #{Time.now}"
      sleep(1)
      j = j+1
   end
end

puts "Started At #{Time.now}"
t1 = Thread.new{func1()}
t2 = Thread.new{func2()}
t1.join
t2.join
puts "End at #{Time.now}"

这将产生以下结果-

Started At Wed May 14 08:21:54 -0700 2008
func1 at: Wed May 14 08:21:54 -0700 2008
func2 at: Wed May 14 08:21:54 -0700 2008
func2 at: Wed May 14 08:21:55 -0700 2008
func1 at: Wed May 14 08:21:56 -0700 2008
func2 at: Wed May 14 08:21:56 -0700 2008
func1 at: Wed May 14 08:21:58 -0700 2008
End at Wed May 14 08:22:00 -0700 2008

线程生命周期

使用Thread.new创建一个新线程。您还可以使用同义词Thread.startThread.fork

创建线程后无需启动线程,当CPU资源可用时,它将自动开始运行。

Thread类定义了许多在线程运行时查询和操作线程的方法。线程在与Thread.new调用关联的块中运行代码,然后停止运行。

该块中最后一个表达式的值就是线程的值,可以通过调用Thread对象的value方法获得。如果线程已运行完毕,则该值立即返回线程的值。否则, value方法将阻塞并且直到线程完成后才返回。

类方法Thread.current返回表示当前线程的Thread对象。这允许线程进行自我操作。类方法Thread.main返回代表主线程的Thread对象。这是从Ruby程序启动时开始的执行线程。

您可以通过调用特定线程的Thread.join方法来等待该线程完成。调用线程将阻塞,直到给定线程完成。

线程和异常

如果在主线程中引发了异常,并且没有在任何地方进行处理,则Ruby解释器将显示一条消息并退出。在除主线程之外的线程中,未处理的异常导致线程停止运行。

如果因为未处理的异常的线程t退出,而另一线S通话t.join或t.value,那么发生在t时的例外是在纤维S提高。

如果Thread.abort_on_exceptionfalse (默认条件),则未处理的异常只会杀死当前线程,其余所有线程继续运行。

如果您希望任何线程中任何未处理的异常导致解释器退出,请将类方法Thread.abort_on_exception设置为true

t = Thread.new { ... }
t.abort_on_exception = true

线程变量

创建线程时,线程通常可以访问范围内的任何变量。线程块本地的变量是线程本地的,并且不共享。

线程类具有特殊的功能,该功能允许按名称创建和访问线程局部变量。您只需将线程对象视为哈希对象即可,使用[] =写入元素,然后使用[]读回它们。

在此示例中,每个线程都使用键mycount将变量count的当前值记录在threadlocal变量中。

#!/usr/bin/ruby

count = 0
arr = []

10.times do |i|
   arr[i] = Thread.new {
      sleep(rand(0)/10.0)
      Thread.current["mycount"] = count
      count += 1
   }
end

arr.each {|t| t.join; print t["mycount"], ", " }
puts "count = #{count}"

这产生以下结果-

8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10

主线程等待子线程完成,然后打印出每个子线程捕获的计数值

线程优先级

影响线程调度的第一个因素是线程优先级:高优先级线程先于低优先级线程进行调度。更准确地说,只有在没有更高优先级的线程在等待运行时,线程才会获得CPU时间。

您可以使用priority =priority设置和查询Ruby Thread对象的优先。新创建的线程以与创建它的线程相同的优先级开始。主线程从优先级0开始。

在开始运行之前,无法设置线程的优先级。但是,线程可以将其优先级作为其采取的第一个操作来提高或降低。

线程排除

如果两个线程共享对同一数据的访问,并且至少有一个线程修改了该数据,则必须格外小心,以确保没有线程能够看到处于不一致状态的数据。这称为线程排除

Mutex是一个类,它实现简单的信号灯锁以互斥访问某些共享资源。也就是说,在给定的时间只有一个线程可以持有该锁。其他线程可能选择排队等待该锁变为可用,或者可能只是选择立即获得一个指示该锁不可用的错误。

通过将对共享数据的所有访问置于互斥锁的控制下,我们确保了一致性和原子操作。让我们尝试示例,第一个不带mutax,第二个不带mutax-

没有Mutax的示例

#!/usr/bin/ruby
require 'thread'

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      count1 += 1
      count2 += 1
   end
end
spy = Thread.new do
   loop do
      difference += (count1 - count2).abs
   end
end
sleep 1
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

这将产生以下结果-

count1 :  1583766
count2 :  1583766
difference : 0
#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      mutex.synchronize do
         count1 += 1
         count2 += 1
      end
   end
end
spy = Thread.new do
   loop do
      mutex.synchronize do
         difference += (count1 - count2).abs
      end
   end
end
sleep 1
mutex.lock
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

这将产生以下结果-

count1 :  696591
count2 :  696591
difference : 0

处理死锁

当我们开始使用Mutex对象进行线程排除时,我们必须小心避免死锁。死锁是所有线程都在等待获取另一个线程拥有的资源时发生的情况。由于所有线程均被阻止,因此它们无法释放所持有的锁。并且由于它们无法释放这些锁,因此其他任何线程都无法获取这些锁。

这是条件变量出现的地方。条件变量只是与资源关联的信号量,并在特定互斥量的保护内使用。当您需要不可用的资源时,请等待条件变量。该操作将释放相应互斥锁的锁定。当其他一些线程发出信号表明资源可用时,原始线程将退出等待状态,同时重新获得对关键区域的锁定。

#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

cv = ConditionVariable.new
a = Thread.new {
   mutex.synchronize {
      puts "A: I have critical section, but will wait for cv"
      cv.wait(mutex)
      puts "A: I have critical section again! I rule!"
   }
}

puts "(Later, back at the ranch...)"

b = Thread.new {
   mutex.synchronize {
      puts "B: Now I am critical, but am done with cv"
      cv.signal
      puts "B: I am still critical, finishing up"
   }
}
a.join
b.join

这将产生以下结果-

A: I have critical section, but will wait for cv
(Later, back at the ranch...)
B: Now I am critical, but am done with cv
B: I am still critical, finishing up
A: I have critical section again! I rule!

线程状态

下表列出了与五个可能状态相对应的五个可能返回值。 status方法返回线程的状态。

Thread state Return value
Runnable run
Sleeping Sleeping
Aborting aborting
Terminated normally false
Terminated with exception nil

线程类方法

Thread类提供了以下方法,它们适用于程序中所有可用的线程。这些方法将被称为使用Thread类名,如下所示-

Thread.abort_on_exception = true

线程实例方法

这些方法适用于线程的实例。这些方法将被称为使用Thread的实例,如下所示-

#!/usr/bin/ruby

thr = Thread.new do   # Calling a class method new
   puts "In second thread"
   raise "Raise exception"
end
thr.join   # Calling an instance method join