发布于2022-09-30 20:10 阅读(374) 评论(0) 点赞(6) 收藏(5)
I am reading two text files concurrently line by line.
What I am specifically want to do is when the lineCount
on each thread are the same I want to take a look at the string that the scanner is currently reading.
I looked around for certain pattern I can implement like Compare and Swap and Slipped Condition but I cannot wrap my head around how it would help me achieve my goal. I am new to concurrency programming.
What I have managed so far is to synchronize the string reading and printing with counterSync
method and I know that I have carry out my thread lock/pause operation there and take a look at the string.
public class concurrencyTest {
public static void main(String[] args) throws IOException {
String filePath1 = "path1.txt";
String filePath2 = "path2.txt";
reader reader = new reader();
MyThread source = new MyThread(reader, filePath1);
MyThread target = new MyThread(reader, filePath2);
source.start();
target.start();
}
static public class reader {
void read(String filePath) throws IOException {
readFile(filePath);
}
}
static synchronized void counterSync(String thread) {
System.out.println(thread);
}
static class MyThread extends Thread {
reader reader;
String filePath;
MyThread(reader reader, String filePath) {
this.reader = reader;
this.filePath = filePath;
}
@Override
public void run() {
try {
reader.read(filePath);
} catch (IOException e) {
e.printStackTrace();
}
}
}
static void readFile(String filePath) throws IOException {
FileInputStream inputStream = null;
Scanner sc = null;
int lineCount = 0;
try {
inputStream = new FileInputStream(filePath);
sc = new Scanner(inputStream, "UTF-8");
while (sc.hasNextLine()) {
lineCount++;
System.out.println(lineCount + "===" + sc.nextLine());
counterSync(sc.nextLine());
}
if (sc.ioException() != null) {
throw sc.ioException();
}
} finally {
if (inputStream != null) {
inputStream.close();
}
if (sc != null) {
sc.close();
}
}
}
}
Ok, what you are looking for is a little bit complex but still possible.
Your question lacks of some examples so correct me if I'm wrong in something.
You have 2 threads:
and 2 files:
Content of file1:
file1
file2
file3
file4
file5
file6
file7
file8
file9
Content of file2:
file11
file22
file33
file44
file55
file66
file77
file88
file99
You want to stop all threads on the same line numbers and do some oeration with the output.
This is the thread implementation for reading the files, we will instantiate 2 instance of it, each instance will manage a file.
static class ReaderThread extends Thread {
private File fileToRead;
public final Object lock = new Object();
private String currentLine;
private AtomicInteger lineCount = new AtomicInteger(0);
public ReaderThread(File fileToRead) {
this.fileToRead = fileToRead;
}
@Override
public void run() {
synchronized (lock) {
try {
Stream<String> lines = Files.lines(Path.of(fileToRead.getPath()));
lines.forEach(line -> {
currentLine = line;
// Here's your logic on different lines
if (lineCount.get() == 4 || lineCount.get() == 5 || lineCount.get() == 6) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
lineCount.getAndIncrement();
});
} catch (IOException e) {
e.printStackTrace();
}
}
}
public String getCurrentLine() {
return currentLine;
}
public boolean isLocked() {
return getState().equals(State.WAITING);
}
}
Then we will use an helper thread to notify the reader threads when our elboration will be ok:
static class HelperThread extends Thread {
private List<ReaderThread> threads;
@Override
public void run() {
while (true) {
if (threads.stream().allMatch(ReaderThread::isLocked)) {
System.out.println("next line:");
threads.forEach(thread -> {
synchronized (thread.lock) {
System.out.println(thread.getCurrentLine());
thread.lock.notify();
}
});
System.out.println("\n");
}
}
}
public HelperThread(List<ReaderThread> threads) {
this.threads = threads;
}
}
Finally the main class for testing all:
public static void main(String[] args) {
File f1 = new File(Objects.requireNonNull(Main.class.getClassLoader().getResource("file1.txt")).getFile());
File f2 = new File(Objects.requireNonNull(Main.class.getClassLoader().getResource("file2.txt")).getFile());
ReaderThread t1 = new ReaderThread(f1);
ReaderThread t2 = new ReaderThread(f2);
HelperThread helperThread = new HelperThread(List.of(t1, t2));
helperThread.start();
t1.start();
t2.start();
}
Executing the program will result in this output:
next line: file5 file55 next line: file6 file66 next line: file7 file77
Here's the complete list of imports:
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
Please note: this is a rude example, you need to manage with the correct shutdown of the threads, some modifiers are public so encapsulate it following the java guidelines, coorrectly manage all exceptions and do some general refactor.
If you want a more versatile implementation, to interpolate different lines, the following should be ok:
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
public class Main2 {
public static void main(String[] args) {
File f1 = new File(Objects.requireNonNull(Main2.class.getClassLoader().getResource("file1.txt")).getFile());
File f2 = new File(Objects.requireNonNull(Main2.class.getClassLoader().getResource("file2.txt")).getFile());
ReaderThread t1 = new ReaderThread(f1);
ReaderThread t2 = new ReaderThread(f2);
HelperThread helperThread = new HelperThread(List.of(t1, t2));
helperThread.start();
t1.setName("Reader1");
t1.setName("Reader2");
t1.start();
t2.start();
}
static class ReaderThread extends Thread {
private final File fileToRead;
private final Object lock = new Object();
private final AtomicInteger lineCount = new AtomicInteger(0);
private String currentLine;
public ReaderThread(File fileToRead) {
this.fileToRead = fileToRead;
}
@Override
public void run() {
synchronized (lock) {
try {
Stream<String> lines = Files.lines(Path.of(fileToRead.getPath()));
lines.forEach(line -> {
currentLine = line;
lineCount.getAndIncrement();
});
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void lock() throws InterruptedException {
this.lock.wait();
}
public void unlock() {
this.lock.notify();
}
public boolean isLocked() {
return getState().equals(State.WAITING);
}
public Object getLock() {
return lock;
}
public AtomicInteger getLineCount() {
return lineCount;
}
public String getCurrentLine() {
return currentLine;
}
}
static class HelperThread extends Thread {
private List<ReaderThread> threads;
@Override
public void run() {
while (true) {
threads.forEach(t -> {
try {
if (t.getName().equals("Reader1") && t.getLineCount().get() == 3) t.lock();
if (t.getName().equals("Reader2") && t.getLineCount().get() == 4) t.lock();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
if (threads.stream().allMatch(ReaderThread::isLocked)) {
System.out.println("next line:");
threads.forEach(t -> {
synchronized (t.getLock()) {
System.out.println(t.getCurrentLine());
t.unlock();
}
});
System.out.println("\n");
}
}
}
public HelperThread(List<ReaderThread> threads) {
this.threads = threads;
}
}
}
Be sure that the HelperThread starts before the other threads or it's possible to loose some data.
作者:黑洞官方问答小能手
链接:http://www.javaheidong.com/blog/article/526540/7c582f968940a29c0466/
来源:java黑洞网
任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任
昵称:
评论内容:(最多支持255个字符)
---无人问津也好,技不如人也罢,你都要试着安静下来,去做自己该做的事,而不是让内心的烦躁、焦虑,坏掉你本来就不多的热情和定力
Copyright © 2018-2021 java黑洞网 All Rights Reserved 版权所有,并保留所有权利。京ICP备18063182号-2
投诉与举报,广告合作请联系vgs_info@163.com或QQ3083709327
免责声明:网站文章均由用户上传,仅供读者学习交流使用,禁止用做商业用途。若文章涉及色情,反动,侵权等违法信息,请向我们举报,一经核实我们会立即删除!