001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.io.input; 018 019import static org.apache.commons.io.IOUtils.EOF; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.PipedInputStream; 024import java.io.PipedOutputStream; 025import java.time.Duration; 026import java.util.Objects; 027import java.util.concurrent.BlockingQueue; 028import java.util.concurrent.LinkedBlockingQueue; 029import java.util.concurrent.TimeUnit; 030 031import org.apache.commons.io.build.AbstractStreamBuilder; 032import org.apache.commons.io.output.QueueOutputStream; 033 034/** 035 * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue output stream. 036 * <p> 037 * To build an instance, see {@link Builder}. 038 * </p> 039 * <p> 040 * Example usage: 041 * </p> 042 * 043 * <pre> 044 * QueueInputStream inputStream = new QueueInputStream(); 045 * QueueOutputStream outputStream = inputStream.newQueueOutputStream(); 046 * 047 * outputStream.write("hello world".getBytes(UTF_8)); 048 * inputStream.read(); 049 * </pre> 050 * <p> 051 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads. 052 * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited. 053 * </p> 054 * <p> 055 * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an 056 * {@link IOException}. 057 * </p> 058 * 059 * @see QueueOutputStream 060 * @since 2.9.0 061 */ 062public class QueueInputStream extends InputStream { 063 064 /** 065 * Builds a new {@link QueueInputStream} instance. 066 * <p> 067 * For example: 068 * </p> 069 * 070 * <pre>{@code 071 * QueueInputStream s = QueueInputStream.builder() 072 * .setBlockingQueue(new LinkedBlockingQueue<>()) 073 * .setTimeout(Duration.ZERO) 074 * .get();} 075 * </pre> 076 * 077 * @since 2.12.0 078 */ 079 public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> { 080 081 private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(); 082 private Duration timeout = Duration.ZERO; 083 084 /** 085 * Constructs a new instance. 086 * <p> 087 * This builder use the aspects BlockingQueue and timeout. 088 * </p> 089 * 090 * @return a new instance. 091 */ 092 @Override 093 public QueueInputStream get() { 094 return new QueueInputStream(blockingQueue, timeout); 095 } 096 097 /** 098 * Sets backing queue for the stream. 099 * 100 * @param blockingQueue backing queue for the stream. 101 * @return this 102 */ 103 public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) { 104 this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>(); 105 return this; 106 } 107 108 /** 109 * Sets the polling timeout. 110 * 111 * @param timeout the polling timeout. 112 * @return this. 113 */ 114 public Builder setTimeout(final Duration timeout) { 115 if (timeout != null && timeout.toNanos() < 0) { 116 throw new IllegalArgumentException("timeout must not be negative"); 117 } 118 this.timeout = timeout != null ? timeout : Duration.ZERO; 119 return this; 120 } 121 122 } 123 124 /** 125 * Constructs a new {@link Builder}. 126 * 127 * @return a new {@link Builder}. 128 * @since 2.12.0 129 */ 130 public static Builder builder() { 131 return new Builder(); 132 } 133 134 private final BlockingQueue<Integer> blockingQueue; 135 136 private final long timeoutNanos; 137 138 /** 139 * Constructs a new instance with no limit to its internal queue size and zero timeout. 140 */ 141 public QueueInputStream() { 142 this(new LinkedBlockingQueue<>()); 143 } 144 145 /** 146 * Constructs a new instance with given queue and zero timeout. 147 * 148 * @param blockingQueue backing queue for the stream. 149 * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}. 150 */ 151 @Deprecated 152 public QueueInputStream(final BlockingQueue<Integer> blockingQueue) { 153 this(blockingQueue, Duration.ZERO); 154 } 155 156 /** 157 * Constructs a new instance with given queue and timeout. 158 * 159 * @param blockingQueue backing queue for the stream. 160 * @param timeout how long to wait before giving up when polling the queue. 161 */ 162 private QueueInputStream(final BlockingQueue<Integer> blockingQueue, final Duration timeout) { 163 this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue"); 164 this.timeoutNanos = Objects.requireNonNull(timeout, "timeout").toNanos(); 165 } 166 167 /** 168 * Gets the blocking queue. 169 * 170 * @return the blocking queue. 171 */ 172 BlockingQueue<Integer> getBlockingQueue() { 173 return blockingQueue; 174 } 175 176 /** 177 * Gets the timeout duration. 178 * 179 * @return the timeout duration. 180 */ 181 Duration getTimeout() { 182 return Duration.ofNanos(timeoutNanos); 183 } 184 185 /** 186 * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream. 187 * 188 * @return QueueOutputStream connected to this stream. 189 */ 190 public QueueOutputStream newQueueOutputStream() { 191 return new QueueOutputStream(blockingQueue); 192 } 193 194 /** 195 * Reads and returns a single byte. 196 * 197 * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available. 198 * @throws IllegalStateException if thread is interrupted while waiting. 199 */ 200 @Override 201 public int read() { 202 try { 203 final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS); 204 return value == null ? EOF : 0xFF & value; 205 } catch (final InterruptedException e) { 206 Thread.currentThread().interrupt(); 207 // throw runtime unchecked exception to maintain signature backward-compatibility of 208 // this read method, which does not declare IOException 209 throw new IllegalStateException(e); 210 } 211 } 212 213}