Class CommManager<MsgType, ResponseType>

Manage a comm for a session, reconnecting when necessary.

  • MsgType - A type describing the shape of messages sent to the kernel

  • ResponseType - A type describing the shape of messages sent to the client

remarks

Comm management is a very complex topic, covered here in some detail: https://docs.google.com/document/d/1Wg6LHCb9aesxZ-WjHenZHejKbX-J6LeBktThsz2G87E/edit#

A CommManager connects to a comm channel and handles the cross-cutting concerns of keeping that comm alive and handling what to do when it dies. This class only works for client-initiated, kernel-side comms.

Type parameters

  • MsgType: JSONValue

  • ResponseType: JSONValue

Hierarchy

  • CommManager

Implements

  • IDisposable

Index

Constructors

Protected constructor

  • new CommManager(__namedParameters: object): CommManager

Properties

Private _commClosed

_commClosed: Observable<never>

Private _commClosedSrc$

_commClosedSrc$: Subject<never> = new Subject<never>()

Private _commName

_commName: string

Private _commOpened

_commOpened: Observable<never>

Private _commOpenedSrc$

_commOpenedSrc$: Subject<never> = new Subject<never>()

Private _isDisposed

_isDisposed: boolean = false

Private _msgRecieved

_msgRecieved: Observable<ResponseType>

Private _msgRecievedSrc$

_msgRecievedSrc$: Subject<ResponseType> = new Subject<ResponseType>()

Private awaitingSetup

awaitingSetup: boolean = false

This is a flag to check for kernel setup.

If this comm attempts to connect and fails, then #connectToComm() will set this to true and wait for a special blob to arrive on the kernel IOPub channel. This blob has a mavenomics_state_ok key in its transient dict, and is sent by the library __init__ to signal that the kernel is now set up and ready to accept connections. When that happens, this flag will be cleared and the CommManager will call #connectToComm() again to set up the channel.
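The retry flow described above can be sketched roughly as follows. This is a minimal illustration, not the class's actual implementation: `SetupGate`, `IopubLikeMsg`, `tryOpen`, and `connectAttempts` are hypothetical names, and the message shape is reduced to the one field the sketch inspects.

```typescript
// Hypothetical stand-in for an IOPub message; real messages carry more fields.
interface IopubLikeMsg {
  content: { transient?: Record<string, unknown> };
}

// Hypothetical helper that models only the awaitingSetup handshake.
class SetupGate {
  awaitingSetup = false;
  connectAttempts = 0;
  private tryOpen: () => boolean;

  // `tryOpen` is an assumed callback that returns false while the kernel
  // is not yet ready to accept the comm.
  constructor(tryOpen: () => boolean) {
    this.tryOpen = tryOpen;
  }

  connectToComm(): boolean {
    this.connectAttempts++;
    const ok = this.tryOpen();
    // A failed attempt arms the flag so the next "state ok" blob triggers a retry.
    this.awaitingSetup = !ok;
    return ok;
  }

  onIopubMsg(msg: IopubLikeMsg): void {
    if (!this.awaitingSetup) {
      return;
    }
    const transient = msg.content.transient;
    if (transient !== undefined && "mavenomics_state_ok" in transient) {
      // The kernel has signalled readiness: clear the flag and reconnect.
      this.awaitingSetup = false;
      this.connectToComm();
    }
  }
}
```

Messages without the mavenomics_state_ok key are ignored in this sketch, so unrelated IOPub traffic does not trigger reconnect attempts.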

Private comm

comm: IComm | null = null

Private connectionLock

connectionLock: Mutex

Private session

session: IClientSession

Accessors

commClosed

  • get commClosed(): Observable<never>
  • Emits an error whenever the comm closes.

    remarks

    This can happen if the comm is disposed (on either side), the kernel-side of the comm encounters an unhandled exception, or if the kernel itself crashes.

    If you don't want errors, pipe this observable through catchError.

    Returns Observable<never>

commName

  • get commName(): string

commOpened

  • get commOpened(): Observable<never>
  • Emits when the comm has opened.

    remarks

    Usually, the comm is only opened when needed. However, it can also be opened via a push message from the kernel, signaling to the client that the kernel can accept comms. When either of those events happens, this observable will emit to signal that the comm is now open and ready for communication. This observable exists to handle cases where the client may need to eagerly sync state with the kernel, such as the SyncMetadata comm.

    Returns Observable<never>

isDisposed

  • get isDisposed(): boolean

isOpen

  • get isOpen(): boolean

msgRecieved

  • get msgRecieved(): Observable<ResponseType>
  • An Observable that emits whenever the kernel sends a message.

    Returns Observable<ResponseType>

Methods

Private connectToComm

  • connectToComm(_hasRerun?: boolean): Promise<void>

dispose

  • dispose(): void

Private onIopubMsg

  • onIopubMsg(session: IClientSession, __namedParameters: object): Promise<void>

Private onKernelChanged

  • onKernelChanged(): void

Private onKernelStatusChanged

  • onKernelStatusChanged(_: IClientSession, status: Kernel.Status): void

send

  • send(msg: MsgType): Promise<undefined | IShellMessage<"comm_close" | "comm_msg" | "comm_open" | "comm_info_reply" | "comm_info_request" | "complete_reply" | "complete_request" | "execute_reply" | "execute_request" | "history_reply" | "history_request" | "inspect_reply" | "inspect_request" | "interrupt_reply" | "interrupt_request" | "is_complete_reply" | "is_complete_request" | "kernel_info_reply" | "kernel_info_request" | "shutdown_reply" | "shutdown_request">>
  • Send a message without expecting a response.

    Parameters

    • msg: MsgType

      The message to send to the kernel

    Returns Promise<undefined | IShellMessage<"comm_close" | "comm_msg" | "comm_open" | "comm_info_reply" | "comm_info_request" | "complete_reply" | "complete_request" | "execute_reply" | "execute_request" | "history_reply" | "history_request" | "inspect_reply" | "inspect_request" | "interrupt_reply" | "interrupt_request" | "is_complete_reply" | "is_complete_request" | "kernel_info_reply" | "kernel_info_request" | "shutdown_reply" | "shutdown_request">>

sendAndAwaitResponse

  • sendAndAwaitResponse<Resp>(msg: MsgType, responsePredicate: function, timeoutMs?: number): Promise<Resp>
  • Send a message and wait for a response from the kernel.

    Type parameters

    • Resp: ResponseType

    Parameters

    • msg: MsgType

      The message to send to the kernel

    • responsePredicate: function

      A predicate to select the desired response from the kernel

        • (msg: ResponseType): msg is Resp
        • Parameters

          • msg: ResponseType

          Returns msg is Resp

    • Default value timeoutMs: number = 20000

      The time, in milliseconds, after which the promise rejects if no response has been received.

      Note: A timeout always applies. Various things may cause comm death, and while this function will try its best, not all cases can be accounted for (such as the network disconnecting or the kernel freezing).

    Returns Promise<Resp>
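The combination of a type-guard predicate and a timeout can be sketched as below. This is an illustration under stated assumptions rather than the class's real code: `MessageSource` is a hypothetical stand-in for the msgRecieved observable, and `awaitResponse`, `Reply`, `DataReply`, and `isDataFor` are invented names.

```typescript
type Listener<T> = (msg: T) => void;

// Hypothetical stand-in for the message stream (the real class exposes an Observable).
class MessageSource<T> {
  private listeners: Listener<T>[] = [];

  subscribe(fn: Listener<T>): () => void {
    this.listeners.push(fn);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== fn);
    };
  }

  emit(msg: T): void {
    // Iterate over a copy so listeners may unsubscribe while being notified.
    for (const fn of this.listeners.slice()) {
      fn(msg);
    }
  }
}

// Resolves with the first message accepted by `predicate`, or rejects after `timeoutMs`.
function awaitResponse<T, R extends T>(
  source: MessageSource<T>,
  send: () => void,
  predicate: (msg: T) => msg is R,
  timeoutMs: number = 20000
): Promise<R> {
  return new Promise<R>((resolve, reject) => {
    const timer = setTimeout(() => {
      unsubscribe();
      reject(new Error(`No matching response within ${timeoutMs} ms`));
    }, timeoutMs);
    const unsubscribe = source.subscribe((msg) => {
      if (!predicate(msg)) {
        return; // Not the response we are waiting for; keep listening.
      }
      clearTimeout(timer);
      unsubscribe();
      resolve(msg);
    });
    // Subscribe before sending so that a fast reply cannot be missed.
    send();
  });
}

// Example message shapes (assumed for illustration only).
type Reply =
  | { kind: "ack"; id: number }
  | { kind: "data"; id: number; rows: number[] };
type DataReply = Extract<Reply, { kind: "data" }>;

// A type guard that narrows ResponseType-like messages to the desired reply.
const isDataFor = (id: number) =>
  (msg: Reply): msg is DataReply => msg.kind === "data" && msg.id === id;
```

Because the predicate is a type guard, the resolved value is typed as the narrowed reply (here `DataReply`), so callers can read `rows` without a cast.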

Static Create

  • Create<T, U>(__namedParameters: object): CommManager<any, any>
  • Create a new CommManager, or reuse an already-instantiated manager.

    Type parameters

    • T: JSONValue

    • U: JSONValue

    Parameters

    • __namedParameters: object

    Returns CommManager<any, any>
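A create-or-recycle factory of this kind might look like the sketch below. The named parameters of Create are not documented here, so everything in this example is an assumption: `RecyclingManager`, `ManagerOptions`, and in particular the choice of session ID plus comm name as the lookup key are hypothetical.

```typescript
// Hypothetical options; the real Create() takes an undocumented named-parameter object.
interface ManagerOptions {
  sessionId: string;
  commName: string;
}

class RecyclingManager {
  // Registry of live managers, keyed by an assumed "session::comm" string.
  private static instances = new Map<string, RecyclingManager>();

  readonly commName: string;
  private disposed = false;

  // Protected so that callers must go through Create().
  protected constructor(commName: string) {
    this.commName = commName;
  }

  static Create(opts: ManagerOptions): RecyclingManager {
    const key = `${opts.sessionId}::${opts.commName}`;
    const existing = RecyclingManager.instances.get(key);
    if (existing !== undefined && !existing.isDisposed) {
      return existing; // Recycle the live manager for this key.
    }
    const created = new RecyclingManager(opts.commName);
    RecyclingManager.instances.set(key, created);
    return created;
  }

  get isDisposed(): boolean {
    return this.disposed;
  }

  dispose(): void {
    this.disposed = true;
  }
}
```

In this sketch a disposed manager is never handed back; the next Create call for the same key builds a fresh instance and replaces the registry entry.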

Generated using TypeDoc