Skip to content

Commit fa9de90

Browse files
committed
optimize websocket continue frame
1 parent 6184d5c commit fa9de90

File tree

14 files changed

+474
-58
lines changed

14 files changed

+474
-58
lines changed

ext-src/php_swoole_cxx.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,14 @@ int php_swoole_websocket_frame_object_pack_ex(swoole::String *buffer,
180180
zval *zdata,
181181
zend_bool mask,
182182
zend_bool allow_compress);
183+
int php_swoole_websocket_frame_unpack(swoole::String *buffer,
184+
swoole::String *message,
185+
zval *zframe,
186+
uchar *opcode,
187+
uchar *flags,
188+
bool open_websocket_ping_frame,
189+
bool open_websocket_pong_frame,
190+
uchar allow_uncompress);
183191
void php_swoole_websocket_frame_unpack(swoole::String *data, zval *zframe);
184192
void php_swoole_websocket_frame_unpack_ex(swoole::String *data, zval *zframe, uchar allow_uncompress);
185193

ext-src/swoole_http_client_coro.cc

Lines changed: 103 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,14 @@ class Client {
113113

114114
/* options */
115115
uint8_t max_retries = 0;
116-
bool keep_alive = true; // enable by default
117-
bool websocket = false; // if upgrade successfully
118-
bool chunked = false; // Transfer-Encoding: chunked
119-
bool websocket_mask = true; // enable websocket mask
116+
bool keep_alive = true; // enable by default
117+
bool websocket = false; // if upgrade successfully
118+
bool chunked = false; // Transfer-Encoding: chunked
119+
bool websocket_mask = true; // enable websocket mask
120+
bool open_websocket_ping_frame = false; // handle websocket ping frame by user
121+
bool open_websocket_pong_frame = false; // handle websocket pong frame by user
122+
bool open_websocket_close_frame = false; // handle websocket close frame by user
123+
bool send_close_frame = false; // send close frame to server
120124
bool body_decompression = true;
121125
bool http_compression = true;
122126
#ifdef SW_HAVE_ZLIB
@@ -140,6 +144,7 @@ class Client {
140144
* allowing access to the sent Request data even after the connection has been closed.
141145
*/
142146
String *tmp_write_buffer = nullptr;
147+
String *websocket_buffer = nullptr;
143148
bool connection_close = false;
144149
bool completed = false;
145150
bool event_stream = false;
@@ -789,6 +794,15 @@ void Client::apply_setting(zval *zset, const bool check_all) {
789794
delete write_func;
790795
write_func = sw_callable_create(ztmp);
791796
}
797+
if (php_swoole_array_get_value(vht, "open_websocket_ping_frame", ztmp)) {
798+
open_websocket_ping_frame = zval_is_true(ztmp);
799+
}
800+
if (php_swoole_array_get_value(vht, "open_websocket_pong_frame", ztmp)) {
801+
open_websocket_pong_frame = zval_is_true(ztmp);
802+
}
803+
if (php_swoole_array_get_value(vht, "open_websocket_close_frame", ztmp)) {
804+
open_websocket_close_frame = zval_is_true(ztmp);
805+
}
792806
}
793807
if (socket) {
794808
php_swoole_socket_set(socket, zset);
@@ -1534,26 +1548,76 @@ bool Client::recv_websocket_frame(zval *zframe, double timeout) {
15341548
SW_ASSERT(websocket);
15351549
ZVAL_FALSE(zframe);
15361550

1537-
ssize_t retval = socket->recv_packet(timeout);
1538-
if (retval <= 0) {
1539-
php_swoole_socket_set_error_properties(zobject, socket);
1540-
zend::object_set(zobject, ZEND_STRL("statusCode"), HTTP_ESTATUS_SERVER_RESET);
1541-
if (socket->errCode != ETIMEDOUT) {
1542-
close();
1543-
}
1544-
return false;
1545-
} else {
1546-
String msg;
1547-
msg.length = retval;
1548-
msg.str = socket->get_read_buffer()->str;
1551+
bool allow_uncompress = 0;
15491552
#ifdef SW_HAVE_ZLIB
1550-
php_swoole_websocket_frame_unpack_ex(&msg, zframe, accept_websocket_compression);
1551-
#else
1552-
php_swoole_websocket_frame_unpack(&msg, zframe);
1553+
allow_uncompress = accept_websocket_compression;
15531554
#endif
1555+
1556+
if (!websocket_buffer) {
1557+
websocket_buffer = new String();
1558+
}
1559+
1560+
do {
1561+
if (sw_unlikely(!socket)) {
1562+
return false;
1563+
}
1564+
1565+
ssize_t retval = socket->recv_packet(timeout);
1566+
if (retval > 0) {
1567+
String message = {};
1568+
message.length = retval;
1569+
message.str = socket->get_read_buffer()->str;
1570+
1571+
uchar flags = 0;
1572+
uchar opcode = 0;
1573+
1574+
int result = php_swoole_websocket_frame_unpack(websocket_buffer,
1575+
&message,
1576+
zframe,
1577+
&opcode,
1578+
&flags,
1579+
open_websocket_ping_frame,
1580+
open_websocket_pong_frame,
1581+
allow_uncompress);
1582+
1583+
if (sw_unlikely(result == SW_ERR)) {
1584+
return false;
1585+
}
1586+
1587+
if (opcode == WebSocket::OPCODE_PING && !open_websocket_ping_frame) {
1588+
push(zframe, WebSocket::OPCODE_PONG, WebSocket::FLAG_FIN);
1589+
zval_ptr_dtor(zframe);
1590+
ZVAL_FALSE(zframe);
1591+
continue;
1592+
}
1593+
1594+
if (opcode == WebSocket::OPCODE_PONG && !open_websocket_pong_frame) {
1595+
continue;
1596+
}
1597+
1598+
if (opcode == WebSocket::OPCODE_CLOSE && !open_websocket_close_frame) {
1599+
if (!send_close_frame && push(zframe, WebSocket::OPCODE_CLOSE, WebSocket::FLAG_FIN)) {
1600+
close();
1601+
}
1602+
zval_ptr_dtor(zframe);
1603+
ZVAL_FALSE(zframe);
1604+
return false;
1605+
}
1606+
} else {
1607+
php_swoole_socket_set_error_properties(zobject, socket);
1608+
zend::object_set(zobject, ZEND_STRL("statusCode"), HTTP_ESTATUS_SERVER_RESET);
1609+
if (socket->errCode != ETIMEDOUT) {
1610+
close();
1611+
}
1612+
return false;
1613+
}
1614+
} while (!ZVAL_IS_OBJECT(zframe));
1615+
1616+
if (ZVAL_IS_OBJECT(zframe)) {
15541617
zend_update_property_long(swoole_websocket_frame_ce, SW_Z8_OBJ_P(zframe), ZEND_STRL("fd"), socket->get_fd());
15551618
return true;
15561619
}
1620+
return false;
15571621
}
15581622

15591623
bool Client::upgrade(const std::string &path) {
@@ -1578,7 +1642,7 @@ bool Client::upgrade(const std::string &path) {
15781642
}
15791643

15801644
bool Client::push(zval *zdata, zend_long opcode, uint8_t flags) {
1581-
if (!websocket) {
1645+
if (!sw_unlikely(websocket)) {
15821646
swoole_set_last_error(SW_ERROR_WEBSOCKET_HANDSHAKE_FAILED);
15831647
php_swoole_fatal_error(E_WARNING, "websocket handshake failed, cannot push data");
15841648
zend_update_property_long(
@@ -1593,6 +1657,8 @@ bool Client::push(zval *zdata, zend_long opcode, uint8_t flags) {
15931657
String *buffer = socket->get_write_buffer();
15941658
buffer->clear();
15951659
if (php_swoole_websocket_frame_is_object(zdata)) {
1660+
zval *ztmp = sw_zend_read_property_ex(swoole_websocket_frame_ce, zdata, SW_ZSTR_KNOWN(SW_ZEND_STR_OPCODE), 1);
1661+
opcode = zval_get_long(ztmp);
15961662
if (php_swoole_websocket_frame_object_pack(buffer, zdata, websocket_mask, accept_websocket_compression) < 0) {
15971663
return false;
15981664
}
@@ -1603,13 +1669,17 @@ bool Client::push(zval *zdata, zend_long opcode, uint8_t flags) {
16031669
}
16041670
}
16051671

1606-
if (socket->send_all(buffer->str, buffer->length) != (ssize_t) buffer->length) {
1672+
if (opcode == WebSocket::OPCODE_CLOSE) {
1673+
send_close_frame = true;
1674+
}
1675+
1676+
if (socket->send_all(buffer->str, buffer->length) == (ssize_t) buffer->length) {
1677+
return true;
1678+
} else {
16071679
php_swoole_socket_set_error_properties(zobject, socket);
16081680
zend::object_set(zobject, ZEND_STRL("statusCode"), HTTP_ESTATUS_SERVER_RESET);
16091681
close();
16101682
return false;
1611-
} else {
1612-
return true;
16131683
}
16141684
}
16151685

@@ -1675,6 +1745,15 @@ bool Client::close(const bool should_be_reset) {
16751745
_socket->get_socket()->close_wait = 1;
16761746
return true;
16771747
}
1748+
1749+
if (websocket && !send_close_frame) {
1750+
zval zpayload = {};
1751+
const char *reason = "Normal closure";
1752+
ZVAL_STRINGL(&zpayload, reason, strlen(reason));
1753+
push(&zpayload, WebSocket::OPCODE_CLOSE, WebSocket::FLAG_FIN);
1754+
zval_ptr_dtor(&zpayload);
1755+
}
1756+
16781757
zend_update_property_bool(Z_OBJCE_P(zobject), SW_Z8_OBJ_P(zobject), ZEND_STRL("connected"), 0);
16791758
if (!_socket->close()) {
16801759
php_swoole_socket_set_error_properties(zobject, _socket);
@@ -1691,6 +1770,7 @@ Client::~Client() {
16911770
delete body;
16921771
delete tmp_write_buffer;
16931772
delete write_func;
1773+
delete websocket_buffer;
16941774
}
16951775

16961776
static sw_inline HttpClientObject *http_client_coro_fetch_object(zend_object *obj) {

ext-src/swoole_websocket_server.cc

Lines changed: 122 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -112,42 +112,141 @@ static void php_swoole_websocket_construct_frame(zval *zframe, zend_long opcode,
112112
swoole_websocket_frame_ce, SW_Z8_OBJ_P(zframe), ZEND_STRL("finish"), flags & WebSocket::FLAG_FIN);
113113
}
114114

115+
static int php_swoole_websocket_uncompress_and_construct_zframe(
116+
const char *data, size_t length, zval *zframe, uchar opcode, uchar flags, uchar allow_uncompress) {
117+
zval zpayload = {};
118+
119+
#ifdef SW_HAVE_ZLIB
120+
if (allow_uncompress && (flags & WebSocket::FLAG_RSV1)) {
121+
String *zlib_buffer = sw_tg_buffer();
122+
zlib_buffer->clear();
123+
124+
if (sw_likely(websocket_message_uncompress(zlib_buffer, data, length))) {
125+
ZVAL_STRINGL(&zpayload, zlib_buffer->str, zlib_buffer->length);
126+
flags ^= (WebSocket::FLAG_RSV1 | WebSocket::FLAG_COMPRESS);
127+
} else {
128+
ZVAL_FALSE(zframe);
129+
swoole_set_last_error(SW_ERROR_PROTOCOL_ERROR);
130+
return SW_ERR;
131+
}
132+
} else
133+
#endif
134+
{
135+
ZVAL_STRINGL(&zpayload, data, length);
136+
}
137+
php_swoole_websocket_construct_frame(zframe, opcode, &zpayload, flags);
138+
zval_ptr_dtor(&zpayload);
139+
return SW_OK;
140+
}
141+
142+
int php_swoole_websocket_frame_unpack(swoole::String *buffer,
143+
swoole::String *message,
144+
zval *zframe,
145+
uchar *opcode,
146+
uchar *flags,
147+
bool open_websocket_ping_frame,
148+
bool open_websocket_pong_frame,
149+
uchar allow_uncompress) {
150+
WebSocket::Frame frame = {};
151+
size_t length = message->length;
152+
const char *data = message->str;
153+
154+
if (sw_unlikely(!WebSocket::decode(&frame, const_cast<char *>(data), length))) {
155+
swoole_set_last_error(SW_ERROR_PROTOCOL_ERROR);
156+
return SW_ERR;
157+
}
158+
159+
if (sw_unlikely(static_cast<size_t>(length) < sizeof(frame.header))) {
160+
swoole_set_last_error(SW_ERROR_PROTOCOL_ERROR);
161+
return SW_ERR;
162+
}
163+
164+
*opcode = frame.header.OPCODE;
165+
*flags = WebSocket::get_flags(&frame);
166+
167+
if ((*opcode == WebSocket::OPCODE_TEXT || *opcode == WebSocket::OPCODE_BINARY)) {
168+
if (sw_unlikely(buffer->offset > 0)) {
169+
swoole_warning("must be sent as a sequence of continuation frames.");
170+
return SW_ERR;
171+
}
172+
173+
if (frame.header.FIN) {
174+
return php_swoole_websocket_uncompress_and_construct_zframe(
175+
frame.payload, frame.payload_length, zframe, *opcode, *flags, allow_uncompress);
176+
}
177+
178+
if (frame.payload) {
179+
buffer->append(frame.payload, frame.payload_length);
180+
}
181+
buffer->offset = WebSocket::get_ext_flags(*opcode, *flags);
182+
return SW_OK;
183+
}
184+
185+
if (*opcode == WebSocket::OPCODE_CONTINUATION) {
186+
if (sw_unlikely(buffer->offset == 0)) {
187+
swoole_warning("messages must start with a text or binary frame.");
188+
return SW_ERR;
189+
}
190+
191+
off_t offset = buffer->offset;
192+
if (sw_likely(frame.payload)) {
193+
buffer->append(frame.payload, frame.payload_length);
194+
}
195+
buffer->offset = offset;
196+
197+
if (frame.header.FIN) {
198+
uchar complete_opcode = 0;
199+
uchar complete_flags = 0;
200+
WebSocket::parse_ext_flags(buffer->offset, &complete_opcode, &complete_flags);
201+
202+
int result = php_swoole_websocket_uncompress_and_construct_zframe(
203+
frame.payload, frame.payload_length, zframe, complete_opcode, complete_flags, allow_uncompress);
204+
buffer->clear();
205+
return result;
206+
}
207+
return SW_OK;
208+
}
209+
210+
if (*opcode == WebSocket::OPCODE_CLOSE || *opcode == WebSocket::OPCODE_PING || *opcode == WebSocket::OPCODE_PONG) {
211+
if (sw_unlikely(frame.header.RSV1 || frame.header.RSV2 || frame.header.RSV3)) {
212+
swoole_warning("invalid control frame: The reserved bits (RSV1, RSV2, RSV3) are not all zero.");
213+
return SW_ERR;
214+
}
215+
216+
if (*opcode == WebSocket::OPCODE_PING && !open_websocket_ping_frame) {
217+
ZVAL_STRINGL(zframe, frame.payload, frame.payload_length);
218+
return SW_OK;
219+
}
220+
221+
if (*opcode == WebSocket::OPCODE_PONG && !open_websocket_pong_frame) {
222+
return SW_OK;
223+
}
224+
225+
return php_swoole_websocket_uncompress_and_construct_zframe(
226+
frame.payload, frame.payload_length, zframe, *opcode, *flags, 0);
227+
}
228+
return SW_ERR;
229+
}
230+
115231
void php_swoole_websocket_frame_unpack_ex(String *data, zval *zframe, uchar uncompress) {
116232
WebSocket::Frame frame;
117-
zval zpayload;
118-
uint8_t flags;
119-
120-
if (data->length < sizeof(frame.header)) {
233+
if (!WebSocket::decode(&frame, data)) {
121234
swoole_set_last_error(SW_ERROR_PROTOCOL_ERROR);
122235
ZVAL_FALSE(zframe);
123236
return;
124237
}
125238

126-
if (!WebSocket::decode(&frame, data)) {
239+
if (data->length < sizeof(frame.header)) {
127240
swoole_set_last_error(SW_ERROR_PROTOCOL_ERROR);
128241
ZVAL_FALSE(zframe);
129242
return;
130243
}
131244

132-
flags = WebSocket::get_flags(&frame);
133-
#ifdef SW_HAVE_ZLIB
134-
if (uncompress && frame.header.RSV1) {
135-
String *zlib_buffer = sw_tg_buffer();
136-
zlib_buffer->clear();
137-
if (!websocket_message_uncompress(zlib_buffer, frame.payload, frame.payload_length)) {
138-
swoole_set_last_error(SW_ERROR_PROTOCOL_ERROR);
139-
ZVAL_FALSE(zframe);
140-
return;
141-
}
142-
frame.payload = zlib_buffer->str;
143-
frame.payload_length = zlib_buffer->length;
144-
flags ^= (WebSocket::FLAG_RSV1 | WebSocket::FLAG_COMPRESS);
245+
int result = php_swoole_websocket_uncompress_and_construct_zframe(
246+
frame.payload, frame.payload_length, zframe, frame.header.OPCODE, WebSocket::get_flags(&frame), uncompress);
247+
if (sw_unlikely(result == SW_ERR)) {
248+
ZVAL_FALSE(zframe);
145249
}
146-
#endif
147-
/* TODO: optimize memory copy */
148-
ZVAL_STRINGL(&zpayload, frame.payload, frame.payload_length);
149-
php_swoole_websocket_construct_frame(zframe, frame.header.OPCODE, &zpayload, flags);
150-
zval_ptr_dtor(&zpayload);
151250
}
152251

153252
void php_swoole_websocket_frame_unpack(String *data, zval *zframe) {

include/swoole_websocket.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,13 @@ static inline uchar get_flags(const Frame *frame) {
127127
return flags;
128128
}
129129

130+
static inline uint16_t get_ext_flags(uchar opcode, uchar flags) {
131+
uint16_t ext_flags = opcode;
132+
ext_flags = ext_flags << 8;
133+
ext_flags += flags;
134+
return ext_flags;
135+
}
136+
130137
static inline uchar set_flags(uchar fin, uchar mask, uchar rsv1, uchar rsv2, uchar rsv3) {
131138
uchar flags = 0;
132139
if (fin) {

0 commit comments

Comments
 (0)