我正在尝试使用超时线程不进行同步,以创建阻塞操作的超时,在特定情况下为InputStream.read()
。
这是为了避免阻塞操作将永远持续下去,并且其目的是获得最佳性能。
这应该是一个典型的用例:
try(InputStream input = request.getInputStream())
{
Utils.consumeWithTimeout(input,60000,(buffer,n) ->
{
output.write(buffer,n);
checksum.update(buffer,n);
});
}
其中
public static void consumeWithTimeout(InputStream in,long timeout,BiConsumer<byte[],Integer> consumer) throws IOException
{
byte[] buf = new byte[DEFAULT_BUFFER_SIZE];
try(TimedOp timedOp = new TimedOp(timeout,() -> closeQuietly(in)))
{
while(true)
{
timedOp.start();
int n = in.read(buf);
timedOp.pause();
if(n <= 0)
{
return;
}
consumer.accept(buf,n);
}
}
finally
{
closeQuietly(in);
}
}
和
public static class TimedOp implements AutoCloseable
{
private Thread th;
private volatile long last = 0;
private volatile boolean paused = true;
public TimedOp(long timeout,Runnable runnable)
{
th = new Thread(() ->
{
try
{
while(!th.isInterrupted())
{
long now = System.currentTimeMillis();
if(last + timeout > now)
{
Thread.sleep(last + timeout - now);
}
else if(paused)
{
Thread.sleep(timeout);
}
else
{
runnable.run();
return;
}
}
}
catch(InterruptedException e)
{
return;
}
});
}
public void start()
{
State state = th.getState();
if(state == State.TERMINATED)
{
throw new IllegalStateException("thread is terminated");
}
if(!paused)
{
throw new IllegalStateException("already running");
}
last = System.currentTimeMillis();
paused = false;
if(state == State.NEW)
{
th.start();
}
}
public void pause()
{
paused = true;
}
@Override
public void close()
{
th.interrupt();
try
{
th.join();
}
catch(InterruptedException e)
{
throw new RuntimeException(e);
}
}
}
您看到问题或需要改进的地方吗?
我尝试了什么
假设您需要关心8GB缓冲区的1GB数据传输。
我可以使用ExecutorService
来安排read()
吗?
不,我不能。
public static void consumeWithExecutor(InputStream in,Integer> consumer) throws IOException
{
byte[] buf = new byte[DEFAULT_BUFFER_SIZE];
ExecutorService executor = Executors.newSingleThreadExecutor();
try
{
while(true)
{
Future<Integer> future = executor.submit(() -> in.read(buf));
int n = future.get(timeout,TimeUnit.MILLISECONDS);
if(n <= 0)
{
return;
}
consumer.accept(buf,n);
}
}
catch(InterruptedException | ExecutionException | TimeoutException e)
{
// do nothing,handling in finally block
}
finally
{
closeQuietly(in);
executor.shutdownNow();
}
}
每次读取产生/重用/重新启动线程的开销过大。
性能损失难以忍受。
我可以使用Timer
来安排read()
吗?
不,我不应该。
public static void consumeWithTimer(InputStream in,Integer> consumer) throws IOException
{
byte[] buf = new byte[DEFAULT_BUFFER_SIZE];
try
{
while(true)
{
Timer timer = new Timer();
TimerTask task = new TimerTask()
{
@Override
public void run()
{
closeQuietly(in);
}
};
timer.schedule(task,timeout);
int n = in.read(buf);
timer.cancel();
if(n <= 0)
{
return;
}
consumer.accept(buf,n);
}
}
finally
{
closeQuietly(in);
}
}
Timer
和TimerTask
不可重用,应该为每次迭代创建一个新实例。
在内部,Timer
在queue
个任务上进行同步,从而导致不必要的锁定。
这会导致性能损失,比使用ExecutorService
还要薄,但是效率却不如我的原始实现。