Pythonのsocketでプロセス間通信をして価格データ等を送信する

Python
スポンサーリンク

どうも、お久しぶりです。キリンです。

取り敢えず1ヶ月ほど、連続でブログの更新を続けてみたのですが、それ以降更新が途絶えてしまっていました。本業(FXの運用)のほうが今鳴かず飛ばずなので、なんとか盛り返そうと頑張ってます。

その中で、どうしてもプロセス間通信(IPC, Inter Process Communication)をしなければならない事案に遭遇してしまったので、忘備録も兼ねてPythonでSocketを使ったプロセス間通信の方法を調べる際に学習したことと、実際に作成したプログラムをご紹介します。きちんとTCP/IPの通信について勉強したわけではないので、間違った理解、解釈があるかと思います。その際はご指導いただけると助かります。

プロセス間通信がしたい

Linux上でしか動かないPythonライブラリ

Linux上でしか動かないPythonのライブラリをどうしても使いたいという事案が発生し、Ubuntuを導入して試行錯誤していました。私はWindows環境で育っているので、Windows上でしか動かないアプリへの愛着があり、なんとかWindowsアプリとLinux上のアプリの非同期通信の実現にはどの方法が良いかと試行錯誤していました。

WineとUbuntuアプリとのプロセス間通信は難しい

Linux上でなんとかWindowsアプリを動かすということができるWineというソフトがあります。Wineを使えば、愛用しているWindowsアプリの動作は問題ありません。ただ、共有メモリや、PostMessageのやりとりでLinux上のソフトと通信する方法が難しいようなのです。

たとえば、PythonでWin32APIを呼ぼうと思うと、Wine上で動作するWindows版のPythonを導入しなければなりません。結局、Linux上で動くPythonとの通信にはならないのです。

実際にやりたかったこと

少し回りくどく書いていましたが、実際にやりたかったことはUbuntu上で取得したFXの価格データ(いわゆるTick)をWindows側で取得させたかったのです。

プロセス間通信の実装方法

共有メモリを使う

まず最初に候補に上げるべきは共有メモリです。共有メモリはメモリ管理の基本的な内容さえ理解しておけば完全にコントロールができ、さらに他の選択肢より圧倒的に速いです。ですが、OSをまたいで共有メモリを使うといった方法は非常に難易度が高くなります。

MySQL等のDBを使う

個人的に好きな方法です。何よりもお手軽で、すぐ書けます。DBの方で各環境ごとの呼び出し方法や通信の方法等が用意されているので、開発者はDBに格納するデータの内容だけを考えればよくなります。難点は、遅いところです。速度が求められる通信には、毎回クエリを投げるような処理は向きません。

Socketを使う

DBとの通信にもSocketを使っているのですが、ここでいうのは通信の仕様自体も開発者が設定する方法という意味で紹介しています。Python DocumentationのHowtoにもSocketが高速で人気だと説明があります。

Socketによる通信

Socketによる通信を行う際に、学習したことや実際の動作にて確認したことをご紹介します。

その前に、Python DocumentationのHowtoのリンクを紹介しておきます。必要な情報がコンパクトにまとめられていて、pythonでsocket通信をするなら、読んでおけば躓かないでsocket通信プログラムが作製できるよう書かれているためオススメです。

UDPとTCPの違い

オンラインゲーム等としていると、一度はルーターの設定をする機会があると思います。UDPとTCPという名前はよく耳にしていましたが、どういう違いがあるかということを理解していなかったので、ついでに調べてみました。

TCPはコネクション型で、パケットの順序や欠落を保証しているプロトコル。UDPは、コネクションレス型で、パケットの順序や欠落を保証していないかわりに、負荷が軽く高速に動作するプロトコル。

UDPは投げっぱなしでどういうデータが届いたかというフィードバックを行わない、TCPは届いたデータの確認を行うという形のようです。大容量のデータの送信や、非常に高速な通信が求められる際はUDPを使うといったところでしょうか。もちろん、UDPを使った大容量のデータ送付の際は、別の形でファイルの整合性確認を行う必要があります。

今回はFXのTickデータの送信なので、高速処理も大事ですがそれよりもデータの正確性のほうが大事なのでTCP通信を用いることにします。

Socketはサーバーを立てて、クライアントが接続、acceptされて初めて動作する

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 2233))
server.listen(1)
client, client_address = server.accept()

上記はサーバーの接続確立コードです。socket.socketでソケットが作製され、bindしてサーバーとして認識され、listenすることでサーバーとして動作します。

