#!/usr/bin/perl
#
# Danga's Distributed Lock Daemon
#
# Copyright 2004, Danga Interactive
# Copyright 2005-2006, Six Apart, Ltd.
#
# Authors:
#   Brad Fitzpatrick
#   Jonathan Steinert
#
# License:
#   terms of Perl itself.
#

use strict;
use Getopt::Long;
use Carp;
use Danga::Socket;
use IO::Socket::INET;
use POSIX ();

## the stored load intervals: %IVs = ( time_iv => load, ... )
our %IVs = ();
## how many load buckets we want
our $Bucket_qty = 100;
## the total duration of load measurement, in seconds
our $Total_duration = 60 * 5;    # 5mn
## the duration handled by one bucket
our $IV_duration = int($Total_duration / $Bucket_qty);
## how many buckets are held before we purge the old ones
our $Bucket_limit = 2 * $Bucket_qty;

use vars qw($DEBUG);
$DEBUG = 0;

my (
    $daemonize,
    $nokeepalive,
    $hostname,
    $table,
);
my $conf_port = 7002;
my $lock_type = "internal";

# Getopt::Long ignores option case by default, which makes the short options
# -t (type) and -T (table) collide.  Parse options case-sensitively so that
# each of them reaches its own variable.
Getopt::Long::Configure('no_ignore_case');

Getopt::Long::GetOptions(
    'd|daemon'       => \$daemonize,
    'p|port=i'       => \$conf_port,
    'debug=i'        => \$DEBUG,
    'n|no-keepalive' => \$nokeepalive,
    't|type=s'       => \$lock_type,
    'h|hostname=s'   => \$hostname,
    'T|table=s'      => \$table,
) or die "Usage: $0 [-d] [-p port] [--debug=level] [-n] "
       . "[-t internal|dlmfs|dbi] [-h hostname] [-T table]\n";

# Statistics counters
my $lock_successes = 0;
my $lock_failures  = 0;

# Pick the Client subclass implementing the requested lock backend.
my $client_class;
my @client_options;

if ($lock_type eq 'internal') {
    $client_class = "Client::Internal";
}
elsif ($lock_type eq 'dlmfs') {
    $client_class = "Client::DLMFS";
}
elsif ($lock_type eq 'dbi') {
    defined $hostname && length $hostname
        or die( "-h (--hostname) must be included with a hostname in dbi mode\n" );
    defined $table && length $table
        or die( "-T (--table) must be included with a table name in dbi mode\n" );
    $client_class   = "Client::DBI";
    @client_options = ( $hostname, $table );
}
else {
    die( "Unknown lock type of '$lock_type' specified.\n" );
}

$client_class->_setup( @client_options );

daemonize() if $daemonize;

use Socket qw(IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET);

# Linux-specific:
use constant TCP_KEEPIDLE  => 4;    # Start keepalives after this period
use constant TCP_KEEPINTVL => 5;    # Interval between keepalives
use constant TCP_KEEPCNT   => 6;    # Number of keepalives before death

$SIG{'PIPE'} = "IGNORE";    # handled manually

# establish SERVER socket, bind and listen.
my $server = IO::Socket::INET->new(
    LocalPort => $conf_port,
    Type      => SOCK_STREAM,
    Proto     => IPPROTO_TCP,
    Blocking  => 0,
    Reuse     => 1,
    Listen    => 10,
) or die "Error creating socket: $@\n";

# Not sure if I'm crazy or not, but I can't see in strace where/how
# Perl 5.6 sets blocking to 0 without this.  In Perl 5.8, IO::Socket::INET
# obviously sets it from watching strace.
IO::Handle::blocking($server, 0);

