
websocketsをインストールしておく。pip install websockets
app.websocket()デコレータを用いることでWebSocket接続用のエンドポイントを定義できる。@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):WebSocketオブジェクトは、FastAPIから渡される接続オブジェクトである。
エンドポイントを/wsにした場合、全てのクライアントが同じURL(ws://127.0.0.1:8000/wsなど)に接続することになる。
つまり、接続した全員が同じチャンネルにいる状態。
対戦ゲームなどで部屋ごとに接続を分けたい場合は、@app.websocket("/ws/{room_id}")await websocket.accept()を実行することで、WebSocket接続が確立される。
その後のwhile Trueの中がWebSocket接続後のメイン処理であり、WebSocketメッセージの送受信を待機する。
接続が切断されるとWebSocketDisconnect例外になり、接続切断後の処理を実行する。@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
""" WebSocket接続を処理するエンドポイント """
await websocket.accept() # WebSocket接続を確立する
try:
while True:
# WebSocket接続確立後の処理
if anything_flag:
# WebSocket接続を切断
await websocket.close()
except WebSocketDisconnect:
# WebSocket接続切断時の処理typeを持つJSON形式で統一することが多い。@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
""" WebSocket接続を処理するエンドポイント """
await websocket.accept() # WebSocket接続を確立する
try:
while True:
# WebSocket接続確立後の処理
# クライアントからメッセージを受信する
text_data = await websocket.receive_text() # textとして受け取る
json_data = await websocket.receive_json() # jsonとして受け取る
# クライアントへメッセージを送信する
await websocket.send_text("テキスト") # text形式で送る
await websocket.send_json({"type": "タイプ", "message": "メッセージ"}) # json形式で送るweb_socket_channelというライブラリを利用する。
pubspec.yamlのdependenciesに必要なパッケージを記述し、getコマンドを実行する。dependencies:
web_socket_channel: ^2.1.0
flutter pub get
import 'package:web_socket_channel/web_socket_channel.dart';
WebSocketChannel? _channel;
final uri = Uri.parse('ws://127.0.0.1:8000/ws');
// WebSocketチャンネルを作成・接続
_channel = WebSocketChannel.connect(uri);Streamのlisten()で取得できる。
listen()の第1引数にはメッセージ受信時のコールバック関数を指定でき、このコールバック関数内で受信メッセージに応じた処理を行う。
他にもonDoneに接続切断時のコールバック、onErrorでエラー時のコールバックも指定できる。_channel!.stream.listen(
_receiveMessage, // メッセージ受信時のコールバック
onDone: _handleDisconnect, // 接続切断時のコールバック
onError: (error) {} // エラー発生時のコールバック
);WebSocketSinkのadd()を利用する。
テキスト形式でもJSON形式でも可能。_channel!.sink.add(
jsonEncode({
"type": "タイプ",
"message": "メッセージ"
})
);
単発処理 → HTTP
リアルタイム同期 → WebSocket
@app.websocket("/ws/{room_id}/{player_name}")/ws/12345/player1
/ws/12345/player2
Roomクラスでゲーム状態を管理している。class Room:
self.room_id
self.players
self.startedPlayerStateクラスで管理している。class PlayerState:
self.websocket
self.name
self.hand
self.choice_idbroadcast()を定義している。
上記のPlayerStateオブジェクト(プレイヤー)個々のWebSocket接続にそれぞれメッセージ送信を行うことで全体送信にしている。async def broadcast(self, payload: dict):
for player in self.players.values():
await player.websocket.send_json(payload)# ゲーム終了を全体通知
await room.broadcast({
"type": "game_over"
})typeを利用して処理を分岐している。
この方式であれば、イベント追加がしやすい、メッセージ形式を統一できる、フロント側の処理を整理しやすい、といった利点がある。# typeによるイベント処理の分岐の一部
if (type == 'start') {}
if (type == 'round_result') {}
if (type == 'game_over') {}main.pyとmain.dart)を下記のようにすることを想定している。本格的にやるのであればファイルを分けたり、もっと詳細なエラーハンドリングを入れるべきだが、サンプルなのであまり気にしない。main.dartfrom fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Dict, List, Optional
import random
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
MARKS = ["🔥", "🌿", "💧"]
MARK_ORDER = {"🔥": "🌿", "🌿": "💧", "💧": "🔥"}
NUMBER_OF_CARDS = 18
rooms: Dict[str, "Room"] = {}
class Card(BaseModel):
id: str
mark: str
number: int
def to_dict(self) -> dict:
return {"id": self.id, "mark": self.mark, "number": self.number}
class PlayerState:
def __init__(self, websocket: WebSocket, name: str) -> None:
self.websocket = websocket # このプレイヤーへのメッセージ送信用
self.name = name # プレイヤー名
self.hand: List[Card] = [] # 現在の手札
self.choice_id: Optional[str] = None # 選択したカードのID
def find_card(self, card_id: str) -> Optional[Card]:
for card in self.hand:
if card.id == card_id:
return card
return None
def remove_card(self, card_id: str) -> None:
self.hand = [card for card in self.hand if card.id != card_id]
class Room:
def __init__(self, room_id: str) -> None:
self.room_id = room_id
self.players: Dict[str, PlayerState] = {} # プレイヤー辞書
self.started = False # ゲーム開始フラグ
def add_player(self, player: PlayerState) -> None:
self.players[player.name] = player
def remove_player(self, name: str) -> None:
if name in self.players:
del self.players[name]
def get_other(self, name: str) -> Optional[PlayerState]:
for key, player in self.players.items():
if key != name:
return player
return None
def all_ready(self) -> bool:
return len(self.players) == 2
def clear_choices(self) -> None:
for player in self.players.values():
player.choice_id = None
def to_hand_payload(self) -> Dict[str, List[dict]]:
return {
name: [card.to_dict() for card in player.hand]
for name, player in self.players.items()
}
# すべてのプレイヤーに同じメッセージを送信する
async def broadcast(self, payload: dict) -> None:
for player in self.players.values():
try:
await player.websocket.send_json(payload)
except RuntimeError:
pass
async def send_to(self, name: str, payload: dict) -> None:
player = self.players.get(name)
if player:
await player.websocket.send_json(payload)
async def start_game(self) -> None:
self.started = True
deck = build_deck()
random.shuffle(deck)
selected = deck[:NUMBER_OF_CARDS]
hands = [selected[:NUMBER_OF_CARDS//2],
selected[NUMBER_OF_CARDS//2:NUMBER_OF_CARDS]]
for i, player in enumerate(self.players.values()):
player.hand = hands[i]
await player.websocket.send_json(
{
"type": "start",
"opponent": self.get_other(player.name).name,
"hand": [card.to_dict() for card in player.hand],
"message": "2人揃いました。カードを選んでください。",
}
)
await self.broadcast(
{
"type": "prompt",
"message": "カードを選んでください。",
}
)
def build_deck() -> List[Card]:
deck: List[Card] = []
for mark in MARKS:
for number in range(1, NUMBER_OF_CARDS//3 + 1):
deck.append(Card(id=f"{mark}{number}", mark=mark, number=number))
return deck
def compare_cards(card_a: Card, card_b: Card) -> int:
if card_a.mark == card_b.mark:
return 1 if card_a.number > card_b.number else 0
if MARK_ORDER[card_a.mark] == card_b.mark:
return 1
return 0
def generate_room_id() -> str:
while True:
room_id = f"{random.randint(10000, 99999)}"
if room_id not in rooms:
return room_id
@app.post("/create-room")
async def create_room():
""" 部屋のIDを生成しRoomオブジェクトを作成するエンドポイント """
room_id = generate_room_id()
rooms[room_id] = Room(room_id)
return {"room_id": room_id}
@app.get("/room/{room_id}")
async def room_status(room_id: str):
""" 部屋の状態を返すエンドポイント """
room = rooms.get(room_id)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
return {"room_id": room_id, "players": len(room.players), "started": room.started}
@app.websocket("/ws/{room_id}/{player_name}")
async def websocket_endpoint(room_id: str, player_name: str, websocket: WebSocket):
""" WebSocket接続を処理するエンドポイント """
# WebSocket接続を受け入れる
await websocket.accept()
room = rooms.get(room_id)
if room is None:
await websocket.send_json({"type": "error", "message": "その部屋は存在しません。"})
await websocket.close()
return
if room.all_ready():
await websocket.send_json({"type": "error", "message": "部屋は満員です。"})
await websocket.close()
return
player = PlayerState(websocket=websocket, name=player_name)
room.add_player(player)
# 参加したプレイヤーに部屋IDとプレイヤー名を送信
await player.websocket.send_json({"type": "joined", "room_id": room_id, "name": player_name})
if room.all_ready():
await room.start_game()
else:
await player.websocket.send_json(
{"type": "waiting", "message": "相手を待っています..."}
)
try:
while True:
payload = await websocket.receive_json()
event_type = payload.get("type")
if event_type == "play":
card_id = payload.get("card_id")
selected_card = player.find_card(card_id)
if selected_card is None:
continue
player.choice_id = card_id
other = room.get_other(player.name)
if other.choice_id is None:
await player.websocket.send_json(
{"type": "waiting", "message": "相手の選択を待っています..."}
)
continue
opponent_card = other.find_card(other.choice_id)
result = compare_cards(selected_card, opponent_card)
if result == 1:
winner, loser = player, other
winner_card, loser_card = selected_card, opponent_card
else:
winner, loser = other, player
winner_card, loser_card = opponent_card, selected_card
loser.remove_card(loser.choice_id)
round_payload = {
"type": "round_result",
"winner": winner.name,
"loser": loser.name,
"winner_card": winner_card.to_dict(),
"loser_card": loser_card.to_dict(),
"hand": {
winner.name: [card.to_dict() for card in winner.hand],
loser.name: [card.to_dict() for card in loser.hand],
},
"message": f"{winner.name} の勝ちです。",
}
await room.broadcast(round_payload)
room.clear_choices()
if len(loser.hand) == 0:
# 勝者と敗者にゲーム終了のメッセージを送信
await winner.websocket.send_json(
{"type": "game_over", "is_winner": True, "message": "ゲーム終了"}
)
await loser.websocket.send_json(
{"type": "game_over", "is_winner": False, "message": "ゲーム終了"}
)
break
else:
await room.broadcast(
{
"type": "prompt",
"message": "次のカードを選んでください。",
}
)
except WebSocketDisconnect:
other = room.get_other(player.name)
room.remove_player(player.name)
if other:
await other.websocket.send_json(
{
"type": "opponent_left",
"message": "相手が離脱しました。ゲームを終了します。",
}
)
if len(room.players) == 0:
rooms.pop(room_id, None)main.dartimport 'dart:convert';
import 'package:flutter/material.dart';
import 'package:http/http.dart' as http;
import 'package:web_socket_channel/web_socket_channel.dart';
void main() {
runApp(const CardDuelApp());
}
class CardDuelApp extends StatelessWidget {
const CardDuelApp({super.key});
Widget build(BuildContext context) {
return MaterialApp(
theme: ThemeData(primarySwatch: Colors.blue),
home: const LobbyPage(),
);
}
}
class LobbyPage extends StatefulWidget {
const LobbyPage({super.key});
State<LobbyPage> createState() => _LobbyPageState();
}
class _LobbyPageState extends State<LobbyPage> {
final TextEditingController _nameController =
TextEditingController(); // プレイヤー名入力用コントローラー
final TextEditingController _roomController =
TextEditingController(); // 部屋番号入力用コントローラー
String _status = ''; // エラーや進行状況などのステータスメッセージ
String? _roomId; // 部屋ID
String? _playerName; // プレイヤー名
String? _opponentName; // 対戦相手の名前
WebSocketChannel? _channel; // WebSocketチャンネル
List<CardModel> _hand = []; // プレイヤーの手札
bool _waiting = false; // 待ち状態の判定
bool _isStarted = false; // ゲーム開始の判定
bool _isGameOver = false; // ゲーム終了の判定
bool _resultDialogVisible = false; // 結果ダイアログの表示状態
bool? _isWinner; // 最終的な勝利フラグ(true: 勝ち, false: 負け)
String? _selectedCardId; // 選択中のカードID
Future<void> _createRoom() async {
final name = _nameController.text.trim();
// 部屋作成APIを呼び出す
final response =
await http.post(Uri.parse('http://127.0.0.1:8000/create-room'));
final body = jsonDecode(response.body);
final roomId = body['room_id'] as String;
// websocket接続を開始する
_connect(roomId, name);
// 部屋番号を表示する
setState(() {
_roomController.text = roomId;
_status = '部屋を作成しました。部屋番号: $roomId';
});
}
Future<void> _joinRoom() async {
final name = _nameController.text.trim();
final roomId = _roomController.text.trim();
_connect(roomId, name);
}
void _connect(String roomId, String playerName) {
// 状態をリセット(新しいゲーム準備)
setState(() {
_roomId = roomId;
_playerName = playerName;
_status = '接続中...';
_waiting = false;
_isGameOver = false;
_isWinner = null;
_hand = [];
_opponentName = null;
});
// WebSocket URI を構築
final uri = Uri.parse('ws://127.0.0.1:8000/ws/$roomId/$playerName');
// WebSocketチャネルを作成・接続
_channel = WebSocketChannel.connect(uri);
// メッセージ受信リスナーを設定し、非同期にメッセージを受信待機する
_channel!.stream.listen(_receiveMessage, // メッセージ受信時のコールバック
onDone: _handleDisconnect, // 接続切断時のコールバック
onError: (error) {
// エラー発生時のコールバック
setState(() => _status = 'WebSocketエラー: $error');
});
}
void _receiveMessage(dynamic message) {
final data = jsonDecode(message as String) as Map<String, dynamic>;
final type = data['type'] as String?;
// バックエンドから送信されるメッセージの種類(type)ごとに分岐処理
// 接続成功:接続が確立し、部屋に参加したことを通知
if (type == 'joined') {
setState(() => _status = '部屋に参加しました: $_roomId');
return;
}
// 待機状態:対戦相手を待っていることを通知
if (type == 'waiting') {
setState(() {
_status = data['message'] as String;
_waiting = true;
});
return;
}
// ゲーム開始:対戦相手の名前と初期手札を受け取り、ゲーム開始を通知
if (type == 'start') {
final handData = data['hand'] as List<dynamic>;
setState(() {
_opponentName = data['opponent'] as String?;
_hand = handData.map((item) => CardModel.fromJson(item)).toList();
_status = data['message'] as String? ?? 'ゲーム開始';
_waiting = false;
_isStarted = true;
});
return;
}
// カード選択の促し:カードを選ぶように促すメッセージを表示
if (type == 'prompt') {
setState(() {
_status = data['message'] as String? ?? 'カードを選んでください。';
// 結果ダイアログが表示されている場合は待ち状態を維持し、そうでない場合は待ち状態を解除
if (!_resultDialogVisible) {
_waiting = false;
}
});
return;
}
// 結果:各ラウンドの結果を受け取り、手札の更新やラウンドの勝敗の表示を行う
if (type == 'round_result') {
final handData = data['hand'] as Map<String, dynamic>;
final myHandData = handData[_playerName] as List<dynamic>?;
// 新しい手札とメッセージはモーダルを閉じてから反映するため、ここでは保持だけする
final newHand =
myHandData!.map((item) => CardModel.fromJson(item)).toList();
final winner = data['winner'] as String? ?? '結果';
final winnerCard = data['winner_card'] as Map<String, dynamic>;
final loserCard = data['loser_card'] as Map<String, dynamic>;
setState(() {
_status = data['message'] as String? ?? '結果が届きました。';
_waiting = true;
_selectedCardId = null;
});
WidgetsBinding.instance.addPostFrameCallback((_) {
_showRoundResultDialog(
'$winner の勝ち!',
'${winnerCard['mark']}${winnerCard['number']} vs ${loserCard['mark']}${loserCard['number']}',
newHand: newHand,
);
});
return;
}
// ゲーム終了:ゲームの勝者を受け取り、ゲーム終了のメッセージと勝者表示を行う
if (type == 'game_over') {
setState(() {
_isGameOver = true;
_isWinner = data['is_winner'] as bool?;
_status = data['message'] as String? ?? 'ゲーム終了';
});
return;
}
// 相手の離脱:対戦相手が離脱したことを受け取り、離脱のメッセージを表示し待ち状態にする
if (type == 'opponent_left') {
setState(() {
_status = data['message'] as String? ?? '相手が離脱しました。';
_waiting = true;
});
return;
}
}
void _handleDisconnect() {
setState(() {
if (!_isGameOver) {
_status = '接続が切断されました。';
}
_waiting = false;
});
}
void _playCard(String cardId) {
// カード選択をバックエンドに送信するためのメッセージを作成する
final message = jsonEncode({"type": "play", "card_id": cardId});
// WebSocketを通じてカード選択のメッセージを送信する
_channel!.sink.add(message);
setState(() {
_status = 'カード選択を送信しました。';
_waiting = true;
_selectedCardId = cardId;
});
}
Future<void> _showRoundResultDialog(
String title,
String content, {
required List<CardModel> newHand,
}) async {
if (_resultDialogVisible) {
return;
}
setState(() {
_resultDialogVisible = true;
_waiting = true;
});
// 結果ダイアログを表示(ユーザーがタップできない)
showDialog(
context: context,
barrierDismissible: false,
builder: (context) {
return AlertDialog(
title: Text(title),
content: Text(content),
);
},
);
// 3秒待機してからダイアログを閉じる
await Future.delayed(const Duration(seconds: 2));
if (!mounted) {
return;
}
if (_resultDialogVisible) {
Navigator.of(context).pop();
// モーダルを閉じてから手札とメッセージを更新する
setState(() {
_hand = newHand;
_resultDialogVisible = false;
_waiting = false;
});
}
}
void dispose() {
// WebSocket接続を閉じる
_channel?.sink.close();
// ウィジェットが破棄される際に呼ばれ各種リソースを解放する
_nameController.dispose();
_roomController.dispose();
super.dispose();
}
Widget build(BuildContext context) {
// メインのUIを構築するウィジェット
// 部屋に参加していない場合はロビー画面を、参加している場合はゲームボードを表示する
return Scaffold(
body: Padding(
padding: const EdgeInsets.all(16.0),
child: _roomId == null ? _buildLobby() : _buildGameBoard(),
),
);
}
Widget _buildLobby() {
return Column(
crossAxisAlignment: CrossAxisAlignment.stretch,
children: [
TextField(
controller: _nameController,
decoration: const InputDecoration(
labelText: 'プレイヤー名', border: OutlineInputBorder()),
),
const SizedBox(height: 16),
TextField(
controller: _roomController,
decoration: const InputDecoration(
labelText: '部屋番号 (参加時)', border: OutlineInputBorder()),
),
const SizedBox(height: 16),
ElevatedButton(onPressed: _createRoom, child: const Text('部屋を作成')),
const SizedBox(height: 8),
ElevatedButton(onPressed: _joinRoom, child: const Text('部屋に参加')),
const SizedBox(height: 16),
Text(_status,
style: const TextStyle(fontSize: 16, color: Colors.black87)),
],
);
}
// ゲームボードのUIを構築するウィジェット
Widget _buildGameBoard() {
return Column(
crossAxisAlignment: CrossAxisAlignment.stretch,
children: [
Text('${_playerName ?? ''}',
style: const TextStyle(fontSize: 18, fontWeight: FontWeight.bold)),
const SizedBox(height: 8),
Text('対戦相手: ${_opponentName ?? '待機中'}',
style: const TextStyle(fontSize: 16)),
const SizedBox(height: 8),
if (_roomId != null && !_isStarted)
Text('部屋番号: $_roomId',
style: const TextStyle(fontSize: 16, color: Colors.black54)),
const SizedBox(height: 12),
Text(_status,
style: const TextStyle(fontSize: 16, color: Colors.blueAccent)),
if (_isGameOver) ...[
// ゲーム終了の場合は勝者表示とタイトルに戻るボタンを表示する
Text(
_isWinner == true ? 'あなたの勝ち!' : 'あなたの負け...',
style: TextStyle(
fontSize: 20,
fontWeight: FontWeight.bold,
color: _isWinner == true ? Colors.red : Colors.blue,
),
),
const SizedBox(height: 16),
ElevatedButton(
onPressed: () {
_channel?.sink.close();
setState(() {
_roomId = null;
_status = '';
});
},
child: const Text('タイトルに戻る'),
),
] else ...[
// ゲーム開始後は手札とカード選択ボタンを表示する
if (_isStarted) ...[
Text('手札 (残り: ${_hand.length}枚)',
style:
const TextStyle(fontSize: 18, fontWeight: FontWeight.bold)),
const SizedBox(height: 8),
],
SizedBox(
height: 58,
child: SingleChildScrollView(
scrollDirection: Axis.horizontal,
child: Row(
children: _hand.map((card) {
return Padding(
padding: const EdgeInsets.only(right: 8.0),
child: ElevatedButton(
onPressed: _waiting ? null : () => _playCard(card.id),
style: ElevatedButton.styleFrom(
backgroundColor: _selectedCardId == card.id
? Colors.blueAccent
: Colors.white,
foregroundColor: _selectedCardId == card.id
? Colors.white
: Colors.black87,
// 選択中のカードは青背景、選択されていないカードは白背景にする
disabledBackgroundColor: _selectedCardId == card.id
? Colors.blueAccent
: Colors.white.withOpacity(0.6),
disabledForegroundColor: _selectedCardId == card.id
? Colors.white
: Colors.black38,
side: const BorderSide(color: Colors.blueAccent),
padding: const EdgeInsets.symmetric(
vertical: 8, horizontal: 10),
minimumSize: const Size(60, 44),
),
child: Text('${card.mark} ${card.number}',
style: const TextStyle(fontSize: 12)),
),
);
}).toList(),
),
),
),
const SizedBox(height: 8),
ElevatedButton(
onPressed: () {
_channel?.sink.close();
setState(() {
_roomId = null;
_status = '';
});
},
child: const Text('退出してタイトルに戻る'),
),
],
],
);
}
}
class CardModel {
final String id;
final String mark;
final int number;
CardModel({required this.id, required this.mark, required this.number});
factory CardModel.fromJson(Map<String, dynamic> json) {
return CardModel(
id: json['id'] as String,
mark: json['mark'] as String,
number: json['number'] as int,
);
}
}