acceptメソッドは、clientからの接続があって初めて動作し、接続が確立されます。その後、データを送信したり受信したりできるようになります。

継続して接続を使いたければSO_REUSEADDRを使う

一度確立した接続を1度きりの通信ではなく、ストリーミングのように継続して利用したければ、上記のコードでも紹介したserver.setsockoptメソッドにて、SO_REUSEADDRを有効にしましょう。この辺りの認識には自信がないのですが、SO_REUSEADDR オプションを有効にしておかないと、TIME_WAIT状態が解消されないためソケットの再利用ができないようなのです。

実際に起こる問題としては、recvが指定したバイト数を受信するまで通信を待ち続けてしまうこと、一度送信が終わってから再度データの送信を行おうとすると "Error 10057"が起こって通信ができないことです。

sendしたデータは受信されるまでプールされる

頂いたご指摘より修正しました。ahaha_fxtraderさん、ありがとうございます!
[2014/12/23 14:13修正]
recvされるまで、sendした情報がプールされます。データの送信漏れが起きずに、TCPが信頼できる通信であるという理由の一つのようです。最初は、sendしたデータがrecvされないという現象に出くわしたので、仕様だと思っていましたが、バグだったようです・・・。

作成したサンプルプログラム

ctypesで作成した構造体をクライアントからバイナリで送信して、サーバー側で受信したバイナリを元の構造体に戻すという処理をしています。1000回の通信によるベンチマークもとりたかったので、1000回で処理を終了するようにしています。

tick_struct.py

Tickデータを格納するためのctypes.Structureです。symbol, datetime, bid, askというデータを格納するようにしました。

from ctypes import *

class TickStruct(Structure):
    _pack_ = 1
    _fields_ = [
        ('symbol',c_char*6),
        ('datetime',c_double),
        ('bid',c_double),
        ('ask',c_double)]

    def __repr__(self):
        return "Symbol : %s, datetime : %f, bid : %f, ask : %f"%(self.symbol, self.datetime, self.bid, self.ask)

test_scoket_server.py

import socket
import select
import ctypes
from tick_struct import TickStruct

MSGLEN = 1024

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 2233))
server.listen(1)
client, client_address = server.accept()

print "Socket server started"
tick = TickStruct()
resv_size = ctypes.sizeof(tick)
count = 0
while True:
    r, w, e = select.select([client], [], [])
    for reader in r:
        reader.recv_into(tick)
        print tick
        count += 1

    if(count >= 1000):
        client.sendall("finish")
        break

受け取ったバイナリをrecv_intoで直接TickStruct構造体に渡しています。実際に受け取った内容を確認したければ、print文のコメントアウトを外してください。ただし、benchmarkが遅くなります。

test_scoket_client.py

import ctypes
from tick_struct import TickStruct
import socket
import time
from benchmarker import Benchmarker

MSGLEN = 1024

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(("localhost", 2233))

tick = TickStruct()
resv_size = ctypes.sizeof(tick)
bid = raw_input()
with Benchmarker(width=20) as bench:
    @bench("main")
    def _(bm):
        for _ in range(1000):
            tick.symbol = "USDJPY"
            tick.datetime = time.time()
            tick.bid = float(bid)
            tick.ask = float(bid)
            client.sendall(buffer(tick))
        client.recv(MSGLEN)

Tickデータを送信するクライアントです。起動後、送信する価格(119.82等)を入力してください。

動作結果

1000回のTickStructの通信にかかった時間が得られます。

99.9
## benchmarker:         release 4.0.1 (for python)
## python version:      2.7.8
## python compiler:     MSC v.1500 32 bit (Intel)
## python platform:     Windows-7-6.1.7601-SP1
## python executable:   C:Python27python.exe
## cpu model:           Intel64 Family 6 Model 58 Stepping 9, GenuineIntel
## parameters:          loop=1, cycle=1, extra=0

##                        real    (total    = user    + sys)
main                    0.0140    0.0000    0.0000    0.0000

## Ranking                real
main                    0.0140  (100.0) ********************

## Matrix                 real    [01]
[01] main               0.0140   100.0

Githubのレポジトリ

考察と感想

1000回の送受信にかかった時間はたったの0.014秒と確かにsocket通信は高速だということが確認できました。これだけ速いと、1/10,000秒ぐらいの早さでTickが送信されるようなことでもなければ、送信の欠損に関しては気にしないでも良さそうです。なので、受信完了次第、queueで別スレッドで動く価格情報処理プログラムにTickデータを渡すようにすれば、やりたいことは問題なく実装できそうです。

ご参考になれば幸いです!

コメント

タイトルとURLをコピーしました