Add ddlockd

Somewhere in the great Github migration of 2012, we lost ddlockd. This
is a straight copy from the most recent version I can find.
This commit is contained in:
mark 2012-12-11 08:00:11 +00:00
parent e913f5a71d
commit 2adbf9a82d
1 changed files with 556 additions and 0 deletions

556
bin/ddlockd Executable file
View File

@ -0,0 +1,556 @@
#!/usr/bin/perl
#
# Danga's Distributed Lock Daemon
#
# Copyright 2004, Danga Interactive
# Copyright 2005-2006, Six Apart, Ltd.
#
# Authors:
# Brad Fitzpatrick <brad@danga.com>
# Jonathan Steinert <jsteinert@sixapart.com>
#
# License:
# terms of Perl itself.
#
use strict;
use Getopt::Long;
use Carp;
use Danga::Socket;
use IO::Socket::INET;
use POSIX ();
## the storage load intervals: %IVs = ( time_iv => load, ... )
our %IVs = ();
## haw many load buckets we want
our $Bucket_qty = 100; #
## the total duration of load measurement
our $Total_duration = 60 * 5; # 5mn
## the duration handled by one bucket
our $IV_duration = int($Total_duration / $Bucket_qty);
## how many bucket held before we purge the old ones
our $Bucket_limit = 2 * $Bucket_qty;
use vars qw($DEBUG);
$DEBUG = 0;
my (
$daemonize,
$nokeepalive,
$hostname,
$table,
);
my $conf_port = 7002;
my $lock_type = "internal";
Getopt::Long::GetOptions(
'd|daemon' => \$daemonize,
'p|port=i' => \$conf_port,
'debug=i' => \$DEBUG,
'n|no-keepalive' => \$nokeepalive,
't|type=s' => \$lock_type,
'h|hostname=s' => \$hostname,
'T|table=s' => \$table,
);
# Statistics counters
my $lock_successes = 0;
my $lock_failures = 0;
my $client_class;
my @client_options;
if ($lock_type eq 'internal') {
$client_class = "Client::Internal";
}
elsif($lock_type eq 'dlmfs') {
$client_class = "Client::DLMFS";
}
elsif($lock_type eq 'dbi') {
length( $hostname ) or die( "-h (--hostname) must be included with a hostname in dbi mode\n" );
length( $table ) or die( "-T (--table) must be included with a table name in dbi mode\n" );
$client_class = "Client::DBI";
@client_options = ( $hostname, $table );
}
else {
die( "Unknown lock type of '$lock_type' specified.\n" );
}
$client_class->_setup( @client_options );
daemonize() if $daemonize;
use Socket qw(IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET);
# Linux-specific:
use constant TCP_KEEPIDLE => 4; # Start keeplives after this period
use constant TCP_KEEPINTVL => 5; # Interval between keepalives
use constant TCP_KEEPCNT => 6; # Number of keepalives before death
$SIG{'PIPE'} = "IGNORE"; # handled manually
# establish SERVER socket, bind and listen.
my $server = IO::Socket::INET->new(LocalPort => $conf_port,
Type => SOCK_STREAM,
Proto => IPPROTO_TCP,
Blocking => 0,
Reuse => 1,
Listen => 10 )
or die "Error creating socket: $@\n";
# Not sure if I'm crazy or not, but I can't see in strace where/how
# Perl 5.6 sets blocking to 0 without this. In Perl 5.8, IO::Socket::INET
# obviously sets it from watching strace.
IO::Handle::blocking($server, 0);
my $accept_handler = sub {
my $csock = $server->accept();
return unless $csock;
printf("Listen child making a Client for %d.\n", fileno($csock))
if $DEBUG;
IO::Handle::blocking($csock, 0);
setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
# Enable keep alive
unless ( $nokeepalive ) {
(setsockopt($csock, SOL_SOCKET, SO_KEEPALIVE, pack("l", 1)) &&
setsockopt($csock, IPPROTO_TCP, TCP_KEEPIDLE, pack("l", 30)) &&
setsockopt($csock, IPPROTO_TCP, TCP_KEEPCNT, pack("l", 10)) &&
setsockopt($csock, IPPROTO_TCP, TCP_KEEPINTVL, pack("l", 30)) &&
1
) || die "Couldn't set keep-alive settings on socket (Not on Linux?)";
}
my $client = $client_class->new($csock);
$client->watch_read(1);
};
Client->OtherFds(fileno($server) => $accept_handler);
Client->EventLoop();
sub daemonize {
my($pid, $sess_id, $i);
## Fork and exit parent
if ($pid = fork) { exit 0; }
## Detach ourselves from the terminal
croak "Cannot detach from controlling terminal"
unless $sess_id = POSIX::setsid();
## Prevent possibility of acquiring a controling terminal
$SIG{'HUP'} = 'IGNORE';
if ($pid = fork) { exit 0; }
## Change working directory
chdir "/";
## Clear file creation mask
umask 0;
## Close open file descriptors
close(STDIN);
close(STDOUT);
close(STDERR);
## Reopen stderr, stdout, stdin to /dev/null
open(STDIN, "+>/dev/null");
open(STDOUT, "+>&STDIN");
open(STDERR, "+>&STDIN");
}
#####################################################################
### C L I E N T C L A S S
#####################################################################
package Client;
use Danga::Socket;
use base 'Danga::Socket';
use fields (
'locks', # hashref of locks held by this connection. values are 1
'read_buf',
);
# TODO: out %waiters, lock -> arrayref of client waiters (waker should check not closed)
sub new {
my Client $self = shift;
$self = fields::new($self) unless ref $self;
$self->SUPER::new( @_ );
$self->{locks} = {};
$self->{read_buf} = '';
return $self;
}
# Client
sub event_read {
my Client $self = shift;
my $bref = $self->read(1024);
return $self->close() unless defined $bref;
$self->{read_buf} .= $$bref;
if ($self->{read_buf} =~ s/^(.+?)\r?\n//) {
my $line = $1;
$self->process_line( $line );
}
}
sub process_line {
my Client $self = shift;
my $line = shift;
if ($line =~ /^(\w+)\s*(.*)/) {
my ($cmd, $args) = ($1, $2);
$cmd = lc($cmd);
no strict 'refs';
my $cmd_handler = *{"cmd_$cmd"}{CODE};
if ($cmd_handler) {
my $args = decode_url_args(\$args);
$cmd_handler->($self, $args);
next;
}
}
return $self->err_line('unknown_command');
}
sub close {
my Client $self = shift;
foreach my $lock (keys %{$self->{locks}}) {
$self->_release_lock($lock);
}
$self->SUPER::close;
}
# Client
sub event_err { my $self = shift; $self->close; }
sub event_hup { my $self = shift; $self->close; }
sub cmd_status {
my Client $self = shift;
my $runtime = time - $^T;
my $load = current_load();
$self->write("STATUS: OK\n");
$self->write("SUCCESSES: $lock_successes\n");
$self->write("FAILURES: $lock_failures\n");
$self->write("RUNTIME: $runtime\n");
$self->write("LOAD: $load\n");
$self->write("\n");
return 1;
}
sub iv_for_time {
my $time = shift;
return $time - ($time % $main::IV_duration);
}
sub latest_ivs {
my $time = shift || time;
my $current_iv = iv_for_time($time) + $main::IV_duration;
return map { $current_iv -= $main::IV_duration } ( 0 .. $main::Bucket_qty - 1);
}
sub current_load {
my $sum = 0;
for (grep { $_ } @main::IVs{latest_ivs()}) {
$sum += $_;
}
return $sum;
}
sub cmd_load {
my Client $self = shift;
my $load = current_load();
$self->write("LOAD: $load\n");
$self->write("\n");
return 1;
}
sub increase_load {
my $time = time;
$main::IVs{ iv_for_time($time) }++;
if (scalar( keys %main::IVs ) > $main::Bucket_limit) {
# purge IVs
%main::IVs = map { $_ => $main::IVs{$_} } latest_ivs($time);
}
}
# gets a lock or fails with 'taken'
sub cmd_trylock {
my Client $self = shift;
my $args = shift;
my $lock = $args->{lock};
my $lockstate = $self->_trylock( $lock );
increase_load();
if ($lockstate) {
$lock_successes++;
} else {
$lock_failures++;
}
return $lockstate;
}
# releases a lock or fails with 'didnthave'
sub cmd_releaselock {
my Client $self = shift;
my $args = shift;
my $lock = $args->{lock};
return $self->err_line("empty_lock") unless length($lock);
return $self->err_line("didnthave") unless $self->{locks}{$lock};
$self->_release_lock($lock);
return $self->ok_line;
}
# shows current locks
sub cmd_locks {
my Client $self = shift;
my $args = shift;
$self->write("LOCKS:\n");
$self->write( join( "\n", $self->_get_locks ) );
$self->write("\n");
return 1;
}
sub cmd_noop {
my Client $self = shift;
# TODO: set self's last activity time so it isn't cleaned in a purge
# of stale connections?
return $self->ok_line;
}
sub ok_line {
my Client $self = shift;
my $args = shift || {};
my $argline = join('&', map { eurl($_) . "=" . eurl($args->{$_}) } keys %$args);
$self->write("OK $argline\r\n");
return 1;
}
sub err_line {
my Client $self = shift;
my $err_code = shift;
my $err_text = {
'unknown_command' => "Unknown server command",
}->{$err_code};
$self->write("ERR $err_code " . eurl($err_text) . "\r\n");
return 0;
}
sub eurl
{
my $a = $_[0];
$a =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/uc sprintf("%%%02x",ord($1))/eg;
$a =~ tr/ /+/;
return $a;
}
sub durl
{
my ($a) = @_;
$a =~ tr/+/ /;
$a =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
return $a;
}
sub decode_url_args
{
my $a = shift;
my $buffer = ref $a ? $a : \$a;
my $ret = {};
my $pair;
my @pairs = split(/&/, $$buffer);
my ($name, $value);
foreach $pair (@pairs)
{
($name, $value) = split(/=/, $pair);
$value =~ tr/+/ /;
$value =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
$name =~ tr/+/ /;
$name =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
$ret->{$name} .= $ret->{$name} ? "\0$value" : $value;
}
return $ret;
}
package Client::Internal;
use base 'Client';
our (%holder); # hash of lock -> Client object holding it
# TODO: out %waiters, lock -> arrayref of client waiters (waker should check not closed)
sub _setup {
# Nothing to set up.
}
sub _trylock {
my Client::Internal $self = shift;
my $lock = shift;
return $self->err_line("empty_lock") unless length($lock);
return $self->err_line("taken") if defined $holder{$lock};
$holder{$lock} = $self;
$self->{locks}{$lock} = 1;
return $self->ok_line();
}
sub _release_lock {
my Client::Internal $self = shift;
my $lock = shift;
# TODO: notify waiters
delete $self->{locks}{$lock};
delete $holder{$lock};
return 1;
}
sub _get_locks {
return map { " $_ = " . $holder{$_}->as_string } (sort keys %holder);
}
package Client::DLMFS;
use base 'Client';
use Fcntl;
use Errno qw(EEXIST ETXTBSY);
sub FLAGS () { O_NONBLOCK | O_RDWR | O_CREAT | O_EXCL }
sub PATH () { "/dlm/ddlockd" };
sub _setup {
-d "/dlm" or die( "DLMFS mount at /dlm not found\n" );
mkdir PATH;
}
sub _trylock {
my Client::Internal $self = shift;
my $lock = shift;
return $self->err_line("empty_lock") unless length($lock);
if (sysopen( my $handle, PATH . "/$lock", FLAGS )) {
$self->{locks}{$lock} = 1;
return $self->ok_line();
}
else {
if ($! == EEXIST) {
return $self->err_line( "local taken" );
}
elsif( $! == ETXTBSY) {
unlink( PATH . "/$lock" );
return $self->err_line( "remote taken" );
}
else {
return $self->err_line( "unknown: $!" );
}
}
}
sub _release_lock {
my Client::Internal $self = shift;
my $lock = shift;
# TODO: notify waiters
delete $self->{locks}{$lock};
unlink( PATH . "/$lock" );
return 1;
}
sub _get_locks {
# TODO
# return map { " $_ = " . $holder{$_}->as_string } (sort keys %holder);
}
package Client::DBI;
# CREATE TABLE
use base 'Client';
use fields qw(dbh);
my $hostname;
my $table;
my $dbh;
sub _setup {
my Client::DBI $self = shift;
($hostname, $table) = @_;
eval "use DBI; 1" or die "No DBI available?";
$dbh = DBI->connect( 'dbi:mysql:dbname=sixalock', '', '', {AutoCommit => 0} ) or die;
}
sub _trylock {
my Client::DBI $self = shift;
my $lock = shift;
my $local_locks = $self->{locks};
exists( $local_locks->{$lock} ) and return $self->err_line( "local taken" );
$dbh->do( 'START TRANSACTION' ) or return $self->err_line( "Transaction failed to start" );
my $sth = $dbh->prepare( "SELECT * FROM $table WHERE name=?" );
$sth->execute( $lock )
or return $self->err_line( "STH->execute failed" );
my $ary = $sth->fetchall_arrayref;
ref($ary) eq 'ARRAY'
or return $self->err_line( "DBI->selectall_arrayref returned non-arrayref" );
scalar @$ary == 0
or return $self->err_line( "remote taken" );
$dbh->do( "INSERT INTO $table (name) VALUES (?)", {}, $lock )
or return $self->err_line( "INSERT failed" );
$dbh->do( 'COMMIT' )
or return $self->err_line( "COMMIT failed" );
return $self->ok_line();
}
sub _release_lock {
my Client::DBI $self = shift;
my $lock = shift;
my $locks = $self->{locks};
if (exists( $locks->{$lock} )) {
delete $locks->{$lock};
$dbh->do( "SELECT AND DELETE * FROM $table WHERE name=?", {}, $lock );
return 1;
}
else {
return 0;
}
}
sub _get_locks {
my Client::DBI $self = shift;
my $ary = $dbh->selectall_arrayref( "SELECT name FROM $table" );
return map { $_->[0] } @$ary;
}
# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End: