// songhieng's picture
// Upload folder using huggingface_hub
// c1b3a0c verified
// raw
// history blame
// 24.2 kB
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { CallCredentials } from './call-credentials';
import { LogVerbosity, Status } from './constants';
import { Deadline } from './deadline';
import { Metadata } from './metadata';
import { CallConfig } from './resolver';
import * as logging from './logging';
import {
Call,
InterceptingListener,
MessageContext,
StatusObject,
WriteCallback,
WriteObject,
} from './call-interface';
import {
LoadBalancingCall,
StatusObjectWithProgress,
} from './load-balancing-call';
import { InternalChannel } from './internal-channel';
/** Tracer name passed to logging.trace for every trace line emitted by RetryingCall. */
const TRACER_NAME = 'retrying_call';
/**
 * Token bucket that decides whether retry attempts are currently allowed.
 *
 * The bucket starts full at `maxTokens`. Each failed call removes one token
 * and each successful call adds `tokenRatio` tokens. The token count is always
 * kept within the range [0, maxTokens]. Retries are allowed only while more
 * than half of the tokens remain.
 */
export class RetryThrottler {
  private tokens: number;

  /**
   * @param maxTokens Upper bound for the token count.
   * @param tokenRatio Number of tokens added back for each successful call.
   * @param previousRetryThrottler Optional throttler from a previous config
   *     whose current fill level should be carried over.
   */
  constructor(
    private readonly maxTokens: number,
    private readonly tokenRatio: number,
    previousRetryThrottler?: RetryThrottler
  ) {
    if (previousRetryThrottler) {
      /* When carrying over tokens from a previous config, rescale them to the
       * new max value */
      this.tokens =
        previousRetryThrottler.tokens *
        (maxTokens / previousRetryThrottler.maxTokens);
    } else {
      this.tokens = maxTokens;
    }
  }

  /** Records a successful call: adds `tokenRatio` tokens, capped at `maxTokens`. */
  addCallSucceeded(): void {
    this.tokens = Math.min(this.tokens + this.tokenRatio, this.maxTokens);
  }

  /** Records a failed call: removes one token, never going below zero. */
  addCallFailed(): void {
    this.tokens = Math.max(this.tokens - 1, 0);
  }

  /** @returns true while strictly more than half of `maxTokens` remain. */
  canRetryCall(): boolean {
    return this.tokens > this.maxTokens / 2;
  }
}
/**
 * Bookkeeping for buffered message bytes, enforcing both a global limit and a
 * per-call limit. Sizes are plain numbers supplied by the caller; calls are
 * identified by a numeric call ID.
 */
export class MessageBufferTracker {
  private totalAllocated = 0;
  private allocatedPerCall: Map<number, number> = new Map<number, number>();

  constructor(private totalLimit: number, private limitPerCall: number) {}

  /**
   * Tries to reserve `size` for the given call.
   * @returns true if the reservation fit within both limits and was
   *     recorded, false if nothing was reserved.
   */
  allocate(size: number, callId: number): boolean {
    const usedByCall = this.allocatedPerCall.get(callId) ?? 0;
    const overCallLimit = this.limitPerCall - usedByCall < size;
    const overTotalLimit = this.totalLimit - this.totalAllocated < size;
    if (overCallLimit || overTotalLimit) {
      return false;
    }
    this.totalAllocated += size;
    this.allocatedPerCall.set(callId, usedByCall + size);
    return true;
  }

  /**
   * Releases `size` previously reserved for the given call.
   * @throws Error if `size` exceeds the total allocation, or the allocation
   *     recorded for this call. The total is reduced before the per-call
   *     check is made.
   */
  free(size: number, callId: number) {
    const totalBefore = this.totalAllocated;
    if (size > totalBefore) {
      throw new Error(
        `Invalid buffer allocation state: call ${callId} freed ${size} > total allocated ${totalBefore}`
      );
    }
    this.totalAllocated = totalBefore - size;
    const usedByCall = this.allocatedPerCall.get(callId) ?? 0;
    if (size > usedByCall) {
      throw new Error(
        `Invalid buffer allocation state: call ${callId} freed ${size} > allocated for call ${usedByCall}`
      );
    }
    this.allocatedPerCall.set(callId, usedByCall - size);
  }

