Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions src/main/java/io/mycat/backend/jdbc/JDBCConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -372,10 +372,28 @@ private void executeSQL(RouteResultsetNode rrn, ServerConnection sc,
//ShowVariables.justReturnValue(sc,String.valueOf(sc.getId()));
ShowVariables.justReturnValue(sc,String.valueOf(sc.getId()),this);
} else {
ouputResultSet(sc, orgin);
if (sqlType == ServerParse.SELECT && dbType.equals("SQLITE") && orgin.contains("@@")) {
try{
ShowVariables.executeSelectVar(sc, orgin, this);
} catch (UnExecutedException e) {
LOGGER.error("sql parser error, will try backend." , e);
ouputResultSet(sc, orgin);
}
} else {
ouputResultSet(sc, orgin);
}
}
} else {
executeddl(sc, orgin);
if (sqlType == ServerParse.UPDATE && dbType.equals("SQLITE") && orgin.contains("@@")) {
try{
ShowVariables.executeSetVar(sc, orgin, this);
} catch (UnExecutedException e) {
LOGGER.error("sql parser error, will try backend." , e);
executeddl(sc, orgin);
}
} else {
executeddl(sc, orgin);
}
}

} catch (SQLException e) {
Expand Down
119 changes: 115 additions & 4 deletions src/main/java/io/mycat/backend/jdbc/ShowVariables.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,18 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.alibaba.druid.sql.ast.expr.SQLCharExpr;
import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr;
import com.alibaba.druid.sql.ast.expr.SQLVariantRefExpr;
import com.alibaba.druid.sql.ast.statement.*;
import io.mycat.backend.mysql.listener.SqlExecuteStage;
import io.mycat.net.mysql.*;
import io.mycat.route.parser.druid.MycatStatementParser;
import org.slf4j.Logger; import org.slf4j.LoggerFactory;

import io.mycat.backend.BackendConnection;
import io.mycat.backend.mysql.PacketUtil;
import io.mycat.config.Fields;
import io.mycat.net.mysql.EOFPacket;
import io.mycat.net.mysql.FieldPacket;
import io.mycat.net.mysql.ResultSetHeaderPacket;
import io.mycat.net.mysql.RowDataPacket;
import io.mycat.server.NonBlockingSession;
import io.mycat.server.ServerConnection;
import io.mycat.util.StringUtil;
Expand Down Expand Up @@ -183,16 +186,124 @@ private static RowDataPacket getRow(String name, String value, String charset) {
variables.put("time_zone", "SYSTEM");
variables.put("tx_isolation", "REPEATABLE-READ");
variables.put("wait_timeout", "172800");

//add by =
variables.put("auto_increment_increment", "1");
}

public static void execute(ServerConnection sc, String orgin, BackendConnection jdbcConnection) {
execute(sc, orgin);
NonBlockingSession session = sc.getSession2();
session.releaseConnectionIfSafe(jdbcConnection, LOGGER.isDebugEnabled(), false);
session.getSource().getListener().fireEvent(SqlExecuteStage.END);
}
public static void justReturnValue(ServerConnection sc, String orgin, BackendConnection jdbcConnection) {
justReturnValue(sc, orgin);
NonBlockingSession session = sc.getSession2();
session.releaseConnectionIfSafe(jdbcConnection, LOGGER.isDebugEnabled(), false);
session.getSource().getListener().fireEvent(SqlExecuteStage.END);
}

public static void executeSelectVar(ServerConnection c, String sql, BackendConnection jdbcConnection) throws UnExecutedException {

ResultSetHeaderPacket header;
FieldPacket[] fields;
EOFPacket eof;
RowDataPacket row;
EOFPacket lastEof;

try {
MycatStatementParser parser = new MycatStatementParser(sql);
SQLSelectStatement sss = parser.parseSelect();
SQLSelect ss = sss.getSelect();
SQLSelectQueryBlock qry = ss.getQueryBlock();
if (null != qry.getFrom() && !"dual".equalsIgnoreCase(qry.getFrom().toString())) {
throw new UnExecutedException("format error");
}

List<SQLSelectItem> ssis = qry.getSelectList();
int FIELD_COUNT = ssis.size();

byte packetId = 0;

header = PacketUtil.getHeader(FIELD_COUNT);
header.packetId = ++packetId;

fields = new FieldPacket[FIELD_COUNT];
for(int i = 0; i < FIELD_COUNT; i++) {
SQLSelectItem ssi = ssis.get(i);
if (null == ssi.getAlias() || null == ssi.getExpr() || !ssi.getExpr().toString().startsWith("@@")) {
throw new UnExecutedException("format error");
}
fields[i] = PacketUtil.getField(ssi.getAlias(), Fields.FIELD_TYPE_VAR_STRING);
fields[i].packetId = ++packetId;
}

eof = new EOFPacket();
eof.packetId = ++packetId;

row = new RowDataPacket(FIELD_COUNT);
for (SQLSelectItem ssi: ssis) {
String val = variables.get(ssi.getAlias());
row.add(StringUtil.encode(val, c.getCharset()));
}
row.packetId = ++packetId;

// write lastEof
lastEof = new EOFPacket();
lastEof.packetId = ++packetId;

} catch (Throwable e) {
throw new UnExecutedException(e);
}

ByteBuffer buffer = c.allocate();
// write header
buffer = header.write(buffer, c,true);
// write fields
for (FieldPacket field : fields) {
buffer = field.write(buffer, c,true);
}
// write eof
buffer = eof.write(buffer, c,true);
buffer = row.write(buffer, c,true);
buffer = lastEof.write(buffer, c,true);
// write buffer
c.write(buffer);

NonBlockingSession session = c.getSession2();
session.releaseConnectionIfSafe(jdbcConnection, LOGGER.isDebugEnabled(), false);
session.getSource().getListener().fireEvent(SqlExecuteStage.END);
}

public static void executeSetVar(ServerConnection c, String sql, BackendConnection jdbcConnection) throws UnExecutedException {

try {
MycatStatementParser parser = new MycatStatementParser(sql);
SQLSetStatement ss = (SQLSetStatement)parser.parseSet();
for (SQLAssignItem item : ss.getItems()) {
String tagert = ((SQLVariantRefExpr)item.getTarget()).getName();
String value = null;
if (item.getValue() instanceof SQLCharExpr){
value = ((SQLCharExpr) item.getValue()).getText();
} else if (item.getValue() instanceof SQLIntegerExpr) {
value = ((SQLIntegerExpr) item.getValue()).getNumber().toString();
}
if (tagert.startsWith("@@") && null != value) {
tagert = tagert.substring(2);
variables.put(tagert, value);
} else {
throw new UnExecutedException("format error");
}
}
} catch (Throwable e) {
throw new UnExecutedException(e);
}

c.write(c.writeToBuffer(OkPacket.OK, c.allocate()));

NonBlockingSession session = c.getSession2();
session.releaseConnectionIfSafe(jdbcConnection, LOGGER.isDebugEnabled(), false);
session.getSource().getListener().fireEvent(SqlExecuteStage.END);
}
}
16 changes: 16 additions & 0 deletions src/main/java/io/mycat/backend/jdbc/UnExecutedException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.mycat.backend.jdbc;

public class UnExecutedException extends Exception {
public UnExecutedException() {}
public UnExecutedException(String message) {
super(message);
}

public UnExecutedException(String message, Throwable cause) {
super(message, cause);
}

public UnExecutedException(Throwable cause) {
super(cause);
}
}