Fix #39, remote can raise a EOF error now

This commit is contained in:
chzyer 2016-03-28 17:37:07 +08:00
parent 1e409caaf3
commit 30b462e50b
1 changed files with 41 additions and 6 deletions

View File

@ -21,9 +21,11 @@ const (
T_ISTTY_REPORT T_ISTTY_REPORT
T_RAW T_RAW
T_ERAW // exit raw T_ERAW // exit raw
T_EOF
) )
type RemoteSvr struct { type RemoteSvr struct {
eof int32
closed int32 closed int32
width int32 width int32
reciveChan chan struct{} reciveChan chan struct{}
@ -31,6 +33,7 @@ type RemoteSvr struct {
conn net.Conn conn net.Conn
isTerminal bool isTerminal bool
funcWidthChan func() funcWidthChan func()
stopChan chan struct{}
dataBufM sync.Mutex dataBufM sync.Mutex
dataBuf bytes.Buffer dataBuf bytes.Buffer
@ -59,6 +62,7 @@ func NewRemoteSvr(conn net.Conn) (*RemoteSvr, error) {
conn: conn, conn: conn,
writeChan: make(chan *writeCtx), writeChan: make(chan *writeCtx),
reciveChan: make(chan struct{}), reciveChan: make(chan struct{}),
stopChan: make(chan struct{}),
} }
buf := bufio.NewReader(rs.conn) buf := bufio.NewReader(rs.conn)
@ -113,16 +117,35 @@ func (r *RemoteSvr) IsTerminal() bool {
return r.isTerminal return r.isTerminal
} }
func (r *RemoteSvr) checkEOF() error {
if atomic.LoadInt32(&r.eof) == 1 {
return io.EOF
}
return nil
}
func (r *RemoteSvr) Read(b []byte) (int, error) { func (r *RemoteSvr) Read(b []byte) (int, error) {
r.dataBufM.Lock() r.dataBufM.Lock()
n, err := r.dataBuf.Read(b) n, err := r.dataBuf.Read(b)
r.dataBufM.Unlock() r.dataBufM.Unlock()
if n == 0 {
if err := r.checkEOF(); err != nil {
return 0, err
}
}
if n == 0 && err == io.EOF { if n == 0 && err == io.EOF {
<-r.reciveChan <-r.reciveChan
r.dataBufM.Lock() r.dataBufM.Lock()
n, err = r.dataBuf.Read(b) n, err = r.dataBuf.Read(b)
r.dataBufM.Unlock() r.dataBufM.Unlock()
} }
if n == 0 {
if err := r.checkEOF(); err != nil {
return 0, err
}
}
return n, err return n, err
} }
@ -151,19 +174,24 @@ func (r *RemoteSvr) ExitRawMode() error {
func (r *RemoteSvr) writeLoop() { func (r *RemoteSvr) writeLoop() {
defer r.Close() defer r.Close()
loop:
for { for {
ctx, ok := <-r.writeChan select {
case ctx, ok := <-r.writeChan:
if !ok { if !ok {
break break
} }
n, err := ctx.msg.WriteTo(r.conn) n, err := ctx.msg.WriteTo(r.conn)
ctx.reply <- &writeReply{n, err} ctx.reply <- &writeReply{n, err}
case <-r.stopChan:
break loop
}
} }
} }
func (r *RemoteSvr) Close() { func (r *RemoteSvr) Close() {
if atomic.CompareAndSwapInt32(&r.closed, 0, 1) { if atomic.CompareAndSwapInt32(&r.closed, 0, 1) {
close(r.writeChan) close(r.stopChan)
r.conn.Close() r.conn.Close()
} }
} }
@ -176,6 +204,12 @@ func (r *RemoteSvr) readLoop(buf *bufio.Reader) {
break break
} }
switch m.Type { switch m.Type {
case T_EOF:
atomic.StoreInt32(&r.eof, 1)
select {
case r.reciveChan <- struct{}{}:
default:
}
case T_DATA: case T_DATA:
r.dataBufM.Lock() r.dataBufM.Lock()
r.dataBuf.Write(m.Data) r.dataBuf.Write(m.Data)
@ -350,6 +384,7 @@ func (r *RemoteCli) Serve() error {
for { for {
n, _ := io.Copy(r, os.Stdin) n, _ := io.Copy(r, os.Stdin)
if n == 0 { if n == 0 {
r.writeMsg(NewMessage(T_EOF, nil))
break break
} }
} }