  /**
   * Releases everything reserved for the given call and forgets the call.
   * @throws Error if the call's recorded allocation exceeds the total.
   */
  freeAll(callId: number) {
    const usedByCall = this.allocatedPerCall.get(callId) ?? 0;
    if (usedByCall > this.totalAllocated) {
      throw new Error(
        `Invalid buffer allocation state: call ${callId} allocated ${usedByCall} > total allocated ${this.totalAllocated}`
      );
    }
    this.allocatedPerCall.delete(callId);
    this.totalAllocated -= usedByCall;
  }
}
/**
 * Lifecycle of a single attempt as tracked by RetryingCall. Attempts are
 * created as 'ACTIVE' and become 'COMPLETED' when their status has been
 * handled or when they are cancelled in favor of another attempt.
 */
type UnderlyingCallState = 'ACTIVE' | 'COMPLETED';
/**
 * Bookkeeping record for one attempt (child call) of a RetryingCall.
 */
interface UnderlyingCall {
// Whether this attempt is still in progress.
state: UnderlyingCallState;
// The child call that performs this attempt.
call: LoadBalancingCall;
// Index (in message-index space, see writeBufferOffset) of the next write
// buffer entry to send on this attempt. New attempts start at 0.
nextMessageToSend: number;
}
/**
 * A retrying call can be in one of these states:
 * RETRY: Retries are configured and new attempts may be sent
 * HEDGING: Hedging is configured and new attempts may be sent
 * TRANSPARENT_ONLY: Neither retries nor hedging are configured, and
 * transparent retry attempts may still be sent
 * COMMITTED: One attempt is committed, and no new attempts will be
 * sent
 *
 * The initial state is chosen in the RetryingCall constructor from the method
 * config: RETRY if a retryPolicy is present, otherwise HEDGING if a
 * hedgingPolicy is present, otherwise TRANSPARENT_ONLY. A call in RETRY or
 * HEDGING is also moved to TRANSPARENT_ONLY when a negative pushback value is
 * received or when a write cannot be buffered while no attempt is active.
 */
type RetryingCallState = 'RETRY' | 'HEDGING' | 'TRANSPARENT_ONLY' | 'COMMITTED';
/**
 * The different types of objects that can be stored in the write buffer, with
 * the following meanings:
 * MESSAGE: This is a message to be sent.
 * HALF_CLOSE: When this entry is reached, the calls should send a half-close.
 * FREED: This slot previously contained a message that has been sent on all
 * child calls and is no longer needed. It is also the placeholder type
 * returned when a message index has no entry in the write buffer.
 */
type WriteBufferEntryType = 'MESSAGE' | 'HALF_CLOSE' | 'FREED';
/**
 * Entry in the buffer of messages to send to the remote end. Entries are kept
 * so that they can be replayed on later attempts.
 */
interface WriteBufferEntry {
// Discriminates what this slot represents; see WriteBufferEntryType.
entryType: WriteBufferEntryType;
/**
 * Message to send.
 * Only populated if entryType is MESSAGE.
 */
message?: WriteObject;
/**
 * Callback to call after sending the message.
 * Only populated if entryType is MESSAGE and the call is in the COMMITTED
 * state. It is set when the message could not be allocated in the buffer
 * tracker, so the write is acknowledged only once the committed child call
 * has finished writing it.
 */
callback?: WriteCallback;
/**
 * Indicates whether the message is allocated in the buffer tracker. Ignored
 * if entryType is not MESSAGE. Should be the return value of
 * bufferTracker.allocate.
 */
allocated: boolean;
}
/**
 * Metadata key that carries the number of previous attempts. It is set on the
 * outgoing initial metadata of every attempt after the first, and copied onto
 * the metadata (or status metadata) handed back to the listener.
 * NOTE(review): the identifier is misspelled ("PREVIONS" for "PREVIOUS"); it
 * is left as-is because it is referenced elsewhere in this file.
 */
const PREVIONS_RPC_ATTEMPTS_METADATA_KEY = 'grpc-previous-rpc-attempts';
/**
 * A Call that performs one logical RPC using one or more child
 * LoadBalancingCall attempts, according to the retry or hedging policy in the
 * method config.
 *
 * Outgoing messages are kept in a write buffer so that they can be replayed
 * on new attempts. As soon as one attempt is committed, all other attempts are
 * cancelled and buffered messages that the committed attempt has already sent
 * are released.
 */
