|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import { CallCredentials } from './call-credentials'; |
|
import { |
|
Call, |
|
CallStreamOptions, |
|
InterceptingListener, |
|
MessageContext, |
|
StatusObject, |
|
} from './call-interface'; |
|
import { LogVerbosity, Propagate, Status } from './constants'; |
|
import { |
|
Deadline, |
|
deadlineToString, |
|
getRelativeTimeout, |
|
minDeadline, |
|
} from './deadline'; |
|
import { FilterStack, FilterStackFactory } from './filter-stack'; |
|
import { InternalChannel } from './internal-channel'; |
|
import { Metadata } from './metadata'; |
|
import * as logging from './logging'; |
|
import { restrictControlPlaneStatusCode } from './control-plane-status'; |
|
|
|
const TRACER_NAME = 'resolving_call'; |
|
|
|
export class ResolvingCall implements Call { |
|
private child: Call | null = null; |
|
private readPending = false; |
|
private pendingMessage: { context: MessageContext; message: Buffer } | null = |
|
null; |
|
private pendingHalfClose = false; |
|
private ended = false; |
|
private readFilterPending = false; |
|
private writeFilterPending = false; |
|
private pendingChildStatus: StatusObject | null = null; |
|
private metadata: Metadata | null = null; |
|
private listener: InterceptingListener | null = null; |
|
private deadline: Deadline; |
|
private host: string; |
|
private statusWatchers: ((status: StatusObject) => void)[] = []; |
|
private deadlineTimer: NodeJS.Timeout = setTimeout(() => {}, 0); |
|
private filterStack: FilterStack | null = null; |
|
|
|
constructor( |
|
private readonly channel: InternalChannel, |
|
private readonly method: string, |
|
options: CallStreamOptions, |
|
private readonly filterStackFactory: FilterStackFactory, |
|
private credentials: CallCredentials, |
|
private callNumber: number |
|
) { |
|
this.deadline = options.deadline; |
|
this.host = options.host; |
|
if (options.parentCall) { |
|
if (options.flags & Propagate.CANCELLATION) { |
|
options.parentCall.on('cancelled', () => { |
|
this.cancelWithStatus(Status.CANCELLED, 'Cancelled by parent call'); |
|
}); |
|
} |
|
if (options.flags & Propagate.DEADLINE) { |
|
this.trace( |
|
'Propagating deadline from parent: ' + |
|
options.parentCall.getDeadline() |
|
); |
|
this.deadline = minDeadline( |
|
this.deadline, |
|
options.parentCall.getDeadline() |
|
); |
|
} |
|
} |
|
this.trace('Created'); |
|
this.runDeadlineTimer(); |
|
} |
|
|
|
private trace(text: string): void { |
|
logging.trace( |
|
LogVerbosity.DEBUG, |
|
TRACER_NAME, |
|
'[' + this.callNumber + '] ' + text |
|
); |
|
} |
|
|
|
private runDeadlineTimer() { |
|
clearTimeout(this.deadlineTimer); |
|
this.trace('Deadline: ' + deadlineToString(this.deadline)); |
|
const timeout = getRelativeTimeout(this.deadline); |
|
if (timeout !== Infinity) { |
|
this.trace('Deadline will be reached in ' + timeout + 'ms'); |
|
const handleDeadline = () => { |
|
this.cancelWithStatus(Status.DEADLINE_EXCEEDED, 'Deadline exceeded'); |
|
}; |
|
if (timeout <= 0) { |
|
process.nextTick(handleDeadline); |
|
} else { |
|
this.deadlineTimer = setTimeout(handleDeadline, timeout); |
|
} |
|
} |
|
} |
|
|
|
private outputStatus(status: StatusObject) { |
|
if (!this.ended) { |
|
this.ended = true; |
|
if (!this.filterStack) { |
|
this.filterStack = this.filterStackFactory.createFilter(); |
|
} |
|
clearTimeout(this.deadlineTimer); |
|
const filteredStatus = this.filterStack.receiveTrailers(status); |
|
this.trace( |
|
'ended with status: code=' + |
|
filteredStatus.code + |
|
' details="' + |
|
filteredStatus.details + |
|
'"' |
|
); |
|
this.statusWatchers.forEach(watcher => watcher(filteredStatus)); |
|
process.nextTick(() => { |
|
this.listener?.onReceiveStatus(filteredStatus); |
|
}); |
|
} |
|
} |
|
|
|
private sendMessageOnChild(context: MessageContext, message: Buffer): void { |
|
if (!this.child) { |
|
throw new Error('sendMessageonChild called with child not populated'); |
|
} |
|
const child = this.child; |
|
this.writeFilterPending = true; |
|
this.filterStack!.sendMessage( |
|
Promise.resolve({ message: message, flags: context.flags }) |
|
).then( |
|
filteredMessage => { |
|
this.writeFilterPending = false; |
|
child.sendMessageWithContext(context, filteredMessage.message); |
|
if (this.pendingHalfClose) { |
|
child.halfClose(); |
|
} |
|
}, |
|
(status: StatusObject) => { |
|
this.cancelWithStatus(status.code, status.details); |
|
} |
|
); |
|
} |
|
|
|
getConfig(): void { |
|
if (this.ended) { |
|
return; |
|
} |
|
if (!this.metadata || !this.listener) { |
|
throw new Error('getConfig called before start'); |
|
} |
|
const configResult = this.channel.getConfig(this.method, this.metadata); |
|
if (configResult.type === 'NONE') { |
|
this.channel.queueCallForConfig(this); |
|
return; |
|
} else if (configResult.type === 'ERROR') { |
|
if (this.metadata.getOptions().waitForReady) { |
|
this.channel.queueCallForConfig(this); |
|
} else { |
|
this.outputStatus(configResult.error); |
|
} |
|
return; |
|
} |
|
|
|
const config = configResult.config; |
|
if (config.status !== Status.OK) { |
|
const { code, details } = restrictControlPlaneStatusCode( |
|
config.status, |
|
'Failed to route call to method ' + this.method |
|
); |
|
this.outputStatus({ |
|
code: code, |
|
details: details, |
|
metadata: new Metadata(), |
|
}); |
|
return; |
|
} |
|
|
|
if (config.methodConfig.timeout) { |
|
const configDeadline = new Date(); |
|
configDeadline.setSeconds( |
|
configDeadline.getSeconds() + config.methodConfig.timeout.seconds |
|
); |
|
configDeadline.setMilliseconds( |
|
configDeadline.getMilliseconds() + |
|
config.methodConfig.timeout.nanos / 1_000_000 |
|
); |
|
this.deadline = minDeadline(this.deadline, configDeadline); |
|
this.runDeadlineTimer(); |
|
} |
|
|
|
this.filterStackFactory.push(config.dynamicFilterFactories); |
|
this.filterStack = this.filterStackFactory.createFilter(); |
|
this.filterStack.sendMetadata(Promise.resolve(this.metadata)).then( |
|
filteredMetadata => { |
|
this.child = this.channel.createInnerCall( |
|
config, |
|
this.method, |
|
this.host, |
|
this.credentials, |
|
this.deadline |
|
); |
|
this.trace('Created child [' + this.child.getCallNumber() + ']'); |
|
this.child.start(filteredMetadata, { |
|
onReceiveMetadata: metadata => { |
|
this.trace('Received metadata'); |
|
this.listener!.onReceiveMetadata( |
|
this.filterStack!.receiveMetadata(metadata) |
|
); |
|
}, |
|
onReceiveMessage: message => { |
|
this.trace('Received message'); |
|
this.readFilterPending = true; |
|
this.filterStack!.receiveMessage(message).then( |
|
filteredMesssage => { |
|
this.trace('Finished filtering received message'); |
|
this.readFilterPending = false; |
|
this.listener!.onReceiveMessage(filteredMesssage); |
|
if (this.pendingChildStatus) { |
|
this.outputStatus(this.pendingChildStatus); |
|
} |
|
}, |
|
(status: StatusObject) => { |
|
this.cancelWithStatus(status.code, status.details); |
|
} |
|
); |
|
}, |
|
onReceiveStatus: status => { |
|
this.trace('Received status'); |
|
if (this.readFilterPending) { |
|
this.pendingChildStatus = status; |
|
} else { |
|
this.outputStatus(status); |
|
} |
|
}, |
|
}); |
|
if (this.readPending) { |
|
this.child.startRead(); |
|
} |
|
if (this.pendingMessage) { |
|
this.sendMessageOnChild( |
|
this.pendingMessage.context, |
|
this.pendingMessage.message |
|
); |
|
} else if (this.pendingHalfClose) { |
|
this.child.halfClose(); |
|
} |
|
}, |
|
(status: StatusObject) => { |
|
this.outputStatus(status); |
|
} |
|
); |
|
} |
|
|
|
reportResolverError(status: StatusObject) { |
|
if (this.metadata?.getOptions().waitForReady) { |
|
this.channel.queueCallForConfig(this); |
|
} else { |
|
this.outputStatus(status); |
|
} |
|
} |
|
cancelWithStatus(status: Status, details: string): void { |
|
this.trace( |
|
'cancelWithStatus code: ' + status + ' details: "' + details + '"' |
|
); |
|
this.child?.cancelWithStatus(status, details); |
|
this.outputStatus({ |
|
code: status, |
|
details: details, |
|
metadata: new Metadata(), |
|
}); |
|
} |
|
getPeer(): string { |
|
return this.child?.getPeer() ?? this.channel.getTarget(); |
|
} |
|
start(metadata: Metadata, listener: InterceptingListener): void { |
|
this.trace('start called'); |
|
this.metadata = metadata.clone(); |
|
this.listener = listener; |
|
this.getConfig(); |
|
} |
|
sendMessageWithContext(context: MessageContext, message: Buffer): void { |
|
this.trace('write() called with message of length ' + message.length); |
|
if (this.child) { |
|
this.sendMessageOnChild(context, message); |
|
} else { |
|
this.pendingMessage = { context, message }; |
|
} |
|
} |
|
startRead(): void { |
|
this.trace('startRead called'); |
|
if (this.child) { |
|
this.child.startRead(); |
|
} else { |
|
this.readPending = true; |
|
} |
|
} |
|
halfClose(): void { |
|
this.trace('halfClose called'); |
|
if (this.child && !this.writeFilterPending) { |
|
this.child.halfClose(); |
|
} else { |
|
this.pendingHalfClose = true; |
|
} |
|
} |
|
setCredentials(credentials: CallCredentials): void { |
|
this.credentials = this.credentials.compose(credentials); |
|
} |
|
|
|
addStatusWatcher(watcher: (status: StatusObject) => void) { |
|
this.statusWatchers.push(watcher); |
|
} |
|
|
|
getCallNumber(): number { |
|
return this.callNumber; |
|
} |
|
} |
|
|