Skip to content

Commit 65f4513

Browse files
committed
AVRO-4249 provide a cache of schema to avoid building
1 parent fed0011 commit 65f4513

2 files changed

Lines changed: 41 additions & 0 deletions

File tree

lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,12 @@ void validateMagic(byte[] magic) throws InvalidAvroMagicException {
118118

119119
/** Initialize the stream by reading from its head. */
120120
void initialize(InputStream in, byte[] magic) throws IOException {
121+
initialize(in, magic, SchemaCache.NO_CACHE);
122+
}
123+
124+
125+
/** Initialize the stream by reading from its head. */
126+
protected void initialize(InputStream in, byte[] magic, SchemaCache schemaCache) throws IOException {
121127
this.header = new Header();
122128
this.vin = DecoderFactory.get().binaryDecoder(in, vin);
123129
magic = (magic == null) ? readMagic() : magic;
@@ -140,6 +146,8 @@ void initialize(InputStream in, byte[] magic) throws IOException {
140146

141147
// finalize the header
142148
header.metaKeyList = Collections.unmodifiableList(header.metaKeyList);
149+
header.schema = schemaCache.getOrParseSchema(getMetaString(DataFileConstants.SCHEMA));
150+
143151
header.schema = new Schema.Parser(NameValidator.NO_VALIDATION).setValidateDefaults(false)
144152
.parse(getMetaString(DataFileConstants.SCHEMA));
145153
this.codec = resolveCodec();
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package org.apache.avro.file;
2+
3+
import org.apache.avro.NameValidator;
4+
import org.apache.avro.Schema;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.ConcurrentMap;
7+
8+
public abstract class SchemaCache {
9+
public abstract Schema getOrParseSchema(String metaString);
10+
11+
protected Schema parse(String metaString) {
12+
return new Schema.Parser(NameValidator.NO_VALIDATION).setValidateDefaults(false)
13+
.parse(metaString);
14+
}
15+
16+
public static final SchemaCache NO_CACHE = new SchemaCache() {
17+
@Override
18+
public Schema getOrParseSchema(String metaString) {
19+
return parse(metaString);
20+
}
21+
};
22+
public static SchemaCache createConcurrentCache() {
23+
return new SchemaCache() {
24+
private final ConcurrentMap<String, Schema> cache = new ConcurrentHashMap<>();
25+
26+
@Override
27+
public Schema getOrParseSchema(String metaString) {
28+
return cache.computeIfAbsent(metaString, this::parse);
29+
}
30+
};
31+
}
32+
33+
}

0 commit comments

Comments
 (0)