export class RetryingCall implements Call {
  /** Current retry/hedging mode of this call. */
  private state: RetryingCallState;
  /** Listener supplied to start(); null until start() is called. */
  private listener: InterceptingListener | null = null;
  /** Metadata supplied to start(); cloned for each attempt. */
  private initialMetadata: Metadata | null = null;
  /** All attempts made so far, in creation order. */
  private underlyingCalls: UnderlyingCall[] = [];
  /** Buffered writes that may still need to be sent on some attempt. */
  private writeBuffer: WriteBufferEntry[] = [];
  /**
   * The offset of message indices in the writeBuffer. For example, if
   * writeBufferOffset is 10, message 10 is in writeBuffer[0] and message 15
   * is in writeBuffer[5].
   */
  private writeBufferOffset = 0;
  /**
   * Tracks whether a read has been started, so that we know whether to start
   * reads on new child calls. This only matters for the first read, because
   * once a message comes in the child call becomes committed and there will
   * be no new child calls.
   */
  private readStarted = false;
  /** Whether the single transparent retry for a REFUSED attempt was used. */
  private transparentRetryUsed = false;
  /**
   * Number of attempts so far
   */
  private attempts = 0;
  private hedgingTimer: NodeJS.Timeout | null = null;
  /** Index into underlyingCalls of the committed attempt, once there is one. */
  private committedCallIndex: number | null = null;
  private initialRetryBackoffSec = 0;
  private nextRetryBackoffSec = 0;

  /**
   * Selects the initial state from the method config: RETRY if a retry policy
   * is present, otherwise HEDGING if a hedging policy is present, otherwise
   * TRANSPARENT_ONLY. For RETRY, the initial backoff is parsed from the policy
   * string by dropping its last character (the unit suffix) and converting
   * the rest to a number.
   */
  constructor(
    private readonly channel: InternalChannel,
    private readonly callConfig: CallConfig,
    private readonly methodName: string,
    private readonly host: string,
    private readonly credentials: CallCredentials,
    private readonly deadline: Deadline,
    private readonly callNumber: number,
    private readonly bufferTracker: MessageBufferTracker,
    private readonly retryThrottler?: RetryThrottler
  ) {
    if (callConfig.methodConfig.retryPolicy) {
      this.state = 'RETRY';
      const retryPolicy = callConfig.methodConfig.retryPolicy;
      this.nextRetryBackoffSec = this.initialRetryBackoffSec = Number(
        retryPolicy.initialBackoff.substring(
          0,
          retryPolicy.initialBackoff.length - 1
        )
      );
    } else if (callConfig.methodConfig.hedgingPolicy) {
      this.state = 'HEDGING';
    } else {
      this.state = 'TRANSPARENT_ONLY';
    }
  }

  /** @returns the call number this call was constructed with. */
  getCallNumber(): number {
    return this.callNumber;
  }

