forked from EvolutionAPI/evolution-client-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwebsocket.py
More file actions
174 lines (147 loc) · 6.44 KB
/
websocket.py
File metadata and controls
174 lines (147 loc) · 6.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import socketio
from typing import Callable, Dict, Any
import logging
import time
from typing import Optional
from ..models.websocket import WebSocketConfig, WebSocketInfo
class WebSocketService:
    """
    Read and write the WebSocket settings of an instance through the API client
    """

    def __init__(self, client):
        """
        Bind the service to an API client

        Args:
            client: Object whose ``post`` and ``get`` methods perform the requests
        """
        self.client = client

    def set_websocket(self, instance_id: str, config: WebSocketConfig, instance_token: str):
        """
        Send WebSocket settings for an instance to the API

        Args:
            instance_id (str): Instance whose settings are written
            config (WebSocketConfig): Settings object; its attribute dictionary is sent as the request data
            instance_token (str): Token forwarded to the client for this instance

        Returns:
            Whatever the client's ``post`` call returns
        """
        endpoint = f'websocket/set/{instance_id}'
        # The attribute dictionary itself (not a copy) is handed to the client.
        payload = config.__dict__
        return self.client.post(endpoint, data=payload, instance_token=instance_token)

    def find_websocket(self, instance_id: str, instance_token: str) -> WebSocketInfo:
        """
        Fetch the WebSocket settings of an instance

        Args:
            instance_id (str): Instance whose settings are read
            instance_token (str): Token forwarded to the client for this instance

        Returns:
            WebSocketInfo: Built by passing the response's items as keyword arguments
        """
        endpoint = f'websocket/find/{instance_id}'
        settings = self.client.get(endpoint, instance_token=instance_token)
        return WebSocketInfo(**settings)