# Called by the event loop when the listening socket is readable: accepts one
# connection, configures the socket and wraps it in a client object of the
# selected backend class.
my $accept_handler = sub {
    my $csock = $server->accept();
    return unless $csock;

    printf("Listen child making a Client for %d.\n", fileno($csock))
        if $DEBUG;

    IO::Handle::blocking($csock, 0);
    setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1))
        or die "Couldn't set TCP_NODELAY on socket: $!";

    # Enable keep alive
    unless ( $nokeepalive ) {
        (
               setsockopt($csock, SOL_SOCKET,  SO_KEEPALIVE,  pack("l", 1))
            && setsockopt($csock, IPPROTO_TCP, TCP_KEEPIDLE,  pack("l", 30))
            && setsockopt($csock, IPPROTO_TCP, TCP_KEEPCNT,   pack("l", 10))
            && setsockopt($csock, IPPROTO_TCP, TCP_KEEPINTVL, pack("l", 30))
            && 1
        ) || die "Couldn't set keep-alive settings on socket (Not on Linux?)";
    }

    my $client = $client_class->new($csock);
    $client->watch_read(1);
};

Client->OtherFds(fileno($server) => $accept_handler);
Client->EventLoop();

# Detach the process from its controlling terminal and run it in the
# background: double fork, new session, cwd set to "/", umask cleared and the
# standard handles pointed at /dev/null.  Croaks if any step fails.
sub daemonize {
    my ($pid, $sess_id);

    ## Fork and exit parent
    $pid = fork;
    croak "Cannot fork: $!" unless defined $pid;
    exit 0 if $pid;

    ## Detach ourselves from the terminal
    croak "Cannot detach from controlling terminal"
        unless $sess_id = POSIX::setsid();

    ## Prevent possibility of acquiring a controlling terminal
    $SIG{'HUP'} = 'IGNORE';
    $pid = fork;
    croak "Cannot fork: $!" unless defined $pid;
    exit 0 if $pid;

    ## Change working directory
    chdir "/" or croak "Cannot chdir to /: $!";

    ## Clear file creation mask
    umask 0;

    ## Reopen stdin, stdout, stderr on /dev/null.  Reopening an open handle
    ## closes it first, so errors can still be reported until STDERR itself
    ## has been redirected.
    open(STDIN,  '+>',  '/dev/null') or croak "Cannot reopen STDIN on /dev/null: $!";
    open(STDOUT, '+>&', \*STDIN)     or croak "Cannot redirect STDOUT: $!";
    open(STDERR, '+>&', \*STDIN)     or croak "Cannot redirect STDERR: $!";
}

#####################################################################
### C L I E N T   C L A S S
#####################################################################

package Client;

use Danga::Socket;
use base 'Danga::Socket';
use fields (
    'locks',       # hashref of locks held by this connection.  values are 1
    'read_buf',    # received data not yet split into complete lines
);

# TODO: our %waiters, lock -> arrayref of client waiters (waker should check not closed)

# Constructor.  Arguments are passed through to the Danga::Socket
# constructor; the lock table and the read buffer start out empty.
sub new {
    my Client $self = shift;
    $self = fields::new($self) unless ref $self;
    $self->SUPER::new( @_ );

    $self->{locks}    = {};
    $self->{read_buf} = '';
    return $self;
}

# Client
# Read event: appends up to 1024 bytes to the read buffer and dispatches every
# complete line found in it.  The connection is closed when read() returns
# undef.
sub event_read {
    my Client $self = shift;

    my $bref = $self->read(1024);
    return $self->close() unless defined $bref;
    $self->{read_buf} .= $$bref;

    # A single read may carry several commands, so drain all complete lines.
    # Empty lines are consumed as well (and answered with unknown_command);
    # otherwise a leading blank line would block the buffer forever.
    while ($self->{read_buf} =~ s/^(.*?)\r?\n//) {
        my $line = $1;
        $self->process_line( $line );
    }
}

# Parses one protocol line of the form "<command> <url-encoded args>" and
# calls the matching cmd_<command> sub of this package with the decoded
# argument hashref.  Answers with an 'unknown_command' error line when no such
# handler exists.
sub process_line {
    my Client $self = shift;
    my $line = shift;

    if ($line =~ /^(\w+)\s*(.*)/) {
        my ($cmd, $args) = (lc($1), $2);

        my $cmd_handler = __PACKAGE__->can("cmd_$cmd");
        if ($cmd_handler) {
            return $cmd_handler->($self, decode_url_args(\$args));
        }
    }

    return $self->err_line('unknown_command');
}