  /** Emits a DEBUG trace line prefixed with this call's number. */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callNumber + '] ' + text
    );
  }

  /**
   * Releases all buffered writes for this call and delivers the final status
   * to the listener on the next tick.
   */
  private reportStatus(statusObject: StatusObject) {
    this.trace(
      'ended with status: code=' +
        statusObject.code +
        ' details="' +
        statusObject.details +
        '"'
    );
    this.bufferTracker.freeAll(this.callNumber);
    // Keep message indices monotonic even though the buffer is dropped.
    this.writeBufferOffset = this.writeBufferOffset + this.writeBuffer.length;
    this.writeBuffer = [];
    process.nextTick(() => {
      // Explicitly construct status object to remove progress field
      this.listener?.onReceiveStatus({
        code: statusObject.code,
        details: statusObject.details,
        metadata: statusObject.metadata,
      });
    });
  }

  /**
   * Reports the given status to the listener and cancels every attempt with
   * the same status.
   *
   * NOTE(review): this does not change `state`, so a pending hedging timer or
   * retry timer may still observe RETRY/HEDGING afterwards - confirm whether
   * a terminal state should be set here.
   */
  cancelWithStatus(status: Status, details: string): void {
    this.trace(
      'cancelWithStatus code: ' + status + ' details: "' + details + '"'
    );
    this.reportStatus({ code: status, details, metadata: new Metadata() });
    for (const { call } of this.underlyingCalls) {
      call.cancelWithStatus(status, details);
    }
  }

  /** @returns the committed attempt's peer, or 'unknown' before commitment. */
  getPeer(): string {
    if (this.committedCallIndex !== null) {
      return this.underlyingCalls[this.committedCallIndex].call.getPeer();
    } else {
      return 'unknown';
    }
  }

  /**
   * Looks up the buffer entry for a message index. Indices that are not in
   * the buffer yield a FREED placeholder, so the result is never undefined.
   */
  private getBufferEntry(messageIndex: number): WriteBufferEntry {
    return (
      this.writeBuffer[messageIndex - this.writeBufferOffset] ?? {
        entryType: 'FREED',
        allocated: false,
      }
    );
  }

  /** @returns the message index that the next buffered write will receive. */
  private getNextBufferIndex() {
    return this.writeBufferOffset + this.writeBuffer.length;
  }

  /**
   * Once committed, drops every buffered entry that precedes the committed
   * attempt's next message to send, releasing tracked allocations for them.
   * Does nothing before commitment, because other attempts may still need to
   * replay those messages.
   */
  private clearSentMessages() {
    if (this.state !== 'COMMITTED') {
      return;
    }
    const earliestNeededMessageIndex =
      this.underlyingCalls[this.committedCallIndex!].nextMessageToSend;
    for (
      let messageIndex = this.writeBufferOffset;
      messageIndex < earliestNeededMessageIndex;
      messageIndex++
    ) {
      const bufferEntry = this.getBufferEntry(messageIndex);
      if (bufferEntry.allocated) {
        this.bufferTracker.free(
          bufferEntry.message!.message.length,
          this.callNumber
        );
      }
    }
    this.writeBuffer = this.writeBuffer.slice(
      earliestNeededMessageIndex - this.writeBufferOffset
    );
    this.writeBufferOffset = earliestNeededMessageIndex;
  }

  /**
   * Commits to the attempt at `index`: switches to COMMITTED, cancels every
   * other attempt that is still active, and releases already-sent messages.
   * No-op if a commitment was already made or if the attempt at `index` has
   * already completed.
   */
  private commitCall(index: number) {
    if (this.state === 'COMMITTED') {
      return;
    }
    if (this.underlyingCalls[index].state === 'COMPLETED') {
      return;
    }
    this.trace(
      'Committing call [' +
        this.underlyingCalls[index].call.getCallNumber() +
        '] at index ' +
        index
    );
    this.state = 'COMMITTED';
    this.committedCallIndex = index;
    for (let i = 0; i < this.underlyingCalls.length; i++) {
      if (i === index) {
        continue;
      }
      if (this.underlyingCalls[i].state === 'COMPLETED') {
        continue;
      }
      // Mark as completed first so the resulting status is ignored.
      this.underlyingCalls[i].state = 'COMPLETED';
      this.underlyingCalls[i].call.cancelWithStatus(
        Status.CANCELLED,
        'Discarded in favor of other hedged attempt'
      );
    }
    this.clearSentMessages();
  }

  /**
   * Commits to the active attempt that has sent the most messages. Used when
   * a message can no longer be buffered for replay. If no attempt is active,
   * falls back to TRANSPARENT_ONLY instead.
   */
  private commitCallWithMostMessages() {
    if (this.state === 'COMMITTED') {
      return;
    }
    let mostMessages = -1;
    let callWithMostMessages = -1;
    for (const [index, childCall] of this.underlyingCalls.entries()) {
      if (
        childCall.state === 'ACTIVE' &&
        childCall.nextMessageToSend > mostMessages
      ) {
        mostMessages = childCall.nextMessageToSend;
        callWithMostMessages = index;
      }
    }
    if (callWithMostMessages === -1) {
      /* There are no active calls, disable retries to force the next call that
       * is started to be committed. */
      this.state = 'TRANSPARENT_ONLY';
    } else {
      this.commitCall(callWithMostMessages);
    }
  }

  /**
   * Checks whether `code` appears in `list`, where list entries may be either
   * numeric status codes or status names (compared case-insensitively).
   * A code that has no name in the Status enum only matches numerically.
   */
  private isStatusCodeInList(list: (Status | string)[], code: Status) {
    // Status[code] is undefined at runtime for a code outside the enum; the
    // optional call avoids throwing a TypeError in that case.
    const codeName: string | undefined = Status[code]?.toLowerCase();
    return list.some(
      value => value === code || value.toString().toLowerCase() === codeName
    );
  }

  /**
   * Picks a random delay in [0, current backoff) in milliseconds and then
   * grows the current backoff by the policy's multiplier, capped at the
   * policy's maxBackoff. Returns 0 if no retry policy is configured.
   */
  private getNextRetryBackoffMs() {
    const retryPolicy = this.callConfig.methodConfig.retryPolicy;
    if (!retryPolicy) {
      return 0;
    }
    const nextBackoffMs = Math.random() * this.nextRetryBackoffSec * 1000;
    // Drop the trailing unit character before converting to a number.
    const maxBackoffSec = Number(
      retryPolicy.maxBackoff.substring(0, retryPolicy.maxBackoff.length - 1)
    );
    this.nextRetryBackoffSec = Math.min(
      this.nextRetryBackoffSec * retryPolicy.backoffMultiplier,
      maxBackoffSec
    );
    return nextBackoffMs;
  }

  /**
   * Decides whether to start a retry attempt and reports the decision through
   * `callback`, which is always invoked exactly once.
   *
   * @param pushback Server pushback delay in ms, a negative value meaning "do
   *     not retry", or null if the server sent no pushback.
   * @param callback Called with true right before a new attempt is started,
   *     or with false if no retry will happen.
   */
  private maybeRetryCall(
    pushback: number | null,
    callback: (retried: boolean) => void
  ) {
    if (this.state !== 'RETRY') {
      callback(false);
      return;
    }
    const retryPolicy = this.callConfig.methodConfig.retryPolicy!;
    // The number of attempts is capped at 5 regardless of the policy.
    if (this.attempts >= Math.min(retryPolicy.maxAttempts, 5)) {
      callback(false);
      return;
    }
    let retryDelayMs: number;
    if (pushback === null) {
      retryDelayMs = this.getNextRetryBackoffMs();
    } else if (pushback < 0) {
      this.state = 'TRANSPARENT_ONLY';
      callback(false);
      return;
    } else {
      retryDelayMs = pushback;
      // An explicit pushback resets the exponential backoff.
      this.nextRetryBackoffSec = this.initialRetryBackoffSec;
    }
    setTimeout(() => {
      // The state may have changed while waiting (e.g. commit or cancel).
      if (this.state !== 'RETRY') {
        callback(false);
        return;
      }
      if (this.retryThrottler?.canRetryCall() ?? true) {
        callback(true);
        this.attempts += 1;
        this.startNewAttempt();
      } else {
        // Without this the caller would never learn that no retry happens
        // and the final status would never be reported.
        this.trace('Retry attempt denied by throttling policy');
        callback(false);
      }
    }, retryDelayMs);
  }

  /** @returns the number of attempts currently in the ACTIVE state. */
  private countActiveCalls(): number {
    let count = 0;
    for (const call of this.underlyingCalls) {
      if (call?.state === 'ACTIVE') {
        count += 1;
      }
    }
    return count;
  }

  /**
   * Handles a non-OK status from an attempt that may have been processed by
   * the server application, applying the retry or hedging policy depending
   * on the current state.
   */
  private handleProcessedStatus(
    status: StatusObject,
    callIndex: number,
    pushback: number | null
  ) {
    switch (this.state) {
      case 'COMMITTED':
      case 'TRANSPARENT_ONLY':
        this.commitCall(callIndex);
        this.reportStatus(status);
        break;
      case 'HEDGING':
        if (
          this.isStatusCodeInList(
            this.callConfig.methodConfig.hedgingPolicy!.nonFatalStatusCodes ??
              [],
            status.code
          )
        ) {
          this.retryThrottler?.addCallFailed();
          let delayMs: number;
          if (pushback === null) {
            delayMs = 0;
          } else if (pushback < 0) {
            this.state = 'TRANSPARENT_ONLY';
            this.commitCall(callIndex);
            this.reportStatus(status);
            return;
          } else {
            delayMs = pushback;
          }
          setTimeout(() => {
            this.maybeStartHedgingAttempt();
            // If after trying to start a call there are no active calls, this was the last one
            if (this.countActiveCalls() === 0) {
              this.commitCall(callIndex);
              this.reportStatus(status);
            }
          }, delayMs);
        } else {
          this.commitCall(callIndex);
          this.reportStatus(status);
        }
        break;
      case 'RETRY':
        if (
          this.isStatusCodeInList(
            this.callConfig.methodConfig.retryPolicy!.retryableStatusCodes,
            status.code
          )
        ) {
          this.retryThrottler?.addCallFailed();
          this.maybeRetryCall(pushback, retried => {
            if (!retried) {
              this.commitCall(callIndex);
              this.reportStatus(status);
            }
          });
        } else {
          this.commitCall(callIndex);
          this.reportStatus(status);
        }
        break;
    }
  }

  /**
   * Reads the 'grpc-retry-pushback-ms' value from trailing metadata.
   *
   * @returns null if the key is absent, the parsed integer if it is
   *     parseable, or -1 if it is not, so that an unparseable value is
   *     handled the same way as a negative one (no further retries).
   */
  private getPushback(metadata: Metadata): number | null {
    const mdValue = metadata.get('grpc-retry-pushback-ms');
    if (mdValue.length === 0) {
      return null;
    }
    // parseInt never throws; it signals failure by returning NaN.
    const parsed = Number.parseInt(String(mdValue[0]), 10);
    return Number.isNaN(parsed) ? -1 : parsed;
  }

  /**
   * Entry point for a status received from the attempt at `callIndex`.
   * Statuses from attempts already marked COMPLETED are ignored. OK commits
   * and ends the call; other statuses are dispatched on how far the attempt
   * progressed.
   */
  private handleChildStatus(
    status: StatusObjectWithProgress,
    callIndex: number
  ) {
    if (this.underlyingCalls[callIndex].state === 'COMPLETED') {
      return;
    }
    this.trace(
      'state=' +
        this.state +
        ' handling status with progress ' +
        status.progress +
        ' from child [' +
        this.underlyingCalls[callIndex].call.getCallNumber() +
        '] in state ' +
        this.underlyingCalls[callIndex].state
    );
    this.underlyingCalls[callIndex].state = 'COMPLETED';
    if (status.code === Status.OK) {
      this.retryThrottler?.addCallSucceeded();
      this.commitCall(callIndex);
      this.reportStatus(status);
      return;
    }
    if (this.state === 'COMMITTED') {
      this.reportStatus(status);
      return;
    }
    const pushback = this.getPushback(status.metadata);
    switch (status.progress) {
      case 'NOT_STARTED':
        // RPC never leaves the client, always safe to retry
        this.startNewAttempt();
        break;
      case 'REFUSED':
        // RPC reaches the server library, but not the server application logic
        if (this.transparentRetryUsed) {
          this.handleProcessedStatus(status, callIndex, pushback);
        } else {
          this.transparentRetryUsed = true;
          this.startNewAttempt();
        }
        break;
      case 'DROP':
        this.commitCall(callIndex);
        this.reportStatus(status);
        break;
      case 'PROCESSED':
        this.handleProcessedStatus(status, callIndex, pushback);
        break;
    }
  }

  /**
   * Starts another hedged attempt and re-arms the hedging timer, provided
   * the call is still in HEDGING and the attempt limit (policy maxAttempts,
   * capped at 5) has not been reached.
   */
  private maybeStartHedgingAttempt() {
    if (this.state !== 'HEDGING') {
      return;
    }
    if (!this.callConfig.methodConfig.hedgingPolicy) {
      return;
    }
    const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
    if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) {
      return;
    }
    this.attempts += 1;
    this.startNewAttempt();
    this.maybeStartHedgingTimer();
  }

  /**
   * Clears any pending hedging timer and, if further hedged attempts are
   * still possible, schedules the next one after the policy's hedgingDelay
   * (defaulting to '0s').
   */
  private maybeStartHedgingTimer() {
    if (this.hedgingTimer) {
      clearTimeout(this.hedgingTimer);
    }
    if (this.state !== 'HEDGING') {
      return;
    }
    if (!this.callConfig.methodConfig.hedgingPolicy) {
      return;
    }
    const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy;
    if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) {
      return;
    }
    const hedgingDelayString = hedgingPolicy.hedgingDelay ?? '0s';
    // Drop the trailing unit character before converting to a number.
    const hedgingDelaySec = Number(
      hedgingDelayString.substring(0, hedgingDelayString.length - 1)
    );
    this.hedgingTimer = setTimeout(() => {
      this.maybeStartHedgingAttempt();
    }, hedgingDelaySec * 1000);
    // Do not keep the process alive just for a hedging timer.
    this.hedgingTimer.unref?.();
  }

  /**
   * Creates and starts a new child call, wires its listener callbacks into
   * this call's commit/status handling, replays buffered writes onto it, and
   * starts a read on it if one was already requested.
   */
  private startNewAttempt() {
    const child = this.channel.createLoadBalancingCall(
      this.callConfig,
      this.methodName,
      this.host,
      this.credentials,
      this.deadline
    );
    this.trace(
      'Created child call [' +
        child.getCallNumber() +
        '] for attempt ' +
        this.attempts
    );
    const index = this.underlyingCalls.length;
    this.underlyingCalls.push({
      state: 'ACTIVE',
      call: child,
      nextMessageToSend: 0,
    });
    const previousAttempts = this.attempts - 1;
    const initialMetadata = this.initialMetadata!.clone();
    if (previousAttempts > 0) {
      initialMetadata.set(
        PREVIONS_RPC_ATTEMPTS_METADATA_KEY,
        `${previousAttempts}`
      );
    }
    let receivedMetadata = false;
    child.start(initialMetadata, {
      onReceiveMetadata: metadata => {
        this.trace(
          'Received metadata from child [' + child.getCallNumber() + ']'
        );
        this.commitCall(index);
        receivedMetadata = true;
        if (previousAttempts > 0) {
          metadata.set(
            PREVIONS_RPC_ATTEMPTS_METADATA_KEY,
            `${previousAttempts}`
          );
        }
        // Only forward from an attempt that has not been discarded.
        if (this.underlyingCalls[index].state === 'ACTIVE') {
          this.listener!.onReceiveMetadata(metadata);
        }
      },
      onReceiveMessage: message => {
        this.trace(
          'Received message from child [' + child.getCallNumber() + ']'
        );
        this.commitCall(index);
        if (this.underlyingCalls[index].state === 'ACTIVE') {
          this.listener!.onReceiveMessage(message);
        }
      },
      onReceiveStatus: status => {
        this.trace(
          'Received status from child [' + child.getCallNumber() + ']'
        );
        // If no initial metadata was delivered, surface the attempt count on
        // the status metadata instead.
        if (!receivedMetadata && previousAttempts > 0) {
          status.metadata.set(
            PREVIONS_RPC_ATTEMPTS_METADATA_KEY,
            `${previousAttempts}`
          );
        }
        this.handleChildStatus(status, index);
      },
    });
    this.sendNextChildMessage(index);
    if (this.readStarted) {
      child.startRead();
    }
  }

  /**
   * Starts the call: records the listener and initial metadata, launches the
   * first attempt, and arms the hedging timer if hedging applies.
   */
  start(metadata: Metadata, listener: InterceptingListener): void {
    this.trace('start called');
    this.listener = listener;
    this.initialMetadata = metadata;
    this.attempts += 1;
    this.startNewAttempt();
    this.maybeStartHedgingTimer();
  }

  /**
   * Called when a child call finishes writing its current message: invokes
   * the entry's deferred write callback (if any), releases messages that are
   * no longer needed, and moves the child on to its next buffered entry.
   */
  private handleChildWriteCompleted(childIndex: number) {
    const childCall = this.underlyingCalls[childIndex];
    const messageIndex = childCall.nextMessageToSend;
    this.getBufferEntry(messageIndex).callback?.();
    this.clearSentMessages();
    childCall.nextMessageToSend += 1;
    this.sendNextChildMessage(childIndex);
  }

  /**
   * Sends the child's next buffered entry, if there is one: a MESSAGE is
   * written (continuing with the following entry when the write completes),
   * a HALF_CLOSE half-closes the child. Does nothing for completed children
   * or when the child has caught up with the buffer.
   */
  private sendNextChildMessage(childIndex: number) {
    const childCall = this.underlyingCalls[childIndex];
    if (childCall.state === 'COMPLETED') {
      return;
    }
    // getBufferEntry never returns undefined; an index past the end of the
    // buffer yields a FREED placeholder, which is ignored below.
    const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend);
    switch (bufferEntry.entryType) {
      case 'MESSAGE':
        childCall.call.sendMessageWithContext(
          {
            callback: () => {
              // Write errors are deliberately ignored here
              this.handleChildWriteCompleted(childIndex);
            },
          },
          bufferEntry.message!.message
        );
        break;
      case 'HALF_CLOSE':
        childCall.nextMessageToSend += 1;
        childCall.call.halfClose();
        break;
      case 'FREED':
        // Should not be possible
        break;
    }
  }

  /**
   * Buffers an outgoing message and forwards it to every active attempt that
   * is ready for it.
   *
   * If the message fits in the buffer tracker, the write callback is invoked
   * immediately. Otherwise the call commits to the attempt that has sent the
   * most messages, and the write callback is deferred until that attempt has
   * written the message.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer): void {
    this.trace('write() called with message of length ' + message.length);
    const writeObj: WriteObject = {
      message,
      flags: context.flags,
    };
    const messageIndex = this.getNextBufferIndex();
    const bufferEntry: WriteBufferEntry = {
      entryType: 'MESSAGE',
      message: writeObj,
      allocated: this.bufferTracker.allocate(message.length, this.callNumber),
    };
    this.writeBuffer.push(bufferEntry);
    if (bufferEntry.allocated) {
      context.callback?.();
      for (const [callIndex, call] of this.underlyingCalls.entries()) {
        if (
          call.state === 'ACTIVE' &&
          call.nextMessageToSend === messageIndex
        ) {
          call.call.sendMessageWithContext(
            {
              callback: () => {
                // Write errors are deliberately ignored here
                this.handleChildWriteCompleted(callIndex);
              },
            },
            message
          );
        }
      }
    } else {
      this.commitCallWithMostMessages();
      // commitCallWithMostMessages does not commit if there is currently no
      // active attempt
      const committedIndex = this.committedCallIndex;
      if (committedIndex === null) {
        return;
      }
      const call = this.underlyingCalls[committedIndex];
      bufferEntry.callback = context.callback;
      if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
        call.call.sendMessageWithContext(
          {
            callback: () => {
              // Write errors are deliberately ignored here
              this.handleChildWriteCompleted(committedIndex);
            },
          },
          message
        );
      }
    }
  }

  /**
   * Starts a read on every active attempt and remembers that a read was
   * requested so that later attempts start reading too.
   */
  startRead(): void {
    this.trace('startRead called');
    this.readStarted = true;
    for (const underlyingCall of this.underlyingCalls) {
      if (underlyingCall?.state === 'ACTIVE') {
        underlyingCall.call.startRead();
      }
    }
  }

  /**
   * Buffers a half-close marker and half-closes every active attempt that
   * has already sent everything before it.
   */
  halfClose(): void {
    this.trace('halfClose called');
    const halfCloseIndex = this.getNextBufferIndex();
    this.writeBuffer.push({
      entryType: 'HALF_CLOSE',
      allocated: false,
    });
    for (const call of this.underlyingCalls) {
      if (
        call?.state === 'ACTIVE' &&
        call.nextMessageToSend === halfCloseIndex
      ) {
        call.nextMessageToSend += 1;
        call.call.halfClose();
      }
    }
  }

  /** Not supported on this call type; always throws. */
  setCredentials(newCredentials: CallCredentials): void {
    throw new Error('Method not implemented.');
  }

  /** @returns the method name this call was constructed with. */
  getMethod(): string {
    return this.methodName;
  }

  /** @returns the host this call was constructed with. */
  getHost(): string {
    return this.host;
  }
}