class WebSocketManager:
    """
    Manage a Socket.IO client bound to a single instance namespace

    Events received on the ``/{instance_id}`` namespace are forwarded to the
    callbacks registered through ``on``. The disconnect handler retries the
    connection with exponential backoff, up to ``max_retries`` attempts.
    """

    def __init__(self, base_url: str, instance_id: str, api_token: str, max_retries: int = 5,
                 retry_delay: float = 1.0, *, ssl_verify: bool = False):
        """
        Initialize the WebSocket manager

        Args:
            base_url (str): Base URL of the API (trailing slashes are stripped)
            instance_id (str): Instance ID; also used as the Socket.IO namespace name
            api_token (str): API authentication token, sent as the ``apikey`` query parameter
            max_retries (int): Maximum number of reconnection attempts
            retry_delay (float): Initial delay between attempts in seconds; doubled after each attempt
            ssl_verify (bool): Whether the Socket.IO client verifies TLS certificates
        """
        self.base_url = base_url.rstrip('/')
        self.instance_id = instance_id
        self.api_token = api_token
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.retry_count = 0
        self.should_reconnect = True

        # Socket.IO configuration
        # SECURITY: ssl_verify defaults to False (certificate verification disabled)
        # only to keep the previous behaviour, which targeted local development.
        # Pass ssl_verify=True when connecting to a real server.
        self.sio = socketio.Client(
            ssl_verify=ssl_verify,
            logger=False,
            engineio_logger=False,
            request_timeout=30
        )

        # Configure class logger to INFO
        # NOTE: this sets the level on the shared module logger, so the debug
        # messages emitted by _handle_event stay hidden unless the level is lowered.
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)

        # Dictionary to store registered handlers (event name -> callback)
        self._handlers: Dict[str, Callable] = {}

        # Configure event handlers
        # NOTE(review): these three handlers are registered without a namespace,
        # while connect() only joins /{instance_id} - confirm against the
        # python-socketio namespace rules that they are triggered for it.
        self.sio.on('connect', self._on_connect)
        self.sio.on('disconnect', self._on_disconnect)
        self.sio.on('error', self._on_error)

        # Register global handler in instance-specific namespace
        self.sio.on('*', self._handle_event, namespace=f'/{self.instance_id}')

    def _on_connect(self):
        """Handler for connection event"""
        self.logger.info("Socket.IO connected")
        self.retry_count = 0  # Reset retry counter after successful connection

    def _on_disconnect(self):
        """
        Handler for disconnection event

        A disconnect requested through ``disconnect()`` is only logged. Any other
        disconnect starts a reconnection cycle while attempts remain.
        """
        if not self.should_reconnect:
            # Requested by the caller: not an error and nothing to retry.
            self.logger.info("Socket.IO disconnected")
            return

        if self.retry_count >= self.max_retries:
            self.logger.warning("Socket.IO disconnected")
            self.logger.error("Maximum number of reconnection attempts reached")
            return

        self.logger.warning(f"Socket.IO disconnected. Attempt {self.retry_count + 1}/{self.max_retries}")
        self._attempt_reconnect()

    def _on_error(self, error):
        """Handler for error events"""
        # No exception is being handled here, so exc_info would only add noise.
        self.logger.error(f"Socket.IO error: {str(error)}")

    def _attempt_reconnect(self):
        """
        Attempt to reconnect with exponential backoff

        Tries ``connect()`` until it succeeds, ``max_retries`` attempts have been
        used, or ``disconnect()`` has been called. Blocks the calling thread
        while waiting between attempts.
        """
        while self.retry_count < self.max_retries:
            if not self.should_reconnect:
                return

            delay = self.retry_delay * (2 ** self.retry_count)  # Exponential backoff
            # Count the attempt before making it, so a failing connect() still
            # advances the backoff and the loop is guaranteed to terminate.
            self.retry_count += 1
            self.logger.info(f"Attempting to reconnect in {delay:.2f} seconds...")
            time.sleep(delay)

            if not self.should_reconnect:
                # disconnect() was called while waiting
                return

            try:
                self.connect()
            except Exception as e:
                self.logger.error(f"Error during reconnection attempt: {str(e)}", exc_info=True)
                continue

            # connect() returned without raising: start the next cycle from scratch
            self.retry_count = 0
            return

        self.logger.error("All reconnection attempts failed")

    def _handle_event(self, event, *args):
        """
        Global handler for all events

        Args:
            event: Name of the received event
            *args: Event arguments; only the first one is used as the event data
        """
        handler = self._handlers.get(event)
        if handler is None:
            # Only process registered events
            return

        self.logger.debug("Event received in namespace /%s: %s", self.instance_id, event)
        self.logger.debug("Event data: %s", args)

        # Extract event data; an event without arguments is delivered as an empty dict
        raw_data = args[0] if args else {}
        if not isinstance(raw_data, dict):
            self.logger.error(f"Invalid data received for event {event}: {raw_data}")
            return

        try:
            self.logger.debug("Calling handler for %s with data: %s", event, raw_data)
            handler(raw_data)
        except Exception as e:
            # Boundary around user callbacks: a failing callback must not break event dispatch
            self.logger.error(f"Error processing event {event}: {str(e)}", exc_info=True)

    def connect(self):
        """
        Connect to Socket.IO server

        Connects only to the instance namespace, passing the API token as the
        ``apikey`` query parameter, then emits ``subscribe`` for the instance.

        Raises:
            Exception: Any error raised while connecting or subscribing is logged and re-raised
        """
        from urllib.parse import quote  # local import: leaves the file's import block untouched

        namespace = f'/{self.instance_id}'
        # Percent-encode the token so characters such as '+', '/', '=', '&' or
        # spaces cannot corrupt the query string.
        separator = '&' if '?' in self.base_url else '?'
        url = f"{self.base_url}{separator}apikey={quote(str(self.api_token), safe='')}"

        try:
            self.sio.connect(
                url,
                transports=['websocket'],
                namespaces=[namespace],
                wait_timeout=30
            )

            # Join instance-specific room
            self.sio.emit('subscribe', {'instance': self.instance_id}, namespace=namespace)
        except Exception as e:
            self.logger.error(f"Error connecting to Socket.IO: {str(e)}", exc_info=True)
            raise

    def disconnect(self):
        """Disconnect from Socket.IO server"""
        self.should_reconnect = False  # Prevent reconnection attempts
        if self.sio.connected:
            self.sio.disconnect()

    def on(self, event: str, callback: Callable):
        """
        Register a callback for a specific event

        Registering the same event again replaces the previous callback.

        Args:
            event (str): Event name
            callback (Callable): Function called with the event data dict when the event occurs
        """
        self._handlers[event] = callback