# Releases every lock still held by this connection, then closes the socket.
sub close {
    my Client $self = shift;

    foreach my $lock (keys %{ $self->{locks} }) {
        $self->_release_lock($lock);
    }

    $self->SUPER::close;
}

# Client
sub event_err { my $self = shift; $self->close; }
sub event_hup { my $self = shift; $self->close; }

# "status" command: writes the lock counters, the runtime in seconds and the
# current load, terminated by an empty line.
sub cmd_status {
    my Client $self = shift;

    my $runtime = time - $^T;
    my $load    = current_load();

    $self->write("STATUS: OK\n");
    $self->write("SUCCESSES: $lock_successes\n");
    $self->write("FAILURES: $lock_failures\n");
    $self->write("RUNTIME: $runtime\n");
    $self->write("LOAD: $load\n");
    $self->write("\n");

    return 1;
}

# Returns the start time of the load bucket that contains $time.
sub iv_for_time {
    my $time = shift;
    return $time - ($time % $main::IV_duration);
}

# Returns the start times of the $Bucket_qty most recent load buckets, newest
# first, relative to the given time (default: now).
sub latest_ivs {
    my $time   = shift || time;
    my $newest = iv_for_time($time);
    return map { $newest - $_ * $main::IV_duration }
        ( 0 .. $main::Bucket_qty - 1 );
}

# Returns the sum of the counters of the most recent load buckets.
sub current_load {
    my $sum = 0;
    for my $iv (latest_ivs()) {
        $sum += $main::IVs{$iv} || 0;
    }
    return $sum;
}

# "load" command: writes the current load followed by an empty line.
sub cmd_load {
    my Client $self = shift;

    my $load = current_load();
    $self->write("LOAD: $load\n");
    $self->write("\n");

    return 1;
}

# Counts one event in the bucket for the current time and drops the buckets
# that are too old once more than $Bucket_limit of them are stored.
sub increase_load {
    my $time = time;
    $main::IVs{ iv_for_time($time) }++;

    if (scalar( keys %main::IVs ) > $main::Bucket_limit) {
        # purge IVs: only the buckets still taken into account are kept
        my %kept;
        for my $iv (latest_ivs($time)) {
            $kept{$iv} = $main::IVs{$iv} if exists $main::IVs{$iv};
        }
        %main::IVs = %kept;
    }
}

# gets a lock or fails with 'taken'
sub cmd_trylock {
    my Client $self = shift;
    my $args = shift;

    my $lock      = $args->{lock};
    my $lockstate = $self->_trylock( $lock );

    increase_load();

    if ($lockstate) {
        $lock_successes++;
    }
    else {
        $lock_failures++;
    }

    return $lockstate;
}

# releases a lock or fails with 'didnthave'
sub cmd_releaselock {
    my Client $self = shift;
    my $args = shift;

    my $lock = $args->{lock};
    return $self->err_line("empty_lock") unless length($lock);
    return $self->err_line("didnthave")  unless $self->{locks}{$lock};

    $self->_release_lock($lock);
    return $self->ok_line;
}

# shows current locks
sub cmd_locks {
    my Client $self = shift;
    my $args = shift;

    $self->write("LOCKS:\n");
    $self->write( join( "\n", $self->_get_locks ) );
    $self->write("\n");

    return 1;
}

sub cmd_noop {
    my Client $self = shift;
    # TODO: set self's last activity time so it isn't cleaned in a purge
    #       of stale connections?
    return $self->ok_line;
}

# Writes "OK <url-encoded key=value pairs>" to the client.  Takes an optional
# hashref of values to send back.  Returns 1.
sub ok_line {
    my Client $self = shift;
    my $args = shift || {};

    my $argline = join('&', map { eurl($_) . "=" . eurl($args->{$_}) } keys %$args);
    $self->write("OK $argline\r\n");
    return 1;
}

# Writes "ERR <code> <url-encoded text>" to the client.  Only
# 'unknown_command' has a descriptive text; for any other code the text is
# empty.  Returns 0.
sub err_line {
    my Client $self = shift;
    my $err_code = shift;

    my %text_for = (
        'unknown_command' => "Unknown server command",
    );
    my $err_text = $text_for{$err_code};

    $self->write("ERR $err_code " . eurl($err_text) . "\r\n");
    return 0;
}

