|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import { CallCredentials } from './call-credentials'; |
|
import { LogVerbosity, Status } from './constants'; |
|
import { Deadline } from './deadline'; |
|
import { Metadata } from './metadata'; |
|
import { CallConfig } from './resolver'; |
|
import * as logging from './logging'; |
|
import { |
|
Call, |
|
InterceptingListener, |
|
MessageContext, |
|
StatusObject, |
|
WriteCallback, |
|
WriteObject, |
|
} from './call-interface'; |
|
import { |
|
LoadBalancingCall, |
|
StatusObjectWithProgress, |
|
} from './load-balancing-call'; |
|
import { InternalChannel } from './internal-channel'; |
|
|
|
const TRACER_NAME = 'retrying_call'; |
|
|
|
export class RetryThrottler { |
|
private tokens: number; |
|
constructor( |
|
private readonly maxTokens: number, |
|
private readonly tokenRatio: number, |
|
previousRetryThrottler?: RetryThrottler |
|
) { |
|
if (previousRetryThrottler) { |
|
|
|
|
|
this.tokens = |
|
previousRetryThrottler.tokens * |
|
(maxTokens / previousRetryThrottler.maxTokens); |
|
} else { |
|
this.tokens = maxTokens; |
|
} |
|
} |
|
|
|
addCallSucceeded() { |
|
this.tokens = Math.max(this.tokens + this.tokenRatio, this.maxTokens); |
|
} |
|
|
|
addCallFailed() { |
|
this.tokens = Math.min(this.tokens - 1, 0); |
|
} |
|
|
|
canRetryCall() { |
|
return this.tokens > this.maxTokens / 2; |
|
} |
|
} |
|
|
|
export class MessageBufferTracker { |
|
private totalAllocated = 0; |
|
private allocatedPerCall: Map<number, number> = new Map<number, number>(); |
|
|
|
constructor(private totalLimit: number, private limitPerCall: number) {} |
|
|
|
allocate(size: number, callId: number): boolean { |
|
const currentPerCall = this.allocatedPerCall.get(callId) ?? 0; |
|
if ( |
|
this.limitPerCall - currentPerCall < size || |
|
this.totalLimit - this.totalAllocated < size |
|
) { |
|
return false; |
|
} |
|
this.allocatedPerCall.set(callId, currentPerCall + size); |
|
this.totalAllocated += size; |
|
return true; |
|
} |
|
|
|
free(size: number, callId: number) { |
|
if (this.totalAllocated < size) { |
|
throw new Error( |
|
`Invalid buffer allocation state: call ${callId} freed ${size} > total allocated ${this.totalAllocated}` |
|
); |
|
} |
|
this.totalAllocated -= size; |
|
const currentPerCall = this.allocatedPerCall.get(callId) ?? 0; |
|
if (currentPerCall < size) { |
|
throw new Error( |
|
`Invalid buffer allocation state: call ${callId} freed ${size} > allocated for call ${currentPerCall}` |
|
); |
|
} |
|
this.allocatedPerCall.set(callId, currentPerCall - size); |
|
} |
|
|
|
freeAll(callId: number) { |
|
const currentPerCall = this.allocatedPerCall.get(callId) ?? 0; |
|
if (this.totalAllocated < currentPerCall) { |
|
throw new Error( |
|
`Invalid buffer allocation state: call ${callId} allocated ${currentPerCall} > total allocated ${this.totalAllocated}` |
|
); |
|
} |
|
this.totalAllocated -= currentPerCall; |
|
this.allocatedPerCall.delete(callId); |
|
} |
|
} |
|
|
|
type UnderlyingCallState = 'ACTIVE' | 'COMPLETED'; |
|
|
|
interface UnderlyingCall { |
|
state: UnderlyingCallState; |
|
call: LoadBalancingCall; |
|
nextMessageToSend: number; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
type RetryingCallState = 'RETRY' | 'HEDGING' | 'TRANSPARENT_ONLY' | 'COMMITTED'; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
type WriteBufferEntryType = 'MESSAGE' | 'HALF_CLOSE' | 'FREED'; |
|
|
|
|
|
|
|
|
|
interface WriteBufferEntry { |
|
entryType: WriteBufferEntryType; |
|
|
|
|
|
|
|
|
|
message?: WriteObject; |
|
|
|
|
|
|
|
|
|
|
|
callback?: WriteCallback; |
|
|
|
|
|
|
|
|
|
|
|
allocated: boolean; |
|
} |
|
|
|
const PREVIONS_RPC_ATTEMPTS_METADATA_KEY = 'grpc-previous-rpc-attempts'; |
|
|
|
export class RetryingCall implements Call { |
|
private state: RetryingCallState; |
|
private listener: InterceptingListener | null = null; |
|
private initialMetadata: Metadata | null = null; |
|
private underlyingCalls: UnderlyingCall[] = []; |
|
private writeBuffer: WriteBufferEntry[] = []; |
|
|
|
|
|
|
|
|
|
|
|
private writeBufferOffset = 0; |
|
|
|
|
|
|
|
|
|
|
|
|
|
private readStarted = false; |
|
private transparentRetryUsed = false; |
|
|
|
|
|
|
|
private attempts = 0; |
|
private hedgingTimer: NodeJS.Timeout | null = null; |
|
private committedCallIndex: number | null = null; |
|
private initialRetryBackoffSec = 0; |
|
private nextRetryBackoffSec = 0; |
|
constructor( |
|
private readonly channel: InternalChannel, |
|
private readonly callConfig: CallConfig, |
|
private readonly methodName: string, |
|
private readonly host: string, |
|
private readonly credentials: CallCredentials, |
|
private readonly deadline: Deadline, |
|
private readonly callNumber: number, |
|
private readonly bufferTracker: MessageBufferTracker, |
|
private readonly retryThrottler?: RetryThrottler |
|
) { |
|
if (callConfig.methodConfig.retryPolicy) { |
|
this.state = 'RETRY'; |
|
const retryPolicy = callConfig.methodConfig.retryPolicy; |
|
this.nextRetryBackoffSec = this.initialRetryBackoffSec = Number( |
|
retryPolicy.initialBackoff.substring( |
|
0, |
|
retryPolicy.initialBackoff.length - 1 |
|
) |
|
); |
|
} else if (callConfig.methodConfig.hedgingPolicy) { |
|
this.state = 'HEDGING'; |
|
} else { |
|
this.state = 'TRANSPARENT_ONLY'; |
|
} |
|
} |
|
getCallNumber(): number { |
|
return this.callNumber; |
|
} |
|
|
|
private trace(text: string): void { |
|
logging.trace( |
|
LogVerbosity.DEBUG, |
|
TRACER_NAME, |
|
'[' + this.callNumber + '] ' + text |
|
); |
|
} |
|
|
|
private reportStatus(statusObject: StatusObject) { |
|
this.trace( |
|
'ended with status: code=' + |
|
statusObject.code + |
|
' details="' + |
|
statusObject.details + |
|
'"' |
|
); |
|
this.bufferTracker.freeAll(this.callNumber); |
|
this.writeBufferOffset = this.writeBufferOffset + this.writeBuffer.length; |
|
this.writeBuffer = []; |
|
process.nextTick(() => { |
|
|
|
this.listener?.onReceiveStatus({ |
|
code: statusObject.code, |
|
details: statusObject.details, |
|
metadata: statusObject.metadata, |
|
}); |
|
}); |
|
} |
|
|
|
cancelWithStatus(status: Status, details: string): void { |
|
this.trace( |
|
'cancelWithStatus code: ' + status + ' details: "' + details + '"' |
|
); |
|
this.reportStatus({ code: status, details, metadata: new Metadata() }); |
|
for (const { call } of this.underlyingCalls) { |
|
call.cancelWithStatus(status, details); |
|
} |
|
} |
|
getPeer(): string { |
|
if (this.committedCallIndex !== null) { |
|
return this.underlyingCalls[this.committedCallIndex].call.getPeer(); |
|
} else { |
|
return 'unknown'; |
|
} |
|
} |
|
|
|
private getBufferEntry(messageIndex: number): WriteBufferEntry { |
|
return ( |
|
this.writeBuffer[messageIndex - this.writeBufferOffset] ?? { |
|
entryType: 'FREED', |
|
allocated: false, |
|
} |
|
); |
|
} |
|
|
|
private getNextBufferIndex() { |
|
return this.writeBufferOffset + this.writeBuffer.length; |
|
} |
|
|
|
private clearSentMessages() { |
|
if (this.state !== 'COMMITTED') { |
|
return; |
|
} |
|
const earliestNeededMessageIndex = |
|
this.underlyingCalls[this.committedCallIndex!].nextMessageToSend; |
|
for ( |
|
let messageIndex = this.writeBufferOffset; |
|
messageIndex < earliestNeededMessageIndex; |
|
messageIndex++ |
|
) { |
|
const bufferEntry = this.getBufferEntry(messageIndex); |
|
if (bufferEntry.allocated) { |
|
this.bufferTracker.free( |
|
bufferEntry.message!.message.length, |
|
this.callNumber |
|
); |
|
} |
|
} |
|
this.writeBuffer = this.writeBuffer.slice( |
|
earliestNeededMessageIndex - this.writeBufferOffset |
|
); |
|
this.writeBufferOffset = earliestNeededMessageIndex; |
|
} |
|
|
|
private commitCall(index: number) { |
|
if (this.state === 'COMMITTED') { |
|
return; |
|
} |
|
if (this.underlyingCalls[index].state === 'COMPLETED') { |
|
return; |
|
} |
|
this.trace( |
|
'Committing call [' + |
|
this.underlyingCalls[index].call.getCallNumber() + |
|
'] at index ' + |
|
index |
|
); |
|
this.state = 'COMMITTED'; |
|
this.committedCallIndex = index; |
|
for (let i = 0; i < this.underlyingCalls.length; i++) { |
|
if (i === index) { |
|
continue; |
|
} |
|
if (this.underlyingCalls[i].state === 'COMPLETED') { |
|
continue; |
|
} |
|
this.underlyingCalls[i].state = 'COMPLETED'; |
|
this.underlyingCalls[i].call.cancelWithStatus( |
|
Status.CANCELLED, |
|
'Discarded in favor of other hedged attempt' |
|
); |
|
} |
|
this.clearSentMessages(); |
|
} |
|
|
|
private commitCallWithMostMessages() { |
|
if (this.state === 'COMMITTED') { |
|
return; |
|
} |
|
let mostMessages = -1; |
|
let callWithMostMessages = -1; |
|
for (const [index, childCall] of this.underlyingCalls.entries()) { |
|
if ( |
|
childCall.state === 'ACTIVE' && |
|
childCall.nextMessageToSend > mostMessages |
|
) { |
|
mostMessages = childCall.nextMessageToSend; |
|
callWithMostMessages = index; |
|
} |
|
} |
|
if (callWithMostMessages === -1) { |
|
|
|
|
|
this.state = 'TRANSPARENT_ONLY'; |
|
} else { |
|
this.commitCall(callWithMostMessages); |
|
} |
|
} |
|
|
|
private isStatusCodeInList(list: (Status | string)[], code: Status) { |
|
return list.some( |
|
value => |
|
value === code || |
|
value.toString().toLowerCase() === Status[code].toLowerCase() |
|
); |
|
} |
|
|
|
private getNextRetryBackoffMs() { |
|
const retryPolicy = this.callConfig?.methodConfig.retryPolicy; |
|
if (!retryPolicy) { |
|
return 0; |
|
} |
|
const nextBackoffMs = Math.random() * this.nextRetryBackoffSec * 1000; |
|
const maxBackoffSec = Number( |
|
retryPolicy.maxBackoff.substring(0, retryPolicy.maxBackoff.length - 1) |
|
); |
|
this.nextRetryBackoffSec = Math.min( |
|
this.nextRetryBackoffSec * retryPolicy.backoffMultiplier, |
|
maxBackoffSec |
|
); |
|
return nextBackoffMs; |
|
} |
|
|
|
private maybeRetryCall( |
|
pushback: number | null, |
|
callback: (retried: boolean) => void |
|
) { |
|
if (this.state !== 'RETRY') { |
|
callback(false); |
|
return; |
|
} |
|
const retryPolicy = this.callConfig!.methodConfig.retryPolicy!; |
|
if (this.attempts >= Math.min(retryPolicy.maxAttempts, 5)) { |
|
callback(false); |
|
return; |
|
} |
|
let retryDelayMs: number; |
|
if (pushback === null) { |
|
retryDelayMs = this.getNextRetryBackoffMs(); |
|
} else if (pushback < 0) { |
|
this.state = 'TRANSPARENT_ONLY'; |
|
callback(false); |
|
return; |
|
} else { |
|
retryDelayMs = pushback; |
|
this.nextRetryBackoffSec = this.initialRetryBackoffSec; |
|
} |
|
setTimeout(() => { |
|
if (this.state !== 'RETRY') { |
|
callback(false); |
|
return; |
|
} |
|
if (this.retryThrottler?.canRetryCall() ?? true) { |
|
callback(true); |
|
this.attempts += 1; |
|
this.startNewAttempt(); |
|
} |
|
}, retryDelayMs); |
|
} |
|
|
|
private countActiveCalls(): number { |
|
let count = 0; |
|
for (const call of this.underlyingCalls) { |
|
if (call?.state === 'ACTIVE') { |
|
count += 1; |
|
} |
|
} |
|
return count; |
|
} |
|
|
|
private handleProcessedStatus( |
|
status: StatusObject, |
|
callIndex: number, |
|
pushback: number | null |
|
) { |
|
switch (this.state) { |
|
case 'COMMITTED': |
|
case 'TRANSPARENT_ONLY': |
|
this.commitCall(callIndex); |
|
this.reportStatus(status); |
|
break; |
|
case 'HEDGING': |
|
if ( |
|
this.isStatusCodeInList( |
|
this.callConfig!.methodConfig.hedgingPolicy!.nonFatalStatusCodes ?? |
|
[], |
|
status.code |
|
) |
|
) { |
|
this.retryThrottler?.addCallFailed(); |
|
let delayMs: number; |
|
if (pushback === null) { |
|
delayMs = 0; |
|
} else if (pushback < 0) { |
|
this.state = 'TRANSPARENT_ONLY'; |
|
this.commitCall(callIndex); |
|
this.reportStatus(status); |
|
return; |
|
} else { |
|
delayMs = pushback; |
|
} |
|
setTimeout(() => { |
|
this.maybeStartHedgingAttempt(); |
|
|
|
if (this.countActiveCalls() === 0) { |
|
this.commitCall(callIndex); |
|
this.reportStatus(status); |
|
} |
|
}, delayMs); |
|
} else { |
|
this.commitCall(callIndex); |
|
this.reportStatus(status); |
|
} |
|
break; |
|
case 'RETRY': |
|
if ( |
|
this.isStatusCodeInList( |
|
this.callConfig!.methodConfig.retryPolicy!.retryableStatusCodes, |
|
status.code |
|
) |
|
) { |
|
this.retryThrottler?.addCallFailed(); |
|
this.maybeRetryCall(pushback, retried => { |
|
if (!retried) { |
|
this.commitCall(callIndex); |
|
this.reportStatus(status); |
|
} |
|
}); |
|
} else { |
|
this.commitCall(callIndex); |
|
this.reportStatus(status); |
|
} |
|
break; |
|
} |
|
} |
|
|
|
private getPushback(metadata: Metadata): number | null { |
|
const mdValue = metadata.get('grpc-retry-pushback-ms'); |
|
if (mdValue.length === 0) { |
|
return null; |
|
} |
|
try { |
|
return parseInt(mdValue[0] as string); |
|
} catch (e) { |
|
return -1; |
|
} |
|
} |
|
|
|
private handleChildStatus( |
|
status: StatusObjectWithProgress, |
|
callIndex: number |
|
) { |
|
if (this.underlyingCalls[callIndex].state === 'COMPLETED') { |
|
return; |
|
} |
|
this.trace( |
|
'state=' + |
|
this.state + |
|
' handling status with progress ' + |
|
status.progress + |
|
' from child [' + |
|
this.underlyingCalls[callIndex].call.getCallNumber() + |
|
'] in state ' + |
|
this.underlyingCalls[callIndex].state |
|
); |
|
this.underlyingCalls[callIndex].state = 'COMPLETED'; |
|
if (status.code === Status.OK) { |
|
this.retryThrottler?.addCallSucceeded(); |
|
this.commitCall(callIndex); |
|
this.reportStatus(status); |
|
return; |
|
} |
|
if (this.state === 'COMMITTED') { |
|
this.reportStatus(status); |
|
return; |
|
} |
|
const pushback = this.getPushback(status.metadata); |
|
switch (status.progress) { |
|
case 'NOT_STARTED': |
|
|
|
this.startNewAttempt(); |
|
break; |
|
case 'REFUSED': |
|
|
|
if (this.transparentRetryUsed) { |
|
this.handleProcessedStatus(status, callIndex, pushback); |
|
} else { |
|
this.transparentRetryUsed = true; |
|
this.startNewAttempt(); |
|
} |
|
break; |
|
case 'DROP': |
|
this.commitCall(callIndex); |
|
this.reportStatus(status); |
|
break; |
|
case 'PROCESSED': |
|
this.handleProcessedStatus(status, callIndex, pushback); |
|
break; |
|
} |
|
} |
|
|
|
private maybeStartHedgingAttempt() { |
|
if (this.state !== 'HEDGING') { |
|
return; |
|
} |
|
if (!this.callConfig.methodConfig.hedgingPolicy) { |
|
return; |
|
} |
|
const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy; |
|
if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) { |
|
return; |
|
} |
|
this.attempts += 1; |
|
this.startNewAttempt(); |
|
this.maybeStartHedgingTimer(); |
|
} |
|
|
|
private maybeStartHedgingTimer() { |
|
if (this.hedgingTimer) { |
|
clearTimeout(this.hedgingTimer); |
|
} |
|
if (this.state !== 'HEDGING') { |
|
return; |
|
} |
|
if (!this.callConfig.methodConfig.hedgingPolicy) { |
|
return; |
|
} |
|
const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy; |
|
if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) { |
|
return; |
|
} |
|
const hedgingDelayString = hedgingPolicy.hedgingDelay ?? '0s'; |
|
const hedgingDelaySec = Number( |
|
hedgingDelayString.substring(0, hedgingDelayString.length - 1) |
|
); |
|
this.hedgingTimer = setTimeout(() => { |
|
this.maybeStartHedgingAttempt(); |
|
}, hedgingDelaySec * 1000); |
|
this.hedgingTimer.unref?.(); |
|
} |
|
|
|
private startNewAttempt() { |
|
const child = this.channel.createLoadBalancingCall( |
|
this.callConfig, |
|
this.methodName, |
|
this.host, |
|
this.credentials, |
|
this.deadline |
|
); |
|
this.trace( |
|
'Created child call [' + |
|
child.getCallNumber() + |
|
'] for attempt ' + |
|
this.attempts |
|
); |
|
const index = this.underlyingCalls.length; |
|
this.underlyingCalls.push({ |
|
state: 'ACTIVE', |
|
call: child, |
|
nextMessageToSend: 0, |
|
}); |
|
const previousAttempts = this.attempts - 1; |
|
const initialMetadata = this.initialMetadata!.clone(); |
|
if (previousAttempts > 0) { |
|
initialMetadata.set( |
|
PREVIONS_RPC_ATTEMPTS_METADATA_KEY, |
|
`${previousAttempts}` |
|
); |
|
} |
|
let receivedMetadata = false; |
|
child.start(initialMetadata, { |
|
onReceiveMetadata: metadata => { |
|
this.trace( |
|
'Received metadata from child [' + child.getCallNumber() + ']' |
|
); |
|
this.commitCall(index); |
|
receivedMetadata = true; |
|
if (previousAttempts > 0) { |
|
metadata.set( |
|
PREVIONS_RPC_ATTEMPTS_METADATA_KEY, |
|
`${previousAttempts}` |
|
); |
|
} |
|
if (this.underlyingCalls[index].state === 'ACTIVE') { |
|
this.listener!.onReceiveMetadata(metadata); |
|
} |
|
}, |
|
onReceiveMessage: message => { |
|
this.trace( |
|
'Received message from child [' + child.getCallNumber() + ']' |
|
); |
|
this.commitCall(index); |
|
if (this.underlyingCalls[index].state === 'ACTIVE') { |
|
this.listener!.onReceiveMessage(message); |
|
} |
|
}, |
|
onReceiveStatus: status => { |
|
this.trace( |
|
'Received status from child [' + child.getCallNumber() + ']' |
|
); |
|
if (!receivedMetadata && previousAttempts > 0) { |
|
status.metadata.set( |
|
PREVIONS_RPC_ATTEMPTS_METADATA_KEY, |
|
`${previousAttempts}` |
|
); |
|
} |
|
this.handleChildStatus(status, index); |
|
}, |
|
}); |
|
this.sendNextChildMessage(index); |
|
if (this.readStarted) { |
|
child.startRead(); |
|
} |
|
} |
|
|
|
start(metadata: Metadata, listener: InterceptingListener): void { |
|
this.trace('start called'); |
|
this.listener = listener; |
|
this.initialMetadata = metadata; |
|
this.attempts += 1; |
|
this.startNewAttempt(); |
|
this.maybeStartHedgingTimer(); |
|
} |
|
|
|
private handleChildWriteCompleted(childIndex: number) { |
|
const childCall = this.underlyingCalls[childIndex]; |
|
const messageIndex = childCall.nextMessageToSend; |
|
this.getBufferEntry(messageIndex).callback?.(); |
|
this.clearSentMessages(); |
|
childCall.nextMessageToSend += 1; |
|
this.sendNextChildMessage(childIndex); |
|
} |
|
|
|
private sendNextChildMessage(childIndex: number) { |
|
const childCall = this.underlyingCalls[childIndex]; |
|
if (childCall.state === 'COMPLETED') { |
|
return; |
|
} |
|
if (this.getBufferEntry(childCall.nextMessageToSend)) { |
|
const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend); |
|
switch (bufferEntry.entryType) { |
|
case 'MESSAGE': |
|
childCall.call.sendMessageWithContext( |
|
{ |
|
callback: error => { |
|
|
|
this.handleChildWriteCompleted(childIndex); |
|
}, |
|
}, |
|
bufferEntry.message!.message |
|
); |
|
break; |
|
case 'HALF_CLOSE': |
|
childCall.nextMessageToSend += 1; |
|
childCall.call.halfClose(); |
|
break; |
|
case 'FREED': |
|
|
|
break; |
|
} |
|
} |
|
} |
|
|
|
sendMessageWithContext(context: MessageContext, message: Buffer): void { |
|
this.trace('write() called with message of length ' + message.length); |
|
const writeObj: WriteObject = { |
|
message, |
|
flags: context.flags, |
|
}; |
|
const messageIndex = this.getNextBufferIndex(); |
|
const bufferEntry: WriteBufferEntry = { |
|
entryType: 'MESSAGE', |
|
message: writeObj, |
|
allocated: this.bufferTracker.allocate(message.length, this.callNumber), |
|
}; |
|
this.writeBuffer.push(bufferEntry); |
|
if (bufferEntry.allocated) { |
|
context.callback?.(); |
|
for (const [callIndex, call] of this.underlyingCalls.entries()) { |
|
if ( |
|
call.state === 'ACTIVE' && |
|
call.nextMessageToSend === messageIndex |
|
) { |
|
call.call.sendMessageWithContext( |
|
{ |
|
callback: error => { |
|
|
|
this.handleChildWriteCompleted(callIndex); |
|
}, |
|
}, |
|
message |
|
); |
|
} |
|
} |
|
} else { |
|
this.commitCallWithMostMessages(); |
|
|
|
if (this.committedCallIndex === null) { |
|
return; |
|
} |
|
const call = this.underlyingCalls[this.committedCallIndex]; |
|
bufferEntry.callback = context.callback; |
|
if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) { |
|
call.call.sendMessageWithContext( |
|
{ |
|
callback: error => { |
|
|
|
this.handleChildWriteCompleted(this.committedCallIndex!); |
|
}, |
|
}, |
|
message |
|
); |
|
} |
|
} |
|
} |
|
startRead(): void { |
|
this.trace('startRead called'); |
|
this.readStarted = true; |
|
for (const underlyingCall of this.underlyingCalls) { |
|
if (underlyingCall?.state === 'ACTIVE') { |
|
underlyingCall.call.startRead(); |
|
} |
|
} |
|
} |
|
halfClose(): void { |
|
this.trace('halfClose called'); |
|
const halfCloseIndex = this.getNextBufferIndex(); |
|
this.writeBuffer.push({ |
|
entryType: 'HALF_CLOSE', |
|
allocated: false, |
|
}); |
|
for (const call of this.underlyingCalls) { |
|
if ( |
|
call?.state === 'ACTIVE' && |
|
call.nextMessageToSend === halfCloseIndex |
|
) { |
|
call.nextMessageToSend += 1; |
|
call.call.halfClose(); |
|
} |
|
} |
|
} |
|
setCredentials(newCredentials: CallCredentials): void { |
|
throw new Error('Method not implemented.'); |
|
} |
|
getMethod(): string { |
|
return this.methodName; |
|
} |
|
getHost(): string { |
|
return this.host; |
|
} |
|
} |
|
|