1.争用条件:如果两个或多个线程访问相同的对象,或者访问不同步的共享状态,就会出现争用条件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
namespace Wrox.ProCSharp.Threading
{
classProgram
{
staticvoidMain()
{
var state =newStateObject();
for(inti =0; i <20; i++)
{
newTask(newSampleTask().RaceCondition, state).Start();
}
Thread.Sleep(10000);
}
publicclassStateObject
{
privateintstate =5;
publicvoidChangeState(intloop)
{
if(state ==5)
{
state++;
Trace.Assert(state ==6,"Race condition occurred after"+ loop +"loops");
}
state =5;
}
}
publicclassSampleTask
{
publicvoidRaceCondition(object o)
{
Trace.Assert(o is StateObject,"o must be of type stateObject");
StateObject state = o as StateObject;
inti =0;
while(true)
{
state.ChangeState(i++);
}
}
}
}
}
|
其中StateObject类包含一个int字段和一个ChangeState()方法。在ChangeState()方法的实现代码中,验证状态变量是否包含5.如果它包含,就递增其值。然后用Trace.Assert验证state现在是否包含6.
在给包含5的变量递增1后,可能希望该变量的值就是6.但事实不一定是这样。例如,如果一个线程刚刚执行完If(state == 5)语句,它就会被其他线程抢占,调度器运行另一个线程。第二个线程现在进入if体,因为state的值仍是5,所以将它递增到6.第一个线程现在再次被调度,在下一条语句中,state递增到7.这时就发生了曾用条件,并显示断言消息。当然这只是其中一种可能。
解决争用条件的方法
(1)锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
namespace Wrox.ProCSharp.Threading
{
classProgram
{
staticvoidMain()
{
var state =newStateObject();
for(inti =0; i <20; i++)
{
newTask(newSampleTask().RaceCondition, state).Start();
}
Thread.Sleep(10000);
}
publicclassStateObject
{
privateintstate =5;
publicvoidChangeState(intloop)
{
if(state ==5)
{
state++;
Trace.Assert(state ==6,"Race condition occurred after"+ loop +"loops");
}
state =5;
}
}
publicclassSampleTask
{
publicvoidRaceCondition(object o)
{
Trace.Assert(o is StateObject,"o must be of type stateObject");
StateObject state = o as StateObject;
inti =0;
while(true)
{
lock(state)//no race condition with this lock
{
state.ChangeState(i++);
}
}
}
}
}
}
|
其中使用lock语句锁定在线程中共享的state变量。只有一个线程能在锁定块中处理共享的state对象。由于这个对象在所有的线程之间共享,因此如果一个线程锁定了state,另一个线程就必须等待该锁定的解除。一旦接受锁定,线程就拥有该锁定,直到该锁定块的末尾才解除锁定。如果改变state变量引用的对象的每个线程都使用一个锁定,就不会出现争用条件。
(2)设置线程安全的对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
namespace Wrox.ProCSharp.Threading
{
classProgram
{
staticvoidMain()
{
var state =newStateObject();
for(inti =0; i <20; i++)
{
newTask(newSampleTask().RaceCondition, state).Start();
}
Thread.Sleep(10000);
}
publicclassStateObject
{
privateintstate =5;
privateobject sync =newobject();
publicvoidChangeState(intloop)
{
lock(sync)
{
if(state ==5)
{
state++;
Trace.Assert(state ==6,"Race condition occurred after"+ loop +"loops");
}
state =5;
}
}
}
publicclassSampleTask
{
publicvoidRaceCondition(object o)
{
Trace.Assert(o is StateObject,"o must be of type stateObject");
StateObject state = o as StateObject;
inti =0;
while(true)
{
state.ChangeState(i++);
}
}
}
}
}
|
除了进行锁定之外,还可以将共享对象设置为线程安全的对象。其中ChangeState()方法包含一条lock语句。由于不能锁定state变量本身(只有引用类型才能用于锁定),因此定义一个object类型的变量sync,将它用于lock语句。如果每次state的值更改时,都使用同一个同步对象来锁定,就不会出现争用条件。
2.死锁
过多的锁定也会有麻烦。在死锁中,至少有2个线程被挂起,并等待对方解除锁定。由于两个线程都在等待对方,就出现了死锁,线程将无限等待下去。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
namespace Wrox.ProCSharp.Threading
{
classProgram
{
staticvoidMain()
{
var state1 =newStateObject();
var state2 =newStateObject();
newTask(newSampleThread(state1, state2).Deadlock1).Start();
newTask(newSampleThread(state1, state2).Deadlock2).Start();
}
public classStateObject
{
privateintstate =5;
publicvoidChangeState(intloop)
{
if(state ==5)
{
state++;
Trace.Assert(state ==6,"Race condition occurred after"+ loop +" loops");
}
state =5;
}
}
publicclassSampleThread
{
privateStateObject s1;
privateStateObject s2;
publicSampleThread(StateObject s1,StateObject s2)
{
this.s1 = s1;
this.s2 = s2;
}
publicvoidDeadlock1()
{
inti =0;
while(true)
{
lock(s1)
{
lock(s2)
{
s1.ChangeState(i);
s2.ChangeState(i++);
Console.WriteLine("still running,{0}", i);
}
}
}
}
publicvoidDeadlock2()
{
inti =0;
while(true)
{
lock(s2)
{
lock(s1)
{
s1.ChangeState(i);
s2.ChangeState(i++);
Console.WriteLine("still running, {0}", i);
}
}
}
}
}
}
}
|
其中Deadlock1()方法先锁定s1,接着锁定s2.Deadlock2()方法先锁定s2,在锁定s1.然后可能会发生Deadlock1()方法中s1的锁定会解除。接着,出现一次线程切换,Deadlock2()方法开始运行,并锁定s2.第二个线程现在等待s1锁定的解除。因为它需要等待,所以线程调度器再次调度第一个线程,但第一个线程在等待s2锁定的解除。这两个线程现在都在等待,只要锁定块没有结束,就不会解除锁定。这是一个典型的死锁。解决方法是一开始就设计好锁定顺序,或者为锁定定义超时时间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
namespace Wrox.ProCSharp.Threading
{
classProgram
{
staticvoidMain()
{
var state1 =newStateObject();
var state2 =newStateObject();
newTask(newSampleThread(state1, state2).Deadlock1).Start();
newTask(newSampleThread(state1, state2).Deadlock2).Start();
Thread.Sleep(1000);
}
publicclassStateObject
{
privateintstate =5;
publicvoidChangeState(intloop)
{
if(state ==5)
{
state++;
Trace.Assert(state ==6,"Race condition occurred after"+ loop +" loops");
}
state =5;
}
}
publicclassSampleThread
{
privateStateObject s1;
privateStateObject s2;
publicSampleThread(StateObject s1, StateObject s2)
{
this.s1 = s1;
this.s2 = s2;
}
publicvoidDeadlock1()
{
inti =0;
while(true)
{
bool lockTaken1 =false;
Monitor.TryEnter(s1,500, ref lockTaken1);
if(lockTaken1)
{
bool lockTaken2 =false;
Monitor.TryEnter(s2,500, ref lockTaken2);
if(lockTaken2)
{
s1.ChangeState(i);
s2.ChangeState(i++);
Console.WriteLine("still running Deadlock1,{0}", i);
}
}
}
}
publicvoidDeadlock2()
{
inti =0;
while(true)
{
bool lockTaken1 =false;
Monitor.TryEnter(s1,500, ref lockTaken1);
if(lockTaken1)
{
bool lockTaken2 =false;
Monitor.TryEnter(s2,500, ref lockTaken2);
if(lockTaken2)
{
s1.ChangeState(i);
s2.ChangeState(i++);
Console.WriteLine("still running Deadlock2,{0}", i);
}
}
}
}
}
}
}
|
C#的lock语句由编译器解析为使用Monitor类。下面的lock语句
lock(obj)
{
//synchronized region for obj
}
被解析为调用Enter()方法,该方法会一直等待,知道线程被对象锁定为止。一次只有一个线程能被对象锁定。只要解除了锁定,线程就可以进入同步阶段。Monitor类的Exit()方法解除了锁定。编译器吧Exit()方法放在try块的finally处理程序中,所以如果抛出了异常,就也会解除该锁定。
Monitor.Enter(obj)
try
{
//synchronized region for obj
}
finally
{
Monitor.Exit(obj);
}
与c#的lock语句相比,Monitor类的主要优点是:可以添加一个等待被锁定的超时值。这样就不会无限期地等待被锁定,而可以使用TryEnter()方法,其中给它传递一个超时值,指定等待被锁定的最长时间。如果obj被锁定,TryEnter()方法就把布尔值的引用参数设置为true,并同步地访问由对象obj锁定的状态。如果另一个线程锁定obj的时间超过500毫秒,TryEnter()方法就把变量lockTaken设置为false,线程不再等待,而是用于执行其他操作。也许在以后,该线程会尝试再次被锁定。
bool lockTaken = false;
Monitor.TryEnter(obj,500,ref lockTaken);
if(lockTaken)
{
try
{
//acquired the lock
//synchronized region for obj
}
finally
{
Monitor.Exit(obj);
else
{
//didnt get the lock ,do something else
}
如果基于对象的锁定对象(Monitor)的系统开销由于垃圾回收而过高,就可以使用SpinLock结构。如果有大量的锁定(例如,列表中的每个节点都有一个锁定),且锁定的时间总是非常短,SpinLock结构就很有用。应避免使用多个SpinLock结构,也不要调用任何可能阻塞的内容。除了体系结构上的区别之外,SpinLock结构的用法非常类似于Monitor类。获得锁定使用Enter()或TryEnter()方法,释放锁定使用Exit()方法。SpinLock结构还提供了属性IsHeld和IsHeldByCurrentThread,指定它当时是否是锁定的。传送SpinLock实例时要小心。因为SpinLock定义为结构,把一个变量赋予另一个变量会创建一个副本。总是通过引用传送SpinLock实例。
3 同步
同步问题,是在线程之间共享数据导致的。要确保一次只有一个线程访问和改变共享状态,避免争用条件和死
锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
namespace Wrox.ProCSharp.Threading
{
classProgram
{
staticvoidMain()
{
intnumTasks =20;
var state =newSharedState();
var tasks =newTask[numTasks];
for(inti =0; i < numTasks; i++)
{
tasks[i] =newTask(newJob(state).DoTheJob);
tasks[i].Start();
}
for(inti =0; i < numTasks; i++)
{
tasks[i].Wait();
}
Console.WriteLine("summarized {0}", state.State);
}
publicclassSharedState
{
publicintState { get; set; }
}
publicclassJob
{
SharedState sharedState;
publicJob(SharedState sharedState)
{
this.sharedState = sharedState;
}
publicvoidDoTheJob()
{
for(inti =0; i <50000; i++)
{
sharedState.State +=1;
}
}
}
}
}
|
在Main()方法中,创建一个SharedState对象,并把它传递给20个Task对象的构造函数。在启动所有的任务后,Main()方法进入另一个循环,使20个任务全部处于等待状态,直到所有的任务都执行完毕为止。由于每次传递给Task对象的SharedState中State的值可能以同样的状态被多次+1,所以多次运行会得到不同的结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
namespace Wrox.ProCSharp.Threading
{
classProgram
{
staticvoidMain()
{
intnumTasks =20;
var state =newSharedState();
var tasks =newTask[numTasks];
for(inti =0; i < numTasks; i++)
{
tasks[i] =newTask(newJob(state).DoTheJob);
tasks[i].Start();
}
for(inti =0; i < numTasks; i++)
{
tasks[i].Wait();
}
Console.WriteLine("summarized {0}", state.State);
}
publicclassSharedState
{
privateintstate =0;
privateobject syncRoot =newobject();
publicintState//there's still a race condition,
//don't do this
{
get { lock (syncRoot) {returnstate; } }
set { lock (syncRoot) { state = value; } }
}
}
publicclassJob
{
SharedState sharedState;
publicJob(SharedState sharedState)
{
this.sharedState = sharedState;
}
publicvoidDoTheJob()
{
for(inti =0; i <50000; i++)
{
sharedState.State +=1;
}
}
}
}
}
|
调用方法DoTheTask()方法的线程访问SharedState类的get存取器,以获得state的当前值,接着set存取器给state设置新值。在调用对象的get和set存取器期间,对象没有锁定,另一个线程可以获得临时值。最后得到的结果也是不确定的。i++不是线程安全的,它的操作包括从内存中获取一个值,然后给该值递增1,再将它存储会内存。这些操作都可能被线程调度器打断。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
namespace Wrox.ProCSharp.Threading
{
classProgram
{
staticvoidMain()
{
intnumTasks =20;
var state =newSharedState();
var tasks =newTask[numTasks];
for(inti =0; i < numTasks; i++)
{
tasks[i] =newTask(newJob(state).DoTheJob);
tasks[i].Start();
}
for(inti =0; i < numTasks; i++)
{
tasks[i].Wait();
}
Console.WriteLine("summarized {0}", state.State);
}
publicclassSharedState
{
publicintState { get; set; }
}
publicclassJob
{
SharedState sharedState;
publicJob(SharedState sharedState)
{
this.sharedState = sharedState;
}
publicvoidDoTheJob()
{
for(inti =0; i <50000; i++)
{
lock(sharedState)
{
sharedState.State +=1;
}
}
}
}
}
}
|
在一个地方使用lock语句并不意味着,访问对象的其他线程都正在等待。必须对每个访问共享状态的线程显式地使用同步功能。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
namespace Wrox.ProCSharp.Threading
{
classProgram
{
staticvoidMain()
{
intnumTasks =20;
var state =newSharedState();
var tasks =newTask[numTasks];
for(inti =0; i < numTasks; i++)
{
tasks[i] =newTask(newJob(state).DoTheJob);
tasks[i].Start();
}
for(inti =0; i < numTasks; i++)
{
tasks[i].Wait();
}
Console.WriteLine("summarized {0}", state.State);
}
publicclassSharedState
{
privateintstate =0;
privateobject syncRoot =newobject();
publicintState
{
get
{
returnstate;
}
}
publicintIncrementState()
{
lock(syncRoot)
{
return++state;
}
}
}
publicclassJob
{
SharedState sharedState;
publicJob(SharedState sharedState)
{
this.sharedState = sharedState;
}
publicvoidDoTheJob()
{
for(inti =0; i <50000; i++)
{
lock(sharedState)
{
sharedState.IncrementState();
}
}
}
}
}
}
|
上面的程序也可以得到正确的结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
namespace Wrox.ProCSharp.Threading
{
classProgram
{
staticvoidMain()
{
intnumTasks =20;
var state =newSharedState();
var tasks =newTask[numTasks];
for(inti =0; i < numTasks; i++)
{
tasks[i] =newTask(newJob(state).DoTheJob);
tasks[i].Start();
}
for(inti =0; i < numTasks; i++)
{
tasks[i].Wait();
}
Console.WriteLine("summarized {0}", state.State);
}
publicclassSharedState
{
privateintstate =0;
publicintState
{
get
{
returnstate;
}
}
publicintIncrementState()
{
returnInterlocked.Increment(ref state);
}
}
publicclassJob
{
SharedState sharedState;
publicJob(SharedState sharedState)
{
this.sharedState = sharedState;
}
publicvoidDoTheJob()
{
for(inti =0; i <50000; i++)
{
lock (sharedState)
{
sharedState.IncrementState();
}
}
}
}
}
}
|
Interlocked类用于使变量的简单语句原子化。i++不是线程安全的,它的操作包括从内存中获取一个值,给该值递增1,再将它存储会内存。这些操作可能会被线程调度器打断。Interlocked类提供了以线程安全的方式递增、递减、交换和读取值的方法。与其他技术相比,使用Interlocked类会快很多。但是,它只能用于简单的同步操作。
例如:
1
2
3
4
5
|
lock(this)
{
if(someState ==null)
someState = newState;
}
|
可以用Interlocked.CompareExchange(ref someState, newState,null)代替且比较快