有效的超时,可实现无同步的阻止操作

我正在尝试使用超时线程进行同步,以创建阻塞操作的超时,在特定情况下为InputStream.read()

这是为了避免阻塞操作将永远持续下去,并且其目的是获得最佳性能

这应该是一个典型的用例:

try(InputStream input = request.getInputStream())
{
    Utils.consumeWithTimeout(input,60000,(buffer,n) ->
    {
        output.write(buffer,n);
        checksum.update(buffer,n);
    });
}

其中

public static void consumeWithTimeout(InputStream in,long timeout,BiConsumer<byte[],Integer> consumer) throws IOException
{
    byte[] buf = new byte[DEFAULT_BUFFER_SIZE];

    try(TimedOp timedOp = new TimedOp(timeout,() -> closeQuietly(in)))
    {
        while(true)
        {
            timedOp.start();
            int n = in.read(buf);
            timedOp.pause();

            if(n <= 0)
            {
                return;
            }

            consumer.accept(buf,n);
        }
    }
    finally
    {
        closeQuietly(in);
    }
}

public static class TimedOp implements AutoCloseable
{
    private Thread th;
    private volatile long last = 0;
    private volatile boolean paused = true;

    public TimedOp(long timeout,Runnable runnable)
    {
        th = new Thread(() ->
        {
            try
            {
                while(!th.isInterrupted())
                {
                    long now = System.currentTimeMillis();
                    if(last + timeout > now)
                    {
                        Thread.sleep(last + timeout - now);
                    }
                    else if(paused)
                    {
                        Thread.sleep(timeout);
                    }
                    else
                    {
                        runnable.run();
                        return;
                    }
                }
            }
            catch(InterruptedException e)
            {
                return;
            }
        });
    }

    public void start()
    {
        State state = th.getState();
        if(state == State.TERMINATED)
        {
            throw new IllegalStateException("thread is terminated");
        }

        if(!paused)
        {
            throw new IllegalStateException("already running");
        }

        last = System.currentTimeMillis();
        paused = false;

        if(state == State.NEW)
        {
            th.start();
        }
    }

    public void pause()
    {
        paused = true;
    }

    @Override
    public void close()
    {
        th.interrupt();
        try
        {
            th.join();
        }
        catch(InterruptedException e)
        {
            throw new RuntimeException(e);
        }
    }
}

您看到问题或需要改进的地方吗?


我尝试了什么

假设您需要关心8GB缓冲区的1GB数据传输。


我可以使用ExecutorService来安排read()吗?

不,我不能。

public static void consumeWithExecutor(InputStream in,Integer> consumer) throws IOException
{
    byte[] buf = new byte[DEFAULT_BUFFER_SIZE];
    ExecutorService executor = Executors.newSingleThreadExecutor();

    try
    {
        while(true)
        {
            Future<Integer> future = executor.submit(() -> in.read(buf));
            int n = future.get(timeout,TimeUnit.MILLISECONDS);
            if(n <= 0)
            {
                return;
            }

            consumer.accept(buf,n);
        }
    }
    catch(InterruptedException | ExecutionException | TimeoutException e)
    {
        // do nothing,handling in finally block
    }
    finally
    {
        closeQuietly(in);
        executor.shutdownNow();
    }
}

每次读取产生/重用/重新启动线程的开销过大。

性能损失难以忍受。


我可以使用Timer来安排read()吗?

不,我不应该。

public static void consumeWithTimer(InputStream in,Integer> consumer) throws IOException
{
    byte[] buf = new byte[DEFAULT_BUFFER_SIZE];

    try
    {
        while(true)
        {
            Timer timer = new Timer();

            TimerTask task = new TimerTask()
            {
                @Override
                public void run()
                {
                    closeQuietly(in);
                }
            };

            timer.schedule(task,timeout);

            int n = in.read(buf);

            timer.cancel();

            if(n <= 0)
            {
                return;
            }

            consumer.accept(buf,n);
        }
    }
    finally
    {
        closeQuietly(in);
    }
}

TimerTimerTask不可重用,应该为每次迭代创建一个新实例。

在内部,Timerqueue个任务上进行同步,从而导致不必要的锁定。

这会导致性能损失,比使用ExecutorService还要薄,但是效率却不如我的原始实现。

xyj_future 回答:有效的超时,可实现无同步的阻止操作

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/3074187.html

大家都在问