# URL-encodes a string: every character outside the safe set becomes %XX
# (upper-case hex) and spaces become '+'.  An undefined value encodes to the
# empty string.
sub eurl {
    my $text = $_[0];
    return '' unless defined $text;

    $text =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/uc sprintf("%%%02x",ord($1))/eg;
    $text =~ tr/ /+/;
    return $text;
}

# Reverses eurl: '+' becomes a space and %XX sequences become the
# corresponding byte.
sub durl {
    my ($text) = @_;
    $text =~ tr/+/ /;
    $text =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
    return $text;
}

# Decodes a "name=value&name=value" string, given as a string or as a
# reference to one, into a hashref.  The values of a repeated name are joined
# with "\0".  A name without '=' gets the empty string as its value.
sub decode_url_args {
    my $input  = shift;
    my $buffer = ref $input ? $input : \$input;
    my $ret    = {};

    foreach my $pair (split(/&/, $$buffer)) {
        # Split on the first '=' only, so values may contain '='.
        my ($name, $value) = split(/=/, $pair, 2);
        next unless defined $name;

        $name  = durl($name);
        $value = defined $value ? durl($value) : '';

        # Test for existence rather than truth: a previous value of "0" or ""
        # must still be separated from the next one.
        if (exists $ret->{$name}) {
            $ret->{$name} .= "\0$value";
        }
        else {
            $ret->{$name} = $value;
        }
    }
    return $ret;
}

package Client::Internal;

use base 'Client';

our (%holder);    # hash of lock -> Client object holding it
# TODO: our %waiters, lock -> arrayref of client waiters (waker should check not closed)

sub _setup {
    # Nothing to set up.
}

# Takes $lock for this connection unless some connection already holds it.
# Returns the result of ok_line/err_line (1 on success, 0 on failure).
sub _trylock {
    my Client::Internal $self = shift;
    my $lock = shift;

    return $self->err_line("empty_lock") unless length($lock);
    return $self->err_line("taken") if defined $holder{$lock};

    $holder{$lock}        = $self;
    $self->{locks}{$lock} = 1;

    return $self->ok_line();
}

# Forgets $lock, both for this connection and in the global holder table.
sub _release_lock {
    my Client::Internal $self = shift;
    my $lock = shift;

    # TODO: notify waiters
    delete $self->{locks}{$lock};
    delete $holder{$lock};
    return 1;
}

# Returns one "  <lock> = <holder>" string per lock currently held, sorted by
# lock name.
sub _get_locks {
    return map { "  $_ = " . $holder{$_}->as_string } (sort keys %holder);
}

package Client::DLMFS;

use base 'Client';
use Fcntl;
use Errno qw(EEXIST ETXTBSY);

sub FLAGS () { O_NONBLOCK | O_RDWR | O_CREAT | O_EXCL }
sub PATH ()  { "/dlm/ddlockd" }

# Checks that /dlm exists and makes sure the lock directory is there.
sub _setup {
    -d "/dlm" or die( "DLMFS mount at /dlm not found\n" );
    unless (-d PATH()) {
        mkdir( PATH() ) or die( "Could not create " . PATH() . ": $!\n" );
    }
}

# Tries to take $lock by exclusively creating a file of that name in the lock
# directory.  Returns the result of ok_line/err_line.
sub _trylock {
    my Client::DLMFS $self = shift;
    my $lock = shift;

    return $self->err_line("empty_lock") unless length($lock);

    # The lock name comes from the network and is used as a file name, so it
    # must not be able to point outside of the lock directory.
    return $self->err_line("invalid_lock") if $lock =~ m{[/\0]};

    my $path = PATH() . "/$lock";

    # NOTE(review): $handle is closed again as soon as this sub returns --
    # confirm that dlmfs does not need it kept open for the lock to persist.
    if (sysopen( my $handle, $path, FLAGS )) {
        $self->{locks}{$lock} = 1;
        return $self->ok_line();
    }

    if ($! == EEXIST) {
        return $self->err_line( "local taken" );
    }
    elsif ($! == ETXTBSY) {
        unlink( $path );
        return $self->err_line( "remote taken" );
    }
    else {
        return $self->err_line( "unknown: $!" );
    }
}

