Reactive Extensions를 사용한 비동기 네트워크 프로그래밍 이 질문은 Software Engineering Stack Exchange에서 답변을

수행 한 후 일부 (더 또는 덜) “낮은 수준”을 비동기 socket(이벤트 기반 비동기 패턴에 년 전 프로그램 (EAP) 패션) 최근에 “최대”이동 TcpListener(비동기 프로그래밍 모델 (APM) ) 다음 async/await(태스크 기반 비동기 패턴 (TAP) ) 으로 이동하려고 노력 하면서이 모든 ‘낮은 수준의 배관’을 계속 귀찮게해야했습니다. 그래서 나는 생각하고 있었다. 왜 내 문제 도메인에 더 잘 맞을 수 있으므로 RXgo ( Reactive Extensions )를 제공하지 않습니까?

내가 작성한 많은 코드는 많은 클라이언트가 Tcp를 통해 내 응용 프로그램에 연결 한 다음 양방향 통신을 시작하는 것과 관련이 있습니다. 클라이언트 또는 서버는 언제든지 메시지를 보내야 할 필요가 있다고 판단 할 수 있습니다. 따라서 이것은 클래식 request/response설정이 아니라 양 당사자가 원하는 모든 것을 보낼 수있는 실시간 양방향 “라인”입니다. , 그들이 원할 때마다. (누구든지 이것을 묘사 할 적절한 이름을 가지고 있다면 나는 기뻐할 것입니다!).

“프로토콜”은 응용 프로그램마다 다르며 실제로 내 질문과 관련이 없습니다. 그러나 초기 질문이 있습니다.

  1. 하나의 “서버”만 실행 중이지만 내부 서버를 추적하기 위해 자체 “상태 머신”을 가지고있는 (자세한 설명이 부족한) 연결 (예 : 클라이언트)의 많은 (보통 수천) 연결을 추적해야합니다. 주 등 어떤 접근법을 선호하십니까? EAP / TAP / APM? RX도 옵션으로 간주됩니까? 그렇지 않다면 왜?

따라서 async 이후 Async를 사용해야합니다 .a) 요청 / 응답 프로토콜이 아니기 때문에 “메시지 대기 중”-차단 통화 또는 “메시지 보내기”-차단 통화 (그러나 전송이 해당 클라이언트에 대한 차단 만 가능합니다.) b) 많은 동시 연결을 처리해야합니다. 차단 호출을 사용하여 (신뢰할 수있는) 방법을 찾지 못했습니다.

내 응용 프로그램의 대부분은 VoiP와 관련이 있습니다. SIP cients의 SIP 메시지 또는 FreeSwitch / OpenSIPS 와 같은 응용 프로그램의 PBX (관련) 메시징 메시지 이지만 가장 간단한 형태로 많은 “채팅”클라이언트를 처리하려고하는 “채팅”서버를 상상할 수 있습니다. 대부분의 프로토콜은 텍스트 기반 (ASCII)입니다.

따라서 앞서 언급 한 기술에 대한 많은 다른 순열을 구현 한 후 간단히 인스턴스화 할 수있는 객체를 만들어서 IPEndpoint관심을 가질 대상에 대해 이야기하고 관심있는 일이 발생할 때마다 알려주도록 작업을 단순화하여 작업을 단순화하고 싶습니다. 이벤트를 사용하므로 일부 EAP는 일반적으로 다른 두 기술과 혼합됩니다. 클래스는 프로토콜을 ‘이해’하려고 애 쓰지 않아야합니다. 단지 들어오고 나가는 문자열을 처리해야합니다. 따라서 작업을 단순화 할 수있는 RX에 주목하면서 처음부터 새로운 “바이올린”을 만들었습니다.

using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Linq;
using System.Text;

class Program
{
    static void Main(string[] args)
    {
        var f = new FiddleServer(new IPEndPoint(IPAddress.Any, 8084));
        f.Start();
        Console.ReadKey();
        f.Stop();
        Console.ReadKey();
    }
}

public class FiddleServer
{
    private TcpListener _listener;
    private ConcurrentDictionary<ulong, FiddleClient> _clients;
    private ulong _currentid = 0;

    public IPEndPoint LocalEP { get; private set; }

    public FiddleServer(IPEndPoint localEP)
    {
        this.LocalEP = localEP;
        _clients = new ConcurrentDictionary<ulong, FiddleClient>();
    }

    public void Start()
    {
        _listener = new TcpListener(this.LocalEP);
        _listener.Start();
        Observable.While(() => true, Observable.FromAsync(_listener.AcceptTcpClientAsync)).Subscribe(
            //OnNext
            tcpclient =>
            {
                //Create new FSClient with unique ID
                var fsclient = new FiddleClient(_currentid++, tcpclient);
                //Keep track of clients
                _clients.TryAdd(fsclient.ClientId, fsclient);
                //Initialize connection
                fsclient.Send("connect\n\n");

                Console.WriteLine("Client {0} accepted", fsclient.ClientId);
            },
            //OnError
            ex =>
            {

            },
            //OnComplete
            () =>
            {
                Console.WriteLine("Client connection initialized");
                //Accept new connections
                _listener.AcceptTcpClientAsync();
            }
        );
        Console.WriteLine("Started");
    }

