From 16c932699e360cd44811afc68894e55a1a128962 Mon Sep 17 00:00:00 2001 From: Dirk Koopman Date: Sat, 14 Sep 2013 21:45:23 +0100 Subject: [PATCH] nominally working system! Need to go over AsyncMsg and stuff but that's for tomorrow. --- perl/Msg.pm | 148 +++++++++++++++++++++++++++--------------------- perl/Version.pm | 4 +- perl/cluster.pl | 2 +- perl/console.pl | 105 ++++++++++++++++++---------------- 4 files changed, 141 insertions(+), 118 deletions(-) diff --git a/perl/Msg.pm b/perl/Msg.pm index d62cb744..b2ee9b23 100644 --- a/perl/Msg.pm +++ b/perl/Msg.pm @@ -20,14 +20,14 @@ use Mojo::IOLoop::Stream; use DXDebug; use Timer; -use vars qw($now %conns $noconns $cnum $total_in $total_out); +use vars qw($now %conns $noconns $cnum $total_in $total_out $connect_timeout); $total_in = $total_out = 0; $now = time; $cnum = 0; - +$connect_timeout = 5; # #----------------------------------------------------------------- @@ -61,7 +61,7 @@ sub set_error { my $conn = shift; my $callback = shift; - $conn->{sock}->on(error => sub {my ($stream, $err) = @_; $callback->($conn, $err);}); + $conn->{sock}->on(error => sub {$callback->($conn, $_[1]);}); } sub set_on_eof @@ -128,9 +128,35 @@ sub peerhost #----------------------------------------------------------------- # Send side routines -sub connect { - my ($pkg, $to_host, $to_port, $rproc) = @_; +sub _on_connect +{ + my $conn = shift; + my $handle = shift; + undef $conn->{sock}; + my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($handle); + $sock->on(read => sub {$conn->_rcv($_[1]);} ); + $sock->on(error => sub {$conn->disconnect;}); + $sock->on(close => sub {$conn->disconnect;}); + $sock->timeout(0); + $sock->start; + dbg((ref $conn) . " connected $conn->{cnum} to $conn->{peerhost}:$conn->{peerport}") if isdbg('connll'); + if ($conn->{on_connect}) { + &{$conn->{on_connect}}($conn); + } +} + +sub is_connected +{ + my $conn = shift; + my $sock = $conn->{sock}; + return ref $sock && $sock->isa('Mojo::IOLoop::Stream'); +} + +sub connect { + my ($pkg, $to_host, $to_port, $rproc, %args) = @_; + my $timeout = delete $args{timeout} || $connect_timeout; + # Create a connection end-point object my $conn = $pkg; unless (ref $pkg) { @@ -144,17 +170,17 @@ sub connect { my $sock; $conn->{sock} = $sock = Mojo::IOLoop::Client->new; - $sock->on(connect => sub { dbg((ref $conn) . " connected $conn->{cnum} to $to_host:$to_port") if isdbg('connll');}, - error => {$conn->disconnect}, - close => {$conn->disconnect}); + $sock->on(connect => sub {$conn->_on_connect($_[1])} ); + $sock->on(error => sub {$conn->disconnect}); + $sock->on(close => sub {$conn->disconnect}); + + # copy any args like on_connect, on_disconnect etc + while (my ($k, $v) = each %args) { + $conn->{$k} = $v; + } - $sock->connect(address => $to_host, port => $to_port); + $sock->connect(address => $to_host, port => $to_port, timeout => $timeout); - dbg((ref $conn) . " connected $conn->{cnum} to $to_host:$to_port") if isdbg('connll'); - - if ($conn->{rproc}) { - $sock->on(read => sub {my ($stream, $msg) = @_; $conn->_rcv($msg);} ); - } return $conn; } @@ -226,6 +252,10 @@ sub disconnect $call ||= 'unallocated'; dbg((ref $conn) . " Connection $conn->{cnum} $call disconnected") if isdbg('connll'); + if ($conn->{on_disconnect}) { + &{$conn->{on_disconnect}}($conn); + } + # get rid of any references for (keys %$conn) { if (ref($conn->{$_})) { @@ -234,7 +264,7 @@ sub disconnect } if (defined($sock)) { - $sock->remove; + $sock->close_gracefully; } unless ($main::is_win) { @@ -256,7 +286,7 @@ sub _send_stuff dbgdump('raw', "$call send $lth: ", $lth); } } - if (defined $sock && !$sock->destroyed) { + if (defined $sock) { $sock->write($data); $total_out = $lth; } else { @@ -275,6 +305,13 @@ sub send_later { goto &send_now; } +sub send_raw +{ + my ($conn, $msg) = @_; + push @{$conn->{outqueue}}, $msg; + _send_stuff($conn); +} + sub enqueue { my $conn = shift; push @{$conn->{outqueue}}, defined $_[0] ? $_[0] : ''; @@ -300,9 +337,10 @@ sub new_server my ($pkg, $my_host, $my_port, $login_proc) = @_; my $conn = $pkg->new($login_proc); - $conn->{sock} = Mojo::IOLoop::Server->new; - $conn->{sock}->on(accept=>sub{$conn->new_client()}); - $conn->{sock}->listen(address=>$my_host, port=>$my_port); + my $sock = $conn->{sock} = Mojo::IOLoop::Server->new; + $sock->on(accept=>sub{$conn->new_client($_[1]);}); + $sock->listen(address=>$my_host, port=>$my_port); + $sock->start; die "Could not create socket: $! \n" unless $conn->{sock}; return $conn; @@ -335,60 +373,37 @@ sub dequeue sub _rcv { # Complement to _send my $conn = shift; # $rcv_now complement of $flush - # Find out how much has already been received, if at all - my ($msg, $offset, $bytes_to_read, $bytes_read); + my $msg = shift; my $sock = $conn->{sock}; return unless defined($sock); my @lines; -# if ($conn->{blocking}) { -# blocking($sock, 0); -# $conn->{blocking} = 0; -# } - $bytes_read = sysread ($sock, $msg, 1024, 0); - if (defined ($bytes_read)) { - if ($bytes_read > 0) { - $total_in += $bytes_read; - if (isdbg('raw')) { - my $call = $conn->{call} || 'none'; - dbgdump('raw', "$call read $bytes_read: ", $msg); - } - if ($conn->{echo}) { - my @ch = split //, $msg; - my $out; - for (@ch) { - if (/[\cH\x7f]/) { - $out .= "\cH \cH"; - $conn->{msg} =~ s/.$//; - } else { - $out .= $_; - $conn->{msg} .= $_; - } - } - if (defined $out) { - set_event_handler ($sock, write => sub{$conn->_send(0)}); - push @{$conn->{outqueue}}, $out; + if (isdbg('raw')) { + my $call = $conn->{call} || 'none'; + my $lth = length $msg; + dbgdump('raw', "$call read $lth: ", $msg); + } + if ($conn->{echo}) { + my @ch = split //, $msg; + my $out; + for (@ch) { + if (/[\cH\x7f]/) { + $out .= "\cH \cH"; + $conn->{msg} =~ s/.$//; + } else { + $out .= $_; + $conn->{msg} .= $_; } - } else { - $conn->{msg} .= $msg; } - } + if (defined $out) { + $conn->send_raw($out); + } } else { - if (_err_will_block($!)) { - return ; - } else { - $bytes_read = 0; - } - } + $conn->{msg} .= $msg; + } -FINISH: - if (defined $bytes_read && $bytes_read == 0) { - &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc}; - $conn->disconnect; - } else { - unless ($conn->{disable_read}) { - $conn->dequeue if exists $conn->{msg}; - } + unless ($conn->{disable_read}) { + $conn->dequeue if exists $conn->{msg}; } } @@ -399,6 +414,8 @@ sub new_client { my $conn = $server_conn->new($server_conn->{rproc}); my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($client); $sock->on(read => sub {$conn->_rcv($_[1])}); + $sock->timeout(0); + $sock->start; dbg((ref $conn) . "accept $conn->{cnum} from $conn->{peerhost} $conn->{peerport}") if isdbg('connll'); my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $client->peerhost, $conn->{peerport} = $client->peerport); @@ -412,6 +429,7 @@ sub new_client { &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc}; $conn->disconnect(); } + return $conn; } sub close_server diff --git a/perl/Version.pm b/perl/Version.pm index 27bd1c0f..f7145cf7 100644 --- a/perl/Version.pm +++ b/perl/Version.pm @@ -11,7 +11,7 @@ use vars qw($version $subversion $build $gitversion); $version = '1.57'; $subversion = '0'; -$build = '1'; -$gitversion = '06a6935'; +$build = '2'; +$gitversion = '61e7e87'; 1; diff --git a/perl/cluster.pl b/perl/cluster.pl index 51d1455b..ddcf7214 100755 --- a/perl/cluster.pl +++ b/perl/cluster.pl @@ -593,7 +593,7 @@ $script->run($main::me) if $script; my $main_loop = Mojo::IOLoop->recurring($idle_interval => \&idle_loop); -Mojo::IOLoop->start; +Mojo::IOLoop->start unless Mojo::IOLoop->is_running; cease(0); exit(0); diff --git a/perl/console.pl b/perl/console.pl index 0a6d7404..ea46553f 100755 --- a/perl/console.pl +++ b/perl/console.pl @@ -26,6 +26,8 @@ BEGIN { $is_win = ($^O =~ /^MS/ || $^O =~ /^OS-2/) ? 1 : 0; # is it Windows? } +use Mojo::IOLoop; + use Msg; use IntMsg; use DXVars; @@ -54,6 +56,9 @@ $khistpos = 0; $spos = $pos = $lth = 0; $inbuf = ""; @time = (); +$lastmin = 0; +$idle = 0; + #$SIG{WINCH} = sub {@time = gettimeofday}; @@ -443,6 +448,46 @@ sub rec_stdin $bot->refresh(); } +sub idle_loop +{ + my $t; + + $t = time; + if ($t > $lasttime) { + my ($min)= (gmtime($t))[1]; + if ($min != $lastmin) { + show_screen(); + $lastmin = $min; + } + $lasttime = $t; + } + my $ch = $bot->getch(); + if (@time && tv_interval(\@time, [gettimeofday]) >= 1) { + next; + } + if (defined $ch) { + if ($ch ne '-1') { + rec_stdin($ch); + } + } + $top->refresh() if $top->is_wintouched; + $bot->refresh(); +} + +sub on_connect +{ + my $conn = shift; + $conn->send_later("A$call|$connsort width=$cols"); + $conn->send_later("I$call|set/page $maxshist"); + #$conn->send_later("I$call|set/nobeep"); +} + +sub on_disconnect +{ + $conn = shift; + Mojo::IOLoop->remove($idle); + Mojo::IOLoop->stop; +} # # deal with args @@ -464,23 +509,6 @@ if ($call eq $mycall) { dbginit(); -$conn = IntMsg->connect("$clusteraddr", $clusterport, \&rec_socket); -if (! $conn) { - if (-r "$data/offline") { - open IN, "$data/offline" or die; - while () { - print $_; - } - close IN; - } else { - print "Sorry, the cluster $mycall is currently off-line\n"; - } - exit(0); -} - -$conn->set_error(sub{cease(0)}); - - unless ($DB::VERSION) { $SIG{'INT'} = \&sig_term; $SIG{'TERM'} = \&sig_term; @@ -493,40 +521,17 @@ do_resize(); $SIG{__DIE__} = \&sig_term; -$conn->send_later("A$call|$connsort width=$cols"); -$conn->send_later("I$call|set/page $maxshist"); -#$conn->send_later("I$call|set/nobeep"); - -#Msg->set_event_handler(\*STDIN, "read" => \&rec_stdin); - $Text::Wrap::Columns = $cols; my $lastmin = 0; -for (;;) { - my $t; - Msg->event_loop(1, 0.01); - $t = time; - if ($t > $lasttime) { - my ($min)= (gmtime($t))[1]; - if ($min != $lastmin) { - show_screen(); - $lastmin = $min; - } - $lasttime = $t; - } - my $ch = $bot->getch(); - if (@time && tv_interval(\@time, [gettimeofday]) >= 1) { -# mydbg("Got Resize"); -# do_resize(); - next; - } - if (defined $ch) { - if ($ch ne '-1') { - rec_stdin($ch); - } - } - $top->refresh() if $top->is_wintouched; - $bot->refresh(); -} -exit(0); + +$conn = IntMsg->connect($clusteraddr, $clusterport, \&rec_socket); +$conn->{on_connect} = \&on_connect; +$conn->{on_disconnect} = \&on_disconnect; + +$idle = Mojo::IOLoop->recurring(0.100 => \&idle_loop); +Mojo::IOLoop->start; + + +cease(0); -- 2.43.0