题目:多线程连接字符串
看到有很多使用CountDownLatch和Condition的题解,这里使用一种不一样的解法
问题描述:
有4个线程和1个公共的字符数组。线程1的功能就是向数组输出A,线程2的功能就是向字符输出B,线程3的功能就是向数组输出C,线程4的功能就是向数组输出D。要求按顺序向数组赋值ABCDABCDABCD,ABCD的个数由线程函数1的参数指定。
解题思路
使用生产者消费者模式和观察者模式结合解决多线程之间的通信问题
Scanner负责读取输入的整数,并将结果放入需求队列(生产者)AppendTask作为一个线程的执行单元,负责给输出字符数组添加符号,并通知观察者操作完成,操作完成后自动休眠(消费者)isFinish作为用户输入结束标志,当需求队列中数据全部被打印出来后用来判断结束所有工作线程
因为每个工作线程都负责打印一个字符且是有顺序的,因此使用观察模式,详细源码参考JDK的Observer和Observable两个类
当第一个线程输出A后,它通知并唤醒第二个线程,第二个线程继续输出B,继续通知并唤醒第三个线程,以此类推。
当第四个线程完成输出D后,通过回调来判断当前任务是否完成,如果未完成继续通知线程1,如果完成了就从需求队列中取下一个任务,
当所有任务都完成后,再判断是否要退出所有工作线程。
AppendTask的生命周期为:
启动->运行<->暂停->结束
一些并发问题的处理:
Scanner产出的整数应该是一个队列,如果直接放进其他线程可能会覆盖正在运行的任务,在做操该队列时应当使用synchronized线程安全保护- 使用
volatile关键字修饰任务的运行状态,当变量值改变可以立刻通知其他线程同步变量
代码如下:
public class Main {
public static void main(String[] args) {
final LinkedList<Integer> inputList = new LinkedList<>();
final AtomicBoolean isFinish = new AtomicBoolean(false);
AppendTask t1 = new AppendTask("A");
AppendTask t2 = new AppendTask("B");
AppendTask t3 = new AppendTask("C");
AppendTask t4 = new AppendTask("D");
t1.addObserver(t2);
t2.addObserver(t3);
t3.addObserver(t4);
t4.addObserver(t1);
t4.setOnWorkListener(new AppendTask.OnWorkListener() {
@Override
public boolean onWorked(AppendTask task) {
synchronized (inputList) {
//获取首个任务计数
Integer count = inputList.pollFirst();
if(count != null) {
count--;
if(count > 0) {
//因为还没有完成,再放回队列首位
inputList.offerFirst(count);
} else {
System.out.println("");
//输入完成和队列为空都成立时停止所有线程
if(inputList.isEmpty() && isFinish.get()) {
t1.exit();
t2.exit();
t3.exit();
t4.exit();
}
}
}
return inputList.isEmpty();
}
}
});
new Thread(t1).start();
new Thread(t2).start();
new Thread(t3).start();
new Thread(t4).start();
Scanner scanner = new Scanner(System.in);
while(scanner.hasNextInt()) {
int count = scanner.nextInt();
if(count <= 0) {
continue;
}
//这里和工作线程不是同一个线程,需要加锁
synchronized (inputList) {
if (inputList.isEmpty()) {
t1.keepRunning();
}
inputList.offerLast(count);
}
}
isFinish.set(true);
scanner.close();
}
static class AppendTask extends Observable implements Runnable, Observer {
private String fillStr;
private volatile boolean running = true;
private volatile boolean pause = true;
private OnWorkListener listener;
public AppendTask(String fillStr) {
this.fillStr = fillStr;
}
@Override
public void run() {
while(running) {
while(!pause) {
doWork();
}
if(!running) {
break;
}
synchronized(this) {
try {
wait();
} catch(InterruptedException e) {
e.printStackTrace();
}
}
}
}
protected void doWork() {
System.out.print(fillStr);
boolean ret = false;
if(listener != null) {
ret = listener.onWorked(this);
}
pause();
super.setChanged();
super.notifyObservers(ret);
}
public synchronized void pause() {
pause = true;
}
public synchronized void keepRunning() {
if(pause) {
pause = false;
notify();
}
}
public synchronized void exit() {
running = false;
keepRunning();
}
@Override
public void update(Observable o, Object arg) {
if(arg instanceof Boolean) {
Boolean b = (Boolean) arg;
if(!b) {
keepRunning();
}
}
}
public void setOnWorkListener(OnWorkListener l) {
listener = l;
}
interface OnWorkListener {
boolean onWorked(AppendTask task);
}
}
} 


京公网安备 11010502036488号