Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@
<inlineHeader>${accumulo.build.license.header}</inlineHeader>
<excludes>
<exclude>src/main/java/org/apache/accumulo/core/bloomfilter/*.java</exclude>
<exclude>src/main/java/org/apache/accumulo/core/util/CountingInputStream.java</exclude>
<exclude>src/main/java/org/apache/accumulo/core/util/HostAndPort.java</exclude>
<exclude>src/test/resources/*.jceks</exclude>
<exclude>src/test/resources/org/apache/accumulo/core/file/rfile/*.rf</exclude>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.file.blockfile.cache;

import org.apache.accumulo.core.logging.LoggingBlockCache;
import org.apache.accumulo.core.spi.cache.BlockCache;
import org.apache.accumulo.core.spi.cache.CacheType;

public class BlockCacheUtil {
public static BlockCache instrument(CacheType type, BlockCache cache) {
if (cache == null) {
return null;
}

if (cache instanceof InstrumentedBlockCache || cache instanceof LoggingBlockCache) {
// its already instrumented
return cache;
}

return LoggingBlockCache.wrap(type, InstrumentedBlockCache.wrap(type, cache));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.file.blockfile.cache;

import java.util.Map;

import org.apache.accumulo.core.spi.cache.BlockCache;
import org.apache.accumulo.core.spi.cache.CacheEntry;
import org.apache.accumulo.core.spi.cache.CacheType;
import org.apache.accumulo.core.trace.ScanInstrumentation;

public class InstrumentedBlockCache implements BlockCache {

private final BlockCache blockCache;
private final ScanInstrumentation scanInstrumentation;
private final CacheType cacheType;

public InstrumentedBlockCache(CacheType cacheType, BlockCache blockCache,
ScanInstrumentation scanInstrumentation) {
this.blockCache = blockCache;
this.scanInstrumentation = scanInstrumentation;
this.cacheType = cacheType;
}

@Override
public CacheEntry cacheBlock(String blockName, byte[] buf) {
return blockCache.cacheBlock(blockName, buf);
}

@Override
public CacheEntry getBlock(String blockName) {
return blockCache.getBlock(blockName);
}

private final class CountingLoader implements Loader {

private final Loader loader;
int loadCount = 0;

private CountingLoader(Loader loader) {
this.loader = loader;
}

@Override
public Map<String,Loader> getDependencies() {
return loader.getDependencies();
}

@Override
public byte[] load(int maxSize, Map<String,byte[]> dependencies) {
loadCount++;
return loader.load(maxSize, dependencies);
}
}

@Override
public CacheEntry getBlock(String blockName, Loader loader) {
var cl = new CountingLoader(loader);
var ce = blockCache.getBlock(blockName, cl);
if (cl.loadCount == 0 && ce != null) {
scanInstrumentation.incrementCacheHit(cacheType);
} else {
scanInstrumentation.incrementCacheMiss(cacheType);
}
return ce;
}

@Override
public long getMaxHeapSize() {
return blockCache.getMaxHeapSize();
}

@Override
public long getMaxSize() {
return blockCache.getMaxSize();
}

@Override
public Stats getStats() {
return blockCache.getStats();
}

public static BlockCache wrap(CacheType cacheType, BlockCache cache) {
var si = ScanInstrumentation.get();
if (cache != null && si.enabled()) {
return new InstrumentedBlockCache(cacheType, cache, si);
} else {
return cache;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@
import org.apache.accumulo.core.spi.cache.BlockCache;
import org.apache.accumulo.core.spi.cache.BlockCache.Loader;
import org.apache.accumulo.core.spi.cache.CacheEntry;
import org.apache.accumulo.core.spi.cache.CacheType;
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.accumulo.core.trace.ScanInstrumentation;
import org.apache.accumulo.core.util.CountingInputStream;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
Expand Down Expand Up @@ -405,6 +408,7 @@ public CachedBlockRead getMetaBlock(String blockName) throws IOException {
}

BlockReader _currBlock = getBCFile(null).getMetaBlock(blockName);
incrementCacheBypass(CacheType.INDEX);
return new CachedBlockRead(_currBlock);
}

Expand All @@ -421,6 +425,7 @@ public CachedBlockRead getMetaBlock(long offset, long compressedSize, long rawSi
}

BlockReader _currBlock = getBCFile(null).getDataBlock(offset, compressedSize, rawSize);
incrementCacheBypass(CacheType.INDEX);
return new CachedBlockRead(_currBlock);
}

Expand All @@ -443,6 +448,7 @@ public CachedBlockRead getDataBlock(int blockIndex) throws IOException {
}

BlockReader _currBlock = getBCFile().getDataBlock(blockIndex);
incrementCacheBypass(CacheType.DATA);
return new CachedBlockRead(_currBlock);
}

Expand All @@ -459,9 +465,14 @@ public CachedBlockRead getDataBlock(long offset, long compressedSize, long rawSi
}

BlockReader _currBlock = getBCFile().getDataBlock(offset, compressedSize, rawSize);
incrementCacheBypass(CacheType.DATA);
return new CachedBlockRead(_currBlock);
}

private void incrementCacheBypass(CacheType cacheType) {
ScanInstrumentation.get().incrementCacheBypass(cacheType);
}

@Override
public synchronized void close() throws IOException {
if (closed) {
Expand Down Expand Up @@ -491,12 +502,22 @@ public void setCacheProvider(CacheProvider cacheProvider) {
}

public static class CachedBlockRead extends DataInputStream {

private static InputStream wrapForTrace(InputStream inputStream) {
var scanInstrumentation = ScanInstrumentation.get();
if (scanInstrumentation.enabled()) {
return new CountingInputStream(inputStream);
} else {
return inputStream;
}
}

private final SeekableByteArrayInputStream seekableInput;
private final CacheEntry cb;
boolean indexable;

public CachedBlockRead(InputStream in) {
super(in);
super(wrapForTrace(in));
cb = null;
seekableInput = null;
indexable = false;
Expand All @@ -507,7 +528,7 @@ public CachedBlockRead(CacheEntry cb, byte[] buf) {
}

private CachedBlockRead(SeekableByteArrayInputStream seekableInput, CacheEntry cb) {
super(seekableInput);
super(wrapForTrace(seekableInput));
this.seekableInput = seekableInput;
this.cb = cb;
indexable = true;
Expand Down Expand Up @@ -536,5 +557,25 @@ public BlockIndex getIndex(Supplier<BlockIndex> indexSupplier) {
public void indexWeightChanged() {
cb.indexWeightChanged();
}

public void flushStats() {
if (in instanceof CountingInputStream) {
var cin = ((CountingInputStream) in);
ScanInstrumentation.get().incrementUncompressedBytesRead(cin.getCount());
cin.resetCount();
var src = cin.getWrappedStream();
if (src instanceof BlockReader) {
var br = (BlockReader) src;
br.flushStats();
}

}
}

@Override
public void close() throws IOException {
flushStats();
super.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.logging.LoggingBlockCache;
import org.apache.accumulo.core.file.blockfile.cache.BlockCacheUtil;
import org.apache.accumulo.core.spi.cache.BlockCache;
import org.apache.accumulo.core.spi.cache.CacheType;
import org.apache.accumulo.core.spi.scan.ScanDispatch;
Expand All @@ -33,8 +33,8 @@ public class ScanCacheProvider implements CacheProvider {
public ScanCacheProvider(AccumuloConfiguration tableConfig, ScanDispatch dispatch,
BlockCache indexCache, BlockCache dataCache) {

var loggingIndexCache = LoggingBlockCache.wrap(CacheType.INDEX, indexCache);
var loggingDataCache = LoggingBlockCache.wrap(CacheType.DATA, dataCache);
var loggingIndexCache = BlockCacheUtil.instrument(CacheType.INDEX, indexCache);
var loggingDataCache = BlockCacheUtil.instrument(CacheType.DATA, dataCache);

switch (dispatch.getIndexCacheUsage()) {
case ENABLED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,6 @@ public void next() throws IOException {
}

private void _next() throws IOException {

if (!hasTop) {
throw new IllegalStateException();
}
Expand Down Expand Up @@ -1164,6 +1163,12 @@ public void setCacheProvider(CacheProvider cacheProvider) {
public long estimateOverlappingEntries(KeyExtent extent) throws IOException {
throw new UnsupportedOperationException();
}

public void flushStats() {
if (currBlock != null) {
currBlock.flushStats();
}
}
}

public static class Reader extends HeapIterator implements FileSKVIterator {
Expand Down Expand Up @@ -1304,6 +1309,9 @@ private void closeLocalityGroupReaders(boolean ignoreIOExceptions) throws IOExce

@Override
public void closeDeepCopies() throws IOException {
for (LocalityGroupReader lgr : currentReaders) {
lgr.flushStats();
}
closeDeepCopies(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.apache.accumulo.core.spi.crypto.FileEncrypter;
import org.apache.accumulo.core.spi.crypto.NoFileDecrypter;
import org.apache.accumulo.core.spi.crypto.NoFileEncrypter;
import org.apache.accumulo.core.trace.ScanInstrumentation;
import org.apache.accumulo.core.util.CountingInputStream;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -468,6 +470,7 @@ private static final class RBlockState {
private final CompressionAlgorithm compressAlgo;
private Decompressor decompressor;
private final BlockRegion region;
private final InputStream rawInputStream;
private final InputStream in;
private volatile boolean closed;

Expand All @@ -481,9 +484,14 @@ public <InputStreamType extends InputStream & Seekable> RBlockState(
BoundedRangeFileInputStream boundedRangeFileInputStream = new BoundedRangeFileInputStream(
fsin, this.region.getOffset(), this.region.getCompressedSize());

if (ScanInstrumentation.get().enabled()) {
rawInputStream = new CountingInputStream(boundedRangeFileInputStream);
} else {
rawInputStream = boundedRangeFileInputStream;
}

try {
InputStream inputStreamToBeCompressed =
decrypter.decryptStream(boundedRangeFileInputStream);
InputStream inputStreamToBeCompressed = decrypter.decryptStream(rawInputStream);
this.in = compressAlgo.createDecompressionStream(inputStreamToBeCompressed, decompressor,
getFSInputBufferSize(conf));
} catch (IOException e) {
Expand All @@ -506,11 +514,20 @@ public BlockRegion getBlockRegion() {
return region;
}

public void flushStats() {
if (rawInputStream instanceof CountingInputStream) {
var ci = (CountingInputStream) rawInputStream;
ScanInstrumentation.get().incrementFileBytesRead(ci.getCount());
ci.resetCount();
}
}

public void finish() throws IOException {
synchronized (in) {
if (!closed) {
try {
in.close();
flushStats();
} finally {
closed = true;
if (decompressor != null) {
Expand Down Expand Up @@ -538,6 +555,10 @@ public static class BlockReader extends DataInputStream {
rBlkState = rbs;
}

public void flushStats() {
rBlkState.flushStats();
}

/**
* Finishing reading the block. Release all resources.
*/
Expand Down
Loading