# Forgets $lock for this connection and removes its file.
sub _release_lock {
    my Client::DLMFS $self = shift;
    my $lock = shift;

    # TODO: notify waiters
    delete $self->{locks}{$lock};
    unlink( PATH() . "/$lock" );
    return 1;
}

sub _get_locks {
    # TODO
    # return map { "  $_ = " . $holder{$_}->as_string } (sort keys %holder);
    return;
}

package Client::DBI;

# CREATE TABLE

use base 'Client';
use fields qw(dbh);

my $hostname;
my $table;
my $dbh;

# Class-level setup, called with the database host name and the lock table
# name: loads DBI and opens the connection shared by all clients.
sub _setup {
    my Client::DBI $self = shift;
    ($hostname, $table) = @_;

    eval { require DBI; 1 } or die "No DBI available?";

    # The host given with -h must be part of the DSN, otherwise it is ignored.
    $dbh = DBI->connect(
        "dbi:mysql:dbname=sixalock;host=$hostname", '', '',
        { AutoCommit => 0 },
    ) or die "Could not connect to database on '$hostname': $DBI::errstr\n";
}

# Rolls back the transaction opened by _trylock and reports $err to the
# client.  Returns the result of err_line (0).
sub _fail_trylock {
    my Client::DBI $self = shift;
    my $err = shift;

    $dbh->do( 'ROLLBACK' );
    return $self->err_line( $err );
}

# Tries to take $lock by inserting a row for it into the lock table.  Returns
# the result of ok_line/err_line.
# NOTE(review): SELECT followed by INSERT is only free of races if the table
# enforces uniqueness of "name" -- the schema is not visible here, confirm.
sub _trylock {
    my Client::DBI $self = shift;
    my $lock = shift;

    return $self->err_line("empty_lock") unless length($lock);

    my $local_locks = $self->{locks};
    exists( $local_locks->{$lock} )
        and return $self->err_line( "local taken" );

    $dbh->do( 'START TRANSACTION' )
        or return $self->err_line( "Transaction failed to start" );

    my $sth = $dbh->prepare( "SELECT * FROM $table WHERE name=?" )
        or return $self->_fail_trylock( "STH->execute failed" );
    $sth->execute( $lock )
        or return $self->_fail_trylock( "STH->execute failed" );

    my $ary = $sth->fetchall_arrayref;
    ref($ary) eq 'ARRAY'
        or return $self->_fail_trylock( "DBI->selectall_arrayref returned non-arrayref" );
    scalar @$ary == 0
        or return $self->_fail_trylock( "remote taken" );

    $dbh->do( "INSERT INTO $table (name) VALUES (?)", {}, $lock )
        or return $self->_fail_trylock( "INSERT failed" );
    $dbh->do( 'COMMIT' )
        or return $self->_fail_trylock( "COMMIT failed" );

    # Remember the lock, so that it can be released by this connection and
    # gets dropped when the connection is closed.
    $local_locks->{$lock} = 1;

    return $self->ok_line();
}

# Releases $lock if this connection holds it: forgets it locally and deletes
# its row from the lock table.  Returns 1 if the lock was held, 0 otherwise.
sub _release_lock {
    my Client::DBI $self = shift;
    my $lock = shift;

    my $locks = $self->{locks};
    return 0 unless exists( $locks->{$lock} );

    delete $locks->{$lock};

    # AutoCommit is off, so the delete has to be committed explicitly.
    my $done = $dbh->do( "DELETE FROM $table WHERE name=?", {}, $lock )
        && $dbh->do( 'COMMIT' );
    warn "Could not remove lock '$lock' from $table: " . $dbh->errstr . "\n"
        unless $done;

    return 1;
}

# Returns the names of all locks stored in the lock table.
sub _get_locks {
    my Client::DBI $self = shift;

    my $ary = $dbh->selectall_arrayref( "SELECT name FROM $table" );
    return map { $_->[0] } @$ary;
}

# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End: