☕️ Connecting Java's Streaming API for XML (StAX) with Streams
December 15, 2022
I recently had to process large XML documents in Java, so I reached for Java’s
Streaming API for XML (StAX). Each document had a long list of relatively small
elements. Java’s new switch expressions made parsing the XML with StAX much
nicer than I remembered; however, I found myself missing the filtering,
transformation, and aggregation methods from java.util.stream.Stream.
I wanted to use StAX to parse the XML elements into a Java record, and I wanted
to use a Stream to process those records. In this blog post, I detail how to
do that efficiently.
Parsing All the Sandwiches
Consider an arbitrarily large XML document containing a list of favorite
sandwiches. Of course, we could parse all the favorite sandwiches into a
java.util.Collection and then call stream(). However, this is not efficient,
because it requires the JVM to hold all the data from the large XML document in
memory at once.
```java
// 🤢 Inefficient, because it reads all the records into memory before processing
List<FavoriteSandwich> sandwiches = parseAllSandwiches("./sandwiches.xml");
Stream<FavoriteSandwich> sandwichesStream = sandwiches.stream();
```
Instead, we want a Stream that parses the XML data as needed. To build such a
stream, we define a new type that implements the java.util.Spliterator
interface. A Spliterator is the java.util.stream equivalent of a cursor: it
facilitates making new Stream instances from data that can be iterated over
and, optionally, partitioned. In this example, the XML data is given to the
Spliterator as an InputStream, which can only be read sequentially, so it
cannot be partitioned.
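```java
class FavoriteSandwichXMLSpliterator
    implements Spliterator<FavoriteSandwich> {

  // Shared StAX factory; newInstance() locates the default StAX implementation
  private static final XMLInputFactory factory = XMLInputFactory.newInstance();

  private final XMLEventReader reader;

  public FavoriteSandwichXMLSpliterator(final InputStream is) throws IOException {
    try {
      reader = factory.createXMLEventReader(is);
    } catch (XMLStreamException e) {
      // Surface StAX setup failures as the IOException callers already expect
      throw new IOException("Failed to parse favorite sandwiches from XML", e);
    }
  }

  // ...
```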
The Spliterator parses the XML as needed in its tryAdvance method. It reads
events only until it has produced the next FavoriteSandwich.
```java
@Override
public boolean tryAdvance(final Consumer<? super FavoriteSandwich> action) {
  while (reader.hasNext()) {
    final XMLEvent event;
    try {
      event = reader.nextEvent();
    } catch (XMLStreamException e) {
      throw new UncheckedXMLStreamException("Failed to read favorite sandwiches from XML", e);
    }
    if (event.isStartElement()) {
      final StartElement startElement = event.asStartElement();
      if (startElement.getName().getLocalPart().equals("favorite-sandwich")) {
        final FavoriteSandwich sandwich;
        try {
          sandwich = readSandwich(); // implementation omitted for brevity
        } catch (XMLStreamException e) {
          throw new UncheckedXMLStreamException("Failed to read favorite sandwich from XML", e);
        }
        action.accept(sandwich);
        return true;
      }
    }
  }
  return false;
}
```
The remaining methods of the Spliterator interface return values that tell the
Stream that we do not know how long it is and that it cannot be partitioned.
Finally, all this is encapsulated in a factory method that returns a new Stream.
```java
public Stream<FavoriteSandwich> stream(final InputStream is) throws IOException {
  final var spliterator = new FavoriteSandwichXMLSpliterator(is);
  return StreamSupport.stream(spliterator, false);
}
```
This factory method provides the Stream<FavoriteSandwich> that developers want
to use, while encapsulating the XML parsing that developers do not want to think
about. For example, it’s now incredibly simple to count favorite sandwiches by
the U.S. state they’re from:
```java
@Test
void stream_all_valid_sandwiches() throws IOException {
  final FavoriteSandwichXMLParser reader = new FavoriteSandwichXMLParser();
  try (var is =
      FavoriteSandwichXMLParserTest.class.getResourceAsStream("/favorite-sandwiches.xml")) {
    var sandwiches = reader.stream(is);
    final Map<String, Long> countByState =
        sandwiches.collect(
            groupingBy(sandwich -> sandwich.restaurant().state(), Collectors.counting()));
    final Map<String, Long> expected = Map.of("NJ", 2L, "MD", 3L);
    assertThat(countByState).containsExactlyInAnyOrderEntriesOf(expected);
  }
}
```
Perfect! 🥪
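The post leaves out readSandwich() and the remaining Spliterator methods. The following self-contained sketch fills them in, but it rests on assumptions. It uses a hypothetical schema in which each <favorite-sandwich> element has <name> and <state> children. It also uses a hypothetical Sandwich record in place of FavoriteSandwich, whose fields the post does not show. The remaining methods use the conventional values for an unsized, unsplittable source: trySplit() returns null and estimateSize() returns Long.MAX_VALUE. So that the sketch stands alone, parse errors are wrapped in IllegalStateException instead of the post's UncheckedXMLStreamException.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.events.XMLEvent;

public class SandwichStreamSketch {
  // Hypothetical, simplified stand-in for FavoriteSandwich.
  record Sandwich(String name, String state) {}

  static final class SandwichSpliterator implements Spliterator<Sandwich> {
    private static final XMLInputFactory FACTORY = XMLInputFactory.newInstance();
    private final XMLEventReader reader;

    SandwichSpliterator(InputStream is) throws XMLStreamException {
      reader = FACTORY.createXMLEventReader(is);
    }

    // Pulls events only until the next <favorite-sandwich> has been read.
    @Override
    public boolean tryAdvance(Consumer<? super Sandwich> action) {
      try {
        while (reader.hasNext()) {
          XMLEvent event = reader.nextEvent();
          if (event.isStartElement()
              && event.asStartElement().getName().getLocalPart().equals("favorite-sandwich")) {
            action.accept(readSandwich());
            return true;
          }
        }
        return false;
      } catch (XMLStreamException e) {
        // The post uses its own UncheckedXMLStreamException here.
        throw new IllegalStateException("Failed to read sandwiches from XML", e);
      }
    }

    // Hypothetical readSandwich: collects <name> and <state> until </favorite-sandwich>.
    private Sandwich readSandwich() throws XMLStreamException {
      String name = null;
      String state = null;
      while (reader.hasNext()) {
        XMLEvent event = reader.nextEvent();
        if (event.isStartElement()) {
          switch (event.asStartElement().getName().getLocalPart()) {
            case "name" -> name = reader.getElementText();
            case "state" -> state = reader.getElementText();
            default -> { } // ignore other child elements
          }
        } else if (event.isEndElement()
            && event.asEndElement().getName().getLocalPart().equals("favorite-sandwich")) {
          break;
        }
      }
      return new Sandwich(name, state);
    }

    @Override public Spliterator<Sandwich> trySplit() { return null; } // cannot be partitioned
    @Override public long estimateSize() { return Long.MAX_VALUE; }    // size unknown
    @Override public int characteristics() { return ORDERED | NONNULL; } // deliberately not SIZED
  }

  static Stream<Sandwich> stream(InputStream is) throws XMLStreamException {
    return StreamSupport.stream(new SandwichSpliterator(is), false);
  }

  // Usage: parse an in-memory document and count sandwiches per state.
  static Map<String, Long> countByState(String xml) {
    var is = new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8));
    try (Stream<Sandwich> sandwiches = stream(is)) {
      return sandwiches.collect(Collectors.groupingBy(Sandwich::state, Collectors.counting()));
    } catch (XMLStreamException e) {
      throw new IllegalArgumentException("Could not start parsing XML", e);
    }
  }
}
```

Suppose a document contains one NJ sandwich and two MD sandwiches. Calling countByState on it should yield a map of NJ to 1 and MD to 2. Each tryAdvance call consumes only the events for a single <favorite-sandwich>.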
Resource Management Special Case
In the previous example, the Stream<FavoriteSandwich> data was processed
entirely within a try-with-resources statement that ensures the InputStream
will be closed. While this is ideal, it may not always be possible to manage the
IO resources this way. For example, consider the case where some existing method
expects a Supplier<Stream<FavoriteSandwich>>. In this case, the Supplier
cannot use try-with-resources, because it cannot know when to close the
InputStream.
```java
// 🐛 Will close the InputStream before any code can operate on the Stream
Supplier<Stream<FavoriteSandwich>> supplier = () -> {
  try (var is =
      FavoriteSandwichXMLParserTest.class.getResourceAsStream("/favorite-sandwiches.xml")) {
    return reader.stream(is);
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
};
```
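You can reproduce this failure without any XML. The following hypothetical sketch puts a BufferedReader over an in-memory string in place of the InputStream. The try-with-resources block closes the reader as soon as the lambda returns. As a result, the first attempt to read from the lazily evaluated Stream fails with an UncheckedIOException.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class PrematureCloseSketch {
  // 🐛 Same shape as the supplier above: the reader is closed as soon as the lambda
  // returns, before anyone has pulled a single element from the returned Stream.
  static Supplier<Stream<String>> brokenSupplier(String text) {
    return () -> {
      try (var reader = new BufferedReader(new StringReader(text))) {
        return reader.lines(); // lazy: nothing has been read yet
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    };
  }

  public static void main(String[] args) {
    try {
      brokenSupplier("ham\nturkey\n").get().forEach(System.out::println);
    } catch (UncheckedIOException e) {
      // Reading happens only now, after the reader has already been closed
      System.out.println("Failed: " + e.getCause().getMessage());
    }
  }
}
```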
Instead, we want to clean up the resources when the Stream has been closed.
Fortunately, Stream implements AutoCloseable, and the Stream Javadoc instructs
users to close streams that are backed by an IO channel:
Streams have a BaseStream.close() method and implement AutoCloseable, but nearly all stream instances do not actually need to be closed after use. Generally, only streams whose source is an IO channel (such as those returned by Files.lines(Path, Charset)) will require closing. Most streams are backed by collections, arrays, or generating functions, which require no special resource management. (If a stream does require closing, it can be declared as a resource in a try-with-resources statement.)
We can take advantage of this to handle the special case where the Stream
must clean up the IO channel it encapsulates.
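First, we make the FavoriteSandwichXMLSpliterator implement AutoCloseable.
Its close() method closes the InputStream passed to the constructor, along with
the XMLEventReader wrapped around it, and propagates any exceptions as unchecked
exceptions. Closing the XMLEventReader alone is not enough, because
XMLEventReader.close() frees the reader's resources without closing the
underlying input source.
```java
class FavoriteSandwichXMLSpliterator
    implements Spliterator<FavoriteSandwich>, AutoCloseable {
  private final InputStream is; // the constructor now also does: this.is = is;
  // ...
  @Override
  public void close() {
    // The resource block closes the InputStream even if reader.close() throws
    try (var in = is) {
      reader.close();
    } catch (XMLStreamException e) {
      throw new UncheckedXMLStreamException("Failed to close XMLEventReader", e);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to close InputStream", e);
    }
  }
}
```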
In this case, we define a new runtime exception, UncheckedXMLStreamException,
that is the StAX analog of UncheckedIOException.
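The post does not show UncheckedXMLStreamException itself. Below is a minimal sketch that assumes the class is modeled directly on UncheckedIOException. Under that assumption it has a message-and-cause constructor, a cause-only constructor, and a getCause() narrowed to XMLStreamException.

```java
import java.util.Objects;
import javax.xml.stream.XMLStreamException;

/** Hypothetical sketch: wraps a checked XMLStreamException, like UncheckedIOException. */
public class UncheckedXMLStreamException extends RuntimeException {
  private static final long serialVersionUID = 1L;

  public UncheckedXMLStreamException(String message, XMLStreamException cause) {
    super(message, Objects.requireNonNull(cause));
  }

  public UncheckedXMLStreamException(XMLStreamException cause) {
    super(Objects.requireNonNull(cause));
  }

  /** Narrows the return type so callers can recover the original StAX exception. */
  @Override
  public XMLStreamException getCause() {
    return (XMLStreamException) super.getCause();
  }
}
```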
Lastly, we change the factory that creates the Stream so that the Stream
closes the Spliterator when the Stream itself is closed. This is made possible
by the Stream.onClose(Runnable) method, which allows users to schedule arbitrary
routines to be executed when the Stream is closed.
```java
public Stream<FavoriteSandwich> stream(final InputStream is) throws IOException {
  final var spliterator = new FavoriteSandwichXMLSpliterator(is);
  return StreamSupport.stream(spliterator, false).onClose(spliterator::close);
}
```
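The following small, self-contained sketch illustrates the onClose contract using hypothetical string data rather than the XML parser. The handler runs when close() is called, which here happens at the end of a try-with-resources block. It does not run when a terminal operation exhausts the Stream. This is why the Stream has to be closed explicitly before the handler can release any IO resources.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class OnCloseSketch {
  // Records the order in which elements are consumed and the close handler runs.
  static List<String> closedByTryWithResources() {
    List<String> events = new ArrayList<>();
    try (Stream<String> sandwiches = Stream.of("ham", "turkey")
        .onClose(() -> events.add("closed"))) {
      sandwiches.forEach(s -> events.add("ate " + s));
      events.add("stream exhausted"); // terminal op finished, but the handler has not run yet
    }
    return events; // the handler ran when the try block closed the Stream
  }

  // Without close(), the handler never runs, even after a terminal operation.
  static boolean closedWithoutTry() {
    boolean[] closed = {false};
    Stream.of("ham", "turkey").onClose(() -> closed[0] = true).forEach(s -> { });
    return closed[0];
  }

  public static void main(String[] args) {
    System.out.println(closedByTryWithResources()); // [ate ham, ate turkey, stream exhausted, closed]
    System.out.println(closedWithoutTry());          // false
  }
}
```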
Having made these changes, the Stream itself can be declared in a
try-with-resources statement to ensure that the IO resources are closed
properly:
```java
void process(final Supplier<Stream<FavoriteSandwich>> supplier) {
  try (var sandwiches = supplier.get()) {
    sandwiches.forEach(System.out::println);
  }
}
```
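The buggy Supplier shown earlier has a working counterpart. In the following sketch, the Supplier opens the resource and hands responsibility for closing it to an onClose handler, and process(...) closes the Stream. To keep the sketch self-contained, it reads lines with BufferedReader.lines() from a hypothetical in-memory TrackingInputStream instead of parsing XML. A real Supplier would open its InputStream inside the lambda. The stream is passed in here only so that the example can verify it was closed.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class ClosingSupplierSketch {
  // Hypothetical test helper: an in-memory InputStream that records whether it was closed.
  static final class TrackingInputStream extends ByteArrayInputStream {
    boolean closed;

    TrackingInputStream(String text) {
      super(text.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void close() throws IOException {
      closed = true;
      super.close();
    }
  }

  // The Supplier opens the reader but does NOT close it; it registers an onClose
  // handler so that whoever closes the Stream also closes the reader.
  static Supplier<Stream<String>> linesOf(TrackingInputStream is) {
    return () -> {
      var reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
      return reader.lines().onClose(() -> {
        try {
          reader.close(); // also closes the wrapped InputStream
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    };
  }

  // Same shape as process(...) above: the consumer owns the Stream and closes it.
  static long process(Supplier<Stream<String>> supplier) {
    try (var lines = supplier.get()) {
      return lines.count();
    }
  }

  public static void main(String[] args) {
    var is = new TrackingInputStream("ham\nturkey\nroast beef\n");
    System.out.println(process(linesOf(is))); // 3
    System.out.println(is.closed);            // true
  }
}
```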
Conclusion
Arbitrarily large streams of data, such as a large XML document, are ripe for
processing with the java.util.stream APIs, but developers need a way to parse
records from the stream as needed and expose those records as a Stream. The
Spliterator interface is the key that unlocks the power of the Stream API for
data that must be processed one record at a time. Additionally, when the Stream
must also clean up the underlying IO resources, the Stream.onClose(Runnable)
method is available to schedule that clean-up for when the Stream is closed.
Note that exhausting the Stream alone does not trigger it.
The accompanying code may be found at gilday/how-to-stream-stax.