Refactor ARM-optimized varint parsing.

This change improves deserialization performance in two ways:

1) VarintParseSlowArm32 and VarintParseSlowArm64 return a pair of the updated buffer pointer and the parsing result instead of storing the result and returning just the pointer. This allows the parsed result to be returned to higher-level code in x1 rather than via the stack.
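A minimal sketch of the signature change, using the types as they appear in the diff below:

```
// Before: the result is stored through an out-param; only the pointer is
// returned.
const char* VarintParseSlowArm64(const char* p, uint64_t* out, uint64_t first8);
// After: the pointer and the result are returned together, so under the
// AArch64 calling convention the pair comes back in x0/x1.
std::pair<const char*, uint64_t> VarintParseSlowArm64(const char* p, uint64_t first8);
```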

2) Remove the prohibition on inlining VarintParseSlowArm32 and VarintParseSlowArm64. This prohibition was originally included to avoid bloating non-TDP code. Now that TDP is enabled, the impact on code size is small.
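For reference, the corresponding attribute change in the diff (bodies elided):

```
// Before: PROTOBUF_NOINLINE const char* VarintParseSlowArm64(...);
// After:  inline PROTOBUF_ALWAYS_INLINE std::pair<const char*, uint64_t>
//         VarintParseSlowArm64(...);
```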

PiperOrigin-RevId: 523142512
Protobuf Team Bot 2023-04-10 10:02:28 -07:00 committed by Copybara-Service
parent 7458e32f0d
commit 5a679dfb29
2 changed files with 206 additions and 217 deletions

@@ -591,206 +591,6 @@ const char* UnknownFieldParse(uint32_t tag, std::string* unknown,
return FieldParser(tag, field_parser, ptr, ctx);
}
#ifdef __aarch64__
// Generally speaking, the ARM-optimized Varint decode algorithm is to extract
// and concatenate all potentially valid data bits, compute the actual length
// of the Varint, and mask off the data bits which are not actually part of the
// result. More detail on the two main parts is shown below.
//
// 1) Extract and concatenate all potentially valid data bits.
// Two ARM-specific features help significantly:
// a) Efficient and non-destructive bit extraction (UBFX)
// b) A single instruction can perform both a left shift and an OR with the
// shifted second operand in one cycle. E.g., the following two lines do the
// same thing:
// ```result = operand_1 | (operand_2 << 7);```
// ```ORR %[result], %[operand_1], %[operand_2], LSL #7```
// The figure below shows the implementation for handling four chunks.
//
// Bits    31   30-24    23   22-16    15   14-8      7    6-0
// +----+---------+----+---------+----+---------+----+---------+
// |CB 3| Chunk 3 |CB 2| Chunk 2 |CB 1| Chunk 1 |CB 0| Chunk 0 |
// +----+---------+----+---------+----+---------+----+---------+
// | | | |
// UBFX UBFX UBFX UBFX -- cycle 1
// | | | |
// V V V V
// Combined LSL #7 and ORR Combined LSL #7 and ORR -- cycle 2
// | |
// V V
// Combined LSL #14 and ORR -- cycle 3
// |
// V
// Parsed bits 0-27
//
//
// 2) Calculate the index of the cleared continuation bit in order to determine
// where the encoded Varint ends and the size of the decoded value. The
// easiest way to do this is to mask off all data bits, leaving just the
// continuation bits. We actually need to do the masking on an inverted
// copy of the data, which leaves a 1 in all continuation bits which were
// originally clear. The number of trailing zeroes in this value indicates
// the size of the Varint.
//
// AND 0x80 0x80 0x80 0x80 0x80 0x80 0x80 0x80
//
// Bits 63 55 47 39 31 23 15 7
// +----+--+----+--+----+--+----+--+----+--+----+--+----+--+----+--+
// ~ |CB 7| |CB 6| |CB 5| |CB 4| |CB 3| |CB 2| |CB 1| |CB 0| |
// +----+--+----+--+----+--+----+--+----+--+----+--+----+--+----+--+
// | | | | | | | |
// V V V V V V V V
// Bits 63 55 47 39 31 23 15 7
// +----+--+----+--+----+--+----+--+----+--+----+--+----+--+----+--+
// |~CB 7|0|~CB 6|0|~CB 5|0|~CB 4|0|~CB 3|0|~CB 2|0|~CB 1|0|~CB 0|0|
// +----+--+----+--+----+--+----+--+----+--+----+--+----+--+----+--+
// |
// CTZ
// V
// Index of first cleared continuation bit
//
//
// While this is implemented in C++, significant care has been taken to ensure
// the compiler emits the best instruction sequence. In some cases we use the
// following two functions to manipulate the compiler's scheduling decisions.
//
// Controls compiler scheduling by telling it that the first value is modified
// by the second value at the callsite. This is useful if non-critical-path
// instructions are too aggressively scheduled, resulting in a slowdown of the
// actual critical path due to opportunity costs. An example usage is shown
// where a false dependence of num_bits on result is added to prevent checking
// for a very unlikely error until all critical path instructions have been
// fetched.
//
// ```
// num_bits = <multiple operations to calculate new num_bits value>
// result = <multiple operations to calculate result>
// num_bits = ValueBarrier(num_bits, result);
// if (num_bits == 63) {
// ABSL_LOG(FATAL) << "Invalid num_bits value";
// }
// ```
PROTOBUF_ALWAYS_INLINE inline uint64_t ExtractAndMergeTwoChunks(
uint64_t data, uint64_t first_byte) {
ABSL_DCHECK_LE(first_byte, 6);
uint64_t first = Ubfx7(data, first_byte * 8);
uint64_t second = Ubfx7(data, (first_byte + 1) * 8);
return ValueBarrier(first | (second << 7));
}
struct SlowPathEncodedInfo {
const char* p;
uint64_t last8;
uint64_t valid_bits;
uint64_t valid_chunk_bits;
uint64_t masked_cont_bits;
};
// Performs multiple actions which are identical between 32 and 64 bit Varints
// in order to compute the length of the encoded Varint and the new value
// of p.
PROTOBUF_ALWAYS_INLINE inline SlowPathEncodedInfo ComputeLengthAndUpdateP(
const char* p) {
SlowPathEncodedInfo result;
// Load the last two bytes of the encoded Varint.
std::memcpy(&result.last8, p + 2, sizeof(result.last8));
uint64_t mask = ValueBarrier(0x8080808080808080);
// Only set continuation bits remain
result.masked_cont_bits = ValueBarrier(mask & (~result.last8));
// The first cleared continuation bit is the most significant 1 in the
// reversed value. Result is undefined for an input of 0 and we handle that
// case below.
result.valid_bits = absl::countr_zero(result.masked_cont_bits);
// Calculates the number of chunks in the encoded Varint. This value is low
// by three as neither the cleared continuation chunk nor the first two chunks
// are counted.
uint64_t set_continuation_bits = result.valid_bits >> 3;
// Update p to point past the encoded Varint.
result.p = p + set_continuation_bits + 3;
// Calculate number of valid data bits in the decoded value so invalid bits
// can be masked off. Value is too low by 14 but we account for that when
// calculating the mask.
result.valid_chunk_bits = result.valid_bits - set_continuation_bits;
return result;
}
constexpr uint64_t kResultMaskUnshifted = 0xffffffffffffc000ULL;
constexpr uint64_t kFirstResultBitChunk1 = 1 * 7;
constexpr uint64_t kFirstResultBitChunk2 = 2 * 7;
constexpr uint64_t kFirstResultBitChunk3 = 3 * 7;
constexpr uint64_t kFirstResultBitChunk4 = 4 * 7;
constexpr uint64_t kFirstResultBitChunk6 = 6 * 7;
constexpr uint64_t kFirstResultBitChunk8 = 8 * 7;
constexpr uint64_t kValidBitsForInvalidVarint = 0x60;
PROTOBUF_NOINLINE const char* VarintParseSlowArm64(const char* p, uint64_t* out,
uint64_t first8) {
SlowPathEncodedInfo info = ComputeLengthAndUpdateP(p);
// Extract data bits from the low six chunks. This includes chunks zero and
// one which we already know are valid.
uint64_t merged_01 = ExtractAndMergeTwoChunks(first8, /*first_chunk=*/0);
uint64_t merged_23 = ExtractAndMergeTwoChunks(first8, /*first_chunk=*/2);
uint64_t merged_45 = ExtractAndMergeTwoChunks(first8, /*first_chunk=*/4);
// Low 42 bits of decoded value.
uint64_t result = merged_01 | merged_23 << kFirstResultBitChunk2 |
merged_45 << kFirstResultBitChunk4;
// This immediate ends in 14 zeroes since valid_chunk_bits is too low by 14.
uint64_t result_mask = kResultMaskUnshifted << info.valid_chunk_bits;
// masked_cont_bits is 0 iff the Varint is invalid.
if (PROTOBUF_PREDICT_FALSE(!info.masked_cont_bits)) {
*out = 0;
return nullptr;
}
// Test for early exit if Varint does not exceed 6 chunks. Branching on one
// bit is faster on ARM than via a compare and branch.
if (PROTOBUF_PREDICT_FALSE((info.valid_bits & 0x20) != 0)) {
// Extract data bits from high four chunks.
uint64_t merged_67 = ExtractAndMergeTwoChunks(first8, /*first_chunk=*/6);
// Last two chunks come from last two bytes of info.last8.
uint64_t merged_89 =
ExtractAndMergeTwoChunks(info.last8, /*first_chunk=*/6);
result |= merged_67 << kFirstResultBitChunk6;
result |= merged_89 << kFirstResultBitChunk8;
// Handle an invalid Varint with all 10 continuation bits set.
}
// Mask off invalid data bytes.
result &= ~result_mask;
*out = result;
return info.p;
}
// See comments in VarintParseSlowArm64 for a description of the algorithm.
// Differences in the 32 bit version are noted below.
PROTOBUF_NOINLINE const char* VarintParseSlowArm32(const char* p, uint32_t* out,
uint64_t first8) {
// This also skips the slop bytes.
SlowPathEncodedInfo info = ComputeLengthAndUpdateP(p);
// Extract data bits from chunks 1-4. Chunk zero is merged in below.
uint64_t merged_12 = ExtractAndMergeTwoChunks(first8, /*first_chunk=*/1);
uint64_t merged_34 = ExtractAndMergeTwoChunks(first8, /*first_chunk=*/3);
first8 = ValueBarrier(first8, p);
uint64_t result = Ubfx7(first8, /*start=*/0);
result = ValueBarrier(result | merged_12 << kFirstResultBitChunk1);
result = ValueBarrier(result | merged_34 << kFirstResultBitChunk3);
uint64_t result_mask = kResultMaskUnshifted << info.valid_chunk_bits;
result &= ~result_mask;
// It is extremely unlikely that a Varint is invalid so checking that
// condition isn't on the critical path. Here we make sure that we don't do so
// until result has been computed.
info.masked_cont_bits = ValueBarrier(info.masked_cont_bits, result);
if (PROTOBUF_PREDICT_FALSE(info.masked_cont_bits == 0)) {
// Makes the compiler think out was modified here. This ensures it won't
// predicate this extremely predictable branch.
out = ValueBarrier(out);
*out = 0;
return nullptr;
}
*out = result;
return info.p;
}
#endif // __aarch64__
} // namespace internal
} // namespace protobuf
} // namespace google

@@ -679,21 +679,84 @@ inline const char* VarintParseSlow(const char* p, uint32_t res, uint64_t* out) {
}
#ifdef __aarch64__
PROTOBUF_EXPORT
const char* VarintParseSlowArm64(const char* p, uint64_t* out, uint64_t first8);
PROTOBUF_EXPORT
const char* VarintParseSlowArm32(const char* p, uint32_t* out, uint64_t first8);
inline const char* VarintParseSlowArm(const char* p, uint32_t* out,
uint64_t first8) {
return VarintParseSlowArm32(p, out, first8);
}
inline const char* VarintParseSlowArm(const char* p, uint64_t* out,
uint64_t first8) {
return VarintParseSlowArm64(p, out, first8);
}
// Generally speaking, the ARM-optimized Varint decode algorithm is to extract
// and concatenate all potentially valid data bits, compute the actual length
// of the Varint, and mask off the data bits which are not actually part of the
// result. More detail on the two main parts is shown below.
//
// 1) Extract and concatenate all potentially valid data bits.
// Two ARM-specific features help significantly:
// a) Efficient and non-destructive bit extraction (UBFX)
// b) A single instruction can perform both a left shift and an OR with the
// shifted second operand in one cycle. E.g., the following two lines do the
// same thing:
// ```result = operand_1 | (operand_2 << 7);```
// ```ORR %[result], %[operand_1], %[operand_2], LSL #7```
// The figure below shows the implementation for handling four chunks.
//
// Bits    31   30-24    23   22-16    15   14-8      7    6-0
// +----+---------+----+---------+----+---------+----+---------+
// |CB 3| Chunk 3 |CB 2| Chunk 2 |CB 1| Chunk 1 |CB 0| Chunk 0 |
// +----+---------+----+---------+----+---------+----+---------+
// | | | |
// UBFX UBFX UBFX UBFX -- cycle 1
// | | | |
// V V V V
// Combined LSL #7 and ORR Combined LSL #7 and ORR -- cycle 2
// | |
// V V
// Combined LSL #14 and ORR -- cycle 3
// |
// V
// Parsed bits 0-27
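//
// As a minimal C++ sketch of the figure, written in terms of the Ubfx7
// helper defined below (each shift-plus-OR line can lower to a single ORR
// with LSL on ARM64):
// ```
// uint64_t lo = Ubfx7(data, 0) | (Ubfx7(data, 8) << 7);    // cycles 1-2
// uint64_t hi = Ubfx7(data, 16) | (Ubfx7(data, 24) << 7);  // cycles 1-2
// uint64_t bits_0_27 = lo | (hi << 14);                    // cycle 3
// ```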
//
//
// 2) Calculate the index of the cleared continuation bit in order to determine
// where the encoded Varint ends and the size of the decoded value. The
// easiest way to do this is to mask off all data bits, leaving just the
// continuation bits. We actually need to do the masking on an inverted
// copy of the data, which leaves a 1 in all continuation bits which were
// originally clear. The number of trailing zeroes in this value indicates
// the size of the Varint.
//
// AND 0x80 0x80 0x80 0x80 0x80 0x80 0x80 0x80
//
// Bits 63 55 47 39 31 23 15 7
// +----+--+----+--+----+--+----+--+----+--+----+--+----+--+----+--+
// ~ |CB 7| |CB 6| |CB 5| |CB 4| |CB 3| |CB 2| |CB 1| |CB 0| |
// +----+--+----+--+----+--+----+--+----+--+----+--+----+--+----+--+
// | | | | | | | |
// V V V V V V V V
// Bits 63 55 47 39 31 23 15 7
// +----+--+----+--+----+--+----+--+----+--+----+--+----+--+----+--+
// |~CB 7|0|~CB 6|0|~CB 5|0|~CB 4|0|~CB 3|0|~CB 2|0|~CB 1|0|~CB 0|0|
// +----+--+----+--+----+--+----+--+----+--+----+--+----+--+----+--+
// |
// CTZ
// V
// Index of first cleared continuation bit
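//
// In C++ this is (as done in ComputeLengthAndUpdateP below):
// ```
// uint64_t masked_cont_bits = ~data & 0x8080808080808080;  // 1s at clear CBs
// uint64_t index = absl::countr_zero(masked_cont_bits);    // 8 * byte + 7
// ```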
//
//
// While this is implemented in C++, significant care has been taken to ensure
// the compiler emits the best instruction sequence. In some cases we use the
// following two functions to manipulate the compiler's scheduling decisions.
//
// Controls compiler scheduling by telling it that the first value is modified
// by the second value at the callsite. This is useful if non-critical-path
// instructions are too aggressively scheduled, resulting in a slowdown of the
// actual critical path due to opportunity costs. An example usage is shown
// where a false dependence of num_bits on result is added to prevent checking
// for a very unlikely error until all critical path instructions have been
// fetched.
//
// ```
// num_bits = <multiple operations to calculate new num_bits value>
// result = <multiple operations to calculate result>
// num_bits = ValueBarrier(num_bits, result);
// if (num_bits == 63) {
// ABSL_LOG(FATAL) << "Invalid num_bits value";
// }
// ```
// Falsely indicate that the specific value is modified at this location. This
// prevents code which depends on this value from being scheduled earlier.
template <typename V1Type>
@@ -715,7 +778,133 @@ static PROTOBUF_ALWAYS_INLINE inline uint64_t Ubfx7(uint64_t data,
return ValueBarrier((data >> start) & 0x7f);
}
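// For example, Ubfx7(data, 8) extracts bits 8-14 and should compile to a
// single `UBFX dst, src, #8, #7` on ARM64.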
#endif // __aarch64__
PROTOBUF_ALWAYS_INLINE inline uint64_t ExtractAndMergeTwoChunks(
uint64_t data, uint64_t first_byte) {
ABSL_DCHECK_LE(first_byte, 6U);
uint64_t first = Ubfx7(data, first_byte * 8);
uint64_t second = Ubfx7(data, (first_byte + 1) * 8);
return ValueBarrier(first | (second << 7));
}
struct SlowPathEncodedInfo {
const char* p;
uint64_t last8;
uint64_t valid_bits;
uint64_t valid_chunk_bits;
uint64_t masked_cont_bits;
};
// Performs multiple actions which are identical between 32 and 64 bit Varints
// in order to compute the length of the encoded Varint and the new value
// of p.
PROTOBUF_ALWAYS_INLINE inline SlowPathEncodedInfo ComputeLengthAndUpdateP(
const char* p) {
SlowPathEncodedInfo result;
// Load the last two bytes of the encoded Varint.
std::memcpy(&result.last8, p + 2, sizeof(result.last8));
uint64_t mask = ValueBarrier(0x8080808080808080);
// Only set continuation bits remain
result.masked_cont_bits = ValueBarrier(mask & ~result.last8);
// The first cleared continuation bit is the most significant 1 in the
// reversed value. Result is undefined for an input of 0 and we handle that
// case below.
result.valid_bits = absl::countr_zero(result.masked_cont_bits);
// Calculates the number of chunks in the encoded Varint. This value is low
// by three as neither the cleared continuation chunk nor the first two chunks
// are counted.
uint64_t set_continuation_bits = result.valid_bits >> 3;
// Update p to point past the encoded Varint.
result.p = p + set_continuation_bits + 3;
// Calculate number of valid data bits in the decoded value so invalid bits
// can be masked off. Value is too low by 14 but we account for that when
// calculating the mask.
result.valid_chunk_bits = result.valid_bits - set_continuation_bits;
return result;
}
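// Worked example for ComputeLengthAndUpdateP above: for a three-byte Varint,
// the byte at p[2] has a clear continuation bit, so masked_cont_bits has bit
// 7 set and valid_bits == 7. Then set_continuation_bits == 0, p is advanced
// by 3, and valid_chunk_bits == 7 (the 21 data bits of a three-byte Varint,
// less 14).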
inline PROTOBUF_ALWAYS_INLINE std::pair<const char*, uint64_t>
VarintParseSlowArm64(const char* p, uint64_t first8) {
constexpr uint64_t kResultMaskUnshifted = 0xffffffffffffc000ULL;
constexpr uint64_t kFirstResultBitChunk2 = 2 * 7;
constexpr uint64_t kFirstResultBitChunk4 = 4 * 7;
constexpr uint64_t kFirstResultBitChunk6 = 6 * 7;
constexpr uint64_t kFirstResultBitChunk8 = 8 * 7;
SlowPathEncodedInfo info = ComputeLengthAndUpdateP(p);
// Extract data bits from the low six chunks. This includes chunks zero and
// one which we already know are valid.
uint64_t merged_01 = ExtractAndMergeTwoChunks(first8, /*first_chunk=*/0);
uint64_t merged_23 = ExtractAndMergeTwoChunks(first8, /*first_chunk=*/2);
uint64_t merged_45 = ExtractAndMergeTwoChunks(first8, /*first_chunk=*/4);
// Low 42 bits of decoded value.
uint64_t result = merged_01 | (merged_23 << kFirstResultBitChunk2) |
(merged_45 << kFirstResultBitChunk4);
// This immediate ends in 14 zeroes since valid_chunk_bits is too low by 14.
uint64_t result_mask = kResultMaskUnshifted << info.valid_chunk_bits;
// masked_cont_bits is 0 iff the Varint is invalid.
if (PROTOBUF_PREDICT_FALSE(info.masked_cont_bits == 0)) {
return {nullptr, 0};
}
// Test for early exit if Varint does not exceed 6 chunks. Branching on one
// bit is faster on ARM than via a compare and branch.
if (PROTOBUF_PREDICT_FALSE((info.valid_bits & 0x20) != 0)) {
// Extract data bits from high four chunks.
uint64_t merged_67 = ExtractAndMergeTwoChunks(first8, /*first_chunk=*/6);
// Last two chunks come from last two bytes of info.last8.
uint64_t merged_89 =
ExtractAndMergeTwoChunks(info.last8, /*first_chunk=*/6);
result |= merged_67 << kFirstResultBitChunk6;
result |= merged_89 << kFirstResultBitChunk8;
// Handle an invalid Varint with all 10 continuation bits set.
}
// Mask off invalid data bytes.
result &= ~result_mask;
return {info.p, result};
}
// See comments in VarintParseSlowArm64 for a description of the algorithm.
// Differences in the 32 bit version are noted below.
inline PROTOBUF_ALWAYS_INLINE std::pair<const char*, uint32_t>
VarintParseSlowArm32(const char* p, uint64_t first8) {
constexpr uint64_t kResultMaskUnshifted = 0xffffffffffffc000ULL;
constexpr uint64_t kFirstResultBitChunk1 = 1 * 7;
constexpr uint64_t kFirstResultBitChunk3 = 3 * 7;
// This also skips the slop bytes.
SlowPathEncodedInfo info = ComputeLengthAndUpdateP(p);
// Extract data bits from chunks 1-4. Chunk zero is merged in below.
uint64_t merged_12 = ExtractAndMergeTwoChunks(first8, /*first_chunk=*/1);
uint64_t merged_34 = ExtractAndMergeTwoChunks(first8, /*first_chunk=*/3);
first8 = ValueBarrier(first8, p);
uint64_t result = Ubfx7(first8, /*start=*/0);
result = ValueBarrier(result | merged_12 << kFirstResultBitChunk1);
result = ValueBarrier(result | merged_34 << kFirstResultBitChunk3);
uint64_t result_mask = kResultMaskUnshifted << info.valid_chunk_bits;
result &= ~result_mask;
// It is extremely unlikely that a Varint is invalid so checking that
// condition isn't on the critical path. Here we make sure that we don't do so
// until result has been computed.
info.masked_cont_bits = ValueBarrier(info.masked_cont_bits, result);
if (PROTOBUF_PREDICT_FALSE(info.masked_cont_bits == 0)) {
return {nullptr, 0};
}
return {info.p, result};
}
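// These overloads unpack the pair for callers that still use an out-param.
// Per the commit description, returning std::pair lets the AArch64 calling
// convention place the two 8-byte members in x0/x1, so the parsed value comes
// back in a register rather than via the stack.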
static const char* VarintParseSlowArm(const char* p, uint32_t* out,
uint64_t first8) {
auto tmp = VarintParseSlowArm32(p, first8);
*out = tmp.second;
return tmp.first;
}
static const char* VarintParseSlowArm(const char* p, uint64_t* out,
uint64_t first8) {
auto tmp = VarintParseSlowArm64(p, first8);
*out = tmp.second;
return tmp.first;
}
#endif
template <typename T>
PROTOBUF_NODISCARD const char* VarintParse(const char* p, T* out) {
@@ -749,7 +938,7 @@ PROTOBUF_NODISCARD const char* VarintParse(const char* p, T* out) {
}
// Used for tags, could read up to 5 bytes which must be available.
// Caller must ensure it's safe to call.
PROTOBUF_EXPORT
std::pair<const char*, uint32_t> ReadTagFallback(const char* p, uint32_t res);