    public void Stop()
    {
        _listener.Stop();
        Console.WriteLine("Stopped");
    }

    public void Send(ulong clientid, string rawmessage)
    {
        FiddleClient fsclient;
        if (_clients.TryGetValue(clientid, out fsclient))
        {
            fsclient.Send(rawmessage);
        }
    }
}

public class FiddleClient
{
    private TcpClient _tcpclient;

    public ulong ClientId { get; private set; }

    public FiddleClient(ulong id, TcpClient tcpclient)
    {
        this.ClientId = id;
        _tcpclient = tcpclient;
    }

    public void Send(string rawmessage)
    {
        Console.WriteLine("Sending {0}", rawmessage);
        var data = Encoding.ASCII.GetBytes(rawmessage);
        _tcpclient.GetStream().WriteAsync(data, 0, data.Length);    //Write vs WriteAsync?
    }
}

이 “바이올린”에는 구현에 대한 세부 사항이 약간 있음을 알고 있습니다. 이 경우 나는 FreeSwitch ESL 과 함께 일하고 있으므로 "connect\n\n"더 일반적인 접근법으로 리팩토링 할 때 바이올린 의 in을 제거해야합니다.

또한 익명 클래스를 Server 클래스의 개인 인스턴스 메소드로 리팩터링해야한다는 것을 알고 있습니다. OnSomething메소드 이름에 어떤 규칙 (예 : ” “)을 사용 해야하는지 잘 모르겠습니다 .

이것은 나의 기초 / 시작점 / 파운데이션입니다 ( “비틀기”가 필요합니다). 이것에 대해 몇 가지 질문이 있습니다.

  1. 위의 질문 “1”을보십시오
  2. 내가 올바른 길을 가고 있습니까? 아니면 내 “디자인”결정이 부당합니까?
  3. 동시성 : 수천 명의 클라이언트에 대처할 수 있습니다 (실제 메시지를 파싱 / 처리)
  4. 예외 상황 : 클라이언트에서 서버로 예외를 발생시키는 방법을 잘 모르겠습니다 ( “RX-wise”). 좋은 방법은 무엇입니까?
  5. 클라이언트를 어떤 ClientId방식 으로든 노출한다고 가정 하고 서버 클래스에서 연결된 클라이언트를 가져 와서 직접 메소드를 호출 할 수 있습니다. Server 클래스를 통해 메소드를 호출 할 수도 있습니다 (예 : Send(clientId, rawmessage)메소드 (후자의 접근 방식은 다른쪽으로 빠르게 메시지를 가져 오는 “편의”메소드 임)).
  6. 여기에서 어디로 어떻게 갈지 잘 모르겠습니다.
    • a) 들어오는 메시지를 처리해야합니다. 어떻게 설정합니까? 물론 스트림을 얻을 수 있지만 수신 된 바이트 검색을 어디서 처리 할 수 ​​있습니까? 구독 할 수있는 “ObservableStream”이 필요하다고 생각하십니까? FiddleClient또는에 넣을 FiddleServer까요?
    • b)이 FiddleClient/ FiddleServer클래스가 더 구체적으로 FooClient/ FooServer클래스를 사용하여 응용 프로그램 특정 프로토콜 처리 등을 조정하기 위해 더 구체적으로 구현 될 때까지 이벤트를 사용하지 않기를 원한다고 가정 합니다. 기본 ‘Fiddle’클래스의 데이터를 데이터로 가져 오는 방법은 무엇입니까? 더 구체적인 대응?

내가 이미 읽거나 훑어 보았거나 참조로 사용 된 기사 / 링크 :



답변

… 간단히 인스턴스화 할 수있는 객체를 만들어서 어떤 IPEndpoint를 듣고 관심있는 일이 발생할 때마다 알려주도록 객체를 만들어서 작업을 단순화하고 싶습니다 …

그 진술을 읽은 후 나는 즉시 “배우들”이라고 생각했다. 액터는 객체의 메소드를 직접 호출하는 대신 메시지를 전달하는 단일 입력 만 있고 비동기 적으로 작동한다는 점을 제외하면 객체와 매우 유사합니다. 아주 간단한 예에서 … 액터를 만들고 IPEndpoint와 액터의 주소를 가진 메시지를 보내어 결과를 보냅니다. 꺼지고 백그라운드에서 작동합니다. “관심있는 무언가”가 발생했을 때만 답을들을 수 있습니다. 로드를 처리하는 데 필요한만큼의 액터를 인스턴스화 할 수 있습니다.

.Net의 액터 라이브러리에는 익숙하지 않지만 일부는 있다는 것을 알고 있습니다. TPL Dataflow 라이브러리에 익숙합니다 (내 책 http://DataflowBook.com 에서 다루는 섹션이 있습니다 ). 그 라이브러리로 간단한 액터 모델을 쉽게 구현할 수 있어야합니다.


답변