Concurrency - Conditional synchronized method

Published 2022-09-30 20:10


I am reading two text files concurrently line by line.

What I specifically want to do is this: when the lineCount in both threads is the same, I want to look at the string that each scanner is currently reading.

I looked around for a pattern I could implement, such as Compare-and-Swap or Slipped Conditions, but I cannot wrap my head around how either would help me achieve my goal. I am new to concurrent programming.

What I have managed so far is to synchronize the string reading and printing with the counterSync method. I know that I have to carry out my thread lock/pause operation there and take a look at the string.

public class concurrencyTest {

public static void main(String[] args) throws IOException {
        String filePath1 = "path1.txt";
        String filePath2 = "path2.txt";
        reader reader = new reader();
        MyThread source = new MyThread(reader, filePath1);
        MyThread target = new MyThread(reader, filePath2);

        source.start();
        target.start();
    }
    static public class reader {
        void read(String filePath) throws IOException {
            readFile(filePath);
        }
    }

    static synchronized void counterSync(String thread) {
        System.out.println(thread);
    }

    static class MyThread extends Thread {
        reader reader;
        String filePath;

        MyThread(reader reader, String filePath) {
            this.reader = reader;
            this.filePath = filePath;
        }

        @Override
        public void run() {
            try {
                reader.read(filePath);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    static void readFile(String filePath) throws IOException {
        FileInputStream inputStream = null;
        Scanner sc = null;
        int lineCount = 0;
        try {
            inputStream = new FileInputStream(filePath);
            sc = new Scanner(inputStream, "UTF-8");
            while (sc.hasNextLine()) {
                lineCount++;
                // Read the line once; calling nextLine() twice would skip every other line
                String line = sc.nextLine();
                System.out.println(lineCount + "===" + line);
                counterSync(line);
            }
            if (sc.ioException() != null) {
                throw sc.ioException();
            }
        } finally {
            if (inputStream != null) {
                inputStream.close();
            }
            if (sc != null) {
                sc.close();
            }
        }
    }
}

Solution


OK, what you are looking for is a little complex, but still possible.
Your question lacks examples, so correct me if I have misunderstood something.


You have 2 threads:

  • thread1
  • thread2

and 2 files:

  • file1
  • file2

Content of file1:

file1  
file2  
file3  
file4  
file5  
file6  
file7  
file8  
file9  

Content of file2:

file11  
file22  
file33  
file44  
file55  
file66  
file77  
file88  
file99  

You want to stop all threads at the same line numbers and perform some operation on the output.


This is the thread implementation for reading the files. We will create two instances of it, and each instance will manage one file.

static class ReaderThread extends Thread {
    private File fileToRead;

    public final Object lock = new Object();
    private String currentLine;
    private AtomicInteger lineCount = new AtomicInteger(0);

    public ReaderThread(File fileToRead) {
        this.fileToRead = fileToRead;
    }

    @Override
    public void run() {
        synchronized (lock) {
            try {
                Stream<String> lines = Files.lines(Path.of(fileToRead.getPath()));
                lines.forEach(line -> {
                    currentLine = line;
                    // Here's your logic on different lines
                    if (lineCount.get() == 4 || lineCount.get() == 5 || lineCount.get() == 6) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    lineCount.getAndIncrement();
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public String getCurrentLine() {
        return currentLine;
    }

    public boolean isLocked() {
        return getState().equals(State.WAITING);
    }
}

Then we will use a helper thread to notify the reader threads once our processing is done:

static class HelperThread extends Thread {
    private List<ReaderThread> threads;

    @Override
    public void run() {
        while (true) {
            if (threads.stream().allMatch(ReaderThread::isLocked)) {
                System.out.println("next line:");

                threads.forEach(thread -> {
                    synchronized (thread.lock) {
                        System.out.println(thread.getCurrentLine());
                        thread.lock.notify();
                    }
                });

                System.out.println("\n");
            }
        }

    }

    public HelperThread(List<ReaderThread> threads) {
        this.threads = threads;
    }
}   
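Object.wait() is documented to allow spurious wakeups, and a notify() issued before the matching wait() is simply lost. A common guard against both is a boolean flag checked in a loop. The following is a minimal sketch of that idea, not part of the answer's code: the class name PauseGate and its methods are hypothetical, and it assumes one waiting thread per gate.

```java
class PauseGate {
    private boolean released = false; // guarded by the monitor of "this"

    /** Blocks the calling thread until release() has been called. */
    synchronized void awaitRelease() throws InterruptedException {
        // Re-check the flag after every wakeup, since wait() may return spuriously
        while (!released) {
            wait();
        }
        released = false; // re-arm the gate for the next pause
    }

    /** Lets the waiting thread continue; safe to call before it starts waiting. */
    synchronized void release() {
        released = true;
        notifyAll();
    }

    /** Hypothetical demo: release first, wait afterwards, and report whether the reader finished. */
    static boolean releaseBeforeWaitIsNotLost() {
        PauseGate gate = new PauseGate();
        Thread reader = new Thread(() -> {
            try {
                gate.awaitRelease();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        gate.release(); // happens before the reader even starts
        reader.start();
        try {
            reader.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !reader.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("reader finished: " + releaseBeforeWaitIsNotLost());
    }
}
```

Because the flag records that a release happened, the order in which the two threads reach the gate no longer matters.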

Finally, the main class for testing everything:

public static void main(String[] args) {
    File f1 = new File(Objects.requireNonNull(Main.class.getClassLoader().getResource("file1.txt")).getFile());
    File f2 = new File(Objects.requireNonNull(Main.class.getClassLoader().getResource("file2.txt")).getFile());

    ReaderThread t1 = new ReaderThread(f1);
    ReaderThread t2 = new ReaderThread(f2);

    HelperThread helperThread = new HelperThread(List.of(t1, t2));

    helperThread.start();
    t1.start();
    t2.start();
}   

Executing the program will result in this output:

next line:
file5
file55


next line:
file6
file66


next line:
file7
file77   

Here's the complete list of imports:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;  

Please note: this is a rough example. You still need to handle the correct shutdown of the threads, encapsulate the members that are currently public following the Java guidelines, handle all exceptions properly, and do some general refactoring.
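On the shutdown point: the helper's while (true) loop never returns, so the JVM keeps running after both files have been read. One possible way to end it, sketched here under assumptions, is to let the helper stop once every worker has terminated. The names HelperShutdownSketch, Helper and helperStopsAfterWorkers are hypothetical, and the workers only sleep instead of reading files.

```java
import java.util.List;

class HelperShutdownSketch {

    /** Polls the workers and returns from run() once all of them have terminated. */
    static class Helper extends Thread {
        private final List<Thread> workers;

        Helper(List<Thread> workers) {
            this.workers = workers;
        }

        @Override
        public void run() {
            // getState() is used instead of isAlive(): a worker that has not been
            // started yet (state NEW) is not alive, but must not count as finished.
            while (!isInterrupted()
                    && workers.stream().anyMatch(w -> w.getState() != Thread.State.TERMINATED)) {
                // The "are all readers waiting?" check and the notify calls would go here.
                Thread.onSpinWait();
            }
        }
    }

    /** Hypothetical demo: returns true if the helper stopped within the timeout. */
    static boolean helperStopsAfterWorkers(long timeoutMillis) {
        Runnable shortTask = () -> {
            try {
                Thread.sleep(50); // stands in for reading a file
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread w1 = new Thread(shortTask);
        Thread w2 = new Thread(shortTask);
        Helper helper = new Helper(List.of(w1, w2));
        helper.start(); // started before the workers
        w1.start();
        w2.start();
        try {
            helper.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !helper.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("helper stopped: " + helperStopsAfterWorkers(2000));
    }
}
```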



If you want a more versatile implementation that stops each reader at a different line, the following should work:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class Main2 {

    public static void main(String[] args) {
        File f1 = new File(Objects.requireNonNull(Main2.class.getClassLoader().getResource("file1.txt")).getFile());
        File f2 = new File(Objects.requireNonNull(Main2.class.getClassLoader().getResource("file2.txt")).getFile());

        // Each reader pauses itself after reading the given (1-based) line
        ReaderThread t1 = new ReaderThread(f1, 3);
        ReaderThread t2 = new ReaderThread(f2, 4);

        HelperThread helperThread = new HelperThread(List.of(t1, t2));

        helperThread.start();

        t1.setName("Reader1");
        t2.setName("Reader2");
        t1.start();
        t2.start();
    }

    static class ReaderThread extends Thread {
        private final File fileToRead;
        private final int pauseAtLine;
        private final Object lock = new Object();
        private final AtomicInteger lineCount = new AtomicInteger(0);
        private String currentLine;

        public ReaderThread(File fileToRead, int pauseAtLine) {
            this.fileToRead = fileToRead;
            this.pauseAtLine = pauseAtLine;
        }

        @Override
        public void run() {
            synchronized (lock) {
                // try-with-resources closes the underlying file when the stream is done
                try (Stream<String> lines = Files.lines(Path.of(fileToRead.getPath()))) {
                    lines.forEach(line -> {
                        currentLine = line;
                        if (lineCount.incrementAndGet() == pauseAtLine) {
                            lock();
                        }
                    });
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        // wait() must be called by the thread that owns the monitor, so the reader
        // pauses itself here instead of being paused by the helper thread.
        private void lock() {
            try {
                this.lock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // Must be called while holding the monitor returned by getLock()
        public void unlock() {
            this.lock.notify();
        }

        public boolean isLocked() {
            return getState().equals(State.WAITING);
        }

        public Object getLock() {
            return lock;
        }

        public AtomicInteger getLineCount() {
            return lineCount;
        }

        public String getCurrentLine() {
            return currentLine;
        }

    }

    static class HelperThread extends Thread {
        private final List<ReaderThread> threads;

        public HelperThread(List<ReaderThread> threads) {
            this.threads = threads;
        }

        @Override
        public void run() {
            while (true) {
                // Act only once every reader has paused at its own line
                if (threads.stream().allMatch(ReaderThread::isLocked)) {
                    System.out.println("next line:");

                    threads.forEach(t -> {
                        synchronized (t.getLock()) {
                            System.out.println(t.getCurrentLine());
                            t.unlock();
                        }
                    });

                    System.out.println("\n");
                }
            }

        }
    }

}

Make sure that the HelperThread starts before the other threads; otherwise it is possible to lose some data.
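The start-order dependence and the polling loop in HelperThread can be avoided by using a synchronizer from java.util.concurrent instead of wait()/notify(). The following is a minimal sketch using java.util.concurrent.Phaser, offered as an alternative and not as the approach shown above. The class LockstepReaders, the method readInLockstep and the "lineNo: left | right" output format are hypothetical. It takes BufferedReader arguments so it can run without files; for real files one would presumably pass Files.newBufferedReader(path).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;

class LockstepReaders {

    /** Reads both sources line by line in lockstep and pairs up lines with equal line numbers. */
    static List<String> readInLockstep(BufferedReader left, BufferedReader right) {
        BufferedReader[] sources = {left, right};
        String[] current = new String[2];        // slot i is written only by reader i
        List<String> pairs = new ArrayList<>();  // touched only inside onAdvance

        Phaser phaser = new Phaser(2) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // Runs once per line number, after all registered readers have arrived.
                if (registeredParties == 2) {
                    pairs.add((phase + 1) + ": " + current[0] + " | " + current[1]);
                }
                return registeredParties == 0;   // terminate when both readers are done
            }
        };

        Thread[] workers = new Thread[2];
        for (int i = 0; i < workers.length; i++) {
            final int slot = i;
            workers[i] = new Thread(() -> {
                try {
                    String line;
                    while ((line = sources[slot].readLine()) != null) {
                        current[slot] = line;
                        phaser.arriveAndAwaitAdvance(); // wait for the other reader
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                } finally {
                    phaser.arriveAndDeregister();       // let a longer file finish alone
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for readers", e);
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String> pairs = readInLockstep(
                new BufferedReader(new StringReader("file1\nfile2\nfile3")),
                new BufferedReader(new StringReader("file11\nfile22")));
        pairs.forEach(System.out::println);
    }
}
```

The comparison logic lives in onAdvance, which the Phaser runs while the readers are still parked, so no separate helper thread or thread-state check is needed. When one source is shorter, its reader deregisters and the other simply reads on without being paired.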



Author: 黑洞官方问答小能手

Link: http://www.javaheidong.com/blog/article/526540/7c582f968940a29c0466/

Source: java黑洞网
